summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/ProtocolV2.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/ProtocolV2.cc')
-rw-r--r--src/crimson/net/ProtocolV2.cc24
1 files changed, 12 insertions, 12 deletions
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
index 750f458bd9d..4d7d06d7a33 100644
--- a/src/crimson/net/ProtocolV2.cc
+++ b/src/crimson/net/ProtocolV2.cc
@@ -12,7 +12,7 @@
#include "crimson/auth/AuthServer.h"
#include "crimson/common/formatter.h"
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
#include "Errors.h"
#include "Socket.h"
#include "SocketConnection.h"
@@ -143,10 +143,10 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds)
});
}
-ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher,
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
SocketConnection& conn,
SocketMessenger& messenger)
- : Protocol(proto_t::v2, dispatcher, conn),
+ : Protocol(proto_t::v2, dispatchers, conn),
messenger{messenger},
protocol_timer{conn}
{}
@@ -385,7 +385,7 @@ void ProtocolV2::reset_session(bool full)
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_write();
- dispatcher->ms_handle_remote_reset(
+ dispatchers.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
}
@@ -1601,7 +1601,7 @@ void ProtocolV2::execute_establishing(
accept_me();
}
- dispatcher->ms_handle_accept(
+ dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gated_execute("execute_establishing", [this] {
@@ -1699,7 +1699,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
if (socket) {
socket->shutdown();
}
- dispatcher->ms_handle_accept(
+ dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gate.dispatch_in_background("trigger_replacing", *this,
[this,
@@ -1883,11 +1883,10 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
ceph_msg_footer footer{init_le32(0), init_le32(0),
init_le32(0), init_le64(0), current_header.flags};
- auto pconn = seastar::static_pointer_cast<SocketConnection>(
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this());
Message *message = decode_message(nullptr, 0, header, footer,
- msg_frame.front(), msg_frame.middle(), msg_frame.data(),
- std::move(pconn));
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
if (!message) {
logger().warn("{} decode message failed", conn);
abort_in_fault();
@@ -1914,7 +1913,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
local_conf()->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
}
- return;
+ return seastar::now();
} else if (message->get_seq() > cur_seq + 1) {
logger().error("{} missed message? skipped from seq {} to {}",
conn, cur_seq, message->get_seq());
@@ -1932,7 +1931,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref));
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
});
}
@@ -1941,7 +1941,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, write_state_t::open, false);
if (dispatch_connect) {
- dispatcher->ms_handle_connect(
+ dispatchers.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
#ifdef UNIT_TESTS_BUILT