summaryrefslogtreecommitdiffstats
path: root/src/mds/Migrator.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds/Migrator.cc')
-rw-r--r--src/mds/Migrator.cc216
1 files changed, 91 insertions, 125 deletions
diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc
index 4fc8db4f6cb..c3ed9b63e01 100644
--- a/src/mds/Migrator.cc
+++ b/src/mds/Migrator.cc
@@ -38,22 +38,6 @@
#include "messages/MClientCaps.h"
-#include "messages/MExportDirDiscover.h"
-#include "messages/MExportDirDiscoverAck.h"
-#include "messages/MExportDirCancel.h"
-#include "messages/MExportDirPrep.h"
-#include "messages/MExportDirPrepAck.h"
-#include "messages/MExportDir.h"
-#include "messages/MExportDirAck.h"
-#include "messages/MExportDirNotify.h"
-#include "messages/MExportDirNotifyAck.h"
-#include "messages/MExportDirFinish.h"
-
-#include "messages/MExportCaps.h"
-#include "messages/MExportCapsAck.h"
-#include "messages/MGatherCaps.h"
-
-
/*
* this is what the dir->dir_auth values look like
*
@@ -107,60 +91,59 @@ public:
}
};
-/* This function DOES put the passed message before returning*/
-void Migrator::dispatch(Message *m)
+void Migrator::dispatch(const Message::const_ref &m)
{
switch (m->get_type()) {
// import
case MSG_MDS_EXPORTDIRDISCOVER:
- handle_export_discover(static_cast<MExportDirDiscover*>(m));
+ handle_export_discover(boost::static_pointer_cast<MExportDirDiscover::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTDIRPREP:
- handle_export_prep(static_cast<MExportDirPrep*>(m));
+ handle_export_prep(boost::static_pointer_cast<MExportDirPrep::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTDIR:
if (unlikely(inject_session_race)) {
dout(0) << "waiting for inject_session_race" << dendl;
mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
} else {
- handle_export_dir(static_cast<MExportDir*>(m));
+ handle_export_dir(boost::static_pointer_cast<MExportDir::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
}
break;
case MSG_MDS_EXPORTDIRFINISH:
- handle_export_finish(static_cast<MExportDirFinish*>(m));
+ handle_export_finish(boost::static_pointer_cast<MExportDirFinish::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTDIRCANCEL:
- handle_export_cancel(static_cast<MExportDirCancel*>(m));
+ handle_export_cancel(boost::static_pointer_cast<MExportDirCancel::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
// export
case MSG_MDS_EXPORTDIRDISCOVERACK:
- handle_export_discover_ack(static_cast<MExportDirDiscoverAck*>(m));
+ handle_export_discover_ack(boost::static_pointer_cast<MExportDirDiscoverAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTDIRPREPACK:
- handle_export_prep_ack(static_cast<MExportDirPrepAck*>(m));
+ handle_export_prep_ack(boost::static_pointer_cast<MExportDirPrepAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTDIRACK:
- handle_export_ack(static_cast<MExportDirAck*>(m));
+ handle_export_ack(boost::static_pointer_cast<MExportDirAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTDIRNOTIFYACK:
- handle_export_notify_ack(static_cast<MExportDirNotifyAck*>(m));
- break;
+ handle_export_notify_ack(boost::static_pointer_cast<MExportDirNotifyAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
+ break;
// export 3rd party (dir_auth adjustments)
case MSG_MDS_EXPORTDIRNOTIFY:
- handle_export_notify(static_cast<MExportDirNotify*>(m));
+ handle_export_notify(boost::static_pointer_cast<MExportDirNotify::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
// caps
case MSG_MDS_EXPORTCAPS:
- handle_export_caps(static_cast<MExportCaps*>(m));
+ handle_export_caps(boost::static_pointer_cast<MExportCaps::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_EXPORTCAPSACK:
- handle_export_caps_ack(static_cast<MExportCapsAck*>(m));
+ handle_export_caps_ack(boost::static_pointer_cast<MExportCapsAck::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
case MSG_MDS_GATHERCAPS:
- handle_gather_caps(static_cast<MGatherCaps*>(m));
+ handle_gather_caps(boost::static_pointer_cast<MGatherCaps::const_ref::element_type, std::remove_reference<decltype(m)>::type::element_type>(m));
break;
default:
@@ -969,10 +952,8 @@ void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
/*
* called on receipt of MExportDirDiscoverAck
* the importer now has the directory's _inode_ in memory, and pinned.
- *
- * This function DOES put the passed message before returning
*/
-void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
+void Migrator::handle_export_discover_ack(const MExportDirDiscoverAck::const_ref &m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
mds_rank_t dest(m->get_source().num());
@@ -1007,8 +988,6 @@ void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
export_try_cancel(dir, false);
}
}
-
- m->put(); // done
}
class C_M_ExportSessionsFlushed : public MigratorContext {
@@ -1290,8 +1269,7 @@ void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
}
}
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
+void Migrator::handle_export_prep_ack(const MExportDirPrepAck::const_ref &m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
mds_rank_t dest(m->get_source().num());
@@ -1307,7 +1285,6 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
it->second.peer != mds_rank_t(m->get_source().num())) {
// export must have aborted.
dout(7) << "export must have aborted" << dendl;
- m->put();
return;
}
assert(it->second.state == EXPORT_PREPPING);
@@ -1315,7 +1292,6 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
if (!m->is_success()) {
dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
export_try_cancel(dir, false);
- m->put();
return;
}
@@ -1340,8 +1316,9 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true,
mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
mds_authority_t(mds->get_nodeid(),it->second.peer));
- for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
- notify->get_bounds().push_back((*q)->dirfrag());
+ for (auto &cdir : bounds) {
+ notify->get_bounds().push_back(cdir->dirfrag());
+ }
mds->send_message_mds(notify, p.first);
}
@@ -1352,9 +1329,6 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
// nobody to warn?
if (it->second.warning_ack_waiting.empty())
export_go(dir); // start export.
-
- // done.
- m->put();
}
@@ -1725,10 +1699,8 @@ public:
/*
* i should get an export_ack from the export target.
- *
- * This function DOES put the passed message before returning
*/
-void Migrator::handle_export_ack(MExportDirAck *m)
+void Migrator::handle_export_ack(const MExportDirAck::const_ref &m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
mds_rank_t dest(m->get_source().num());
@@ -1777,8 +1749,6 @@ void Migrator::handle_export_ack(MExportDirAck *m)
mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
mds->mdlog->flush();
assert (g_conf()->mds_kill_export_at != 10);
-
- m->put();
}
void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
@@ -1938,10 +1908,8 @@ void Migrator::export_logged_finish(CDir *dir)
* notify:
* i'll get an ack from each bystander.
* when i get them all, unfreeze and send the finish.
- *
- * This function DOES put the passed message before returning
*/
-void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m)
+void Migrator::handle_export_notify_ack(const MExportDirNotifyAck::const_ref &m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
mds_rank_t dest(m->get_source().num());
@@ -1993,8 +1961,6 @@ void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m)
}
}
}
-
- m->put();
}
void Migrator::export_finish(CDir *dir)
@@ -2091,15 +2057,31 @@ void Migrator::export_finish(CDir *dir)
+class C_MDS_ExportDiscover : public MigratorContext {
+public:
+ C_MDS_ExportDiscover(Migrator *mig, const MExportDirDiscover::const_ref& m) : MigratorContext(mig), m(m) {}
+ void finish(int r) override {
+ mig->handle_export_discover(m, true);
+ }
+private:
+ MExportDirDiscover::const_ref m;
+};
-
-
-
+class C_MDS_ExportDiscoverFactory : public MDSContextFactory {
+public:
+ C_MDS_ExportDiscoverFactory(Migrator *mig, MExportDirDiscover::const_ref m) : mig(mig), m(m) {}
+ MDSInternalContextBase *build() {
+ return new C_MDS_ExportDiscover(mig, m);
+ }
+private:
+ Migrator *mig;
+ MExportDirDiscover::const_ref m;
+};
// ==========================================================
// IMPORT
-void Migrator::handle_export_discover(MExportDirDiscover *m)
+void Migrator::handle_export_discover(const MExportDirDiscover::const_ref &m, bool started)
{
mds_rank_t from = m->get_source_mds();
assert(from != mds->get_nodeid());
@@ -2112,16 +2094,14 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
if (!mds->is_active()) {
dout(7) << " not active, send NACK " << dendl;
mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid(), false), from);
- m->put();
return;
}
// only start discovering on this message once.
import_state_t *p_state;
map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
- if (!m->started) {
+ if (!started) {
assert(it == import_state.end());
- m->started = true;
p_state = &import_state[df];
p_state->state = IMPORT_DISCOVERING;
p_state->peer = from;
@@ -2132,16 +2112,16 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
it->second.peer != from ||
it->second.tid != m->get_tid()) {
dout(7) << " dropping obsolete message" << dendl;
- m->put();
return;
}
assert(it->second.state == IMPORT_DISCOVERING);
p_state = &it->second;
}
+ C_MDS_ExportDiscoverFactory cf(this, m);
if (!mds->mdcache->is_open()) {
dout(5) << " waiting for root" << dendl;
- mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
+ mds->mdcache->wait_for_open(cf.build());
return;
}
@@ -2154,7 +2134,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
filepath fpath(m->get_path());
vector<CDentry*> trace;
MDRequestRef null_ref;
- int r = cache->path_traverse(null_ref, m, NULL, fpath, &trace, NULL, MDS_TRAVERSE_DISCOVER);
+ int r = cache->path_traverse(null_ref, cf, fpath, &trace, NULL, MDS_TRAVERSE_DISCOVER);
if (r > 0) return;
if (r < 0) {
dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
@@ -2176,7 +2156,6 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
// reply
dout(7) << " sending export_discover_ack on " << *in << dendl;
mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), p_state->peer);
- m->put();
assert (g_conf()->mds_kill_import_at != 2);
}
@@ -2200,8 +2179,7 @@ void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
import_reverse_final(dir);
}
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_cancel(MExportDirCancel *m)
+void Migrator::handle_export_cancel(const MExportDirCancel::const_ref &m)
{
dout(7) << "handle_export_cancel on " << m->get_dirfrag() << dendl;
dirfrag_t df = m->get_dirfrag();
@@ -2230,11 +2208,30 @@ void Migrator::handle_export_cancel(MExportDirCancel *m)
} else {
assert(0 == "got export_cancel in weird state");
}
- m->put();
}
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_prep(MExportDirPrep *m)
+class C_MDS_ExportPrep : public MigratorContext {
+public:
+ C_MDS_ExportPrep(Migrator *mig, const MExportDirPrep::const_ref& m) : MigratorContext(mig), m(m) {}
+ void finish(int r) override {
+ mig->handle_export_prep(m, true);
+ }
+private:
+ MExportDirPrep::const_ref m;
+};
+
+class C_MDS_ExportPrepFactory : public MDSContextFactory {
+public:
+ C_MDS_ExportPrepFactory(Migrator *mig, MExportDirPrep::const_ref m) : mig(mig), m(m) {}
+ MDSInternalContextBase *build() {
+ return new C_MDS_ExportPrep(mig, m);
+ }
+private:
+ Migrator *mig;
+ MExportDirPrep::const_ref m;
+};
+
+void Migrator::handle_export_prep(const MExportDirPrep::const_ref &m, bool did_assim)
{
mds_rank_t oldauth = mds_rank_t(m->get_source().num());
assert(oldauth != mds->get_nodeid());
@@ -2245,7 +2242,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
// assimilate root dir.
map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
- if (!m->did_assim()) {
+ if (!did_assim) {
assert(it != import_state.end());
assert(it->second.state == IMPORT_DISCOVERED);
assert(it->second.peer == oldauth);
@@ -2259,7 +2256,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
it->second.peer != oldauth ||
it->second.tid != m->get_tid()) {
dout(7) << "handle_export_prep obsolete message, dropping" << dendl;
- m->put();
return;
}
assert(it->second.state == IMPORT_PREPPING);
@@ -2276,17 +2272,14 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
// build import bound map
map<inodeno_t, fragset_t> import_bound_fragset;
- for (list<dirfrag_t>::iterator p = m->get_bounds().begin();
- p != m->get_bounds().end();
- ++p) {
- dout(10) << " bound " << *p << dendl;
- import_bound_fragset[p->ino].insert(p->frag);
+ for (const auto &bound : m->get_bounds()) {
+ dout(10) << " bound " << bound << dendl;
+ import_bound_fragset[bound.ino].insert(bound.frag);
}
// assimilate contents?
- if (!m->did_assim()) {
+ if (!did_assim) {
dout(7) << "doing assim on " << *dir << dendl;
- m->mark_assim(); // only do this the first time!
// change import state
it->second.state = IMPORT_PREPPING;
@@ -2304,15 +2297,13 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
// assimilate traces to exports
// each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
- for (list<bufferlist>::iterator p = m->traces.begin();
- p != m->traces.end();
- ++p) {
- auto q = p->cbegin();
+ for (const auto &bl : m->traces) {
+ auto q = bl.cbegin();
dirfrag_t df;
decode(df, q);
char start;
decode(start, q);
- dout(10) << " trace from " << df << " start " << start << " len " << p->length() << dendl;
+ dout(10) << " trace from " << df << " start " << start << " len " << bl.length() << dendl;
CDir *cur = 0;
if (start == 'd') {
@@ -2356,6 +2347,8 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
dout(7) << " not doing assim on " << *dir << dendl;
}
+ C_MDS_ExportPrepFactory cf(this, m);
+
if (!finished.empty())
mds->queue_waiters(finished);
@@ -2382,8 +2375,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
CDir *bound = cache->get_dirfrag(dirfrag_t(p->first, *q));
if (!bound) {
dout(7) << " opening bounding dirfrag " << *q << " on " << *in << dendl;
- cache->open_remote_dirfrag(in, *q,
- new C_MDS_RetryMessage(mds, m));
+ cache->open_remote_dirfrag(in, *q, cf.build());
return;
}
@@ -2434,8 +2426,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
mds->send_message(new MExportDirPrepAck(dir->dirfrag(), success, m->get_tid()), m->get_connection());
assert(g_conf()->mds_kill_import_at != 4);
- // done
- m->put();
}
@@ -2456,8 +2446,7 @@ public:
}
};
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_dir(MExportDir *m)
+void Migrator::handle_export_dir(const MExportDir::const_ref &m)
{
assert (g_conf()->mds_kill_import_at != 5);
CDir *dir = cache->get_dirfrag(m->dirfrag);
@@ -2519,10 +2508,8 @@ void Migrator::handle_export_dir(MExportDir *m)
// include bounds in EImportStart
set<CDir*> import_bounds;
- for (vector<dirfrag_t>::iterator p = m->bounds.begin();
- p != m->bounds.end();
- ++p) {
- CDir *bd = cache->get_dirfrag(*p);
+ for (const auto &bound : m->bounds) {
+ CDir *bd = cache->get_dirfrag(bound);
assert(bd);
le->metablob.add_dir(bd, false); // note that parent metadata is already in the event
import_bounds.insert(bd);
@@ -2547,8 +2534,6 @@ void Migrator::handle_export_dir(MExportDir *m)
mds->logger->inc(l_mds_imported);
mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
}
-
- m->put();
}
@@ -2862,8 +2847,7 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
cache->show_subtrees();
}
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_finish(MExportDirFinish *m)
+void Migrator::handle_export_finish(const MExportDirFinish::const_ref &m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
@@ -2874,8 +2858,6 @@ void Migrator::handle_export_finish(MExportDirFinish *m)
assert(it->second.tid == m->get_tid());
import_finish(dir, false, m->is_last());
-
- m->put();
}
void Migrator::import_finish(CDir *dir, bool notify, bool last)
@@ -3251,11 +3233,9 @@ int Migrator::decode_import_dir(bufferlist::const_iterator& blp,
// authority bystander
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_notify(MExportDirNotify *m)
+void Migrator::handle_export_notify(const MExportDirNotify::const_ref &m)
{
if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
- m->put();
return;
}
@@ -3291,8 +3271,6 @@ void Migrator::handle_export_notify(MExportDirNotify *m)
// aborted. no ack.
dout(7) << "handle_export_notify no ack requested" << dendl;
}
-
- m->put();
}
/** cap exports **/
@@ -3314,8 +3292,7 @@ void Migrator::export_caps(CInode *in)
mds->send_message_mds(ex, dest);
}
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
+void Migrator::handle_export_caps_ack(const MExportCapsAck::const_ref &ack)
{
mds_rank_t from = ack->get_source().num();
CInode *in = cache->get_inode(ack->ino);
@@ -3350,15 +3327,13 @@ void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
mds->locker->request_inode_file_caps(in);
mds->locker->try_eval(in, CEPH_CAP_LOCKS);
}
-
- ack->put();
}
-void Migrator::handle_gather_caps(MGatherCaps *m)
+void Migrator::handle_gather_caps(const MGatherCaps::const_ref &m)
{
CInode *in = cache->get_inode(m->ino);
if (!in)
- goto out;
+ return;
dout(10) << "handle_gather_caps " << *m << " from " << m->get_source()
<< " on " << *in << dendl;
@@ -3368,9 +3343,6 @@ void Migrator::handle_gather_caps(MGatherCaps *m)
!in->is_ambiguous_auth() &&
!in->state_test(CInode::STATE_EXPORTINGCAPS))
export_caps(in);
-
-out:
- m->put();
}
class C_M_LoggedImportCaps : public MigratorLogContext {
@@ -3386,8 +3358,7 @@ public:
}
};
-/* This function DOES put the passed message before returning*/
-void Migrator::handle_export_caps(MExportCaps *ex)
+void Migrator::handle_export_caps(const MExportCaps::const_ref &ex)
{
dout(10) << "handle_export_caps " << *ex << " from " << ex->get_source() << dendl;
CInode *in = cache->get_inode(ex->ino);
@@ -3397,16 +3368,13 @@ void Migrator::handle_export_caps(MExportCaps *ex)
// FIXME
if (!in->can_auth_pin()) {
- ex->put();
return;
}
in->auth_pin(this);
- map<client_t,entity_inst_t> client_map;
- map<client_t,client_metadata_t> client_metadata_map;
- client_map.swap(ex->client_map);
- client_metadata_map.swap(ex->client_metadata_map);
+ map<client_t,entity_inst_t> client_map{ex->client_map};
+ map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map};
C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
this, in, mds_rank_t(ex->get_source().num()));
@@ -3423,8 +3391,6 @@ void Migrator::handle_export_caps(MExportCaps *ex)
std::move(client_metadata_map));
mds->mdlog->start_submit_entry(le, finish);
mds->mdlog->flush();
-
- ex->put();
}