summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_kafka.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r--  src/rgw/rgw_kafka.cc  59
1 files changed, 32 insertions, 27 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index d3586e4900b..b38b1a78ec4 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -13,17 +13,13 @@
#include <thread>
#include <atomic>
#include <mutex>
+#include <boost/algorithm/string.hpp>
#include <boost/functional/hash.hpp>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
#define dout_subsys ceph_subsys_rgw_notification
-// comparison operator between topic pointer and name
-bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
- return name == std::string_view(rd_kafka_topic_name(rkt));
-}
-
// this is the inverse of rd_kafka_errno2err
// see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c
inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
@@ -141,9 +137,19 @@ struct reply_callback_with_tag_t {
typedef std::vector<reply_callback_with_tag_t> CallbackList;
+
+
struct connection_t {
rd_kafka_t* producer = nullptr;
- std::vector<rd_kafka_topic_t*> topics;
+
+ struct rd_kafka_topic_deleter {
+ void operator()(rd_kafka_topic_t* topic) {
+ rd_kafka_topic_destroy(topic);
+ }
+ };
+ using topic_ptr = std::unique_ptr<rd_kafka_topic_t, rd_kafka_topic_deleter>;
+ std::map<std::string, topic_ptr> topics;
+
uint64_t delivery_tag = 1;
int status = 0;
CephContext* const cct;
@@ -168,7 +174,6 @@ struct connection_t {
// wait for 500ms to try and handle pending callbacks
rd_kafka_flush(producer, 500);
// destroy all topics
- std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
topics.clear();
// destroy producer
rd_kafka_destroy(producer);
@@ -431,10 +436,9 @@ private:
}
// create a new topic unless it was already created
- auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
- rd_kafka_topic_t* topic = nullptr;
+ auto topic_it = conn->topics.find(message->topic);
if (topic_it == conn->topics.end()) {
- topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+ connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr));
if (!topic) {
const auto err = rd_kafka_last_error();
ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
@@ -444,17 +448,15 @@ private:
}
return;
}
- // TODO use the topics list as an LRU cache
- conn->topics.push_back(topic);
+ topic_it = conn->topics.emplace(message->topic, std::move(topic)).first;
ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
} else {
- topic = *topic_it;
ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
}
const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
const auto rc = rd_kafka_produce(
- topic,
+ topic_it->second.get(),
// TODO: non builtin partitioning
RD_KAFKA_PARTITION_UA,
// make a copy of the payload
@@ -502,6 +504,7 @@ private:
}
void run() noexcept {
+ ceph_pthread_setname("kafka_manager");
while (!stopped) {
// publish all messages in the queue
@@ -574,12 +577,6 @@ public:
// This is to prevent rehashing so that iterators are not invalidated
// when a new connection is added.
connections.max_load_factor(10.0);
- // give the runner thread a name for easier debugging
- const char* thread_name = "kafka_manager";
- if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) {
- ldout(cct, 1) << "ERROR: failed to set kafka manager thread name to: " << thread_name
- << ". error: " << rc << dendl;
- }
}
// non copyable
@@ -599,7 +596,8 @@ public:
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> topic_user_name,
- boost::optional<const std::string&> topic_password) {
+ boost::optional<const std::string&> topic_password,
+ boost::optional<const std::string&> brokers) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
@@ -607,8 +605,8 @@ public:
std::string user;
std::string password;
- std::string broker;
- if (!parse_url_authority(url, broker, user, password)) {
+ std::string broker_list;
+ if (!parse_url_authority(url, broker_list, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
return false;
@@ -636,7 +634,13 @@ public:
ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
return false;
}
- connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+
+ if (brokers.has_value()) {
+ broker_list.append(",");
+ broker_list.append(brokers.get());
+ }
+
+ connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
use_ssl);
std::lock_guard lock(connections_lock);
const auto it = connections.find(tmp_id);
@@ -656,7 +660,7 @@ public:
return false;
}
- auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+ auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
if (!new_producer(conn.get())) {
ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
return false;
@@ -774,11 +778,12 @@ bool connect(connection_id_t& conn_id,
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
- boost::optional<const std::string&> password) {
+ boost::optional<const std::string&> password,
+ boost::optional<const std::string&> brokers) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
- mechanism, user_name, password);
+ mechanism, user_name, password, brokers);
}
int publish(const connection_id_t& conn_id,