diff options
author | Sage Weil <sage@redhat.com> | 2019-08-22 05:27:45 +0200 |
---|---|---|
committer | Sage Weil <sage@redhat.com> | 2019-09-09 18:22:11 +0200 |
commit | 49a0573aa80bc679c8fed79e4070dad7f105dfc6 (patch) | |
tree | 266c464f33c1d0e9ad2f241cd285c39f39bcf8ce | |
parent | osd/PeeringState: add message_map to PeeringCtx/BufferedRecoveryMessages (diff) | |
download | ceph-49a0573aa80bc679c8fed79e4070dad7f105dfc6.tar.xz ceph-49a0573aa80bc679c8fed79e4070dad7f105dfc6.zip |
osd/PeeringState: send notifies via message_map (not notify_list)
Queue up a separate message for each notify, and drop the old notify_list
plumbing.
Signed-off-by: Sage Weil <sage@redhat.com>
-rw-r--r-- | src/osd/OSD.cc | 31 | ||||
-rw-r--r-- | src/osd/OSD.h | 2 | ||||
-rw-r--r-- | src/osd/PeeringState.cc | 20 | ||||
-rw-r--r-- | src/osd/PeeringState.h | 83 |
4 files changed, 49 insertions, 87 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 579a232ea78..1cbbdd61991 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -9262,7 +9262,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, } else if (!is_active()) { dout(20) << __func__ << " not active" << dendl; } else { - do_notifies(ctx.notify_list, curmap); do_queries(ctx.query_map, curmap); do_infos(ctx.info_map, curmap); @@ -9294,36 +9293,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, } } -/** do_notifies - * Send an MOSDPGNotify to a primary, with a list of PGs that I have - * content for, and they are primary for. - */ - -void OSD::do_notifies( - map<int,vector<pg_notify_t>>& notify_list, - OSDMapRef curmap) -{ - for (auto& [osd, notifies] : notify_list) { - if (!curmap->is_up(osd)) { - dout(20) << __func__ << " skipping down osd." << osd << dendl; - continue; - } - ConnectionRef con = service.get_con_osd_cluster( - osd, curmap->get_epoch()); - if (!con) { - dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl; - continue; - } - service.maybe_share_map(con.get(), curmap); - dout(7) << __func__ << " osd." << osd - << " on " << notifies.size() << " PGs" << dendl; - MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(), - std::move(notifies)); - con->send_message(m); - } -} - - /** do_queries * send out pending queries for info | summaries */ diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c7a2750cbfe..4fa7f7db861 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1916,8 +1916,6 @@ protected: void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, ThreadPool::TPHandle *handle = NULL); void discard_context(PeeringCtx &ctx); - void do_notifies(map<int,vector<pg_notify_t>>& notify_list, - OSDMapRef map); void do_queries(map<int, map<spg_t,pg_query_t> >& query_map, OSDMapRef map); void do_infos(map<int,vector<pg_notify_t>>& info_map, diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index cd4c385ac66..1df535093ac 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -13,6 +13,7 @@ #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGTrim.h" #include "messages/MOSDPGLog.h" +#include "messages/MOSDPGNotify.h" #define dout_context cct #define dout_subsys ceph_subsys_osd @@ -20,15 +21,22 @@ BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx) : query_map(std::move(ctx.query_map)), info_map(std::move(ctx.info_map)), - notify_list(std::move(ctx.notify_list)), message_map(std::move(ctx.message_map)) { ctx.query_map.clear(); ctx.info_map.clear(); - ctx.notify_list.clear(); ctx.message_map.clear(); } +void PeeringCtxWrapper::send_notify(int to, const pg_notify_t &n) +{ + vector<pg_notify_t> notifies; + notifies.push_back(n); + message_map[to].push_back( + new MOSDPGNotify(n.epoch_sent, std::move(notifies)) + ); +} + void PGPool::update(CephContext *cct, OSDMapRef map) { const pg_pool_t *pi = map->get_pg_pool(id); @@ -2566,7 +2574,7 @@ void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx) update_history(query.query.history); fulfill_info(query.from, query.query, notify_info); rctx.send_notify( - notify_info.first, + notify_info.first.osd, pg_notify_t( notify_info.first.shard, pg_whoami.shard, query.query_epoch, @@ -4104,7 +4112,7 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&) DECLARE_LOCALS; if (ps->should_send_notify() && ps->get_primary().osd >= 0) { context< PeeringMachine >().send_notify( - ps->get_primary(), + ps->get_primary().osd, pg_notify_t( ps->get_primary().shard, ps->pg_whoami.shard, ps->get_osdmap_epoch(), @@ -5690,7 +5698,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&) DECLARE_LOCALS; if (ps->should_send_notify() && ps->get_primary().osd >= 0) { context< PeeringMachine >().send_notify( - ps->get_primary(), + ps->get_primary().osd, pg_notify_t( ps->get_primary().shard, ps->pg_whoami.shard, ps->get_osdmap_epoch(), @@ -5810,7 +5818,7 @@ boost::statechart::result PeeringState::Stray::react(const ActMap&) DECLARE_LOCALS; if (ps->should_send_notify() && ps->get_primary().osd >= 0) { context< PeeringMachine >().send_notify( - ps->get_primary(), + ps->get_primary().osd, pg_notify_t( ps->get_primary().shard, ps->pg_whoami.shard, ps->get_osdmap_epoch(), diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 56a3835ccff..f60f9d66bf5 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -53,7 +53,6 @@ class PeeringCtx; struct BufferedRecoveryMessages { map<int, map<spg_t, pg_query_t> > query_map; map<int, vector<pg_notify_t>> info_map; - map<int, vector<pg_notify_t>> notify_list; map<int, vector<MessageRef>> message_map; BufferedRecoveryMessages() = default; @@ -71,11 +70,6 @@ struct BufferedRecoveryMessages { ovec.reserve(ovec.size() + ivec.size()); ovec.insert(ovec.end(), ivec.begin(), ivec.end()); } - for (auto &[target, nlist] : m.notify_list) { - auto &ovec = notify_list[target]; - ovec.reserve(ovec.size() + nlist.size()); - ovec.insert(ovec.end(), nlist.begin(), nlist.end()); - } for (auto &[target, ls] : m.message_map) { auto &ovec = message_map[target]; // put buffered messages in front @@ -197,6 +191,40 @@ struct PeeringCtx : BufferedRecoveryMessages { } }; +/** + * Wraps PeeringCtx to hide the difference between buffering messages to + * be sent after flush or immediately. + */ +struct PeeringCtxWrapper { + utime_t start_time; + map<int, map<spg_t, pg_query_t> > &query_map; + map<int, vector<pg_notify_t>> &info_map; + map<int, vector<MessageRef>> &message_map; + ObjectStore::Transaction &transaction; + HBHandle * const handle = nullptr; + + PeeringCtxWrapper(PeeringCtx &wrapped) : + query_map(wrapped.query_map), + info_map(wrapped.info_map), + message_map(wrapped.message_map), + transaction(wrapped.transaction), + handle(wrapped.handle) {} + + PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped) + : query_map(buf.query_map), + info_map(buf.info_map), + message_map(buf.message_map), + transaction(wrapped.transaction), + handle(wrapped.handle) {} + + PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default; + + void send_osd_message(int target, Message *m) { + message_map[target].push_back(m); + } + void send_notify(int to, const pg_notify_t &n); +}; + /* Encapsulates PG recovery process */ class PeeringState : public MissingLoc::MappingInfo { public: @@ -372,47 +400,6 @@ public: virtual ~PeeringListener() {} }; -private: - /** - * Wraps PeeringCtx to hide the difference between buffering messages to - * be sent after flush or immediately. - */ - struct PeeringCtxWrapper { - utime_t start_time; - map<int, map<spg_t, pg_query_t> > &query_map; - map<int, vector<pg_notify_t>> &info_map; - map<int, vector<pg_notify_t>> ¬ify_list; - map<int, vector<MessageRef>> &message_map; - ObjectStore::Transaction &transaction; - HBHandle * const handle = nullptr; - - PeeringCtxWrapper(PeeringCtx &wrapped) : - query_map(wrapped.query_map), - info_map(wrapped.info_map), - notify_list(wrapped.notify_list), - message_map(wrapped.message_map), - transaction(wrapped.transaction), - handle(wrapped.handle) {} - - PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped) - : query_map(buf.query_map), - info_map(buf.info_map), - notify_list(buf.notify_list), - message_map(buf.message_map), - transaction(wrapped.transaction), - handle(wrapped.handle) {} - - PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default; - - void send_osd_message(int target, Message *m) { - message_map[target].push_back(m); - } - void send_notify(pg_shard_t to, const pg_notify_t &n) { - notify_list[to.osd].emplace_back(n); - } - }; -public: - struct QueryState : boost::statechart::event< QueryState > { Formatter *f; explicit QueryState(Formatter *f) : f(f) {} @@ -594,7 +581,7 @@ public: return *(state->rctx); } - void send_notify(pg_shard_t to, const pg_notify_t &n) { + void send_notify(int to, const pg_notify_t &n) { ceph_assert(state->rctx); state->rctx->send_notify(to, n); } |