summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/SocketConnection.h
blob: 7d20f68867e897f2059d3a2c43644b127dd82906 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2017 Red Hat, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#pragma once

#include <seastar/core/sharded.hh>

#include "msg/Policy.h"
#include "crimson/common/throttle.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Socket.h"

namespace crimson::net {

class ProtocolV2;
class SocketMessenger;
class SocketConnection;
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;

#ifdef UNIT_TESTS_BUILT
class Interceptor;
#endif

/**
 * ConnectionHandler
 *
 * The interface class to implement Connection, called by SocketConnection.
 *
 * The operations must be done in get_shard_id().
 */
class ConnectionHandler {
public:
  using clock_t = seastar::lowres_system_clock;

  virtual ~ConnectionHandler() = default;

  ConnectionHandler(const ConnectionHandler &) = delete;
  ConnectionHandler(ConnectionHandler &&) = delete;
  ConnectionHandler &operator=(const ConnectionHandler &) = delete;
  ConnectionHandler &operator=(ConnectionHandler &&) = delete;

  virtual seastar::shard_id get_shard_id() const = 0;

  virtual bool is_connected() const = 0;

  virtual seastar::future<> send(MessageURef) = 0;

  virtual seastar::future<> send_keepalive() = 0;

  virtual clock_t::time_point get_last_keepalive() const = 0;

  virtual clock_t::time_point get_last_keepalive_ack() const = 0;

  virtual void set_last_keepalive_ack(clock_t::time_point) = 0;

  virtual void mark_down() = 0;

protected:
  ConnectionHandler() = default;
};

class SocketConnection : public Connection {
 /*
  * Connection interfaces, public to users
  * Working in ConnectionHandler::get_shard_id()
  */
 public:
  SocketConnection(SocketMessenger& messenger,
                   ChainedDispatchers& dispatchers);

  ~SocketConnection() override;

  const seastar::shard_id get_shard_id() const override {
    return io_handler->get_shard_id();
  }

  const entity_name_t &get_peer_name() const override {
    return peer_name;
  }

  const entity_addr_t &get_peer_addr() const override {
    return peer_addr;
  }

  const entity_addr_t &get_peer_socket_addr() const override {
    return target_addr;
  }

  uint64_t get_features() const override {
    return features;
  }

  bool is_connected() const override;

  seastar::future<> send(MessageURef msg) override;

  seastar::future<> send_keepalive() override;

  clock_t::time_point get_last_keepalive() const override;

  clock_t::time_point get_last_keepalive_ack() const override;

  void set_last_keepalive_ack(clock_t::time_point when) override;

  void mark_down() override;

  bool has_user_private() const override {
    return user_private != nullptr;
  }

  user_private_t &get_user_private() override {
    assert(has_user_private());
    return *user_private;
  }

  void set_user_private(std::unique_ptr<user_private_t> new_user_private) override {
    assert(!has_user_private());
    user_private = std::move(new_user_private);
  }

  void print(std::ostream& out) const override;

 /*
  * Public to SocketMessenger
  * Working in SocketMessenger::get_shard_id();
  */
 public:
  /// start a handshake from the client's perspective,
  /// only call when SocketConnection first construct
  void start_connect(const entity_addr_t& peer_addr,
                     const entity_name_t& peer_name);

  /// start a handshake from the server's perspective,
  /// only call when SocketConnection first construct
  void start_accept(SocketFRef&& socket,
                    const entity_addr_t& peer_addr);

  seastar::future<> close_clean_yielded();

  seastar::socket_address get_local_address() const;

  seastar::shard_id get_messenger_shard_id() const;

  SocketMessenger &get_messenger() const;

  ConnectionRef get_local_shared_foreign_from_this();

private:
  void set_peer_type(entity_type_t peer_type);

  void set_peer_id(int64_t peer_id);

  void set_peer_name(entity_name_t name) {
    set_peer_type(name.type());
    set_peer_id(name.num());
  }

  void set_features(uint64_t f);

  void set_socket(Socket *s);

#ifdef UNIT_TESTS_BUILT
  bool is_protocol_ready() const override;

  bool is_protocol_standby() const override;

  bool is_protocol_closed_clean() const override;

  bool is_protocol_closed() const override;

  // peer wins if myaddr > peeraddr
  bool peer_wins() const override;

  Interceptor *interceptor = nullptr;
#else
  // peer wins if myaddr > peeraddr
  bool peer_wins() const;
#endif

private:
  const seastar::shard_id msgr_sid;

  /*
   * Core owner is messenger core, may allow to access from the I/O core.
   */
  SocketMessenger& messenger;

  std::unique_ptr<ProtocolV2> protocol;

  Socket *socket = nullptr;

  entity_name_t peer_name = {0, entity_name_t::NEW};

  entity_addr_t peer_addr;

  // which of the peer_addrs we're connecting to (as client)
  // or should reconnect to (as peer)
  entity_addr_t target_addr;

  uint64_t features = 0;

  ceph::net::Policy<crimson::common::Throttle> policy;

  uint64_t peer_global_id = 0;

  /*
   * Core owner is I/O core (mutable).
   */
  std::unique_ptr<ConnectionHandler> io_handler;

  /*
   * Core owner is up to the connection user.
   */
  std::unique_ptr<user_private_t> user_private;

  friend class IOHandler;
  friend class ProtocolV2;
  friend class FrameAssemblerV2;
};

} // namespace crimson::net

#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::net::SocketConnection> : fmt::ostream_formatter {};
#endif