diff options
Diffstat (limited to 'src/rgw/rgw_lc.cc')
-rw-r--r-- | src/rgw/rgw_lc.cc | 193 |
1 files changed, 136 insertions, 57 deletions
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index ab2dda48ec7..4ae3a950ade 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -480,7 +480,7 @@ struct lc_op_ctx { LCObjsLister& ol; std::unique_ptr<rgw::sal::Object> obj; - RGWObjectCtx rctx; + RGWObjectCtx octx; const DoutPrefixProvider *dpp; WorkQ* wq; @@ -493,7 +493,7 @@ struct lc_op_ctx { : cct(env.driver->ctx()), env(env), o(o), next_key_name(next_key_name), effective_mtime(effective_mtime), driver(env.driver), bucket(env.bucket), op(env.op), ol(env.ol), - rctx(env.driver), dpp(dpp), wq(wq) + octx(env.driver), dpp(dpp), wq(wq) { obj = bucket->get_object(o.key); } @@ -519,26 +519,22 @@ static int remove_expired_obj( auto obj_key = o.key; auto& meta = o.meta; int ret; - std::string version_id; + auto& version_id = obj_key.instance; std::unique_ptr<rgw::sal::Notification> notify; + std::unique_ptr<rgw::sal::User> user; + user = driver->get_user(bucket_info.owner); + + /* per discussion w/Daniel, Casey,and Eric, we *do need* + * a new sal object handle, based on the following decision + * to clear obj_key.instance--which happens in the case + * where a delete marker should be created */ if (!remove_indeed) { obj_key.instance.clear(); } else if (obj_key.instance.empty()) { obj_key.instance = "null"; } - - std::unique_ptr<rgw::sal::User> user; - std::unique_ptr<rgw::sal::Bucket> bucket; - std::unique_ptr<rgw::sal::Object> obj; - - user = driver->get_user(bucket_info.owner); - ret = driver->get_bucket(user.get(), bucket_info, &bucket); - if (ret < 0) { - return ret; - } - - obj = bucket->get_object(obj_key); + auto obj = oc.bucket->get_object(obj_key); RGWObjState* obj_state{nullptr}; ret = obj->get_obj_state(dpp, &obj_state, null_yield, true); @@ -554,11 +550,10 @@ static int remove_expired_obj( del_op->params.obj_owner.set_name(meta.owner_display_name); del_op->params.bucket_owner.set_id(bucket_info.owner); del_op->params.unmod_since = meta.mtime; - del_op->params.marker_version_id = version_id; // notification supported only for RADOS driver for now - notify = driver->get_notification(dpp, obj.get(), nullptr, event_type, - bucket.get(), lc_id, + notify = driver->get_notification(dpp, oc.obj.get(), nullptr, event_type, + oc.bucket, lc_id, const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id, null_yield); @@ -573,13 +568,17 @@ static int remove_expired_obj( ret = del_op->delete_obj(dpp, null_yield); if (ret < 0) { ldpp_dout(dpp, 1) << - "ERROR: publishing notification failed, with error: " << ret << dendl; + fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl; } else { // send request to notification manager - (void) notify->publish_commit(dpp, obj_state->size, - ceph::real_clock::now(), - obj_state->attrset[RGW_ATTR_ETAG].to_str(), - version_id); + ret = notify->publish_commit(dpp, obj_state->size, + ceph::real_clock::now(), + obj_state->attrset[RGW_ATTR_ETAG].to_str(), + version_id); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: notify publish_commit failed, with error: " + << ret << dendl; + } } return ret; @@ -828,33 +827,64 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, params.ns = RGW_OBJ_NS_MULTIPART; params.access_list_filter = &mp_filter; - auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { + const auto event_type = rgw::notify::ObjectExpirationAbortMPU; + + auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) { + int ret{0}; auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi); auto& [rule, obj] = wt; if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) { rgw_obj_key key(obj.key); std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name); - int ret = mpu->abort(this, cct, null_yield); + std::unique_ptr<rgw::sal::Object> sal_obj + = target->get_object(key); + std::unique_ptr<rgw::sal::Notification> notify + = driver->get_notification( + this, sal_obj.get(), nullptr, event_type, + target, lc_id, + const_cast<std::string&>(target->get_tenant()), + lc_req_id, null_yield); + auto& version_id = obj.key.instance; + + ret = notify->publish_reserve(this, nullptr); + if (ret < 0) { + ldpp_dout(wk->get_lc(), 0) + << "ERROR: reserving persistent notification for " + "abort_multipart_upload, ret=" + << ret << ", thread:" << wq->thr_name() + << ", deferring mpu cleanup for meta:" << obj.key << dendl; + return ret; + } + + ret = mpu->abort(this, cct, null_yield); if (ret == 0) { + ret = notify->publish_commit( + this, sal_obj->get_obj_size(), ceph::real_clock::now(), + sal_obj->get_attrs()[RGW_ATTR_ETAG].to_str(), + version_id); + if (ret < 0) { + ldpp_dout(wk->get_lc(), 1) + << "ERROR: notify publish_commit failed, with error: " << ret + << dendl; + } if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); } } else { - if (ret == -ERR_NO_SUCH_UPLOAD) { - ldpp_dout(wk->get_lc(), 5) - << "ERROR: abort_multipart_upload failed, ret=" << ret - << ", thread:" << wq->thr_name() - << ", meta:" << obj.key - << dendl; - } else { - ldpp_dout(wk->get_lc(), 0) - << "ERROR: abort_multipart_upload failed, ret=" << ret - << ", thread:" << wq->thr_name() - << ", meta:" << obj.key - << dendl; - } + if (ret == -ERR_NO_SUCH_UPLOAD) { + ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload " + "failed, ret=" + << ret << ", thread:" << wq->thr_name() + << ", meta:" << obj.key << dendl; + } else { + ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload " + "failed, ret=" + << ret << ", thread:" << wq->thr_name() + << ", meta:" << obj.key << dendl; + } } /* abort failed */ - } /* expired */ + } /* expired */ + return ret; }; worker->workpool->setf(pf); @@ -1267,28 +1297,83 @@ public: /* If bucket is versioned, create delete_marker for current version */ - if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { - ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration); - ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + if (! oc.bucket->versioned()) { + ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectTransition); + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key + << ") not versioned flags: " << oc.o.flags << dendl; } else { - ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration); - ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + /* versioned */ + if (oc.o.is_current() && !oc.o.is_delete_marker()) { + ret = remove_expired_obj(oc.dpp, oc, false, + rgw::notify::ObjectTransitionCurrent); + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key + << ") current & not delete_marker" + << " versioned_epoch: " << oc.o.versioned_epoch + << "flags: " << oc.o.flags << dendl; + } else { + ret = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectTransitionNoncurrent); + ldpp_dout(oc.dpp, 20) + << "delete_tier_obj Object(key:" << oc.o.key << ") not current " + << "versioned_epoch: " << oc.o.versioned_epoch + << "flags: " << oc.o.flags << dendl; + } } + return ret; } int transition_obj_to_cloud(lc_op_ctx& oc) { + int ret{0}; /* If CurrentVersion object, remove it & create delete marker */ bool delete_object = (!oc.tier->retain_head_object() || (oc.o.is_current() && oc.bucket->versioned())); - int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, - oc.env.worker->get_cloud_targets(), oc.cct, - !delete_object, oc.dpp, null_yield); + /* notifications */ + auto& bucket = oc.bucket; + std::string version_id; + + auto& obj = oc.obj; + + const auto event_type = (bucket->versioned() && + oc.o.is_current() && !oc.o.is_delete_marker()) ? + rgw::notify::ObjectTransitionCurrent : + rgw::notify::ObjectTransitionNoncurrent; + + std::unique_ptr<rgw::sal::Notification> notify + = oc.driver->get_notification( + oc.dpp, obj.get(), nullptr, event_type, + bucket, lc_id, + const_cast<std::string&>(oc.bucket->get_tenant()), + lc_req_id, null_yield); + + ret = notify->publish_reserve(oc.dpp, nullptr); if (ret < 0) { + ldpp_dout(oc.dpp, 1) + << "ERROR: notify reservation failed, deferring transition of object k=" + << oc.o.key + << dendl; return ret; } + ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, + oc.env.worker->get_cloud_targets(), + oc.cct, !delete_object, oc.dpp, + null_yield); + if (ret < 0) { + return ret; + } else { + // send request to notification manager + ret = notify->publish_commit(oc.dpp, obj->get_obj_size(), + ceph::real_clock::now(), + obj->get_attrs()[RGW_ATTR_ETAG].to_str(), + version_id); + if (ret < 0) { + ldpp_dout(oc.dpp, 1) << + "ERROR: notify publish_commit failed, with error: " << ret << dendl; + } + } + if (delete_object) { ret = delete_tier_obj(oc); if (ret < 0) { @@ -1525,20 +1610,14 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, return 0; } - int ret = driver->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield); + int ret = driver->load_bucket(this, rgw_bucket(bucket_tenant, bucket_name), + &bucket, null_yield); if (ret < 0) { ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name << " failed" << dendl; return ret; } - ret = bucket->load_bucket(this, null_yield); - if (ret < 0) { - ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name - << " failed" << dendl; - return ret; - } - auto stack_guard = make_scope_guard( [&worker] { @@ -2054,14 +2133,14 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, utime_t lock_for_s(max_lock_secs, 0); const auto& lock_lambda = [&]() { - ret = lock->try_lock(this, lock_for_s, null_yield); + int ret = lock->try_lock(this, lock_for_s, null_yield); if (ret == 0) { return true; } if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ return false; - } + } return false; }; |