summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_kafka.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r--src/rgw/rgw_kafka.cc166
1 files changed, 104 insertions, 62 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 4f7751ae6c6..dfaefdfb270 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -1,13 +1,12 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
-//#include "include/compat.h"
#include "rgw_kafka.h"
+#include "rgw_url.h"
#include <librdkafka/rdkafka.h>
#include "include/ceph_assert.h"
#include <sstream>
#include <cstring>
-#include <regex>
#include <unordered_map>
#include <string>
#include <vector>
@@ -32,17 +31,14 @@ bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
namespace rgw::kafka {
// status codes for publishing
+// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
// status code for connection opening
-static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
-static const int STATUS_GET_BROKER_LIST_FAILED = -0x2002;
-static const int STATUS_CREATE_PRODUCER_FAILED = -0x2003;
+static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
-static const int STATUS_CREATE_TOPIC_FAILED = -0x3008;
-static const int NO_REPLY_CODE = 0x0;
static const int STATUS_OK = 0x0;
// struct for holding the callback and its tag in the callback list
@@ -70,8 +66,14 @@ struct connection_t {
uint64_t delivery_tag = 1;
int status;
mutable std::atomic<int> ref_count = 0;
- CephContext* cct = nullptr;
+ CephContext* const cct;
CallbackList callbacks;
+ const std::string broker;
+ const bool use_ssl;
+ const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
+ const boost::optional<std::string> ca_location;
+ const std::string user;
+ const std::string password;
// cleanup of all internal connection resource
// the object can still remain, and internal connection
@@ -102,6 +104,12 @@ struct connection_t {
return (producer != nullptr && !marked_for_deletion);
}
+ // ctor for setting immutable values
+ connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
+ const boost::optional<const std::string&>& _ca_location,
+ const std::string& _user, const std::string& _password) :
+ cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
+
// dtor also destroys the internals
~connection_t() {
destroy(STATUS_CONNECTION_CLOSED);
@@ -111,6 +119,13 @@ struct connection_t {
friend void intrusive_ptr_release(const connection_t* p);
};
+std::string to_string(const connection_ptr_t& conn) {
+ std::string str;
+ str += "\nBroker: " + conn->broker;
+ str += conn->use_ssl ? "\nUse SSL" : "";
+ str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
+ return str;
+}
// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
void intrusive_ptr_add_ref(const connection_t* p) {
++p->ref_count;
@@ -124,6 +139,8 @@ void intrusive_ptr_release(const connection_t* p) {
// convert int status to string - including RGW specific values
std::string status_to_string(int s) {
switch (s) {
+ case STATUS_OK:
+ return "STATUS_OK";
case STATUS_CONNECTION_CLOSED:
return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
case STATUS_QUEUE_FULL:
@@ -134,13 +151,8 @@ std::string status_to_string(int s) {
return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
case STATUS_CONF_ALLOC_FAILED:
return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
- case STATUS_CREATE_PRODUCER_FAILED:
- return "STATUS_CREATE_PRODUCER_FAILED";
- case STATUS_CREATE_TOPIC_FAILED:
- return "STATUS_CREATE_TOPIC_FAILED";
}
- // TODO: how to handle "s" in this case?
- return std::string(rd_kafka_err2str(rd_kafka_last_error()));
+ return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
@@ -160,7 +172,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
if (tag_it != callbacks_end) {
ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
- *tag << " and result=" << result << dendl;
+ *tag << " and result=" << rd_kafka_err2str(result) << dendl;
tag_it->cb(result);
conn->callbacks.erase(tag_it);
} else {
@@ -173,14 +185,13 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
}
// utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& broker) {
+connection_ptr_t& create_connection(connection_ptr_t& conn) {
// pointer must be valid and not marked for deletion
ceph_assert(conn && !conn->marked_for_deletion);
// reset all status codes
conn->status = STATUS_OK;
-
- char errstr[512];
+ char errstr[512] = {0};
conn->temp_conf = rd_kafka_conf_new();
if (!conn->temp_conf) {
@@ -189,38 +200,68 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const std::string& b
}
// get list of brokers based on the bootsrap broker
- if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- conn->status = STATUS_GET_BROKER_LIST_FAILED;
- // TODO: use errstr
- return conn;
+ if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+ if (conn->use_ssl) {
+ if (!conn->user.empty()) {
+ // use SSL+SASL
+ if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
+ } else {
+ // use only SSL
+ if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
+ }
+ if (conn->ca_location) {
+ if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
+ } else {
+ ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
+ }
+ // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
+ // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
}
// set the global callback for delivery success/fail
rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
// set the global opaque pointer to be the connection itself
- rd_kafka_conf_set_opaque (conn->temp_conf, conn.get());
+ rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
// create the producer
conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
- if (conn->producer) {
- conn->status = STATUS_CREATE_PRODUCER_FAILED;
- // TODO: use errstr
+ if (!conn->producer) {
+ conn->status = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
return conn;
}
+ ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
// conf ownership passed to producer
conn->temp_conf = nullptr;
return conn;
-}
+conf_error:
+ conn->status = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
+ return conn;
+}
// utility function to create a new connection
-connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct) {
+connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ const std::string& user,
+ const std::string& password) {
// create connection state
- connection_ptr_t conn = new connection_t;
- conn->cct = cct;
- return create_connection(conn, broker);
+ connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
+ return create_connection(conn);
}
/// struct used for holding messages in the message queue
@@ -236,22 +277,6 @@ struct message_wrapper_t {
reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
};
-// parse a URL of the form: kafka://<host>[:port]
-// to a: host[:port]
-int parse_url(const std::string& url, std::string& broker) {
- std::regex url_regex (
- R"(^(([^:\/?#]+):)?(//([^\/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?)",
- std::regex::extended
- );
- const auto HOST_AND_PORT = 4;
- std::smatch url_match_result;
- if (std::regex_match(url, url_match_result, url_regex)) {
- broker = url_match_result[HOST_AND_PORT];
- return 0;
- }
- return -1;
-}
-
typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
@@ -290,9 +315,9 @@ private:
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
- ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue" << dendl;
+ ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
if (message->cb) {
- message->cb(STATUS_CONNECTION_CLOSED);
+ message->cb(conn->status);
}
return;
}
@@ -303,11 +328,12 @@ private:
if (topic_it == conn->topics.end()) {
topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
if (!topic) {
- ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << dendl;
+ const auto err = rd_kafka_last_error();
+ ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
if (message->cb) {
- message->cb(STATUS_CREATE_TOPIC_FAILED);
+ message->cb(err);
}
- conn->destroy(STATUS_CREATE_TOPIC_FAILED);
+ conn->destroy(err);
return;
}
// TODO use the topics list as an LRU cache
@@ -337,12 +363,13 @@ private:
tag);
if (rc == -1) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
- // TODO: dont error on full queue, retry instead
+ ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
+ // TODO: don't error on full queue, and don't destroy connection, retry instead
// immediatly invoke callback on error if needed
if (message->cb) {
message->cb(err);
}
+ conn->destroy(err);
delete tag;
}
@@ -352,7 +379,7 @@ private:
ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
conn->callbacks.emplace_back(*tag, message->cb);
} else {
- // immediately invoke callback with error
+ // immediately invoke callback with error - this is not a connection error
ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
message->cb(STATUS_MAX_INFLIGHT);
// tag will be deleted when the global callback is invoked
@@ -400,9 +427,10 @@ private:
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
+ ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
const auto& broker = conn_it->first;
ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
- if (create_connection(conn, broker)->is_ok() == false) {
+ if (create_connection(conn)->is_ok() == false) {
ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
@@ -477,7 +505,10 @@ public:
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url) {
+ connection_ptr_t connect(const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
@@ -485,14 +516,25 @@ public:
}
std::string broker;
- if (0 != parse_url(url, broker)) {
+ std::string user;
+ std::string password;
+ if (!parse_url_authority(url, broker, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
return nullptr;
}
+ // this should be validated by the regex in parse_url_authority()
+ ceph_assert(user.empty() == password.empty());
+
+ if (!user.empty() && !use_ssl) {
+ ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
+ return nullptr;
+ }
+
std::lock_guard lock(connections_lock);
const auto it = connections.find(broker);
+ // note that ssl vs. non-ssl connections to the same host are two separate connections
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
// TODO: increment counter
@@ -510,14 +552,13 @@ public:
ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(broker, cct);
+ const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
ceph_assert(conn);
++connection_count;
ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
- ldout(cct, 10) << "Kafka connect: new connection status is: " << status_to_string(conn->status) << dendl;
return connections.emplace(broker, conn).first->second;
}
@@ -613,9 +654,10 @@ void shutdown() {
s_manager = nullptr;
}
-connection_ptr_t connect(const std::string& url) {
+connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
+ boost::optional<const std::string&> ca_location) {
if (!s_manager) return nullptr;
- return s_manager->connect(url);
+ return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
}
int publish(connection_ptr_t& conn,