summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@redhat.com>2019-08-22 05:27:45 +0200
committerSage Weil <sage@redhat.com>2019-09-09 18:22:11 +0200
commit49a0573aa80bc679c8fed79e4070dad7f105dfc6 (patch)
tree266c464f33c1d0e9ad2f241cd285c39f39bcf8ce
parentosd/PeeringState: add message_map to PeeringCtx/BufferedRecoveryMessages (diff)
downloadceph-49a0573aa80bc679c8fed79e4070dad7f105dfc6.tar.xz
ceph-49a0573aa80bc679c8fed79e4070dad7f105dfc6.zip
osd/PeeringState: send notifies via message_map (not notify_list)
Queue up a separate message for each notify, and drop the old notify_list plumbing. Signed-off-by: Sage Weil <sage@redhat.com>
-rw-r--r--src/osd/OSD.cc31
-rw-r--r--src/osd/OSD.h2
-rw-r--r--src/osd/PeeringState.cc20
-rw-r--r--src/osd/PeeringState.h83
4 files changed, 49 insertions, 87 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 579a232ea78..1cbbdd61991 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -9262,7 +9262,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
} else if (!is_active()) {
dout(20) << __func__ << " not active" << dendl;
} else {
- do_notifies(ctx.notify_list, curmap);
do_queries(ctx.query_map, curmap);
do_infos(ctx.info_map, curmap);
@@ -9294,36 +9293,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
}
}
-/** do_notifies
- * Send an MOSDPGNotify to a primary, with a list of PGs that I have
- * content for, and they are primary for.
- */
-
-void OSD::do_notifies(
- map<int,vector<pg_notify_t>>& notify_list,
- OSDMapRef curmap)
-{
- for (auto& [osd, notifies] : notify_list) {
- if (!curmap->is_up(osd)) {
- dout(20) << __func__ << " skipping down osd." << osd << dendl;
- continue;
- }
- ConnectionRef con = service.get_con_osd_cluster(
- osd, curmap->get_epoch());
- if (!con) {
- dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
- continue;
- }
- service.maybe_share_map(con.get(), curmap);
- dout(7) << __func__ << " osd." << osd
- << " on " << notifies.size() << " PGs" << dendl;
- MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
- std::move(notifies));
- con->send_message(m);
- }
-}
-
-
/** do_queries
* send out pending queries for info | summaries
*/
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index c7a2750cbfe..4fa7f7db861 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1916,8 +1916,6 @@ protected:
void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
void discard_context(PeeringCtx &ctx);
- void do_notifies(map<int,vector<pg_notify_t>>& notify_list,
- OSDMapRef map);
void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
OSDMapRef map);
void do_infos(map<int,vector<pg_notify_t>>& info_map,
diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc
index cd4c385ac66..1df535093ac 100644
--- a/src/osd/PeeringState.cc
+++ b/src/osd/PeeringState.cc
@@ -13,6 +13,7 @@
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGNotify.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
@@ -20,15 +21,22 @@
BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
: query_map(std::move(ctx.query_map)),
info_map(std::move(ctx.info_map)),
- notify_list(std::move(ctx.notify_list)),
message_map(std::move(ctx.message_map))
{
ctx.query_map.clear();
ctx.info_map.clear();
- ctx.notify_list.clear();
ctx.message_map.clear();
}
+void PeeringCtxWrapper::send_notify(int to, const pg_notify_t &n)
+{
+ vector<pg_notify_t> notifies;
+ notifies.push_back(n);
+ message_map[to].push_back(
+ new MOSDPGNotify(n.epoch_sent, std::move(notifies))
+ );
+}
+
void PGPool::update(CephContext *cct, OSDMapRef map)
{
const pg_pool_t *pi = map->get_pg_pool(id);
@@ -2566,7 +2574,7 @@ void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
update_history(query.query.history);
fulfill_info(query.from, query.query, notify_info);
rctx.send_notify(
- notify_info.first,
+ notify_info.first.osd,
pg_notify_t(
notify_info.first.shard, pg_whoami.shard,
query.query_epoch,
@@ -4104,7 +4112,7 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&)
DECLARE_LOCALS;
if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
- ps->get_primary(),
+ ps->get_primary().osd,
pg_notify_t(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
@@ -5690,7 +5698,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&)
DECLARE_LOCALS;
if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
- ps->get_primary(),
+ ps->get_primary().osd,
pg_notify_t(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
@@ -5810,7 +5818,7 @@ boost::statechart::result PeeringState::Stray::react(const ActMap&)
DECLARE_LOCALS;
if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
- ps->get_primary(),
+ ps->get_primary().osd,
pg_notify_t(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h
index 56a3835ccff..f60f9d66bf5 100644
--- a/src/osd/PeeringState.h
+++ b/src/osd/PeeringState.h
@@ -53,7 +53,6 @@ class PeeringCtx;
struct BufferedRecoveryMessages {
map<int, map<spg_t, pg_query_t> > query_map;
map<int, vector<pg_notify_t>> info_map;
- map<int, vector<pg_notify_t>> notify_list;
map<int, vector<MessageRef>> message_map;
BufferedRecoveryMessages() = default;
@@ -71,11 +70,6 @@ struct BufferedRecoveryMessages {
ovec.reserve(ovec.size() + ivec.size());
ovec.insert(ovec.end(), ivec.begin(), ivec.end());
}
- for (auto &[target, nlist] : m.notify_list) {
- auto &ovec = notify_list[target];
- ovec.reserve(ovec.size() + nlist.size());
- ovec.insert(ovec.end(), nlist.begin(), nlist.end());
- }
for (auto &[target, ls] : m.message_map) {
auto &ovec = message_map[target];
// put buffered messages in front
@@ -197,6 +191,40 @@ struct PeeringCtx : BufferedRecoveryMessages {
}
};
+/**
+ * Wraps PeeringCtx to hide the difference between buffering messages to
+ * be sent after flush or immediately.
+ */
+struct PeeringCtxWrapper {
+ utime_t start_time;
+ map<int, map<spg_t, pg_query_t> > &query_map;
+ map<int, vector<pg_notify_t>> &info_map;
+ map<int, vector<MessageRef>> &message_map;
+ ObjectStore::Transaction &transaction;
+ HBHandle * const handle = nullptr;
+
+ PeeringCtxWrapper(PeeringCtx &wrapped) :
+ query_map(wrapped.query_map),
+ info_map(wrapped.info_map),
+ message_map(wrapped.message_map),
+ transaction(wrapped.transaction),
+ handle(wrapped.handle) {}
+
+ PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
+ : query_map(buf.query_map),
+ info_map(buf.info_map),
+ message_map(buf.message_map),
+ transaction(wrapped.transaction),
+ handle(wrapped.handle) {}
+
+ PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
+
+ void send_osd_message(int target, Message *m) {
+ message_map[target].push_back(m);
+ }
+ void send_notify(int to, const pg_notify_t &n);
+};
+
/* Encapsulates PG recovery process */
class PeeringState : public MissingLoc::MappingInfo {
public:
@@ -372,47 +400,6 @@ public:
virtual ~PeeringListener() {}
};
-private:
- /**
- * Wraps PeeringCtx to hide the difference between buffering messages to
- * be sent after flush or immediately.
- */
- struct PeeringCtxWrapper {
- utime_t start_time;
- map<int, map<spg_t, pg_query_t> > &query_map;
- map<int, vector<pg_notify_t>> &info_map;
- map<int, vector<pg_notify_t>> &notify_list;
- map<int, vector<MessageRef>> &message_map;
- ObjectStore::Transaction &transaction;
- HBHandle * const handle = nullptr;
-
- PeeringCtxWrapper(PeeringCtx &wrapped) :
- query_map(wrapped.query_map),
- info_map(wrapped.info_map),
- notify_list(wrapped.notify_list),
- message_map(wrapped.message_map),
- transaction(wrapped.transaction),
- handle(wrapped.handle) {}
-
- PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
- : query_map(buf.query_map),
- info_map(buf.info_map),
- notify_list(buf.notify_list),
- message_map(buf.message_map),
- transaction(wrapped.transaction),
- handle(wrapped.handle) {}
-
- PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
-
- void send_osd_message(int target, Message *m) {
- message_map[target].push_back(m);
- }
- void send_notify(pg_shard_t to, const pg_notify_t &n) {
- notify_list[to.osd].emplace_back(n);
- }
- };
-public:
-
struct QueryState : boost::statechart::event< QueryState > {
Formatter *f;
explicit QueryState(Formatter *f) : f(f) {}
@@ -594,7 +581,7 @@ public:
return *(state->rctx);
}
- void send_notify(pg_shard_t to, const pg_notify_t &n) {
+ void send_notify(int to, const pg_notify_t &n) {
ceph_assert(state->rctx);
state->rctx->send_notify(to, n);
}