diff options
Diffstat (limited to 'src/mds/Migrator.cc')
-rw-r--r-- | src/mds/Migrator.cc | 216 |
1 files changed, 91 insertions, 125 deletions
diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index 4fc8db4f6cb..c3ed9b63e01 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -38,22 +38,6 @@ #include "messages/MClientCaps.h" -#include "messages/MExportDirDiscover.h" -#include "messages/MExportDirDiscoverAck.h" -#include "messages/MExportDirCancel.h" -#include "messages/MExportDirPrep.h" -#include "messages/MExportDirPrepAck.h" -#include "messages/MExportDir.h" -#include "messages/MExportDirAck.h" -#include "messages/MExportDirNotify.h" -#include "messages/MExportDirNotifyAck.h" -#include "messages/MExportDirFinish.h" - -#include "messages/MExportCaps.h" -#include "messages/MExportCapsAck.h" -#include "messages/MGatherCaps.h" - - /* * this is what the dir->dir_auth values look like * @@ -107,60 +91,59 @@ public: } }; -/* This function DOES put the passed message before returning*/ -void Migrator::dispatch(Message *m) +void Migrator::dispatch(const Message::const_ref &m) { switch (m->get_type()) { // import case MSG_MDS_EXPORTDIRDISCOVER: - handle_export_discover(static_cast<MExportDirDiscover*>(m)); + handle_export_discover(boost::static_pointer_cast<MExportDirDiscover::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTDIRPREP: - handle_export_prep(static_cast<MExportDirPrep*>(m)); + handle_export_prep(boost::static_pointer_cast<MExportDirPrep::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTDIR: if (unlikely(inject_session_race)) { dout(0) << "waiting for inject_session_race" << dendl; mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m)); } else { - handle_export_dir(static_cast<MExportDir*>(m)); + handle_export_dir(boost::static_pointer_cast<MExportDir::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); } break; case MSG_MDS_EXPORTDIRFINISH: - handle_export_finish(static_cast<MExportDirFinish*>(m)); + handle_export_finish(boost::static_pointer_cast<MExportDirFinish::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTDIRCANCEL: - handle_export_cancel(static_cast<MExportDirCancel*>(m)); + handle_export_cancel(boost::static_pointer_cast<MExportDirCancel::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; // export case MSG_MDS_EXPORTDIRDISCOVERACK: - handle_export_discover_ack(static_cast<MExportDirDiscoverAck*>(m)); + handle_export_discover_ack(boost::static_pointer_cast<MExportDirDiscoverAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTDIRPREPACK: - handle_export_prep_ack(static_cast<MExportDirPrepAck*>(m)); + handle_export_prep_ack(boost::static_pointer_cast<MExportDirPrepAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTDIRACK: - handle_export_ack(static_cast<MExportDirAck*>(m)); + handle_export_ack(boost::static_pointer_cast<MExportDirAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTDIRNOTIFYACK: - handle_export_notify_ack(static_cast<MExportDirNotifyAck*>(m)); - break; + handle_export_notify_ack(boost::static_pointer_cast<MExportDirNotifyAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); + break; // export 3rd party (dir_auth adjustments) case MSG_MDS_EXPORTDIRNOTIFY: - handle_export_notify(static_cast<MExportDirNotify*>(m)); + handle_export_notify(boost::static_pointer_cast<MExportDirNotify::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; // caps case MSG_MDS_EXPORTCAPS: - handle_export_caps(static_cast<MExportCaps*>(m)); + handle_export_caps(boost::static_pointer_cast<MExportCaps::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_EXPORTCAPSACK: - handle_export_caps_ack(static_cast<MExportCapsAck*>(m)); + handle_export_caps_ack(boost::static_pointer_cast<MExportCapsAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; case MSG_MDS_GATHERCAPS: - handle_gather_caps(static_cast<MGatherCaps*>(m)); + handle_gather_caps(boost::static_pointer_cast<MGatherCaps::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m)); break; default: @@ -969,10 +952,8 @@ void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count) /* * called on receipt of MExportDirDiscoverAck * the importer now has the directory's _inode_ in memory, and pinned. - * - * This function DOES put the passed message before returning */ -void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m) +void Migrator::handle_export_discover_ack(const MExportDirDiscoverAck::const_ref &m) { CDir *dir = cache->get_dirfrag(m->get_dirfrag()); mds_rank_t dest(m->get_source().num()); @@ -1007,8 +988,6 @@ void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m) export_try_cancel(dir, false); } } - - m->put(); // done } class C_M_ExportSessionsFlushed : public MigratorContext { @@ -1290,8 +1269,7 @@ void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set) } } -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) +void Migrator::handle_export_prep_ack(const MExportDirPrepAck::const_ref &m) { CDir *dir = cache->get_dirfrag(m->get_dirfrag()); mds_rank_t dest(m->get_source().num()); @@ -1307,7 +1285,6 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) it->second.peer != mds_rank_t(m->get_source().num())) { // export must have aborted. dout(7) << "export must have aborted" << dendl; - m->put(); return; } assert(it->second.state == EXPORT_PREPPING); @@ -1315,7 +1292,6 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) if (!m->is_success()) { dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl; export_try_cancel(dir, false); - m->put(); return; } @@ -1340,8 +1316,9 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true, mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN), mds_authority_t(mds->get_nodeid(),it->second.peer)); - for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q) - notify->get_bounds().push_back((*q)->dirfrag()); + for (auto &cdir : bounds) { + notify->get_bounds().push_back(cdir->dirfrag()); + } mds->send_message_mds(notify, p.first); } @@ -1352,9 +1329,6 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) // nobody to warn? if (it->second.warning_ack_waiting.empty()) export_go(dir); // start export. - - // done. - m->put(); } @@ -1725,10 +1699,8 @@ public: /* * i should get an export_ack from the export target. - * - * This function DOES put the passed message before returning */ -void Migrator::handle_export_ack(MExportDirAck *m) +void Migrator::handle_export_ack(const MExportDirAck::const_ref &m) { CDir *dir = cache->get_dirfrag(m->get_dirfrag()); mds_rank_t dest(m->get_source().num()); @@ -1777,8 +1749,6 @@ void Migrator::handle_export_ack(MExportDirAck *m) mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir)); mds->mdlog->flush(); assert (g_conf()->mds_kill_export_at != 10); - - m->put(); } void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds) @@ -1938,10 +1908,8 @@ void Migrator::export_logged_finish(CDir *dir) * notify: * i'll get an ack from each bystander. * when i get them all, unfreeze and send the finish. - * - * This function DOES put the passed message before returning */ -void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m) +void Migrator::handle_export_notify_ack(const MExportDirNotifyAck::const_ref &m) { CDir *dir = cache->get_dirfrag(m->get_dirfrag()); mds_rank_t dest(m->get_source().num()); @@ -1993,8 +1961,6 @@ void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m) } } } - - m->put(); } void Migrator::export_finish(CDir *dir) @@ -2091,15 +2057,31 @@ void Migrator::export_finish(CDir *dir) +class C_MDS_ExportDiscover : public MigratorContext { +public: + C_MDS_ExportDiscover(Migrator *mig, const MExportDirDiscover::const_ref& m) : MigratorContext(mig), m(m) {} + void finish(int r) override { + mig->handle_export_discover(m, true); + } +private: + MExportDirDiscover::const_ref m; +}; - - - +class C_MDS_ExportDiscoverFactory : public MDSContextFactory { +public: + C_MDS_ExportDiscoverFactory(Migrator *mig, MExportDirDiscover::const_ref m) : mig(mig), m(m) {} + MDSInternalContextBase *build() { + return new C_MDS_ExportDiscover(mig, m); + } +private: + Migrator *mig; + MExportDirDiscover::const_ref m; +}; // ========================================================== // IMPORT -void Migrator::handle_export_discover(MExportDirDiscover *m) +void Migrator::handle_export_discover(const MExportDirDiscover::const_ref &m, bool started) { mds_rank_t from = m->get_source_mds(); assert(from != mds->get_nodeid()); @@ -2112,16 +2094,14 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) if (!mds->is_active()) { dout(7) << " not active, send NACK " << dendl; mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid(), false), from); - m->put(); return; } // only start discovering on this message once. import_state_t *p_state; map<dirfrag_t,import_state_t>::iterator it = import_state.find(df); - if (!m->started) { + if (!started) { assert(it == import_state.end()); - m->started = true; p_state = &import_state[df]; p_state->state = IMPORT_DISCOVERING; p_state->peer = from; @@ -2132,16 +2112,16 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) it->second.peer != from || it->second.tid != m->get_tid()) { dout(7) << " dropping obsolete message" << dendl; - m->put(); return; } assert(it->second.state == IMPORT_DISCOVERING); p_state = &it->second; } + C_MDS_ExportDiscoverFactory cf(this, m); if (!mds->mdcache->is_open()) { dout(5) << " waiting for root" << dendl; - mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m)); + mds->mdcache->wait_for_open(cf.build()); return; } @@ -2154,7 +2134,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) filepath fpath(m->get_path()); vector<CDentry*> trace; MDRequestRef null_ref; - int r = cache->path_traverse(null_ref, m, NULL, fpath, &trace, NULL, MDS_TRAVERSE_DISCOVER); + int r = cache->path_traverse(null_ref, cf, fpath, &trace, NULL, MDS_TRAVERSE_DISCOVER); if (r > 0) return; if (r < 0) { dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << dendl; @@ -2176,7 +2156,6 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) // reply dout(7) << " sending export_discover_ack on " << *in << dendl; mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), p_state->peer); - m->put(); assert (g_conf()->mds_kill_import_at != 2); } @@ -2200,8 +2179,7 @@ void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat) import_reverse_final(dir); } -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_cancel(MExportDirCancel *m) +void Migrator::handle_export_cancel(const MExportDirCancel::const_ref &m) { dout(7) << "handle_export_cancel on " << m->get_dirfrag() << dendl; dirfrag_t df = m->get_dirfrag(); @@ -2230,11 +2208,30 @@ void Migrator::handle_export_cancel(MExportDirCancel *m) } else { assert(0 == "got export_cancel in weird state"); } - m->put(); } -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_prep(MExportDirPrep *m) +class C_MDS_ExportPrep : public MigratorContext { +public: + C_MDS_ExportPrep(Migrator *mig, const MExportDirPrep::const_ref& m) : MigratorContext(mig), m(m) {} + void finish(int r) override { + mig->handle_export_prep(m, true); + } +private: + MExportDirPrep::const_ref m; +}; + +class C_MDS_ExportPrepFactory : public MDSContextFactory { +public: + C_MDS_ExportPrepFactory(Migrator *mig, MExportDirPrep::const_ref m) : mig(mig), m(m) {} + MDSInternalContextBase *build() { + return new C_MDS_ExportPrep(mig, m); + } +private: + Migrator *mig; + MExportDirPrep::const_ref m; +}; + +void Migrator::handle_export_prep(const MExportDirPrep::const_ref &m, bool did_assim) { mds_rank_t oldauth = mds_rank_t(m->get_source().num()); assert(oldauth != mds->get_nodeid()); @@ -2245,7 +2242,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m) // assimilate root dir. map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag()); - if (!m->did_assim()) { + if (!did_assim) { assert(it != import_state.end()); assert(it->second.state == IMPORT_DISCOVERED); assert(it->second.peer == oldauth); @@ -2259,7 +2256,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m) it->second.peer != oldauth || it->second.tid != m->get_tid()) { dout(7) << "handle_export_prep obsolete message, dropping" << dendl; - m->put(); return; } assert(it->second.state == IMPORT_PREPPING); @@ -2276,17 +2272,14 @@ void Migrator::handle_export_prep(MExportDirPrep *m) // build import bound map map<inodeno_t, fragset_t> import_bound_fragset; - for (list<dirfrag_t>::iterator p = m->get_bounds().begin(); - p != m->get_bounds().end(); - ++p) { - dout(10) << " bound " << *p << dendl; - import_bound_fragset[p->ino].insert(p->frag); + for (const auto &bound : m->get_bounds()) { + dout(10) << " bound " << bound << dendl; + import_bound_fragset[bound.ino].insert(bound.frag); } // assimilate contents? - if (!m->did_assim()) { + if (!did_assim) { dout(7) << "doing assim on " << *dir << dendl; - m->mark_assim(); // only do this the first time! // change import state it->second.state = IMPORT_PREPPING; @@ -2304,15 +2297,13 @@ void Migrator::handle_export_prep(MExportDirPrep *m) // assimilate traces to exports // each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*) - for (list<bufferlist>::iterator p = m->traces.begin(); - p != m->traces.end(); - ++p) { - auto q = p->cbegin(); + for (const auto &bl : m->traces) { + auto q = bl.cbegin(); dirfrag_t df; decode(df, q); char start; decode(start, q); - dout(10) << " trace from " << df << " start " << start << " len " << p->length() << dendl; + dout(10) << " trace from " << df << " start " << start << " len " << bl.length() << dendl; CDir *cur = 0; if (start == 'd') { @@ -2356,6 +2347,8 @@ void Migrator::handle_export_prep(MExportDirPrep *m) dout(7) << " not doing assim on " << *dir << dendl; } + C_MDS_ExportPrepFactory cf(this, m); + if (!finished.empty()) mds->queue_waiters(finished); @@ -2382,8 +2375,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m) CDir *bound = cache->get_dirfrag(dirfrag_t(p->first, *q)); if (!bound) { dout(7) << " opening bounding dirfrag " << *q << " on " << *in << dendl; - cache->open_remote_dirfrag(in, *q, - new C_MDS_RetryMessage(mds, m)); + cache->open_remote_dirfrag(in, *q, cf.build()); return; } @@ -2434,8 +2426,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m) mds->send_message(new MExportDirPrepAck(dir->dirfrag(), success, m->get_tid()), m->get_connection()); assert(g_conf()->mds_kill_import_at != 4); - // done - m->put(); } @@ -2456,8 +2446,7 @@ public: } }; -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_dir(MExportDir *m) +void Migrator::handle_export_dir(const MExportDir::const_ref &m) { assert (g_conf()->mds_kill_import_at != 5); CDir *dir = cache->get_dirfrag(m->dirfrag); @@ -2519,10 +2508,8 @@ void Migrator::handle_export_dir(MExportDir *m) // include bounds in EImportStart set<CDir*> import_bounds; - for (vector<dirfrag_t>::iterator p = m->bounds.begin(); - p != m->bounds.end(); - ++p) { - CDir *bd = cache->get_dirfrag(*p); + for (const auto &bound : m->bounds) { + CDir *bd = cache->get_dirfrag(bound); assert(bd); le->metablob.add_dir(bd, false); // note that parent metadata is already in the event import_bounds.insert(bd); @@ -2547,8 +2534,6 @@ void Migrator::handle_export_dir(MExportDir *m) mds->logger->inc(l_mds_imported); mds->logger->inc(l_mds_imported_inodes, num_imported_inodes); } - - m->put(); } @@ -2862,8 +2847,7 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from, cache->show_subtrees(); } -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_finish(MExportDirFinish *m) +void Migrator::handle_export_finish(const MExportDirFinish::const_ref &m) { CDir *dir = cache->get_dirfrag(m->get_dirfrag()); assert(dir); @@ -2874,8 +2858,6 @@ void Migrator::handle_export_finish(MExportDirFinish *m) assert(it->second.tid == m->get_tid()); import_finish(dir, false, m->is_last()); - - m->put(); } void Migrator::import_finish(CDir *dir, bool notify, bool last) @@ -3251,11 +3233,9 @@ int Migrator::decode_import_dir(bufferlist::const_iterator& blp, // authority bystander -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_notify(MExportDirNotify *m) +void Migrator::handle_export_notify(const MExportDirNotify::const_ref &m) { if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) { - m->put(); return; } @@ -3291,8 +3271,6 @@ void Migrator::handle_export_notify(MExportDirNotify *m) // aborted. no ack. dout(7) << "handle_export_notify no ack requested" << dendl; } - - m->put(); } /** cap exports **/ @@ -3314,8 +3292,7 @@ void Migrator::export_caps(CInode *in) mds->send_message_mds(ex, dest); } -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_caps_ack(MExportCapsAck *ack) +void Migrator::handle_export_caps_ack(const MExportCapsAck::const_ref &ack) { mds_rank_t from = ack->get_source().num(); CInode *in = cache->get_inode(ack->ino); @@ -3350,15 +3327,13 @@ void Migrator::handle_export_caps_ack(MExportCapsAck *ack) mds->locker->request_inode_file_caps(in); mds->locker->try_eval(in, CEPH_CAP_LOCKS); } - - ack->put(); } -void Migrator::handle_gather_caps(MGatherCaps *m) +void Migrator::handle_gather_caps(const MGatherCaps::const_ref &m) { CInode *in = cache->get_inode(m->ino); if (!in) - goto out; + return; dout(10) << "handle_gather_caps " << *m << " from " << m->get_source() << " on " << *in << dendl; @@ -3368,9 +3343,6 @@ void Migrator::handle_gather_caps(MGatherCaps *m) !in->is_ambiguous_auth() && !in->state_test(CInode::STATE_EXPORTINGCAPS)) export_caps(in); - -out: - m->put(); } class C_M_LoggedImportCaps : public MigratorLogContext { @@ -3386,8 +3358,7 @@ public: } }; -/* This function DOES put the passed message before returning*/ -void Migrator::handle_export_caps(MExportCaps *ex) +void Migrator::handle_export_caps(const MExportCaps::const_ref &ex) { dout(10) << "handle_export_caps " << *ex << " from " << ex->get_source() << dendl; CInode *in = cache->get_inode(ex->ino); @@ -3397,16 +3368,13 @@ void Migrator::handle_export_caps(MExportCaps *ex) // FIXME if (!in->can_auth_pin()) { - ex->put(); return; } in->auth_pin(this); - map<client_t,entity_inst_t> client_map; - map<client_t,client_metadata_t> client_metadata_map; - client_map.swap(ex->client_map); - client_metadata_map.swap(ex->client_metadata_map); + map<client_t,entity_inst_t> client_map{ex->client_map}; + map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map}; C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps( this, in, mds_rank_t(ex->get_source().num())); @@ -3423,8 +3391,6 @@ void Migrator::handle_export_caps(MExportCaps *ex) std::move(client_metadata_map)); mds->mdlog->start_submit_entry(le, finish); mds->mdlog->flush(); - - ex->put(); } |