summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_lc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_lc.cc')
-rw-r--r--src/rgw/rgw_lc.cc193
1 files changed, 136 insertions, 57 deletions
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc
index ab2dda48ec7..4ae3a950ade 100644
--- a/src/rgw/rgw_lc.cc
+++ b/src/rgw/rgw_lc.cc
@@ -480,7 +480,7 @@ struct lc_op_ctx {
LCObjsLister& ol;
std::unique_ptr<rgw::sal::Object> obj;
- RGWObjectCtx rctx;
+ RGWObjectCtx octx;
const DoutPrefixProvider *dpp;
WorkQ* wq;
@@ -493,7 +493,7 @@ struct lc_op_ctx {
: cct(env.driver->ctx()), env(env), o(o), next_key_name(next_key_name),
effective_mtime(effective_mtime),
driver(env.driver), bucket(env.bucket), op(env.op), ol(env.ol),
- rctx(env.driver), dpp(dpp), wq(wq)
+ octx(env.driver), dpp(dpp), wq(wq)
{
obj = bucket->get_object(o.key);
}
@@ -519,26 +519,22 @@ static int remove_expired_obj(
auto obj_key = o.key;
auto& meta = o.meta;
int ret;
- std::string version_id;
+ auto& version_id = obj_key.instance;
std::unique_ptr<rgw::sal::Notification> notify;
+ std::unique_ptr<rgw::sal::User> user;
+ user = driver->get_user(bucket_info.owner);
+
+ /* per discussion w/Daniel, Casey,and Eric, we *do need*
+ * a new sal object handle, based on the following decision
+ * to clear obj_key.instance--which happens in the case
+ * where a delete marker should be created */
if (!remove_indeed) {
obj_key.instance.clear();
} else if (obj_key.instance.empty()) {
obj_key.instance = "null";
}
-
- std::unique_ptr<rgw::sal::User> user;
- std::unique_ptr<rgw::sal::Bucket> bucket;
- std::unique_ptr<rgw::sal::Object> obj;
-
- user = driver->get_user(bucket_info.owner);
- ret = driver->get_bucket(user.get(), bucket_info, &bucket);
- if (ret < 0) {
- return ret;
- }
-
- obj = bucket->get_object(obj_key);
+ auto obj = oc.bucket->get_object(obj_key);
RGWObjState* obj_state{nullptr};
ret = obj->get_obj_state(dpp, &obj_state, null_yield, true);
@@ -554,11 +550,10 @@ static int remove_expired_obj(
del_op->params.obj_owner.set_name(meta.owner_display_name);
del_op->params.bucket_owner.set_id(bucket_info.owner);
del_op->params.unmod_since = meta.mtime;
- del_op->params.marker_version_id = version_id;
// notification supported only for RADOS driver for now
- notify = driver->get_notification(dpp, obj.get(), nullptr, event_type,
- bucket.get(), lc_id,
+ notify = driver->get_notification(dpp, oc.obj.get(), nullptr, event_type,
+ oc.bucket, lc_id,
const_cast<std::string&>(oc.bucket->get_tenant()),
lc_req_id, null_yield);
@@ -573,13 +568,17 @@ static int remove_expired_obj(
ret = del_op->delete_obj(dpp, null_yield);
if (ret < 0) {
ldpp_dout(dpp, 1) <<
- "ERROR: publishing notification failed, with error: " << ret << dendl;
+ fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl;
} else {
// send request to notification manager
- (void) notify->publish_commit(dpp, obj_state->size,
- ceph::real_clock::now(),
- obj_state->attrset[RGW_ATTR_ETAG].to_str(),
- version_id);
+ ret = notify->publish_commit(dpp, obj_state->size,
+ ceph::real_clock::now(),
+ obj_state->attrset[RGW_ATTR_ETAG].to_str(),
+ version_id);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: notify publish_commit failed, with error: "
+ << ret << dendl;
+ }
}
return ret;
@@ -828,33 +827,64 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
params.ns = RGW_OBJ_NS_MULTIPART;
params.access_list_filter = &mp_filter;
- auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
+ const auto event_type = rgw::notify::ObjectExpirationAbortMPU;
+
+ auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) {
+ int ret{0};
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
auto& [rule, obj] = wt;
if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
rgw_obj_key key(obj.key);
std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
- int ret = mpu->abort(this, cct, null_yield);
+ std::unique_ptr<rgw::sal::Object> sal_obj
+ = target->get_object(key);
+ std::unique_ptr<rgw::sal::Notification> notify
+ = driver->get_notification(
+ this, sal_obj.get(), nullptr, event_type,
+ target, lc_id,
+ const_cast<std::string&>(target->get_tenant()),
+ lc_req_id, null_yield);
+ auto& version_id = obj.key.instance;
+
+ ret = notify->publish_reserve(this, nullptr);
+ if (ret < 0) {
+ ldpp_dout(wk->get_lc(), 0)
+ << "ERROR: reserving persistent notification for "
+ "abort_multipart_upload, ret="
+ << ret << ", thread:" << wq->thr_name()
+ << ", deferring mpu cleanup for meta:" << obj.key << dendl;
+ return ret;
+ }
+
+ ret = mpu->abort(this, cct, null_yield);
if (ret == 0) {
+ ret = notify->publish_commit(
+ this, sal_obj->get_obj_size(), ceph::real_clock::now(),
+ sal_obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+ version_id);
+ if (ret < 0) {
+ ldpp_dout(wk->get_lc(), 1)
+ << "ERROR: notify publish_commit failed, with error: " << ret
+ << dendl;
+ }
if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
}
} else {
- if (ret == -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(wk->get_lc(), 5)
- << "ERROR: abort_multipart_upload failed, ret=" << ret
- << ", thread:" << wq->thr_name()
- << ", meta:" << obj.key
- << dendl;
- } else {
- ldpp_dout(wk->get_lc(), 0)
- << "ERROR: abort_multipart_upload failed, ret=" << ret
- << ", thread:" << wq->thr_name()
- << ", meta:" << obj.key
- << dendl;
- }
+ if (ret == -ERR_NO_SUCH_UPLOAD) {
+ ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload "
+ "failed, ret="
+ << ret << ", thread:" << wq->thr_name()
+ << ", meta:" << obj.key << dendl;
+ } else {
+ ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload "
+ "failed, ret="
+ << ret << ", thread:" << wq->thr_name()
+ << ", meta:" << obj.key << dendl;
+ }
} /* abort failed */
- } /* expired */
+ } /* expired */
+ return ret;
};
worker->workpool->setf(pf);
@@ -1267,28 +1297,83 @@ public:
/* If bucket is versioned, create delete_marker for current version
*/
- if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
- ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
- ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+ if (! oc.bucket->versioned()) {
+ ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectTransition);
+ ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key
+ << ") not versioned flags: " << oc.o.flags << dendl;
} else {
- ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
- ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+ /* versioned */
+ if (oc.o.is_current() && !oc.o.is_delete_marker()) {
+ ret = remove_expired_obj(oc.dpp, oc, false,
+ rgw::notify::ObjectTransitionCurrent);
+ ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key
+ << ") current & not delete_marker"
+ << " versioned_epoch: " << oc.o.versioned_epoch
+ << "flags: " << oc.o.flags << dendl;
+ } else {
+ ret = remove_expired_obj(oc.dpp, oc, true,
+ rgw::notify::ObjectTransitionNoncurrent);
+ ldpp_dout(oc.dpp, 20)
+ << "delete_tier_obj Object(key:" << oc.o.key << ") not current "
+ << "versioned_epoch: " << oc.o.versioned_epoch
+ << "flags: " << oc.o.flags << dendl;
+ }
}
+
return ret;
}
int transition_obj_to_cloud(lc_op_ctx& oc) {
+ int ret{0};
/* If CurrentVersion object, remove it & create delete marker */
bool delete_object = (!oc.tier->retain_head_object() ||
(oc.o.is_current() && oc.bucket->versioned()));
- int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
- oc.env.worker->get_cloud_targets(), oc.cct,
- !delete_object, oc.dpp, null_yield);
+ /* notifications */
+ auto& bucket = oc.bucket;
+ std::string version_id;
+
+ auto& obj = oc.obj;
+
+ const auto event_type = (bucket->versioned() &&
+ oc.o.is_current() && !oc.o.is_delete_marker()) ?
+ rgw::notify::ObjectTransitionCurrent :
+ rgw::notify::ObjectTransitionNoncurrent;
+
+ std::unique_ptr<rgw::sal::Notification> notify
+ = oc.driver->get_notification(
+ oc.dpp, obj.get(), nullptr, event_type,
+ bucket, lc_id,
+ const_cast<std::string&>(oc.bucket->get_tenant()),
+ lc_req_id, null_yield);
+
+ ret = notify->publish_reserve(oc.dpp, nullptr);
if (ret < 0) {
+ ldpp_dout(oc.dpp, 1)
+ << "ERROR: notify reservation failed, deferring transition of object k="
+ << oc.o.key
+ << dendl;
return ret;
}
+ ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
+ oc.env.worker->get_cloud_targets(),
+ oc.cct, !delete_object, oc.dpp,
+ null_yield);
+ if (ret < 0) {
+ return ret;
+ } else {
+ // send request to notification manager
+ ret = notify->publish_commit(oc.dpp, obj->get_obj_size(),
+ ceph::real_clock::now(),
+ obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+ version_id);
+ if (ret < 0) {
+ ldpp_dout(oc.dpp, 1) <<
+ "ERROR: notify publish_commit failed, with error: " << ret << dendl;
+ }
+ }
+
if (delete_object) {
ret = delete_tier_obj(oc);
if (ret < 0) {
@@ -1525,20 +1610,14 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
return 0;
}
- int ret = driver->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield);
+ int ret = driver->load_bucket(this, rgw_bucket(bucket_tenant, bucket_name),
+ &bucket, null_yield);
if (ret < 0) {
ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
<< " failed" << dendl;
return ret;
}
- ret = bucket->load_bucket(this, null_yield);
- if (ret < 0) {
- ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name
- << " failed" << dendl;
- return ret;
- }
-
auto stack_guard = make_scope_guard(
[&worker]
{
@@ -2054,14 +2133,14 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
utime_t lock_for_s(max_lock_secs, 0);
const auto& lock_lambda = [&]() {
- ret = lock->try_lock(this, lock_for_s, null_yield);
+ int ret = lock->try_lock(this, lock_for_s, null_yield);
if (ret == 0) {
return true;
}
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
return false;
- }
+ }
return false;
};