summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/rgw/driver/rados/rgw_bucket.cc132
-rw-r--r--src/rgw/driver/rados/rgw_notify.cc5
-rw-r--r--src/rgw/driver/rados/rgw_rados.cc10
-rw-r--r--src/rgw/driver/rados/rgw_rados.h4
-rw-r--r--src/rgw/driver/rados/rgw_sal_rados.cc132
-rw-r--r--src/rgw/driver/rados/rgw_sal_rados.h17
-rw-r--r--src/rgw/rgw_admin.cc39
-rw-r--r--src/rgw/rgw_pubsub.cc132
-rw-r--r--src/rgw/rgw_pubsub.h20
-rw-r--r--src/rgw/rgw_rest_pubsub.cc29
-rw-r--r--src/rgw/rgw_sal.h28
-rw-r--r--src/rgw/rgw_sal_filter.h28
-rw-r--r--src/rgw/rgw_sal_store.h29
-rw-r--r--src/rgw/services/svc_topic_rados.cc8
-rw-r--r--src/rgw/services/svc_topic_rados.h2
15 files changed, 530 insertions, 85 deletions
diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc
index 9f556553182..ce869f399d6 100644
--- a/src/rgw/driver/rados/rgw_bucket.cc
+++ b/src/rgw/driver/rados/rgw_bucket.cc
@@ -1336,28 +1336,6 @@ static int bucket_stats(rgw::sal::Driver* driver,
}
}
- // bucket notifications
- rgw_pubsub_bucket_topics result;
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
- ret = get_bucket_notifications(dpp, bucket.get(), result);
- if (ret < 0) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
- << std::endl;
- return -ret;
- }
- } else {
- RGWPubSub ps(driver, tenant_name);
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp, result, y);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
- << std::endl;
- return -ret;
- }
- }
- result.dump(formatter);
-
// TODO: bucket CORS
// TODO: bucket LC
formatter->close_section();
@@ -2132,6 +2110,92 @@ int RGWMetadataHandlerPut_Bucket::put_post(const DoutPrefixProvider *dpp)
return ret;
}
+int update_bucket_topic_mappings(const DoutPrefixProvider* dpp,
+ RGWBucketCompleteInfo* orig_bci,
+ RGWBucketCompleteInfo* current_bci,
+ rgw::sal::Driver* driver) {
+ const auto decode_attrs = [](const rgw::sal::Attrs& attrs,
+ rgw_pubsub_bucket_topics& bucket_topics) -> int {
+ auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (iter == attrs.end()) {
+ return 0;
+ }
+ try {
+ const auto& bl = iter->second;
+ auto biter = bl.cbegin();
+ bucket_topics.decode(biter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ return 0;
+ };
+ std::string bucket_name;
+ std::string bucket_tenant;
+ rgw_pubsub_bucket_topics old_bucket_topics;
+ if (orig_bci) {
+ auto ret = decode_attrs(orig_bci->attrs, old_bucket_topics);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to decode OLD bucket topics for bucket: "
+ << orig_bci->info.bucket.name << dendl;
+ return ret;
+ }
+ bucket_name = orig_bci->info.bucket.name;
+ bucket_tenant = orig_bci->info.bucket.tenant;
+ }
+ rgw_pubsub_bucket_topics current_bucket_topics;
+ if (current_bci) {
+ auto ret = decode_attrs(current_bci->attrs, current_bucket_topics);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to decode current bucket topics for bucket: "
+ << current_bci->info.bucket.name << dendl;
+ return ret;
+ }
+ bucket_name = current_bci->info.bucket.name;
+ bucket_tenant = current_bci->info.bucket.tenant;
+ }
+ // fetch the list of subscribed topics stored inside old_bucket attrs.
+ std::unordered_map<std::string, rgw_pubsub_topic> old_topics;
+ for (const auto& [_, topic_filter] : old_bucket_topics.topics) {
+ old_topics[topic_filter.topic.name] = topic_filter.topic;
+ }
+ // fetch the list of subscribed topics stored inside current_bucket attrs.
+ std::unordered_map<std::string, rgw_pubsub_topic> current_topics;
+ for (const auto& [_, topic_filter] : current_bucket_topics.topics) {
+ current_topics[topic_filter.topic.name] = topic_filter.topic;
+ }
+ // traverse thru old topics and check if they are not in current, then delete
+ // the mapping, if present in both current and old then delete from current
+ // set as we do not need to update those mapping.
+ int ret = 0;
+ for (const auto& [topic_name, topic] : old_topics) {
+ auto it = current_topics.find(topic_name);
+ if (it == current_topics.end()) {
+ const auto op_ret = driver->update_bucket_topic_mapping(
+ topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
+ /*add_mapping=*/false, null_yield, dpp);
+ if (op_ret < 0) {
+ ret = op_ret;
+ }
+ } else {
+ // already that attr is present, so do not update the mapping.
+ current_topics.erase(it);
+ }
+ }
+ // traverse thru current topics and check if they are any present, then add
+ // the mapping.
+ for (const auto& [topic_name, topic] : current_topics) {
+ const auto op_ret = driver->update_bucket_topic_mapping(
+ topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
+ /*add_mapping=*/true, null_yield, dpp);
+ if (op_ret < 0) {
+ ret = op_ret;
+ }
+ }
+ return ret;
+}
+
static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
@@ -2443,7 +2507,14 @@ public:
if (ret < 0 && ret != -ENOENT)
return ret;
- return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
+ ret = svc.bucket->remove_bucket_instance_info(
+ ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
+ if (ret < 0)
+ return ret;
+ ret = update_bucket_topic_mappings(dpp, &bci, /*current_bci=*/nullptr,
+ driver);
+ // update_bucket_topic_mapping error is swallowed.
+ return 0;
}
int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
@@ -2648,6 +2719,21 @@ int RGWMetadataHandlerPut_BucketInstance::put_post(const DoutPrefixProvider *dpp
}
} /* update lc */
+ /* update bucket topic mapping */
+ {
+ auto* orig_obj = static_cast<RGWBucketInstanceMetadataObject*>(old_obj);
+ auto* orig_bci = (orig_obj ? &orig_obj->get_bci() : nullptr);
+ ret = update_bucket_topic_mappings(dpp, orig_bci, &bci, bihandler->driver);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__
+ << " failed to apply bucket topic mapping for "
+ << bci.info.bucket.name << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << __func__
+ << " successfully applied bucket topic mapping for "
+ << bci.info.bucket.name << dendl;
+ }
return STATUS_APPLIED;
}
diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index e5d5dd602f5..e3ee61ba9df 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -494,7 +494,8 @@ private:
RGWPubSub ps(&rados_store, tenant_name);
rgw_pubsub_topic topic;
- auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
+ auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
+ optional_yield(io_context, yield), nullptr);
if (ret_of_get_topic < 0) {
// we can't migrate entries without topic info
ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
@@ -1056,7 +1057,7 @@ static inline bool notification_match(reservation_t& res,
const RGWPubSub ps(
res.store, res.user_tenant,
&res.store->svc()->zone->get_current_period().get_map().zonegroups);
- auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield);
+ auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
if (ret < 0) {
ldpp_dout(res.dpp, 1)
<< "INFO: failed to load topic: " << topic_cfg.name
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index 5e111103c1c..b8fcfa2f368 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -1211,6 +1211,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
if (ret < 0)
return ret;
+ ret = open_topics_pool_ctx(dpp);
+ if (ret < 0)
+ return ret;
+
pools_initialized = true;
if (use_gc) {
@@ -1446,6 +1450,12 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp)
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true);
}
+int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) {
+ return rgw_init_ioctx(dpp, get_rados_handle(),
+ svc.zone->get_zone_params().topics_pool,
+ topics_pool_ctx, true, true);
+}
+
int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk)
{
diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h
index 264f5eb4f33..7e7a58480a6 100644
--- a/src/rgw/driver/rados/rgw_rados.h
+++ b/src/rgw/driver/rados/rgw_rados.h
@@ -358,6 +358,7 @@ class RGWRados
int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
+ int open_topics_pool_ctx(const DoutPrefixProvider* dpp);
int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk);
@@ -447,6 +448,7 @@ protected:
librados::IoCtx objexp_pool_ctx;
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
+ librados::IoCtx topics_pool_ctx; // .rgw.meta:topics
bool pools_initialized{false};
@@ -533,6 +535,8 @@ public:
return notif_pool_ctx;
}
+ librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; }
+
void set_context(CephContext *_cct) {
cct = _cct;
}
diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc
index 608800c7e57..2bde7d192e5 100644
--- a/src/rgw/driver/rados/rgw_sal_rados.cc
+++ b/src/rgw/driver/rados/rgw_sal_rados.cc
@@ -325,6 +325,30 @@ int RadosBucket::remove(const DoutPrefixProvider* dpp,
this, get_attrs(), merge_attrs);
}
+ // remove bucket-topic mapping
+ auto iter = get_attrs().find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (iter != get_attrs().end()) {
+ rgw_pubsub_bucket_topics bucket_topics;
+ try {
+ const auto& bl = iter->second;
+ auto biter = bl.cbegin();
+ bucket_topics.decode(biter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: "
+ << get_name() << dendl;
+ }
+ if (!bucket_topics.topics.empty()) {
+ ret = store->remove_bucket_mapping_from_topics(
+ bucket_topics, rgw_make_bucket_entry_name(get_tenant(), get_name()),
+ y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: unable to remove notifications from bucket "
+ << get_name() << ". ret=" << ret << dendl;
+ }
+ }
+ }
+
ret = store->ctl()->bucket->sync_user_stats(dpp, info.owner, info, y, nullptr);
if (ret < 0) {
ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
@@ -1172,6 +1196,114 @@ int RadosStore::remove_topic_v2(const std::string& topic_name,
params, objv_tracker, y, dpp);
}
+int RadosStore::remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ // remove the bucket name from the topic-bucket omap for each topic
+ // subscribed.
+ std::unordered_set<std::string> topics_mapping_to_remove;
+ int ret = 0;
+ for (const auto& [_, topic_filter] : bucket_topics.topics) {
+ if (!topics_mapping_to_remove.insert(topic_filter.topic.name).second) {
+ continue; // already removed.
+ }
+ int op_ret = update_bucket_topic_mapping(topic_filter.topic, bucket_key,
+ /*add_mapping=*/false, y, dpp);
+ if (op_ret < 0) {
+ ret = op_ret;
+ }
+ }
+ return ret;
+}
+
+int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ bufferlist empty_bl;
+ librados::ObjectWriteOperation op;
+ int ret = 0;
+ if (add_mapping) {
+ std::map<std::string, bufferlist> mapping{{bucket_key, empty_bl}};
+ op.omap_set(mapping);
+ } else {
+ std::set<std::string> to_rm{{bucket_key}};
+ op.omap_rm_keys(to_rm);
+ }
+ ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+ get_bucket_topic_mapping_oid(topic), &op, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove")
+ << " topic bucket mapping for bucket: " << bucket_key
+ << " and topic: " << topic.name << " with ret:" << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "Successfully " << (add_mapping ? "added" : "removed")
+ << " topic bucket mapping for bucket: " << bucket_key
+ << " and topic: " << topic.name << dendl;
+ return ret;
+}
+
+int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ constexpr auto max_chunk = 1024U;
+ std::string start_after;
+ bool more = true;
+ int rval;
+ while (more) {
+ librados::ObjectReadOperation op;
+ std::set<std::string> curr_keys;
+ op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval);
+ const auto ret =
+ rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+ get_bucket_topic_mapping_oid(topic), &op, nullptr, y);
+ if (ret == -ENOENT) {
+ // mapping object was not created - nothing to do
+ return 0;
+ }
+ if (ret < 0) {
+ // TODO: do we need to check on rval as well as ret?
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed to read bucket topic mapping object for topic: "
+ << topic.name << ", ret= " << ret << dendl;
+ return ret;
+ }
+ if (more) {
+ if (curr_keys.empty()) {
+ return -EINVAL; // something wrong.
+ }
+ start_after = *curr_keys.rbegin();
+ }
+ bucket_keys.merge(curr_keys);
+ }
+ return 0;
+}
+
+int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) {
+ librados::ObjectWriteOperation op;
+ op.remove();
+ const int ret =
+ rgw_rados_operate(dpp, rados->get_topics_pool_ctx(),
+ get_bucket_topic_mapping_oid(topic), &op, y);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: failed removing bucket topic mapping omap for topic: "
+ << topic.name << ", ret=" << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20)
+ << "Successfully deleted topic bucket mapping omap for topic: "
+ << topic.name << dendl;
+ return 0;
+}
+
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y)
{
return rados->delete_raw_obj(dpp, obj, y);
diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h
index 33db603d785..85612eec1a9 100644
--- a/src/rgw/driver/rados/rgw_sal_rados.h
+++ b/src/rgw/driver/rados/rgw_sal_rados.h
@@ -184,6 +184,23 @@ class RadosStore : public StoreDriver {
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override;
+ int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
+ int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index afb161c90ae..bbd9231893f 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1168,6 +1168,15 @@ static void show_reshard_status(
formatter->flush(cout);
}
+static void show_topics_info_v2(const rgw_pubsub_topic& topic,
+ std::set<std::string> subscribed_buckets,
+ Formatter* formatter) {
+ formatter->open_object_section("topic");
+ topic.dump(formatter);
+ encode_json("subscribed_buckets", subscribed_buckets, formatter);
+ formatter->close_section();
+}
+
class StoreDestructor {
rgw::sal::Driver* driver;
public:
@@ -10665,7 +10674,24 @@ next:
}
}
}
- encode_json("result", result, formatter.get());
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ Formatter::ObjectSection top_section(*formatter, "result");
+ Formatter::ArraySection s(*formatter, "topics");
+ for (const auto& [_, topic] : result.topics) {
+ std::set<std::string> subscribed_buckets;
+ ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
+ null_yield, dpp());
+ if (ret < 0) {
+ cerr << "failed to fetch bucket topic mapping info for topic: "
+ << topic.name << ", ret=" << ret << std::endl;
+ } else {
+ show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+ }
+ }
+ } else {
+ encode_json("result", result, formatter.get());
+ }
formatter->flush(cout);
}
@@ -10683,12 +10709,19 @@ next:
RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
rgw_pubsub_topic topic;
- ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
+ std::set<std::string> subscribed_buckets;
+ ret =
+ ps.get_topic(dpp(), topic_name, topic, null_yield, &subscribed_buckets);
if (ret < 0) {
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- encode_json("topic", topic, formatter.get());
+ if (driver->get_zone()->get_zonegroup().supports_feature(
+ rgw::zone_features::notification_v2)) {
+ show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+ } else {
+ encode_json("topic", topic, formatter.get());
+ }
formatter->flush(cout);
}
diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc
index 628f57901af..dec38ee87e8 100644
--- a/src/rgw/rgw_pubsub.cc
+++ b/src/rgw/rgw_pubsub.cc
@@ -9,7 +9,9 @@
#include "rgw_xml.h"
#include "rgw_arn.h"
#include "rgw_pubsub_push.h"
+#include "rgw_bucket.h"
#include "common/errno.h"
+#include "svc_topic_rados.h"
#include <regex>
#include <algorithm>
@@ -369,6 +371,7 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
encode_xml_key_value_entry("TopicArn", arn, f);
encode_xml_key_value_entry("OpaqueData", opaque_data, f);
encode_xml_key_value_entry("Policy", policy_text, f);
+ std::ostringstream stream;
f->close_section(); // Attributes
}
@@ -526,15 +529,20 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
break;
}
- for (auto& topic_name : topics) {
- rgw_pubsub_topic topic;
- int ret = get_topic(dpp, topic_name, topic, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name
- << "' info: ret=" << ret << dendl;
- continue;
- }
- result.topics[topic_name] = std::move(topic);
+ for (auto& topic_entry : topics) {
+ std::string topic_name;
+ std::string topic_tenant;
+ parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
+ if (tenant != topic_tenant) {
+ continue;
+ }
+ rgw_pubsub_topic topic;
+ const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr);
+ if (op_ret < 0) {
+ ret = op_ret;
+ continue;
+ }
+ result.topics[topic_name] = std::move(topic);
}
} while (truncated);
driver->meta_list_keys_complete(handle);
@@ -583,13 +591,26 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub
return 0;
}
-int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
-{
+int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
+ const std::string& name,
+ rgw_pubsub_topic& result,
+ optional_yield y,
+ std::set<std::string>* subscribed_buckets) const {
if (use_notification_v2) {
- const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
+ int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
<< " tenant: " << tenant << ", ret=" << ret << dendl;
+ return ret;
+ }
+ if (subscribed_buckets) {
+ ret =
+ driver->get_bucket_topic_mapping(result, *subscribed_buckets, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "failed to fetch bucket topic mapping info for topic: " << name
+ << " tenant: " << tenant << ", ret=" << ret << dendl;
+ }
}
return ret;
}
@@ -656,19 +677,56 @@ std::optional<rgw_pubsub_topic_filter> find_unique_topic(
return std::nullopt;
}
-int delete_all_notifications(const DoutPrefixProvider* dpp,
- const rgw_pubsub_bucket_topics& bucket_topics,
- std::map<std::string, bufferlist>& attrs,
- rgw::sal::Bucket* bucket,
- rgw::sal::Driver* driver,
- optional_yield y) {
+int store_bucket_attrs_and_update_mapping(
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Driver* driver,
+ rgw::sal::Bucket* bucket,
+ rgw_pubsub_bucket_topics& bucket_topics,
+ const rgw_pubsub_topic& topic,
+ optional_yield y) {
+ rgw::sal::Attrs& attrs = bucket->get_attrs();
+ if (!bucket_topics.topics.empty()) {
+ bufferlist bl;
+ bucket_topics.encode(bl);
+ attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+ } else {
+ auto it = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
+ if (it != attrs.end()) {
+ attrs.erase(it);
+ }
+ }
+ auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
+ << bucket->get_name() << " returned err= " << ret << dendl;
+ return ret;
+ }
+ if (bucket_topics.topics.empty()) {
+ // remove the bucket name from the topic-bucket omap
+ auto op_ret = driver->update_bucket_topic_mapping(
+ topic,
+ rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()),
+ /*add_mapping=*/false, y, dpp);
+ if (op_ret < 0) {
+ // TODO: should the error be reported, as attrs are already deleted.
+ // ret = op_ret;
+ }
+ }
+ return ret;
+}
+
+int delete_notification_attrs(const DoutPrefixProvider* dpp,
+ rgw::sal::Bucket* bucket,
+ optional_yield y) {
+ auto& attrs = bucket->get_attrs();
auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
if (iter == attrs.end()) {
return 0;
}
// delete all notifications of on a bucket
attrs.erase(iter);
- const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
+ auto ret = bucket->merge_and_store_attrs(dpp, attrs, y);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket="
@@ -691,11 +749,23 @@ int remove_notification_v2(const DoutPrefixProvider* dpp,
if (bucket_topics.topics.empty()) {
return 0;
}
- rgw::sal::Attrs& attrs = bucket->get_attrs();
+ // delete all notifications
if (notification_id.empty()) {
- return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver,
- y);
+ ret = delete_notification_attrs(dpp, bucket, y);
+ if (ret < 0) {
+ return ret;
+ }
+ int op_ret = driver->remove_bucket_mapping_from_topics(
+ bucket_topics,
+ rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()), y,
+ dpp);
+ if (op_ret < 0) {
+ // TODO: should the error be reported, as attrs are already deleted.
+ // ret = op_ret;
+ }
+ return ret;
}
+
// delete a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notification_id);
if (!unique_topic) {
@@ -706,23 +776,15 @@ int remove_notification_v2(const DoutPrefixProvider* dpp,
}
const auto& topic_name = unique_topic->topic.name;
bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id));
- bufferlist bl;
- bucket_topics.encode(bl);
- attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
- ret = bucket->merge_and_store_attrs(dpp, attrs, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1)
- << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="
- << bucket->get_name() << " returned err= " << ret << dendl;
- }
- return ret;
+ return store_bucket_attrs_and_update_mapping(
+ dpp, driver, bucket, bucket_topics, unique_topic->topic, y);
}
int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
rgw_pubsub_topic topic_info;
- int ret = ps.get_topic(dpp, topic_name, topic_info, y);
+ int ret = ps.get_topic(dpp, topic_name, topic_info, y, nullptr);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
return ret;
@@ -908,7 +970,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
optional_yield y) const {
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topic topic;
- int ret = get_topic(dpp, name, topic, y);
+ int ret = get_topic(dpp, name, topic, y, nullptr);
if (ret < 0 && ret != -ENOENT) {
return ret;
} else if (ret == -ENOENT) {
@@ -922,7 +984,9 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret
<< dendl;
+ return ret;
}
+ ret = driver->delete_bucket_topic_mapping(topic, y, dpp);
return ret;
}
diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h
index 46da8e045a9..8509d86e225 100644
--- a/src/rgw/rgw_pubsub.h
+++ b/src/rgw/rgw_pubsub.h
@@ -602,19 +602,13 @@ public:
int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
return read_topics(dpp, result, nullptr, y);
}
- // get a bucket_topic with by its name and populate it into "result"
- // return -ENOENT if the topic does not exists
- // return 0 on success, error code otherwise
- int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const;
// adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
// for S3 compliant notifications the version with: s3_filter and notif_name should be used
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
- const rgw::notify::EventTypeList& events, optional_yield y) const;
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
@@ -633,9 +627,15 @@ public:
return read_topics(dpp, result, nullptr, y);
}
// get a topic with by its name and populate it into "result"
- // return -ENOENT if the topic does not exists
- // return 0 on success, error code otherwise
- int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
+ // return -ENOENT if the topic does not exists
+ // return 0 on success, error code otherwise.
+ // if |subscribed_buckets| valid, then for notification_v2 read the bucket
+ // topic mapping object.
+ int get_topic(const DoutPrefixProvider* dpp,
+ const std::string& name,
+ rgw_pubsub_topic& result,
+ optional_yield y,
+ std::set<std::string>* subscribed_buckets) const;
// create a topic with a name only
// if the topic already exists it is a no-op (considered success)
// return 0 on success, error code otherwise
diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc
index f2e5439208f..38943736e45 100644
--- a/src/rgw/rgw_rest_pubsub.cc
+++ b/src/rgw/rgw_rest_pubsub.cc
@@ -200,7 +200,7 @@ class RGWPSCreateTopicOp : public RGWOp {
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
rgw_pubsub_topic result;
- ret = ps.get_topic(this, topic_name, result, y);
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
// topic not present
return 0;
@@ -424,7 +424,7 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
}
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
- op_ret = ps.get_topic(this, topic_name, result, y);
+ op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
@@ -509,7 +509,7 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
}
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
- op_ret = ps.get_topic(this, topic_name, result, y);
+ op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
@@ -638,7 +638,7 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
rgw_pubsub_topic result;
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
- ret = ps.get_topic(this, topic_name, result, y);
+ ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << ret << dendl;
@@ -801,7 +801,7 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
&s->penv.site->get_period()->get_map().zonegroups);
rgw_pubsub_topic result;
- op_ret = ps.get_topic(this, topic_name, result, y);
+ op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret == 0) {
op_ret = verify_topic_owner_or_policy(
s, result, driver->get_zone()->get_zonegroup().get_name(),
@@ -1071,8 +1071,8 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
const auto topic_name = arn->resource;
// get topic information. destination information is stored in the topic
- rgw_pubsub_topic topic_info;
- op_ret = ps.get_topic(this, topic_name, topic_info, y);
+ rgw_pubsub_topic topic_info;
+ op_ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
@@ -1177,7 +1177,6 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
const RGWPubSub ps(driver, s->owner.id.tenant,
&s->penv.site->get_period()->get_map().zonegroups);
std::unordered_map<std::string, rgw_pubsub_topic> topics;
- const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, "");
for (const auto& c : configurations.list) {
const auto& notif_name = c.id;
if (notif_name.empty()) {
@@ -1212,7 +1211,7 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
if (!topics.contains(topic_name)) {
// get topic information. destination information is stored in the topic
rgw_pubsub_topic topic_info;
- op_ret = ps.get_topic(this, topic_name, topic_info, y);
+ op_ret = ps.get_topic(this, topic_name, topic_info, y,nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', ret=" << op_ret << dendl;
@@ -1248,6 +1247,18 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
<< bucket->get_name() << " returned err= " << op_ret << dendl;
return;
}
+ for (const auto& [_, topic] : topics) {
+ const auto ret = driver->update_bucket_topic_mapping(
+ topic,
+ rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()),
+ /*add_mapping=*/true, y, this);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "Failed to remove topic mapping on bucket="
+ << bucket->get_name() << " ret= " << ret << dendl;
+ // error should be reported ??
+ // op_ret = ret;
+ }
+ }
ldpp_dout(this, 20) << "successfully created bucket notification for bucket: "
<< bucket->get_name() << dendl;
}
diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h
index 89ac23341d2..4dc3c9dc836 100644
--- a/src/rgw/rgw_sal.h
+++ b/src/rgw/rgw_sal.h
@@ -332,6 +332,34 @@ class Driver {
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
+ /** Update the bucket-topic mapping in the store, if |add_mapping|=true then
+ * adding the |bucket_key| |topic| mapping to store, else delete the
+ * |bucket_key| |topic| mapping from the store. The |bucket_key| is
+ * in the format |tenant_name + "/" + bucket_name| if tenant is not empty
+ * else |bucket_name|*/
+ virtual int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Remove the |bucket_key| from bucket-topic mapping in the store, for all
+ the topics under |bucket_topics|*/
+ virtual int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Get the bucket-topic mapping from the backend store. The |bucket_keys|
+ * are in the format |tenant_name + "/" + bucket_name| if tenant is not
+ * empty else |bucket_name|*/
+ virtual int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
+ /** Remove the bucket-topic mapping from the backend store. */
+ virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h
index 71991378e32..d5bf9afe248 100644
--- a/src/rgw/rgw_sal_filter.h
+++ b/src/rgw/rgw_sal_filter.h
@@ -218,7 +218,33 @@ public:
const DoutPrefixProvider* dpp) override {
return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
}
-
+ int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->update_bucket_topic_mapping(topic, bucket_key, add_mapping, y,
+ dpp);
+ }
+ int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->remove_bucket_mapping_from_topics(bucket_topics, bucket_key, y,
+ dpp);
+ }
+ int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
+ }
+ int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return next->delete_bucket_topic_mapping(topic, y, dpp);
+ }
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h
index eda0f08ede6..b34276a9daa 100644
--- a/src/rgw/rgw_sal_store.h
+++ b/src/rgw/rgw_sal_store.h
@@ -46,14 +46,39 @@ class StoreDriver : public Driver {
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
- return -ENOENT;
+ return -EOPNOTSUPP;
}
int remove_topic_v2(const std::string& topic_name,
const std::string& tenant,
RGWObjVersionTracker* objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override {
- return -ENOENT;
+ return -EOPNOTSUPP;
+ }
+ int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ const std::string& bucket_key,
+ bool add_mapping,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int remove_bucket_mapping_from_topics(
+ const rgw_pubsub_bucket_topics& bucket_topics,
+ const std::string& bucket_key,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int get_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ std::set<std::string>& bucket_keys,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
+ }
+ int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic,
+ optional_yield y,
+ const DoutPrefixProvider* dpp) override {
+ return -EOPNOTSUPP;
}
};
diff --git a/src/rgw/services/svc_topic_rados.cc b/src/rgw/services/svc_topic_rados.cc
index 64c9106776c..9e0b75d4664 100644
--- a/src/rgw/services/svc_topic_rados.cc
+++ b/src/rgw/services/svc_topic_rados.cc
@@ -10,6 +10,7 @@
static std::string topic_oid_prefix = "topic.";
static constexpr char topic_tenant_delim[] = ":";
+static std::string bucket_topic_oid_prefix = "buckets.";
std::string get_topic_key(const std::string& topic_name,
const std::string& tenant) {
@@ -32,6 +33,11 @@ void parse_topic_entry(const std::string& topic_entry,
*topic_name = topic_entry;
}
}
+
+std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic) {
+ return bucket_topic_oid_prefix + get_topic_key(topic.name, topic.user.tenant);
+}
+
class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module {
RGWSI_Topic_RADOS::Svc& svc;
const std::string prefix;
@@ -131,7 +137,7 @@ int RGWTopicMetadataHandler::do_get(RGWSI_MetaBackend_Handler::Op* op,
parse_topic_entry(entry, &tenant, &topic_name);
RGWPubSub ps(driver, tenant,
&topic_svc->svc.zone->get_current_period().get_map().zonegroups);
- int ret = ps.get_topic(dpp, topic_name, result, y);
+ int ret = ps.get_topic(dpp, topic_name, result, y, nullptr);
if (ret < 0) {
return ret;
}
diff --git a/src/rgw/services/svc_topic_rados.h b/src/rgw/services/svc_topic_rados.h
index e630a610e97..bc4e3537345 100644
--- a/src/rgw/services/svc_topic_rados.h
+++ b/src/rgw/services/svc_topic_rados.h
@@ -94,3 +94,5 @@ std::string get_topic_key(const std::string& topic_name,
void parse_topic_entry(const std::string& topic_entry,
std::string* tenant_name,
std::string* topic_name);
+
+std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic); \ No newline at end of file