summaryrefslogtreecommitdiffstats
path: root/src/msg
diff options
context:
space:
mode:
authorPatrick Donnelly <pdonnell@redhat.com>2024-05-23 03:16:32 +0200
committerPatrick Donnelly <pdonnell@redhat.com>2024-05-23 03:16:32 +0200
commiteb10f3df6e5116a020011365579584449ac76f04 (patch)
tree1e87396bd761b4094d7b5d028d352926a1e52ac3 /src/msg
parentMerge PR #57392 into main (diff)
parentmds: set dispatcher order (diff)
downloadceph-eb10f3df6e5116a020011365579584449ac76f04.tar.xz
ceph-eb10f3df6e5116a020011365579584449ac76f04.zip
Merge PR #57469 into main
* refs/pull/57469/head: mds: set dispatcher order mds: use regular dispatch for processing beacons msg: add priority to dispatcher invocation order mds: note when dispatcher is called Reviewed-by: Leonid Usov <leonid.usov@ibm.com>
Diffstat (limited to 'src/msg')
-rw-r--r--src/msg/Dispatcher.h6
-rw-r--r--src/msg/Messenger.h82
2 files changed, 57 insertions, 31 deletions
diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h
index ddf273288b3..799a4a9de91 100644
--- a/src/msg/Dispatcher.h
+++ b/src/msg/Dispatcher.h
@@ -29,6 +29,12 @@ class KeyStore;
class Dispatcher {
public:
+ /* Ordering of dispatch for a list of Dispatchers. */
+ using priority_t = uint32_t;
+ static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4;
+ static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits<priority_t>::max() / 2;
+ static constexpr priority_t PRIORITY_LOW = (std::numeric_limits<priority_t>::max() / 4) * 3;
+
explicit Dispatcher(CephContext *cct_)
: cct(cct_)
{
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index fe8d7a72b38..830ae9050bb 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -17,9 +17,9 @@
#ifndef CEPH_MESSENGER_H
#define CEPH_MESSENGER_H
-#include <deque>
#include <map>
#include <optional>
+#include <vector>
#include <errno.h>
#include <sstream>
@@ -92,8 +92,18 @@ struct Interceptor {
class Messenger {
private:
- std::deque<Dispatcher*> dispatchers;
- std::deque<Dispatcher*> fast_dispatchers;
+ struct PriorityDispatcher {
+ using priority_t = Dispatcher::priority_t;
+ priority_t priority;
+ Dispatcher* dispatcher;
+
+ bool operator<(const PriorityDispatcher& other) const {
+ return priority < other.priority;
+ }
+ };
+ std::vector<PriorityDispatcher> dispatchers;
+ std::vector<PriorityDispatcher> fast_dispatchers;
+
ZTracer::Endpoint trace_endpoint;
protected:
@@ -389,11 +399,14 @@ public:
*
* @param d The Dispatcher to insert into the list.
*/
- void add_dispatcher_head(Dispatcher *d) {
+ void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
bool first = dispatchers.empty();
- dispatchers.push_front(d);
- if (d->ms_can_fast_dispatch_any())
- fast_dispatchers.push_front(d);
+ dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d});
+ std::stable_sort(dispatchers.begin(), dispatchers.end());
+ if (d->ms_can_fast_dispatch_any()) {
+ fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d});
+ std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
+ }
if (first)
ready();
}
@@ -404,11 +417,14 @@ public:
*
* @param d The Dispatcher to insert into the list.
*/
- void add_dispatcher_tail(Dispatcher *d) {
+ void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
bool first = dispatchers.empty();
- dispatchers.push_back(d);
- if (d->ms_can_fast_dispatch_any())
- fast_dispatchers.push_back(d);
+ dispatchers.push_back(PriorityDispatcher{priority, d});
+ std::stable_sort(dispatchers.begin(), dispatchers.end());
+ if (d->ms_can_fast_dispatch_any()) {
+ fast_dispatchers.push_back(PriorityDispatcher{priority, d});
+ std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
+ }
if (first)
ready();
}
@@ -667,9 +683,10 @@ public:
* @param m The Message we are testing.
*/
bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
- for (const auto &dispatcher : fast_dispatchers) {
- if (dispatcher->ms_can_fast_dispatch2(m))
- return true;
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
+ if (dispatcher->ms_can_fast_dispatch2(m)) {
+ return true;
+ }
}
return false;
}
@@ -682,10 +699,10 @@ public:
*/
void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
- for (const auto &dispatcher : fast_dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
if (dispatcher->ms_can_fast_dispatch2(m)) {
- dispatcher->ms_fast_dispatch2(m);
- return;
+ dispatcher->ms_fast_dispatch2(m);
+ return;
}
}
ceph_abort();
@@ -697,7 +714,7 @@ public:
*
*/
void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
- for (const auto &dispatcher : fast_dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
dispatcher->ms_fast_preprocess2(m);
}
}
@@ -710,9 +727,10 @@ public:
*/
void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
- for (const auto &dispatcher : dispatchers) {
- if (dispatcher->ms_dispatch2(m))
- return;
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+ if (dispatcher->ms_dispatch2(m)) {
+ return;
+ }
}
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
<< m->get_source_inst() << dendl;
@@ -729,7 +747,7 @@ public:
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_connect(Connection *con) {
- for (const auto& dispatcher : dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
dispatcher->ms_handle_connect(con);
}
}
@@ -742,7 +760,7 @@ public:
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_fast_connect(Connection *con) {
- for (const auto& dispatcher : fast_dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
dispatcher->ms_handle_fast_connect(con);
}
}
@@ -754,7 +772,7 @@ public:
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_accept(Connection *con) {
- for (const auto& dispatcher : dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
dispatcher->ms_handle_accept(con);
}
}
@@ -766,7 +784,7 @@ public:
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_fast_accept(Connection *con) {
- for (const auto& dispatcher : fast_dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
dispatcher->ms_handle_fast_accept(con);
}
}
@@ -779,9 +797,10 @@ public:
* @param con Pointer to the broken Connection.
*/
void ms_deliver_handle_reset(Connection *con) {
- for (const auto& dispatcher : dispatchers) {
- if (dispatcher->ms_handle_reset(con))
- return;
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+ if (dispatcher->ms_handle_reset(con)) {
+ return;
+ }
}
}
/**
@@ -792,7 +811,7 @@ public:
* @param con Pointer to the broken Connection.
*/
void ms_deliver_handle_remote_reset(Connection *con) {
- for (const auto& dispatcher : dispatchers) {
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
dispatcher->ms_handle_remote_reset(con);
}
}
@@ -806,9 +825,10 @@ public:
* @param con Pointer to the broken Connection.
*/
void ms_deliver_handle_refused(Connection *con) {
- for (const auto& dispatcher : dispatchers) {
- if (dispatcher->ms_handle_refused(con))
+ for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+ if (dispatcher->ms_handle_refused(con)) {
return;
+ }
}
}