diff options
53 files changed, 928 insertions, 411 deletions
diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 61a45391cd3..e30aef360fe 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -164,7 +164,8 @@ set(crimson_net_srcs net/Socket.cc net/Protocol.cc net/ProtocolV1.cc - net/ProtocolV2.cc) + net/ProtocolV2.cc + net/chained_dispatchers.cc) set(crimson_thread_srcs thread/ThreadPool.cc thread/Throttle.cc) diff --git a/src/crimson/common/exception.h b/src/crimson/common/exception.h new file mode 100644 index 00000000000..28faf75f1b9 --- /dev/null +++ b/src/crimson/common/exception.h @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <exception> +#include <seastar/core/future.hh> +#include <seastar/core/future-util.hh> + +#include "crimson/common/log.h" + +namespace crimson::common { + +class system_shutdown_exception final : public std::exception{ +public: + const char* what() const noexcept final { + return "system shutting down"; + } +}; + +template<typename Func, typename... Args> +inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args) +{ + return seastar::futurize_invoke(std::forward<Func>(func), + std::forward<Args>(args)...) 
+ .handle_exception([](std::exception_ptr eptr) { + if (*eptr.__cxa_exception_type() == + typeid(crimson::common::system_shutdown_exception)) { + crimson::get_logger(ceph_subsys_osd).debug( + "operation skipped, system shutdown"); + return seastar::now(); + } + std::rethrow_exception(eptr); + }); +} + +} diff --git a/src/crimson/common/gated.h b/src/crimson/common/gated.h new file mode 100644 index 00000000000..679e06b6491 --- /dev/null +++ b/src/crimson/common/gated.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/future.hh> +#include <seastar/core/future-util.hh> + +#include "crimson/common/exception.h" +#include "crimson/common/log.h" +#include "include/ceph_assert.h" + +namespace crimson::common { + +class Gated { + public: + static seastar::logger& gated_logger() { + return crimson::get_logger(ceph_subsys_osd); + } + template <typename Func, typename T> + inline void dispatch_in_background(const char* what, T& who, Func&& func) { + (void) dispatch(what, who, func); + } + template <typename Func, typename T> + inline seastar::future<> dispatch(const char* what, T& who, Func&& func) { + return seastar::with_gate(pending_dispatch, std::forward<Func>(func) + ).handle_exception([this, what, &who] (std::exception_ptr eptr) { + if (*eptr.__cxa_exception_type() == typeid(system_shutdown_exception)) { + gated_logger().debug( + "{}, {} skipped, system shutdown", who, what); + return seastar::now(); + } + gated_logger().error( + "{} dispatch() {} caught exception: {}", who, what, eptr); + ceph_abort("unexpected exception from dispatch()"); + }); + } + + seastar::future<> close() { + return pending_dispatch.close(); + } + bool is_closed() const { + return pending_dispatch.is_closed(); + } + private: + seastar::gate pending_dispatch; +}; + +}// namespace crimson::common diff --git a/src/crimson/mgr/client.cc 
b/src/crimson/mgr/client.cc index b6331191577..74403fc4e95 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -38,46 +38,54 @@ seastar::future<> Client::start() seastar::future<> Client::stop() { - return gate.close().then([this] { - if (conn) { - conn->mark_down(); - } - }); + logger().info("{}", __func__); + report_timer.cancel(); + auto fut = gate.close(); + if (conn) { + conn->mark_down(); + } + return fut; } seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - switch(m->get_type()) { - case MSG_MGR_MAP: - return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m)); - case MSG_MGR_CONFIGURE: - return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m)); - default: - return seastar::now(); - } + return gate.dispatch(__func__, *this, [this, conn, &m] { + switch(m->get_type()) { + case MSG_MGR_MAP: + return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m)); + case MSG_MGR_CONFIGURE: + return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m)); + default: + return seastar::now(); + } + }); } -seastar::future<> Client::ms_handle_connect(crimson::net::ConnectionRef c) +void Client::ms_handle_connect(crimson::net::ConnectionRef c) { - if (conn == c) { - // ask for the mgrconfigure message - auto m = ceph::make_message<MMgrOpen>(); - m->daemon_name = local_conf()->name.get_id(); - return conn->send(std::move(m)); - } else { - return seastar::now(); - } + gate.dispatch_in_background(__func__, *this, [this, c] { + if (conn == c) { + // ask for the mgrconfigure message + auto m = ceph::make_message<MMgrOpen>(); + m->daemon_name = local_conf()->name.get_id(); + return conn->send(std::move(m)); + } else { + return seastar::now(); + } + }); } -seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace) +void Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace) { - if (conn == c) { - report_timer.cancel(); - return 
reconnect(); - } else { - return seastar::now(); - } + gate.dispatch_in_background(__func__, *this, [this, c, is_replace] { + if (conn == c) { + report_timer.cancel(); + return reconnect(); + } else { + return seastar::now(); + } + }); } seastar::future<> Client::reconnect() @@ -134,11 +142,16 @@ seastar::future<> Client::handle_mgr_conf(crimson::net::Connection* conn, void Client::report() { - (void) seastar::with_gate(gate, [this] { + gate.dispatch_in_background(__func__, *this, [this] { assert(conn); auto pg_stats = with_stats.get_stats(); return conn->send(std::move(pg_stats)); }); } +void Client::print(std::ostream& out) const +{ + out << "mgrc "; +} + } diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index 3873ba6418b..51130b06bc5 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -39,8 +39,8 @@ public: private: seastar::future<> ms_dispatch(crimson::net::Connection* conn, Ref<Message> m) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; - seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final; + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; + void ms_handle_connect(crimson::net::ConnectionRef conn) final; seastar::future<> handle_mgr_map(crimson::net::Connection* conn, Ref<MMgrMap> m); seastar::future<> handle_mgr_conf(crimson::net::Connection* conn, @@ -48,13 +48,20 @@ private: seastar::future<> reconnect(); void report(); + void print(std::ostream&) const; + friend std::ostream& operator<<(std::ostream& out, const Client& client); private: MgrMap mgrmap; crimson::net::Messenger& msgr; WithStats& with_stats; crimson::net::ConnectionRef conn; seastar::timer<seastar::lowres_clock> report_timer; - seastar::gate gate; + crimson::common::Gated gate; }; +inline std::ostream& operator<<(std::ostream& out, const Client& client) { + client.print(out); + return out; +} + } diff --git a/src/crimson/mon/MonClient.cc 
b/src/crimson/mon/MonClient.cc index a71dabdf810..c8cfac8aa09 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -496,7 +496,7 @@ seastar::future<> Client::load_keyring() void Client::tick() { - (void) seastar::with_gate(tick_gate, [this] { + gate.dispatch_in_background(__func__, *this, [this] { if (active_con) { return seastar::when_all_succeed(active_con->get_conn()->keepalive(), active_con->renew_tickets(), @@ -514,50 +514,54 @@ bool Client::is_hunting() const { seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - // we only care about these message types - switch (m->get_type()) { - case CEPH_MSG_MON_MAP: - return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m)); - case CEPH_MSG_AUTH_REPLY: - return handle_auth_reply( - conn, boost::static_pointer_cast<MAuthReply>(m)); - case CEPH_MSG_MON_SUBSCRIBE_ACK: - return handle_subscribe_ack( - boost::static_pointer_cast<MMonSubscribeAck>(m)); - case CEPH_MSG_MON_GET_VERSION_REPLY: - return handle_get_version_reply( - boost::static_pointer_cast<MMonGetVersionReply>(m)); - case MSG_MON_COMMAND_ACK: - return handle_mon_command_ack( - boost::static_pointer_cast<MMonCommandAck>(m)); - case MSG_LOGACK: - return handle_log_ack( - boost::static_pointer_cast<MLogAck>(m)); - case MSG_CONFIG: - return handle_config( - boost::static_pointer_cast<MConfig>(m)); - default: - return seastar::now(); - } + return gate.dispatch(__func__, *this, [this, conn, &m] { + // we only care about these message types + switch (m->get_type()) { + case CEPH_MSG_MON_MAP: + return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m)); + case CEPH_MSG_AUTH_REPLY: + return handle_auth_reply( + conn, boost::static_pointer_cast<MAuthReply>(m)); + case CEPH_MSG_MON_SUBSCRIBE_ACK: + return handle_subscribe_ack( + boost::static_pointer_cast<MMonSubscribeAck>(m)); + case CEPH_MSG_MON_GET_VERSION_REPLY: + return handle_get_version_reply( + 
boost::static_pointer_cast<MMonGetVersionReply>(m)); + case MSG_MON_COMMAND_ACK: + return handle_mon_command_ack( + boost::static_pointer_cast<MMonCommandAck>(m)); + case MSG_LOGACK: + return handle_log_ack( + boost::static_pointer_cast<MLogAck>(m)); + case MSG_CONFIG: + return handle_config( + boost::static_pointer_cast<MConfig>(m)); + default: + return seastar::now(); + } + }); } -seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) +void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { - auto found = std::find_if(pending_conns.begin(), pending_conns.end(), - [peer_addr = conn->get_peer_addr()](auto& mc) { - return mc->is_my_peer(peer_addr); - }); - if (found != pending_conns.end()) { - logger().warn("pending conn reset by {}", conn->get_peer_addr()); - (*found)->close(); - return seastar::now(); - } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { - logger().warn("active conn reset {}", conn->get_peer_addr()); - active_con.reset(); - return reopen_session(-1); - } else { - return seastar::now(); - } + gate.dispatch_in_background(__func__, *this, [this, conn, is_replace] { + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn->get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found != pending_conns.end()) { + logger().warn("pending conn reset by {}", conn->get_peer_addr()); + (*found)->close(); + return seastar::now(); + } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { + logger().warn("active conn reset {}", conn->get_peer_addr()); + active_con.reset(); + return reopen_session(-1); + } else { + return seastar::now(); + } + }); } std::pair<std::vector<uint32_t>, std::vector<uint32_t>> @@ -613,7 +617,10 @@ int Client::handle_auth_request(crimson::net::ConnectionRef con, return -EOPNOTSUPP; } auto authorizer_challenge = &auth_meta->authorizer_challenge; - ceph_assert(active_con); + if 
(!active_con) { + logger().error("connection to monitors is down, abort connection for now"); + return -EBUSY; + } if (!HAVE_FEATURE(active_con->get_conn()->get_features(), CEPHX_V2)) { if (local_conf().get_val<uint64_t>("cephx_service_require_version") >= 2) { return -EACCES; @@ -915,12 +922,16 @@ seastar::future<> Client::authenticate() seastar::future<> Client::stop() { - return tick_gate.close().then([this] { - timer.cancel(); - if (active_con) { - active_con->close(); - } - }); + logger().info("{}", __func__); + auto fut = gate.close(); + timer.cancel(); + for (auto& pending_con : pending_conns) { + pending_con->close(); + } + if (active_con) { + active_con->close(); + } + return fut; } seastar::future<> Client::reopen_session(int rank) @@ -995,7 +1006,10 @@ seastar::future<> Client::reopen_session(int rank) return seastar::make_exception_future(ep); }); }).then([this] { - ceph_assert_always(active_con); + if (!active_con) { + return seastar::make_exception_future( + crimson::common::system_shutdown_exception()); + } return active_con->renew_rotating_keyring(); }); } @@ -1058,4 +1072,9 @@ seastar::future<> Client::renew_subs() }); } +void Client::print(std::ostream& out) const +{ + out << "mon." 
<< entity_name; +} + } // namespace crimson::mon diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index 3c3a5b6eddc..c33ac992698 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -17,6 +17,7 @@ #include "crimson/auth/AuthClient.h" #include "crimson/auth/AuthServer.h" #include "crimson/common/auth_handler.h" +#include "crimson/common/gated.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Fwd.h" @@ -54,7 +55,6 @@ class Client : public crimson::net::Dispatcher, std::unique_ptr<Connection> active_con; std::vector<std::unique_ptr<Connection>> pending_conns; seastar::timer<seastar::lowres_clock> timer; - seastar::gate tick_gate; crimson::net::Messenger& msgr; @@ -91,6 +91,7 @@ public: bool sub_want_increment(const std::string& what, version_t start, unsigned flags); seastar::future<> renew_subs(); + void print(std::ostream&) const; private: // AuthServer methods std::pair<std::vector<uint32_t>, std::vector<uint32_t>> @@ -141,7 +142,7 @@ private: seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; seastar::future<> handle_monmap(crimson::net::Connection* conn, Ref<MMonMap> m); @@ -161,6 +162,12 @@ private: seastar::future<> reopen_session(int rank); std::vector<unsigned> get_random_mons(unsigned n) const; seastar::future<> _add_conn(unsigned rank, uint64_t global_id); + crimson::common::Gated gate; }; +inline std::ostream& operator<<(std::ostream& out, const Client& client) { + client.print(out); + return out; +} + } // namespace crimson::mon diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index a7ca3d9d2b3..fd26d146166 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -16,40 +16,35 @@ #include <seastar/core/future.hh> #include 
<seastar/core/sharded.hh> +#include <boost/intrusive/slist.hpp> +#include "crimson/common/gated.h" #include "Fwd.h" class AuthAuthorizer; namespace crimson::net { -class Dispatcher { +class Dispatcher : public boost::intrusive::slist_base_hook< + boost::intrusive::link_mode< + boost::intrusive::safe_link>> { public: virtual ~Dispatcher() {} virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) { return seastar::make_ready_future<>(); } + virtual void ms_handle_accept(ConnectionRef conn) {} - virtual seastar::future<> ms_handle_accept(ConnectionRef conn) { - return seastar::make_ready_future<>(); - } - - virtual seastar::future<> ms_handle_connect(ConnectionRef conn) { - return seastar::make_ready_future<>(); - } + virtual void ms_handle_connect(ConnectionRef conn) {} // a reset event is dispatched when the connection is closed unexpectedly. // is_replace=true means the reset connection is going to be replaced by // another accepting connection with the same peer_addr, which currently only // happens under lossy policy when both sides wish to connect to each other. 
- virtual seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) { - return seastar::make_ready_future<>(); - } + virtual void ms_handle_reset(ConnectionRef conn, bool is_replace) {} - virtual seastar::future<> ms_handle_remote_reset(ConnectionRef conn) { - return seastar::make_ready_future<>(); - } + virtual void ms_handle_remote_reset(ConnectionRef conn) {} }; } // namespace crimson::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 6423f89d967..2b136aa4fe0 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -17,6 +17,7 @@ #include <seastar/core/future.hh> #include "Fwd.h" +#include "crimson/net/chained_dispatchers.h" #include "crimson/thread/Throttle.h" #include "msg/Message.h" #include "msg/Policy.h" @@ -72,7 +73,7 @@ public: uint32_t min_port, uint32_t max_port) = 0; /// start the messenger - virtual seastar::future<> start(Dispatcher *dispatcher) = 0; + virtual seastar::future<> start(ChainedDispatchersRef) = 0; /// either return an existing connection to the peer, /// or a new pending connection @@ -89,6 +90,11 @@ public: // wait for messenger shutdown virtual seastar::future<> wait() = 0; + virtual void add_dispatcher(Dispatcher&) = 0; + + virtual void remove_dispatcher(Dispatcher&) = 0; + + virtual bool dispatcher_chain_empty() const = 0; /// stop listenening and wait for all connections to close. 
safe to destruct /// after this future becomes available virtual seastar::future<> shutdown() = 0; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index de2e02cee03..78578a5959a 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -21,7 +21,7 @@ namespace { namespace crimson::net { Protocol::Protocol(proto_t type, - Dispatcher& dispatcher, + ChainedDispatchersRef& dispatcher, SocketConnection& conn) : proto_type(type), dispatcher(dispatcher), @@ -31,7 +31,7 @@ Protocol::Protocol(proto_t type, Protocol::~Protocol() { - ceph_assert(pending_dispatch.is_closed()); + ceph_assert(gate.is_closed()); assert(!exit_open); } @@ -51,6 +51,7 @@ void Protocol::close(bool dispatch_reset, // unregister_conn() drops a reference, so hold another until completion auto cleanup = [conn_ref = conn.shared_from_this(), this] { logger().debug("{} closed!", conn); + on_closed(); #ifdef UNIT_TESTS_BUILT is_closed_clean = true; if (conn.interceptor) { @@ -69,12 +70,12 @@ void Protocol::close(bool dispatch_reset, socket->shutdown(); } set_write_state(write_state_t::drop); - auto gate_closed = pending_dispatch.close(); + auto gate_closed = gate.close(); auto reset_dispatched = seastar::futurize_invoke([this, dispatch_reset, is_replace] { if (dispatch_reset) { - return dispatcher.ms_handle_reset( - seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()), - is_replace); + dispatcher->ms_handle_reset( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()), + is_replace); } return seastar::now(); }).handle_exception([this] (std::exception_ptr eptr) { @@ -312,7 +313,7 @@ void Protocol::write_event() case write_state_t::open: [[fallthrough]]; case write_state_t::delay: - gated_dispatch("do_write_dispatch_sweep", [this] { + gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] { return do_write_dispatch_sweep(); }); return; diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 
290cb3fb843..3deb706acb4 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -6,6 +6,7 @@ #include <seastar/core/gate.hh> #include <seastar/core/shared_future.hh> +#include "crimson/common/gated.h" #include "crimson/common/log.h" #include "Fwd.h" #include "SocketConnection.h" @@ -46,9 +47,10 @@ class Protocol { virtual void start_accept(SocketRef&& socket, const entity_addr_t& peer_addr) = 0; + virtual void print(std::ostream&) const = 0; protected: Protocol(proto_t type, - Dispatcher& dispatcher, + ChainedDispatchersRef& dispatcher, SocketConnection& conn); virtual void trigger_close() = 0; @@ -62,22 +64,14 @@ class Protocol { virtual void notify_write() {}; + virtual void on_closed() {} + public: const proto_t proto_type; SocketRef socket; protected: - template <typename Func> - void gated_dispatch(const char* what, Func&& func) { - (void) seastar::with_gate(pending_dispatch, std::forward<Func>(func) - ).handle_exception([this, what] (std::exception_ptr eptr) { - crimson::get_logger(ceph_subsys_ms).error( - "{} gated_dispatch() {} caught exception: {}", conn, what, eptr); - ceph_abort("unexpected exception from gated_dispatch()"); - }); - } - - Dispatcher &dispatcher; + ChainedDispatchersRef dispatcher; SocketConnection &conn; AuthConnectionMetaRef auth_meta; @@ -86,7 +80,6 @@ class Protocol { bool closed = false; // become valid only after closed == true seastar::shared_future<> close_ready; - seastar::gate pending_dispatch; // the write state-machine public: @@ -150,6 +143,7 @@ class Protocol { } void ack_writes(seq_num_t seq); + crimson::common::Gated gate; private: write_state_t write_state = write_state_t::none; @@ -170,4 +164,10 @@ class Protocol { void write_event(); }; +inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) { + proto.print(out); + return out; +} + + } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index b7ceb3c50e3..eaed769dcd3 100644 --- 
a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -125,7 +125,7 @@ void discard_up_to(std::deque<MessageRef>* queue, namespace crimson::net { -ProtocolV1::ProtocolV1(Dispatcher& dispatcher, +ProtocolV1::ProtocolV1(ChainedDispatchersRef& dispatcher, SocketConnection& conn, SocketMessenger& messenger) : Protocol(proto_t::v1, dispatcher, conn), messenger{messenger} {} @@ -322,7 +322,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, conn.policy = messenger.get_policy(_peer_name.type()); messenger.register_conn( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); - gated_dispatch("start_connect", [this] { + gate.dispatch_in_background("start_connect", *this, [this] { return Socket::connect(conn.peer_addr) .then([this](SocketRef sock) { socket = std::move(sock); @@ -624,7 +624,7 @@ void ProtocolV1::start_accept(SocketRef&& sock, socket = std::move(sock); messenger.accept_conn( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); - gated_dispatch("start_accept", [this] { + gate.dispatch_in_background("start_accept", *this, [this] { // stop learning my_addr before sending it out, so it won't change return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { // encode/send server's handshake header @@ -850,10 +850,10 @@ seastar::future<> ProtocolV1::read_message() } // start dispatch, ignoring exceptions from the application layer - gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] { + gate.dispatch_in_background("ms_dispatch", *this, [this, msg = std::move(msg_ref)] { logger().debug("{} <== #{} === {} ({})", conn, msg->get_seq(), *msg, msg->get_type()); - return dispatcher.ms_dispatch(&conn, std::move(msg)); + return dispatcher->ms_dispatch(&conn, std::move(msg)); }); }); } @@ -894,18 +894,18 @@ void ProtocolV1::execute_open(open_t type) set_write_state(write_state_t::open); if (type == open_t::connected) { - gated_dispatch("ms_handle_connect", [this] { - return 
dispatcher.ms_handle_connect( + gate.dispatch_in_background("ms_handle_connect", *this, [this] { + return dispatcher->ms_handle_connect( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); }); } else { // type == open_t::accepted - gated_dispatch("ms_handle_accept", [this] { - return dispatcher.ms_handle_accept( + gate.dispatch_in_background("ms_handle_accept", *this, [this] { + return dispatcher->ms_handle_accept( seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); }); } - gated_dispatch("execute_open", [this] { + gate.dispatch_in_background("execute_open", *this, [this] { // start background processing of tags return handle_tags() .handle_exception_type([this] (const std::system_error& e) { @@ -966,4 +966,9 @@ seastar::future<> ProtocolV1::fault() return seastar::now(); } +void ProtocolV1::print(std::ostream& out) const +{ + out << conn; +} + } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h index 8278230f779..8ab0da18673 100644 --- a/src/crimson/net/ProtocolV1.h +++ b/src/crimson/net/ProtocolV1.h @@ -12,11 +12,11 @@ namespace crimson::net { class ProtocolV1 final : public Protocol { public: - ProtocolV1(Dispatcher& dispatcher, + ProtocolV1(ChainedDispatchersRef& dispatcher, SocketConnection& conn, SocketMessenger& messenger); ~ProtocolV1() override; - + void print(std::ostream&) const final; private: bool is_connected() const override; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index beb94fd9db1..05f53582914 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -143,7 +143,7 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds) }); } -ProtocolV2::ProtocolV2(Dispatcher& dispatcher, +ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher, SocketConnection& conn, SocketMessenger& messenger) : Protocol(proto_t::v2, dispatcher, conn), @@ -444,10 +444,8 @@ void ProtocolV2::reset_session(bool full) 
client_cookie = generate_client_cookie(); peer_global_seq = 0; reset_write(); - gated_dispatch("ms_handle_remote_reset", [this] { - return dispatcher.ms_handle_remote_reset( - seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); - }); + dispatcher->ms_handle_remote_reset( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); } } @@ -881,7 +879,7 @@ void ProtocolV2::execute_connecting() abort_protocol(); } if (socket) { - gated_dispatch("close_sockect_connecting", + gate.dispatch_in_background("close_sockect_connecting", *this, [sock = std::move(socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); @@ -1526,7 +1524,7 @@ ProtocolV2::server_reconnect() void ProtocolV2::execute_accepting() { trigger_state(state_t::ACCEPTING, write_state_t::none, false); - gated_dispatch("execute_accepting", [this] { + gate.dispatch_in_background("execute_accepting", *this, [this] { return seastar::futurize_invoke([this] { INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED); auth_meta = seastar::make_lw_shared<AuthConnectionMeta>(); @@ -1653,10 +1651,8 @@ void ProtocolV2::execute_establishing( accept_me(); } - gated_dispatch("ms_handle_accept_establishing", [this] { - return dispatcher.ms_handle_accept( - seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); - }); + dispatcher->ms_handle_accept( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); gated_execute("execute_establishing", [this] { return seastar::futurize_invoke([this] { @@ -1751,11 +1747,9 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (socket) { socket->shutdown(); } - gated_dispatch("ms_handle_accept_replacing", [this] { - return dispatcher.ms_handle_accept( - seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); - }); - gated_dispatch("trigger_replacing", + dispatcher->ms_handle_accept( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); + 
gate.dispatch_in_background("trigger_replacing", *this, [this, reconnect, do_reset, @@ -1786,7 +1780,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, } if (socket) { - gated_dispatch("close_socket_replacing", + gate.dispatch_in_background("close_socket_replacing", *this, [sock = std::move(socket)] () mutable { return sock->close().then([sock = std::move(sock)] {}); }); @@ -1982,9 +1976,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; - gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] { - return dispatcher.ms_dispatch(&conn, std::move(msg)); - }); + std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref)); }); } @@ -1993,10 +1985,8 @@ void ProtocolV2::execute_ready(bool dispatch_connect) assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); trigger_state(state_t::READY, write_state_t::open, false); if (dispatch_connect) { - gated_dispatch("ms_handle_connect", [this] { - return dispatcher.ms_handle_connect( - seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); - }); + dispatcher->ms_handle_connect( + seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this())); } #ifdef UNIT_TESTS_BUILT if (conn.interceptor) { @@ -2170,8 +2160,22 @@ void ProtocolV2::trigger_close() } protocol_timer.cancel(); - + messenger.closing_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); trigger_state(state_t::CLOSING, write_state_t::drop, false); } +void ProtocolV2::on_closed() +{ + messenger.closed_conn( + seastar::static_pointer_cast<SocketConnection>( + conn.shared_from_this())); +} + +void ProtocolV2::print(std::ostream& out) const +{ + out << conn; +} + } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index dfca59a3361..5f0aef78fc2 100644 --- a/src/crimson/net/ProtocolV2.h +++ 
b/src/crimson/net/ProtocolV2.h @@ -13,12 +13,13 @@ namespace crimson::net { class ProtocolV2 final : public Protocol { public: - ProtocolV2(Dispatcher& dispatcher, + ProtocolV2(ChainedDispatchersRef& dispatcher, SocketConnection& conn, SocketMessenger& messenger); ~ProtocolV2() override; - + void print(std::ostream&) const final; private: + void on_closed() override; bool is_connected() const override; void start_connect(const entity_addr_t& peer_addr, @@ -84,7 +85,7 @@ class ProtocolV2 final : public Protocol { template <typename Func> void gated_execute(const char* what, Func&& func) { - gated_dispatch(what, [this, &func] { + gate.dispatch_in_background(what, *this, [this, &func] { execution_done = seastar::futurize_invoke(std::forward<Func>(func)); return execution_done.get_future(); }); diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index ca4c63b9f30..b0c7197eedb 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -26,7 +26,7 @@ using namespace crimson::net; using crimson::common::local_conf; SocketConnection::SocketConnection(SocketMessenger& messenger, - Dispatcher& dispatcher, + ChainedDispatchersRef& dispatcher, bool is_msgr2) : messenger(messenger) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 90b84181995..67657a6e190 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -17,6 +17,7 @@ #include <seastar/core/sharded.hh> #include "msg/Policy.h" +#include "crimson/net/chained_dispatchers.h" #include "crimson/net/Connection.h" #include "crimson/net/Socket.h" #include "crimson/thread/Throttle.h" @@ -54,7 +55,7 @@ class SocketConnection : public Connection { public: SocketConnection(SocketMessenger& messenger, - Dispatcher& dispatcher, + ChainedDispatchersRef& dispatcher, bool is_msgr2); ~SocketConnection() override; diff --git a/src/crimson/net/SocketMessenger.cc 
b/src/crimson/net/SocketMessenger.cc index 9535917f18f..f3d23f4aae6 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -111,10 +111,10 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs, }); } -seastar::future<> SocketMessenger::start(Dispatcher *disp) { +seastar::future<> SocketMessenger::start(ChainedDispatchersRef chained_dispatchers) { assert(seastar::this_shard_id() == master_sid); - dispatcher = disp; + dispatchers = chained_dispatchers; if (listener) { // make sure we have already bound to a valid address ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2()); @@ -123,7 +123,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) { return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) { assert(seastar::this_shard_id() == master_sid); SocketConnectionRef conn = seastar::make_shared<SocketConnection>( - *this, *dispatcher, get_myaddr().is_msgr2()); + *this, dispatchers, get_myaddr().is_msgr2()); conn->start_accept(std::move(socket), peer_addr); return seastar::now(); }); @@ -145,7 +145,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe return found->shared_from_this(); } SocketConnectionRef conn = seastar::make_shared<SocketConnection>( - *this, *dispatcher, peer_addr.is_msgr2()); + *this, dispatchers, peer_addr.is_msgr2()); conn->start_connect(peer_addr, peer_name); return conn->shared_from_this(); } @@ -154,6 +154,9 @@ seastar::future<> SocketMessenger::shutdown() { assert(seastar::this_shard_id() == master_sid); return seastar::futurize_invoke([this] { + if (dispatchers) { + assert(dispatchers->empty()); + } if (listener) { auto d_listener = listener; listener = nullptr; @@ -172,6 +175,10 @@ seastar::future<> SocketMessenger::shutdown() return conn.second->close_clean(false); }); }).then([this] { + return seastar::parallel_for_each(closing_conns, [] (auto conn) { + return conn->close_clean(false); + }); + }).then([this] { 
ceph_assert(connections.empty()); shutdown_promise.set_value(); }); @@ -303,6 +310,23 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn) connections.erase(found); } +void SocketMessenger::closing_conn(SocketConnectionRef conn) +{ + closing_conns.push_back(conn); +} + +void SocketMessenger::closed_conn(SocketConnectionRef conn) +{ + for (auto it = closing_conns.begin(); + it != closing_conns.end();) { + if (*it == conn) { + it = closing_conns.erase(it); + } else { + it++; + } + } +} + seastar::future<uint32_t> SocketMessenger::get_global_seq(uint32_t old) { diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 5f1c02ce1ee..e86a44d6719 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -22,6 +22,7 @@ #include <seastar/core/sharded.hh> #include <seastar/core/shared_future.hh> +#include "crimson/net/chained_dispatchers.h" #include "Messenger.h" #include "SocketConnection.h" @@ -34,9 +35,18 @@ class SocketMessenger final : public Messenger { seastar::promise<> shutdown_promise; FixedCPUServerSocket* listener = nullptr; - Dispatcher *dispatcher = nullptr; + // as we want to unregister a dispatcher from the messengers when stopping + // that dispatcher, we have to use intrusive slist which, when used with + // "boost::intrusive::linear<true>", can tolerate ongoing iteration of the + // list when removing an element. However, the downside of this is that an + // element can only be attached to one slist. So, as we need to make multiple + // messenger reference the same set of dispatchers, we have to make them share + // the same ChainedDispatchers, which means registering/unregistering an element + // to one messenger will affect other messengers that share the same ChainedDispatchers. 
+ ChainedDispatchersRef dispatchers; std::map<entity_addr_t, SocketConnectionRef> connections; std::set<SocketConnectionRef> accepting_conns; + std::vector<SocketConnectionRef> closing_conns; ceph::net::PolicySet<Throttle> policy_set; // Distinguish messengers with meaningful names for debugging const std::string logic_name; @@ -63,7 +73,10 @@ class SocketMessenger final : public Messenger { seastar::future<> try_bind(const entity_addrvec_t& addr, uint32_t min_port, uint32_t max_port) override; - seastar::future<> start(Dispatcher *dispatcher) override; + seastar::future<> start(ChainedDispatchersRef dispatchers) override; + void add_dispatcher(Dispatcher& disp) { + dispatchers->push_back(disp); + } ConnectionRef connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) override; @@ -73,6 +86,12 @@ class SocketMessenger final : public Messenger { return shutdown_promise.get_future(); } + void remove_dispatcher(Dispatcher& disp) override { + dispatchers->erase(disp); + } + bool dispatcher_chain_empty() const override { + return !dispatchers || dispatchers->empty(); + } seastar::future<> shutdown() override; void print(ostream& out) const override { @@ -101,6 +120,8 @@ class SocketMessenger final : public Messenger { void unaccept_conn(SocketConnectionRef); void register_conn(SocketConnectionRef); void unregister_conn(SocketConnectionRef); + void closing_conn(SocketConnectionRef); + void closed_conn(SocketConnectionRef); seastar::shard_id shard_id() const { assert(seastar::this_shard_id() == master_sid); return master_sid; diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc new file mode 100644 index 00000000000..19c7d3d4863 --- /dev/null +++ b/src/crimson/net/chained_dispatchers.cc @@ -0,0 +1,39 @@ +#include "crimson/net/chained_dispatchers.h" +#include "crimson/net/Connection.h" +#include "msg/Message.h" + +seastar::future<> +ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn, + MessageRef m) { + 
return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) { + return dispatcher.ms_dispatch(conn, m); + }); +} + +void +ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) { + for (auto& dispatcher : dispatchers) { + dispatcher.ms_handle_accept(conn); + } +} + +void +ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) { + for(auto& dispatcher : dispatchers) { + dispatcher.ms_handle_connect(conn); + } +} + +void +ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { + for (auto& dispatcher : dispatchers) { + dispatcher.ms_handle_reset(conn, is_replace); + } +} + +void +ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { + for (auto& dispatcher : dispatchers) { + dispatcher.ms_handle_remote_reset(conn); + } +} diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h new file mode 100644 index 00000000000..a5facef2b96 --- /dev/null +++ b/src/crimson/net/chained_dispatchers.h @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/intrusive/slist.hpp> + +#include "crimson/net/Dispatcher.h" +#include "crimson/common/log.h" + +using crimson::net::Dispatcher; + +// in existing Messenger, dispatchers are put into a chain as described by +// chain-of-responsibility pattern. we could do the same to stop processing +// the message once any of the dispatchers claims this message, and prevent +// other dispatchers from reading it. but this change is more involved as +// it requires changing the ms_ methods to return a bool. so as an intermediate +// solution, we are using an observer dispatcher to notify all the interested +// or unintersted parties. 
+class ChainedDispatchers { + boost::intrusive::slist< + Dispatcher, + boost::intrusive::linear<true>, + boost::intrusive::cache_last<true>> dispatchers; +public: + void push_front(Dispatcher& dispatcher) { + dispatchers.push_front(dispatcher); + } + void push_back(Dispatcher& dispatcher) { + dispatchers.push_back(dispatcher); + } + void erase(Dispatcher& dispatcher) { + dispatchers.erase(dispatchers.iterator_to(dispatcher)); + } + bool empty() const { + return dispatchers.empty(); + } + seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m); + void ms_handle_accept(crimson::net::ConnectionRef conn); + void ms_handle_connect(crimson::net::ConnectionRef conn); + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); + void ms_handle_remote_reset(crimson::net::ConnectionRef conn); +}; + +using ChainedDispatchersRef = seastar::lw_shared_ptr<ChainedDispatchers>; diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc index 6a66a6f0626..93105a64221 100644 --- a/src/crimson/os/alienstore/alien_store.cc +++ b/src/crimson/os/alienstore/alien_store.cc @@ -96,7 +96,7 @@ seastar::future<> AlienStore::mount() seastar::future<> AlienStore::umount() { - logger().debug("{}", __func__); + logger().info("{}", __func__); return transaction_gate.close().then([this] { return tp->submit([this] { return store->umount(); diff --git a/src/crimson/os/cyanstore/cyan_store.h b/src/crimson/os/cyanstore/cyan_store.h index e832ca1f043..a53cfea9315 100644 --- a/src/crimson/os/cyanstore/cyan_store.h +++ b/src/crimson/os/cyanstore/cyan_store.h @@ -68,7 +68,9 @@ public: CyanStore(const std::string& path); ~CyanStore() final; - seastar::future<> stop() final {return seastar::now();} + seastar::future<> stop() final { + return seastar::now(); + } seastar::future<> mount() final; seastar::future<> umount() final; diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 02c0a5678c7..1eaf862ab84 
100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -1,5 +1,4 @@ add_executable(crimson-osd - chained_dispatchers.cc ec_backend.cc heartbeat.cc main.cc diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc deleted file mode 100644 index 3d6ba846b75..00000000000 --- a/src/crimson/osd/chained_dispatchers.cc +++ /dev/null @@ -1,40 +0,0 @@ -#include "crimson/osd/chained_dispatchers.h" -#include "crimson/net/Connection.h" -#include "msg/Message.h" - - -seastar::future<> -ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn, - MessageRef m) { - return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) { - return dispatcher->ms_dispatch(conn, m); - }); -} - -seastar::future<> -ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) { - return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { - return dispatcher->ms_handle_accept(conn); - }); -} - -seastar::future<> -ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) { - return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { - return dispatcher->ms_handle_connect(conn); - }); -} - -seastar::future<> -ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { - return seastar::do_for_each(dispatchers, [conn, is_replace](Dispatcher* dispatcher) { - return dispatcher->ms_handle_reset(conn, is_replace); - }); -} - -seastar::future<> -ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { - return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { - return dispatcher->ms_handle_remote_reset(conn); - }); -} diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h deleted file mode 100644 index 2ea1e517b78..00000000000 --- a/src/crimson/osd/chained_dispatchers.h +++ /dev/null @@ -1,30 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil 
-*- -// vim: ts=8 sw=2 smarttab - -#pragma once - -#include <deque> -#include "crimson/net/Dispatcher.h" - -// in existing Messenger, dispatchers are put into a chain as described by -// chain-of-responsibility pattern. we could do the same to stop processing -// the message once any of the dispatchers claims this message, and prevent -// other dispatchers from reading it. but this change is more involved as -// it requires changing the ms_ methods to return a bool. so as an intermediate -// solution, we are using an observer dispatcher to notify all the interested -// or unintersted parties. -class ChainedDispatchers : public crimson::net::Dispatcher { - std::deque<Dispatcher*> dispatchers; -public: - void push_front(Dispatcher* dispatcher) { - dispatchers.push_front(dispatcher); - } - void push_back(Dispatcher* dispatcher) { - dispatchers.push_back(dispatcher); - } - seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; - seastar::future<> ms_handle_accept(crimson::net::ConnectionRef conn) override; - seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; - seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) override; -}; diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index d9451d2296b..a213a3f1ee4 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -17,6 +17,9 @@ public: crimson::osd::ShardServices& shard_services, const ec_profile_t& ec_profile, uint64_t stripe_width); + seastar::future<> stop() final { + return seastar::now(); + } private: ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid, uint64_t off, diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 67e31267322..6bf4d13cb51 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -54,8 +54,14 @@ 
seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, SocketPolicy::stateless_server(0)); back_msgr->set_policy(entity_name_t::TYPE_OSD, SocketPolicy::stateless_server(0)); - return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs), - start_messenger(*back_msgr, back_addrs)) + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return seastar::when_all_succeed(start_messenger(*front_msgr, + front_addrs, + chained_dispatchers), + start_messenger(*back_msgr, + back_addrs, + chained_dispatchers)) .then([this] { timer.arm_periodic( std::chrono::seconds(local_conf()->osd_heartbeat_interval)); @@ -64,19 +70,29 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, seastar::future<> Heartbeat::start_messenger(crimson::net::Messenger& msgr, - const entity_addrvec_t& addrs) + const entity_addrvec_t& addrs, + ChainedDispatchersRef chained_dispatchers) { return msgr.try_bind(addrs, local_conf()->ms_bind_port_min, - local_conf()->ms_bind_port_max).then([&msgr, this] { - return msgr.start(this); + local_conf()->ms_bind_port_max) + .then([&msgr, chained_dispatchers]() mutable { + return msgr.start(chained_dispatchers); }); } seastar::future<> Heartbeat::stop() { - return seastar::when_all_succeed(front_msgr->shutdown(), - back_msgr->shutdown()); + logger().info("{}", __func__); + timer.cancel(); + if (!front_msgr->dispatcher_chain_empty()) + front_msgr->remove_dispatcher(*this); + if (!back_msgr->dispatcher_chain_empty()) + back_msgr->remove_dispatcher(*this); + return gate.close().then([this] { + return seastar::when_all_succeed(front_msgr->shutdown(), + back_msgr->shutdown()); + }); } const entity_addrvec_t& Heartbeat::get_front_addrs() const @@ -187,15 +203,17 @@ void Heartbeat::remove_peer(osd_id_t peer) seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - switch (m->get_type()) { - case MSG_OSD_PING: - return handle_osd_ping(conn, 
boost::static_pointer_cast<MOSDPing>(m)); - default: - return seastar::now(); - } + return gate.dispatch(__func__, *this, [this, conn, &m] { + switch (m->get_type()) { + case MSG_OSD_PING: + return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m)); + default: + return seastar::now(); + } + }); } -seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) +void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { auto found = std::find_if(peers.begin(), peers.end(), [conn](const peers_map_t::value_type& peer) { @@ -203,13 +221,12 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, b peer.second.con_back == conn); }); if (found == peers.end()) { - return seastar::now(); + return; } const auto peer = found->first; const auto epoch = found->second.epoch; remove_peer(peer); add_peer(peer, epoch); - return seastar::now(); } seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn, @@ -428,3 +445,8 @@ bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const // replies from both front/back connections return !is_unhealthy(now); } + +void Heartbeat::print(std::ostream& out) const +{ + out << "heartbeat"; +} diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index 55571cff2fb..f5e2ad596cf 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -6,6 +6,7 @@ #include <cstdint> #include <seastar/core/future.hh> #include "common/ceph_time.h" +#include "crimson/net/chained_dispatchers.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Fwd.h" @@ -46,8 +47,9 @@ public: // Dispatcher methods seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; + void print(std::ostream&) const; private: 
seastar::future<> handle_osd_ping(crimson::net::Connection* conn, Ref<MOSDPing> m); @@ -67,7 +69,8 @@ private: void add_reporter_peers(int whoami); seastar::future<> start_messenger(crimson::net::Messenger& msgr, - const entity_addrvec_t& addrs); + const entity_addrvec_t& addrs, + ChainedDispatchersRef); private: const crimson::osd::ShardServices& service; crimson::mon::Client& monc; @@ -121,4 +124,10 @@ private: // osds we've reported to monior as failed ones, but they are not marked down // yet std::map<osd_id_t, failure_info_t> failure_pending; + crimson::common::Gated gate; }; + +inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) { + hb.print(out); + return out; +} diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 3591173f755..5a082d6c756 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -34,6 +34,7 @@ #include "osd/PGPeeringEvent.h" #include "osd/PeeringState.h" +#include "crimson/common/exception.h" #include "crimson/mon/MonClient.h" #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" @@ -256,18 +257,23 @@ seastar::future<> OSD::start() cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT, SocketPolicy::stateless_server(0)); - dispatchers.push_front(this); - dispatchers.push_front(monc.get()); - dispatchers.push_front(mgrc.get()); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_front(*mgrc); + chained_dispatchers->push_front(*monc); + chained_dispatchers->push_front(*this); return seastar::when_all_succeed( cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this] { return cluster_msgr->start(&dispatchers); }), + .then([this, chained_dispatchers]() mutable { + return cluster_msgr->start(chained_dispatchers); + }), public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - 
.then([this] { return public_msgr->start(&dispatchers); })); + .then([this, chained_dispatchers]() mutable { + return public_msgr->start(chained_dispatchers); + })); }).then([this] { return seastar::when_all_succeed(monc->start(), mgrc->start()); @@ -453,20 +459,38 @@ seastar::future<> OSD::stop() // see also OSD::shutdown() state.set_stopping(); - return gate.close().then([this] { - return asok->stop(); - }).then([this] { + if (!public_msgr->dispatcher_chain_empty()) { + public_msgr->remove_dispatcher(*this); + public_msgr->remove_dispatcher(*mgrc); + public_msgr->remove_dispatcher(*monc); + } + if (!cluster_msgr->dispatcher_chain_empty()) { + cluster_msgr->remove_dispatcher(*this); + cluster_msgr->remove_dispatcher(*mgrc); + cluster_msgr->remove_dispatcher(*monc); + } + auto gate_close_fut = gate.close(); + return asok->stop().then([this] { return heartbeat->stop(); }).then([this] { - return monc->stop(); - }).then([this] { - return when_all_succeed( - public_msgr->shutdown(), - cluster_msgr->shutdown()); - }).then([this] { return store->umount(); }).then([this] { return store->stop(); + }).then([this] { + return seastar::parallel_for_each(pg_map.get_pgs(), + [](auto& p) { + return p.second->stop(); + }); + }).then([this] { + return monc->stop(); + }).then([this] { + return mgrc->stop(); + }).then([fut=std::move(gate_close_fut)]() mutable { + return std::move(fut); + }).then([this] { + return when_all_succeed( + public_msgr->shutdown(), + cluster_msgr->shutdown()); }).handle_exception([](auto ep) { logger().error("error while stopping osd: {}", ep); }); @@ -483,6 +507,14 @@ void OSD::dump_status(Formatter* f) const f->dump_unsigned("num_pgs", pg_map.get_pgs().size()); } +void OSD::print(std::ostream& out) const +{ + out << "{osd." 
<< superblock.whoami << " " + << superblock.osd_fsid << " [" << superblock.oldest_map + << "," << superblock.newest_map << "] " << pg_map.get_pgs().size() + << " pgs}"; +} + seastar::future<> OSD::load_pgs() { return store->list_collections().then([this](auto colls) { @@ -575,79 +607,69 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid) seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - if (state.is_stopping()) { - return seastar::now(); - } - - switch (m->get_type()) { - case CEPH_MSG_OSD_MAP: - return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m)); - case CEPH_MSG_OSD_OP: - return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m)); - case MSG_OSD_PG_CREATE2: - shard_services.start_operation<CompoundPeeringRequest>( - *this, - conn->get_shared(), - m); - return seastar::now(); - case MSG_COMMAND: - return handle_command(conn, boost::static_pointer_cast<MCommand>(m)); - case MSG_OSD_PG_PULL: - [[fallthrough]]; - case MSG_OSD_PG_PUSH: - [[fallthrough]]; - case MSG_OSD_PG_PUSH_REPLY: - [[fallthrough]]; - case MSG_OSD_PG_RECOVERY_DELETE: - [[fallthrough]]; - case MSG_OSD_PG_RECOVERY_DELETE_REPLY: - return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m)); - case MSG_OSD_PG_LEASE: - [[fallthrough]]; - case MSG_OSD_PG_LEASE_ACK: - [[fallthrough]]; - case MSG_OSD_PG_NOTIFY2: - [[fallthrough]]; - case MSG_OSD_PG_INFO2: - [[fallthrough]]; - case MSG_OSD_PG_QUERY2: - [[fallthrough]]; - case MSG_OSD_BACKFILL_RESERVE: - [[fallthrough]]; - case MSG_OSD_RECOVERY_RESERVE: - [[fallthrough]]; - case MSG_OSD_PG_LOG: - return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m)); - case MSG_OSD_REPOP: - return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m)); - case MSG_OSD_REPOPREPLY: - return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m)); - default: - logger().info("{} unhandled message {}", __func__, *m); - return seastar::now(); - } -} - 
-seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn) -{ - if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) { - return seastar::now(); - } else { - return seastar::now(); - } + return gate.dispatch(__func__, *this, [this, conn, &m] { + if (state.is_stopping()) { + return seastar::now(); + } + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m)); + case CEPH_MSG_OSD_OP: + return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m)); + case MSG_OSD_PG_CREATE2: + shard_services.start_operation<CompoundPeeringRequest>( + *this, + conn->get_shared(), + m); + return seastar::now(); + case MSG_COMMAND: + return handle_command(conn, boost::static_pointer_cast<MCommand>(m)); + case MSG_OSD_PG_PULL: + [[fallthrough]]; + case MSG_OSD_PG_PUSH: + [[fallthrough]]; + case MSG_OSD_PG_PUSH_REPLY: + [[fallthrough]]; + case MSG_OSD_PG_RECOVERY_DELETE: + [[fallthrough]]; + case MSG_OSD_PG_RECOVERY_DELETE_REPLY: + return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m)); + case MSG_OSD_PG_LEASE: + [[fallthrough]]; + case MSG_OSD_PG_LEASE_ACK: + [[fallthrough]]; + case MSG_OSD_PG_NOTIFY2: + [[fallthrough]]; + case MSG_OSD_PG_INFO2: + [[fallthrough]]; + case MSG_OSD_PG_QUERY2: + [[fallthrough]]; + case MSG_OSD_BACKFILL_RESERVE: + [[fallthrough]]; + case MSG_OSD_RECOVERY_RESERVE: + [[fallthrough]]; + case MSG_OSD_PG_LOG: + return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m)); + case MSG_OSD_REPOP: + return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m)); + case MSG_OSD_REPOPREPLY: + return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m)); + default: + logger().info("ms_dispatch unhandled message {}", *m); + return seastar::now(); + } + }); } -seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) +void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool 
is_replace) { // TODO: cleanup the session attached to this connection logger().warn("ms_handle_reset"); - return seastar::now(); } -seastar::future<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn) +void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { logger().warn("ms_handle_remote_reset"); - return seastar::now(); } void OSD::handle_authentication(const EntityName& name, @@ -983,7 +1005,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first, seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn, Ref<MOSDOp> m) { - shard_services.start_operation<ClientRequest>( + (void) shard_services.start_operation<ClientRequest>( *this, conn->get_shared(), std::move(m)); @@ -994,7 +1016,7 @@ seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn, Ref<MOSDRepOp> m) { m->finish_decode(); - shard_services.start_operation<RepRequest>( + (void) shard_services.start_operation<RepRequest>( *this, conn->get_shared(), std::move(m)); @@ -1017,7 +1039,7 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn, seastar::future<> OSD::handle_recovery_subreq(crimson::net::Connection* conn, Ref<MOSDFastDispatchOp> m) { - shard_services.start_operation<RecoverySubRequest>( + (void) shard_services.start_operation<RecoverySubRequest>( *this, conn->get_shared(), std::move(m)); @@ -1107,13 +1129,14 @@ seastar::future<> OSD::handle_peering_op( { const int from = m->get_source().num(); logger().debug("handle_peering_op on {} from {}", m->get_spg(), from); - shard_services.start_operation<RemotePeeringEvent>( + std::unique_ptr<PGPeeringEvent> evt(m->get_event()); + (void) shard_services.start_operation<RemotePeeringEvent>( *this, conn->get_shared(), shard_services, pg_shard_t{from, m->get_spg().shard}, m->get_spg(), - std::move(*m->get_event())); + std::move(*evt)); return seastar::now(); } diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 47e8280368a..05d5ad9adfc 100644 --- a/src/crimson/osd/osd.h 
+++ b/src/crimson/osd/osd.h @@ -12,13 +12,14 @@ #include "crimson/common/type_helpers.h" #include "crimson/common/auth_handler.h" +#include "crimson/common/gated.h" +#include "crimson/net/chained_dispatchers.h" #include "crimson/admin/admin_socket.h" #include "crimson/admin/osd_admin.h" #include "crimson/common/simple_lru.h" #include "crimson/common/shared_lru.h" #include "crimson/mgr/client.h" #include "crimson/net/Dispatcher.h" -#include "crimson/osd/chained_dispatchers.h" #include "crimson/osd/osdmap_service.h" #include "crimson/osd/state.h" #include "crimson/osd/shard_services.h" @@ -63,7 +64,6 @@ class OSD final : public crimson::net::Dispatcher, private OSDMapService, private crimson::common::AuthHandler, private crimson::mgr::WithStats { - seastar::gate gate; const int whoami; const uint32_t nonce; seastar::timer<seastar::lowres_clock> beacon_timer; @@ -71,7 +71,6 @@ class OSD final : public crimson::net::Dispatcher, crimson::net::MessengerRef cluster_msgr; // talk with client/mon/mgr crimson::net::MessengerRef public_msgr; - ChainedDispatchers dispatchers; std::unique_ptr<crimson::mon::Client> monc; std::unique_ptr<crimson::mgr::Client> mgrc; @@ -99,9 +98,8 @@ class OSD final : public crimson::net::Dispatcher, // Dispatcher methods seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final; - seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final; - seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; - seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; + void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; // mgr::WithStats methods MessageRef get_stats() final; @@ -133,6 +131,7 @@ public: void dump_status(Formatter*) const; + void print(std::ostream&) const; private: seastar::future<> start_boot(); seastar::future<> _preboot(version_t oldest_osdmap, 
version_t newest_osdmap); @@ -205,7 +204,7 @@ public: private: PGMap pg_map; - + crimson::common::Gated gate; public: blocking_future<Ref<PG>> get_or_create_pg( spg_t pgid, @@ -224,4 +223,9 @@ public: friend class PGAdvanceMap; }; +inline std::ostream& operator<<(std::ostream& out, const OSD& osd) { + osd.print(out); + return out; +} + } diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 7fe092f2e92..d6919ddfc00 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -12,6 +12,8 @@ #include <boost/smart_ptr/intrusive_ref_counter.hpp> #include <seastar/core/shared_mutex.hh> #include <seastar/core/future.hh> +#include <seastar/core/timer.hh> +#include <seastar/core/lowres_clock.hh> #include "include/ceph_assert.h" #include "crimson/osd/scheduler/scheduler.h" @@ -73,6 +75,9 @@ class blocking_future_detail { template <typename... V, typename... U> friend blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args); + template <typename... V, typename Exception> + friend blocking_future_detail<seastar::future<V...>> + make_exception_blocking_future(Exception&& e); template <typename U> friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u); @@ -100,6 +105,14 @@ blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... seastar::make_ready_future<V...>(std::forward<U>(args)...)); } +template <typename... V, typename Exception> +blocking_future_detail<seastar::future<V...>> +make_exception_blocking_future(Exception&& e) { + return blocking_future<V...>( + nullptr, + seastar::make_exception_future<V...>(e)); +} + /** * Provides an interface for dumping diagnostic information about * why a particular op is not making progress. 
@@ -269,6 +282,8 @@ class OperationRegistry { static_cast<int>(OperationTypeCode::last_op) > op_id_counters = {}; + seastar::timer<seastar::lowres_clock> shutdown_timer; + seastar::promise<> shutdown; public: template <typename T, typename... Args> typename T::IRef create_operation(Args&&... args) { @@ -277,6 +292,21 @@ public: op->set_id(op_id_counters[static_cast<int>(T::type)]++); return op; } + + seastar::future<> stop() { + shutdown_timer.set_callback([this] { + if (std::all_of(registries.begin(), + registries.end(), + [](auto& opl) { + return opl.empty(); + })) { + shutdown.set_value(); + shutdown_timer.cancel(); + } + }); + shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/)); + return shutdown.get_future(); + } }; /** diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 72f9f4396b3..f31621a4867 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -6,6 +6,7 @@ #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "crimson/common/exception.h" #include "crimson/osd/pg.h" #include "crimson/osd/osd.h" #include "common/Formatter.h" @@ -56,7 +57,9 @@ seastar::future<> ClientRequest::start() logger().debug("{}: start", *this); IRef opref = this; - return with_blocking_future(handle.enter(cp().await_map)) + return crimson::common::handle_system_shutdown( + [this, opref=std::move(opref)]() mutable { + return with_blocking_future(handle.enter(cp().await_map)) .then([this]() { return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); }).then([this](epoch_t epoch) { @@ -89,6 +92,7 @@ seastar::future<> ClientRequest::start() }); }); }); + }); } seastar::future<> ClientRequest::process_pg_op( diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index 46a6d1ca394..ca99f45baee 100644 --- 
a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -10,6 +10,7 @@ #include "common/Formatter.h" +#include "crimson/common/exception.h" #include "crimson/osd/pg.h" #include "crimson/osd/osd.h" #include "crimson/osd/osd_operations/compound_peering_request.h" @@ -162,13 +163,16 @@ seastar::future<> CompoundPeeringRequest::start() IRef ref = this; logger().info("{}: about to fork future", *this); - return with_blocking_future( - blocker->make_blocking_future(state->promise.get_future()) - ).then([this, blocker=std::move(blocker)](auto &&ctx) { - logger().info("{}: sub events complete", *this); - return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); - }).then([this, ref=std::move(ref)] { - logger().info("{}: complete", *this); + return crimson::common::handle_system_shutdown( + [this, ref, blocker=std::move(blocker), state]() mutable { + return with_blocking_future( + blocker->make_blocking_future(state->promise.get_future()) + ).then([this, blocker=std::move(blocker)](auto &&ctx) { + logger().info("{}: sub events complete", *this); + return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); + }).then([this, ref=std::move(ref)] { + logger().info("{}: complete", *this); + }); }); } diff --git a/src/crimson/osd/osdmap_gate.cc b/src/crimson/osd/osdmap_gate.cc index 573fc8061f3..90afc32b4e4 100644 --- a/src/crimson/osd/osdmap_gate.cc +++ b/src/crimson/osd/osdmap_gate.cc @@ -1,6 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include "crimson/common/exception.h" #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/shard_services.h" #include "common/Formatter.h" @@ -22,6 +23,10 @@ void OSDMapGate::OSDMapBlocker::dump_detail(Formatter *f) const blocking_future<epoch_t> OSDMapGate::wait_for_map(epoch_t epoch) { + if (__builtin_expect(stopping, false)) { + return 
make_exception_blocking_future<epoch_t>( + crimson::common::system_shutdown_exception()); + } if (current >= epoch) { return make_ready_blocking_future<epoch_t>(current); } else { @@ -51,4 +56,16 @@ void OSDMapGate::got_map(epoch_t epoch) { waiting_peering.erase(first, last); } +seastar::future<> OSDMapGate::stop() { + logger().info("osdmap::stop"); + stopping = true; + auto first = waiting_peering.begin(); + auto last = waiting_peering.end(); + std::for_each(first, last, [](auto& blocked_requests) { + blocked_requests.second.promise.set_exception( + crimson::common::system_shutdown_exception()); + }); + return seastar::now(); +} + } diff --git a/src/crimson/osd/osdmap_gate.h b/src/crimson/osd/osdmap_gate.h index 6a891cc152a..15e3d2057ab 100644 --- a/src/crimson/osd/osdmap_gate.h +++ b/src/crimson/osd/osdmap_gate.h @@ -51,6 +51,7 @@ class OSDMapGate { waiting_peering_t waiting_peering; epoch_t current = 0; std::optional<std::reference_wrapper<ShardServices>> shard_services; + bool stopping = false; public: OSDMapGate( const char *blocker_type, @@ -60,6 +61,7 @@ public: // wait for an osdmap whose epoch is greater or equal to given epoch blocking_future<epoch_t> wait_for_map(epoch_t epoch); void got_map(epoch_t epoch); + seastar::future<> stop(); }; } diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 4b80ee55de4..6f2da55c4f6 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -27,6 +27,7 @@ #include "os/Transaction.h" +#include "crimson/common/exception.h" #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" #include "crimson/os/cyanstore/cyan_store.h" @@ -149,7 +150,7 @@ void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay) // handle the peering event in the background check_readable_timer.cancel(); check_readable_timer.set_callback([last_peering_reset, this] { - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, 
pg_whoami, @@ -237,7 +238,7 @@ void PG::on_activate_complete() wait_for_active_blocker.on_active(); if (peering_state.needs_recovery()) { - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, pg_whoami, @@ -246,7 +247,7 @@ void PG::on_activate_complete() get_osdmap_epoch(), PeeringState::DoRecovery{}); } else if (peering_state.needs_backfill()) { - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, pg_whoami, @@ -257,7 +258,7 @@ void PG::on_activate_complete() } else { logger().debug("{}: no need to recover or backfill, AllReplicasRecovered", " for pg: {}", __func__, pgid); - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, pg_whoami, @@ -342,7 +343,7 @@ void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay) // handle the peering event in the background renew_lease_timer.cancel(); renew_lease_timer.set_callback([last_peering_reset, this] { - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, pg_whoami, @@ -372,6 +373,11 @@ void PG::init( seastar::future<> PG::read_state(crimson::os::FuturizedStore* store) { + if (__builtin_expect(stopping, false)) { + return seastar::make_exception_future<>( + crimson::common::system_shutdown_exception()); + } + return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) { return pg_meta.load(); }).then([this, store](auto&& ret) { @@ -400,7 +406,7 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store) peering_state.set_role(rr); epoch_t epoch = get_osdmap_epoch(); - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, pg_whoami, @@ -499,12 +505,23 @@ blocking_future<> 
PG::WaitForActiveBlocker::wait() } } +seastar::future<> PG::WaitForActiveBlocker::stop() +{ + p.set_exception(crimson::common::system_shutdown_exception()); + return seastar::now(); +} + seastar::future<> PG::submit_transaction(const OpInfo& op_info, const std::vector<OSDOp>& ops, ObjectContextRef&& obc, ceph::os::Transaction&& txn, const osd_op_params_t& osd_op_p) { + if (__builtin_expect(stopping, false)) { + return seastar::make_exception_future<>( + crimson::common::system_shutdown_exception()); + } + epoch_t map_epoch = get_osdmap_epoch(); std::vector<pg_log_entry_t> log_entries; @@ -548,6 +565,10 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops( ObjectContextRef obc, const OpInfo &op_info) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + using osd_op_errorator = OpsExecuter::osd_op_errorator; const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head() : m->get_hobj(); @@ -636,6 +657,10 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops( seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m); return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) { logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op)); @@ -709,6 +734,10 @@ PG::load_obc_ertr::future< std::pair<crimson::osd::ObjectContextRef, bool>> PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + ceph_assert(!oid.is_head()); using ObjectContextRef = crimson::osd::ObjectContextRef; auto coid = resolve_oid(head->get_ro_ss(), oid); @@ -742,6 +771,10 @@ PG::load_obc_ertr::future< std::pair<crimson::osd::ObjectContextRef, bool>> PG::get_or_load_head_obc(hobject_t oid) { + if (__builtin_expect(stopping, false)) { + 
throw crimson::common::system_shutdown_exception(); + } + ceph_assert(oid.is_head()); auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid); if (existed) { @@ -795,6 +828,10 @@ PG::load_obc_ertr::future<crimson::osd::ObjectContextRef> PG::get_locked_obc( Operation *op, const hobject_t &oid, RWState::State type) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + return get_or_load_head_obc(oid.get_head()).safe_then( [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{ auto &[head_obc, head_existed] = p; @@ -835,6 +872,11 @@ PG::get_locked_obc( seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req) { + if (__builtin_expect(stopping, false)) { + return seastar::make_exception_future<>( + crimson::common::system_shutdown_exception()); + } + ceph::os::Transaction txn; auto encoded_txn = req->get_data().cbegin(); decode(txn, encoded_txn); @@ -861,4 +903,19 @@ void PG::handle_rep_op_reply(crimson::net::Connection* conn, backend->got_rep_op_reply(m); } +seastar::future<> PG::stop() +{ + logger().info("PG {} {}", pgid, __func__); + stopping = true; + return osdmap_gate.stop().then([this] { + return wait_for_active_blocker.stop(); + }).then([this] { + return recovery_handler->stop(); + }).then([this] { + return recovery_backend->stop(); + }).then([this] { + return backend->stop(); + }); +} + } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 92881492edf..a4e2650c994 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -183,7 +183,7 @@ public: template <typename T> void start_peering_event_operation(T &&evt, float delay = 0) { - shard_services.start_operation<LocalPeeringEvent>( + (void) shard_services.start_operation<LocalPeeringEvent>( this, shard_services, pg_whoami, @@ -506,6 +506,9 @@ public: const OpInfo &op_info, Operation *op, F &&f) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } auto [oid, type] = 
get_oid_and_lock(*m, op_info); return get_locked_obc(op, oid, type) .safe_then([f=std::forward<F>(f), type=type](auto obc) { @@ -559,6 +562,8 @@ public: ShardServices& get_shard_services() final { return shard_services; } + seastar::future<> stop(); + private: std::unique_ptr<PGBackend> backend; std::unique_ptr<RecoveryBackend> recovery_backend; @@ -620,6 +625,11 @@ public: } private: + // instead of seastar::gate, we use a boolean flag to indicate + // whether the system is shutting down, as we don't need to track + // continuations here. + bool stopping = false; + class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> { PG *pg; @@ -635,6 +645,7 @@ private: WaitForActiveBlocker(PG *pg) : pg(pg) {} void on_active(); blocking_future<> wait(); + seastar::future<> stop(); } wait_for_active_blocker; friend std::ostream& operator<<(std::ostream&, const PG& pg); diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index a5deceb0892..f203a72f114 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -15,6 +15,7 @@ #include "os/Transaction.h" #include "common/Clock.h" +#include "crimson/common/exception.h" #include "crimson/os/futurized_collection.h" #include "crimson/os/futurized_store.h" #include "crimson/osd/osd_operation.h" @@ -64,6 +65,10 @@ PGBackend::PGBackend(shard_id_t shard, PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref> PGBackend::load_metadata(const hobject_t& oid) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + return store->get_attrs( coll, ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then( @@ -377,6 +382,10 @@ seastar::future<> PGBackend::remove(ObjectState& os, seastar::future<std::tuple<std::vector<hobject_t>, hobject_t>> PGBackend::list_objects(const hobject_t& start, uint64_t limit) const { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + auto gstart = 
start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard}; return store->list_objects(coll, gstart, @@ -465,6 +474,10 @@ PGBackend::get_attr_errorator::future<ceph::bufferptr> PGBackend::getxattr( const hobject_t& soid, std::string_view key) const { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + return store->get_attr(coll, ghobject_t{soid}, key); } @@ -509,6 +522,10 @@ seastar::future<> PGBackend::omap_get_keys( const ObjectState& os, OSDOp& osd_op) const { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + std::string start_after; uint64_t max_return; try { @@ -551,6 +568,10 @@ seastar::future<> PGBackend::omap_get_vals( const ObjectState& os, OSDOp& osd_op) const { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + std::string start_after; uint64_t max_return; std::string filter_prefix; @@ -602,6 +623,10 @@ seastar::future<> PGBackend::omap_get_vals_by_keys( const ObjectState& os, OSDOp& osd_op) const { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + std::set<std::string> keys_to_get; try { auto p = osd_op.indata.cbegin(); diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 001e98eb268..bb2637fb328 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -136,10 +136,12 @@ public: const ghobject_t& oid); virtual void got_rep_op_reply(const MOSDRepOpReply&) {} + virtual seastar::future<> stop() = 0; protected: const shard_id_t shard; CollectionRef coll; crimson::os::FuturizedStore* store; + bool stopping = false; public: struct loaded_object_md_t { ObjectState os; diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index af63df60dd8..f040c7e3219 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -24,7 +24,7 @@ void 
PGRecovery::start_background_recovery( crimson::osd::scheduler::scheduler_class_t klass) { using BackgroundRecovery = crimson::osd::BackgroundRecovery; - pg->get_shard_services().start_operation<BackgroundRecovery>( + (void) pg->get_shard_services().start_operation<BackgroundRecovery>( static_cast<crimson::osd::PG*>(pg), pg->get_shard_services(), pg->get_osdmap_epoch(), @@ -58,7 +58,7 @@ PGRecovery::start_recovery_ops(size_t max_to_start) crimson::get_logger(ceph_subsys_osd).debug("start_recovery_ops: AllReplicasRecovered for pg: {}", pg->get_pgid()); using LocalPeeringEvent = crimson::osd::LocalPeeringEvent; - pg->get_shard_services().start_operation<LocalPeeringEvent>( + (void) pg->get_shard_services().start_operation<LocalPeeringEvent>( static_cast<crimson::osd::PG*>(pg), pg->get_shard_services(), pg->get_pg_whoami(), @@ -345,6 +345,7 @@ void PGRecovery::on_global_recover ( const object_stat_sum_t& stat_diff, const bool is_delete) { + crimson::get_logger(ceph_subsys_osd).info("{} {}", __func__, soid); pg->get_peering_state().object_recovered(soid, stat_diff); auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); if (!is_delete) diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index ccc46237daf..d5309550e4a 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -22,7 +22,7 @@ public: crimson::osd::scheduler::scheduler_class_t klass); crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start); - + seastar::future<> stop() { return seastar::now(); } private: PGRecoveryListener* pg; size_t start_primary_recovery_ops( diff --git a/src/crimson/osd/pg_recovery_listener.h b/src/crimson/osd/pg_recovery_listener.h index 2946f93a53d..29e91e403ef 100644 --- a/src/crimson/osd/pg_recovery_listener.h +++ b/src/crimson/osd/pg_recovery_listener.h @@ -3,6 +3,8 @@ #pragma once +#include <seastar/core/future.hh> + #include "common/hobject.h" #include "include/types.h" #include 
"osd/osd_types.h" @@ -30,4 +32,5 @@ public: virtual bool is_unreadable_object(const hobject_t&) const = 0; virtual bool has_reset_since(epoch_t) const = 0; virtual std::vector<pg_shard_t> get_replica_recovery_order() const = 0; + virtual seastar::future<> stop() = 0; }; diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index 31ae27f91dd..af31db7d44e 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -1,6 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include "crimson/common/exception.h" #include "crimson/osd/recovery_backend.h" #include "crimson/osd/pg.h" @@ -34,7 +35,7 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t, temp_contents.clear(); for (auto& [soid, recovery_waiter] : recovering) { - if (recovery_waiter.obc) { + if (recovery_waiter.obc && recovery_waiter.obc->obs.exists) { recovery_waiter.obc->drop_recovery_read(); recovery_waiter.interrupt(why); } @@ -42,3 +43,15 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t, recovering.clear(); } +void RecoveryBackend::WaitForObjectRecovery::stop() { + readable.set_exception( + crimson::common::system_shutdown_exception()); + recovered.set_exception( + crimson::common::system_shutdown_exception()); + pulled.set_exception( + crimson::common::system_shutdown_exception()); + for (auto& [pg_shard, pr] : pushes) { + pr.set_exception( + crimson::common::system_shutdown_exception()); + } +} diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index 461ae620fb7..f5853c4da07 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -62,6 +62,13 @@ public: void on_peering_interval_change(ceph::os::Transaction& t) { clean_up(t, "new peering interval"); } + + seastar::future<> stop() { + for (auto& [soid, recovery_waiter] : recovering) { + recovery_waiter.stop(); + } + return on_stop(); + } protected: 
crimson::osd::PG& pg; crimson::osd::ShardServices& shard_services; @@ -135,6 +142,7 @@ protected: std::make_error_code(std::errc::interrupted), why)); } } + void stop(); void dump_detail(Formatter* f) const { } }; @@ -152,4 +160,5 @@ protected: temp_contents.erase(oid); } void clean_up(ceph::os::Transaction& t, const std::string& why); + virtual seastar::future<> on_stop() = 0; }; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index e4a5f39dca3..780d6a74619 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -5,6 +5,7 @@ #include "messages/MOSDRepOpReply.h" +#include "crimson/common/exception.h" #include "crimson/common/log.h" #include "crimson/os/futurized_store.h" #include "crimson/osd/shard_services.h" @@ -32,6 +33,9 @@ ReplicatedBackend::_read(const hobject_t& hoid, const uint64_t len, const uint32_t flags) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } return store->read(coll, ghobject_t{hoid}, off, len, flags); } @@ -43,6 +47,10 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards, epoch_t min_epoch, epoch_t map_epoch, std::vector<pg_log_entry_t>&& log_entries) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + const ceph_tid_t tid = next_txn_id++; auto req_id = osd_op_p.req->get_reqid(); auto pending_txn = @@ -101,3 +109,14 @@ void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply) } } } + +seastar::future<> ReplicatedBackend::stop() +{ + logger().info("ReplicatedBackend::stop {}", coll->get_cid()); + stopping = true; + for (auto& [tid, pending_on] : pending_trans) { + pending_on.all_committed.set_exception( + crimson::common::system_shutdown_exception()); + } + return seastar::now(); +} diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index 53a0038da8a..dbefb303006 100644 --- 
a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -22,6 +22,7 @@ public: CollectionRef coll, crimson::osd::ShardServices& shard_services); void got_rep_op_reply(const MOSDRepOpReply& reply) final; + seastar::future<> stop() final; private: ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid, uint64_t off, diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index 467d7a5f656..3994e8b873b 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -105,4 +105,7 @@ protected: const hobject_t& soid, eversion_t need, epoch_t epoch_frozen); + seastar::future<> on_stop() final { + return seastar::now(); + } }; diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 443fd0ee451..38d5d192ebd 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -9,6 +9,7 @@ #include "include/common_fwd.h" #include "osd_operation.h" #include "msg/MessageRef.h" +#include "crimson/common/exception.h" #include "crimson/os/futurized_collection.h" #include "osd/PeeringState.h" #include "crimson/osd/osdmap_service.h" @@ -90,10 +91,18 @@ public: template <typename T, typename... Args> auto start_operation(Args&&... 
args) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } auto op = registry.create_operation<T>(std::forward<Args>(args)...); return std::make_pair(op, op->start()); } + seastar::future<> stop() { + stopping = true; + return registry.stop(); + } + // Loggers PerfCounters &get_recoverystate_perf_logger() { return *recoverystate_perf; } @@ -186,6 +195,12 @@ private: c->complete(0); } } finisher; + // prevent creating new osd operations when system is shutting down, + // this is necessary because there are chances that a new operation + // is created, after the interruption of all ongoing operations, and + // creates and waits on a new and may-never-resolve future, in which + // case the shutdown may never succeed. + bool stopping = false; public: AsyncReserver<spg_t, DirectFinisher> local_reserver; AsyncReserver<spg_t, DirectFinisher> remote_reserver; diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 318c61156c3..0a0f22eb9ae 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -181,7 +181,9 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) server.msgr->set_auth_server(&server.dummy_auth); return server.msgr->bind(entity_addrvec_t{addr} ).then([&server] { - return server.msgr->start(&server.dispatcher); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(server.dispatcher); + return server.msgr->start(chained_dispatchers); }).then([&dispatcher=server.dispatcher, count] { return dispatcher.on_reply.wait([&dispatcher, count] { return dispatcher.count >= count; @@ -203,7 +205,9 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count) client.msgr->set_require_authorizer(false); client.msgr->set_auth_client(&client.dummy_auth); client.msgr->set_auth_server(&client.dummy_auth); - return client.msgr->start(&client.dispatcher).then( + auto 
chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(client.dispatcher); + return client.msgr->start(chained_dispatchers).then( [addr, &client, &disp=client.dispatcher, count] { auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD); return seastar::do_until( diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index deb06990329..ad52ee39196 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -66,12 +66,15 @@ static seastar::future<> test_echo(unsigned rounds, msgr->set_require_authorizer(false); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).then([this] { - return msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { + return msgr->start(chained_dispatchers); }); } seastar::future<> shutdown() { ceph_assert(msgr); + msgr->remove_dispatcher(*this); return msgr->shutdown(); } }; @@ -104,13 +107,12 @@ static seastar::future<> test_echo(unsigned rounds, return found->second; } - seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override { + void ms_handle_connect(crimson::net::ConnectionRef conn) override { auto session = seastar::make_shared<PingSession>(); auto [i, added] = sessions.emplace(conn.get(), session); std::ignore = i; ceph_assert(added); session->connected_time = mono_clock::now(); - return seastar::now(); } seastar::future<> ms_dispatch(crimson::net::Connection* c, MessageRef m) override { @@ -137,11 +139,14 @@ static seastar::future<> test_echo(unsigned rounds, msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->start(this); + auto chained_dispatchers = 
seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->start(chained_dispatchers); } seastar::future<> shutdown() { ceph_assert(msgr); + msgr->remove_dispatcher(*this); return msgr->shutdown(); } @@ -287,8 +292,10 @@ static seastar::future<> test_concurrent_dispatch(bool v2) msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).then([this] { - return msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { + return msgr->start(chained_dispatchers); }); } }; @@ -305,7 +312,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2) msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->start(chained_dispatchers); } }; }; @@ -335,9 +344,11 @@ static seastar::future<> test_concurrent_dispatch(bool v2) return server->wait(); }).finally([client] { logger().info("client shutdown..."); + client->msgr->remove_dispatcher(*client); return client->msgr->shutdown(); }).finally([server] { logger().info("server shutdown..."); + server->msgr->remove_dispatcher(*server); return server->msgr->shutdown(); }).finally([server, client] { logger().info("test_concurrent_dispatch() done!\n"); @@ -365,14 +376,17 @@ seastar::future<> test_preemptive_shutdown(bool v2) { msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).then([this] { - return msgr->start(this); 
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { + return msgr->start(chained_dispatchers); }); } entity_addr_t get_addr() const { return msgr->get_myaddr(); } seastar::future<> shutdown() { + msgr->remove_dispatcher(*this); return msgr->shutdown(); } }; @@ -398,7 +412,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) { msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->start(chained_dispatchers); } void send_pings(const entity_addr_t& addr) { auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD); @@ -415,6 +431,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) { }); } seastar::future<> shutdown() { + msgr->remove_dispatcher(*this); return msgr->shutdown().then([this] { stop_send = true; return stopped_send_promise.get_future(); @@ -804,7 +821,7 @@ class FailoverSuite : public Dispatcher { return seastar::now(); } - seastar::future<> ms_handle_accept(ConnectionRef conn) override { + void ms_handle_accept(ConnectionRef conn) override { auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked accepted connection: {}", *conn); @@ -824,10 +841,10 @@ class FailoverSuite : public Dispatcher { ++result->cnt_accept_dispatched; logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}", result->cnt_accept_dispatched, result->index, *conn); - return flush_pending_send(); + std::ignore = flush_pending_send(); } - seastar::future<> ms_handle_connect(ConnectionRef conn) override { + void ms_handle_connect(ConnectionRef conn) override { auto result = interceptor.find_result(conn); if (result == nullptr) { 
logger().error("Untracked connected connection: {}", *conn); @@ -844,10 +861,9 @@ class FailoverSuite : public Dispatcher { ++result->cnt_connect_dispatched; logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}", result->cnt_connect_dispatched, result->index, *conn); - return seastar::now(); } - seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override { + void ms_handle_reset(ConnectionRef conn, bool is_replace) override { auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked reset connection: {}", *conn); @@ -866,10 +882,9 @@ class FailoverSuite : public Dispatcher { ++result->cnt_reset_dispatched; logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}", result->cnt_reset_dispatched, result->index, *conn); - return seastar::now(); } - seastar::future<> ms_handle_remote_reset(ConnectionRef conn) override { + void ms_handle_remote_reset(ConnectionRef conn) override { auto result = interceptor.find_result(conn); if (result == nullptr) { logger().error("Untracked remotely reset connection: {}", *conn); @@ -886,7 +901,6 @@ class FailoverSuite : public Dispatcher { ++result->cnt_remote_reset_dispatched; logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}", result->cnt_remote_reset_dispatched, result->index, *conn); - return seastar::now(); } private: @@ -895,8 +909,10 @@ class FailoverSuite : public Dispatcher { test_msgr->set_auth_client(&dummy_auth); test_msgr->set_auth_server(&dummy_auth); test_msgr->interceptor = &interceptor; - return test_msgr->bind(entity_addrvec_t{addr}).then([this] { - return test_msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return test_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { + return test_msgr->start(chained_dispatchers); }); } @@ -1017,6 +1033,7 @@ class FailoverSuite : public 
Dispatcher { } seastar::future<> shutdown() { + test_msgr->remove_dispatcher(*this); return test_msgr->shutdown(); } @@ -1239,7 +1256,9 @@ class FailoverTest : public Dispatcher { cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); - return cmd_msgr->start(this).then([this, cmd_peer_addr] { + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return cmd_msgr->start(chained_dispatchers).then([this, cmd_peer_addr] { logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr); cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD); return pingpong(); @@ -1262,6 +1281,7 @@ class FailoverTest : public Dispatcher { return cmd_conn->send(m).then([] { return seastar::sleep(200ms); }).finally([this] { + cmd_msgr->remove_dispatcher(*this); return cmd_msgr->shutdown(); }); } @@ -1376,20 +1396,19 @@ class FailoverSuitePeer : public Dispatcher { return op_callback(); } - seastar::future<> ms_handle_accept(ConnectionRef conn) override { + void ms_handle_accept(ConnectionRef conn) override { logger().info("[TestPeer] got accept from Test"); ceph_assert(!tracked_conn || tracked_conn->is_closed() || tracked_conn == conn); tracked_conn = conn; - return flush_pending_send(); + std::ignore = flush_pending_send(); } - seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override { + void ms_handle_reset(ConnectionRef conn, bool is_replace) override { logger().info("[TestPeer] got reset from Test"); ceph_assert(tracked_conn == conn); tracked_conn = nullptr; - return seastar::now(); } private: @@ -1397,8 +1416,10 @@ class FailoverSuitePeer : public Dispatcher { peer_msgr->set_default_policy(policy); peer_msgr->set_auth_client(&dummy_auth); peer_msgr->set_auth_server(&dummy_auth); - return peer_msgr->bind(entity_addrvec_t{addr}).then([this] { - return peer_msgr->start(this); + auto 
chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + return peer_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { + chained_dispatchers->push_back(*this); + return peer_msgr->start(chained_dispatchers); }); } @@ -1430,6 +1451,7 @@ class FailoverSuitePeer : public Dispatcher { : peer_msgr(peer_msgr), op_callback(op_callback) { } seastar::future<> shutdown() { + peer_msgr->remove_dispatcher(*this); return peer_msgr->shutdown(); } @@ -1505,6 +1527,7 @@ class FailoverTestPeer : public Dispatcher { if (cmd == cmd_t::shutdown) { logger().info("CmdSrv shutdown..."); // forwarded to FailoverTestPeer::wait() + cmd_msgr->remove_dispatcher(*this); (void) cmd_msgr->shutdown(); return seastar::now(); } @@ -1518,9 +1541,8 @@ class FailoverTestPeer : public Dispatcher { } } - seastar::future<> ms_handle_accept(ConnectionRef conn) override { + void ms_handle_accept(ConnectionRef conn) override { cmd_conn = conn; - return seastar::now(); } private: @@ -1573,8 +1595,10 @@ class FailoverTestPeer : public Dispatcher { cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); - return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] { - return cmd_msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this, chained_dispatchers]() mutable { + return cmd_msgr->start(chained_dispatchers); }); } diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index 1275b451ebe..45335bd5030 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -24,6 +24,7 @@ DummyAuthHandler dummy_handler; static seastar::future<> test_monc() { + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); return crimson::common::sharded_conf().start(EntityName{}, 
string_view{"ceph"}).then([] { std::vector<const char*> args; std::string cluster; @@ -38,7 +39,7 @@ static seastar::future<> test_monc() return conf.parse_config_files(conf_file_list); }).then([] { return crimson::common::sharded_perf_coll().start(); - }).then([] { + }).then([chained_dispatchers]() mutable { auto msgr = crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0); auto& conf = crimson::common::local_conf(); if (conf->ms_crc_data) { @@ -49,8 +50,9 @@ static seastar::future<> test_monc() } msgr->set_require_authorizer(false); return seastar::do_with(MonClient{*msgr, dummy_handler}, - [msgr](auto& monc) { - return msgr->start(&monc).then([&monc] { + [msgr, chained_dispatchers](auto& monc) mutable { + chained_dispatchers->push_back(monc); + return msgr->start(chained_dispatchers).then([&monc] { return seastar::with_timeout( seastar::lowres_clock::now() + std::chrono::seconds{10}, monc.start()); diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index 70335c44476..30793487d78 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -181,7 +181,9 @@ static seastar::future<> run( msgr->set_crc_data(); } return msgr->bind(entity_addrvec_t{addr}).then([this] { - return msgr->start(this); + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return msgr->start(chained_dispatchers); }); }); } @@ -297,9 +299,8 @@ static seastar::future<> run( return nr_depth - depth.current(); } - seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override { + void ms_handle_connect(crimson::net::ConnectionRef conn) override { conn_stats.connected_time = mono_clock::now(); - return seastar::now(); } seastar::future<> ms_dispatch(crimson::net::Connection* c, MessageRef m) override { @@ -331,7 +332,9 @@ static seastar::future<> run( } seastar::future<> init(bool v1_crc_enabled) { - return 
container().invoke_on_all([v1_crc_enabled] (auto& client) { + auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>(); + chained_dispatchers->push_back(*this); + return container().invoke_on_all([v1_crc_enabled, chained_dispatchers] (auto& client) mutable { if (client.is_active()) { client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); @@ -342,7 +345,7 @@ static seastar::future<> run( client.msgr->set_crc_header(); client.msgr->set_crc_data(); } - return client.msgr->start(&client); + return client.msgr->start(chained_dispatchers); } return seastar::now(); }); |