diff options
author | Yuval Lifshitz <ylifshit@ibm.com> | 2024-09-11 17:12:22 +0200 |
---|---|---|
committer | Yuval Lifshitz <ylifshit@ibm.com> | 2024-09-11 17:12:22 +0200 |
commit | 80beab8eb2cf0c496019c9850c3b979127d24c63 (patch) | |
tree | 64ee240107408c760f0a2452afbb8b949c8df93d | |
parent | Merge pull request #59066 from xxhdx1985126/wip-67380 (diff) | |
download | ceph-80beab8eb2cf0c496019c9850c3b979127d24c63.tar.xz ceph-80beab8eb2cf0c496019c9850c3b979127d24c63.zip |
rgw/kafka: refactor topic creation to avoid rd_kafka_topic_name()
also, use unique_ptr for safer memory management.
Fixes: https://tracker.ceph.com/issues/68033
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
-rw-r--r-- | src/rgw/rgw_kafka.cc | 29 |
1 file changed, 15 insertions(+), 14 deletions(-)
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index d3586e4900b..c0ec3dc2c55 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -19,11 +19,6 @@

 #define dout_subsys ceph_subsys_rgw_notification

-// comparison operator between topic pointer and name
-bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
-  return name == std::string_view(rd_kafka_topic_name(rkt));
-}
-
 // this is the inverse of rd_kafka_errno2err
 // see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c
 inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
@@ -141,9 +136,19 @@ struct reply_callback_with_tag_t {

 typedef std::vector<reply_callback_with_tag_t> CallbackList;

+
+
 struct connection_t {
   rd_kafka_t* producer = nullptr;
-  std::vector<rd_kafka_topic_t*> topics;
+
+  struct rd_kafka_topic_deleter {
+    void operator()(rd_kafka_topic_t* topic) {
+      rd_kafka_topic_destroy(topic);
+    }
+  };
+  using topic_ptr = std::unique_ptr<rd_kafka_topic_t, rd_kafka_topic_deleter>;
+  std::map<std::string, topic_ptr> topics;
+
   uint64_t delivery_tag = 1;
   int status = 0;
   CephContext* const cct;
@@ -168,7 +173,6 @@ struct connection_t {
     // wait for 500ms to try and handle pending callbacks
     rd_kafka_flush(producer, 500);
     // destroy all topics
-    std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
     topics.clear();
     // destroy producer
     rd_kafka_destroy(producer);
@@ -431,10 +435,9 @@ private:
     }

     // create a new topic unless it was already created
-    auto topic_it = std::find(conn->topics.begin(), conn->topics.end(), message->topic);
-    rd_kafka_topic_t* topic = nullptr;
+    auto topic_it = conn->topics.find(message->topic);
     if (topic_it == conn->topics.end()) {
-      topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
+      connection_t::topic_ptr topic(rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr));
       if (!topic) {
         const auto err = rd_kafka_last_error();
         ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
@@ -444,17 +447,15 @@ private:
       }
       return;
     }
-    // TODO use the topics list as an LRU cache
-    conn->topics.push_back(topic);
+    topic_it = conn->topics.emplace(message->topic, std::move(topic)).first;
     ldout(conn->cct, 20) << "Kafka publish: successfully created topic: " << message->topic << dendl;
   } else {
-    topic = *topic_it;
     ldout(conn->cct, 20) << "Kafka publish: reused existing topic: " << message->topic << dendl;
   }
   const auto tag = (message->cb == nullptr ? nullptr : new uint64_t(conn->delivery_tag++));
   const auto rc = rd_kafka_produce(
-    topic,
+    topic_it->second.get(),
     // TODO: non builtin partitioning
     RD_KAFKA_PARTITION_UA,
     // make a copy of the payload