summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-05-01 16:11:46 +0200
committerSage Weil <sage@newdream.net>2009-05-01 16:12:18 +0200
commitbe072bef3ee98070c97dffe83568f9faaaa2a3bc (patch)
treebbce3447d9d00c79051a60b72739b77043b38c48
parentmsgr: clean up refs to static 'rank' (diff)
downloadceph-0.7.3.tar.xz
ceph-0.7.3.zip
msgr: kill static instance 'rank' of SimpleMessengerv0.7.3
-rw-r--r--src/ceph.cc3
-rw-r--r--src/cfuse.cc7
-rw-r--r--src/cmds.cc9
-rw-r--r--src/cmon.cc12
-rw-r--r--src/cosd.cc9
-rw-r--r--src/csyn.cc7
-rw-r--r--src/dumpjournal.cc1
-rw-r--r--src/mon/MonClient.cc4
-rw-r--r--src/msg/SimpleMessenger.cc860
-rw-r--r--src/msg/SimpleMessenger.h34
-rw-r--r--src/testmsgr.cc1
11 files changed, 477 insertions, 470 deletions
diff --git a/src/ceph.cc b/src/ceph.cc
index eff440dfe0a..5884b0e9ba5 100644
--- a/src/ceph.cc
+++ b/src/ceph.cc
@@ -587,12 +587,13 @@ int main(int argc, const char **argv, const char *envp[])
return -1;
// start up network
+ SimpleMessenger rank;
rank.bind();
messenger = rank.register_entity(entity_name_t::ADMIN());
messenger->set_dispatcher(&dispatcher);
rank.start();
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
if (watch) {
lock.Lock();
diff --git a/src/cfuse.cc b/src/cfuse.cc
index b502e6099d0..f44aed720d8 100644
--- a/src/cfuse.cc
+++ b/src/cfuse.cc
@@ -69,6 +69,7 @@ int main(int argc, const char **argv, const char *envp[]) {
return -1;
// start up network
+ SimpleMessenger rank;
rank.bind();
cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl;
@@ -76,9 +77,9 @@ int main(int argc, const char **argv, const char *envp[]) {
rank.start();
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
// start client
client->init();
diff --git a/src/cmds.cc b/src/cmds.cc
index a1603dd3171..2e76b687ea7 100644
--- a/src/cmds.cc
+++ b/src/cmds.cc
@@ -68,6 +68,7 @@ int main(int argc, const char **argv)
if (mc.get_monmap(&monmap) < 0)
return -1;
+ SimpleMessenger rank;
rank.bind();
cout << "starting mds." << g_conf.id
<< " at " << rank.get_rank_addr()
@@ -79,10 +80,10 @@ int main(int argc, const char **argv)
if (!m)
return 1;
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless()); // mds does its own timeout/markdown
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
+ rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless()); // mds does its own timeout/markdown
rank.start();
diff --git a/src/cmon.cc b/src/cmon.cc
index d84e217e8a2..d75e8610958 100644
--- a/src/cmon.cc
+++ b/src/cmon.cc
@@ -122,6 +122,8 @@ int main(int argc, const char **argv)
}
// bind
+ SimpleMessenger rank;
+
cout << "starting mon" << whoami
<< " at " << monmap.get_inst(whoami).addr
<< " mon_data " << g_conf.mon_data
@@ -141,12 +143,12 @@ int main(int argc, const char **argv)
rank.start(); // may daemonize
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_ADMIN, SimpleMessenger::Policy::lossy_fast_fail());
mon->init();
diff --git a/src/cosd.cc b/src/cosd.cc
index c49908d0741..0ff53dee227 100644
--- a/src/cosd.cc
+++ b/src/cosd.cc
@@ -122,6 +122,7 @@ int main(int argc, const char **argv)
}
// start up network
+ SimpleMessenger rank;
rank.bind();
cout << "starting osd" << whoami
@@ -142,14 +143,14 @@ int main(int argc, const char **argv)
if (!hbm)
return 1;
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
// make a _reasonable_ effort to send acks/replies to requests, but
// don't get carried away, as the sender may go away and we won't
// ever hear about it.
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
rank.start();
diff --git a/src/csyn.cc b/src/csyn.cc
index 58110fa6a69..62cd08a0bd3 100644
--- a/src/csyn.cc
+++ b/src/csyn.cc
@@ -56,12 +56,13 @@ int main(int argc, const char **argv, char *envp[])
return -1;
// start up network
+ SimpleMessenger rank;
rank.bind();
cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
list<Client*> clients;
list<SyntheticClient*> synclients;
diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc
index 158ff642ffc..00b39721027 100644
--- a/src/dumpjournal.cc
+++ b/src/dumpjournal.cc
@@ -88,6 +88,7 @@ int main(int argc, const char **argv, const char *envp[])
return -1;
// start up network
+ SimpleMessenger rank;
rank.bind();
g_conf.daemonize = false; // not us!
rank.start();
diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc
index 73888a98cea..65f79640839 100644
--- a/src/mon/MonClient.cc
+++ b/src/mon/MonClient.cc
@@ -34,7 +34,9 @@ int MonClient::probe_mon(MonMap *pmonmap)
cerr << "couldn't parse ip:port(s) from '" << g_conf.mon_host << "'" << std::endl;
return -1;
}
-
+
+ SimpleMessenger rank;
+
rank.bind();
dout(1) << " connecting to monitor(s) at " << monaddrs << " ..." << dendl;
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index b967ca0d830..22822405ed0 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -38,9 +38,9 @@
#define DOUT_SUBSYS ms
#undef dout_prefix
-#define dout_prefix _prefix()
-static ostream& _prefix() {
- return *_dout << dbeginl << pthread_self() << " -- " << rank.rank_addr << " ";
+#define dout_prefix _prefix(rank)
+static ostream& _prefix(SimpleMessenger *rank) {
+ return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " ";
}
@@ -53,8 +53,6 @@ static ostream& _prefix() {
#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
-Rank rank;
-
#ifdef DARWIN
sig_t old_sigint_handler = 0;
#else
@@ -70,7 +68,7 @@ void noop_signal_handler(int s)
//dout(0) << "blah_handler got " << s << dendl;
}
-int Rank::Accepter::bind(int64_t force_nonce)
+int SimpleMessenger::Accepter::bind(int64_t force_nonce)
{
// bind to a socket
dout(10) << "accepter.bind" << dendl;
@@ -152,7 +150,7 @@ int Rank::Accepter::bind(int64_t force_nonce)
return 0;
}
-int Rank::Accepter::start()
+int SimpleMessenger::Accepter::start()
{
dout(1) << "accepter.start" << dendl;
@@ -171,7 +169,7 @@ int Rank::Accepter::start()
return 0;
}
-void *Rank::Accepter::entry()
+void *SimpleMessenger::Accepter::entry()
{
dout(10) << "accepter starting" << dendl;
@@ -229,7 +227,7 @@ void *Rank::Accepter::entry()
return 0;
}
-void Rank::Accepter::stop()
+void SimpleMessenger::Accepter::stop()
{
done = true;
dout(10) << "stop sending SIGUSR1" << dendl;
@@ -241,390 +239,13 @@ void Rank::Accepter::stop()
-/********************************************
- * Rank
- */
-
-
-/*
- * note: assumes lock is held
- */
-void Rank::reaper()
-{
- dout(10) << "reaper" << dendl;
- assert(lock.is_locked());
-
- while (!pipe_reap_queue.empty()) {
- Pipe *p = pipe_reap_queue.front();
- dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
- p->unregister_pipe();
- pipe_reap_queue.pop_front();
- assert(pipes.count(p));
- pipes.erase(p);
- p->join();
- p->discard_queue();
- dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
- assert(p->sd < 0);
- delete p;
- dout(10) << "reaper deleted pipe " << p << dendl;
- }
-}
-
-
-int Rank::bind(int64_t force_nonce)
-{
- lock.Lock();
- if (started) {
- dout(10) << "rank.bind already started" << dendl;
- lock.Unlock();
- return -1;
- }
- dout(10) << "rank.bind" << dendl;
- lock.Unlock();
-
- // bind to a socket
- return accepter.bind(force_nonce);
-}
-
-
-class C_Die : public Context {
-public:
- void finish(int) {
- cerr << "die" << std::endl;
- exit(1);
- }
-};
-
-static void write_pid_file(int pid)
-{
- if (!g_conf.pid_file)
- return;
-
- int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
- if (fd >= 0) {
- char buf[20];
- int len = sprintf(buf, "%d\n", pid);
- ::write(fd, buf, len);
- ::close(fd);
- }
-}
-
-static void remove_pid_file()
-{
- if (!g_conf.pid_file)
- return;
-
- // only remove it if it has OUR pid in it!
- int fd = ::open(g_conf.pid_file, O_RDONLY);
- if (fd >= 0) {
- char buf[20];
- ::read(fd, buf, 20);
- ::close(fd);
- int a = atoi(buf);
-
- if (a == getpid())
- ::unlink(g_conf.pid_file);
- else
- dout(0) << "strange, pid file " << g_conf.pid_file
- << " has " << a << ", not expected " << getpid()
- << dendl;
- }
-}
-
-int Rank::start(bool nodaemon)
-{
- // register at least one entity, first!
- assert(my_type >= 0);
-
- lock.Lock();
- if (started) {
- dout(10) << "rank.start already started" << dendl;
- lock.Unlock();
- return 0;
- }
-
- dout(1) << "rank.start at " << rank_addr << dendl;
- started = true;
- lock.Unlock();
-
- // daemonize?
- if (g_conf.daemonize && !nodaemon) {
- if (Thread::get_num_threads() > 0) {
- derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
- << " already started that will now die! call rank.start() sooner."
- << dendl;
- }
- dout(1) << "rank.start daemonizing" << dendl;
-
- if (1) {
- daemon(1, 0);
- write_pid_file(getpid());
- } else {
- pid_t pid = fork();
- if (pid) {
- // i am parent
- write_pid_file(pid);
- ::close(0);
- ::close(1);
- ::close(2);
- _exit(0);
- }
- }
-
- if (g_conf.chdir && g_conf.chdir[0]) {
- ::mkdir(g_conf.chdir, 0700);
- ::chdir(g_conf.chdir);
- }
-
- _dout_rename_output_file();
- } else if (g_daemon) {
- write_pid_file(getpid());
- }
-
- // some debug hackery?
- if (g_conf.kill_after)
- g_timer.add_event_after(g_conf.kill_after, new C_Die);
-
- // go!
- accepter.start();
- return 0;
-}
-
-
-/* connect_rank
- * NOTE: assumes rank.lock held.
- */
-Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p)
-{
- assert(lock.is_locked());
- assert(addr != rank_addr);
-
- dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
-
- // create pipe
- Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
- pipe->policy = p;
- pipe->peer_addr = addr;
- pipe->start_writer();
- pipe->register_pipe();
- pipes.insert(pipe);
-
- return pipe;
-}
-
-
-
-
-
-
-
-
-/* register_entity
- */
-Rank::Endpoint *Rank::register_entity(entity_name_t name)
-{
- dout(10) << "register_entity " << name << dendl;
- lock.Lock();
-
- // create messenger
- int erank = max_local;
- Endpoint *msgr = new Endpoint(this, name, erank);
-
- // now i know my type.
- if (my_type >= 0)
- assert(my_type == name.type());
- else
- my_type = name.type();
-
- // add to directory
- max_local++;
- local.resize(max_local);
- stopped.resize(max_local);
-
- msgr->get();
- local[erank] = msgr;
- stopped[erank] = false;
- msgr->_myinst.addr = rank_addr;
- if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
- msgr->need_addr = true;
- msgr->_myinst.addr.erank = erank;
-
- dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr
- << " need_addr=" << need_addr
- << dendl;
-
- num_local++;
-
- lock.Unlock();
- return msgr;
-}
-
-
-void Rank::unregister_entity(Endpoint *msgr)
-{
- lock.Lock();
- dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
-
- // remove from local directory.
- assert(msgr->my_rank >= 0);
- assert(local[msgr->my_rank] == msgr);
- local[msgr->my_rank] = 0;
- stopped[msgr->my_rank] = true;
- num_local--;
- msgr->my_rank = -1;
-
- assert(msgr->nref.test() > 1);
- msgr->put();
-
- wait_cond.Signal();
-
- lock.Unlock();
-}
-
-
-void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
-{
- const entity_name_t dest = m->get_dest();
-
- assert(m->nref.test() == 0);
-
- m->get_header().mon_protocol = CEPH_MON_PROTOCOL;
- m->get_header().monc_protocol = CEPH_MONC_PROTOCOL;
- m->get_header().mds_protocol = CEPH_MDS_PROTOCOL;
- m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL;
- m->get_header().osd_protocol = CEPH_OSD_PROTOCOL;
- m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL;
-
- // lookup
- entity_addr_t dest_proc_addr = dest_addr;
- dest_proc_addr.erank = 0;
-
- lock.Lock();
- {
- // local?
- if (rank_addr.is_local_to(dest_addr)) {
- if (dest_addr.erank < max_local && local[dest_addr.erank]) {
- // local
- dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
- local[dest_addr.erank]->queue_message(m);
- } else {
- derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl;
- //assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
- delete m;
- }
- }
- else {
- // remote.
- Pipe *pipe = 0;
- if (rank_pipe.count( dest_proc_addr )) {
- // connected?
- pipe = rank_pipe[ dest_proc_addr ];
- pipe->lock.Lock();
- if (pipe->state == Pipe::STATE_CLOSED) {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
- pipe->unregister_pipe();
- pipe->lock.Unlock();
- pipe = 0;
- } else {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
-
- // if this pipe was created by an incoming connection, but we haven't received
- // a message yet, then it won't have the policy set.
- if (pipe->get_out_seq() == 0)
- pipe->policy = policy_map[m->get_dest().type()];
-
- pipe->_send(m);
- pipe->lock.Unlock();
- }
- }
- if (!pipe) {
- if (lazy) {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
- delete m;
- } else {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
- // not connected.
- pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
- pipe->send(m);
- }
- }
- }
- }
-
- lock.Unlock();
-}
-
-
-
-
-
-void Rank::wait()
-{
- lock.Lock();
- while (1) {
- // reap dead pipes
- reaper();
-
- if (num_local == 0) {
- dout(10) << "wait: everything stopped" << dendl;
- break; // everything stopped.
- } else {
- dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
- }
-
- wait_cond.Wait(lock);
- }
- lock.Unlock();
-
- // done! clean up.
- dout(20) << "wait: stopping accepter thread" << dendl;
- accepter.stop();
- dout(20) << "wait: stopped accepter thread" << dendl;
-
- // close+reap all pipes
- lock.Lock();
- {
- dout(10) << "wait: closing pipes" << dendl;
- list<Pipe*> toclose;
- for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin();
- i != rank_pipe.end();
- i++)
- toclose.push_back(i->second);
- for (list<Pipe*>::iterator i = toclose.begin();
- i != toclose.end();
- i++) {
- (*i)->unregister_pipe();
- (*i)->lock.Lock();
- (*i)->stop();
- (*i)->lock.Unlock();
- }
-
- reaper();
- dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
- while (!pipes.empty()) {
- wait_cond.Wait(lock);
- reaper();
- }
- }
- lock.Unlock();
-
- dout(10) << "wait: done." << dendl;
- dout(1) << "shutdown complete." << dendl;
- remove_pid_file();
- started = false;
- my_type = -1;
-}
-
-
-
-
/**********************************
* Endpoint
*/
-void Rank::Endpoint::dispatch_entry()
+void SimpleMessenger::Endpoint::dispatch_entry()
{
lock.Lock();
while (!stop) {
@@ -699,7 +320,7 @@ void Rank::Endpoint::dispatch_entry()
put();
}
-void Rank::Endpoint::ready()
+void SimpleMessenger::Endpoint::ready()
{
dout(10) << "ready " << get_myaddr() << dendl;
assert(!dispatch_thread.is_started());
@@ -708,7 +329,7 @@ void Rank::Endpoint::ready()
}
-int Rank::Endpoint::shutdown()
+int SimpleMessenger::Endpoint::shutdown()
{
dout(10) << "shutdown " << get_myaddr() << dendl;
@@ -726,14 +347,14 @@ int Rank::Endpoint::shutdown()
return 0;
}
-void Rank::Endpoint::suicide()
+void SimpleMessenger::Endpoint::suicide()
{
dout(10) << "suicide " << get_myaddr() << dendl;
shutdown();
// hmm, or exit(0)?
}
-void Rank::Endpoint::prepare_dest(const entity_inst_t& inst)
+void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
{
rank->lock.Lock();
{
@@ -743,7 +364,7 @@ void Rank::Endpoint::prepare_dest(const entity_inst_t& inst)
rank->lock.Unlock();
}
-int Rank::Endpoint::send_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
@@ -768,7 +389,7 @@ int Rank::Endpoint::send_message(Message *m, entity_inst_t dest)
return 0;
}
-int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
@@ -794,7 +415,7 @@ int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest)
-int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
+int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
@@ -821,7 +442,7 @@ int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
-void Rank::Endpoint::reset_myname(entity_name_t newname)
+void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname)
{
entity_name_t oldname = get_myname();
dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
@@ -829,27 +450,11 @@ void Rank::Endpoint::reset_myname(entity_name_t newname)
}
-void Rank::Endpoint::mark_down(entity_addr_t a)
+void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
{
rank->mark_down(a);
}
-void Rank::mark_down(entity_addr_t addr)
-{
- lock.Lock();
- if (rank_pipe.count(addr)) {
- Pipe *p = rank_pipe[addr];
- dout(1) << "mark_down " << addr << " -- " << p << dendl;
- p->unregister_pipe();
- p->lock.Lock();
- p->stop();
- p->lock.Unlock();
- } else {
- dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
- }
- lock.Unlock();
-}
-
@@ -860,7 +465,7 @@ void Rank::mark_down(entity_addr_t addr)
#undef dout_prefix
#define dout_prefix _pipe_prefix()
-ostream& Rank::Pipe::_pipe_prefix() {
+ostream& SimpleMessenger::Pipe::_pipe_prefix() {
return *_dout << dbeginl << pthread_self()
<< " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd
@@ -869,7 +474,7 @@ ostream& Rank::Pipe::_pipe_prefix() {
<< ").";
}
-int Rank::Pipe::accept()
+int SimpleMessenger::Pipe::accept()
{
dout(10) << "accept" << dendl;
@@ -1131,7 +736,7 @@ int Rank::Pipe::accept()
return -1;
}
-int Rank::Pipe::connect()
+int SimpleMessenger::Pipe::connect()
{
dout(10) << "connect " << connect_seq << dendl;
assert(lock.is_locked());
@@ -1354,7 +959,7 @@ int Rank::Pipe::connect()
return -1;
}
-void Rank::Pipe::register_pipe()
+void SimpleMessenger::Pipe::register_pipe()
{
dout(10) << "register_pipe" << dendl;
assert(rank->lock.is_locked());
@@ -1362,7 +967,7 @@ void Rank::Pipe::register_pipe()
rank->rank_pipe[peer_addr] = this;
}
-void Rank::Pipe::unregister_pipe()
+void SimpleMessenger::Pipe::unregister_pipe()
{
assert(rank->lock.is_locked());
if (rank->rank_pipe.count(peer_addr) &&
@@ -1375,7 +980,7 @@ void Rank::Pipe::unregister_pipe()
}
-void Rank::Pipe::requeue_sent()
+void SimpleMessenger::Pipe::requeue_sent()
{
if (sent.empty())
return;
@@ -1391,7 +996,7 @@ void Rank::Pipe::requeue_sent()
}
}
-void Rank::Pipe::discard_queue()
+void SimpleMessenger::Pipe::discard_queue()
{
dout(10) << "discard_queue" << dendl;
for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
@@ -1404,7 +1009,7 @@ void Rank::Pipe::discard_queue()
}
-void Rank::Pipe::fault(bool onconnect, bool onread)
+void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
{
assert(lock.is_locked());
cond.Signal();
@@ -1479,7 +1084,7 @@ void Rank::Pipe::fault(bool onconnect, bool onread)
last_attempt = now;
}
-void Rank::Pipe::fail()
+void SimpleMessenger::Pipe::fail()
{
derr(10) << "fail" << dendl;
assert(lock.is_locked());
@@ -1499,7 +1104,7 @@ void Rank::Pipe::fail()
lock.Lock();
}
-void Rank::Pipe::was_session_reset()
+void SimpleMessenger::Pipe::was_session_reset()
{
assert(lock.is_locked());
@@ -1514,7 +1119,7 @@ void Rank::Pipe::was_session_reset()
connect_seq = 0;
}
-void Rank::Pipe::report_failures()
+void SimpleMessenger::Pipe::report_failures()
{
// report failures
q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent);
@@ -1538,7 +1143,7 @@ void Rank::Pipe::report_failures()
}
}
-void Rank::Pipe::stop()
+void SimpleMessenger::Pipe::stop()
{
dout(10) << "stop" << dendl;
state = STATE_CLOSED;
@@ -1553,7 +1158,7 @@ void Rank::Pipe::stop()
/* read msgs from socket.
* also, server.
*/
-void Rank::Pipe::reader()
+void SimpleMessenger::Pipe::reader()
{
if (state == STATE_ACCEPTING)
accept();
@@ -1750,7 +1355,7 @@ public:
/* write msgs to socket.
* also, client.
*/
-void Rank::Pipe::writer()
+void SimpleMessenger::Pipe::writer()
{
lock.Lock();
@@ -1855,7 +1460,7 @@ void Rank::Pipe::writer()
}
-Message *Rank::Pipe::read_message()
+Message *SimpleMessenger::Pipe::read_message()
{
// envelope
//dout(10) << "receiver.read_message from sd " << sd << dendl;
@@ -1959,7 +1564,7 @@ Message *Rank::Pipe::read_message()
}
-int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
+int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
{
while (len > 0) {
if (0) { // sanity
@@ -2006,7 +1611,7 @@ int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
}
-int Rank::Pipe::write_ack(unsigned seq)
+int SimpleMessenger::Pipe::write_ack(unsigned seq)
{
dout(10) << "write_ack " << seq << dendl;
@@ -2030,7 +1635,7 @@ int Rank::Pipe::write_ack(unsigned seq)
}
-int Rank::Pipe::write_message(Message *m)
+int SimpleMessenger::Pipe::write_message(Message *m)
{
ceph_msg_header& header = m->get_header();
ceph_msg_footer& footer = m->get_footer();
@@ -2125,3 +1730,398 @@ int Rank::Pipe::write_message(Message *m)
}
+/********************************************
+ * SimpleMessenger
+ */
+#undef dout_prefix
+#define dout_prefix _prefix(this)
+
+
+/*
+ * note: assumes lock is held
+ */
+void SimpleMessenger::reaper()
+{
+ dout(10) << "reaper" << dendl;
+ assert(lock.is_locked());
+
+ while (!pipe_reap_queue.empty()) {
+ Pipe *p = pipe_reap_queue.front();
+ dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
+ p->unregister_pipe();
+ pipe_reap_queue.pop_front();
+ assert(pipes.count(p));
+ pipes.erase(p);
+ p->join();
+ p->discard_queue();
+ dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
+ assert(p->sd < 0);
+ delete p;
+ dout(10) << "reaper deleted pipe " << p << dendl;
+ }
+}
+
+
+int SimpleMessenger::bind(int64_t force_nonce)
+{
+ lock.Lock();
+ if (started) {
+ dout(10) << "rank.bind already started" << dendl;
+ lock.Unlock();
+ return -1;
+ }
+ dout(10) << "rank.bind" << dendl;
+ lock.Unlock();
+
+ // bind to a socket
+ return accepter.bind(force_nonce);
+}
+
+
+class C_Die : public Context {
+public:
+ void finish(int) {
+ cerr << "die" << std::endl;
+ exit(1);
+ }
+};
+
+static void write_pid_file(int pid)
+{
+ if (!g_conf.pid_file)
+ return;
+
+ int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
+ if (fd >= 0) {
+ char buf[20];
+ int len = sprintf(buf, "%d\n", pid);
+ ::write(fd, buf, len);
+ ::close(fd);
+ }
+}
+
+static void remove_pid_file()
+{
+ if (!g_conf.pid_file)
+ return;
+
+ // only remove it if it has OUR pid in it!
+ int fd = ::open(g_conf.pid_file, O_RDONLY);
+ if (fd >= 0) {
+ char buf[20];
+ ::read(fd, buf, 20);
+ ::close(fd);
+ int a = atoi(buf);
+
+ if (a == getpid())
+ ::unlink(g_conf.pid_file);
+ else
+ generic_dout(0) << "strange, pid file " << g_conf.pid_file
+ << " has " << a << ", not expected " << getpid()
+ << dendl;
+ }
+}
+
+int SimpleMessenger::start(bool nodaemon)
+{
+ // register at least one entity, first!
+ assert(my_type >= 0);
+
+ lock.Lock();
+ if (started) {
+ dout(10) << "rank.start already started" << dendl;
+ lock.Unlock();
+ return 0;
+ }
+
+ dout(1) << "rank.start at " << rank_addr << dendl;
+ started = true;
+ lock.Unlock();
+
+ // daemonize?
+ if (g_conf.daemonize && !nodaemon) {
+ if (Thread::get_num_threads() > 0) {
+ derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
+ << " already started that will now die! call rank.start() sooner."
+ << dendl;
+ }
+ dout(1) << "rank.start daemonizing" << dendl;
+
+ if (1) {
+ daemon(1, 0);
+ write_pid_file(getpid());
+ } else {
+ pid_t pid = fork();
+ if (pid) {
+ // i am parent
+ write_pid_file(pid);
+ ::close(0);
+ ::close(1);
+ ::close(2);
+ _exit(0);
+ }
+ }
+
+ if (g_conf.chdir && g_conf.chdir[0]) {
+ ::mkdir(g_conf.chdir, 0700);
+ ::chdir(g_conf.chdir);
+ }
+
+ _dout_rename_output_file();
+ } else if (g_daemon) {
+ write_pid_file(getpid());
+ }
+
+ // some debug hackery?
+ if (g_conf.kill_after)
+ g_timer.add_event_after(g_conf.kill_after, new C_Die);
+
+ // go!
+ accepter.start();
+ return 0;
+}
+
+
+/* connect_rank
+ * NOTE: assumes rank.lock held.
+ */
+SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, const Policy& p)
+{
+ assert(lock.is_locked());
+ assert(addr != rank_addr);
+
+ dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
+
+ // create pipe
+ Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
+ pipe->policy = p;
+ pipe->peer_addr = addr;
+ pipe->start_writer();
+ pipe->register_pipe();
+ pipes.insert(pipe);
+
+ return pipe;
+}
+
+
+
+
+
+
+
+
+/* register_entity
+ */
+SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name)
+{
+ dout(10) << "register_entity " << name << dendl;
+ lock.Lock();
+
+ // create messenger
+ int erank = max_local;
+ Endpoint *msgr = new Endpoint(this, name, erank);
+
+ // now i know my type.
+ if (my_type >= 0)
+ assert(my_type == name.type());
+ else
+ my_type = name.type();
+
+ // add to directory
+ max_local++;
+ local.resize(max_local);
+ stopped.resize(max_local);
+
+ msgr->get();
+ local[erank] = msgr;
+ stopped[erank] = false;
+ msgr->_myinst.addr = rank_addr;
+ if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
+ msgr->need_addr = true;
+ msgr->_myinst.addr.erank = erank;
+
+ dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr
+ << " need_addr=" << need_addr
+ << dendl;
+
+ num_local++;
+
+ lock.Unlock();
+ return msgr;
+}
+
+
+void SimpleMessenger::unregister_entity(Endpoint *msgr)
+{
+ lock.Lock();
+ dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
+
+ // remove from local directory.
+ assert(msgr->my_rank >= 0);
+ assert(local[msgr->my_rank] == msgr);
+ local[msgr->my_rank] = 0;
+ stopped[msgr->my_rank] = true;
+ num_local--;
+ msgr->my_rank = -1;
+
+ assert(msgr->nref.test() > 1);
+ msgr->put();
+
+ wait_cond.Signal();
+
+ lock.Unlock();
+}
+
+
+void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
+{
+ const entity_name_t dest = m->get_dest();
+
+ assert(m->nref.test() == 0);
+
+ m->get_header().mon_protocol = CEPH_MON_PROTOCOL;
+ m->get_header().monc_protocol = CEPH_MONC_PROTOCOL;
+ m->get_header().mds_protocol = CEPH_MDS_PROTOCOL;
+ m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL;
+ m->get_header().osd_protocol = CEPH_OSD_PROTOCOL;
+ m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL;
+
+ // lookup
+ entity_addr_t dest_proc_addr = dest_addr;
+ dest_proc_addr.erank = 0;
+
+ lock.Lock();
+ {
+ // local?
+ if (rank_addr.is_local_to(dest_addr)) {
+ if (dest_addr.erank < max_local && local[dest_addr.erank]) {
+ // local
+ dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
+ local[dest_addr.erank]->queue_message(m);
+ } else {
+ derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl;
+ //assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
+ delete m;
+ }
+ }
+ else {
+ // remote.
+ Pipe *pipe = 0;
+ if (rank_pipe.count( dest_proc_addr )) {
+ // connected?
+ pipe = rank_pipe[ dest_proc_addr ];
+ pipe->lock.Lock();
+ if (pipe->state == Pipe::STATE_CLOSED) {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+ pipe->unregister_pipe();
+ pipe->lock.Unlock();
+ pipe = 0;
+ } else {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
+
+ // if this pipe was created by an incoming connection, but we haven't received
+ // a message yet, then it won't have the policy set.
+ if (pipe->get_out_seq() == 0)
+ pipe->policy = policy_map[m->get_dest().type()];
+
+ pipe->_send(m);
+ pipe->lock.Unlock();
+ }
+ }
+ if (!pipe) {
+ if (lazy) {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
+ delete m;
+ } else {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
+ // not connected.
+ pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
+ pipe->send(m);
+ }
+ }
+ }
+ }
+
+ lock.Unlock();
+}
+
+
+
+
+
+void SimpleMessenger::wait()
+{
+ lock.Lock();
+ while (1) {
+ // reap dead pipes
+ reaper();
+
+ if (num_local == 0) {
+ dout(10) << "wait: everything stopped" << dendl;
+ break; // everything stopped.
+ } else {
+ dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
+ }
+
+ wait_cond.Wait(lock);
+ }
+ lock.Unlock();
+
+ // done! clean up.
+ dout(20) << "wait: stopping accepter thread" << dendl;
+ accepter.stop();
+ dout(20) << "wait: stopped accepter thread" << dendl;
+
+ // close+reap all pipes
+ lock.Lock();
+ {
+ dout(10) << "wait: closing pipes" << dendl;
+ list<Pipe*> toclose;
+ for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin();
+ i != rank_pipe.end();
+ i++)
+ toclose.push_back(i->second);
+ for (list<Pipe*>::iterator i = toclose.begin();
+ i != toclose.end();
+ i++) {
+ (*i)->unregister_pipe();
+ (*i)->lock.Lock();
+ (*i)->stop();
+ (*i)->lock.Unlock();
+ }
+
+ reaper();
+ dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
+ while (!pipes.empty()) {
+ wait_cond.Wait(lock);
+ reaper();
+ }
+ }
+ lock.Unlock();
+
+ dout(10) << "wait: done." << dendl;
+ dout(1) << "shutdown complete." << dendl;
+ remove_pid_file();
+ started = false;
+ my_type = -1;
+}
+
+
+
+
+void SimpleMessenger::mark_down(entity_addr_t addr)
+{
+ lock.Lock();
+ if (rank_pipe.count(addr)) {
+ Pipe *p = rank_pipe[addr];
+ dout(1) << "mark_down " << addr << " -- " << p << dendl;
+ p->unregister_pipe();
+ p->lock.Lock();
+ p->stop();
+ p->lock.Unlock();
+ } else {
+ dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
+ }
+ lock.Unlock();
+}
+
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 98f808c7ceb..d25d58189a1 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -36,7 +36,7 @@ using namespace __gnu_cxx;
/* Rank - per-process
*/
-class Rank {
+class SimpleMessenger {
public:
struct Policy {
bool lossy_tx; //
@@ -88,11 +88,11 @@ private:
// incoming
class Accepter : public Thread {
public:
- Rank *rank;
+ SimpleMessenger *rank;
bool done;
int listen_sd;
- Accepter(Rank *r) : rank(r), done(false), listen_sd(-1) {}
+ Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {}
void *entry();
void stop();
@@ -105,7 +105,7 @@ private:
// pipe
class Pipe {
public:
- Rank *rank;
+ SimpleMessenger *rank;
ostream& _pipe_prefix();
enum {
@@ -179,10 +179,10 @@ private:
friend class Writer;
public:
- Pipe(Rank *r, int st) :
+ Pipe(SimpleMessenger *r, int st) :
rank(r),
sd(-1),
- lock("Rank::Pipe::lock"),
+ lock("SimpleMessenger::Pipe::lock"),
state(st),
reader_running(false), writer_running(false),
connect_seq(0), peer_global_seq(0),
@@ -264,7 +264,7 @@ private:
// messenger interface
class Endpoint : public Messenger {
- Rank *rank;
+ SimpleMessenger *rank;
Mutex lock;
Cond cond;
map<int, list<Message*> > dispatch_queue;
@@ -286,7 +286,7 @@ private:
} dispatch_thread;
void dispatch_entry();
- friend class Rank;
+ friend class SimpleMessenger;
public:
void queue_message(Message *m) {
@@ -331,10 +331,10 @@ private:
}
public:
- Endpoint(Rank *r, entity_name_t name, int rn) :
+ Endpoint(SimpleMessenger *r, entity_name_t name, int rn) :
Messenger(name),
rank(r),
- lock("Rank::Endpoint::lock"),
+ lock("SimpleMessenger::Endpoint::lock"),
stop(false),
qlen(0),
my_rank(rn),
@@ -373,7 +373,7 @@ private:
};
- // Rank stuff
+ // SimpleMessenger stuff
public:
Mutex lock;
Cond wait_cond; // for wait()
@@ -409,12 +409,12 @@ private:
void reaper();
public:
- Rank() : accepter(this),
- lock("Rank::lock"), started(false), need_addr(true),
+ SimpleMessenger() : accepter(this),
+ lock("SimpleMessenger::lock"), started(false), need_addr(true),
max_local(0), num_local(0),
my_type(-1),
- global_seq_lock("Rank::global_seq_lock"), global_seq(0) { }
- ~Rank() { }
+ global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { }
+ ~SimpleMessenger() { }
//void set_listen_addr(tcpaddr_t& a);
@@ -444,8 +444,4 @@ public:
}
} ;
-
-
-extern Rank rank;
-
#endif
diff --git a/src/testmsgr.cc b/src/testmsgr.cc
index 6da4d057a99..5bb484d159d 100644
--- a/src/testmsgr.cc
+++ b/src/testmsgr.cc
@@ -83,6 +83,7 @@ int main(int argc, const char **argv, const char *envp[]) {
// start up network
g_my_addr = monmap.get_inst(whoami).addr;
+ SimpleMessenger rank;
int err = rank.bind();
if (err < 0)
return 1;