summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/crimson/net/Errors.cc6
-rw-r--r--src/crimson/net/Errors.h3
-rw-r--r--src/crimson/net/SocketConnection.cc170
-rw-r--r--src/crimson/net/SocketConnection.h12
4 files changed, 188 insertions, 3 deletions
diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc
index fe182377dfc..e925b9acd35 100644
--- a/src/crimson/net/Errors.cc
+++ b/src/crimson/net/Errors.cc
@@ -25,6 +25,12 @@ const std::error_category& net_category()
std::string message(int ev) const override {
switch (static_cast<error>(ev)) {
+ case error::bad_connect_banner:
+ return "bad connect banner";
+ case error::bad_peer_address:
+ return "bad peer address";
+ case error::negotiation_failure:
+ return "negotiation failure";
case error::read_eof:
return "read eof";
case error::connection_aborted:
diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h
index af3720ae469..55921a9af5d 100644
--- a/src/crimson/net/Errors.h
+++ b/src/crimson/net/Errors.h
@@ -20,6 +20,9 @@ namespace ceph::net {
/// net error codes
enum class error {
+ bad_connect_banner,
+ bad_peer_address,
+ negotiation_failure,
read_eof,
connection_aborted,
connection_refused,
diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc
index 8ecafa43bdb..26a3f1b1685 100644
--- a/src/crimson/net/SocketConnection.cc
+++ b/src/crimson/net/SocketConnection.cc
@@ -12,6 +12,7 @@
*
*/
+#include <algorithm>
#include <core/shared_future.hh>
#include "SocketConnection.h"
@@ -28,7 +29,7 @@ SocketConnection::SocketConnection(Messenger *messenger,
socket(std::move(fd)),
in(socket.input()),
out(socket.output()),
- send_ready(seastar::now())
+ send_ready(h.promise.get_future())
{
}
@@ -156,12 +157,175 @@ seastar::future<> SocketConnection::close()
return seastar::when_all(in.close(), out.close()).discard_result();
}
+// handshake
+
+/// store the banner in a non-const string for buffer::create_static()
+static char banner[] = CEPH_BANNER;
+constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
+
+constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
+constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
+
+WRITE_RAW_ENCODER(ceph_msg_connect);
+WRITE_RAW_ENCODER(ceph_msg_connect_reply);
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
+{
+ return out << "connect{features=" << std::hex << c.features << std::dec
+ << " host_type=" << c.host_type
+ << " global_seq=" << c.global_seq
+ << " connect_seq=" << c.connect_seq
+ << " protocol_version=" << c.protocol_version
+ << " authorizer_protocol=" << c.authorizer_protocol
+ << " authorizer_len=" << c.authorizer_len
+ << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
+}
+
+std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
+{
+ return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
+ << " features=" << std::hex << r.features << std::dec
+ << " global_seq=" << r.global_seq
+ << " connect_seq=" << r.connect_seq
+ << " protocol_version=" << r.protocol_version
+ << " authorizer_len=" << r.authorizer_len
+ << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
+}
+
+// check that the buffer starts with a valid banner without requiring it to
+// be contiguous in memory
+static void validate_banner(bufferlist::const_iterator& p)
+{
+ auto b = std::cbegin(banner);
+ auto end = b + banner_size;
+ while (b != end) {
+ const char *buf{nullptr};
+ auto remaining = std::distance(b, end);
+ auto len = p.get_ptr_and_advance(remaining, &buf);
+ if (!std::equal(buf, buf + len, b)) {
+ throw std::system_error(make_error_code(error::bad_connect_banner));
+ }
+ b += len;
+ }
+}
+
+// make sure that we agree with the peer about its address
+static void validate_peer_addr(const entity_addr_t& addr,
+ const entity_addr_t& expected)
+{
+ if (addr == expected) {
+ return;
+ }
+ // ok if server bound anonymously, as long as port/nonce match
+ if (addr.is_blank_ip() &&
+ addr.get_port() == expected.get_port() &&
+ addr.get_nonce() == expected.get_nonce()) {
+ return;
+ } else {
+ throw std::system_error(make_error_code(error::bad_peer_address));
+ }
+}
+
+/// return a static bufferptr to the given object
+template <typename T>
+bufferptr create_static(T& obj)
+{
+ return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
+}
+
+seastar::future<> SocketConnection::handle_connect()
+{
+ memset(&h.reply, 0, sizeof(h.reply));
+
+ h.reply.protocol_version = CEPH_OSDC_PROTOCOL;
+ h.reply.tag = CEPH_MSGR_TAG_READY;
+
+ bufferlist bl;
+ bl.append(create_static(h.reply));
+
+ return out.write(std::move(bl))
+ .then([this] { return out.flush(); });
+}
+
+seastar::future<> SocketConnection::handle_connect_reply()
+{
+ if (h.reply.tag != CEPH_MSGR_TAG_READY) {
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ return seastar::now();
+}
+
seastar::future<> SocketConnection::client_handshake()
{
- return seastar::now(); // TODO
+ // read server's handshake header
+ return read(server_header_size)
+ .then([this] (bufferlist headerbl) {
+ auto p = headerbl.cbegin();
+ validate_banner(p);
+ entity_addr_t saddr, caddr;
+ ::decode(saddr, p);
+ ::decode(caddr, p);
+ assert(p.end());
+ validate_peer_addr(saddr, peer_addr);
+
+ if (my_addr != caddr) {
+ // take peer's address for me, but preserve my port/nonce
+ caddr.set_port(my_addr.get_port());
+ caddr.nonce = my_addr.nonce;
+ my_addr = caddr;
+ }
+ // encode/send client's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(my_addr, bl, 0);
+
+ // encode ceph_msg_connect
+ memset(&h.connect, 0, sizeof(h.connect));
+ h.connect.protocol_version = CEPH_OSDC_PROTOCOL;
+ bl.append(create_static(h.connect));
+
+ // TODO: append authorizer
+ return out.write(std::move(bl))
+ .then([this] { return out.flush(); });
+ }).then([this] {
+ // read the reply
+ return read(sizeof(h.reply));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.begin();
+ ::decode(h.reply, p);
+ // TODO: read authorizer
+ assert(p.end());
+ return handle_connect_reply();
+ }).then_wrapped([this] (auto fut) {
+ // satisfy the handshake's promise
+ fut.forward_to(std::move(h.promise));
+ });
}
seastar::future<> SocketConnection::server_handshake()
{
- return seastar::now(); // TODO
+ // encode/send server's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(my_addr, bl, 0);
+ ::encode(peer_addr, bl, 0);
+ return out.write(std::move(bl))
+ .then([this] { return out.flush(); })
+ .then([this] {
+ // read client's handshake header and connect request
+ return read(client_header_size + sizeof(h.connect));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ validate_banner(p);
+ entity_addr_t addr;
+ ::decode(addr, p);
+ ::decode(h.connect, p);
+ assert(p.end());
+ // TODO: read authorizer
+
+ return handle_connect();
+ }).then_wrapped([this] (auto fut) {
+ // satisfy the handshake's promise
+ fut.forward_to(std::move(h.promise));
+ });
}
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
index ff77f0dd336..de0062b7c30 100644
--- a/src/crimson/net/SocketConnection.h
+++ b/src/crimson/net/SocketConnection.h
@@ -35,6 +35,18 @@ class SocketConnection : public Connection {
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
+ /// state for handshake
+ struct Handshake {
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ seastar::promise<> promise;
+ } h;
+
+ /// server side of handshake negotiation
+ seastar::future<> handle_connect();
+ /// client side of handshake negotiation
+ seastar::future<> handle_connect_reply();
+
/// state for an incoming message
struct MessageReader {
ceph_msg_header header;