summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/SocketConnection.h
diff options
context:
space:
mode:
authorYingxin Cheng <yingxincheng@gmail.com>2019-02-14 08:49:34 +0100
committerKefu Chai <kchai@redhat.com>2019-04-05 04:48:50 +0200
commit2ab8b741d302769d2f8f8466d81df3431b59c2b9 (patch)
treefad43a8c307e5786e064aab57510334c7ff615b8 /src/crimson/net/SocketConnection.h
parentcrimson/net: batch messages instead of chaining futures (diff)
downloadceph-2ab8b741d302769d2f8f8466d81df3431b59c2b9.tar.xz
ceph-2ab8b741d302769d2f8f8466d81df3431b59c2b9.zip
crimson/net: introduce protocol-level abstraction
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
Diffstat (limited to 'src/crimson/net/SocketConnection.h')
-rw-r--r--src/crimson/net/SocketConnection.h141
1 files changed, 8 insertions, 133 deletions
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
index 93794310c0c..a403b6790d3 100644
--- a/src/crimson/net/SocketConnection.h
+++ b/src/crimson/net/SocketConnection.h
@@ -14,32 +14,23 @@
#pragma once
-#include <seastar/core/gate.hh>
-#include <seastar/core/reactor.hh>
-#include <seastar/core/shared_future.hh>
#include <seastar/core/sharded.hh>
#include "msg/Policy.h"
#include "Connection.h"
-#include "Socket.h"
#include "crimson/thread/Throttle.h"
-class AuthAuthorizer;
-class AuthSessionHandler;
-
namespace ceph::net {
-using stop_t = seastar::stop_iteration;
-
+class Protocol;
+class Socket;
class SocketMessenger;
class SocketConnection;
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
class SocketConnection : public Connection {
SocketMessenger& messenger;
- seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
- Dispatcher& dispatcher;
- seastar::gate pending_dispatch;
+ std::unique_ptr<Protocol> protocol;
// if acceptor side, socket_port is different from peer_addr.get_port();
// if connector side, socket_port is different from my_addr.get_port().
@@ -51,84 +42,6 @@ class SocketConnection : public Connection {
side_t side = side_t::none;
uint16_t socket_port = 0;
- enum class state_t {
- none,
- accepting,
- connecting,
- open,
- standby,
- wait,
- closing
- };
- state_t state = state_t::none;
- // wait until current state changed
- seastar::shared_promise<> state_changed;
-
- // write_state is changed with state atomically, indicating the write
- // behavior of the according state.
- enum class write_state_t {
- none,
- delay,
- open,
- drop
- };
- write_state_t write_state = write_state_t::none;
-
- /// become valid only when state is state_t::closing
- seastar::shared_future<> close_ready;
-
- /// state for handshake
- struct Handshake {
- ceph_msg_connect connect;
- ceph_msg_connect_reply reply;
- AuthAuthorizer* authorizer = nullptr;
- std::chrono::milliseconds backoff;
- uint32_t connect_seq = 0;
- uint32_t peer_global_seq = 0;
- uint32_t global_seq;
- } h;
-
- /// server side of handshake negotiation
- seastar::future<stop_t> repeat_handle_connect();
- seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
- bufferlist&& authorizer_reply);
- seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
- bufferlist&& authorizer_reply,
- bool is_reset_from_peer = false);
- seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
- bufferlist&& authorizer_reply = {});
- seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
- bufferlist&& authorizer_reply);
-
- seastar::future<> handle_keepalive2();
- seastar::future<> handle_keepalive2_ack();
-
- bool require_auth_feature() const;
- uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
- /// client side of handshake negotiation
- seastar::future<stop_t> repeat_connect();
- seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
- void reset_session();
-
- /// state for an incoming message
- struct MessageReader {
- ceph_msg_header header;
- ceph_msg_footer footer;
- bufferlist front;
- bufferlist middle;
- bufferlist data;
- } m;
-
- seastar::future<> maybe_throttle();
- seastar::future<> handle_tags();
- seastar::future<> handle_ack();
-
- bool write_dispatching = false;
- void write_event();
-
- /// encode/write a message
- seastar::future<> write_message(MessageRef msg);
-
ceph::net::Policy<ceph::thread::Throttle> policy;
uint64_t features;
void set_features(uint64_t new_features) {
@@ -144,42 +57,15 @@ class SocketConnection : public Connection {
/// false otherwise.
bool update_rx_seq(seq_num_t seq);
- seastar::future<> read_message();
-
- std::unique_ptr<AuthSessionHandler> session_security;
-
// messages to be resent after connection gets reset
std::queue<MessageRef> out_q;
// messages sent, but not yet acked by peer
std::queue<MessageRef> sent;
- static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
-
- struct Keepalive {
- struct {
- const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
- ceph_timespec stamp;
- } __attribute__((packed)) req;
- struct {
- const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
- ceph_timespec stamp;
- } __attribute__((packed)) ack;
- ceph_timespec ack_stamp;
- } k;
- bool m_keepalive = false;
- bool m_keepalive_ack = false;
-
- seastar::future<> fault();
-
- void execute_open();
-
- seastar::future<> do_keepalive();
- seastar::future<> do_keepalive_ack();
- seastar::future<> do_close();
public:
SocketConnection(SocketMessenger& messenger,
Dispatcher& dispatcher);
- ~SocketConnection();
+ ~SocketConnection() override;
Messenger* get_messenger() const override;
@@ -209,28 +95,14 @@ class SocketConnection : public Connection {
void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
const entity_addr_t& peer_addr);
- /// the number of connections initiated in this session, increment when a
- /// new connection is established
- uint32_t connect_seq() const {
- return h.connect_seq;
- }
-
- /// the client side should connect us with a gseq. it will be reset with
- /// the one of exsting connection if it's greater.
- uint32_t peer_global_seq() const {
- return h.peer_global_seq;
- }
seq_num_t rx_seq_num() const {
return in_seq;
}
- /// current state of connection
- state_t get_state() const {
- return state;
- }
bool is_server_side() const {
return policy.server;
}
+
bool is_lossy() const {
return policy.lossy;
}
@@ -241,6 +113,9 @@ class SocketConnection : public Connection {
std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
return {out_seq, std::move(out_q)};
}
+
+ friend class Protocol;
+ friend class ProtocolV1;
};
} // namespace ceph::net