diff options
author | Yingxin Cheng <yingxincheng@gmail.com> | 2019-02-14 08:49:34 +0100 |
---|---|---|
committer | Kefu Chai <kchai@redhat.com> | 2019-04-05 04:48:50 +0200 |
commit | 2ab8b741d302769d2f8f8466d81df3431b59c2b9 (patch) | |
tree | fad43a8c307e5786e064aab57510334c7ff615b8 /src/crimson/net/SocketConnection.h | |
parent | crimson/net: batch messages instead of chaining futures (diff) | |
download | ceph-2ab8b741d302769d2f8f8466d81df3431b59c2b9.tar.xz ceph-2ab8b741d302769d2f8f8466d81df3431b59c2b9.zip |
crimson/net: introduce protocol-level abstraction
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
Diffstat (limited to 'src/crimson/net/SocketConnection.h')
-rw-r--r-- | src/crimson/net/SocketConnection.h | 141 |
1 files changed, 8 insertions, 133 deletions
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 93794310c0c..a403b6790d3 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -14,32 +14,23 @@ #pragma once -#include <seastar/core/gate.hh> -#include <seastar/core/reactor.hh> -#include <seastar/core/shared_future.hh> #include <seastar/core/sharded.hh> #include "msg/Policy.h" #include "Connection.h" -#include "Socket.h" #include "crimson/thread/Throttle.h" -class AuthAuthorizer; -class AuthSessionHandler; - namespace ceph::net { -using stop_t = seastar::stop_iteration; - +class Protocol; +class Socket; class SocketMessenger; class SocketConnection; using SocketConnectionRef = seastar::shared_ptr<SocketConnection>; class SocketConnection : public Connection { SocketMessenger& messenger; - seastar::foreign_ptr<std::unique_ptr<Socket>> socket; - Dispatcher& dispatcher; - seastar::gate pending_dispatch; + std::unique_ptr<Protocol> protocol; // if acceptor side, socket_port is different from peer_addr.get_port(); // if connector side, socket_port is different from my_addr.get_port(). @@ -51,84 +42,6 @@ class SocketConnection : public Connection { side_t side = side_t::none; uint16_t socket_port = 0; - enum class state_t { - none, - accepting, - connecting, - open, - standby, - wait, - closing - }; - state_t state = state_t::none; - // wait until current state changed - seastar::shared_promise<> state_changed; - - // write_state is changed with state atomically, indicating the write - // behavior of the according state. - enum class write_state_t { - none, - delay, - open, - drop - }; - write_state_t write_state = write_state_t::none; - - /// become valid only when state is state_t::closing - seastar::shared_future<> close_ready; - - /// state for handshake - struct Handshake { - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - AuthAuthorizer* authorizer = nullptr; - std::chrono::milliseconds backoff; - uint32_t connect_seq = 0; - uint32_t peer_global_seq = 0; - uint32_t global_seq; - } h; - - /// server side of handshake negotiation - seastar::future<stop_t> repeat_handle_connect(); - seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply); - seastar::future<stop_t> replace_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer = false); - seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag, - bufferlist&& authorizer_reply = {}); - seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag, - bufferlist&& authorizer_reply); - - seastar::future<> handle_keepalive2(); - seastar::future<> handle_keepalive2_ack(); - - bool require_auth_feature() const; - uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; - /// client side of handshake negotiation - seastar::future<stop_t> repeat_connect(); - seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag); - void reset_session(); - - /// state for an incoming message - struct MessageReader { - ceph_msg_header header; - ceph_msg_footer footer; - bufferlist front; - bufferlist middle; - bufferlist data; - } m; - - seastar::future<> maybe_throttle(); - seastar::future<> handle_tags(); - seastar::future<> handle_ack(); - - bool write_dispatching = false; - void write_event(); - - /// encode/write a message - seastar::future<> write_message(MessageRef msg); - ceph::net::Policy<ceph::thread::Throttle> policy; uint64_t features; void set_features(uint64_t new_features) { @@ -144,42 +57,15 @@ class SocketConnection : public Connection { /// false otherwise. bool update_rx_seq(seq_num_t seq); - seastar::future<> read_message(); - - std::unique_ptr<AuthSessionHandler> session_security; - // messages to be resent after connection gets reset std::queue<MessageRef> out_q; // messages sent, but not yet acked by peer std::queue<MessageRef> sent; - static void discard_up_to(std::queue<MessageRef>*, seq_num_t); - - struct Keepalive { - struct { - const char tag = CEPH_MSGR_TAG_KEEPALIVE2; - ceph_timespec stamp; - } __attribute__((packed)) req; - struct { - const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; - ceph_timespec stamp; - } __attribute__((packed)) ack; - ceph_timespec ack_stamp; - } k; - bool m_keepalive = false; - bool m_keepalive_ack = false; - - seastar::future<> fault(); - - void execute_open(); - - seastar::future<> do_keepalive(); - seastar::future<> do_keepalive_ack(); - seastar::future<> do_close(); public: SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher); - ~SocketConnection(); + ~SocketConnection() override; Messenger* get_messenger() const override; @@ -209,28 +95,14 @@ class SocketConnection : public Connection { void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket, const entity_addr_t& peer_addr); - /// the number of connections initiated in this session, increment when a - /// new connection is established - uint32_t connect_seq() const { - return h.connect_seq; - } - - /// the client side should connect us with a gseq. it will be reset with - /// the one of exsting connection if it's greater. - uint32_t peer_global_seq() const { - return h.peer_global_seq; - } seq_num_t rx_seq_num() const { return in_seq; } - /// current state of connection - state_t get_state() const { - return state; - } bool is_server_side() const { return policy.server; } + bool is_lossy() const { return policy.lossy; } @@ -241,6 +113,9 @@ class SocketConnection : public Connection { std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() { return {out_seq, std::move(out_q)}; } + + friend class Protocol; + friend class ProtocolV1; }; } // namespace ceph::net |