diff options
author | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-11-03 19:58:58 +0100 |
---|---|---|
committer | Yuval Lifshitz <yuvalif@yahoo.com> | 2020-01-08 13:11:53 +0100 |
commit | 529d5c63623a02c05ee70e9e8658b9ade05469ff (patch) | |
tree | 1bed63bfb8c8bf4629bd4afd16b3a366ebc6a351 /src/rgw/rgw_kafka.cc | |
parent | Merge pull request #32474 from kalebskeithley/fedora32 (diff) | |
download | ceph-529d5c63623a02c05ee70e9e8658b9ade05469ff.tar.xz ceph-529d5c63623a02c05ee70e9e8658b9ade05469ff.zip |
rgw/pubsub: add ssl+sasl security to kafka
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r-- | src/rgw/rgw_kafka.cc | 166 |
1 file changed, 104 insertions, 62 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 4f7751ae6c6..dfaefdfb270 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -1,13 +1,12 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp -//#include "include/compat.h" #include "rgw_kafka.h" +#include "rgw_url.h" #include <librdkafka/rdkafka.h> #include "include/ceph_assert.h" #include <sstream> #include <cstring> -#include <regex> #include <unordered_map> #include <string> #include <vector> @@ -32,17 +31,14 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { namespace rgw::kafka { // status codes for publishing +// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client static const int STATUS_CONNECTION_CLOSED = -0x1002; static const int STATUS_QUEUE_FULL = -0x1003; static const int STATUS_MAX_INFLIGHT = -0x1004; static const int STATUS_MANAGER_STOPPED = -0x1005; // status code for connection opening -static const int STATUS_CONF_ALLOC_FAILED = -0x2001; -static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002; -static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003; +static const int STATUS_CONF_ALLOC_FAILED = -0x2001; -static const int STATUS_CREATE_TOPIC_FAILED = -0x3008; -static const int NO_REPLY_CODE = 0x0; static const int STATUS_OK = 0x0; // struct for holding the callback and its tag in the callback list @@ -70,8 +66,14 @@ struct connection_t { uint64_t delivery_tag = 1; int status; mutable std::atomic<int> ref_count = 0; - CephContext* cct = nullptr; + CephContext* const cct; CallbackList callbacks; + const std::string broker; + const bool use_ssl; + const bool verify_ssl; // TODO currently iognored, not supported in librdkafka v0.11.6 + const boost::optional<std::string> ca_location; + const std::string user; + const std::string password; // cleanup of all internal connection resource // the object can still remain, and internal connection 
@@ -102,6 +104,12 @@ struct connection_t { return (producer != nullptr && !marked_for_deletion); } + // ctor for setting immutable values + connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, + const boost::optional<const std::string&>& _ca_location, + const std::string& _user, const std::string& _password) : + cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {} + // dtor also destroys the internals ~connection_t() { destroy(STATUS_CONNECTION_CLOSED); @@ -111,6 +119,13 @@ struct connection_t { friend void intrusive_ptr_release(const connection_t* p); }; +std::string to_string(const connection_ptr_t& conn) { + std::string str; + str += "\nBroker: " + conn->broker; + str += conn->use_ssl ? "\nUse SSL" : ""; + str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : ""; + return str; +} // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr void intrusive_ptr_add_ref(const connection_t* p) { ++p->ref_count; @@ -124,6 +139,8 @@ void intrusive_ptr_release(const connection_t* p) { // convert int status to string - including RGW specific values std::string status_to_string(int s) { switch (s) { + case STATUS_OK: + return "STATUS_OK"; case STATUS_CONNECTION_CLOSED: return "RGW_KAFKA_STATUS_CONNECTION_CLOSED"; case STATUS_QUEUE_FULL: @@ -134,13 +151,8 @@ std::string status_to_string(int s) { return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; case STATUS_CONF_ALLOC_FAILED: return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; - case STATUS_CREATE_PRODUCER_FAILED: - return "STATUS_CREATE_PRODUCER_FAILED"; - case STATUS_CREATE_TOPIC_FAILED: - return "STATUS_CREATE_TOPIC_FAILED"; } - // TODO: how to handle "s" in this case? 
- return std::string(rd_kafka_err2str(rd_kafka_last_error())); + return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); } void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) { @@ -160,7 +172,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag); if (tag_it != callbacks_end) { ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" << - *tag << " and result=" << result << dendl; + *tag << " and result=" << rd_kafka_err2str(result) << dendl; tag_it->cb(result); conn->callbacks.erase(tag_it); } else { @@ -173,14 +185,13 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* } // utility function to create a connection, when the connection object already exists -connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) { +connection_ptr_t& create_connection(connection_ptr_t& conn) { // pointer must be valid and not marked for deletion ceph_assert(conn && !conn->marked_for_deletion); // reset all status codes conn->status = STATUS_OK; - - char errstr[512]; + char errstr[512] = {0}; conn->temp_conf = rd_kafka_conf_new(); if (!conn->temp_conf) { @@ -189,38 +200,68 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& b } // get list of brokers based on the bootsrap broker - if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { - conn->status = STATUS_GET_BROKER_LIST_FAILED; - // TODO: use errstr - return conn; + if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + + if (conn->use_ssl) { + if (!conn->user.empty()) { + // use SSL+SASL + if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + 
rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl; + } else { + // use only SSL + if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl; + } + if (conn->ca_location) { + if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl; + } else { + ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl; + } + // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call + // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + + ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl; } // set the global callback for delivery success/fail rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback); // set the global opaque pointer to be the connection itself - rd_kafka_conf_set_opaque (conn->temp_conf, conn.get()); + rd_kafka_conf_set_opaque(conn->temp_conf, conn.get()); // create the producer conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); - if (conn->producer) { - conn->status = STATUS_CREATE_PRODUCER_FAILED; - // TODO: use errstr + if (!conn->producer) { + conn->status = 
rd_kafka_last_error(); + ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl; return conn; } + ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl; // conf ownership passed to producer conn->temp_conf = nullptr; return conn; -} +conf_error: + conn->status = rd_kafka_last_error(); + ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl; + return conn; +} // utility function to create a new connection -connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) { +connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct, + bool use_ssl, + bool verify_ssl, + boost::optional<const std::string&> ca_location, + const std::string& user, + const std::string& password) { // create connection state - connection_ptr_t conn = new connection_t; - conn->cct = cct; - return create_connection(conn, broker); + connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password)); + return create_connection(conn); } /// struct used for holding messages in the message queue @@ -236,22 +277,6 @@ struct message_wrapper_t { reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {} }; -// parse a URL of the form: kafka://<host>[:port] -// to a: host[:port] -int parse_url(const std::string& url, std::string& broker) { - std::regex url_regex ( - R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)", - std::regex::extended - ); - const auto HOST_AND_PORT = 4; - std::smatch url_match_result; - if (std::regex_match(url, url_match_result, url_regex)) { - broker = url_match_result[HOST_AND_PORT]; - return 0; - } - return -1; -} - typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList; typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue; @@ -290,9 +315,9 @@ private: if (!conn->is_ok()) { // connection had an issue while 
message was in the queue // TODO add error stats - ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl; + ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl; if (message->cb) { - message->cb(STATUS_CONNECTION_CLOSED); + message->cb(conn->status); } return; } @@ -303,11 +328,12 @@ private: if (topic_it == conn->topics.end()) { topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); if (!topic) { - ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl; + const auto err = rd_kafka_last_error(); + ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl; if (message->cb) { - message->cb(STATUS_CREATE_TOPIC_FAILED); + message->cb(err); } - conn->destroy(STATUS_CREATE_TOPIC_FAILED); + conn->destroy(err); return; } // TODO use the topics list as an LRU cache @@ -337,12 +363,13 @@ private: tag); if (rc == -1) { const auto err = rd_kafka_last_error(); - ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; - // TODO: dont error on full queue, retry instead + ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; + // TODO: dont error on full queue, and don't destroy connection, retry instead // immediatly invoke callback on error if needed if (message->cb) { message->cb(err); } + conn->destroy(err); delete tag; } @@ -352,7 +379,7 @@ private: ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. 
Queue has: " << q_len << " callbacks" << dendl; conn->callbacks.emplace_back(*tag, message->cb); } else { - // immediately invoke callback with error + // immediately invoke callback with error - this is not a connection error ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl; message->cb(STATUS_MAX_INFLIGHT); // tag will be deleted when the global callback is invoked @@ -400,9 +427,10 @@ private: // try to reconnect the connection if it has an error if (!conn->is_ok()) { + ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl; const auto& broker = conn_it->first; ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl; - if (create_connection(conn, broker)->is_ok() == false) { + if (create_connection(conn)->is_ok() == false) { ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl; // TODO: add error counter for failed retries // TODO: add exponential backoff for retries @@ -477,7 +505,10 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url) { + connection_ptr_t connect(const std::string& url, + bool use_ssl, + bool verify_ssl, + boost::optional<const std::string&> ca_location) { if (stopped) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; @@ -485,14 +516,25 @@ public: } std::string broker; - if (0 != parse_url(url, broker)) { + std::string user; + std::string password; + if (!parse_url_authority(url, broker, user, password)) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl; return nullptr; } + // this should be validated by the regex in parse_url() + ceph_assert(user.empty() == password.empty()); + + if (!user.empty() && !use_ssl) { + ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl; + return nullptr; + } 
+ std::lock_guard lock(connections_lock); const auto it = connections.find(broker); + // note that ssl vs. non-ssl connection to the same host are two separate conenctions if (it != connections.end()) { if (it->second->marked_for_deletion) { // TODO: increment counter @@ -510,14 +552,13 @@ public: ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl; return nullptr; } - const auto conn = create_new_connection(broker, cct); + const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password); // create_new_connection must always return a connection object // even if error occurred during creation. // in such a case the creation will be retried in the main thread ceph_assert(conn); ++connection_count; ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl; - ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl; return connections.emplace(broker, conn).first->second; } @@ -613,9 +654,10 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url) { +connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, + boost::optional<const std::string&> ca_location) { if (!s_manager) return nullptr; - return s_manager->connect(url); + return s_manager->connect(url, use_ssl, verify_ssl, ca_location); } int publish(connection_ptr_t& conn, |