diff options
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r-- | src/rgw/rgw_kafka.cc | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index c0ec3dc2c55..b38b1a78ec4 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -13,6 +13,7 @@ #include <thread> #include <atomic> #include <mutex> +#include <boost/algorithm/string.hpp> #include <boost/functional/hash.hpp> #include <boost/lockfree/queue.hpp> #include "common/dout.h" @@ -503,6 +504,7 @@ private: } void run() noexcept { + ceph_pthread_setname("kafka_manager"); while (!stopped) { // publish all messages in the queue @@ -575,12 +577,6 @@ public: // This is to prevent rehashing so that iterators are not invalidated // when a new connection is added. connections.max_load_factor(10.0); - // give the runner thread a name for easier debugging - const char* thread_name = "kafka_manager"; - if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) { - ldout(cct, 1) << "ERROR: failed to set kafka manager thread name to: " << thread_name - << ". error: " << rc << dendl; - } } // non copyable @@ -600,7 +596,8 @@ public: boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism, boost::optional<const std::string&> topic_user_name, - boost::optional<const std::string&> topic_password) { + boost::optional<const std::string&> topic_password, + boost::optional<const std::string&> brokers) { if (stopped) { ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; return false; @@ -608,8 +605,8 @@ public: std::string user; std::string password; - std::string broker; - if (!parse_url_authority(url, broker, user, password)) { + std::string broker_list; + if (!parse_url_authority(url, broker_list, user, password)) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl; return false; @@ -637,7 +634,13 @@ public: ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl; return false; } - connection_id_t tmp_id(broker, user, password, ca_location, mechanism, + + if (brokers.has_value()) { + broker_list.append(","); + broker_list.append(brokers.get()); + } + + connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism, use_ssl); std::lock_guard lock(connections_lock); const auto it = connections.find(tmp_id); @@ -657,7 +660,7 @@ public: return false; } - auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism); + auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism); if (!new_producer(conn.get())) { ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl; return false; @@ -775,11 +778,12 @@ bool connect(connection_id_t& conn_id, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism, boost::optional<const std::string&> user_name, - boost::optional<const std::string&> password) { + boost::optional<const std::string&> password, + boost::optional<const std::string&> brokers) { std::shared_lock lock(s_manager_mutex); if (!s_manager) return false; return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location, - mechanism, user_name, password); + mechanism, user_name, password, brokers); } int publish(const connection_id_t& conn_id, |