summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/ProtocolV1.h
blob: 834016e5ec723fb36c6a4652edaea773700c9c4c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include "Protocol.h"

class AuthAuthorizer;
class AuthSessionHandler;

namespace ceph::net {

class ProtocolV1 final : public Protocol {
 public:
  ProtocolV1(Dispatcher& dispatcher,
             SocketConnection& conn,
             SocketMessenger& messenger);
  ~ProtocolV1() override;

 private:
  void start_connect(const entity_addr_t& peer_addr,
                     const entity_type_t& peer_type) override;

  void start_accept(SocketFRef&& socket,
                    const entity_addr_t& peer_addr) override;

  void trigger_close() override;

  ceph::bufferlist do_sweep_messages(
      const std::deque<MessageRef>& msgs,
      size_t num_msgs,
      bool require_keepalive,
      std::optional<utime_t> keepalive_ack,
      bool require_ack) override;

 private:
  SocketMessenger &messenger;

  enum class state_t {
    none,
    accepting,
    connecting,
    open,
    standby,
    wait,
    closing
  };
  state_t state = state_t::none;

  // state for handshake
  struct Handshake {
    ceph_msg_connect connect;
    ceph_msg_connect_reply reply;
    ceph::bufferlist auth_payload;  // auth(orizer) payload read off the wire
    ceph::bufferlist auth_more;     // connect-side auth retry (we added challenge)
    std::chrono::milliseconds backoff;
    uint32_t connect_seq = 0;
    uint32_t peer_global_seq = 0;
    uint32_t global_seq;
  } h;

  std::unique_ptr<AuthSessionHandler> session_security;

  // state for an incoming message
  struct MessageReader {
    ceph_msg_header header;
    ceph_msg_footer footer;
    bufferlist front;
    bufferlist middle;
    bufferlist data;
  } m;

  struct Keepalive {
    struct {
      const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
      ceph_timespec stamp;
    } __attribute__((packed)) req;
    struct {
      const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
      ceph_timespec stamp;
    } __attribute__((packed)) ack;
    ceph_timespec ack_stamp;
  } k;

 private:
  // connecting
  void reset_session();
  seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
  seastar::future<stop_t> repeat_connect();
  ceph::bufferlist get_auth_payload();

  // accepting
  seastar::future<stop_t> send_connect_reply(
      msgr_tag_t tag, bufferlist&& authorizer_reply = {});
  seastar::future<stop_t> send_connect_reply_ready(
      msgr_tag_t tag, bufferlist&& authorizer_reply);
  seastar::future<stop_t> replace_existing(
      SocketConnectionRef existing,
      bufferlist&& authorizer_reply,
      bool is_reset_from_peer = false);
  seastar::future<stop_t> handle_connect_with_existing(
      SocketConnectionRef existing, bufferlist&& authorizer_reply);
  bool require_auth_feature() const;
  seastar::future<stop_t> repeat_handle_connect();

  // open
  seastar::future<> handle_keepalive2_ack();
  seastar::future<> handle_keepalive2();
  seastar::future<> handle_ack();
  seastar::future<> maybe_throttle();
  seastar::future<> read_message();
  seastar::future<> handle_tags();
  void execute_open();

  // replacing
  // the number of connections initiated in this session, increment when a
  // new connection is established
  uint32_t connect_seq() const { return h.connect_seq; }
  // the client side should connect us with a gseq. it will be reset with
  // the one of exsting connection if it's greater.
  uint32_t peer_global_seq() const { return h.peer_global_seq; }
  // current state of ProtocolV1
  state_t get_state() const { return state; }

  seastar::future<> fault();
};

} // namespace ceph::net