diff options
author | Patrick Donnelly <pdonnell@redhat.com> | 2024-05-23 03:16:32 +0200 |
---|---|---|
committer | Patrick Donnelly <pdonnell@redhat.com> | 2024-05-23 03:16:32 +0200 |
commit | eb10f3df6e5116a020011365579584449ac76f04 (patch) | |
tree | 1e87396bd761b4094d7b5d028d352926a1e52ac3 /src/msg | |
parent | Merge PR #57392 into main (diff) | |
parent | mds: set dispatcher order (diff) | |
download | ceph-eb10f3df6e5116a020011365579584449ac76f04.tar.xz ceph-eb10f3df6e5116a020011365579584449ac76f04.zip |
Merge PR #57469 into main
* refs/pull/57469/head:
mds: set dispatcher order
mds: use regular dispatch for processing beacons
msg: add priority to dispatcher invocation order
mds: note when dispatcher is called
Reviewed-by: Leonid Usov <leonid.usov@ibm.com>
Diffstat (limited to 'src/msg')
-rw-r--r-- | src/msg/Dispatcher.h | 6 | ||||
-rw-r--r-- | src/msg/Messenger.h | 82 |
2 files changed, 57 insertions, 31 deletions
diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index ddf273288b3..799a4a9de91 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -29,6 +29,12 @@ class KeyStore; class Dispatcher { public: + /* Ordering of dispatch for a list of Dispatchers. */ + using priority_t = uint32_t; + static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4; + static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits<priority_t>::max() / 2; + static constexpr priority_t PRIORITY_LOW = (std::numeric_limits<priority_t>::max() / 4) * 3; + explicit Dispatcher(CephContext *cct_) : cct(cct_) { diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index fe8d7a72b38..830ae9050bb 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -17,9 +17,9 @@ #ifndef CEPH_MESSENGER_H #define CEPH_MESSENGER_H -#include <deque> #include <map> #include <optional> +#include <vector> #include <errno.h> #include <sstream> @@ -92,8 +92,18 @@ struct Interceptor { class Messenger { private: - std::deque<Dispatcher*> dispatchers; - std::deque<Dispatcher*> fast_dispatchers; + struct PriorityDispatcher { + using priority_t = Dispatcher::priority_t; + priority_t priority; + Dispatcher* dispatcher; + + bool operator<(const PriorityDispatcher& other) const { + return priority < other.priority; + } + }; + std::vector<PriorityDispatcher> dispatchers; + std::vector<PriorityDispatcher> fast_dispatchers; + ZTracer::Endpoint trace_endpoint; protected: @@ -389,11 +399,14 @@ public: * * @param d The Dispatcher to insert into the list. */ - void add_dispatcher_head(Dispatcher *d) { + void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) { bool first = dispatchers.empty(); - dispatchers.push_front(d); - if (d->ms_can_fast_dispatch_any()) - fast_dispatchers.push_front(d); + dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d}); + std::stable_sort(dispatchers.begin(), dispatchers.end()); + if (d->ms_can_fast_dispatch_any()) { + fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d}); + std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end()); + } if (first) ready(); } @@ -404,11 +417,14 @@ public: * * @param d The Dispatcher to insert into the list. */ - void add_dispatcher_tail(Dispatcher *d) { + void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) { bool first = dispatchers.empty(); - dispatchers.push_back(d); - if (d->ms_can_fast_dispatch_any()) - fast_dispatchers.push_back(d); + dispatchers.push_back(PriorityDispatcher{priority, d}); + std::stable_sort(dispatchers.begin(), dispatchers.end()); + if (d->ms_can_fast_dispatch_any()) { + fast_dispatchers.push_back(PriorityDispatcher{priority, d}); + std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end()); + } if (first) ready(); } @@ -667,9 +683,10 @@ public: * @param m The Message we are testing. */ bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) { - for (const auto &dispatcher : fast_dispatchers) { - if (dispatcher->ms_can_fast_dispatch2(m)) - return true; + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { + if (dispatcher->ms_can_fast_dispatch2(m)) { + return true; + } } return false; } @@ -682,10 +699,10 @@ public: */ void ms_fast_dispatch(const ceph::ref_t<Message> &m) { m->set_dispatch_stamp(ceph_clock_now()); - for (const auto &dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { if (dispatcher->ms_can_fast_dispatch2(m)) { - dispatcher->ms_fast_dispatch2(m); - return; + dispatcher->ms_fast_dispatch2(m); + return; } } ceph_abort(); @@ -697,7 +714,7 @@ public: * */ void ms_fast_preprocess(const ceph::ref_t<Message> &m) { - for (const auto &dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_fast_preprocess2(m); } } @@ -710,9 +727,10 @@ public: */ void ms_deliver_dispatch(const ceph::ref_t<Message> &m) { m->set_dispatch_stamp(ceph_clock_now()); - for (const auto &dispatcher : dispatchers) { - if (dispatcher->ms_dispatch2(m)) - return; + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_dispatch2(m)) { + return; + } } lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " << m->get_source_inst() << dendl; @@ -729,7 +747,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_connect(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_connect(con); } } @@ -742,7 +760,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_fast_connect(Connection *con) { - for (const auto& dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_handle_fast_connect(con); } } @@ -754,7 +772,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_accept(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_accept(con); } } @@ -766,7 +784,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_fast_accept(Connection *con) { - for (const auto& dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_handle_fast_accept(con); } } @@ -779,9 +797,10 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_reset(Connection *con) { - for (const auto& dispatcher : dispatchers) { - if (dispatcher->ms_handle_reset(con)) - return; + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_handle_reset(con)) { + return; + } } } /** @@ -792,7 +811,7 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_remote_reset(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_remote_reset(con); } } @@ -806,9 +825,10 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_refused(Connection *con) { - for (const auto& dispatcher : dispatchers) { - if (dispatcher->ms_handle_refused(con)) + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_handle_refused(con)) { return; + } } } |