diff options
author | kchheda3 <kchheda3@bloomberg.net> | 2024-02-27 21:11:48 +0100 |
---|---|---|
committer | kchheda3 <kchheda3@bloomberg.net> | 2024-03-14 21:09:26 +0100 |
commit | 37069ac96b53d2d4927b9d6f743790847861ec8e (patch) | |
tree | 9a9602107f5fb06c90d82ba75035f30b9c2dd68d /src/rgw/driver/rados/rgw_cr_rados.cc | |
parent | rgw/notification: Make the Replication events to be aws compatible. (diff) | |
download | ceph-37069ac96b53d2d4927b9d6f743790847861ec8e.tar.xz ceph-37069ac96b53d2d4927b9d6f743790847861ec8e.zip |
rgw/notification: Support generating multisite sync delete events.
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
Diffstat (limited to 'src/rgw/driver/rados/rgw_cr_rados.cc')
-rw-r--r-- | src/rgw/driver/rados/rgw_cr_rados.cc | 107 |
1 files changed, 57 insertions, 50 deletions
diff --git a/src/rgw/driver/rados/rgw_cr_rados.cc b/src/rgw/driver/rados/rgw_cr_rados.cc index 7e4164ff7dd..a717b971a89 100644 --- a/src/rgw/driver/rados/rgw_cr_rados.cc +++ b/src/rgw/driver/rados/rgw_cr_rados.cc @@ -717,6 +717,55 @@ int RGWRadosBILogTrimCR::request_complete() return r; } +int send_sync_notification(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + rgw::sal::Bucket* bucket, + rgw::sal::Object* obj, + const rgw::sal::Attrs& attrs, + uint64_t obj_size, + const rgw::notify::EventTypeList& event_types) { + // send notification that object was successfully synced + std::string user_id = "rgw sync"; + std::string req_id = "0"; + + RGWObjTags obj_tags; + auto iter = attrs.find(RGW_ATTR_TAGS); + if (iter != attrs.end()) { + try { + auto it = iter->second.cbegin(); + obj_tags.decode(it); + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "ERROR: " << __func__ + << ": caught buffer::error couldn't decode TagSet " + << dendl; + return -EIO; + } + } + rgw::notify::reservation_t notify_res(dpp, store, obj, nullptr, bucket, + user_id, bucket->get_tenant(), req_id, + null_yield); + int ret = rgw::notify::publish_reserve(dpp, *store->svc()->site, event_types, + notify_res, &obj_tags); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " + << ret << dendl; + } else { + std::string etag; + const auto iter = attrs.find(RGW_ATTR_ETAG); + if (iter != attrs.end()) { + etag = iter->second.to_str(); + } + ret = + rgw::notify::publish_commit(obj, obj_size, ceph::real_clock::now(), + etag, obj->get_instance(), notify_res, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " + << ret << dendl; + } + } + return ret; +} + int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp) { RGWObjectCtx obj_ctx(store); @@ -784,56 +833,9 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp) } else { // r >= 0 if (bytes_transferred) { - // send notification that object was successfully synced - std::string user_id = "rgw sync"; - std::string req_id = "0"; - - RGWObjTags obj_tags; - auto iter = attrs.find(RGW_ATTR_TAGS); - if (iter != attrs.end()) { - try { - auto it = iter->second.cbegin(); - obj_tags.decode(it); - } catch (buffer::error& err) { - ldpp_dout(dpp, 1) - << "ERROR: " << __func__ - << ": caught buffer::error couldn't decode TagSet " << dendl; - } - } - - // NOTE: we create a mutable copy of bucket.get_tenant as the - // get_notification function expects a std::string&, not const - std::string tenant(dest_bucket.get_tenant()); - - std::unique_ptr<rgw::sal::Notification> notify = - store->get_notification(dpp, &dest_obj, nullptr, - {rgw::notify::ObjectSyncedCreate, - rgw::notify::ReplicationCreate}, - &dest_bucket, user_id, tenant, req_id, - null_yield); - - auto notify_res = - static_cast<rgw::sal::RadosNotification*>(notify.get()) - ->get_reservation(); - int ret = rgw::notify::publish_reserve( - dpp, *store->svc()->site, - {rgw::notify::ObjectSyncedCreate, rgw::notify::ReplicationCreate}, - notify_res, &obj_tags); - if (ret < 0) { - ldpp_dout(dpp, 1) - << "ERROR: reserving notification failed, with error: " << ret - << dendl; - // no need to return, the sync already happened - } else { - ret = rgw::notify::publish_commit( - &dest_obj, *bytes_transferred, ceph::real_clock::now(), etag, - dest_obj.get_instance(), notify_res, dpp); - if (ret < 0) { - ldpp_dout(dpp, 1) - << "ERROR: publishing notification failed, with error: " << ret - << dendl; - } - } + send_sync_notification( + dpp, store, &dest_bucket, &dest_obj, attrs, *bytes_transferred, + {rgw::notify::ObjectSyncedCreate, rgw::notify::ReplicationCreate}); } if (counters) { @@ -940,6 +942,11 @@ int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider *dpp) ret = del_op->delete_obj(dpp, null_yield, true); if (ret < 0) { ldpp_dout(dpp, 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl; + } else { + send_sync_notification( + dpp, store, bucket.get(), obj.get(), obj->get_attrs(), + obj->get_obj_size(), + {rgw::notify::ObjectSyncedDelete, rgw::notify::ReplicationDelete}); } return ret; } |