summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/crimson/CMakeLists.txt3
-rw-r--r--src/crimson/common/exception.h37
-rw-r--r--src/crimson/common/gated.h50
-rw-r--r--src/crimson/mgr/client.cc73
-rw-r--r--src/crimson/mgr/client.h13
-rw-r--r--src/crimson/mon/MonClient.cc119
-rw-r--r--src/crimson/mon/MonClient.h11
-rw-r--r--src/crimson/net/Dispatcher.h23
-rw-r--r--src/crimson/net/Messenger.h8
-rw-r--r--src/crimson/net/Protocol.cc15
-rw-r--r--src/crimson/net/Protocol.h26
-rw-r--r--src/crimson/net/ProtocolV1.cc25
-rw-r--r--src/crimson/net/ProtocolV1.h4
-rw-r--r--src/crimson/net/ProtocolV2.cc54
-rw-r--r--src/crimson/net/ProtocolV2.h7
-rw-r--r--src/crimson/net/SocketConnection.cc2
-rw-r--r--src/crimson/net/SocketConnection.h3
-rw-r--r--src/crimson/net/SocketMessenger.cc32
-rw-r--r--src/crimson/net/SocketMessenger.h25
-rw-r--r--src/crimson/net/chained_dispatchers.cc39
-rw-r--r--src/crimson/net/chained_dispatchers.h45
-rw-r--r--src/crimson/os/alienstore/alien_store.cc2
-rw-r--r--src/crimson/os/cyanstore/cyan_store.h4
-rw-r--r--src/crimson/osd/CMakeLists.txt1
-rw-r--r--src/crimson/osd/chained_dispatchers.cc40
-rw-r--r--src/crimson/osd/chained_dispatchers.h30
-rw-r--r--src/crimson/osd/ec_backend.h3
-rw-r--r--src/crimson/osd/heartbeat.cc54
-rw-r--r--src/crimson/osd/heartbeat.h13
-rw-r--r--src/crimson/osd/osd.cc189
-rw-r--r--src/crimson/osd/osd.h18
-rw-r--r--src/crimson/osd/osd_operation.h30
-rw-r--r--src/crimson/osd/osd_operations/client_request.cc6
-rw-r--r--src/crimson/osd/osd_operations/compound_peering_request.cc18
-rw-r--r--src/crimson/osd/osdmap_gate.cc17
-rw-r--r--src/crimson/osd/osdmap_gate.h2
-rw-r--r--src/crimson/osd/pg.cc69
-rw-r--r--src/crimson/osd/pg.h13
-rw-r--r--src/crimson/osd/pg_backend.cc25
-rw-r--r--src/crimson/osd/pg_backend.h2
-rw-r--r--src/crimson/osd/pg_recovery.cc5
-rw-r--r--src/crimson/osd/pg_recovery.h2
-rw-r--r--src/crimson/osd/pg_recovery_listener.h3
-rw-r--r--src/crimson/osd/recovery_backend.cc15
-rw-r--r--src/crimson/osd/recovery_backend.h9
-rw-r--r--src/crimson/osd/replicated_backend.cc19
-rw-r--r--src/crimson/osd/replicated_backend.h1
-rw-r--r--src/crimson/osd/replicated_recovery_backend.h3
-rw-r--r--src/crimson/osd/shard_services.h15
-rw-r--r--src/test/crimson/test_alien_echo.cc8
-rw-r--r--src/test/crimson/test_messenger.cc88
-rw-r--r--src/test/crimson/test_monc.cc8
-rw-r--r--src/tools/crimson/perf_crimson_msgr.cc13
53 files changed, 928 insertions, 411 deletions
diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt
index 61a45391cd3..e30aef360fe 100644
--- a/src/crimson/CMakeLists.txt
+++ b/src/crimson/CMakeLists.txt
@@ -164,7 +164,8 @@ set(crimson_net_srcs
net/Socket.cc
net/Protocol.cc
net/ProtocolV1.cc
- net/ProtocolV2.cc)
+ net/ProtocolV2.cc
+ net/chained_dispatchers.cc)
set(crimson_thread_srcs
thread/ThreadPool.cc
thread/Throttle.cc)
diff --git a/src/crimson/common/exception.h b/src/crimson/common/exception.h
new file mode 100644
index 00000000000..28faf75f1b9
--- /dev/null
+++ b/src/crimson/common/exception.h
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <exception>
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
+
+#include "crimson/common/log.h"
+
+namespace crimson::common {
+
+class system_shutdown_exception final : public std::exception{
+public:
+ const char* what() const noexcept final {
+ return "system shutting down";
+ }
+};
+
+template<typename Func, typename... Args>
+inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args)
+{
+ return seastar::futurize_invoke(std::forward<Func>(func),
+ std::forward<Args>(args)...)
+ .handle_exception([](std::exception_ptr eptr) {
+ if (*eptr.__cxa_exception_type() ==
+ typeid(crimson::common::system_shutdown_exception)) {
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "operation skipped, system shutdown");
+ return seastar::now();
+ }
+ std::rethrow_exception(eptr);
+ });
+}
+
+}
diff --git a/src/crimson/common/gated.h b/src/crimson/common/gated.h
new file mode 100644
index 00000000000..679e06b6491
--- /dev/null
+++ b/src/crimson/common/gated.h
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
+
+#include "crimson/common/exception.h"
+#include "crimson/common/log.h"
+#include "include/ceph_assert.h"
+
+namespace crimson::common {
+
+class Gated {
+ public:
+ static seastar::logger& gated_logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+ template <typename Func, typename T>
+ inline void dispatch_in_background(const char* what, T& who, Func&& func) {
+ (void) dispatch(what, who, func);
+ }
+ template <typename Func, typename T>
+ inline seastar::future<> dispatch(const char* what, T& who, Func&& func) {
+ return seastar::with_gate(pending_dispatch, std::forward<Func>(func)
+ ).handle_exception([this, what, &who] (std::exception_ptr eptr) {
+ if (*eptr.__cxa_exception_type() == typeid(system_shutdown_exception)) {
+ gated_logger().debug(
+ "{}, {} skipped, system shutdown", who, what);
+ return seastar::now();
+ }
+ gated_logger().error(
+ "{} dispatch() {} caught exception: {}", who, what, eptr);
+ ceph_abort("unexpected exception from dispatch()");
+ });
+ }
+
+ seastar::future<> close() {
+ return pending_dispatch.close();
+ }
+ bool is_closed() const {
+ return pending_dispatch.is_closed();
+ }
+ private:
+ seastar::gate pending_dispatch;
+};
+
+}// namespace crimson::common
diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc
index b6331191577..74403fc4e95 100644
--- a/src/crimson/mgr/client.cc
+++ b/src/crimson/mgr/client.cc
@@ -38,46 +38,54 @@ seastar::future<> Client::start()
seastar::future<> Client::stop()
{
- return gate.close().then([this] {
- if (conn) {
- conn->mark_down();
- }
- });
+ logger().info("{}", __func__);
+ report_timer.cancel();
+ auto fut = gate.close();
+ if (conn) {
+ conn->mark_down();
+ }
+ return fut;
}
seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn,
MessageRef m)
{
- switch(m->get_type()) {
- case MSG_MGR_MAP:
- return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
- case MSG_MGR_CONFIGURE:
- return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
- default:
- return seastar::now();
- }
+ return gate.dispatch(__func__, *this, [this, conn, &m] {
+ switch(m->get_type()) {
+ case MSG_MGR_MAP:
+ return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
+ case MSG_MGR_CONFIGURE:
+ return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
+ default:
+ return seastar::now();
+ }
+ });
}
-seastar::future<> Client::ms_handle_connect(crimson::net::ConnectionRef c)
+void Client::ms_handle_connect(crimson::net::ConnectionRef c)
{
- if (conn == c) {
- // ask for the mgrconfigure message
- auto m = ceph::make_message<MMgrOpen>();
- m->daemon_name = local_conf()->name.get_id();
- return conn->send(std::move(m));
- } else {
- return seastar::now();
- }
+ gate.dispatch_in_background(__func__, *this, [this, c] {
+ if (conn == c) {
+ // ask for the mgrconfigure message
+ auto m = ceph::make_message<MMgrOpen>();
+ m->daemon_name = local_conf()->name.get_id();
+ return conn->send(std::move(m));
+ } else {
+ return seastar::now();
+ }
+ });
}
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace)
+void Client::ms_handle_reset(crimson::net::ConnectionRef c, bool is_replace)
{
- if (conn == c) {
- report_timer.cancel();
- return reconnect();
- } else {
- return seastar::now();
- }
+ gate.dispatch_in_background(__func__, *this, [this, c, is_replace] {
+ if (conn == c) {
+ report_timer.cancel();
+ return reconnect();
+ } else {
+ return seastar::now();
+ }
+ });
}
seastar::future<> Client::reconnect()
@@ -134,11 +142,16 @@ seastar::future<> Client::handle_mgr_conf(crimson::net::Connection* conn,
void Client::report()
{
- (void) seastar::with_gate(gate, [this] {
+ gate.dispatch_in_background(__func__, *this, [this] {
assert(conn);
auto pg_stats = with_stats.get_stats();
return conn->send(std::move(pg_stats));
});
}
+void Client::print(std::ostream& out) const
+{
+ out << "mgrc ";
+}
+
}
diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h
index 3873ba6418b..51130b06bc5 100644
--- a/src/crimson/mgr/client.h
+++ b/src/crimson/mgr/client.h
@@ -39,8 +39,8 @@ public:
private:
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
Ref<Message> m) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
- seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final;
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
+ void ms_handle_connect(crimson::net::ConnectionRef conn) final;
seastar::future<> handle_mgr_map(crimson::net::Connection* conn,
Ref<MMgrMap> m);
seastar::future<> handle_mgr_conf(crimson::net::Connection* conn,
@@ -48,13 +48,20 @@ private:
seastar::future<> reconnect();
void report();
+ void print(std::ostream&) const;
+ friend std::ostream& operator<<(std::ostream& out, const Client& client);
private:
MgrMap mgrmap;
crimson::net::Messenger& msgr;
WithStats& with_stats;
crimson::net::ConnectionRef conn;
seastar::timer<seastar::lowres_clock> report_timer;
- seastar::gate gate;
+ crimson::common::Gated gate;
};
+inline std::ostream& operator<<(std::ostream& out, const Client& client) {
+ client.print(out);
+ return out;
+}
+
}
diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc
index a71dabdf810..c8cfac8aa09 100644
--- a/src/crimson/mon/MonClient.cc
+++ b/src/crimson/mon/MonClient.cc
@@ -496,7 +496,7 @@ seastar::future<> Client::load_keyring()
void Client::tick()
{
- (void) seastar::with_gate(tick_gate, [this] {
+ gate.dispatch_in_background(__func__, *this, [this] {
if (active_con) {
return seastar::when_all_succeed(active_con->get_conn()->keepalive(),
active_con->renew_tickets(),
@@ -514,50 +514,54 @@ bool Client::is_hunting() const {
seastar::future<>
Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
- // we only care about these message types
- switch (m->get_type()) {
- case CEPH_MSG_MON_MAP:
- return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m));
- case CEPH_MSG_AUTH_REPLY:
- return handle_auth_reply(
- conn, boost::static_pointer_cast<MAuthReply>(m));
- case CEPH_MSG_MON_SUBSCRIBE_ACK:
- return handle_subscribe_ack(
- boost::static_pointer_cast<MMonSubscribeAck>(m));
- case CEPH_MSG_MON_GET_VERSION_REPLY:
- return handle_get_version_reply(
- boost::static_pointer_cast<MMonGetVersionReply>(m));
- case MSG_MON_COMMAND_ACK:
- return handle_mon_command_ack(
- boost::static_pointer_cast<MMonCommandAck>(m));
- case MSG_LOGACK:
- return handle_log_ack(
- boost::static_pointer_cast<MLogAck>(m));
- case MSG_CONFIG:
- return handle_config(
- boost::static_pointer_cast<MConfig>(m));
- default:
- return seastar::now();
- }
+ return gate.dispatch(__func__, *this, [this, conn, &m] {
+ // we only care about these message types
+ switch (m->get_type()) {
+ case CEPH_MSG_MON_MAP:
+ return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m));
+ case CEPH_MSG_AUTH_REPLY:
+ return handle_auth_reply(
+ conn, boost::static_pointer_cast<MAuthReply>(m));
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ return handle_subscribe_ack(
+ boost::static_pointer_cast<MMonSubscribeAck>(m));
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ return handle_get_version_reply(
+ boost::static_pointer_cast<MMonGetVersionReply>(m));
+ case MSG_MON_COMMAND_ACK:
+ return handle_mon_command_ack(
+ boost::static_pointer_cast<MMonCommandAck>(m));
+ case MSG_LOGACK:
+ return handle_log_ack(
+ boost::static_pointer_cast<MLogAck>(m));
+ case MSG_CONFIG:
+ return handle_config(
+ boost::static_pointer_cast<MConfig>(m));
+ default:
+ return seastar::now();
+ }
+ });
}
-seastar::future<> Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
- auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
- [peer_addr = conn->get_peer_addr()](auto& mc) {
- return mc->is_my_peer(peer_addr);
- });
- if (found != pending_conns.end()) {
- logger().warn("pending conn reset by {}", conn->get_peer_addr());
- (*found)->close();
- return seastar::now();
- } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
- logger().warn("active conn reset {}", conn->get_peer_addr());
- active_con.reset();
- return reopen_session(-1);
- } else {
- return seastar::now();
- }
+ gate.dispatch_in_background(__func__, *this, [this, conn, is_replace] {
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found != pending_conns.end()) {
+ logger().warn("pending conn reset by {}", conn->get_peer_addr());
+ (*found)->close();
+ return seastar::now();
+ } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
+ logger().warn("active conn reset {}", conn->get_peer_addr());
+ active_con.reset();
+ return reopen_session(-1);
+ } else {
+ return seastar::now();
+ }
+ });
}
std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
@@ -613,7 +617,10 @@ int Client::handle_auth_request(crimson::net::ConnectionRef con,
return -EOPNOTSUPP;
}
auto authorizer_challenge = &auth_meta->authorizer_challenge;
- ceph_assert(active_con);
+ if (!active_con) {
+ logger().error("connection to monitors is down, abort connection for now");
+ return -EBUSY;
+ }
if (!HAVE_FEATURE(active_con->get_conn()->get_features(), CEPHX_V2)) {
if (local_conf().get_val<uint64_t>("cephx_service_require_version") >= 2) {
return -EACCES;
@@ -915,12 +922,16 @@ seastar::future<> Client::authenticate()
seastar::future<> Client::stop()
{
- return tick_gate.close().then([this] {
- timer.cancel();
- if (active_con) {
- active_con->close();
- }
- });
+ logger().info("{}", __func__);
+ auto fut = gate.close();
+ timer.cancel();
+ for (auto& pending_con : pending_conns) {
+ pending_con->close();
+ }
+ if (active_con) {
+ active_con->close();
+ }
+ return fut;
}
seastar::future<> Client::reopen_session(int rank)
@@ -995,7 +1006,10 @@ seastar::future<> Client::reopen_session(int rank)
return seastar::make_exception_future(ep);
});
}).then([this] {
- ceph_assert_always(active_con);
+ if (!active_con) {
+ return seastar::make_exception_future(
+ crimson::common::system_shutdown_exception());
+ }
return active_con->renew_rotating_keyring();
});
}
@@ -1058,4 +1072,9 @@ seastar::future<> Client::renew_subs()
});
}
+void Client::print(std::ostream& out) const
+{
+ out << "mon." << entity_name;
+}
+
} // namespace crimson::mon
diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h
index 3c3a5b6eddc..c33ac992698 100644
--- a/src/crimson/mon/MonClient.h
+++ b/src/crimson/mon/MonClient.h
@@ -17,6 +17,7 @@
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
#include "crimson/common/auth_handler.h"
+#include "crimson/common/gated.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"
@@ -54,7 +55,6 @@ class Client : public crimson::net::Dispatcher,
std::unique_ptr<Connection> active_con;
std::vector<std::unique_ptr<Connection>> pending_conns;
seastar::timer<seastar::lowres_clock> timer;
- seastar::gate tick_gate;
crimson::net::Messenger& msgr;
@@ -91,6 +91,7 @@ public:
bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
seastar::future<> renew_subs();
+ void print(std::ostream&) const;
private:
// AuthServer methods
std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
@@ -141,7 +142,7 @@ private:
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
MessageRef m) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
seastar::future<> handle_monmap(crimson::net::Connection* conn,
Ref<MMonMap> m);
@@ -161,6 +162,12 @@ private:
seastar::future<> reopen_session(int rank);
std::vector<unsigned> get_random_mons(unsigned n) const;
seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
+ crimson::common::Gated gate;
};
+inline std::ostream& operator<<(std::ostream& out, const Client& client) {
+ client.print(out);
+ return out;
+}
+
} // namespace crimson::mon
diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h
index a7ca3d9d2b3..fd26d146166 100644
--- a/src/crimson/net/Dispatcher.h
+++ b/src/crimson/net/Dispatcher.h
@@ -16,40 +16,35 @@
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
+#include <boost/intrusive/slist.hpp>
+#include "crimson/common/gated.h"
#include "Fwd.h"
class AuthAuthorizer;
namespace crimson::net {
-class Dispatcher {
+class Dispatcher : public boost::intrusive::slist_base_hook<
+ boost::intrusive::link_mode<
+ boost::intrusive::safe_link>> {
public:
virtual ~Dispatcher() {}
virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) {
return seastar::make_ready_future<>();
}
+ virtual void ms_handle_accept(ConnectionRef conn) {}
- virtual seastar::future<> ms_handle_accept(ConnectionRef conn) {
- return seastar::make_ready_future<>();
- }
-
- virtual seastar::future<> ms_handle_connect(ConnectionRef conn) {
- return seastar::make_ready_future<>();
- }
+ virtual void ms_handle_connect(ConnectionRef conn) {}
// a reset event is dispatched when the connection is closed unexpectedly.
// is_replace=true means the reset connection is going to be replaced by
// another accepting connection with the same peer_addr, which currently only
// happens under lossy policy when both sides wish to connect to each other.
- virtual seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) {
- return seastar::make_ready_future<>();
- }
+ virtual void ms_handle_reset(ConnectionRef conn, bool is_replace) {}
- virtual seastar::future<> ms_handle_remote_reset(ConnectionRef conn) {
- return seastar::make_ready_future<>();
- }
+ virtual void ms_handle_remote_reset(ConnectionRef conn) {}
};
} // namespace crimson::net
diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h
index 6423f89d967..2b136aa4fe0 100644
--- a/src/crimson/net/Messenger.h
+++ b/src/crimson/net/Messenger.h
@@ -17,6 +17,7 @@
#include <seastar/core/future.hh>
#include "Fwd.h"
+#include "crimson/net/chained_dispatchers.h"
#include "crimson/thread/Throttle.h"
#include "msg/Message.h"
#include "msg/Policy.h"
@@ -72,7 +73,7 @@ public:
uint32_t min_port, uint32_t max_port) = 0;
/// start the messenger
- virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
+ virtual seastar::future<> start(ChainedDispatchersRef) = 0;
/// either return an existing connection to the peer,
/// or a new pending connection
@@ -89,6 +90,11 @@ public:
// wait for messenger shutdown
virtual seastar::future<> wait() = 0;
+ virtual void add_dispatcher(Dispatcher&) = 0;
+
+ virtual void remove_dispatcher(Dispatcher&) = 0;
+
+ virtual bool dispatcher_chain_empty() const = 0;
/// stop listenening and wait for all connections to close. safe to destruct
/// after this future becomes available
virtual seastar::future<> shutdown() = 0;
diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc
index de2e02cee03..78578a5959a 100644
--- a/src/crimson/net/Protocol.cc
+++ b/src/crimson/net/Protocol.cc
@@ -21,7 +21,7 @@ namespace {
namespace crimson::net {
Protocol::Protocol(proto_t type,
- Dispatcher& dispatcher,
+ ChainedDispatchersRef& dispatcher,
SocketConnection& conn)
: proto_type(type),
dispatcher(dispatcher),
@@ -31,7 +31,7 @@ Protocol::Protocol(proto_t type,
Protocol::~Protocol()
{
- ceph_assert(pending_dispatch.is_closed());
+ ceph_assert(gate.is_closed());
assert(!exit_open);
}
@@ -51,6 +51,7 @@ void Protocol::close(bool dispatch_reset,
// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn_ref = conn.shared_from_this(), this] {
logger().debug("{} closed!", conn);
+ on_closed();
#ifdef UNIT_TESTS_BUILT
is_closed_clean = true;
if (conn.interceptor) {
@@ -69,12 +70,12 @@ void Protocol::close(bool dispatch_reset,
socket->shutdown();
}
set_write_state(write_state_t::drop);
- auto gate_closed = pending_dispatch.close();
+ auto gate_closed = gate.close();
auto reset_dispatched = seastar::futurize_invoke([this, dispatch_reset, is_replace] {
if (dispatch_reset) {
- return dispatcher.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
- is_replace);
+ dispatcher->ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
}
return seastar::now();
}).handle_exception([this] (std::exception_ptr eptr) {
@@ -312,7 +313,7 @@ void Protocol::write_event()
case write_state_t::open:
[[fallthrough]];
case write_state_t::delay:
- gated_dispatch("do_write_dispatch_sweep", [this] {
+ gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
return do_write_dispatch_sweep();
});
return;
diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h
index 290cb3fb843..3deb706acb4 100644
--- a/src/crimson/net/Protocol.h
+++ b/src/crimson/net/Protocol.h
@@ -6,6 +6,7 @@
#include <seastar/core/gate.hh>
#include <seastar/core/shared_future.hh>
+#include "crimson/common/gated.h"
#include "crimson/common/log.h"
#include "Fwd.h"
#include "SocketConnection.h"
@@ -46,9 +47,10 @@ class Protocol {
virtual void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) = 0;
+ virtual void print(std::ostream&) const = 0;
protected:
Protocol(proto_t type,
- Dispatcher& dispatcher,
+ ChainedDispatchersRef& dispatcher,
SocketConnection& conn);
virtual void trigger_close() = 0;
@@ -62,22 +64,14 @@ class Protocol {
virtual void notify_write() {};
+ virtual void on_closed() {}
+
public:
const proto_t proto_type;
SocketRef socket;
protected:
- template <typename Func>
- void gated_dispatch(const char* what, Func&& func) {
- (void) seastar::with_gate(pending_dispatch, std::forward<Func>(func)
- ).handle_exception([this, what] (std::exception_ptr eptr) {
- crimson::get_logger(ceph_subsys_ms).error(
- "{} gated_dispatch() {} caught exception: {}", conn, what, eptr);
- ceph_abort("unexpected exception from gated_dispatch()");
- });
- }
-
- Dispatcher &dispatcher;
+ ChainedDispatchersRef dispatcher;
SocketConnection &conn;
AuthConnectionMetaRef auth_meta;
@@ -86,7 +80,6 @@ class Protocol {
bool closed = false;
// become valid only after closed == true
seastar::shared_future<> close_ready;
- seastar::gate pending_dispatch;
// the write state-machine
public:
@@ -150,6 +143,7 @@ class Protocol {
}
void ack_writes(seq_num_t seq);
+ crimson::common::Gated gate;
private:
write_state_t write_state = write_state_t::none;
@@ -170,4 +164,10 @@ class Protocol {
void write_event();
};
+inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
+ proto.print(out);
+ return out;
+}
+
+
} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc
index b7ceb3c50e3..eaed769dcd3 100644
--- a/src/crimson/net/ProtocolV1.cc
+++ b/src/crimson/net/ProtocolV1.cc
@@ -125,7 +125,7 @@ void discard_up_to(std::deque<MessageRef>* queue,
namespace crimson::net {
-ProtocolV1::ProtocolV1(Dispatcher& dispatcher,
+ProtocolV1::ProtocolV1(ChainedDispatchersRef& dispatcher,
SocketConnection& conn,
SocketMessenger& messenger)
: Protocol(proto_t::v1, dispatcher, conn), messenger{messenger} {}
@@ -322,7 +322,7 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
conn.policy = messenger.get_policy(_peer_name.type());
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- gated_dispatch("start_connect", [this] {
+ gate.dispatch_in_background("start_connect", *this, [this] {
return Socket::connect(conn.peer_addr)
.then([this](SocketRef sock) {
socket = std::move(sock);
@@ -624,7 +624,7 @@ void ProtocolV1::start_accept(SocketRef&& sock,
socket = std::move(sock);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- gated_dispatch("start_accept", [this] {
+ gate.dispatch_in_background("start_accept", *this, [this] {
// stop learning my_addr before sending it out, so it won't change
return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
// encode/send server's handshake header
@@ -850,10 +850,10 @@ seastar::future<> ProtocolV1::read_message()
}
// start dispatch, ignoring exceptions from the application layer
- gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
+ gate.dispatch_in_background("ms_dispatch", *this, [this, msg = std::move(msg_ref)] {
logger().debug("{} <== #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
- return dispatcher.ms_dispatch(&conn, std::move(msg));
+ return dispatcher->ms_dispatch(&conn, std::move(msg));
});
});
}
@@ -894,18 +894,18 @@ void ProtocolV1::execute_open(open_t type)
set_write_state(write_state_t::open);
if (type == open_t::connected) {
- gated_dispatch("ms_handle_connect", [this] {
- return dispatcher.ms_handle_connect(
+ gate.dispatch_in_background("ms_handle_connect", *this, [this] {
+ return dispatcher->ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
});
} else { // type == open_t::accepted
- gated_dispatch("ms_handle_accept", [this] {
- return dispatcher.ms_handle_accept(
+ gate.dispatch_in_background("ms_handle_accept", *this, [this] {
+ return dispatcher->ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
});
}
- gated_dispatch("execute_open", [this] {
+ gate.dispatch_in_background("execute_open", *this, [this] {
// start background processing of tags
return handle_tags()
.handle_exception_type([this] (const std::system_error& e) {
@@ -966,4 +966,9 @@ seastar::future<> ProtocolV1::fault()
return seastar::now();
}
+void ProtocolV1::print(std::ostream& out) const
+{
+ out << conn;
+}
+
} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h
index 8278230f779..8ab0da18673 100644
--- a/src/crimson/net/ProtocolV1.h
+++ b/src/crimson/net/ProtocolV1.h
@@ -12,11 +12,11 @@ namespace crimson::net {
class ProtocolV1 final : public Protocol {
public:
- ProtocolV1(Dispatcher& dispatcher,
+ ProtocolV1(ChainedDispatchersRef& dispatcher,
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV1() override;
-
+ void print(std::ostream&) const final;
private:
bool is_connected() const override;
diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc
index beb94fd9db1..05f53582914 100644
--- a/src/crimson/net/ProtocolV2.cc
+++ b/src/crimson/net/ProtocolV2.cc
@@ -143,7 +143,7 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds)
});
}
-ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
+ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher,
SocketConnection& conn,
SocketMessenger& messenger)
: Protocol(proto_t::v2, dispatcher, conn),
@@ -444,10 +444,8 @@ void ProtocolV2::reset_session(bool full)
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_write();
- gated_dispatch("ms_handle_remote_reset", [this] {
- return dispatcher.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
+ dispatcher->ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
}
@@ -881,7 +879,7 @@ void ProtocolV2::execute_connecting()
abort_protocol();
}
if (socket) {
- gated_dispatch("close_sockect_connecting",
+ gate.dispatch_in_background("close_sockect_connecting", *this,
[sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
@@ -1526,7 +1524,7 @@ ProtocolV2::server_reconnect()
void ProtocolV2::execute_accepting()
{
trigger_state(state_t::ACCEPTING, write_state_t::none, false);
- gated_dispatch("execute_accepting", [this] {
+ gate.dispatch_in_background("execute_accepting", *this, [this] {
return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
@@ -1653,10 +1651,8 @@ void ProtocolV2::execute_establishing(
accept_me();
}
- gated_dispatch("ms_handle_accept_establishing", [this] {
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
+ dispatcher->ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gated_execute("execute_establishing", [this] {
return seastar::futurize_invoke([this] {
@@ -1751,11 +1747,9 @@ void ProtocolV2::trigger_replacing(bool reconnect,
if (socket) {
socket->shutdown();
}
- gated_dispatch("ms_handle_accept_replacing", [this] {
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
- gated_dispatch("trigger_replacing",
+ dispatcher->ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ gate.dispatch_in_background("trigger_replacing", *this,
[this,
reconnect,
do_reset,
@@ -1786,7 +1780,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
}
if (socket) {
- gated_dispatch("close_socket_replacing",
+ gate.dispatch_in_background("close_socket_replacing", *this,
[sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
@@ -1982,9 +1976,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- gated_dispatch("ms_dispatch", [this, msg = std::move(msg_ref)] {
- return dispatcher.ms_dispatch(&conn, std::move(msg));
- });
+ std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref));
});
}
@@ -1993,10 +1985,8 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, write_state_t::open, false);
if (dispatch_connect) {
- gated_dispatch("ms_handle_connect", [this] {
- return dispatcher.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
+ dispatcher->ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
@@ -2170,8 +2160,22 @@ void ProtocolV2::trigger_close()
}
protocol_timer.cancel();
-
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
trigger_state(state_t::CLOSING, write_state_t::drop, false);
}
+void ProtocolV2::on_closed()
+{
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+}
+
+void ProtocolV2::print(std::ostream& out) const
+{
+ out << conn;
+}
+
} // namespace crimson::net
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h
index dfca59a3361..5f0aef78fc2 100644
--- a/src/crimson/net/ProtocolV2.h
+++ b/src/crimson/net/ProtocolV2.h
@@ -13,12 +13,13 @@ namespace crimson::net {
class ProtocolV2 final : public Protocol {
public:
- ProtocolV2(Dispatcher& dispatcher,
+ ProtocolV2(ChainedDispatchersRef& dispatcher,
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV2() override;
-
+ void print(std::ostream&) const final;
private:
+ void on_closed() override;
bool is_connected() const override;
void start_connect(const entity_addr_t& peer_addr,
@@ -84,7 +85,7 @@ class ProtocolV2 final : public Protocol {
template <typename Func>
void gated_execute(const char* what, Func&& func) {
- gated_dispatch(what, [this, &func] {
+ gate.dispatch_in_background(what, *this, [this, &func] {
execution_done = seastar::futurize_invoke(std::forward<Func>(func));
return execution_done.get_future();
});
diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc
index ca4c63b9f30..b0c7197eedb 100644
--- a/src/crimson/net/SocketConnection.cc
+++ b/src/crimson/net/SocketConnection.cc
@@ -26,7 +26,7 @@ using namespace crimson::net;
using crimson::common::local_conf;
SocketConnection::SocketConnection(SocketMessenger& messenger,
- Dispatcher& dispatcher,
+ ChainedDispatchersRef& dispatcher,
bool is_msgr2)
: messenger(messenger)
{
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
index 90b84181995..67657a6e190 100644
--- a/src/crimson/net/SocketConnection.h
+++ b/src/crimson/net/SocketConnection.h
@@ -17,6 +17,7 @@
#include <seastar/core/sharded.hh>
#include "msg/Policy.h"
+#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Socket.h"
#include "crimson/thread/Throttle.h"
@@ -54,7 +55,7 @@ class SocketConnection : public Connection {
public:
SocketConnection(SocketMessenger& messenger,
- Dispatcher& dispatcher,
+ ChainedDispatchersRef& dispatcher,
bool is_msgr2);
~SocketConnection() override;
diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc
index 9535917f18f..f3d23f4aae6 100644
--- a/src/crimson/net/SocketMessenger.cc
+++ b/src/crimson/net/SocketMessenger.cc
@@ -111,10 +111,10 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs,
});
}
-seastar::future<> SocketMessenger::start(Dispatcher *disp) {
+seastar::future<> SocketMessenger::start(ChainedDispatchersRef chained_dispatchers) {
assert(seastar::this_shard_id() == master_sid);
- dispatcher = disp;
+ dispatchers = chained_dispatchers;
if (listener) {
// make sure we have already bound to a valid address
ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
@@ -123,7 +123,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) {
return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
assert(seastar::this_shard_id() == master_sid);
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
- *this, *dispatcher, get_myaddr().is_msgr2());
+ *this, dispatchers, get_myaddr().is_msgr2());
conn->start_accept(std::move(socket), peer_addr);
return seastar::now();
});
@@ -145,7 +145,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe
return found->shared_from_this();
}
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
- *this, *dispatcher, peer_addr.is_msgr2());
+ *this, dispatchers, peer_addr.is_msgr2());
conn->start_connect(peer_addr, peer_name);
return conn->shared_from_this();
}
@@ -154,6 +154,9 @@ seastar::future<> SocketMessenger::shutdown()
{
assert(seastar::this_shard_id() == master_sid);
return seastar::futurize_invoke([this] {
+ if (dispatchers) {
+ assert(dispatchers->empty());
+ }
if (listener) {
auto d_listener = listener;
listener = nullptr;
@@ -172,6 +175,10 @@ seastar::future<> SocketMessenger::shutdown()
return conn.second->close_clean(false);
});
}).then([this] {
+ return seastar::parallel_for_each(closing_conns, [] (auto conn) {
+ return conn->close_clean(false);
+ });
+ }).then([this] {
ceph_assert(connections.empty());
shutdown_promise.set_value();
});
@@ -303,6 +310,23 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
connections.erase(found);
}
+void SocketMessenger::closing_conn(SocketConnectionRef conn)
+{
+ closing_conns.push_back(conn);
+}
+
+void SocketMessenger::closed_conn(SocketConnectionRef conn)
+{
+ for (auto it = closing_conns.begin();
+ it != closing_conns.end();) {
+ if (*it == conn) {
+ it = closing_conns.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
seastar::future<uint32_t>
SocketMessenger::get_global_seq(uint32_t old)
{
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
index 5f1c02ce1ee..e86a44d6719 100644
--- a/src/crimson/net/SocketMessenger.h
+++ b/src/crimson/net/SocketMessenger.h
@@ -22,6 +22,7 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_future.hh>
+#include "crimson/net/chained_dispatchers.h"
#include "Messenger.h"
#include "SocketConnection.h"
@@ -34,9 +35,18 @@ class SocketMessenger final : public Messenger {
seastar::promise<> shutdown_promise;
FixedCPUServerSocket* listener = nullptr;
- Dispatcher *dispatcher = nullptr;
+ // as we want to unregister a dispatcher from the messengers when stopping
+ // that dispatcher, we have to use intrusive slist which, when used with
+ // "boost::intrusive::linear<true>", can tolerate ongoing iteration of the
+ // list when removing an element. However, the downside of this is that an
+ // element can only be attached to one slist. So, as we need to make multiple
+ // messengers reference the same set of dispatchers, we have to make them share
+ // the same ChainedDispatchers, which means registering/unregistering an element
+ // to one messenger will affect other messengers that share the same ChainedDispatchers.
+ ChainedDispatchersRef dispatchers;
std::map<entity_addr_t, SocketConnectionRef> connections;
std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
ceph::net::PolicySet<Throttle> policy_set;
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
@@ -63,7 +73,10 @@ class SocketMessenger final : public Messenger {
seastar::future<> try_bind(const entity_addrvec_t& addr,
uint32_t min_port, uint32_t max_port) override;
- seastar::future<> start(Dispatcher *dispatcher) override;
+ seastar::future<> start(ChainedDispatchersRef dispatchers) override;
+ void add_dispatcher(Dispatcher& disp) {
+ dispatchers->push_back(disp);
+ }
ConnectionRef connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name) override;
@@ -73,6 +86,12 @@ class SocketMessenger final : public Messenger {
return shutdown_promise.get_future();
}
+ void remove_dispatcher(Dispatcher& disp) override {
+ dispatchers->erase(disp);
+ }
+ bool dispatcher_chain_empty() const override {
+ return !dispatchers || dispatchers->empty();
+ }
seastar::future<> shutdown() override;
void print(ostream& out) const override {
@@ -101,6 +120,8 @@ class SocketMessenger final : public Messenger {
void unaccept_conn(SocketConnectionRef);
void register_conn(SocketConnectionRef);
void unregister_conn(SocketConnectionRef);
+ void closing_conn(SocketConnectionRef);
+ void closed_conn(SocketConnectionRef);
seastar::shard_id shard_id() const {
assert(seastar::this_shard_id() == master_sid);
return master_sid;
diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc
new file mode 100644
index 00000000000..19c7d3d4863
--- /dev/null
+++ b/src/crimson/net/chained_dispatchers.cc
@@ -0,0 +1,39 @@
+#include "crimson/net/chained_dispatchers.h"
+#include "crimson/net/Connection.h"
+#include "msg/Message.h"
+
+seastar::future<>
+ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
+ MessageRef m) {
+ return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) {
+ return dispatcher.ms_dispatch(conn, m);
+ });
+}
+
+void
+ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_accept(conn);
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
+ for(auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_connect(conn);
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_reset(conn, is_replace);
+ }
+}
+
+void
+ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_remote_reset(conn);
+ }
+}
diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h
new file mode 100644
index 00000000000..a5facef2b96
--- /dev/null
+++ b/src/crimson/net/chained_dispatchers.h
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive/slist.hpp>
+
+#include "crimson/net/Dispatcher.h"
+#include "crimson/common/log.h"
+
+using crimson::net::Dispatcher;
+
+// in existing Messenger, dispatchers are put into a chain as described by
+// chain-of-responsibility pattern. we could do the same to stop processing
+// the message once any of the dispatchers claims this message, and prevent
+// other dispatchers from reading it. but this change is more involved as
+// it requires changing the ms_ methods to return a bool. so as an intermediate
+// solution, we are using an observer dispatcher to notify all the interested
+// or uninterested parties.
+class ChainedDispatchers {
+ boost::intrusive::slist<
+ Dispatcher,
+ boost::intrusive::linear<true>,
+ boost::intrusive::cache_last<true>> dispatchers;
+public:
+ void push_front(Dispatcher& dispatcher) {
+ dispatchers.push_front(dispatcher);
+ }
+ void push_back(Dispatcher& dispatcher) {
+ dispatchers.push_back(dispatcher);
+ }
+ void erase(Dispatcher& dispatcher) {
+ dispatchers.erase(dispatchers.iterator_to(dispatcher));
+ }
+ bool empty() const {
+ return dispatchers.empty();
+ }
+ seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m);
+ void ms_handle_accept(crimson::net::ConnectionRef conn);
+ void ms_handle_connect(crimson::net::ConnectionRef conn);
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
+ void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
+};
+
+using ChainedDispatchersRef = seastar::lw_shared_ptr<ChainedDispatchers>;
diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc
index 6a66a6f0626..93105a64221 100644
--- a/src/crimson/os/alienstore/alien_store.cc
+++ b/src/crimson/os/alienstore/alien_store.cc
@@ -96,7 +96,7 @@ seastar::future<> AlienStore::mount()
seastar::future<> AlienStore::umount()
{
- logger().debug("{}", __func__);
+ logger().info("{}", __func__);
return transaction_gate.close().then([this] {
return tp->submit([this] {
return store->umount();
diff --git a/src/crimson/os/cyanstore/cyan_store.h b/src/crimson/os/cyanstore/cyan_store.h
index e832ca1f043..a53cfea9315 100644
--- a/src/crimson/os/cyanstore/cyan_store.h
+++ b/src/crimson/os/cyanstore/cyan_store.h
@@ -68,7 +68,9 @@ public:
CyanStore(const std::string& path);
~CyanStore() final;
- seastar::future<> stop() final {return seastar::now();}
+ seastar::future<> stop() final {
+ return seastar::now();
+ }
seastar::future<> mount() final;
seastar::future<> umount() final;
diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt
index 02c0a5678c7..1eaf862ab84 100644
--- a/src/crimson/osd/CMakeLists.txt
+++ b/src/crimson/osd/CMakeLists.txt
@@ -1,5 +1,4 @@
add_executable(crimson-osd
- chained_dispatchers.cc
ec_backend.cc
heartbeat.cc
main.cc
diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc
deleted file mode 100644
index 3d6ba846b75..00000000000
--- a/src/crimson/osd/chained_dispatchers.cc
+++ /dev/null
@@ -1,40 +0,0 @@
-#include "crimson/osd/chained_dispatchers.h"
-#include "crimson/net/Connection.h"
-#include "msg/Message.h"
-
-
-seastar::future<>
-ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
- MessageRef m) {
- return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) {
- return dispatcher->ms_dispatch(conn, m);
- });
-}
-
-seastar::future<>
-ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
- return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
- return dispatcher->ms_handle_accept(conn);
- });
-}
-
-seastar::future<>
-ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
- return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
- return dispatcher->ms_handle_connect(conn);
- });
-}
-
-seastar::future<>
-ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
- return seastar::do_for_each(dispatchers, [conn, is_replace](Dispatcher* dispatcher) {
- return dispatcher->ms_handle_reset(conn, is_replace);
- });
-}
-
-seastar::future<>
-ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
- return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) {
- return dispatcher->ms_handle_remote_reset(conn);
- });
-}
diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h
deleted file mode 100644
index 2ea1e517b78..00000000000
--- a/src/crimson/osd/chained_dispatchers.h
+++ /dev/null
@@ -1,30 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <deque>
-#include "crimson/net/Dispatcher.h"
-
-// in existing Messenger, dispatchers are put into a chain as described by
-// chain-of-responsibility pattern. we could do the same to stop processing
-// the message once any of the dispatchers claims this message, and prevent
-// other dispatchers from reading it. but this change is more involved as
-// it requires changing the ms_ methods to return a bool. so as an intermediate
-// solution, we are using an observer dispatcher to notify all the interested
-// or unintersted parties.
-class ChainedDispatchers : public crimson::net::Dispatcher {
- std::deque<Dispatcher*> dispatchers;
-public:
- void push_front(Dispatcher* dispatcher) {
- dispatchers.push_front(dispatcher);
- }
- void push_back(Dispatcher* dispatcher) {
- dispatchers.push_back(dispatcher);
- }
- seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override;
- seastar::future<> ms_handle_accept(crimson::net::ConnectionRef conn) override;
- seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
- seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) override;
-};
diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h
index d9451d2296b..a213a3f1ee4 100644
--- a/src/crimson/osd/ec_backend.h
+++ b/src/crimson/osd/ec_backend.h
@@ -17,6 +17,9 @@ public:
crimson::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile,
uint64_t stripe_width);
+ seastar::future<> stop() final {
+ return seastar::now();
+ }
private:
ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
uint64_t off,
diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
index 67e31267322..6bf4d13cb51 100644
--- a/src/crimson/osd/heartbeat.cc
+++ b/src/crimson/osd/heartbeat.cc
@@ -54,8 +54,14 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
SocketPolicy::stateless_server(0));
back_msgr->set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::stateless_server(0));
- return seastar::when_all_succeed(start_messenger(*front_msgr, front_addrs),
- start_messenger(*back_msgr, back_addrs))
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return seastar::when_all_succeed(start_messenger(*front_msgr,
+ front_addrs,
+ chained_dispatchers),
+ start_messenger(*back_msgr,
+ back_addrs,
+ chained_dispatchers))
.then([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
@@ -64,19 +70,29 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
seastar::future<>
Heartbeat::start_messenger(crimson::net::Messenger& msgr,
- const entity_addrvec_t& addrs)
+ const entity_addrvec_t& addrs,
+ ChainedDispatchersRef chained_dispatchers)
{
return msgr.try_bind(addrs,
local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max).then([&msgr, this] {
- return msgr.start(this);
+ local_conf()->ms_bind_port_max)
+ .then([&msgr, chained_dispatchers]() mutable {
+ return msgr.start(chained_dispatchers);
});
}
seastar::future<> Heartbeat::stop()
{
- return seastar::when_all_succeed(front_msgr->shutdown(),
- back_msgr->shutdown());
+ logger().info("{}", __func__);
+ timer.cancel();
+ if (!front_msgr->dispatcher_chain_empty())
+ front_msgr->remove_dispatcher(*this);
+ if (!back_msgr->dispatcher_chain_empty())
+ back_msgr->remove_dispatcher(*this);
+ return gate.close().then([this] {
+ return seastar::when_all_succeed(front_msgr->shutdown(),
+ back_msgr->shutdown());
+ });
}
const entity_addrvec_t& Heartbeat::get_front_addrs() const
@@ -187,15 +203,17 @@ void Heartbeat::remove_peer(osd_id_t peer)
seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
MessageRef m)
{
- switch (m->get_type()) {
- case MSG_OSD_PING:
- return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
- default:
- return seastar::now();
- }
+ return gate.dispatch(__func__, *this, [this, conn, &m] {
+ switch (m->get_type()) {
+ case MSG_OSD_PING:
+ return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
+ default:
+ return seastar::now();
+ }
+ });
}
-seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
auto found = std::find_if(peers.begin(), peers.end(),
[conn](const peers_map_t::value_type& peer) {
@@ -203,13 +221,12 @@ seastar::future<> Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, b
peer.second.con_back == conn);
});
if (found == peers.end()) {
- return seastar::now();
+ return;
}
const auto peer = found->first;
const auto epoch = found->second.epoch;
remove_peer(peer);
add_peer(peer, epoch);
- return seastar::now();
}
seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
@@ -428,3 +445,8 @@ bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
// replies from both front/back connections
return !is_unhealthy(now);
}
+
+void Heartbeat::print(std::ostream& out) const
+{
+ out << "heartbeat";
+}
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
index 55571cff2fb..f5e2ad596cf 100644
--- a/src/crimson/osd/heartbeat.h
+++ b/src/crimson/osd/heartbeat.h
@@ -6,6 +6,7 @@
#include <cstdint>
#include <seastar/core/future.hh>
#include "common/ceph_time.h"
+#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"
@@ -46,8 +47,9 @@ public:
// Dispatcher methods
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
MessageRef m) override;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+ void print(std::ostream&) const;
private:
seastar::future<> handle_osd_ping(crimson::net::Connection* conn,
Ref<MOSDPing> m);
@@ -67,7 +69,8 @@ private:
void add_reporter_peers(int whoami);
seastar::future<> start_messenger(crimson::net::Messenger& msgr,
- const entity_addrvec_t& addrs);
+ const entity_addrvec_t& addrs,
+ ChainedDispatchersRef);
private:
const crimson::osd::ShardServices& service;
crimson::mon::Client& monc;
@@ -121,4 +124,10 @@ private:
// osds we've reported to monior as failed ones, but they are not marked down
// yet
std::map<osd_id_t, failure_info_t> failure_pending;
+ crimson::common::Gated gate;
};
+
+inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {
+ hb.print(out);
+ return out;
+}
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 3591173f755..5a082d6c756 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -34,6 +34,7 @@
#include "osd/PGPeeringEvent.h"
#include "osd/PeeringState.h"
+#include "crimson/common/exception.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
@@ -256,18 +257,23 @@ seastar::future<> OSD::start()
cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
SocketPolicy::stateless_server(0));
- dispatchers.push_front(this);
- dispatchers.push_front(monc.get());
- dispatchers.push_front(mgrc.get());
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_front(*mgrc);
+ chained_dispatchers->push_front(*monc);
+ chained_dispatchers->push_front(*this);
return seastar::when_all_succeed(
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this] { return cluster_msgr->start(&dispatchers); }),
+ .then([this, chained_dispatchers]() mutable {
+ return cluster_msgr->start(chained_dispatchers);
+ }),
public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this] { return public_msgr->start(&dispatchers); }));
+ .then([this, chained_dispatchers]() mutable {
+ return public_msgr->start(chained_dispatchers);
+ }));
}).then([this] {
return seastar::when_all_succeed(monc->start(),
mgrc->start());
@@ -453,20 +459,38 @@ seastar::future<> OSD::stop()
// see also OSD::shutdown()
state.set_stopping();
- return gate.close().then([this] {
- return asok->stop();
- }).then([this] {
+ if (!public_msgr->dispatcher_chain_empty()) {
+ public_msgr->remove_dispatcher(*this);
+ public_msgr->remove_dispatcher(*mgrc);
+ public_msgr->remove_dispatcher(*monc);
+ }
+ if (!cluster_msgr->dispatcher_chain_empty()) {
+ cluster_msgr->remove_dispatcher(*this);
+ cluster_msgr->remove_dispatcher(*mgrc);
+ cluster_msgr->remove_dispatcher(*monc);
+ }
+ auto gate_close_fut = gate.close();
+ return asok->stop().then([this] {
return heartbeat->stop();
}).then([this] {
- return monc->stop();
- }).then([this] {
- return when_all_succeed(
- public_msgr->shutdown(),
- cluster_msgr->shutdown());
- }).then([this] {
return store->umount();
}).then([this] {
return store->stop();
+ }).then([this] {
+ return seastar::parallel_for_each(pg_map.get_pgs(),
+ [](auto& p) {
+ return p.second->stop();
+ });
+ }).then([this] {
+ return monc->stop();
+ }).then([this] {
+ return mgrc->stop();
+ }).then([fut=std::move(gate_close_fut)]() mutable {
+ return std::move(fut);
+ }).then([this] {
+ return when_all_succeed(
+ public_msgr->shutdown(),
+ cluster_msgr->shutdown());
}).handle_exception([](auto ep) {
logger().error("error while stopping osd: {}", ep);
});
@@ -483,6 +507,14 @@ void OSD::dump_status(Formatter* f) const
f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
}
+void OSD::print(std::ostream& out) const
+{
+ out << "{osd." << superblock.whoami << " "
+ << superblock.osd_fsid << " [" << superblock.oldest_map
+ << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
+ << " pgs}";
+}
+
seastar::future<> OSD::load_pgs()
{
return store->list_collections().then([this](auto colls) {
@@ -575,79 +607,69 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
- if (state.is_stopping()) {
- return seastar::now();
- }
-
- switch (m->get_type()) {
- case CEPH_MSG_OSD_MAP:
- return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
- case CEPH_MSG_OSD_OP:
- return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
- case MSG_OSD_PG_CREATE2:
- shard_services.start_operation<CompoundPeeringRequest>(
- *this,
- conn->get_shared(),
- m);
- return seastar::now();
- case MSG_COMMAND:
- return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
- case MSG_OSD_PG_PULL:
- [[fallthrough]];
- case MSG_OSD_PG_PUSH:
- [[fallthrough]];
- case MSG_OSD_PG_PUSH_REPLY:
- [[fallthrough]];
- case MSG_OSD_PG_RECOVERY_DELETE:
- [[fallthrough]];
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
- return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
- case MSG_OSD_PG_LEASE:
- [[fallthrough]];
- case MSG_OSD_PG_LEASE_ACK:
- [[fallthrough]];
- case MSG_OSD_PG_NOTIFY2:
- [[fallthrough]];
- case MSG_OSD_PG_INFO2:
- [[fallthrough]];
- case MSG_OSD_PG_QUERY2:
- [[fallthrough]];
- case MSG_OSD_BACKFILL_RESERVE:
- [[fallthrough]];
- case MSG_OSD_RECOVERY_RESERVE:
- [[fallthrough]];
- case MSG_OSD_PG_LOG:
- return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
- case MSG_OSD_REPOP:
- return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
- case MSG_OSD_REPOPREPLY:
- return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
- default:
- logger().info("{} unhandled message {}", __func__, *m);
- return seastar::now();
- }
-}
-
-seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn)
-{
- if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
- return seastar::now();
- } else {
- return seastar::now();
- }
+ return gate.dispatch(__func__, *this, [this, conn, &m] {
+ if (state.is_stopping()) {
+ return seastar::now();
+ }
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_MAP:
+ return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+ case CEPH_MSG_OSD_OP:
+ return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+ case MSG_OSD_PG_CREATE2:
+ shard_services.start_operation<CompoundPeeringRequest>(
+ *this,
+ conn->get_shared(),
+ m);
+ return seastar::now();
+ case MSG_COMMAND:
+ return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+ case MSG_OSD_PG_PULL:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+ case MSG_OSD_PG_LEASE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LEASE_ACK:
+ [[fallthrough]];
+ case MSG_OSD_PG_NOTIFY2:
+ [[fallthrough]];
+ case MSG_OSD_PG_INFO2:
+ [[fallthrough]];
+ case MSG_OSD_PG_QUERY2:
+ [[fallthrough]];
+ case MSG_OSD_BACKFILL_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_RECOVERY_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LOG:
+ return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+ case MSG_OSD_REPOP:
+ return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+ case MSG_OSD_REPOPREPLY:
+ return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+ default:
+ logger().info("ms_dispatch unhandled message {}", *m);
+ return seastar::now();
+ }
+ });
}
-seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
// TODO: cleanup the session attached to this connection
logger().warn("ms_handle_reset");
- return seastar::now();
}
-seastar::future<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
+void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
{
logger().warn("ms_handle_remote_reset");
- return seastar::now();
}
void OSD::handle_authentication(const EntityName& name,
@@ -983,7 +1005,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn,
Ref<MOSDOp> m)
{
- shard_services.start_operation<ClientRequest>(
+ (void) shard_services.start_operation<ClientRequest>(
*this,
conn->get_shared(),
std::move(m));
@@ -994,7 +1016,7 @@ seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
Ref<MOSDRepOp> m)
{
m->finish_decode();
- shard_services.start_operation<RepRequest>(
+ (void) shard_services.start_operation<RepRequest>(
*this,
conn->get_shared(),
std::move(m));
@@ -1017,7 +1039,7 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
seastar::future<> OSD::handle_recovery_subreq(crimson::net::Connection* conn,
Ref<MOSDFastDispatchOp> m)
{
- shard_services.start_operation<RecoverySubRequest>(
+ (void) shard_services.start_operation<RecoverySubRequest>(
*this,
conn->get_shared(),
std::move(m));
@@ -1107,13 +1129,14 @@ seastar::future<> OSD::handle_peering_op(
{
const int from = m->get_source().num();
logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
- shard_services.start_operation<RemotePeeringEvent>(
+ std::unique_ptr<PGPeeringEvent> evt(m->get_event());
+ (void) shard_services.start_operation<RemotePeeringEvent>(
*this,
conn->get_shared(),
shard_services,
pg_shard_t{from, m->get_spg().shard},
m->get_spg(),
- std::move(*m->get_event()));
+ std::move(*evt));
return seastar::now();
}
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 47e8280368a..05d5ad9adfc 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -12,13 +12,14 @@
#include "crimson/common/type_helpers.h"
#include "crimson/common/auth_handler.h"
+#include "crimson/common/gated.h"
+#include "crimson/net/chained_dispatchers.h"
#include "crimson/admin/admin_socket.h"
#include "crimson/admin/osd_admin.h"
#include "crimson/common/simple_lru.h"
#include "crimson/common/shared_lru.h"
#include "crimson/mgr/client.h"
#include "crimson/net/Dispatcher.h"
-#include "crimson/osd/chained_dispatchers.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/state.h"
#include "crimson/osd/shard_services.h"
@@ -63,7 +64,6 @@ class OSD final : public crimson::net::Dispatcher,
private OSDMapService,
private crimson::common::AuthHandler,
private crimson::mgr::WithStats {
- seastar::gate gate;
const int whoami;
const uint32_t nonce;
seastar::timer<seastar::lowres_clock> beacon_timer;
@@ -71,7 +71,6 @@ class OSD final : public crimson::net::Dispatcher,
crimson::net::MessengerRef cluster_msgr;
// talk with client/mon/mgr
crimson::net::MessengerRef public_msgr;
- ChainedDispatchers dispatchers;
std::unique_ptr<crimson::mon::Client> monc;
std::unique_ptr<crimson::mgr::Client> mgrc;
@@ -99,9 +98,8 @@ class OSD final : public crimson::net::Dispatcher,
// Dispatcher methods
seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final;
- seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) final;
- seastar::future<> ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
- seastar::future<> ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
+ void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
// mgr::WithStats methods
MessageRef get_stats() final;
@@ -133,6 +131,7 @@ public:
void dump_status(Formatter*) const;
+ void print(std::ostream&) const;
private:
seastar::future<> start_boot();
seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap);
@@ -205,7 +204,7 @@ public:
private:
PGMap pg_map;
-
+ crimson::common::Gated gate;
public:
blocking_future<Ref<PG>> get_or_create_pg(
spg_t pgid,
@@ -224,4 +223,9 @@ public:
friend class PGAdvanceMap;
};
+inline std::ostream& operator<<(std::ostream& out, const OSD& osd) {
+ osd.print(out);
+ return out;
+}
+
}
diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h
index 7fe092f2e92..d6919ddfc00 100644
--- a/src/crimson/osd/osd_operation.h
+++ b/src/crimson/osd/osd_operation.h
@@ -12,6 +12,8 @@
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/future.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/lowres_clock.hh>
#include "include/ceph_assert.h"
#include "crimson/osd/scheduler/scheduler.h"
@@ -73,6 +75,9 @@ class blocking_future_detail {
template <typename... V, typename... U>
friend blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args);
+ template <typename... V, typename Exception>
+ friend blocking_future_detail<seastar::future<V...>>
+ make_exception_blocking_future(Exception&& e);
template <typename U>
friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
@@ -100,6 +105,14 @@ blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&...
seastar::make_ready_future<V...>(std::forward<U>(args)...));
}
+template <typename... V, typename Exception>
+blocking_future_detail<seastar::future<V...>>
+make_exception_blocking_future(Exception&& e) {
+ return blocking_future<V...>(
+ nullptr,
+ seastar::make_exception_future<V...>(e));
+}
+
/**
* Provides an interface for dumping diagnostic information about
* why a particular op is not making progress.
@@ -269,6 +282,8 @@ class OperationRegistry {
static_cast<int>(OperationTypeCode::last_op)
> op_id_counters = {};
+ seastar::timer<seastar::lowres_clock> shutdown_timer;
+ seastar::promise<> shutdown;
public:
template <typename T, typename... Args>
typename T::IRef create_operation(Args&&... args) {
@@ -277,6 +292,21 @@ public:
op->set_id(op_id_counters[static_cast<int>(T::type)]++);
return op;
}
+
+ seastar::future<> stop() {
+ shutdown_timer.set_callback([this] {
+ if (std::all_of(registries.begin(),
+ registries.end(),
+ [](auto& opl) {
+ return opl.empty();
+ })) {
+ shutdown.set_value();
+ shutdown_timer.cancel();
+ }
+ });
+ shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/));
+ return shutdown.get_future();
+ }
};
/**
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
index 72f9f4396b3..f31621a4867 100644
--- a/src/crimson/osd/osd_operations/client_request.cc
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -6,6 +6,7 @@
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "crimson/common/exception.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
#include "common/Formatter.h"
@@ -56,7 +57,9 @@ seastar::future<> ClientRequest::start()
logger().debug("{}: start", *this);
IRef opref = this;
- return with_blocking_future(handle.enter(cp().await_map))
+ return crimson::common::handle_system_shutdown(
+ [this, opref=std::move(opref)]() mutable {
+ return with_blocking_future(handle.enter(cp().await_map))
.then([this]() {
return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
}).then([this](epoch_t epoch) {
@@ -89,6 +92,7 @@ seastar::future<> ClientRequest::start()
});
});
});
+ });
}
seastar::future<> ClientRequest::process_pg_op(
diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc
index 46a6d1ca394..ca99f45baee 100644
--- a/src/crimson/osd/osd_operations/compound_peering_request.cc
+++ b/src/crimson/osd/osd_operations/compound_peering_request.cc
@@ -10,6 +10,7 @@
#include "common/Formatter.h"
+#include "crimson/common/exception.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
@@ -162,13 +163,16 @@ seastar::future<> CompoundPeeringRequest::start()
IRef ref = this;
logger().info("{}: about to fork future", *this);
- return with_blocking_future(
- blocker->make_blocking_future(state->promise.get_future())
- ).then([this, blocker=std::move(blocker)](auto &&ctx) {
- logger().info("{}: sub events complete", *this);
- return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
- }).then([this, ref=std::move(ref)] {
- logger().info("{}: complete", *this);
+ return crimson::common::handle_system_shutdown(
+ [this, ref, blocker=std::move(blocker), state]() mutable {
+ return with_blocking_future(
+ blocker->make_blocking_future(state->promise.get_future())
+ ).then([this, blocker=std::move(blocker)](auto &&ctx) {
+ logger().info("{}: sub events complete", *this);
+ return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+ }).then([this, ref=std::move(ref)] {
+ logger().info("{}: complete", *this);
+ });
});
}
diff --git a/src/crimson/osd/osdmap_gate.cc b/src/crimson/osd/osdmap_gate.cc
index 573fc8061f3..90afc32b4e4 100644
--- a/src/crimson/osd/osdmap_gate.cc
+++ b/src/crimson/osd/osdmap_gate.cc
@@ -1,6 +1,7 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "crimson/common/exception.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/shard_services.h"
#include "common/Formatter.h"
@@ -22,6 +23,10 @@ void OSDMapGate::OSDMapBlocker::dump_detail(Formatter *f) const
blocking_future<epoch_t> OSDMapGate::wait_for_map(epoch_t epoch)
{
+ if (__builtin_expect(stopping, false)) {
+ return make_exception_blocking_future<epoch_t>(
+ crimson::common::system_shutdown_exception());
+ }
if (current >= epoch) {
return make_ready_blocking_future<epoch_t>(current);
} else {
@@ -51,4 +56,16 @@ void OSDMapGate::got_map(epoch_t epoch) {
waiting_peering.erase(first, last);
}
+seastar::future<> OSDMapGate::stop() {
+ logger().info("osdmap::stop");
+ stopping = true;
+ auto first = waiting_peering.begin();
+ auto last = waiting_peering.end();
+ std::for_each(first, last, [](auto& blocked_requests) {
+ blocked_requests.second.promise.set_exception(
+ crimson::common::system_shutdown_exception());
+ });
+ return seastar::now();
+}
+
}
diff --git a/src/crimson/osd/osdmap_gate.h b/src/crimson/osd/osdmap_gate.h
index 6a891cc152a..15e3d2057ab 100644
--- a/src/crimson/osd/osdmap_gate.h
+++ b/src/crimson/osd/osdmap_gate.h
@@ -51,6 +51,7 @@ class OSDMapGate {
waiting_peering_t waiting_peering;
epoch_t current = 0;
std::optional<std::reference_wrapper<ShardServices>> shard_services;
+ bool stopping = false;
public:
OSDMapGate(
const char *blocker_type,
@@ -60,6 +61,7 @@ public:
// wait for an osdmap whose epoch is greater or equal to given epoch
blocking_future<epoch_t> wait_for_map(epoch_t epoch);
void got_map(epoch_t epoch);
+ seastar::future<> stop();
};
}
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 4b80ee55de4..6f2da55c4f6 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -27,6 +27,7 @@
#include "os/Transaction.h"
+#include "crimson/common/exception.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_store.h"
@@ -149,7 +150,7 @@ void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
// handle the peering event in the background
check_readable_timer.cancel();
check_readable_timer.set_callback([last_peering_reset, this] {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -237,7 +238,7 @@ void PG::on_activate_complete()
wait_for_active_blocker.on_active();
if (peering_state.needs_recovery()) {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -246,7 +247,7 @@ void PG::on_activate_complete()
get_osdmap_epoch(),
PeeringState::DoRecovery{});
} else if (peering_state.needs_backfill()) {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -257,7 +258,7 @@ void PG::on_activate_complete()
} else {
logger().debug("{}: no need to recover or backfill, AllReplicasRecovered",
" for pg: {}", __func__, pgid);
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -342,7 +343,7 @@ void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
// handle the peering event in the background
renew_lease_timer.cancel();
renew_lease_timer.set_callback([last_peering_reset, this] {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -372,6 +373,11 @@ void PG::init(
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
return pg_meta.load();
}).then([this, store](auto&& ret) {
@@ -400,7 +406,7 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
peering_state.set_role(rr);
epoch_t epoch = get_osdmap_epoch();
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -499,12 +505,23 @@ blocking_future<> PG::WaitForActiveBlocker::wait()
}
}
+seastar::future<> PG::WaitForActiveBlocker::stop()
+{
+ p.set_exception(crimson::common::system_shutdown_exception());
+ return seastar::now();
+}
+
seastar::future<> PG::submit_transaction(const OpInfo& op_info,
const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
const osd_op_params_t& osd_op_p)
{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
epoch_t map_epoch = get_osdmap_epoch();
std::vector<pg_log_entry_t> log_entries;
@@ -548,6 +565,10 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
ObjectContextRef obc,
const OpInfo &op_info)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
using osd_op_errorator = OpsExecuter::osd_op_errorator;
const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
: m->get_hobj();
@@ -636,6 +657,10 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m);
return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
@@ -709,6 +734,10 @@ PG::load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
ceph_assert(!oid.is_head());
using ObjectContextRef = crimson::osd::ObjectContextRef;
auto coid = resolve_oid(head->get_ro_ss(), oid);
@@ -742,6 +771,10 @@ PG::load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_head_obc(hobject_t oid)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
ceph_assert(oid.is_head());
auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
if (existed) {
@@ -795,6 +828,10 @@ PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
PG::get_locked_obc(
Operation *op, const hobject_t &oid, RWState::State type)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
return get_or_load_head_obc(oid.get_head()).safe_then(
[this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
auto &[head_obc, head_existed] = p;
@@ -835,6 +872,11 @@ PG::get_locked_obc(
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
decode(txn, encoded_txn);
@@ -861,4 +903,19 @@ void PG::handle_rep_op_reply(crimson::net::Connection* conn,
backend->got_rep_op_reply(m);
}
+seastar::future<> PG::stop()
+{
+ logger().info("PG {} {}", pgid, __func__);
+ stopping = true;
+ return osdmap_gate.stop().then([this] {
+ return wait_for_active_blocker.stop();
+ }).then([this] {
+ return recovery_handler->stop();
+ }).then([this] {
+ return recovery_backend->stop();
+ }).then([this] {
+ return backend->stop();
+ });
+}
+
}
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 92881492edf..a4e2650c994 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -183,7 +183,7 @@ public:
template <typename T>
void start_peering_event_operation(T &&evt, float delay = 0) {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
@@ -506,6 +506,9 @@ public:
const OpInfo &op_info,
Operation *op,
F &&f) {
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
auto [oid, type] = get_oid_and_lock(*m, op_info);
return get_locked_obc(op, oid, type)
.safe_then([f=std::forward<F>(f), type=type](auto obc) {
@@ -559,6 +562,8 @@ public:
ShardServices& get_shard_services() final {
return shard_services;
}
+ seastar::future<> stop();
+
private:
std::unique_ptr<PGBackend> backend;
std::unique_ptr<RecoveryBackend> recovery_backend;
@@ -620,6 +625,11 @@ public:
}
private:
+ // instead of seastar::gate, we use a boolean flag to indicate
+ // whether the system is shutting down, as we don't need to track
+ // continuations here.
+ bool stopping = false;
+
class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
PG *pg;
@@ -635,6 +645,7 @@ private:
WaitForActiveBlocker(PG *pg) : pg(pg) {}
void on_active();
blocking_future<> wait();
+ seastar::future<> stop();
} wait_for_active_blocker;
friend std::ostream& operator<<(std::ostream&, const PG& pg);
diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc
index a5deceb0892..f203a72f114 100644
--- a/src/crimson/osd/pg_backend.cc
+++ b/src/crimson/osd/pg_backend.cc
@@ -15,6 +15,7 @@
#include "os/Transaction.h"
#include "common/Clock.h"
+#include "crimson/common/exception.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/osd_operation.h"
@@ -64,6 +65,10 @@ PGBackend::PGBackend(shard_id_t shard,
PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref>
PGBackend::load_metadata(const hobject_t& oid)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
return store->get_attrs(
coll,
ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then(
@@ -377,6 +382,10 @@ seastar::future<> PGBackend::remove(ObjectState& os,
seastar::future<std::tuple<std::vector<hobject_t>, hobject_t>>
PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
return store->list_objects(coll,
gstart,
@@ -465,6 +474,10 @@ PGBackend::get_attr_errorator::future<ceph::bufferptr> PGBackend::getxattr(
const hobject_t& soid,
std::string_view key) const
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
return store->get_attr(coll, ghobject_t{soid}, key);
}
@@ -509,6 +522,10 @@ seastar::future<> PGBackend::omap_get_keys(
const ObjectState& os,
OSDOp& osd_op) const
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
std::string start_after;
uint64_t max_return;
try {
@@ -551,6 +568,10 @@ seastar::future<> PGBackend::omap_get_vals(
const ObjectState& os,
OSDOp& osd_op) const
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
std::string start_after;
uint64_t max_return;
std::string filter_prefix;
@@ -602,6 +623,10 @@ seastar::future<> PGBackend::omap_get_vals_by_keys(
const ObjectState& os,
OSDOp& osd_op) const
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
std::set<std::string> keys_to_get;
try {
auto p = osd_op.indata.cbegin();
diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h
index 001e98eb268..bb2637fb328 100644
--- a/src/crimson/osd/pg_backend.h
+++ b/src/crimson/osd/pg_backend.h
@@ -136,10 +136,12 @@ public:
const ghobject_t& oid);
virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
+ virtual seastar::future<> stop() = 0;
protected:
const shard_id_t shard;
CollectionRef coll;
crimson::os::FuturizedStore* store;
+ bool stopping = false;
public:
struct loaded_object_md_t {
ObjectState os;
diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc
index af63df60dd8..f040c7e3219 100644
--- a/src/crimson/osd/pg_recovery.cc
+++ b/src/crimson/osd/pg_recovery.cc
@@ -24,7 +24,7 @@ void PGRecovery::start_background_recovery(
crimson::osd::scheduler::scheduler_class_t klass)
{
using BackgroundRecovery = crimson::osd::BackgroundRecovery;
- pg->get_shard_services().start_operation<BackgroundRecovery>(
+ (void) pg->get_shard_services().start_operation<BackgroundRecovery>(
static_cast<crimson::osd::PG*>(pg),
pg->get_shard_services(),
pg->get_osdmap_epoch(),
@@ -58,7 +58,7 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
crimson::get_logger(ceph_subsys_osd).debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
pg->get_pgid());
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
- pg->get_shard_services().start_operation<LocalPeeringEvent>(
+ (void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(pg),
pg->get_shard_services(),
pg->get_pg_whoami(),
@@ -345,6 +345,7 @@ void PGRecovery::on_global_recover (
const object_stat_sum_t& stat_diff,
const bool is_delete)
{
+ crimson::get_logger(ceph_subsys_osd).info("{} {}", __func__, soid);
pg->get_peering_state().object_recovered(soid, stat_diff);
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
if (!is_delete)
diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h
index ccc46237daf..d5309550e4a 100644
--- a/src/crimson/osd/pg_recovery.h
+++ b/src/crimson/osd/pg_recovery.h
@@ -22,7 +22,7 @@ public:
crimson::osd::scheduler::scheduler_class_t klass);
crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
-
+ seastar::future<> stop() { return seastar::now(); }
private:
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
diff --git a/src/crimson/osd/pg_recovery_listener.h b/src/crimson/osd/pg_recovery_listener.h
index 2946f93a53d..29e91e403ef 100644
--- a/src/crimson/osd/pg_recovery_listener.h
+++ b/src/crimson/osd/pg_recovery_listener.h
@@ -3,6 +3,8 @@
#pragma once
+#include <seastar/core/future.hh>
+
#include "common/hobject.h"
#include "include/types.h"
#include "osd/osd_types.h"
@@ -30,4 +32,5 @@ public:
virtual bool is_unreadable_object(const hobject_t&) const = 0;
virtual bool has_reset_since(epoch_t) const = 0;
virtual std::vector<pg_shard_t> get_replica_recovery_order() const = 0;
+ virtual seastar::future<> stop() = 0;
};
diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc
index 31ae27f91dd..af31db7d44e 100644
--- a/src/crimson/osd/recovery_backend.cc
+++ b/src/crimson/osd/recovery_backend.cc
@@ -1,6 +1,7 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "crimson/common/exception.h"
#include "crimson/osd/recovery_backend.h"
#include "crimson/osd/pg.h"
@@ -34,7 +35,7 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t,
temp_contents.clear();
for (auto& [soid, recovery_waiter] : recovering) {
- if (recovery_waiter.obc) {
+ if (recovery_waiter.obc && recovery_waiter.obc->obs.exists) {
recovery_waiter.obc->drop_recovery_read();
recovery_waiter.interrupt(why);
}
@@ -42,3 +43,15 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t,
recovering.clear();
}
+void RecoveryBackend::WaitForObjectRecovery::stop() {
+ readable.set_exception(
+ crimson::common::system_shutdown_exception());
+ recovered.set_exception(
+ crimson::common::system_shutdown_exception());
+ pulled.set_exception(
+ crimson::common::system_shutdown_exception());
+ for (auto& [pg_shard, pr] : pushes) {
+ pr.set_exception(
+ crimson::common::system_shutdown_exception());
+ }
+}
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
index 461ae620fb7..f5853c4da07 100644
--- a/src/crimson/osd/recovery_backend.h
+++ b/src/crimson/osd/recovery_backend.h
@@ -62,6 +62,13 @@ public:
void on_peering_interval_change(ceph::os::Transaction& t) {
clean_up(t, "new peering interval");
}
+
+ seastar::future<> stop() {
+ for (auto& [soid, recovery_waiter] : recovering) {
+ recovery_waiter.stop();
+ }
+ return on_stop();
+ }
protected:
crimson::osd::PG& pg;
crimson::osd::ShardServices& shard_services;
@@ -135,6 +142,7 @@ protected:
std::make_error_code(std::errc::interrupted), why));
}
}
+ void stop();
void dump_detail(Formatter* f) const {
}
};
@@ -152,4 +160,5 @@ protected:
temp_contents.erase(oid);
}
void clean_up(ceph::os::Transaction& t, const std::string& why);
+ virtual seastar::future<> on_stop() = 0;
};
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index e4a5f39dca3..780d6a74619 100644
--- a/src/crimson/osd/replicated_backend.cc
+++ b/src/crimson/osd/replicated_backend.cc
@@ -5,6 +5,7 @@
#include "messages/MOSDRepOpReply.h"
+#include "crimson/common/exception.h"
#include "crimson/common/log.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/shard_services.h"
@@ -32,6 +33,9 @@ ReplicatedBackend::_read(const hobject_t& hoid,
const uint64_t len,
const uint32_t flags)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
@@ -43,6 +47,10 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
epoch_t min_epoch, epoch_t map_epoch,
std::vector<pg_log_entry_t>&& log_entries)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
const ceph_tid_t tid = next_txn_id++;
auto req_id = osd_op_p.req->get_reqid();
auto pending_txn =
@@ -101,3 +109,14 @@ void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
}
}
}
+
+seastar::future<> ReplicatedBackend::stop()
+{
+ logger().info("ReplicatedBackend::stop {}", coll->get_cid());
+ stopping = true;
+ for (auto& [tid, pending_on] : pending_trans) {
+ pending_on.all_committed.set_exception(
+ crimson::common::system_shutdown_exception());
+ }
+ return seastar::now();
+}
diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h
index 53a0038da8a..dbefb303006 100644
--- a/src/crimson/osd/replicated_backend.h
+++ b/src/crimson/osd/replicated_backend.h
@@ -22,6 +22,7 @@ public:
CollectionRef coll,
crimson::osd::ShardServices& shard_services);
void got_rep_op_reply(const MOSDRepOpReply& reply) final;
+ seastar::future<> stop() final;
private:
ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
uint64_t off,
diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h
index 467d7a5f656..3994e8b873b 100644
--- a/src/crimson/osd/replicated_recovery_backend.h
+++ b/src/crimson/osd/replicated_recovery_backend.h
@@ -105,4 +105,7 @@ protected:
const hobject_t& soid,
eversion_t need,
epoch_t epoch_frozen);
+ seastar::future<> on_stop() final {
+ return seastar::now();
+ }
};
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index 443fd0ee451..38d5d192ebd 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -9,6 +9,7 @@
#include "include/common_fwd.h"
#include "osd_operation.h"
#include "msg/MessageRef.h"
+#include "crimson/common/exception.h"
#include "crimson/os/futurized_collection.h"
#include "osd/PeeringState.h"
#include "crimson/osd/osdmap_service.h"
@@ -90,10 +91,18 @@ public:
template <typename T, typename... Args>
auto start_operation(Args&&... args) {
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
auto op = registry.create_operation<T>(std::forward<Args>(args)...);
return std::make_pair(op, op->start());
}
+ seastar::future<> stop() {
+ stopping = true;
+ return registry.stop();
+ }
+
// Loggers
PerfCounters &get_recoverystate_perf_logger() {
return *recoverystate_perf;
@@ -186,6 +195,12 @@ private:
c->complete(0);
}
} finisher;
+ // prevent creating new osd operations when system is shutting down,
+ // this is necessary because there are chances that a new operation
+ // is created, after the interruption of all ongoing operations, and
+ // creats and waits on a new and may-never-resolve future, in which
+ // case the shutdown may never succeed.
+ bool stopping = false;
public:
AsyncReserver<spg_t, DirectFinisher> local_reserver;
AsyncReserver<spg_t, DirectFinisher> remote_reserver;
diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc
index 318c61156c3..0a0f22eb9ae 100644
--- a/src/test/crimson/test_alien_echo.cc
+++ b/src/test/crimson/test_alien_echo.cc
@@ -181,7 +181,9 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
server.msgr->set_auth_server(&server.dummy_auth);
return server.msgr->bind(entity_addrvec_t{addr}
).then([&server] {
- return server.msgr->start(&server.dispatcher);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(server.dispatcher);
+ return server.msgr->start(chained_dispatchers);
}).then([&dispatcher=server.dispatcher, count] {
return dispatcher.on_reply.wait([&dispatcher, count] {
return dispatcher.count >= count;
@@ -203,7 +205,9 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
client.msgr->set_require_authorizer(false);
client.msgr->set_auth_client(&client.dummy_auth);
client.msgr->set_auth_server(&client.dummy_auth);
- return client.msgr->start(&client.dispatcher).then(
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(client.dispatcher);
+ return client.msgr->start(chained_dispatchers).then(
[addr, &client, &disp=client.dispatcher, count] {
auto conn = client.msgr->connect(addr, entity_name_t::TYPE_OSD);
return seastar::do_until(
diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc
index deb06990329..ad52ee39196 100644
--- a/src/test/crimson/test_messenger.cc
+++ b/src/test/crimson/test_messenger.cc
@@ -66,12 +66,15 @@ static seastar::future<> test_echo(unsigned rounds,
msgr->set_require_authorizer(false);
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
+ return msgr->start(chained_dispatchers);
});
}
seastar::future<> shutdown() {
ceph_assert(msgr);
+ msgr->remove_dispatcher(*this);
return msgr->shutdown();
}
};
@@ -104,13 +107,12 @@ static seastar::future<> test_echo(unsigned rounds,
return found->second;
}
- seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override {
+ void ms_handle_connect(crimson::net::ConnectionRef conn) override {
auto session = seastar::make_shared<PingSession>();
auto [i, added] = sessions.emplace(conn.get(), session);
std::ignore = i;
ceph_assert(added);
session->connected_time = mono_clock::now();
- return seastar::now();
}
seastar::future<> ms_dispatch(crimson::net::Connection* c,
MessageRef m) override {
@@ -137,11 +139,14 @@ static seastar::future<> test_echo(unsigned rounds,
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->start(chained_dispatchers);
}
seastar::future<> shutdown() {
ceph_assert(msgr);
+ msgr->remove_dispatcher(*this);
return msgr->shutdown();
}
@@ -287,8 +292,10 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
+ return msgr->start(chained_dispatchers);
});
}
};
@@ -305,7 +312,9 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->start(chained_dispatchers);
}
};
};
@@ -335,9 +344,11 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
return server->wait();
}).finally([client] {
logger().info("client shutdown...");
+ client->msgr->remove_dispatcher(*client);
return client->msgr->shutdown();
}).finally([server] {
logger().info("server shutdown...");
+ server->msgr->remove_dispatcher(*server);
return server->msgr->shutdown();
}).finally([server, client] {
logger().info("test_concurrent_dispatch() done!\n");
@@ -365,14 +376,17 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
+ return msgr->start(chained_dispatchers);
});
}
entity_addr_t get_addr() const {
return msgr->get_myaddr();
}
seastar::future<> shutdown() {
+ msgr->remove_dispatcher(*this);
return msgr->shutdown();
}
};
@@ -398,7 +412,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->start(chained_dispatchers);
}
void send_pings(const entity_addr_t& addr) {
auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
@@ -415,6 +431,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
});
}
seastar::future<> shutdown() {
+ msgr->remove_dispatcher(*this);
return msgr->shutdown().then([this] {
stop_send = true;
return stopped_send_promise.get_future();
@@ -804,7 +821,7 @@ class FailoverSuite : public Dispatcher {
return seastar::now();
}
- seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(ConnectionRef conn) override {
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked accepted connection: {}", *conn);
@@ -824,10 +841,10 @@ class FailoverSuite : public Dispatcher {
++result->cnt_accept_dispatched;
logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
result->cnt_accept_dispatched, result->index, *conn);
- return flush_pending_send();
+ std::ignore = flush_pending_send();
}
- seastar::future<> ms_handle_connect(ConnectionRef conn) override {
+ void ms_handle_connect(ConnectionRef conn) override {
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked connected connection: {}", *conn);
@@ -844,10 +861,9 @@ class FailoverSuite : public Dispatcher {
++result->cnt_connect_dispatched;
logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
result->cnt_connect_dispatched, result->index, *conn);
- return seastar::now();
}
- seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override {
+ void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked reset connection: {}", *conn);
@@ -866,10 +882,9 @@ class FailoverSuite : public Dispatcher {
++result->cnt_reset_dispatched;
logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
result->cnt_reset_dispatched, result->index, *conn);
- return seastar::now();
}
- seastar::future<> ms_handle_remote_reset(ConnectionRef conn) override {
+ void ms_handle_remote_reset(ConnectionRef conn) override {
auto result = interceptor.find_result(conn);
if (result == nullptr) {
logger().error("Untracked remotely reset connection: {}", *conn);
@@ -886,7 +901,6 @@ class FailoverSuite : public Dispatcher {
++result->cnt_remote_reset_dispatched;
logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
result->cnt_remote_reset_dispatched, result->index, *conn);
- return seastar::now();
}
private:
@@ -895,8 +909,10 @@ class FailoverSuite : public Dispatcher {
test_msgr->set_auth_client(&dummy_auth);
test_msgr->set_auth_server(&dummy_auth);
test_msgr->interceptor = &interceptor;
- return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
- return test_msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return test_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
+ return test_msgr->start(chained_dispatchers);
});
}
@@ -1017,6 +1033,7 @@ class FailoverSuite : public Dispatcher {
}
seastar::future<> shutdown() {
+ test_msgr->remove_dispatcher(*this);
return test_msgr->shutdown();
}
@@ -1239,7 +1256,9 @@ class FailoverTest : public Dispatcher {
cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
cmd_msgr->set_auth_client(&dummy_auth);
cmd_msgr->set_auth_server(&dummy_auth);
- return cmd_msgr->start(this).then([this, cmd_peer_addr] {
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return cmd_msgr->start(chained_dispatchers).then([this, cmd_peer_addr] {
logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
return pingpong();
@@ -1262,6 +1281,7 @@ class FailoverTest : public Dispatcher {
return cmd_conn->send(m).then([] {
return seastar::sleep(200ms);
}).finally([this] {
+ cmd_msgr->remove_dispatcher(*this);
return cmd_msgr->shutdown();
});
}
@@ -1376,20 +1396,19 @@ class FailoverSuitePeer : public Dispatcher {
return op_callback();
}
- seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(ConnectionRef conn) override {
logger().info("[TestPeer] got accept from Test");
ceph_assert(!tracked_conn ||
tracked_conn->is_closed() ||
tracked_conn == conn);
tracked_conn = conn;
- return flush_pending_send();
+ std::ignore = flush_pending_send();
}
- seastar::future<> ms_handle_reset(ConnectionRef conn, bool is_replace) override {
+ void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
logger().info("[TestPeer] got reset from Test");
ceph_assert(tracked_conn == conn);
tracked_conn = nullptr;
- return seastar::now();
}
private:
@@ -1397,8 +1416,10 @@ class FailoverSuitePeer : public Dispatcher {
peer_msgr->set_default_policy(policy);
peer_msgr->set_auth_client(&dummy_auth);
peer_msgr->set_auth_server(&dummy_auth);
- return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
- return peer_msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ return peer_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
+ chained_dispatchers->push_back(*this);
+ return peer_msgr->start(chained_dispatchers);
});
}
@@ -1430,6 +1451,7 @@ class FailoverSuitePeer : public Dispatcher {
: peer_msgr(peer_msgr), op_callback(op_callback) { }
seastar::future<> shutdown() {
+ peer_msgr->remove_dispatcher(*this);
return peer_msgr->shutdown();
}
@@ -1505,6 +1527,7 @@ class FailoverTestPeer : public Dispatcher {
if (cmd == cmd_t::shutdown) {
logger().info("CmdSrv shutdown...");
// forwarded to FailoverTestPeer::wait()
+ cmd_msgr->remove_dispatcher(*this);
(void) cmd_msgr->shutdown();
return seastar::now();
}
@@ -1518,9 +1541,8 @@ class FailoverTestPeer : public Dispatcher {
}
}
- seastar::future<> ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(ConnectionRef conn) override {
cmd_conn = conn;
- return seastar::now();
}
private:
@@ -1573,8 +1595,10 @@ class FailoverTestPeer : public Dispatcher {
cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
cmd_msgr->set_auth_client(&dummy_auth);
cmd_msgr->set_auth_server(&dummy_auth);
- return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
- return cmd_msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this, chained_dispatchers]() mutable {
+ return cmd_msgr->start(chained_dispatchers);
});
}
diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc
index 1275b451ebe..45335bd5030 100644
--- a/src/test/crimson/test_monc.cc
+++ b/src/test/crimson/test_monc.cc
@@ -24,6 +24,7 @@ DummyAuthHandler dummy_handler;
static seastar::future<> test_monc()
{
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
return crimson::common::sharded_conf().start(EntityName{}, string_view{"ceph"}).then([] {
std::vector<const char*> args;
std::string cluster;
@@ -38,7 +39,7 @@ static seastar::future<> test_monc()
return conf.parse_config_files(conf_file_list);
}).then([] {
return crimson::common::sharded_perf_coll().start();
- }).then([] {
+ }).then([chained_dispatchers]() mutable {
auto msgr = crimson::net::Messenger::create(entity_name_t::OSD(0), "monc", 0);
auto& conf = crimson::common::local_conf();
if (conf->ms_crc_data) {
@@ -49,8 +50,9 @@ static seastar::future<> test_monc()
}
msgr->set_require_authorizer(false);
return seastar::do_with(MonClient{*msgr, dummy_handler},
- [msgr](auto& monc) {
- return msgr->start(&monc).then([&monc] {
+ [msgr, chained_dispatchers](auto& monc) mutable {
+ chained_dispatchers->push_back(monc);
+ return msgr->start(chained_dispatchers).then([&monc] {
return seastar::with_timeout(
seastar::lowres_clock::now() + std::chrono::seconds{10},
monc.start());
diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc
index 70335c44476..30793487d78 100644
--- a/src/tools/crimson/perf_crimson_msgr.cc
+++ b/src/tools/crimson/perf_crimson_msgr.cc
@@ -181,7 +181,9 @@ static seastar::future<> run(
msgr->set_crc_data();
}
return msgr->bind(entity_addrvec_t{addr}).then([this] {
- return msgr->start(this);
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return msgr->start(chained_dispatchers);
});
});
}
@@ -297,9 +299,8 @@ static seastar::future<> run(
return nr_depth - depth.current();
}
- seastar::future<> ms_handle_connect(crimson::net::ConnectionRef conn) override {
+ void ms_handle_connect(crimson::net::ConnectionRef conn) override {
conn_stats.connected_time = mono_clock::now();
- return seastar::now();
}
seastar::future<> ms_dispatch(crimson::net::Connection* c,
MessageRef m) override {
@@ -331,7 +332,9 @@ static seastar::future<> run(
}
seastar::future<> init(bool v1_crc_enabled) {
- return container().invoke_on_all([v1_crc_enabled] (auto& client) {
+ auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
+ chained_dispatchers->push_back(*this);
+ return container().invoke_on_all([v1_crc_enabled, chained_dispatchers] (auto& client) mutable {
if (client.is_active()) {
client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
@@ -342,7 +345,7 @@ static seastar::future<> run(
client.msgr->set_crc_header();
client.msgr->set_crc_data();
}
- return client.msgr->start(&client);
+ return client.msgr->start(chained_dispatchers);
}
return seastar::now();
});