diff options
-rw-r--r-- | src/rgw/driver/rados/rgw_bucket.cc | 132 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_notify.cc | 5 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_rados.cc | 10 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_rados.h | 4 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_sal_rados.cc | 132 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_sal_rados.h | 17 | ||||
-rw-r--r-- | src/rgw/rgw_admin.cc | 39 | ||||
-rw-r--r-- | src/rgw/rgw_pubsub.cc | 132 | ||||
-rw-r--r-- | src/rgw/rgw_pubsub.h | 20 | ||||
-rw-r--r-- | src/rgw/rgw_rest_pubsub.cc | 29 | ||||
-rw-r--r-- | src/rgw/rgw_sal.h | 28 | ||||
-rw-r--r-- | src/rgw/rgw_sal_filter.h | 28 | ||||
-rw-r--r-- | src/rgw/rgw_sal_store.h | 29 | ||||
-rw-r--r-- | src/rgw/services/svc_topic_rados.cc | 8 | ||||
-rw-r--r-- | src/rgw/services/svc_topic_rados.h | 2 |
15 files changed, 530 insertions, 85 deletions
diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index 9f556553182..ce869f399d6 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -1336,28 +1336,6 @@ static int bucket_stats(rgw::sal::Driver* driver, } } - // bucket notifications - rgw_pubsub_bucket_topics result; - if (driver->get_zone()->get_zonegroup().supports_feature( - rgw::zone_features::notification_v2)) { - ret = get_bucket_notifications(dpp, bucket.get(), result); - if (ret < 0) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) - << std::endl; - return -ret; - } - } else { - RGWPubSub ps(driver, tenant_name); - const RGWPubSub::Bucket b(ps, bucket.get()); - ret = b.get_topics(dpp, result, y); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) - << std::endl; - return -ret; - } - } - result.dump(formatter); - // TODO: bucket CORS // TODO: bucket LC formatter->close_section(); @@ -2132,6 +2110,92 @@ int RGWMetadataHandlerPut_Bucket::put_post(const DoutPrefixProvider *dpp) return ret; } +int update_bucket_topic_mappings(const DoutPrefixProvider* dpp, + RGWBucketCompleteInfo* orig_bci, + RGWBucketCompleteInfo* current_bci, + rgw::sal::Driver* driver) { + const auto decode_attrs = [](const rgw::sal::Attrs& attrs, + rgw_pubsub_bucket_topics& bucket_topics) -> int { + auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); + if (iter == attrs.end()) { + return 0; + } + try { + const auto& bl = iter->second; + auto biter = bl.cbegin(); + bucket_topics.decode(biter); + } catch (buffer::error& err) { + return -EIO; + } + return 0; + }; + std::string bucket_name; + std::string bucket_tenant; + rgw_pubsub_bucket_topics old_bucket_topics; + if (orig_bci) { + auto ret = decode_attrs(orig_bci->attrs, old_bucket_topics); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: failed to decode OLD bucket topics for bucket: " + << orig_bci->info.bucket.name << dendl; + return ret; + } + bucket_name = orig_bci->info.bucket.name; + bucket_tenant = orig_bci->info.bucket.tenant; + } + rgw_pubsub_bucket_topics current_bucket_topics; + if (current_bci) { + auto ret = decode_attrs(current_bci->attrs, current_bucket_topics); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: failed to decode current bucket topics for bucket: " + << current_bci->info.bucket.name << dendl; + return ret; + } + bucket_name = current_bci->info.bucket.name; + bucket_tenant = current_bci->info.bucket.tenant; + } + // fetch the list of subscribed topics stored inside old_bucket attrs. + std::unordered_map<std::string, rgw_pubsub_topic> old_topics; + for (const auto& [_, topic_filter] : old_bucket_topics.topics) { + old_topics[topic_filter.topic.name] = topic_filter.topic; + } + // fetch the list of subscribed topics stored inside current_bucket attrs. + std::unordered_map<std::string, rgw_pubsub_topic> current_topics; + for (const auto& [_, topic_filter] : current_bucket_topics.topics) { + current_topics[topic_filter.topic.name] = topic_filter.topic; + } + // traverse thru old topics and check if they are not in current, then delete + // the mapping, if present in both current and old then delete from current + // set as we do not need to update those mapping. + int ret = 0; + for (const auto& [topic_name, topic] : old_topics) { + auto it = current_topics.find(topic_name); + if (it == current_topics.end()) { + const auto op_ret = driver->update_bucket_topic_mapping( + topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name), + /*add_mapping=*/false, null_yield, dpp); + if (op_ret < 0) { + ret = op_ret; + } + } else { + // already that attr is present, so do not update the mapping. + current_topics.erase(it); + } + } + // traverse thru current topics and check if they are any present, then add + // the mapping. + for (const auto& [topic_name, topic] : current_topics) { + const auto op_ret = driver->update_bucket_topic_mapping( + topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name), + /*add_mapping=*/true, null_yield, dpp); + if (op_ret < 0) { + ret = op_ret; + } + } + return ret; +} + static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) { char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; @@ -2443,7 +2507,14 @@ public: if (ret < 0 && ret != -ENOENT) return ret; - return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp); + ret = svc.bucket->remove_bucket_instance_info( + ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp); + if (ret < 0) + return ret; + ret = update_bucket_topic_mappings(dpp, &bci, /*current_bci=*/nullptr, + driver); + // update_bucket_topic_mapping error is swallowed. + return 0; } int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) { @@ -2648,6 +2719,21 @@ int RGWMetadataHandlerPut_BucketInstance::put_post(const DoutPrefixProvider *dpp } } /* update lc */ + /* update bucket topic mapping */ + { + auto* orig_obj = static_cast<RGWBucketInstanceMetadataObject*>(old_obj); + auto* orig_bci = (orig_obj ? &orig_obj->get_bci() : nullptr); + ret = update_bucket_topic_mappings(dpp, orig_bci, &bci, bihandler->driver); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ + << " failed to apply bucket topic mapping for " + << bci.info.bucket.name << dendl; + return ret; + } + ldpp_dout(dpp, 20) << __func__ + << " successfully applied bucket topic mapping for " + << bci.info.bucket.name << dendl; + } return STATUS_APPLIED; } diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index e5d5dd602f5..e3ee61ba9df 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -494,7 +494,8 @@ private: RGWPubSub ps(&rados_store, tenant_name); rgw_pubsub_topic topic; - auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield)); + auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, + optional_yield(io_context, yield), nullptr); if (ret_of_get_topic < 0) { // we can't migrate entries without topic info ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: " @@ -1056,7 +1057,7 @@ static inline bool notification_match(reservation_t& res, const RGWPubSub ps( res.store, res.user_tenant, &res.store->svc()->zone->get_current_period().get_map().zonegroups); - auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield); + auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr); if (ret < 0) { ldpp_dout(res.dpp, 1) << "INFO: failed to load topic: " << topic_cfg.name diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 5e111103c1c..b8fcfa2f368 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1211,6 +1211,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y) if (ret < 0) return ret; + ret = open_topics_pool_ctx(dpp); + if (ret < 0) + return ret; + pools_initialized = true; if (use_gc) { @@ -1446,6 +1450,12 @@ int RGWRados::open_notif_pool_ctx(const DoutPrefixProvider *dpp) return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().notif_pool, notif_pool_ctx, true, true); } +int RGWRados::open_topics_pool_ctx(const DoutPrefixProvider* dpp) { + return rgw_init_ioctx(dpp, get_rados_handle(), + svc.zone->get_zone_params().topics_pool, + topics_pool_ctx, true, true); +} + int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk) { diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 264f5eb4f33..7e7a58480a6 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -358,6 +358,7 @@ class RGWRados int open_objexp_pool_ctx(const DoutPrefixProvider *dpp); int open_reshard_pool_ctx(const DoutPrefixProvider *dpp); int open_notif_pool_ctx(const DoutPrefixProvider *dpp); + int open_topics_pool_ctx(const DoutPrefixProvider* dpp); int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk); @@ -447,6 +448,7 @@ protected: librados::IoCtx objexp_pool_ctx; librados::IoCtx reshard_pool_ctx; librados::IoCtx notif_pool_ctx; // .rgw.notif + librados::IoCtx topics_pool_ctx; // .rgw.meta:topics bool pools_initialized{false}; @@ -533,6 +535,8 @@ public: return notif_pool_ctx; } + librados::IoCtx& get_topics_pool_ctx() { return topics_pool_ctx; } + void set_context(CephContext *_cct) { cct = _cct; } diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 608800c7e57..2bde7d192e5 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -325,6 +325,30 @@ int RadosBucket::remove(const DoutPrefixProvider* dpp, this, get_attrs(), merge_attrs); } + // remove bucket-topic mapping + auto iter = get_attrs().find(RGW_ATTR_BUCKET_NOTIFICATION); + if (iter != get_attrs().end()) { + rgw_pubsub_bucket_topics bucket_topics; + try { + const auto& bl = iter->second; + auto biter = bl.cbegin(); + bucket_topics.decode(biter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "ERROR: failed to decode bucket topics for bucket: " + << get_name() << dendl; + } + if (!bucket_topics.topics.empty()) { + ret = store->remove_bucket_mapping_from_topics( + bucket_topics, rgw_make_bucket_entry_name(get_tenant(), get_name()), + y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: unable to remove notifications from bucket " + << get_name() << ". ret=" << ret << dendl; + } + } + } + ret = store->ctl()->bucket->sync_user_stats(dpp, info.owner, info, y, nullptr); if (ret < 0) { ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl; @@ -1172,6 +1196,114 @@ int RadosStore::remove_topic_v2(const std::string& topic_name, params, objv_tracker, y, dpp); } +int RadosStore::remove_bucket_mapping_from_topics( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& bucket_key, + optional_yield y, + const DoutPrefixProvider* dpp) { + // remove the bucket name from the topic-bucket omap for each topic + // subscribed. + std::unordered_set<std::string> topics_mapping_to_remove; + int ret = 0; + for (const auto& [_, topic_filter] : bucket_topics.topics) { + if (!topics_mapping_to_remove.insert(topic_filter.topic.name).second) { + continue; // already removed. + } + int op_ret = update_bucket_topic_mapping(topic_filter.topic, bucket_key, + /*add_mapping=*/false, y, dpp); + if (op_ret < 0) { + ret = op_ret; + } + } + return ret; +} + +int RadosStore::update_bucket_topic_mapping(const rgw_pubsub_topic& topic, + const std::string& bucket_key, + bool add_mapping, + optional_yield y, + const DoutPrefixProvider* dpp) { + bufferlist empty_bl; + librados::ObjectWriteOperation op; + int ret = 0; + if (add_mapping) { + std::map<std::string, bufferlist> mapping{{bucket_key, empty_bl}}; + op.omap_set(mapping); + } else { + std::set<std::string> to_rm{{bucket_key}}; + op.omap_rm_keys(to_rm); + } + ret = rgw_rados_operate(dpp, rados->get_topics_pool_ctx(), + get_bucket_topic_mapping_oid(topic), &op, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to " << (add_mapping ? "add" : "remove") + << " topic bucket mapping for bucket: " << bucket_key + << " and topic: " << topic.name << " with ret:" << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "Successfully " << (add_mapping ? "added" : "removed") + << " topic bucket mapping for bucket: " << bucket_key + << " and topic: " << topic.name << dendl; + return ret; +} + +int RadosStore::get_bucket_topic_mapping(const rgw_pubsub_topic& topic, + std::set<std::string>& bucket_keys, + optional_yield y, + const DoutPrefixProvider* dpp) { + constexpr auto max_chunk = 1024U; + std::string start_after; + bool more = true; + int rval; + while (more) { + librados::ObjectReadOperation op; + std::set<std::string> curr_keys; + op.omap_get_keys2(start_after, max_chunk, &curr_keys, &more, &rval); + const auto ret = + rgw_rados_operate(dpp, rados->get_topics_pool_ctx(), + get_bucket_topic_mapping_oid(topic), &op, nullptr, y); + if (ret == -ENOENT) { + // mapping object was not created - nothing to do + return 0; + } + if (ret < 0) { + // TODO: do we need to check on rval as well as ret? + ldpp_dout(dpp, 1) + << "ERROR: failed to read bucket topic mapping object for topic: " + << topic.name << ", ret= " << ret << dendl; + return ret; + } + if (more) { + if (curr_keys.empty()) { + return -EINVAL; // something wrong. + } + start_after = *curr_keys.rbegin(); + } + bucket_keys.merge(curr_keys); + } + return 0; +} + +int RadosStore::delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, + optional_yield y, + const DoutPrefixProvider* dpp) { + librados::ObjectWriteOperation op; + op.remove(); + const int ret = + rgw_rados_operate(dpp, rados->get_topics_pool_ctx(), + get_bucket_topic_mapping_oid(topic), &op, y); + if (ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 1) + << "ERROR: failed removing bucket topic mapping omap for topic: " + << topic.name << ", ret=" << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) + << "Successfully deleted topic bucket mapping omap for topic: " + << topic.name << dendl; + return 0; +} + int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, optional_yield y) { return rados->delete_raw_obj(dpp, obj, y); diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 33db603d785..85612eec1a9 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -184,6 +184,23 @@ class RadosStore : public StoreDriver { RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override; + int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, + const std::string& bucket_key, + bool add_mapping, + optional_yield y, + const DoutPrefixProvider* dpp) override; + int remove_bucket_mapping_from_topics( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& bucket_key, + optional_yield y, + const DoutPrefixProvider* dpp) override; + int get_bucket_topic_mapping(const rgw_pubsub_topic& topic, + std::set<std::string>& bucket_keys, + optional_yield y, + const DoutPrefixProvider* dpp) override; + int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, + optional_yield y, + const DoutPrefixProvider* dpp) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index afb161c90ae..bbd9231893f 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1168,6 +1168,15 @@ static void show_reshard_status( formatter->flush(cout); } +static void show_topics_info_v2(const rgw_pubsub_topic& topic, + std::set<std::string> subscribed_buckets, + Formatter* formatter) { + formatter->open_object_section("topic"); + topic.dump(formatter); + encode_json("subscribed_buckets", subscribed_buckets, formatter); + formatter->close_section(); +} + class StoreDestructor { rgw::sal::Driver* driver; public: @@ -10665,7 +10674,24 @@ next: } } } - encode_json("result", result, formatter.get()); + if (driver->get_zone()->get_zonegroup().supports_feature( + rgw::zone_features::notification_v2)) { + Formatter::ObjectSection top_section(*formatter, "result"); + Formatter::ArraySection s(*formatter, "topics"); + for (const auto& [_, topic] : result.topics) { + std::set<std::string> subscribed_buckets; + ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets, + null_yield, dpp()); + if (ret < 0) { + cerr << "failed to fetch bucket topic mapping info for topic: " + << topic.name << ", ret=" << ret << std::endl; + } else { + show_topics_info_v2(topic, subscribed_buckets, formatter.get()); + } + } + } else { + encode_json("result", result, formatter.get()); + } formatter->flush(cout); } @@ -10683,12 +10709,19 @@ next: RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups); rgw_pubsub_topic topic; - ret = ps.get_topic(dpp(), topic_name, topic, null_yield); + std::set<std::string> subscribed_buckets; + ret = + ps.get_topic(dpp(), topic_name, topic, null_yield, &subscribed_buckets); if (ret < 0) { cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl; return -ret; } - encode_json("topic", topic, formatter.get()); + if (driver->get_zone()->get_zonegroup().supports_feature( + rgw::zone_features::notification_v2)) { + show_topics_info_v2(topic, subscribed_buckets, formatter.get()); + } else { + encode_json("topic", topic, formatter.get()); + } formatter->flush(cout); } diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 628f57901af..dec38ee87e8 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -9,7 +9,9 @@ #include "rgw_xml.h" #include "rgw_arn.h" #include "rgw_pubsub_push.h" +#include "rgw_bucket.h" #include "common/errno.h" +#include "svc_topic_rados.h" #include <regex> #include <algorithm> @@ -369,6 +371,7 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const encode_xml_key_value_entry("TopicArn", arn, f); encode_xml_key_value_entry("OpaqueData", opaque_data, f); encode_xml_key_value_entry("Policy", policy_text, f); + std::ostringstream stream; f->close_section(); // Attributes } @@ -526,15 +529,20 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl; break; } - for (auto& topic_name : topics) { - rgw_pubsub_topic topic; - int ret = get_topic(dpp, topic_name, topic, y); - if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name - << "' info: ret=" << ret << dendl; - continue; - } - result.topics[topic_name] = std::move(topic); + for (auto& topic_entry : topics) { + std::string topic_name; + std::string topic_tenant; + parse_topic_entry(topic_entry, &topic_tenant, &topic_name); + if (tenant != topic_tenant) { + continue; + } + rgw_pubsub_topic topic; + const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr); + if (op_ret < 0) { + ret = op_ret; + continue; + } + result.topics[topic_name] = std::move(topic); } } while (truncated); driver->meta_list_keys_complete(handle); @@ -583,13 +591,26 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub return 0; } -int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const -{ +int RGWPubSub::get_topic(const DoutPrefixProvider* dpp, + const std::string& name, + rgw_pubsub_topic& result, + optional_yield y, + std::set<std::string>* subscribed_buckets) const { if (use_notification_v2) { - const int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp); + int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp); if (ret < 0) { ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name << " tenant: " << tenant << ", ret=" << ret << dendl; + return ret; + } + if (subscribed_buckets) { + ret = + driver->get_bucket_topic_mapping(result, *subscribed_buckets, y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "failed to fetch bucket topic mapping info for topic: " << name + << " tenant: " << tenant << ", ret=" << ret << dendl; + } } return ret; } @@ -656,19 +677,56 @@ std::optional<rgw_pubsub_topic_filter> find_unique_topic( return std::nullopt; } -int delete_all_notifications(const DoutPrefixProvider* dpp, - const rgw_pubsub_bucket_topics& bucket_topics, - std::map<std::string, bufferlist>& attrs, - rgw::sal::Bucket* bucket, - rgw::sal::Driver* driver, - optional_yield y) { +int store_bucket_attrs_and_update_mapping( + const DoutPrefixProvider* dpp, + rgw::sal::Driver* driver, + rgw::sal::Bucket* bucket, + rgw_pubsub_bucket_topics& bucket_topics, + const rgw_pubsub_topic& topic, + optional_yield y) { + rgw::sal::Attrs& attrs = bucket->get_attrs(); + if (!bucket_topics.topics.empty()) { + bufferlist bl; + bucket_topics.encode(bl); + attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl); + } else { + auto it = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); + if (it != attrs.end()) { + attrs.erase(it); + } + } + auto ret = bucket->merge_and_store_attrs(dpp, attrs, y); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket=" + << bucket->get_name() << " returned err= " << ret << dendl; + return ret; + } + if (bucket_topics.topics.empty()) { + // remove the bucket name from the topic-bucket omap + auto op_ret = driver->update_bucket_topic_mapping( + topic, + rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()), + /*add_mapping=*/false, y, dpp); + if (op_ret < 0) { + // TODO: should the error be reported, as attrs are already deleted. + // ret = op_ret; + } + } + return ret; +} + +int delete_notification_attrs(const DoutPrefixProvider* dpp, + rgw::sal::Bucket* bucket, + optional_yield y) { + auto& attrs = bucket->get_attrs(); auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); if (iter == attrs.end()) { return 0; } // delete all notifications of on a bucket attrs.erase(iter); - const auto ret = bucket->merge_and_store_attrs(dpp, attrs, y); + auto ret = bucket->merge_and_store_attrs(dpp, attrs, y); if (ret < 0) { ldpp_dout(dpp, 1) << "Failed to remove RGW_ATTR_BUCKET_NOTIFICATION attr on bucket=" @@ -691,11 +749,23 @@ int remove_notification_v2(const DoutPrefixProvider* dpp, if (bucket_topics.topics.empty()) { return 0; } - rgw::sal::Attrs& attrs = bucket->get_attrs(); + // delete all notifications if (notification_id.empty()) { - return delete_all_notifications(dpp, bucket_topics, attrs, bucket, driver, - y); + ret = delete_notification_attrs(dpp, bucket, y); + if (ret < 0) { + return ret; + } + int op_ret = driver->remove_bucket_mapping_from_topics( + bucket_topics, + rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()), y, + dpp); + if (op_ret < 0) { + // TODO: should the error be reported, as attrs are already deleted. + // ret = op_ret; + } + return ret; } + // delete a specific notification const auto unique_topic = find_unique_topic(bucket_topics, notification_id); if (!unique_topic) { @@ -706,23 +776,15 @@ int remove_notification_v2(const DoutPrefixProvider* dpp, } const auto& topic_name = unique_topic->topic.name; bucket_topics.topics.erase(topic_to_unique(topic_name, notification_id)); - bufferlist bl; - bucket_topics.encode(bl); - attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl); - ret = bucket->merge_and_store_attrs(dpp, attrs, y); - if (ret < 0) { - ldpp_dout(dpp, 1) - << "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket=" - << bucket->get_name() << " returned err= " << ret << dendl; - } - return ret; + return store_bucket_attrs_and_update_mapping( + dpp, driver, bucket, bucket_topics, unique_topic->topic, y); } int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const { rgw_pubsub_topic topic_info; - int ret = ps.get_topic(dpp, topic_name, topic_info, y); + int ret = ps.get_topic(dpp, topic_name, topic_info, y, nullptr); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; return ret; @@ -908,7 +970,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, optional_yield y) const { RGWObjVersionTracker objv_tracker; rgw_pubsub_topic topic; - int ret = get_topic(dpp, name, topic, y); + int ret = get_topic(dpp, name, topic, y, nullptr); if (ret < 0 && ret != -ENOENT) { return ret; } else if (ret == -ENOENT) { @@ -922,7 +984,9 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp, if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to remove topic info: ret=" << ret << dendl; + return ret; } + ret = driver->delete_bucket_topic_mapping(topic, y, dpp); return ret; } diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 46da8e045a9..8509d86e225 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -602,19 +602,13 @@ public: int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const { return read_topics(dpp, result, nullptr, y); } - // get a bucket_topic with by its name and populate it into "result" - // return -ENOENT if the topic does not exists - // return 0 on success, error code otherwise - int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const; // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket // assigning a notification name is optional (needed for S3 compatible notifications) // if the topic already exist on the bucket, the filter event list may be updated // for S3 compliant notifications the version with: s3_filter and notif_name should be used // return -ENOENT if the topic does not exists // return 0 on success, error code otherwise - int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, - const rgw::notify::EventTypeList& events, optional_yield y) const; - int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, + int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const; // remove a topic and filter from bucket // if the topic does not exists on the bucket it is a no-op (considered success) @@ -633,9 +627,15 @@ public: return read_topics(dpp, result, nullptr, y); } // get a topic with by its name and populate it into "result" - // return -ENOENT if the topic does not exists - // return 0 on success, error code otherwise - int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const; + // return -ENOENT if the topic does not exists + // return 0 on success, error code otherwise. + // if |subscribed_buckets| valid, then for notification_v2 read the bucket + // topic mapping object. + int get_topic(const DoutPrefixProvider* dpp, + const std::string& name, + rgw_pubsub_topic& result, + optional_yield y, + std::set<std::string>* subscribed_buckets) const; // create a topic with a name only // if the topic already exists it is a no-op (considered success) // return 0 on success, error code otherwise diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index f2e5439208f..38943736e45 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -200,7 +200,7 @@ class RGWPSCreateTopicOp : public RGWOp { const RGWPubSub ps(driver, s->owner.id.tenant, &s->penv.site->get_period()->get_map().zonegroups); rgw_pubsub_topic result; - ret = ps.get_topic(this, topic_name, result, y); + ret = ps.get_topic(this, topic_name, result, y, nullptr); if (ret == -ENOENT) { // topic not present return 0; @@ -424,7 +424,7 @@ void RGWPSGetTopicOp::execute(optional_yield y) { } const RGWPubSub ps(driver, s->owner.id.tenant, &s->penv.site->get_period()->get_map().zonegroups); - op_ret = ps.get_topic(this, topic_name, result, y); + op_ret = ps.get_topic(this, topic_name, result, y, nullptr); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -509,7 +509,7 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) { } const RGWPubSub ps(driver, s->owner.id.tenant, &s->penv.site->get_period()->get_map().zonegroups); - op_ret = ps.get_topic(this, topic_name, result, y); + op_ret = ps.get_topic(this, topic_name, result, y, nullptr); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -638,7 +638,7 @@ class RGWPSSetTopicAttributesOp : public RGWOp { rgw_pubsub_topic result; const RGWPubSub ps(driver, s->owner.id.tenant, &s->penv.site->get_period()->get_map().zonegroups); - ret = ps.get_topic(this, topic_name, result, y); + ret = ps.get_topic(this, topic_name, result, y, nullptr); if (ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl; @@ -801,7 +801,7 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { &s->penv.site->get_period()->get_map().zonegroups); rgw_pubsub_topic result; - op_ret = ps.get_topic(this, topic_name, result, y); + op_ret = ps.get_topic(this, topic_name, result, y, nullptr); if (op_ret == 0) { op_ret = verify_topic_owner_or_policy( s, result, driver->get_zone()->get_zonegroup().get_name(), @@ -1071,8 +1071,8 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { const auto topic_name = arn->resource; // get topic information. destination information is stored in the topic - rgw_pubsub_topic topic_info; - op_ret = ps.get_topic(this, topic_name, topic_info, y); + rgw_pubsub_topic topic_info; + op_ret = ps.get_topic(this, topic_name, topic_info, y, nullptr); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; @@ -1177,7 +1177,6 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) { const RGWPubSub ps(driver, s->owner.id.tenant, &s->penv.site->get_period()->get_map().zonegroups); std::unordered_map<std::string, rgw_pubsub_topic> topics; - const auto rgwbucket = rgw_bucket(s->bucket_tenant, s->bucket_name, ""); for (const auto& c : configurations.list) { const auto& notif_name = c.id; if (notif_name.empty()) { @@ -1212,7 +1211,7 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) { if (!topics.contains(topic_name)) { // get topic information. destination information is stored in the topic rgw_pubsub_topic topic_info; - op_ret = ps.get_topic(this, topic_name, topic_info, y); + op_ret = ps.get_topic(this, topic_name, topic_info, y,nullptr); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; @@ -1248,6 +1247,18 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) { << bucket->get_name() << " returned err= " << op_ret << dendl; return; } + for (const auto& [_, topic] : topics) { + const auto ret = driver->update_bucket_topic_mapping( + topic, + rgw_make_bucket_entry_name(bucket->get_tenant(), bucket->get_name()), + /*add_mapping=*/true, y, this); + if (ret < 0) { + ldpp_dout(this, 1) << "Failed to remove topic mapping on bucket=" + << bucket->get_name() << " ret= " << ret << dendl; + // error should be reported ?? + // op_ret = ret; + } + } ldpp_dout(this, 20) << "successfully created bucket notification for bucket: " << bucket->get_name() << dendl; } diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 89ac23341d2..4dc3c9dc836 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -332,6 +332,34 @@ class Driver { RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) = 0; + /** Update the bucket-topic mapping in the store, if |add_mapping|=true then + * adding the |bucket_key| |topic| mapping to store, else delete the + * |bucket_key| |topic| mapping from the store. The |bucket_key| is + * in the format |tenant_name + "/" + bucket_name| if tenant is not empty + * else |bucket_name|*/ + virtual int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, + const std::string& bucket_key, + bool add_mapping, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; + /** Remove the |bucket_key| from bucket-topic mapping in the store, for all + the topics under |bucket_topics|*/ + virtual int remove_bucket_mapping_from_topics( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& bucket_key, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; + /** Get the bucket-topic mapping from the backend store. The |bucket_keys| + * are in the format |tenant_name + "/" + bucket_name| if tenant is not + * empty else |bucket_name|*/ + virtual int get_bucket_topic_mapping(const rgw_pubsub_topic& topic, + std::set<std::string>& bucket_keys, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; + /** Remove the bucket-topic mapping from the backend store. */ + virtual int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, + optional_yield y, + const DoutPrefixProvider* dpp) = 0; /** Get access to the lifecycle management thread */ virtual RGWLC* get_rgwlc(void) = 0; /** Get access to the coroutine registry. Used to create new coroutine managers */ diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 71991378e32..d5bf9afe248 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -218,7 +218,33 @@ public: const DoutPrefixProvider* dpp) override { return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp); } - + int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, + const std::string& bucket_key, + bool add_mapping, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->update_bucket_topic_mapping(topic, bucket_key, add_mapping, y, + dpp); + } + int remove_bucket_mapping_from_topics( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& bucket_key, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->remove_bucket_mapping_from_topics(bucket_topics, bucket_key, y, + dpp); + } + int get_bucket_topic_mapping(const rgw_pubsub_topic& topic, + std::set<std::string>& bucket_keys, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp); + } + int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return next->delete_bucket_topic_mapping(topic, y, dpp); + } virtual RGWLC* get_rgwlc(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override; diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index eda0f08ede6..b34276a9daa 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -46,14 +46,39 @@ class StoreDriver : public Driver { RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override { - return -ENOENT; + return -EOPNOTSUPP; } int remove_topic_v2(const std::string& topic_name, const std::string& tenant, RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider* dpp) override { - return -ENOENT; + return -EOPNOTSUPP; + } + int update_bucket_topic_mapping(const rgw_pubsub_topic& topic, + const std::string& bucket_key, + bool add_mapping, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -EOPNOTSUPP; + } + int remove_bucket_mapping_from_topics( + const rgw_pubsub_bucket_topics& bucket_topics, + const std::string& bucket_key, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -EOPNOTSUPP; + } + int get_bucket_topic_mapping(const rgw_pubsub_topic& topic, + std::set<std::string>& bucket_keys, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -EOPNOTSUPP; + } + int delete_bucket_topic_mapping(const rgw_pubsub_topic& topic, + optional_yield y, + const DoutPrefixProvider* dpp) override { + return -EOPNOTSUPP; } }; diff --git a/src/rgw/services/svc_topic_rados.cc b/src/rgw/services/svc_topic_rados.cc index 64c9106776c..9e0b75d4664 100644 --- a/src/rgw/services/svc_topic_rados.cc +++ b/src/rgw/services/svc_topic_rados.cc @@ -10,6 +10,7 @@ static std::string topic_oid_prefix = "topic."; static constexpr char topic_tenant_delim[] = ":"; +static std::string bucket_topic_oid_prefix = "buckets."; std::string get_topic_key(const std::string& topic_name, const std::string& tenant) { @@ -32,6 +33,11 @@ void parse_topic_entry(const std::string& topic_entry, *topic_name = topic_entry; } } + +std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic) { + return bucket_topic_oid_prefix + get_topic_key(topic.name, topic.user.tenant); +} + class RGWSI_Topic_Module : public RGWSI_MBSObj_Handler_Module { RGWSI_Topic_RADOS::Svc& svc; const std::string prefix; @@ -131,7 +137,7 @@ int RGWTopicMetadataHandler::do_get(RGWSI_MetaBackend_Handler::Op* op, parse_topic_entry(entry, &tenant, &topic_name); RGWPubSub ps(driver, tenant, &topic_svc->svc.zone->get_current_period().get_map().zonegroups); - int ret = ps.get_topic(dpp, topic_name, result, y); + int ret = ps.get_topic(dpp, topic_name, result, y, nullptr); if (ret < 0) { return ret; } diff --git a/src/rgw/services/svc_topic_rados.h b/src/rgw/services/svc_topic_rados.h index e630a610e97..bc4e3537345 100644 --- a/src/rgw/services/svc_topic_rados.h +++ b/src/rgw/services/svc_topic_rados.h @@ -94,3 +94,5 @@ std::string get_topic_key(const std::string& topic_name, void parse_topic_entry(const std::string& topic_entry, std::string* tenant_name, std::string* topic_name); + +std::string get_bucket_topic_mapping_oid(const rgw_pubsub_topic& topic);
\ No newline at end of file |