diff options
-rw-r--r-- | src/crimson/net/Errors.cc | 6 | ||||
-rw-r--r-- | src/crimson/net/Errors.h | 3 | ||||
-rw-r--r-- | src/crimson/net/SocketConnection.cc | 170 | ||||
-rw-r--r-- | src/crimson/net/SocketConnection.h | 12 |
4 files changed, 188 insertions, 3 deletions
diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc index fe182377dfc..e925b9acd35 100644 --- a/src/crimson/net/Errors.cc +++ b/src/crimson/net/Errors.cc @@ -25,6 +25,12 @@ const std::error_category& net_category() std::string message(int ev) const override { switch (static_cast<error>(ev)) { + case error::bad_connect_banner: + return "bad connect banner"; + case error::bad_peer_address: + return "bad peer address"; + case error::negotiation_failure: + return "negotiation failure"; case error::read_eof: return "read eof"; case error::connection_aborted: diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h index af3720ae469..55921a9af5d 100644 --- a/src/crimson/net/Errors.h +++ b/src/crimson/net/Errors.h @@ -20,6 +20,9 @@ namespace ceph::net { /// net error codes enum class error { + bad_connect_banner, + bad_peer_address, + negotiation_failure, read_eof, connection_aborted, connection_refused, diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 8ecafa43bdb..26a3f1b1685 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -12,6 +12,7 @@ * */ +#include <algorithm> #include <core/shared_future.hh> #include "SocketConnection.h" @@ -28,7 +29,7 @@ SocketConnection::SocketConnection(Messenger *messenger, socket(std::move(fd)), in(socket.input()), out(socket.output()), - send_ready(seastar::now()) + send_ready(h.promise.get_future()) { } @@ -156,12 +157,175 @@ seastar::future<> SocketConnection::close() return seastar::when_all(in.close(), out.close()).discard_result(); } +// handshake + +/// store the banner in a non-const string for buffer::create_static() +static char banner[] = CEPH_BANNER; +constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; + +constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); +constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); + +WRITE_RAW_ENCODER(ceph_msg_connect); +WRITE_RAW_ENCODER(ceph_msg_connect_reply); + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) +{ + return out << "connect{features=" << std::hex << c.features << std::dec + << " host_type=" << c.host_type + << " global_seq=" << c.global_seq + << " connect_seq=" << c.connect_seq + << " protocol_version=" << c.protocol_version + << " authorizer_protocol=" << c.authorizer_protocol + << " authorizer_len=" << c.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}'; +} + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) +{ + return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag) + << " features=" << std::hex << r.features << std::dec + << " global_seq=" << r.global_seq + << " connect_seq=" << r.connect_seq + << " protocol_version=" << r.protocol_version + << " authorizer_len=" << r.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}'; +} + +// check that the buffer starts with a valid banner without requiring it to +// be contiguous in memory +static void validate_banner(bufferlist::const_iterator& p) +{ + auto b = std::cbegin(banner); + auto end = b + banner_size; + while (b != end) { + const char *buf{nullptr}; + auto remaining = std::distance(b, end); + auto len = p.get_ptr_and_advance(remaining, &buf); + if (!std::equal(buf, buf + len, b)) { + throw std::system_error(make_error_code(error::bad_connect_banner)); + } + b += len; + } +} + +// make sure that we agree with the peer about its address +static void validate_peer_addr(const entity_addr_t& addr, + const entity_addr_t& expected) +{ + if (addr == expected) { + return; + } + // ok if server bound anonymously, as long as port/nonce match + if (addr.is_blank_ip() && + addr.get_port() == expected.get_port() && + addr.get_nonce() == expected.get_nonce()) { + return; + } else { + throw std::system_error(make_error_code(error::bad_peer_address)); + } +} + +/// return a static bufferptr to the given object +template <typename T> +bufferptr create_static(T& obj) +{ + return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj)); +} + +seastar::future<> SocketConnection::handle_connect() +{ + memset(&h.reply, 0, sizeof(h.reply)); + + h.reply.protocol_version = CEPH_OSDC_PROTOCOL; + h.reply.tag = CEPH_MSGR_TAG_READY; + + bufferlist bl; + bl.append(create_static(h.reply)); + + return out.write(std::move(bl)) + .then([this] { return out.flush(); }); +} + +seastar::future<> SocketConnection::handle_connect_reply() +{ + if (h.reply.tag != CEPH_MSGR_TAG_READY) { + throw std::system_error(make_error_code(error::negotiation_failure)); + } + return seastar::now(); +} + seastar::future<> SocketConnection::client_handshake() { - return seastar::now(); // TODO + // read server's handshake header + return read(server_header_size) + .then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + assert(p.end()); + validate_peer_addr(saddr, peer_addr); + + if (my_addr != caddr) { + // take peer's address for me, but preserve my port/nonce + caddr.set_port(my_addr.get_port()); + caddr.nonce = my_addr.nonce; + my_addr = caddr; + } + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(my_addr, bl, 0); + + // encode ceph_msg_connect + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.protocol_version = CEPH_OSDC_PROTOCOL; + bl.append(create_static(h.connect)); + + // TODO: append authorizer + return out.write(std::move(bl)) + .then([this] { return out.flush(); }); + }).then([this] { + // read the reply + return read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.begin(); + ::decode(h.reply, p); + // TODO: read authorizer + assert(p.end()); + return handle_connect_reply(); + }).then_wrapped([this] (auto fut) { + // satisfy the handshake's promise + fut.forward_to(std::move(h.promise)); + }); } seastar::future<> SocketConnection::server_handshake() { - return seastar::now(); // TODO + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(my_addr, bl, 0); + ::encode(peer_addr, bl, 0); + return out.write(std::move(bl)) + .then([this] { return out.flush(); }) + .then([this] { + // read client's handshake header and connect request + return read(client_header_size + sizeof(h.connect)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ::decode(h.connect, p); + assert(p.end()); + // TODO: read authorizer + + return handle_connect(); + }).then_wrapped([this] (auto fut) { + // satisfy the handshake's promise + fut.forward_to(std::move(h.promise)); + }); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index ff77f0dd336..de0062b7c30 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -35,6 +35,18 @@ class SocketConnection : public Connection { /// read the requested number of bytes into a bufferlist seastar::future<bufferlist> read(size_t bytes); + /// state for handshake + struct Handshake { + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + seastar::promise<> promise; + } h; + + /// server side of handshake negotiation + seastar::future<> handle_connect(); + /// client side of handshake negotiation + seastar::future<> handle_connect_reply(); + /// state for an incoming message struct MessageReader { ceph_msg_header header; |