diff options
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r-- | src/rgw/rgw_kafka.cc | 59 |
1 files changed, 32 insertions, 27 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index d3586e4900b..b38b1a78ec4 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -13,17 +13,13 @@ #include <thread> #include <atomic> #include <mutex> +#include <boost/algorithm/string.hpp> #include <boost/functional/hash.hpp> #include <boost/lockfree/queue.hpp> #include "common/dout.h" #define dout_subsys ceph_subsys_rgw_notification -// comparison operator between topic pointer and name -bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { - return name == std::string_view(rd_kafka_topic_name(rkt)); -} - // this is the inverse of rd_kafka_errno2err // see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) { @@ -141,9 +137,19 @@ struct reply_callback_with_tag_t { typedef std::vector<reply_callback_with_tag_t> CallbackList; + + struct connection_t { rd_kafka_t* producer = nullptr; - std::vector<rd_kafka_topic_t*> topics; + + struct rd_kafka_topic_deleter { + void operator()(rd_kafka_topic_t* topic) { + rd_kafka_topic_destroy(topic); + } + }; + using topic_ptr = std::unique_ptr<rd_kafka_topic_t, rd_kafka_topic_deleter>; + std::map<std::string, topic_ptr> topics; + uint64_t delivery_tag = 1; int status = 0; CephContext* const cct; @@ -168,7 +174,6 @@ struct connection_t { // wait for 500ms to try and handle pending callbacks rd_kafka_flush(producer, 500); // destroy all topics - std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);}); topics.clear(); // destroy producer rd_kafka_destroy(producer); @@ -431,10 +436,9 @@ private: } // create a new topic unless it was already created - auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic); - rd_kafka_topic_t* topic = nullptr; + auto topic_it = conn->topics.find(message->topic); if (topic_it == conn->topics.end()) { - topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); + connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr)); if (!topic) { const auto err = rd_kafka_last_error(); ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " @@ -444,17 +448,15 @@ private: } return; } - // TODO use the topics list as an LRU cache - conn->topics.push_back(topic); + topic_it = conn->topics.emplace(message->topic, std::move(topic)).first; ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl; } else { - topic = *topic_it; ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl; } const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++)); const auto rc = rd_kafka_produce( - topic, + topic_it->second.get(), // TODO: non builtin partitioning RD_KAFKA_PARTITION_UA, // make a copy of the payload @@ -502,6 +504,7 @@ private: } void run() noexcept { + ceph_pthread_setname("kafka_manager"); while (!stopped) { // publish all messages in the queue @@ -574,12 +577,6 @@ public: // This is to prevent rehashing so that iterators are not invalidated // when a new connection is added. connections.max_load_factor(10.0); - // give the runner thread a name for easier debugging - const char* thread_name = "kafka_manager"; - if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) { - ldout(cct, 1) << "ERROR: failed to set kafka manager thread name to: " << thread_name - << ". error: " << rc << dendl; - } } // non copyable @@ -599,7 +596,8 @@ public: boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism, boost::optional<const std::string&> topic_user_name, - boost::optional<const std::string&> topic_password) { + boost::optional<const std::string&> topic_password, + boost::optional<const std::string&> brokers) { if (stopped) { ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; return false; @@ -607,8 +605,8 @@ public: std::string user; std::string password; - std::string broker; - if (!parse_url_authority(url, broker, user, password)) { + std::string broker_list; + if (!parse_url_authority(url, broker_list, user, password)) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl; return false; @@ -636,7 +634,13 @@ public: ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl; return false; } - connection_id_t tmp_id(broker, user, password, ca_location, mechanism, + + if (brokers.has_value()) { + broker_list.append(","); + broker_list.append(brokers.get()); + } + + connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism, use_ssl); std::lock_guard lock(connections_lock); const auto it = connections.find(tmp_id); @@ -656,7 +660,7 @@ public: return false; } - auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism); + auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism); if (!new_producer(conn.get())) { ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl; return false; @@ -774,11 +778,12 @@ bool connect(connection_id_t& conn_id, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism, boost::optional<const std::string&> user_name, - boost::optional<const std::string&> password) { + boost::optional<const std::string&> password, + boost::optional<const std::string&> brokers) { std::shared_lock lock(s_manager_mutex); if (!s_manager) return false; return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location, - mechanism, user_name, password); + mechanism, user_name, password, brokers); } int publish(const connection_id_t& conn_id, |