summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCasey Bodley <cbodley@redhat.com>2024-01-10 00:55:40 +0100
committerCasey Bodley <cbodley@redhat.com>2024-03-05 18:55:25 +0100
commitdb6c73a0cdcf60a920c91b6d4506df36d98b7308 (patch)
treed619501417c3398dfd838962b34de3aef3c1b70d
parentrgw/mdlog: add complete_entry() for most common usage (diff)
downloadceph-db6c73a0cdcf60a920c91b6d4506df36d98b7308.tar.xz
ceph-db6c73a0cdcf60a920c91b6d4506df36d98b7308.zip
rgw: paginate ListTopics
rename read_topics()/write_topics() to 'v1' and only call them from internal v1 call paths public get_topics() now calls read_topics_v1() for the v1 case, and does the paginated listing with driver->meta_list_keys_next() for v2 RGWPSListTopicsOp now uses the NextToken request/response params with the paginated get_topics(), limiting responses to 100 entries like AWS 'radosgw-admin topic list' also paginates the listing according to --max-entries to avoid reading everything into memory at once Signed-off-by: Casey Bodley <cbodley@redhat.com>
-rw-r--r--src/rgw/rgw_admin.cc62
-rw-r--r--src/rgw/rgw_pubsub.cc102
-rw-r--r--src/rgw/rgw_pubsub.h20
-rw-r--r--src/rgw/rgw_rest_pubsub.cc11
4 files changed, 114 insertions, 81 deletions
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 395e05ce517..a5933604b7e 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1169,7 +1169,7 @@ static void show_reshard_status(
}
static void show_topics_info_v2(const rgw_pubsub_topic& topic,
- std::set<std::string> subscribed_buckets,
+ const std::set<std::string>& subscribed_buckets,
Formatter* formatter) {
formatter->open_object_section("topic");
topic.dump(formatter);
@@ -10650,39 +10650,47 @@ next:
if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
RGWPubSub ps(driver, tenant, *site);
- rgw_pubsub_topics result;
- ret = ps.get_topics(dpp(), result, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- if (!rgw::sal::User::empty(user)) {
- for (auto it = result.topics.cbegin(); it != result.topics.cend();) {
- const auto& topic = it->second;
- if (user->get_id() != topic.user) {
- result.topics.erase(it++);
- } else {
- ++it;
- }
+ std::string next_token = marker;
+
+ formatter->open_object_section("result");
+ formatter->open_array_section("topics");
+ do {
+ rgw_pubsub_topics result;
+ int ret = ps.get_topics(dpp(), next_token, max_entries,
+ result, next_token, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
}
- }
- if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
- Formatter::ObjectSection top_section(*formatter, "result");
- Formatter::ArraySection s(*formatter, "topics");
for (const auto& [_, topic] : result.topics) {
+ if (!rgw::sal::User::empty(user) && user->get_id() != topic.user) {
+ continue;
+ }
std::set<std::string> subscribed_buckets;
- ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
- null_yield, dpp());
- if (ret < 0) {
- cerr << "failed to fetch bucket topic mapping info for topic: "
- << topic.name << ", ret=" << ret << std::endl;
- } else {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+ ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
+ null_yield, dpp());
+ if (ret < 0) {
+ cerr << "failed to fetch bucket topic mapping info for topic: "
+ << topic.name << ", ret=" << ret << std::endl;
+ }
show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+ } else {
+ encode_json("result", result, formatter.get());
+ }
+ if (max_entries_specified) {
+ --max_entries;
}
}
- } else {
- encode_json("result", result, formatter.get());
+ } while (!next_token.empty() && max_entries > 0);
+ formatter->close_section(); // topics
+ if (max_entries_specified) {
+ encode_json("truncated", !next_token.empty(), formatter.get());
+ if (!next_token.empty()) {
+ encode_json("marker", next_token, formatter.get());
+ }
}
+ formatter->close_section(); // result
formatter->flush(cout);
}
diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc
index f4ddb118cd3..bec78b687c8 100644
--- a/src/rgw/rgw_pubsub.cc
+++ b/src/rgw/rgw_pubsub.cc
@@ -510,45 +510,61 @@ RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
{
}
-int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
- RGWObjVersionTracker *objv_tracker, optional_yield y) const
+int RGWPubSub::get_topics(const DoutPrefixProvider* dpp,
+ const std::string& start_marker, int max_items,
+ rgw_pubsub_topics& result, std::string& next_marker,
+ optional_yield y) const
{
- if (use_notification_v2) {
- void* handle = NULL;
- auto ret =
- driver->meta_list_keys_init(dpp, "topic", std::string(), &handle);
- if (ret < 0) {
- return ret;
- }
- bool truncated;
- int max = 1000;
- do {
- std::list<std::string> topics;
- ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated);
- if (ret < 0) {
- ldpp_dout(dpp, 1)
- << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
- break;
- }
- for (auto& topic_entry : topics) {
- std::string topic_name;
- std::string topic_tenant;
- parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
- if (tenant != topic_tenant) {
- continue;
- }
- rgw_pubsub_topic topic;
- const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr);
- if (op_ret < 0) {
- ret = op_ret;
- continue;
- }
- result.topics[topic_name] = std::move(topic);
- }
- } while (truncated);
- driver->meta_list_keys_complete(handle);
+ if (!use_notification_v2) {
+ // v1 returns all topics, ignoring marker/max_items
+ return read_topics_v1(dpp, result, nullptr, y);
+ }
+
+ // TODO: prefix filter on 'tenant:'
+ void* handle = NULL;
+ int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle);
+ if (ret < 0) {
+ return ret;
+ }
+ auto g = make_scope_guard(
+ [this, handle] { driver->meta_list_keys_complete(handle); });
+
+ if (max_items > 1000) {
+ max_items = 1000;
+ }
+ std::list<std::string> topics;
+ bool truncated = false;
+ ret = driver->meta_list_keys_next(dpp, handle, max_items, topics, &truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
return ret;
}
+ for (auto& topic_entry : topics) {
+ std::string topic_name;
+ std::string topic_tenant;
+ parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
+ if (tenant != topic_tenant) {
+ continue;
+ }
+ rgw_pubsub_topic topic;
+ int r = get_topic(dpp, topic_name, topic, y, nullptr);
+ if (r < 0) {
+ continue;
+ }
+ result.topics[topic_name] = std::move(topic);
+ }
+ if (truncated) {
+ next_marker = driver->meta_get_marker(handle);
+ } else {
+ next_marker.clear();
+ }
+ return ret;
+}
+
+int RGWPubSub::read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
+ RGWObjVersionTracker *objv_tracker, optional_yield y) const
+{
const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
@@ -557,8 +573,8 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res
return 0;
}
-int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
- RGWObjVersionTracker *objv_tracker, optional_yield y) const
+int RGWPubSub::write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
+ RGWObjVersionTracker *objv_tracker, optional_yield y) const
{
const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp);
if (ret < 0 && ret != -ENOENT) {
@@ -616,7 +632,7 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
return ret;
}
rgw_pubsub_topics topics;
- const int ret = read_topics(dpp, topics, nullptr, y);
+ const int ret = read_topics_v1(dpp, topics, nullptr, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
@@ -932,7 +948,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
- int ret = read_topics(dpp, topics, &objv_tracker, y);
+ int ret = read_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
// its not an error if not topics exist, we create one
ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
@@ -947,7 +963,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
new_topic.opaque_data = opaque_data;
new_topic.policy_text = policy_text;
- ret = write_topics(dpp, topics, &objv_tracker, y);
+ ret = write_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
@@ -989,7 +1005,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
- int ret = read_topics(dpp, topics, &objv_tracker, y);
+ int ret = read_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
@@ -1001,7 +1017,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
topics.topics.erase(name);
- ret = write_topics(dpp, topics, &objv_tracker, y);
+ ret = write_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
return ret;
diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h
index ed785672175..519c1053ab3 100644
--- a/src/rgw/rgw_pubsub.h
+++ b/src/rgw/rgw_pubsub.h
@@ -562,10 +562,10 @@ class RGWPubSub
const std::string tenant;
bool use_notification_v2 = false;
- int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
- RGWObjVersionTracker* objv_tracker, optional_yield y) const;
- int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
- RGWObjVersionTracker* objv_tracker, optional_yield y) const;
+ int read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
+ int write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
@@ -620,11 +620,13 @@ public:
int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
};
- // get the list of topics
- // return 0 on success or if no topic was associated with the bucket, error code otherwise
- int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
- return read_topics(dpp, result, nullptr, y);
- }
+ // get a paginated list of topics
+ // return 0 on success, error code otherwise
+ int get_topics(const DoutPrefixProvider* dpp,
+ const std::string& start_marker, int max_items,
+ rgw_pubsub_topics& result, std::string& next_marker,
+ optional_yield y) const;
+
// get a topic with by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise.
diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc
index c64b5785337..66574cc8272 100644
--- a/src/rgw/rgw_rest_pubsub.cc
+++ b/src/rgw/rgw_rest_pubsub.cc
@@ -293,6 +293,7 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
class RGWPSListTopicsOp : public RGWOp {
private:
rgw_pubsub_topics result;
+ std::string next_token;
public:
int verify_permission(optional_yield) override {
@@ -325,15 +326,21 @@ public:
f->close_section(); // ListTopicsResult
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
- f->close_section(); // ResponseMetadat
+ f->close_section(); // ResponseMetadata
+ if (!next_token.empty()) {
+ encode_xml("NextToken", next_token, f);
+ }
f->close_section(); // ListTopicsResponse
rgw_flush_formatter_and_reset(s, f);
}
};
void RGWPSListTopicsOp::execute(optional_yield y) {
+ const std::string start_token = s->info.args.get("NextToken");
+
const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
- op_ret = ps.get_topics(this, result, y);
+ constexpr int max_items = 100;
+ op_ret = ps.get_topics(this, start_token, max_items, result, next_token, y);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {