diff options
author | Sage Weil <sage@newdream.net> | 2009-05-01 16:11:46 +0200 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2009-05-01 16:12:18 +0200 |
commit | be072bef3ee98070c97dffe83568f9faaaa2a3bc (patch) | |
tree | bbce3447d9d00c79051a60b72739b77043b38c48 | |
parent | msgr: clean up refs to static 'rank' (diff) | |
download | ceph-0.7.3.tar.xz ceph-0.7.3.zip |
msgr: kill static instance 'rank' of SimpleMessengerv0.7.3
-rw-r--r-- | src/ceph.cc | 3 | ||||
-rw-r--r-- | src/cfuse.cc | 7 | ||||
-rw-r--r-- | src/cmds.cc | 9 | ||||
-rw-r--r-- | src/cmon.cc | 12 | ||||
-rw-r--r-- | src/cosd.cc | 9 | ||||
-rw-r--r-- | src/csyn.cc | 7 | ||||
-rw-r--r-- | src/dumpjournal.cc | 1 | ||||
-rw-r--r-- | src/mon/MonClient.cc | 4 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 860 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 34 | ||||
-rw-r--r-- | src/testmsgr.cc | 1 |
11 files changed, 477 insertions, 470 deletions
diff --git a/src/ceph.cc b/src/ceph.cc index eff440dfe0a..5884b0e9ba5 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -587,12 +587,13 @@ int main(int argc, const char **argv, const char *envp[]) return -1; // start up network + SimpleMessenger rank; rank.bind(); messenger = rank.register_entity(entity_name_t::ADMIN()); messenger->set_dispatcher(&dispatcher); rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0)); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); if (watch) { lock.Lock(); diff --git a/src/cfuse.cc b/src/cfuse.cc index b502e6099d0..f44aed720d8 100644 --- a/src/cfuse.cc +++ b/src/cfuse.cc @@ -69,6 +69,7 @@ int main(int argc, const char **argv, const char *envp[]) { return -1; // start up network + SimpleMessenger rank; rank.bind(); cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl; @@ -76,9 +77,9 @@ int main(int argc, const char **argv, const char *envp[]) { rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); // start client client->init(); diff --git a/src/cmds.cc b/src/cmds.cc index a1603dd3171..2e76b687ea7 100644 --- a/src/cmds.cc +++ b/src/cmds.cc @@ -68,6 +68,7 @@ int main(int argc, const char **argv) if (mc.get_monmap(&monmap) < 0) return -1; + SimpleMessenger rank; rank.bind(); cout << "starting mds." << g_conf.id << " at " << rank.get_rank_addr() @@ -79,10 +80,10 @@ int main(int argc, const char **argv) if (!m) return 1; - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0)); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless()); // mds does its own timeout/markdown + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); + rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless()); // mds does its own timeout/markdown rank.start(); diff --git a/src/cmon.cc b/src/cmon.cc index d84e217e8a2..d75e8610958 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -122,6 +122,8 @@ int main(int argc, const char **argv) } // bind + SimpleMessenger rank; + cout << "starting mon" << whoami << " at " << monmap.get_inst(whoami).addr << " mon_data " << g_conf.mon_data @@ -141,12 +143,12 @@ int main(int argc, const char **argv) rank.start(); // may daemonize - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_ADMIN, SimpleMessenger::Policy::lossy_fast_fail()); mon->init(); diff --git a/src/cosd.cc b/src/cosd.cc index c49908d0741..0ff53dee227 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -122,6 +122,7 @@ int main(int argc, const char **argv) } // start up network + SimpleMessenger rank; rank.bind(); cout << "starting osd" << whoami @@ -142,14 +143,14 @@ int main(int argc, const char **argv) if (!hbm) return 1; - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); // make a _reasonable_ effort to send acks/replies to requests, but // don't get carried away, as the sender may go away and we won't // ever hear about it. - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail()); rank.start(); diff --git a/src/csyn.cc b/src/csyn.cc index 58110fa6a69..62cd08a0bd3 100644 --- a/src/csyn.cc +++ b/src/csyn.cc @@ -56,12 +56,13 @@ int main(int argc, const char **argv, char *envp[]) return -1; // start up network + SimpleMessenger rank; rank.bind(); cout << "starting csyn at " << rank.get_rank_addr() << std::endl; - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); list<Client*> clients; list<SyntheticClient*> synclients; diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc index 158ff642ffc..00b39721027 100644 --- a/src/dumpjournal.cc +++ b/src/dumpjournal.cc @@ -88,6 +88,7 @@ int main(int argc, const char **argv, const char *envp[]) return -1; // start up network + SimpleMessenger rank; rank.bind(); g_conf.daemonize = false; // not us! rank.start(); diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 73888a98cea..65f79640839 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -34,7 +34,9 @@ int MonClient::probe_mon(MonMap *pmonmap) cerr << "couldn't parse ip:port(s) from '" << g_conf.mon_host << "'" << std::endl; return -1; } - + + SimpleMessenger rank; + rank.bind(); dout(1) << " connecting to monitor(s) at " << monaddrs << " ..." << dendl; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index b967ca0d830..22822405ed0 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -38,9 +38,9 @@ #define DOUT_SUBSYS ms #undef dout_prefix -#define dout_prefix _prefix() -static ostream& _prefix() { - return *_dout << dbeginl << pthread_self() << " -- " << rank.rank_addr << " "; +#define dout_prefix _prefix(rank) +static ostream& _prefix(SimpleMessenger *rank) { + return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " "; } @@ -53,8 +53,6 @@ static ostream& _prefix() { #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl; -Rank rank; - #ifdef DARWIN sig_t old_sigint_handler = 0; #else @@ -70,7 +68,7 @@ void noop_signal_handler(int s) //dout(0) << "blah_handler got " << s << dendl; } -int Rank::Accepter::bind(int64_t force_nonce) +int SimpleMessenger::Accepter::bind(int64_t force_nonce) { // bind to a socket dout(10) << "accepter.bind" << dendl; @@ -152,7 +150,7 @@ int Rank::Accepter::bind(int64_t force_nonce) return 0; } -int Rank::Accepter::start() +int SimpleMessenger::Accepter::start() { dout(1) << "accepter.start" << dendl; @@ -171,7 +169,7 @@ int Rank::Accepter::start() return 0; } -void *Rank::Accepter::entry() +void *SimpleMessenger::Accepter::entry() { dout(10) << "accepter starting" << dendl; @@ -229,7 +227,7 @@ void *Rank::Accepter::entry() return 0; } -void Rank::Accepter::stop() +void SimpleMessenger::Accepter::stop() { done = true; dout(10) << "stop sending SIGUSR1" << dendl; @@ -241,390 +239,13 @@ void Rank::Accepter::stop() -/******************************************** - * Rank - */ - - -/* - * note: assumes lock is held - */ -void Rank::reaper() -{ - dout(10) << "reaper" << dendl; - assert(lock.is_locked()); - - while (!pipe_reap_queue.empty()) { - Pipe *p = pipe_reap_queue.front(); - dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; - p->unregister_pipe(); - pipe_reap_queue.pop_front(); - assert(pipes.count(p)); - pipes.erase(p); - p->join(); - p->discard_queue(); - dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; - assert(p->sd < 0); - delete p; - dout(10) << "reaper deleted pipe " << p << dendl; - } -} - - -int Rank::bind(int64_t force_nonce) -{ - lock.Lock(); - if (started) { - dout(10) << "rank.bind already started" << dendl; - lock.Unlock(); - return -1; - } - dout(10) << "rank.bind" << dendl; - lock.Unlock(); - - // bind to a socket - return accepter.bind(force_nonce); -} - - -class C_Die : public Context { -public: - void finish(int) { - cerr << "die" << std::endl; - exit(1); - } -}; - -static void write_pid_file(int pid) -{ - if (!g_conf.pid_file) - return; - - int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644); - if (fd >= 0) { - char buf[20]; - int len = sprintf(buf, "%d\n", pid); - ::write(fd, buf, len); - ::close(fd); - } -} - -static void remove_pid_file() -{ - if (!g_conf.pid_file) - return; - - // only remove it if it has OUR pid in it! - int fd = ::open(g_conf.pid_file, O_RDONLY); - if (fd >= 0) { - char buf[20]; - ::read(fd, buf, 20); - ::close(fd); - int a = atoi(buf); - - if (a == getpid()) - ::unlink(g_conf.pid_file); - else - dout(0) << "strange, pid file " << g_conf.pid_file - << " has " << a << ", not expected " << getpid() - << dendl; - } -} - -int Rank::start(bool nodaemon) -{ - // register at least one entity, first! - assert(my_type >= 0); - - lock.Lock(); - if (started) { - dout(10) << "rank.start already started" << dendl; - lock.Unlock(); - return 0; - } - - dout(1) << "rank.start at " << rank_addr << dendl; - started = true; - lock.Unlock(); - - // daemonize? - if (g_conf.daemonize && !nodaemon) { - if (Thread::get_num_threads() > 0) { - derr(0) << "rank.start BUG: there are " << Thread::get_num_threads() - << " already started that will now die! call rank.start() sooner." - << dendl; - } - dout(1) << "rank.start daemonizing" << dendl; - - if (1) { - daemon(1, 0); - write_pid_file(getpid()); - } else { - pid_t pid = fork(); - if (pid) { - // i am parent - write_pid_file(pid); - ::close(0); - ::close(1); - ::close(2); - _exit(0); - } - } - - if (g_conf.chdir && g_conf.chdir[0]) { - ::mkdir(g_conf.chdir, 0700); - ::chdir(g_conf.chdir); - } - - _dout_rename_output_file(); - } else if (g_daemon) { - write_pid_file(getpid()); - } - - // some debug hackery? - if (g_conf.kill_after) - g_timer.add_event_after(g_conf.kill_after, new C_Die); - - // go! - accepter.start(); - return 0; -} - - -/* connect_rank - * NOTE: assumes rank.lock held. - */ -Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p) -{ - assert(lock.is_locked()); - assert(addr != rank_addr); - - dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; - - // create pipe - Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); - pipe->policy = p; - pipe->peer_addr = addr; - pipe->start_writer(); - pipe->register_pipe(); - pipes.insert(pipe); - - return pipe; -} - - - - - - - - -/* register_entity - */ -Rank::Endpoint *Rank::register_entity(entity_name_t name) -{ - dout(10) << "register_entity " << name << dendl; - lock.Lock(); - - // create messenger - int erank = max_local; - Endpoint *msgr = new Endpoint(this, name, erank); - - // now i know my type. - if (my_type >= 0) - assert(my_type == name.type()); - else - my_type = name.type(); - - // add to directory - max_local++; - local.resize(max_local); - stopped.resize(max_local); - - msgr->get(); - local[erank] = msgr; - stopped[erank] = false; - msgr->_myinst.addr = rank_addr; - if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr) - msgr->need_addr = true; - msgr->_myinst.addr.erank = erank; - - dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr - << " need_addr=" << need_addr - << dendl; - - num_local++; - - lock.Unlock(); - return msgr; -} - - -void Rank::unregister_entity(Endpoint *msgr) -{ - lock.Lock(); - dout(10) << "unregister_entity " << msgr->get_myname() << dendl; - - // remove from local directory. - assert(msgr->my_rank >= 0); - assert(local[msgr->my_rank] == msgr); - local[msgr->my_rank] = 0; - stopped[msgr->my_rank] = true; - num_local--; - msgr->my_rank = -1; - - assert(msgr->nref.test() > 1); - msgr->put(); - - wait_cond.Signal(); - - lock.Unlock(); -} - - -void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) -{ - const entity_name_t dest = m->get_dest(); - - assert(m->nref.test() == 0); - - m->get_header().mon_protocol = CEPH_MON_PROTOCOL; - m->get_header().monc_protocol = CEPH_MONC_PROTOCOL; - m->get_header().mds_protocol = CEPH_MDS_PROTOCOL; - m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL; - m->get_header().osd_protocol = CEPH_OSD_PROTOCOL; - m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL; - - // lookup - entity_addr_t dest_proc_addr = dest_addr; - dest_proc_addr.erank = 0; - - lock.Lock(); - { - // local? - if (rank_addr.is_local_to(dest_addr)) { - if (dest_addr.erank < max_local && local[dest_addr.erank]) { - // local - dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; - local[dest_addr.erank]->queue_message(m); - } else { - derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl; - //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. - delete m; - } - } - else { - // remote. - Pipe *pipe = 0; - if (rank_pipe.count( dest_proc_addr )) { - // connected? - pipe = rank_pipe[ dest_proc_addr ]; - pipe->lock.Lock(); - if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; - pipe->unregister_pipe(); - pipe->lock.Unlock(); - pipe = 0; - } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl; - - // if this pipe was created by an incoming connection, but we haven't received - // a message yet, then it won't have the policy set. - if (pipe->get_out_seq() == 0) - pipe->policy = policy_map[m->get_dest().type()]; - - pipe->_send(m); - pipe->lock.Unlock(); - } - } - if (!pipe) { - if (lazy) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl; - delete m; - } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl; - // not connected. - pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]); - pipe->send(m); - } - } - } - } - - lock.Unlock(); -} - - - - - -void Rank::wait() -{ - lock.Lock(); - while (1) { - // reap dead pipes - reaper(); - - if (num_local == 0) { - dout(10) << "wait: everything stopped" << dendl; - break; // everything stopped. - } else { - dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl; - } - - wait_cond.Wait(lock); - } - lock.Unlock(); - - // done! clean up. - dout(20) << "wait: stopping accepter thread" << dendl; - accepter.stop(); - dout(20) << "wait: stopped accepter thread" << dendl; - - // close+reap all pipes - lock.Lock(); - { - dout(10) << "wait: closing pipes" << dendl; - list<Pipe*> toclose; - for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin(); - i != rank_pipe.end(); - i++) - toclose.push_back(i->second); - for (list<Pipe*>::iterator i = toclose.begin(); - i != toclose.end(); - i++) { - (*i)->unregister_pipe(); - (*i)->lock.Lock(); - (*i)->stop(); - (*i)->lock.Unlock(); - } - - reaper(); - dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; - while (!pipes.empty()) { - wait_cond.Wait(lock); - reaper(); - } - } - lock.Unlock(); - - dout(10) << "wait: done." << dendl; - dout(1) << "shutdown complete." << dendl; - remove_pid_file(); - started = false; - my_type = -1; -} - - - - /********************************** * Endpoint */ -void Rank::Endpoint::dispatch_entry() +void SimpleMessenger::Endpoint::dispatch_entry() { lock.Lock(); while (!stop) { @@ -699,7 +320,7 @@ void Rank::Endpoint::dispatch_entry() put(); } -void Rank::Endpoint::ready() +void SimpleMessenger::Endpoint::ready() { dout(10) << "ready " << get_myaddr() << dendl; assert(!dispatch_thread.is_started()); @@ -708,7 +329,7 @@ void Rank::Endpoint::ready() } -int Rank::Endpoint::shutdown() +int SimpleMessenger::Endpoint::shutdown() { dout(10) << "shutdown " << get_myaddr() << dendl; @@ -726,14 +347,14 @@ int Rank::Endpoint::shutdown() return 0; } -void Rank::Endpoint::suicide() +void SimpleMessenger::Endpoint::suicide() { dout(10) << "suicide " << get_myaddr() << dendl; shutdown(); // hmm, or exit(0)? } -void Rank::Endpoint::prepare_dest(const entity_inst_t& inst) +void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst) { rank->lock.Lock(); { @@ -743,7 +364,7 @@ void Rank::Endpoint::prepare_dest(const entity_inst_t& inst) rank->lock.Unlock(); } -int Rank::Endpoint::send_message(Message *m, entity_inst_t dest) +int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); @@ -768,7 +389,7 @@ int Rank::Endpoint::send_message(Message *m, entity_inst_t dest) return 0; } -int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest) +int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); @@ -794,7 +415,7 @@ int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest) -int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) +int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); @@ -821,7 +442,7 @@ int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) -void Rank::Endpoint::reset_myname(entity_name_t newname) +void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname) { entity_name_t oldname = get_myname(); dout(10) << "reset_myname " << oldname << " to " << newname << dendl; @@ -829,27 +450,11 @@ void Rank::Endpoint::reset_myname(entity_name_t newname) } -void Rank::Endpoint::mark_down(entity_addr_t a) +void SimpleMessenger::Endpoint::mark_down(entity_addr_t a) { rank->mark_down(a); } -void Rank::mark_down(entity_addr_t addr) -{ - lock.Lock(); - if (rank_pipe.count(addr)) { - Pipe *p = rank_pipe[addr]; - dout(1) << "mark_down " << addr << " -- " << p << dendl; - p->unregister_pipe(); - p->lock.Lock(); - p->stop(); - p->lock.Unlock(); - } else { - dout(1) << "mark_down " << addr << " -- pipe dne" << dendl; - } - lock.Unlock(); -} - @@ -860,7 +465,7 @@ void Rank::mark_down(entity_addr_t addr) #undef dout_prefix #define dout_prefix _pipe_prefix() -ostream& Rank::Pipe::_pipe_prefix() { +ostream& SimpleMessenger::Pipe::_pipe_prefix() { return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this << " sd=" << sd @@ -869,7 +474,7 @@ ostream& Rank::Pipe::_pipe_prefix() { << ")."; } -int Rank::Pipe::accept() +int SimpleMessenger::Pipe::accept() { dout(10) << "accept" << dendl; @@ -1131,7 +736,7 @@ int Rank::Pipe::accept() return -1; } -int Rank::Pipe::connect() +int SimpleMessenger::Pipe::connect() { dout(10) << "connect " << connect_seq << dendl; assert(lock.is_locked()); @@ -1354,7 +959,7 @@ int Rank::Pipe::connect() return -1; } -void Rank::Pipe::register_pipe() +void SimpleMessenger::Pipe::register_pipe() { dout(10) << "register_pipe" << dendl; assert(rank->lock.is_locked()); @@ -1362,7 +967,7 @@ void Rank::Pipe::register_pipe() rank->rank_pipe[peer_addr] = this; } -void Rank::Pipe::unregister_pipe() +void SimpleMessenger::Pipe::unregister_pipe() { assert(rank->lock.is_locked()); if (rank->rank_pipe.count(peer_addr) && @@ -1375,7 +980,7 @@ void Rank::Pipe::unregister_pipe() } -void Rank::Pipe::requeue_sent() +void SimpleMessenger::Pipe::requeue_sent() { if (sent.empty()) return; @@ -1391,7 +996,7 @@ void Rank::Pipe::requeue_sent() } } -void Rank::Pipe::discard_queue() +void SimpleMessenger::Pipe::discard_queue() { dout(10) << "discard_queue" << dendl; for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++) @@ -1404,7 +1009,7 @@ void Rank::Pipe::discard_queue() } -void Rank::Pipe::fault(bool onconnect, bool onread) +void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) { assert(lock.is_locked()); cond.Signal(); @@ -1479,7 +1084,7 @@ void Rank::Pipe::fault(bool onconnect, bool onread) last_attempt = now; } -void Rank::Pipe::fail() +void SimpleMessenger::Pipe::fail() { derr(10) << "fail" << dendl; assert(lock.is_locked()); @@ -1499,7 +1104,7 @@ void Rank::Pipe::fail() lock.Lock(); } -void Rank::Pipe::was_session_reset() +void SimpleMessenger::Pipe::was_session_reset() { assert(lock.is_locked()); @@ -1514,7 +1119,7 @@ void Rank::Pipe::was_session_reset() connect_seq = 0; } -void Rank::Pipe::report_failures() +void SimpleMessenger::Pipe::report_failures() { // report failures q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent); @@ -1538,7 +1143,7 @@ void Rank::Pipe::report_failures() } } -void Rank::Pipe::stop() +void SimpleMessenger::Pipe::stop() { dout(10) << "stop" << dendl; state = STATE_CLOSED; @@ -1553,7 +1158,7 @@ void Rank::Pipe::stop() /* read msgs from socket. * also, server. */ -void Rank::Pipe::reader() +void SimpleMessenger::Pipe::reader() { if (state == STATE_ACCEPTING) accept(); @@ -1750,7 +1355,7 @@ public: /* write msgs to socket. * also, client. */ -void Rank::Pipe::writer() +void SimpleMessenger::Pipe::writer() { lock.Lock(); @@ -1855,7 +1460,7 @@ void Rank::Pipe::writer() } -Message *Rank::Pipe::read_message() +Message *SimpleMessenger::Pipe::read_message() { // envelope //dout(10) << "receiver.read_message from sd " << sd << dendl; @@ -1959,7 +1564,7 @@ Message *Rank::Pipe::read_message() } -int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len) +int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len) { while (len > 0) { if (0) { // sanity @@ -2006,7 +1611,7 @@ int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len) } -int Rank::Pipe::write_ack(unsigned seq) +int SimpleMessenger::Pipe::write_ack(unsigned seq) { dout(10) << "write_ack " << seq << dendl; @@ -2030,7 +1635,7 @@ int Rank::Pipe::write_ack(unsigned seq) } -int Rank::Pipe::write_message(Message *m) +int SimpleMessenger::Pipe::write_message(Message *m) { ceph_msg_header& header = m->get_header(); ceph_msg_footer& footer = m->get_footer(); @@ -2125,3 +1730,398 @@ int Rank::Pipe::write_message(Message *m) } +/******************************************** + * SimpleMessenger + */ +#undef dout_prefix +#define dout_prefix _prefix(this) + + +/* + * note: assumes lock is held + */ +void SimpleMessenger::reaper() +{ + dout(10) << "reaper" << dendl; + assert(lock.is_locked()); + + while (!pipe_reap_queue.empty()) { + Pipe *p = pipe_reap_queue.front(); + dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; + p->unregister_pipe(); + pipe_reap_queue.pop_front(); + assert(pipes.count(p)); + pipes.erase(p); + p->join(); + p->discard_queue(); + dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; + assert(p->sd < 0); + delete p; + dout(10) << "reaper deleted pipe " << p << dendl; + } +} + + +int SimpleMessenger::bind(int64_t force_nonce) +{ + lock.Lock(); + if (started) { + dout(10) << "rank.bind already started" << dendl; + lock.Unlock(); + return -1; + } + dout(10) << "rank.bind" << dendl; + lock.Unlock(); + + // bind to a socket + return accepter.bind(force_nonce); +} + + +class C_Die : public Context { +public: + void finish(int) { + cerr << "die" << std::endl; + exit(1); + } +}; + +static void write_pid_file(int pid) +{ + if (!g_conf.pid_file) + return; + + int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644); + if (fd >= 0) { + char buf[20]; + int len = sprintf(buf, "%d\n", pid); + ::write(fd, buf, len); + ::close(fd); + } +} + +static void remove_pid_file() +{ + if (!g_conf.pid_file) + return; + + // only remove it if it has OUR pid in it! + int fd = ::open(g_conf.pid_file, O_RDONLY); + if (fd >= 0) { + char buf[20]; + ::read(fd, buf, 20); + ::close(fd); + int a = atoi(buf); + + if (a == getpid()) + ::unlink(g_conf.pid_file); + else + generic_dout(0) << "strange, pid file " << g_conf.pid_file + << " has " << a << ", not expected " << getpid() + << dendl; + } +} + +int SimpleMessenger::start(bool nodaemon) +{ + // register at least one entity, first! + assert(my_type >= 0); + + lock.Lock(); + if (started) { + dout(10) << "rank.start already started" << dendl; + lock.Unlock(); + return 0; + } + + dout(1) << "rank.start at " << rank_addr << dendl; + started = true; + lock.Unlock(); + + // daemonize? + if (g_conf.daemonize && !nodaemon) { + if (Thread::get_num_threads() > 0) { + derr(0) << "rank.start BUG: there are " << Thread::get_num_threads() + << " already started that will now die! call rank.start() sooner." + << dendl; + } + dout(1) << "rank.start daemonizing" << dendl; + + if (1) { + daemon(1, 0); + write_pid_file(getpid()); + } else { + pid_t pid = fork(); + if (pid) { + // i am parent + write_pid_file(pid); + ::close(0); + ::close(1); + ::close(2); + _exit(0); + } + } + + if (g_conf.chdir && g_conf.chdir[0]) { + ::mkdir(g_conf.chdir, 0700); + ::chdir(g_conf.chdir); + } + + _dout_rename_output_file(); + } else if (g_daemon) { + write_pid_file(getpid()); + } + + // some debug hackery? + if (g_conf.kill_after) + g_timer.add_event_after(g_conf.kill_after, new C_Die); + + // go! + accepter.start(); + return 0; +} + + +/* connect_rank + * NOTE: assumes rank.lock held. + */ +SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, const Policy& p) +{ + assert(lock.is_locked()); + assert(addr != rank_addr); + + dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; + + // create pipe + Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); + pipe->policy = p; + pipe->peer_addr = addr; + pipe->start_writer(); + pipe->register_pipe(); + pipes.insert(pipe); + + return pipe; +} + + + + + + + + +/* register_entity + */ +SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name) +{ + dout(10) << "register_entity " << name << dendl; + lock.Lock(); + + // create messenger + int erank = max_local; + Endpoint *msgr = new Endpoint(this, name, erank); + + // now i know my type. + if (my_type >= 0) + assert(my_type == name.type()); + else + my_type = name.type(); + + // add to directory + max_local++; + local.resize(max_local); + stopped.resize(max_local); + + msgr->get(); + local[erank] = msgr; + stopped[erank] = false; + msgr->_myinst.addr = rank_addr; + if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr) + msgr->need_addr = true; + msgr->_myinst.addr.erank = erank; + + dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr + << " need_addr=" << need_addr + << dendl; + + num_local++; + + lock.Unlock(); + return msgr; +} + + +void SimpleMessenger::unregister_entity(Endpoint *msgr) +{ + lock.Lock(); + dout(10) << "unregister_entity " << msgr->get_myname() << dendl; + + // remove from local directory. + assert(msgr->my_rank >= 0); + assert(local[msgr->my_rank] == msgr); + local[msgr->my_rank] = 0; + stopped[msgr->my_rank] = true; + num_local--; + msgr->my_rank = -1; + + assert(msgr->nref.test() > 1); + msgr->put(); + + wait_cond.Signal(); + + lock.Unlock(); +} + + +void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) +{ + const entity_name_t dest = m->get_dest(); + + assert(m->nref.test() == 0); + + m->get_header().mon_protocol = CEPH_MON_PROTOCOL; + m->get_header().monc_protocol = CEPH_MONC_PROTOCOL; + m->get_header().mds_protocol = CEPH_MDS_PROTOCOL; + m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL; + m->get_header().osd_protocol = CEPH_OSD_PROTOCOL; + m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL; + + // lookup + entity_addr_t dest_proc_addr = dest_addr; + dest_proc_addr.erank = 0; + + lock.Lock(); + { + // local? + if (rank_addr.is_local_to(dest_addr)) { + if (dest_addr.erank < max_local && local[dest_addr.erank]) { + // local + dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; + local[dest_addr.erank]->queue_message(m); + } else { + derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl; + //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. + delete m; + } + } + else { + // remote. + Pipe *pipe = 0; + if (rank_pipe.count( dest_proc_addr )) { + // connected? + pipe = rank_pipe[ dest_proc_addr ]; + pipe->lock.Lock(); + if (pipe->state == Pipe::STATE_CLOSED) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + pipe->unregister_pipe(); + pipe->lock.Unlock(); + pipe = 0; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl; + + // if this pipe was created by an incoming connection, but we haven't received + // a message yet, then it won't have the policy set. + if (pipe->get_out_seq() == 0) + pipe->policy = policy_map[m->get_dest().type()]; + + pipe->_send(m); + pipe->lock.Unlock(); + } + } + if (!pipe) { + if (lazy) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl; + delete m; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl; + // not connected. + pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]); + pipe->send(m); + } + } + } + } + + lock.Unlock(); +} + + + + + +void SimpleMessenger::wait() +{ + lock.Lock(); + while (1) { + // reap dead pipes + reaper(); + + if (num_local == 0) { + dout(10) << "wait: everything stopped" << dendl; + break; // everything stopped. + } else { + dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl; + } + + wait_cond.Wait(lock); + } + lock.Unlock(); + + // done! clean up. + dout(20) << "wait: stopping accepter thread" << dendl; + accepter.stop(); + dout(20) << "wait: stopped accepter thread" << dendl; + + // close+reap all pipes + lock.Lock(); + { + dout(10) << "wait: closing pipes" << dendl; + list<Pipe*> toclose; + for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin(); + i != rank_pipe.end(); + i++) + toclose.push_back(i->second); + for (list<Pipe*>::iterator i = toclose.begin(); + i != toclose.end(); + i++) { + (*i)->unregister_pipe(); + (*i)->lock.Lock(); + (*i)->stop(); + (*i)->lock.Unlock(); + } + + reaper(); + dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; + while (!pipes.empty()) { + wait_cond.Wait(lock); + reaper(); + } + } + lock.Unlock(); + + dout(10) << "wait: done." << dendl; + dout(1) << "shutdown complete." << dendl; + remove_pid_file(); + started = false; + my_type = -1; +} + + + + +void SimpleMessenger::mark_down(entity_addr_t addr) +{ + lock.Lock(); + if (rank_pipe.count(addr)) { + Pipe *p = rank_pipe[addr]; + dout(1) << "mark_down " << addr << " -- " << p << dendl; + p->unregister_pipe(); + p->lock.Lock(); + p->stop(); + p->lock.Unlock(); + } else { + dout(1) << "mark_down " << addr << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 98f808c7ceb..d25d58189a1 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -36,7 +36,7 @@ using namespace __gnu_cxx; /* Rank - per-process */ -class Rank { +class SimpleMessenger { public: struct Policy { bool lossy_tx; // @@ -88,11 +88,11 @@ private: // incoming class Accepter : public Thread { public: - Rank *rank; + SimpleMessenger *rank; bool done; int listen_sd; - Accepter(Rank *r) : rank(r), done(false), listen_sd(-1) {} + Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {} void *entry(); void stop(); @@ -105,7 +105,7 @@ private: // pipe class Pipe { public: - Rank *rank; + SimpleMessenger *rank; ostream& _pipe_prefix(); enum { @@ -179,10 +179,10 @@ private: friend class Writer; public: - Pipe(Rank *r, int st) : + Pipe(SimpleMessenger *r, int st) : rank(r), sd(-1), - lock("Rank::Pipe::lock"), + lock("SimpleMessenger::Pipe::lock"), state(st), reader_running(false), writer_running(false), connect_seq(0), peer_global_seq(0), @@ -264,7 +264,7 @@ private: // messenger interface class Endpoint : public Messenger { - Rank *rank; + SimpleMessenger *rank; Mutex lock; Cond cond; map<int, list<Message*> > dispatch_queue; @@ -286,7 +286,7 @@ private: } dispatch_thread; void dispatch_entry(); - friend class Rank; + friend class SimpleMessenger; public: void queue_message(Message *m) { @@ -331,10 +331,10 @@ private: } public: - Endpoint(Rank *r, entity_name_t name, int rn) : + Endpoint(SimpleMessenger *r, entity_name_t name, int rn) : Messenger(name), rank(r), - lock("Rank::Endpoint::lock"), + lock("SimpleMessenger::Endpoint::lock"), stop(false), qlen(0), my_rank(rn), @@ -373,7 +373,7 @@ private: }; - // Rank stuff + // SimpleMessenger stuff public: Mutex lock; Cond wait_cond; // for wait() @@ -409,12 +409,12 @@ private: void reaper(); public: - Rank() : accepter(this), - lock("Rank::lock"), started(false), need_addr(true), + SimpleMessenger() : accepter(this), + lock("SimpleMessenger::lock"), started(false), need_addr(true), max_local(0), num_local(0), my_type(-1), - global_seq_lock("Rank::global_seq_lock"), global_seq(0) { } - ~Rank() { } + global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { } + ~SimpleMessenger() { } //void set_listen_addr(tcpaddr_t& a); @@ -444,8 +444,4 @@ public: } } ; - - -extern Rank rank; - #endif diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 6da4d057a99..5bb484d159d 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -83,6 +83,7 @@ int main(int argc, const char **argv, const char *envp[]) { // start up network g_my_addr = monmap.get_inst(whoami).addr; + SimpleMessenger rank; int err = rank.bind(); if (err < 0) return 1; |