diff options
author | Casey Bodley <cbodley@redhat.com> | 2024-01-10 00:55:40 +0100 |
---|---|---|
committer | Casey Bodley <cbodley@redhat.com> | 2024-03-05 18:55:25 +0100 |
commit | db6c73a0cdcf60a920c91b6d4506df36d98b7308 (patch) | |
tree | d619501417c3398dfd838962b34de3aef3c1b70d | |
parent | rgw/mdlog: add complete_entry() for most common usage (diff) | |
download | ceph-db6c73a0cdcf60a920c91b6d4506df36d98b7308.tar.xz ceph-db6c73a0cdcf60a920c91b6d4506df36d98b7308.zip |
rgw: paginate ListTopics
rename read_topics()/write_topics() to 'v1' and only call them from
internal v1 call paths
public get_topics() now calls read_topics_v1() for the v1 case, and does
the paginated listing with driver->meta_list_keys_next() for v2
RGWPSListTopicsOp now uses the NextToken request/response params with
the paginated get_topics(), limiting responses to 100 entries like AWS
'radosgw-admin topic list' also paginates the listing according to
--max-entries to avoid reading everything into memory at once
Signed-off-by: Casey Bodley <cbodley@redhat.com>
-rw-r--r-- | src/rgw/rgw_admin.cc | 62 | ||||
-rw-r--r-- | src/rgw/rgw_pubsub.cc | 102 | ||||
-rw-r--r-- | src/rgw/rgw_pubsub.h | 20 | ||||
-rw-r--r-- | src/rgw/rgw_rest_pubsub.cc | 11 |
4 files changed, 114 insertions, 81 deletions
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 395e05ce517..a5933604b7e 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1169,7 +1169,7 @@ static void show_reshard_status( } static void show_topics_info_v2(const rgw_pubsub_topic& topic, - std::set<std::string> subscribed_buckets, + const std::set<std::string>& subscribed_buckets, Formatter* formatter) { formatter->open_object_section("topic"); topic.dump(formatter); @@ -10650,39 +10650,47 @@ next: if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) { RGWPubSub ps(driver, tenant, *site); - rgw_pubsub_topics result; - ret = ps.get_topics(dpp(), result, null_yield); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - if (!rgw::sal::User::empty(user)) { - for (auto it = result.topics.cbegin(); it != result.topics.cend();) { - const auto& topic = it->second; - if (user->get_id() != topic.user) { - result.topics.erase(it++); - } else { - ++it; - } + std::string next_token = marker; + + formatter->open_object_section("result"); + formatter->open_array_section("topics"); + do { + rgw_pubsub_topics result; + int ret = ps.get_topics(dpp(), next_token, max_entries, + result, next_token, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; + return -ret; } - } - if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { - Formatter::ObjectSection top_section(*formatter, "result"); - Formatter::ArraySection s(*formatter, "topics"); for (const auto& [_, topic] : result.topics) { + if (!rgw::sal::User::empty(user) && user->get_id() != topic.user) { + continue; + } std::set<std::string> subscribed_buckets; - ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets, - null_yield, dpp()); - if (ret < 0) { - cerr << "failed to fetch bucket topic mapping info for topic: " - << topic.name << ", ret=" << ret << std::endl; - } else { + if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) { + ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets, + null_yield, dpp()); + if (ret < 0) { + cerr << "failed to fetch bucket topic mapping info for topic: " + << topic.name << ", ret=" << ret << std::endl; + } show_topics_info_v2(topic, subscribed_buckets, formatter.get()); + } else { + encode_json("result", result, formatter.get()); + } + if (max_entries_specified) { + --max_entries; } } - } else { - encode_json("result", result, formatter.get()); + } while (!next_token.empty() && max_entries > 0); + formatter->close_section(); // topics + if (max_entries_specified) { + encode_json("truncated", !next_token.empty(), formatter.get()); + if (!next_token.empty()) { + encode_json("marker", next_token, formatter.get()); + } } + formatter->close_section(); // result formatter->flush(cout); } diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index f4ddb118cd3..bec78b687c8 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -510,45 +510,61 @@ RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, { } -int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, - RGWObjVersionTracker *objv_tracker, optional_yield y) const +int RGWPubSub::get_topics(const DoutPrefixProvider* dpp, + const std::string& start_marker, int max_items, + rgw_pubsub_topics& result, std::string& next_marker, + optional_yield y) const { - if (use_notification_v2) { - void* handle = NULL; - auto ret = - driver->meta_list_keys_init(dpp, "topic", std::string(), &handle); - if (ret < 0) { - return ret; - } - bool truncated; - int max = 1000; - do { - std::list<std::string> topics; - ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated); - if (ret < 0) { - ldpp_dout(dpp, 1) - << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl; - break; - } - for (auto& topic_entry : topics) { - std::string topic_name; - std::string topic_tenant; - parse_topic_entry(topic_entry, &topic_tenant, &topic_name); - if (tenant != topic_tenant) { - continue; - } - rgw_pubsub_topic topic; - const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr); - if (op_ret < 0) { - ret = op_ret; - continue; - } - result.topics[topic_name] = std::move(topic); - } - } while (truncated); - driver->meta_list_keys_complete(handle); + if (!use_notification_v2) { + // v1 returns all topics, ignoring marker/max_items + return read_topics_v1(dpp, result, nullptr, y); + } + + // TODO: prefix filter on 'tenant:' + void* handle = NULL; + int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle); + if (ret < 0) { + return ret; + } + auto g = make_scope_guard( + [this, handle] { driver->meta_list_keys_complete(handle); }); + + if (max_items > 1000) { + max_items = 1000; + } + std::list<std::string> topics; + bool truncated = false; + ret = driver->meta_list_keys_next(dpp, handle, max_items, topics, &truncated); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl; return ret; } + for (auto& topic_entry : topics) { + std::string topic_name; + std::string topic_tenant; + parse_topic_entry(topic_entry, &topic_tenant, &topic_name); + if (tenant != topic_tenant) { + continue; + } + rgw_pubsub_topic topic; + int r = get_topic(dpp, topic_name, topic, y, nullptr); + if (r < 0) { + continue; + } + result.topics[topic_name] = std::move(topic); + } + if (truncated) { + next_marker = driver->meta_get_marker(handle); + } else { + next_marker.clear(); + } + return ret; +} + +int RGWPubSub::read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, + RGWObjVersionTracker *objv_tracker, optional_yield y) const +{ const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp); if (ret < 0) { ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; @@ -557,8 +573,8 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res return 0; } -int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, - RGWObjVersionTracker *objv_tracker, optional_yield y) const +int RGWPubSub::write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, + RGWObjVersionTracker *objv_tracker, optional_yield y) const { const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp); if (ret < 0 && ret != -ENOENT) { @@ -616,7 +632,7 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp, return ret; } rgw_pubsub_topics topics; - const int ret = read_topics(dpp, topics, nullptr, y); + const int ret = read_topics_v1(dpp, topics, nullptr, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; @@ -932,7 +948,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; - int ret = read_topics(dpp, topics, &objv_tracker, y); + int ret = read_topics_v1(dpp, topics, &objv_tracker, y); if (ret < 0 && ret != -ENOENT) { // its not an error if not topics exist, we create one ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; @@ -947,7 +963,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp, new_topic.opaque_data = opaque_data; new_topic.policy_text = policy_text; - ret = write_topics(dpp, topics, &objv_tracker, y); + ret = write_topics_v1(dpp, topics, &objv_tracker, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; @@ -989,7 +1005,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na RGWObjVersionTracker objv_tracker; rgw_pubsub_topics topics; - int ret = read_topics(dpp, topics, &objv_tracker, y); + int ret = read_topics_v1(dpp, topics, &objv_tracker, y); if (ret < 0 && ret != -ENOENT) { ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; @@ -1001,7 +1017,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na topics.topics.erase(name); - ret = write_topics(dpp, topics, &objv_tracker, y); + ret = write_topics_v1(dpp, topics, &objv_tracker, y); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index ed785672175..519c1053ab3 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -562,10 +562,10 @@ class RGWPubSub const std::string tenant; bool use_notification_v2 = false; - int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, - RGWObjVersionTracker* objv_tracker, optional_yield y) const; - int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, - RGWObjVersionTracker* objv_tracker, optional_yield y) const; + int read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, + RGWObjVersionTracker* objv_tracker, optional_yield y) const; + int write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, + RGWObjVersionTracker* objv_tracker, optional_yield y) const; public: RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant); @@ -620,11 +620,13 @@ public: int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const; }; - // get the list of topics - // return 0 on success or if no topic was associated with the bucket, error code otherwise - int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const { - return read_topics(dpp, result, nullptr, y); - } + // get a paginated list of topics + // return 0 on success, error code otherwise + int get_topics(const DoutPrefixProvider* dpp, + const std::string& start_marker, int max_items, + rgw_pubsub_topics& result, std::string& next_marker, + optional_yield y) const; + // get a topic with by its name and populate it into "result" // return -ENOENT if the topic does not exists // return 0 on success, error code otherwise. diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index c64b5785337..66574cc8272 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -293,6 +293,7 @@ void RGWPSCreateTopicOp::execute(optional_yield y) { class RGWPSListTopicsOp : public RGWOp { private: rgw_pubsub_topics result; + std::string next_token; public: int verify_permission(optional_yield) override { @@ -325,15 +326,21 @@ public: f->close_section(); // ListTopicsResult f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); - f->close_section(); // ResponseMetadat + f->close_section(); // ResponseMetadata + if (!next_token.empty()) { + encode_xml("NextToken", next_token, f); + } f->close_section(); // ListTopicsResponse rgw_flush_formatter_and_reset(s, f); } }; void RGWPSListTopicsOp::execute(optional_yield y) { + const std::string start_token = s->info.args.get("NextToken"); + const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site); - op_ret = ps.get_topics(this, result, y); + constexpr int max_items = 100; + op_ret = ps.get_topics(this, start_token, max_items, result, next_token, y); // if there are no topics it is not considered an error op_ret = op_ret == -ENOENT ? 0 : op_ret; if (op_ret < 0) { |