summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_kafka.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r--src/rgw/rgw_kafka.cc30
1 files changed, 17 insertions, 13 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index c0ec3dc2c55..b38b1a78ec4 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -13,6 +13,7 @@
#include <thread>
#include <atomic>
#include <mutex>
+#include <boost/algorithm/string.hpp>
#include <boost/functional/hash.hpp>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
@@ -503,6 +504,7 @@ private:
}
void run() noexcept {
+ ceph_pthread_setname("kafka_manager");
while (!stopped) {
// publish all messages in the queue
@@ -575,12 +577,6 @@ public:
// This is to prevent rehashing so that iterators are not invalidated
// when a new connection is added.
connections.max_load_factor(10.0);
- // give the runner thread a name for easier debugging
- const char* thread_name = "kafka_manager";
- if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) {
- ldout(cct, 1) << "ERROR: failed to set kafka manager thread name to: " << thread_name
- << ". error: " << rc << dendl;
- }
}
// non copyable
@@ -600,7 +596,8 @@ public:
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> topic_user_name,
- boost::optional<const std::string&> topic_password) {
+ boost::optional<const std::string&> topic_password,
+ boost::optional<const std::string&> brokers) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
@@ -608,8 +605,8 @@ public:
std::string user;
std::string password;
- std::string broker;
- if (!parse_url_authority(url, broker, user, password)) {
+ std::string broker_list;
+ if (!parse_url_authority(url, broker_list, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
return false;
@@ -637,7 +634,13 @@ public:
ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
return false;
}
- connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+
+ if (brokers.has_value()) {
+ broker_list.append(",");
+ broker_list.append(brokers.get());
+ }
+
+ connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
use_ssl);
std::lock_guard lock(connections_lock);
const auto it = connections.find(tmp_id);
@@ -657,7 +660,7 @@ public:
return false;
}
- auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+ auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
if (!new_producer(conn.get())) {
ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
return false;
@@ -775,11 +778,12 @@ bool connect(connection_id_t& conn_id,
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
- boost::optional<const std::string&> password) {
+ boost::optional<const std::string&> password,
+ boost::optional<const std::string&> brokers) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
- mechanism, user_name, password);
+ mechanism, user_name, password, brokers);
}
int publish(const connection_id_t& conn_id,