summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_kafka.cc
diff options
context:
space:
mode:
authorYuval Lifshitz <ylifshit@ibm.com>2024-09-11 17:12:22 +0200
committerYuval Lifshitz <ylifshit@ibm.com>2024-09-11 17:12:22 +0200
commit80beab8eb2cf0c496019c9850c3b979127d24c63 (patch)
tree64ee240107408c760f0a2452afbb8b949c8df93d /src/rgw/rgw_kafka.cc
parentMerge pull request #59066 from xxhdx1985126/wip-67380 (diff)
downloadceph-80beab8eb2cf0c496019c9850c3b979127d24c63.tar.xz
ceph-80beab8eb2cf0c496019c9850c3b979127d24c63.zip
rgw/kafka: refactor topic creation to avoid rd_kafka_topic_name()
also, use unique_ptr for safer memory management. Fixes: https://tracker.ceph.com/issues/68033 Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
Diffstat (limited to 'src/rgw/rgw_kafka.cc')
-rw-r--r--src/rgw/rgw_kafka.cc29
1 files changed, 15 insertions, 14 deletions
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index d3586e4900b..c0ec3dc2c55 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -19,11 +19,6 @@
#define dout_subsys ceph_subsys_rgw_notification
-// comparison operator between topic pointer and name
-bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
- return name == std::string_view(rd_kafka_topic_name(rkt));
-}
-
// this is the inverse of rd_kafka_errno2err
// see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c
inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
@@ -141,9 +136,19 @@ struct reply_callback_with_tag_t {
typedef std::vector<reply_callback_with_tag_t> CallbackList;
+
+
struct connection_t {
rd_kafka_t* producer = nullptr;
- std::vector<rd_kafka_topic_t*> topics;
+
+ struct rd_kafka_topic_deleter {
+ void operator()(rd_kafka_topic_t* topic) {
+ rd_kafka_topic_destroy(topic);
+ }
+ };
+ using topic_ptr = std::unique_ptr<rd_kafka_topic_t, rd_kafka_topic_deleter>;
+ std::map<std::string, topic_ptr> topics;
+
uint64_t delivery_tag = 1;
int status = 0;
CephContext* const cct;
@@ -168,7 +173,6 @@ struct connection_t {
// wait for 500ms to try and handle pending callbacks
rd_kafka_flush(producer, 500);
// destroy all topics
- std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
topics.clear();
// destroy producer
rd_kafka_destroy(producer);
@@ -431,10 +435,9 @@ private:
}
// create a new topic unless it was already created
- auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
- rd_kafka_topic_t* topic = nullptr;
+ auto topic_it = conn->topics.find(message->topic);
if (topic_it == conn->topics.end()) {
- topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+ connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr));
if (!topic) {
const auto err = rd_kafka_last_error();
ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
@@ -444,17 +447,15 @@ private:
}
return;
}
- // TODO use the topics list as an LRU cache
- conn->topics.push_back(topic);
+ topic_it = conn->topics.emplace(message->topic, std::move(topic)).first;
ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
} else {
- topic = *topic_it;
ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
}
const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
const auto rc = rd_kafka_produce(
- topic,
+ topic_it->second.get(),
// TODO: non builtin partitioning
RD_KAFKA_PARTITION_UA,
// make a copy of the payload