summaryrefslogtreecommitdiffstats
path: root/src/rgw/driver/rados/rgw_cr_rados.cc
diff options
context:
space:
mode:
authorkchheda3 <kchheda3@bloomberg.net>2024-02-27 21:11:48 +0100
committerkchheda3 <kchheda3@bloomberg.net>2024-03-14 21:09:26 +0100
commit37069ac96b53d2d4927b9d6f743790847861ec8e (patch)
tree9a9602107f5fb06c90d82ba75035f30b9c2dd68d /src/rgw/driver/rados/rgw_cr_rados.cc
parentrgw/notification: Make the Replication events to be aws compatible. (diff)
downloadceph-37069ac96b53d2d4927b9d6f743790847861ec8e.tar.xz
ceph-37069ac96b53d2d4927b9d6f743790847861ec8e.zip
rgw/notification: Support generating multisite sync delete events.
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
Diffstat (limited to 'src/rgw/driver/rados/rgw_cr_rados.cc')
-rw-r--r--src/rgw/driver/rados/rgw_cr_rados.cc107
1 files changed, 57 insertions, 50 deletions
diff --git a/src/rgw/driver/rados/rgw_cr_rados.cc b/src/rgw/driver/rados/rgw_cr_rados.cc
index 7e4164ff7dd..a717b971a89 100644
--- a/src/rgw/driver/rados/rgw_cr_rados.cc
+++ b/src/rgw/driver/rados/rgw_cr_rados.cc
@@ -717,6 +717,55 @@ int RGWRadosBILogTrimCR::request_complete()
return r;
}
+int send_sync_notification(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* store,
+ rgw::sal::Bucket* bucket,
+ rgw::sal::Object* obj,
+ const rgw::sal::Attrs& attrs,
+ uint64_t obj_size,
+ const rgw::notify::EventTypeList& event_types) {
+ // send notification that object was successfully synced
+ std::string user_id = "rgw sync";
+ std::string req_id = "0";
+
+ RGWObjTags obj_tags;
+ auto iter = attrs.find(RGW_ATTR_TAGS);
+ if (iter != attrs.end()) {
+ try {
+ auto it = iter->second.cbegin();
+ obj_tags.decode(it);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "ERROR: " << __func__
+ << ": caught buffer::error couldn't decode TagSet "
+ << dendl;
+ return -EIO;
+ }
+ }
+ rgw::notify::reservation_t notify_res(dpp, store, obj, nullptr, bucket,
+ user_id, bucket->get_tenant(), req_id,
+ null_yield);
+ int ret = rgw::notify::publish_reserve(dpp, *store->svc()->site, event_types,
+ notify_res, &obj_tags);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: "
+ << ret << dendl;
+ } else {
+ std::string etag;
+ const auto iter = attrs.find(RGW_ATTR_ETAG);
+ if (iter != attrs.end()) {
+ etag = iter->second.to_str();
+ }
+ ret =
+ rgw::notify::publish_commit(obj, obj_size, ceph::real_clock::now(),
+ etag, obj->get_instance(), notify_res, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: "
+ << ret << dendl;
+ }
+ }
+ return ret;
+}
+
int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
{
RGWObjectCtx obj_ctx(store);
@@ -784,56 +833,9 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
} else {
// r >= 0
if (bytes_transferred) {
- // send notification that object was successfully synced
- std::string user_id = "rgw sync";
- std::string req_id = "0";
-
- RGWObjTags obj_tags;
- auto iter = attrs.find(RGW_ATTR_TAGS);
- if (iter != attrs.end()) {
- try {
- auto it = iter->second.cbegin();
- obj_tags.decode(it);
- } catch (buffer::error& err) {
- ldpp_dout(dpp, 1)
- << "ERROR: " << __func__
- << ": caught buffer::error couldn't decode TagSet " << dendl;
- }
- }
-
- // NOTE: we create a mutable copy of bucket.get_tenant as the
- // get_notification function expects a std::string&, not const
- std::string tenant(dest_bucket.get_tenant());
-
- std::unique_ptr<rgw::sal::Notification> notify =
- store->get_notification(dpp, &dest_obj, nullptr,
- {rgw::notify::ObjectSyncedCreate,
- rgw::notify::ReplicationCreate},
- &dest_bucket, user_id, tenant, req_id,
- null_yield);
-
- auto notify_res =
- static_cast<rgw::sal::RadosNotification*>(notify.get())
- ->get_reservation();
- int ret = rgw::notify::publish_reserve(
- dpp, *store->svc()->site,
- {rgw::notify::ObjectSyncedCreate, rgw::notify::ReplicationCreate},
- notify_res, &obj_tags);
- if (ret < 0) {
- ldpp_dout(dpp, 1)
- << "ERROR: reserving notification failed, with error: " << ret
- << dendl;
- // no need to return, the sync already happened
- } else {
- ret = rgw::notify::publish_commit(
- &dest_obj, *bytes_transferred, ceph::real_clock::now(), etag,
- dest_obj.get_instance(), notify_res, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 1)
- << "ERROR: publishing notification failed, with error: " << ret
- << dendl;
- }
- }
+ send_sync_notification(
+ dpp, store, &dest_bucket, &dest_obj, attrs, *bytes_transferred,
+ {rgw::notify::ObjectSyncedCreate, rgw::notify::ReplicationCreate});
}
if (counters) {
@@ -940,6 +942,11 @@ int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider *dpp)
ret = del_op->delete_obj(dpp, null_yield, true);
if (ret < 0) {
ldpp_dout(dpp, 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
+ } else {
+ send_sync_notification(
+ dpp, store, bucket.get(), obj.get(), obj->get_attrs(),
+ obj->get_obj_size(),
+ {rgw::notify::ObjectSyncedDelete, rgw::notify::ReplicationDelete});
}
return ret;
}