diff options
Diffstat (limited to 'src/crimson/net/ProtocolV2.cc')
-rw-r--r-- | src/crimson/net/ProtocolV2.cc | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 750f458bd9d..4d7d06d7a33 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -12,7 +12,7 @@ #include "crimson/auth/AuthServer.h" #include "crimson/common/formatter.h" -#include "Dispatcher.h" +#include "chained_dispatchers.h" #include "Errors.h" #include "Socket.h" #include "SocketConnection.h" @@ -143,10 +143,10 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds) }); } -ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher, +ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers, SocketConnection& conn, SocketMessenger& messenger) - : Protocol(proto_t::v2, dispatcher, conn), + : Protocol(proto_t::v2, dispatchers, conn), messenger{messenger}, protocol_timer{conn} {} @@ -385,7 +385,7 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; reset_write(); - dispatcher->ms_handle_remote_reset( + dispatchers.ms_handle_remote_reset( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); } } @@ -1601,7 +1601,7 @@ void ProtocolV2::execute_establishing( accept_me(); } - dispatcher->ms_handle_accept( + dispatchers.ms_handle_accept( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); gated_execute("execute_establishing", [this] { @@ -1699,7 +1699,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (socket) { socket->shutdown(); } - dispatcher->ms_handle_accept( + dispatchers.ms_handle_accept( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); gate.dispatch_in_background("trigger_replacing", *this, [this, @@ -1883,11 +1883,10 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) ceph_msg_footer footer{init_le32(0), init_le32(0), init_le32(0), init_le64(0), current_header.flags}; - auto pconn = seastar::static_pointer_cast<SocketConnection>( + auto conn_ref = seastar::static_pointer_cast<SocketConnection>( conn.shared_from_this()); Message *message = decode_message(nullptr, 0, header, footer, - msg_frame.front(), msg_frame.middle(), msg_frame.data(), - std::move(pconn)); + msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); if (!message) { logger().warn("{} decode message failed", conn); abort_in_fault(); @@ -1914,7 +1913,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) local_conf()->ms_die_on_old_message) { ceph_assert(0 == "old msgs despite reconnect_seq feature"); } - return; + return seastar::now(); } else if (message->get_seq() > cur_seq + 1) { logger().error("{} missed message? skipped from seq {} to {}", conn, cur_seq, message->get_seq()); @@ -1932,7 +1931,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; - std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref)); + // throttle the reading process by the returned future + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); }); } @@ -1941,7 +1941,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect) assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); trigger_state(state_t::READY, write_state_t::open, false); if (dispatch_connect) { - dispatcher->ms_handle_connect( + dispatchers.ms_handle_connect( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); } #ifdef UNIT_TESTS_BUILT |