Diffstat (limited to 'src/rgw/driver/rados/rgw_rados.cc')
-rw-r--r--  src/rgw/driver/rados/rgw_rados.cc | 583
1 file changed, 496 insertions(+), 87 deletions(-)
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index a75078d1151..69075c506f1 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -23,6 +23,7 @@
#include "common/BackTrace.h"
#include "common/ceph_time.h"
+#include "rgw_asio_thread.h"
#include "rgw_cksum.h"
#include "rgw_sal.h"
#include "rgw_zone.h"
@@ -36,6 +37,7 @@
#include "rgw_cr_rest.h"
#include "rgw_datalog.h"
#include "rgw_putobj_processor.h"
+#include "rgw_lc_tier.h"
#include "cls/rgw/cls_rgw_ops.h"
#include "cls/rgw/cls_rgw_client.h"
@@ -1928,11 +1930,58 @@ int RGWRados::Bucket::List::list_objects_ordered(
": finished due to getting past requested namespace \"" <<
params.ns << "\"" << dendl;
goto done;
- }
+ } else if (!obj.ns.empty()) {
+ // We're in the namespace range and we're enforcing an empty
+ // namespace, therefore we can skip past a contiguous chunk
+ // of namespaced entries. Namespaces are demarcated in the
+ // index key by underscores before and after the namespace
+ // name (e.g., "_somenamespace_somekey"). Also, regular
+ // entries might begin with an underscore, in which case
+ // they're escaped with another underscore (e.g., "_foobar"
+ // is encoded as "__foobar"). We also have to account for
+ // the fact that in lexical ordering there are characters
+ // both before underscore (e.g., uppercase letters) and
+ // after (e.g., lowercase letters). So that means there can
+ // be five distinct and meaningful regions in the lexical
+ // ordering of entries, which we'll illustrate with
+ // examples:
+
+ // 1. FOOBAR (regular pre-underscore)
+ // 2. _BAZ_foobar (namespaced, with namespace pre-underscore)
+ // 3. __foobar (regular with escaped underscore)
+ // 4. _baz_foobar (namespaced, with namespace post-underscore)
+ // 5. foobar (regular, post-underscore)
+
+ // So if we're skipping namespaces and recognize we're in
+ // region 2, we must skip to region 3. And if we recognize
+ // we're in region 4, we skip to region 5.
+ rgw_obj_index_key potential_marker;
+ if (obj.ns[0] < '_') {
+ // We're in region 2, so need to skip to region 3. The
+ // caret (^) is the ASCII character that precedes
+ // underscore, so we'll set the marker to the
+ // caret/circumflex followed by 0xFF, so the key after can
+ // be in the double underscore range.
+ potential_marker = rgw_obj_index_key("_^\xFF");
+ } else {
+ // we're past the escaped underscore region (i.e.,
+ // starting with two underscores), so we can skip past the
+ // underscore region
+ potential_marker = rgw_obj_index_key("_\xFF");
+ }
+
+ if (cur_marker < potential_marker) {
+ ldpp_dout(dpp, 20) << __func__ <<
+ ": skipping past region of namespaced entries, starting with \"" <<
+ entry.key << "\"" << dendl;
+ cur_marker = potential_marker;
+ break; // leave inner loop (for) and allow another cls call
+ }
+ }
- /* we're skipping past namespaced objects */
+ // we're skipping past namespaced objects
ldpp_dout(dpp, 20) << __func__ <<
- ": skipping past namespaced objects, including \"" << entry.key <<
+ ": skipping past individual namespaced entry \"" << entry.key <<
"\"" << dendl;
continue;
}
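The five-region skip depends only on the lexical order of the encoded index
keys. A minimal standalone sketch of that ordering, using a hypothetical
encode_index_key helper rather than the real cls_rgw encoding:

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical illustration of the "_<ns>_<name>" / "__<name>" scheme
    // described above; not the actual cls_rgw implementation.
    static std::string encode_index_key(const std::string& ns,
                                        const std::string& name) {
      if (!ns.empty())
        return "_" + ns + "_" + name;  // namespaced entry
      if (!name.empty() && name[0] == '_')
        return "_" + name;             // regular name escaped with an extra '_'
      return name;                     // plain regular entry
    }

    int main() {
      std::vector<std::string> keys = {
        encode_index_key("", "FOOBAR"),     // region 1
        encode_index_key("BAZ", "foobar"),  // region 2
        encode_index_key("", "_foobar"),    // region 3 ("__foobar")
        encode_index_key("baz", "foobar"),  // region 4
        encode_index_key("", "foobar"),     // region 5
      };
      std::sort(keys.begin(), keys.end());
      for (const auto& k : keys)
        std::cout << k << '\n';  // prints regions 1..5 in order
    }

The skip markers slot into the gaps: "_^\xFF" sorts after every region-2 key
and before region 3, and "_\xFF" sorts after region 4 and before region 5.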
@@ -1953,7 +2002,7 @@ int RGWRados::Bucket::List::list_objects_ordered(
if (params.access_list_filter &&
!params.access_list_filter(obj.name, index_key.name)) {
ldpp_dout(dpp, 20) << __func__ <<
- ": skipping past namespaced objects, including \"" << entry.key <<
+ ": skipping past filtered out entry \"" << entry.key <<
"\"" << dendl;
continue;
}
@@ -3211,6 +3260,30 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
op.setxattr(RGW_ATTR_STORAGE_CLASS, bl);
}
+ /* For temporary restored copies, storage-class returned
+ * in GET/list-objects should correspond to the original
+ * cloudtier storage class. For GET it's handled in the REST
+ * response by verifying RESTORE_TYPE in attrs. But the same
+ * cannot be done for the list-objects response, hence this
+ * needs to be updated in the bi entry itself.
+ */
+ auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
+ if (attr_iter != attrs.end()) {
+ rgw::sal::RGWRestoreType rt;
+ bufferlist bl = attr_iter->second;
+ auto iter = bl.cbegin();
+ decode(rt, iter);
+
+ if (rt == rgw::sal::RGWRestoreType::Temporary) {
+ // temporary restore; set storage-class to cloudtier storage class
+ auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS);
+
+ if (c_iter != attrs.end()) {
+ storage_class = rgw_bl_str(c_iter->second);
+ }
+ }
+ }
+
if (!op.size())
return 0;
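The RESTORE_TYPE lookup above follows the usual attr-decode pattern. The same
pattern in isolation, as a sketch that assumes RGWRestoreType has a None
member and adds a try/catch the hunk itself does not:

    // Returns the decoded restore type, or None when the attr is absent or
    // malformed (the catch is an extra safety net, not part of the hunk).
    static rgw::sal::RGWRestoreType get_restore_type(const rgw::sal::Attrs& attrs) {
      auto rt = rgw::sal::RGWRestoreType::None;
      if (auto it = attrs.find(RGW_ATTR_RESTORE_TYPE); it != attrs.end()) {
        auto p = it->second.cbegin();
        try {
          decode(rt, p);  // ceph::decode of the enum's on-wire form
        } catch (const buffer::error&) {
          // fall back to the head object's storage class
        }
      }
      return rt;
    }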
@@ -3247,7 +3320,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
auto& ioctx = ref.ioctx;
tracepoint(rgw_rados, operate_enter, req_id.c_str());
- r = rgw_rados_operate(rctx.dpp, ref.ioctx, ref.obj.oid, &op, rctx.y, 0, &trace);
+ r = rgw_rados_operate(rctx.dpp, ref.ioctx, ref.obj.oid, &op, rctx.y, 0, &trace, &epoch);
tracepoint(rgw_rados, operate_exit, req_id.c_str());
if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,
or -ENOENT if was removed, or -EEXIST if it did not exist
@@ -3259,7 +3332,6 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
goto done_cancel;
}
- epoch = ioctx.get_last_version();
poolid = ioctx.get_id();
r = target->complete_atomic_modification(rctx.dpp, rctx.y);
@@ -3318,12 +3390,17 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
return 0;
done_cancel:
- int ret = index_op->cancel(rctx.dpp, meta.remove_objs, rctx.y, log_op);
- if (ret < 0) {
- ldpp_dout(rctx.dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
- }
+ // if r == -ETIMEDOUT, rgw can't determine whether or not the rados op succeeded,
+ // so we shouldn't be calling index_op->cancel() in this case.
+ // Instead, leave the pending entry in the index so that bucket listing can recover with check_disk_state() and cls_rgw_suggest_changes()
+ if (r != -ETIMEDOUT) {
+ int ret = index_op->cancel(rctx.dpp, meta.remove_objs, rctx.y, log_op);
+ if (ret < 0) {
+ ldpp_dout(rctx.dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
+ }
- meta.canceled = true;
+ meta.canceled = true;
+ }
/* we lost in a race. There are a few options:
* - existing object was rewritten (ECANCELED)
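The rule the new guard enforces is narrow: the index preparation may only be
rolled back on a determinate failure. As a one-line predicate, with a
hypothetical helper name:

    // -ETIMEDOUT means rgw cannot tell whether the osd applied the op, so
    // neither completing nor canceling the dir-prepare entry is safe; the
    // pending entry is left for check_disk_state()/cls_rgw_suggest_changes().
    static bool can_cancel_index_prepare(int r) {
      return r != -ETIMEDOUT;
    }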
@@ -5064,7 +5141,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
int RGWRados::transition_obj(RGWObjectCtx& obj_ctx,
RGWBucketInfo& bucket_info,
- const rgw_obj& obj,
+ rgw_obj obj,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
@@ -5095,6 +5172,11 @@ int RGWRados::transition_obj(RGWObjectCtx& obj_ctx,
return -ECANCELED;
}
+ // bi expects an empty instance for entries created when bucket versioning
+ // is not enabled or is suspended.
+ if (obj.key.instance == "null") {
+ obj.key.instance.clear();
+ }
attrs.erase(RGW_ATTR_ID_TAG);
attrs.erase(RGW_ATTR_TAIL_TAG);
@@ -5126,6 +5208,187 @@ int RGWRados::transition_obj(RGWObjectCtx& obj_ctx,
return 0;
}
+int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
+ RGWObjectCtx& obj_ctx,
+ RGWBucketInfo& dest_bucket_info,
+ const rgw_obj& dest_obj,
+ rgw_placement_rule& dest_placement,
+ RGWObjTier& tier_config,
+ real_time& mtime,
+ uint64_t olh_epoch,
+ std::optional<uint64_t> days,
+ const DoutPrefixProvider *dpp,
+ optional_yield y,
+ bool log_op) {
+
+ //XXX: read below from attrs .. check transition_obj()
+ ACLOwner owner;
+ rgw::sal::Attrs attrs;
+ const req_context rctx{dpp, y, nullptr};
+ int ret = 0;
+ bufferlist t, t_tier;
+ string tag;
+ append_rand_alpha(cct, tag, tag, 32);
+ auto aio = rgw::make_throttle(cct->_conf->rgw_put_obj_min_window_size, y);
+ using namespace rgw::putobj;
+ jspan_context no_trace{false, false};
+ rgw::putobj::AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info, nullptr,
+ owner, obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace);
+
+ void (*progress_cb)(off_t, void *) = NULL;
+ void *progress_data = NULL;
+ bool cb_processed = false;
+ RGWFetchObjFilter *filter = nullptr;
+ RGWFetchObjFilter_Default source_filter;
+ if (!filter) {
+ filter = &source_filter;
+ }
+ boost::optional<RGWPutObj_Compress> compressor;
+ CompressorRef plugin;
+ RGWRadosPutObj cb(dpp, cct, plugin, compressor, &processor, progress_cb, progress_data,
+ [&](map<string, bufferlist> obj_attrs) {
+ // XXX: do we need filter() like in fetch_remote_obj() cb
+ dest_placement.inherit_from(dest_bucket_info.placement_rule);
+ /* For now we always restore to STANDARD storage-class.
+ * Later we will add support to take restore-target-storage-class
+ * for permanent restore
+ */
+ dest_placement.storage_class = RGW_STORAGE_CLASS_STANDARD;
+
+ processor.set_tail_placement(dest_placement);
+
+ ret = processor.prepare(rctx.y);
+ if (ret < 0) {
+ return ret;
+ }
+ cb_processed = true;
+ return 0;
+ });
+
+ uint64_t accounted_size = 0;
+ string etag;
+ real_time set_mtime;
+ std::map<std::string, std::string> headers;
+ ldpp_dout(dpp, 20) << "Fetching from cloud, object:" << dest_obj << dendl;
+ ret = rgw_cloud_tier_get_object(tier_ctx, false, headers,
+ &set_mtime, etag, accounted_size,
+ attrs, &cb);
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << "Fetching from cloud failed, object:" << dest_obj << dendl;
+ return ret;
+ }
+
+ if (!cb_processed) {
+ ldpp_dout(dpp, 20) << "Callback not processed, object:" << dest_obj << dendl;
+ return -EIO;
+ }
+
+ ret = cb.flush();
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (cb.get_data_len() != accounted_size) {
+ ret = -EIO;
+ ldpp_dout(dpp, -1) << "ERROR: object truncated during fetching, expected "
+ << accounted_size << " bytes but received " << cb.get_data_len() << dendl;
+ return ret;
+ }
+
+ {
+ bufferlist bl;
+ encode(rgw::sal::RGWRestoreStatus::CloudRestored, bl);
+ attrs[RGW_ATTR_RESTORE_STATUS] = std::move(bl);
+ }
+
+ ceph::real_time restore_time = real_clock::now();
+ {
+ bufferlist bl;
+ encode(restore_time, bl);
+ attrs[RGW_ATTR_RESTORE_TIME] = std::move(bl);
+ }
+
+ real_time delete_at = real_time();
+ if (days) { // temp copy; keep the original mtime and set an expiry date
+ int expiry_days = days.value();
+ constexpr int32_t secs_in_a_day = 24 * 60 * 60;
+ ceph::real_time expiration_date;
+
+ if (cct->_conf->rgw_restore_debug_interval > 0) {
+ expiration_date = restore_time + make_timespan(double(expiry_days)*cct->_conf->rgw_restore_debug_interval);
+ ldpp_dout(dpp, 20) << "Setting expiration time to rgw_restore_debug_interval: " << double(expiry_days)*cct->_conf->rgw_restore_debug_interval << ", days:" << expiry_days << dendl;
+ } else {
+ expiration_date = restore_time + make_timespan(double(expiry_days) * secs_in_a_day);
+ }
+ delete_at = expiration_date;
+
+ {
+ bufferlist bl;
+ encode(expiration_date, bl);
+ attrs[RGW_ATTR_RESTORE_EXPIRY_DATE] = std::move(bl);
+ }
+ {
+ bufferlist bl;
+ bl.clear();
+ using ceph::encode;
+ encode(rgw::sal::RGWRestoreType::Temporary, bl);
+ attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl);
+ ldpp_dout(dpp, 20) << "Temporary restore, object:" << dest_obj << dendl;
+ }
+ {
+ string sc = tier_ctx.storage_class;
+ bufferlist bl;
+ bl.append(sc.c_str(), sc.size());
+ attrs[RGW_ATTR_CLOUDTIER_STORAGE_CLASS] = std::move(bl);
+ ldpp_dout(dpp, 20) << "Setting RGW_ATTR_CLOUDTIER_STORAGE_CLASS: " << tier_ctx.storage_class << dendl;
+ }
+ //set same old mtime as that of transition time
+ set_mtime = mtime;
+
+ // set tier-config only for temp restored objects, as
+ // permanent copies will be treated as regular objects
+ {
+ t.append("cloud-s3");
+ encode(tier_config, t_tier);
+ attrs[RGW_ATTR_CLOUD_TIER_TYPE] = t;
+ attrs[RGW_ATTR_CLOUD_TIER_CONFIG] = t_tier;
+ }
+
+ } else { // permanent restore
+ {
+ bufferlist bl;
+ bl.clear();
+ using ceph::encode;
+ encode(rgw::sal::RGWRestoreType::Permanent, bl);
+ attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl);
+ ldpp_dout(dpp, 20) << "Permanent restore, object:" << dest_obj << dendl;
+ }
+ //set mtime to now()
+ set_mtime = real_clock::now();
+ }
+
+ {
+ string sc = dest_placement.get_storage_class(); //"STANDARD";
+ bufferlist bl;
+ bl.append(sc.c_str(), sc.size());
+ attrs[RGW_ATTR_STORAGE_CLASS] = std::move(bl);
+ }
+
+ // XXX: handle COMPLETE_RETRY like in fetch_remote_obj
+ bool canceled = false;
+ rgw_zone_set zone_set{};
+ ret = processor.complete(accounted_size, etag, &mtime, set_mtime,
+ attrs, rgw::cksum::no_cksum, delete_at, nullptr, nullptr, nullptr,
+ (rgw_zone_set *)&zone_set, &canceled, rctx, log_op ? rgw::sal::FLAG_LOG_OP : 0);
+ if (ret < 0) {
+ return ret;
+ }
+
+ // XXX: handle olh_epoch for versioned objects like in fetch_remote_obj
+ return ret;
+}
+
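For the temporary-restore branch, the expiry arithmetic reduces to a few
lines. A sketch assuming rgw_restore_debug_interval rescales one "day" to
that many seconds for testing and is 0 (disabled) in production:

    // delete_at = restore_time + days * (debug interval or 86400 seconds)
    static ceph::real_time restore_expiration(ceph::real_time restore_time,
                                              int expiry_days,
                                              int64_t debug_interval_secs) {
      constexpr int32_t secs_in_a_day = 24 * 60 * 60;
      const double secs = debug_interval_secs > 0
          ? double(expiry_days) * debug_interval_secs  // e.g. 2 days * 60s = 120s
          : double(expiry_days) * secs_in_a_day;       // real calendar days
      return restore_time + make_timespan(secs);
    }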
int RGWRados::check_bucket_empty(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y)
{
constexpr uint NUM_ENTRIES = 1000u;
@@ -5222,7 +5485,7 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& ob
}
/* if the bucket is not synced we can remove the meta file */
- if (!svc.zone->is_syncing_bucket_meta(bucket)) {
+ if (!svc.zone->is_syncing_bucket_meta()) {
RGWObjVersionTracker objv_tracker;
r = ctl.bucket->remove_bucket_instance_info(bucket, bucket_info, y, dpp);
if (r < 0) {
@@ -5230,6 +5493,7 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& ob
}
/* remove bucket index objects asynchronously by best effort */
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
(void) CLSRGWIssueBucketIndexClean(index_pool,
bucket_objs,
cct->_conf->rgw_bucket_index_max_aio)();
@@ -5332,12 +5596,12 @@ int RGWRados::Object::complete_atomic_modification(const DoutPrefixProvider *dpp
if (store->gc == nullptr) {
ldpp_dout(dpp, 0) << "deleting objects inline since gc isn't initialized" << dendl;
//Delete objects inline just in case gc hasn't been initialised, prevents crashes
- store->delete_objs_inline(dpp, chain, tag);
+ store->delete_objs_inline(dpp, chain, tag, y);
} else {
auto [ret, leftover_chain] = store->gc->send_split_chain(chain, tag, y); // do it synchronously
if (ret < 0 && leftover_chain) {
//Delete objects inline if send chain to gc fails
- store->delete_objs_inline(dpp, *leftover_chain, tag);
+ store->delete_objs_inline(dpp, *leftover_chain, tag, y);
}
}
return 0;
@@ -5366,35 +5630,44 @@ std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWRados::send_chain_to_gc(cls
return gc->send_split_chain(chain, tag, y);
}
-void RGWRados::delete_objs_inline(const DoutPrefixProvider *dpp, cls_rgw_obj_chain& chain, const string& tag)
+void RGWRados::delete_objs_inline(const DoutPrefixProvider *dpp, cls_rgw_obj_chain& chain,
+ const string& tag, optional_yield y)
{
- string last_pool;
- std::unique_ptr<IoCtx> ctx(new IoCtx);
- int ret = 0;
- for (auto liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
- cls_rgw_obj& obj = *liter;
- if (obj.pool != last_pool) {
- ctx.reset(new IoCtx);
- ret = rgw_init_ioctx(dpp, get_rados_handle(), obj.pool, *ctx);
- if (ret < 0) {
- last_pool = "";
- ldpp_dout(dpp, 0) << "ERROR: failed to create ioctx pool=" <<
- obj.pool << dendl;
- continue;
- }
- last_pool = obj.pool;
- }
- ctx->locator_set_key(obj.loc);
- const string& oid = obj.key.name; /* just stored raw oid there */
- ldpp_dout(dpp, 5) << "delete_objs_inline: removing " << obj.pool <<
- ":" << obj.key.name << dendl;
+ if (chain.objs.empty()) {
+ return;
+ }
+
+ // initialize an IoCtx for the first object's pool. RGWObjManifest uses the
+ // same pool for all tail objects
+ auto obj = chain.objs.begin();
+
+ librados::IoCtx ioctx;
+ int ret = rgw_init_ioctx(dpp, get_rados_handle(), obj->pool, ioctx);
+ if (ret < 0) {
+ return;
+ }
+
+ // issue deletions in parallel, up to max_aio at a time
+ auto aio = rgw::make_throttle(cct->_conf->rgw_multi_obj_del_max_aio, y);
+ static constexpr uint64_t cost = 1; // 1 throttle unit per request
+ static constexpr uint64_t id = 0; // ids unused
+
+ for (; obj != chain.objs.end(); ++obj) {
ObjectWriteOperation op;
cls_refcount_put(op, tag, true);
- ret = ctx->operate(oid, &op);
- if (ret < 0) {
- ldpp_dout(dpp, 5) << "delete_objs_inline: refcount put returned error " << ret << dendl;
- }
+
+ rgw_raw_obj raw;
+ raw.pool = std::move(obj->pool);
+ raw.oid = std::move(obj->key.name);
+ raw.loc = std::move(obj->loc);
+
+ auto completed = aio->get(std::move(raw), rgw::Aio::librados_op(
+ ioctx, std::move(op), y), cost, id);
+ std::ignore = rgw::check_for_errors(completed);
}
+
+ auto completed = aio->drain();
+ std::ignore = rgw::check_for_errors(completed);
}
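The rewrite above adopts the rgw::Aio throttle idiom: build each op, hand it
to the throttle, and drain at the end. Condensed, with the interface treated
as this hunk uses it (make_throttle/get/drain) and diff-local names assumed:

    auto aio = rgw::make_throttle(cct->_conf->rgw_multi_obj_del_max_aio, y);
    for (auto& o : chain.objs) {
      librados::ObjectWriteOperation op;
      cls_refcount_put(op, tag, true);

      rgw_raw_obj raw;
      raw.pool = o.pool;
      raw.oid = o.key.name;
      raw.loc = o.loc;

      // get() blocks only while the window is full, and returns whichever
      // ops completed in the meantime so errors can be checked incrementally
      auto completed = aio->get(std::move(raw),
                                rgw::Aio::librados_op(ioctx, std::move(op), y),
                                1 /* cost */, 0 /* id */);
      std::ignore = rgw::check_for_errors(completed);
    }
    std::ignore = rgw::check_for_errors(aio->drain());  // wait for the rest

A single IoCtx suffices because an RGWObjManifest keeps all tail objects in
one pool, which is why the per-object ioctx reset could be dropped.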
static void accumulate_raw_stats(const rgw_bucket_dir_header& header,
@@ -5435,6 +5708,7 @@ int RGWRados::bucket_check_index(const DoutPrefixProvider *dpp, RGWBucketInfo& b
bucket_objs_ret.emplace(iter.first, rgw_cls_check_index_ret());
}
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
ret = CLSRGWIssueBucketCheck(index_pool, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
if (ret < 0) {
return ret;
@@ -5459,6 +5733,7 @@ int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo&
return r;
}
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
return CLSRGWIssueBucketRebuild(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
@@ -5596,7 +5871,9 @@ int RGWRados::bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
return 0;
}
-int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const cls_rgw_bucket_instance_entry& entry)
{
librados::IoCtx index_pool;
map<int, string> bucket_objs;
@@ -5609,6 +5886,7 @@ int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketI
return r;
}
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
@@ -5855,20 +6133,29 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
store->remove_rgw_head_obj(op);
+ if (params.check_objv != nullptr) {
+ cls_version_check(op, *params.check_objv, VER_COND_EQ);
+ }
+
auto& ioctx = ref.ioctx;
- r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
+ version_t epoch = 0;
+ r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y, 0, nullptr, &epoch);
/* raced with another operation, object state is indeterminate */
const bool need_invalidate = (r == -ECANCELED);
int64_t poolid = ioctx.get_id();
- if (r >= 0) {
+ if (r == -ETIMEDOUT) {
+ // rgw can't determine whether or not the delete succeeded, so we shouldn't call either complete_del() or cancel();
+ // leave the pending entry in the index so that bucket listing can recover with check_disk_state() and cls_rgw_suggest_changes()
+ ldpp_dout(dpp, 0) << "ERROR: rgw_rados_operate returned r=" << r << dendl;
+ } else if (r >= 0 || r == -ENOENT) {
tombstone_cache_t *obj_tombstone_cache = store->get_tombstone_cache();
if (obj_tombstone_cache) {
tombstone_entry entry{*state};
obj_tombstone_cache->add(obj, entry);
}
- r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y, log_op);
+ r = index_op.complete_del(dpp, poolid, epoch, state->mtime, params.remove_objs, y, log_op);
int ret = target->complete_atomic_modification(dpp, y);
if (ret < 0) {
@@ -6049,7 +6336,7 @@ int RGWRados::get_obj_state_impl(const DoutPrefixProvider *dpp, RGWObjectCtx *oc
int r = -ENOENT;
if (!assume_noent) {
- r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), NULL, y);
+ r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), &s->objv_tracker, y);
}
if (r == -ENOENT) {
@@ -6589,7 +6876,8 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
struct timespec mtime_ts = real_clock::to_timespec(mtime);
op.mtime2(&mtime_ts);
auto& ioctx = ref.ioctx;
- r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
+ version_t epoch = 0;
+ r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y, 0, nullptr, &epoch);
if (state) {
if (r >= 0) {
ACLOwner owner;
@@ -6620,11 +6908,29 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
iter != state->attrset.end()) {
storage_class = rgw_bl_str(iter->second);
}
- uint64_t epoch = ioctx.get_last_version();
int64_t poolid = ioctx.get_id();
+
+ // Retain Object category as CloudTiered while restore is in
+ // progress or failed
+ RGWObjCategory category = RGWObjCategory::Main;
+ auto r_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+ if (r_iter != attrs.end()) {
+ rgw::sal::RGWRestoreStatus st = rgw::sal::RGWRestoreStatus::None;
+ auto iter = r_iter->second.cbegin();
+
+ try {
+ using ceph::decode;
+ decode(st, iter);
+
+ if (st != rgw::sal::RGWRestoreStatus::CloudRestored) {
+ category = RGWObjCategory::CloudTiered;
+ }
+ } catch (buffer::error& err) {
+ }
+ }
r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
mtime, etag, content_type, storage_class, owner,
- RGWObjCategory::Main, nullptr, y, nullptr, false, log_op);
+ category, nullptr, y, nullptr, false, log_op);
} else {
int ret = index_op.cancel(dpp, nullptr, y, log_op);
if (ret < 0) {
@@ -6656,13 +6962,13 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
}
return 0;
-}
+} /* RGWRados::set_attrs() */
-static int get_part_obj_state(const DoutPrefixProvider* dpp, optional_yield y,
- RGWRados* store, RGWBucketInfo& bucket_info,
- RGWObjectCtx* rctx, RGWObjManifest* manifest,
- int part_num, int* parts_count, bool prefetch,
- RGWObjState** pstate, RGWObjManifest** pmanifest)
+int RGWRados::get_part_obj_state(const DoutPrefixProvider* dpp, optional_yield y,
+ RGWRados* store, RGWBucketInfo& bucket_info,
+ RGWObjectCtx* rctx, RGWObjManifest* manifest,
+ int part_num, int* parts_count, bool prefetch,
+ RGWObjState** pstate, RGWObjManifest** pmanifest)
{
if (!manifest) {
return -ERR_INVALID_PART;
@@ -6741,6 +7047,9 @@ static int get_part_obj_state(const DoutPrefixProvider* dpp, optional_yield y,
// update the object size
sm->state.size = part_manifest.get_obj_size();
+ if (!sm->state.attrset.count(RGW_ATTR_COMPRESSION)) {
+ sm->state.accounted_size = sm->state.size;
+ }
*pmanifest = &part_manifest;
return 0;
@@ -6782,9 +7091,20 @@ int RGWRados::Object::Read::prepare(optional_yield y, const DoutPrefixProvider *
return -ENOENT;
}
+ if (params.objv_tracker) {
+ *params.objv_tracker = astate->objv_tracker;
+ }
+
RGWBucketInfo& bucket_info = source->get_bucket_info();
if (params.part_num) {
+ map<string, bufferlist> src_attrset;
+ for (auto& iter : astate->attrset) {
+ if (boost::algorithm::starts_with(iter.first, RGW_ATTR_CRYPT_PREFIX)) {
+ ldpp_dout(dpp, 4) << "get src crypt attr: " << iter.first << dendl;
+ src_attrset[iter.first] = iter.second;
+ }
+ }
int parts_count = 0;
// use the manifest to redirect to the requested part number
r = get_part_obj_state(dpp, y, store, bucket_info, &source->get_ctx(),
@@ -6807,6 +7127,13 @@ int RGWRados::Object::Read::prepare(optional_yield y, const DoutPrefixProvider *
} else {
params.parts_count = parts_count;
}
+
+ for (auto& iter : src_attrset) {
+ ldpp_dout(dpp, 4) << "copy crypt attr: " << iter.first << dendl;
+ if (astate->attrset.find(iter.first) == astate->attrset.end()) {
+ astate->attrset[iter.first] = std::move(iter.second);
+ }
+ }
}
state.obj = astate->obj;
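The crypt-attr handling stashes the head object's encryption attrs before
astate is re-pointed at the part, then copies them back without clobbering
anything the part already carries. The same idea in isolation:

    // Condensed sketch; attrset maps attr name to bufferlist as in RGWObjState.
    std::map<std::string, bufferlist> saved;
    for (const auto& [name, bl] : astate->attrset) {
      if (boost::algorithm::starts_with(name, RGW_ATTR_CRYPT_PREFIX)) {
        saved[name] = bl;  // crypt attrs live only on the head object
      }
    }
    // ... get_part_obj_state() swaps astate over to the part's state ...
    for (auto& [name, bl] : saved) {
      astate->attrset.emplace(name, std::move(bl));  // emplace never overwrites
    }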
@@ -6966,6 +7293,10 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(const DoutPrefixProvider *dpp,
*pbs = bs;
}
+ if (target->bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) {
+ store->check_reshard_logrecord_status(target->bucket_info, y, dpp);
+ }
+
return 0;
}
@@ -7221,7 +7552,7 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end,
state.cur_ioctx->locator_set_key(read_obj.loc);
- r = state.cur_ioctx->operate(read_obj.oid, &op, NULL);
+ r = rgw_rados_operate(dpp, *state.cur_ioctx, read_obj.oid, &op, nullptr, y);
ldpp_dout(dpp, 20) << "rados->read r=" << r << " bl.length=" << bl.length() << dendl;
if (r < 0) {
@@ -7659,9 +7990,76 @@ int RGWRados::guard_reshard(const DoutPrefixProvider *dpp,
return r;
}
+ if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) {
+ check_reshard_logrecord_status(bucket_info, y, dpp);
+ }
+
return 0;
}
+int RGWRados::check_reshard_logrecord_status(RGWBucketInfo& bucket_info, optional_yield y,
+ const DoutPrefixProvider *dpp)
+{
+ real_time now = real_clock::now();
+ double r = rand() / (double)RAND_MAX;
+ double reshard_progress_judge_interval = cct->_conf.get_val<uint64_t>("rgw_reshard_progress_judge_interval");
+ // add jitter so that masses of different operations don't try to take the reshard_lock simultaneously
+ reshard_progress_judge_interval +=
+ reshard_progress_judge_interval * cct->_conf.get_val<double>("rgw_reshard_progress_judge_ratio") * r;
+ if (now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval)) {
+
+ map<string, bufferlist> bucket_attrs;
+ int ret = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.bucket.name,
+ bucket_info, nullptr, y, dpp, &bucket_attrs);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ " ERROR: failed to refresh bucket info : " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord &&
+ now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval))
+ return recover_reshard_logrecord(bucket_info, bucket_attrs, y, dpp);
+ }
+ return 0;
+}
+
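The jitter applied to the judge interval is standard stampede avoidance:
every caller waits the base interval plus a random fraction of it. Just that
computation, sketched:

    // Returns a value in [base, base * (1 + ratio)]; with many concurrent
    // operations this spreads out lock probes instead of aligning them.
    static double jittered_interval(double base, double ratio) {
      double r = rand() / (double)RAND_MAX;  // uniform in [0, 1], as above
      return base + base * ratio * r;
    }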
+int RGWRados::recover_reshard_logrecord(RGWBucketInfo& bucket_info,
+ map<string, bufferlist>& bucket_attrs,
+ optional_yield y,
+ const DoutPrefixProvider *dpp)
+{
+ RGWBucketReshardLock reshard_lock(this->driver, bucket_info, true);
+ int ret = reshard_lock.lock(dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ <<
+ " INFO: failed to take reshard lock for bucket " <<
+ bucket_info.bucket.bucket_id << "; expected if resharding underway" << dendl;
+ // update the judge time
+ bucket_info.layout.judge_reshard_lock_time = real_clock::now();
+ ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
+ " ERROR: error putting bucket instance info: " << cpp_strerror(-ret) << dendl;
+ }
+ } else {
+ ldpp_dout(dpp, 20) << __func__ << ": taking the reshard lock succeeded, " <<
+ "which means the reshard has failed for bucket " << bucket_info.bucket.bucket_id << dendl;
+ // clear the RESHARD_IN_PROGRESS status after reshard failed, set bucket instance status
+ // to CLS_RGW_RESHARD_NONE, also clear the reshard log entries
+ ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y);
+ reshard_lock.unlock();
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ " ERROR: failed to clear resharding flags for bucket " <<
+ bucket_info.bucket.bucket_id << dendl;
+ } else {
+ ldpp_dout(dpp, 5) << __func__ <<
+ " INFO: apparently successfully cleared resharding flags for "
+ "bucket " << bucket_info.bucket.bucket_id << dendl;
+ } // if clear resharding succeeded
+ } // if taking of lock succeeded
+ return 0;
+}
int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
const rgw_obj& obj_instance,
@@ -7727,7 +8125,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
return ret;
}
- if (!entry.resharding_in_progress()) {
+ if (!entry.resharding()) {
ret = fetch_new_bucket_info("get_bucket_resharding_succeeded");
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
@@ -8708,13 +9106,9 @@ int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp,
if (first_chunk) {
op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, NULL);
}
- bufferlist outbl;
- r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y);
-
- if (epoch) {
- *epoch = ref.ioctx.get_last_version();
- }
+ bufferlist outbl;
+ r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y, 0, nullptr, epoch);
if (r < 0)
return r;
@@ -9202,7 +9596,8 @@ int RGWRados::bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj&
int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
const string& obj_name_filter, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated,
+ bool reshardlog, optional_yield y)
{
rgw_obj obj(bucket, obj_name_filter);
BucketShard bs(this);
@@ -9213,7 +9608,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
}
auto& ref = bs.bucket_obj;
- ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated);
+ ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog);
if (ret == -ENOENT) {
*is_truncated = false;
}
@@ -9224,10 +9619,10 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
}
int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y)
{
auto& ref = bs.bucket_obj;
- int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated);
+ int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog);
if (ret < 0)
return ret;
@@ -9236,7 +9631,7 @@ int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const stri
int RGWRados::bi_list(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info, int shard_id, const string& obj_name_filter, const string& marker, uint32_t max,
- list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+ list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y)
{
BucketShard bs(this);
int ret = bs.init(dpp, bucket_info,
@@ -9247,7 +9642,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp,
return ret;
}
- return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, y);
+ return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, reshardlog, y);
}
int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
@@ -9265,6 +9660,18 @@ int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
return 0;
}
+int RGWRados::trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y)
+{
+ librados::IoCtx index_pool;
+ map<int, string> bucket_objs;
+
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+ if (r < 0) {
+ return r;
+ }
+ return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y)
{
return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y);
@@ -9291,13 +9698,6 @@ int RGWRados::process_gc(bool expired_only, optional_yield y)
return gc->process(expired_only, y);
}
-int RGWRados::list_lc_progress(string& marker, uint32_t max_entries,
- vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>>& progress_map,
- int& index)
-{
- return lc->list_lc_progress(marker, max_entries, progress_map, index);
-}
-
int RGWRados::process_lc(const std::unique_ptr<rgw::sal::Bucket>& optional_bucket)
{
RGWLC lc;
@@ -9430,6 +9830,7 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(const DoutPrefixProvider *dpp, RGWB
if (r < 0)
return r;
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
return CLSRGWIssueSetTagTimeout(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
}
@@ -9559,8 +9960,15 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
num_entries << " total entries" << dendl;
auto& ioctx = index_pool;
+
+ // XXX: check_disk_state() relies on ioctx.get_last_version() but that
+ // returns 0 because CLSRGWIssueBucketList doesn't make any synchronous calls
+ rgw_bucket_entry_ver index_ver;
+ index_ver.pool = ioctx.get_id();
+
std::map<int, rgw_cls_list_ret> shard_list_results;
cls_rgw_obj_key start_after_key(start_after.name, start_after.instance);
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter,
num_entries_per_shard,
list_versions, shard_oids, shard_list_results,
@@ -9682,12 +10090,10 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
/* there are uncommitted ops. We need to check the current
* state, and if the tags are old we need to do clean-up as
* well. */
- librados::IoCtx sub_ctx;
- sub_ctx.dup(ioctx);
ldout_bitx(bitx, dpp, 20) << "INFO: " << __func__ <<
" calling check_disk_state bucket=" << bucket_info.bucket <<
" entry=" << dirent.key << dendl_bitx;
- r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent,
+ r = check_disk_state(dpp, bucket_info, index_ver, dirent, dirent,
updates[tracker.oid_name], y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 0) << __func__ <<
@@ -9755,6 +10161,8 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
for (auto& miter : updates) {
if (miter.second.length()) {
ObjectWriteOperation o;
+ o.assert_exists();
+ cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_suggest_changes(o, miter.second);
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c =
@@ -9907,6 +10315,9 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
}
}
+ rgw_bucket_entry_ver index_ver;
+ index_ver.pool = ioctx.get_id();
+
uint32_t count = 0u;
std::map<std::string, bufferlist> updates;
rgw_obj_index_key last_added_entry;
@@ -9921,7 +10332,7 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
cls_rgw_bucket_list_op(op, marker, prefix, empty_delimiter,
num_entries,
list_versions, &result);
- r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y);
+ r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y, 0, nullptr, &index_ver.epoch);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
": error in rgw_rados_operate (bucket list op), r=" << r << dendl;
@@ -9938,12 +10349,10 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
- librados::IoCtx sub_ctx;
- sub_ctx.dup(ioctx);
ldout_bitx(bitx, dpp, 20) << "INFO: " << __func__ <<
": calling check_disk_state bucket=" << bucket_info.bucket <<
" entry=" << dirent.key << dendl_bitx;
- r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent, updates[oid], y);
+ r = check_disk_state(dpp, bucket_info, index_ver, dirent, dirent, updates[oid], y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
": error in check_disk_state, r=" << r << dendl;
@@ -9990,6 +10399,8 @@ check_updates:
for (; miter != updates.end(); ++miter) {
if (miter->second.length()) {
ObjectWriteOperation o;
+ o.assert_exists();
+ cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
cls_rgw_suggest_changes(o, miter->second);
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr);
@@ -10173,8 +10584,8 @@ int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp,
}
int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
- librados::IoCtx io_ctx,
RGWBucketInfo& bucket_info,
+ const rgw_bucket_entry_ver& index_ver,
rgw_bucket_dir_entry& list_state,
rgw_bucket_dir_entry& object,
bufferlist& suggested_updates,
@@ -10202,8 +10613,6 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
ldpp_dout(dpp, 0) << "WARNING: generated locator (" << loc << ") is different from listed locator (" << list_state.locator << ")" << dendl;
}
- io_ctx.locator_set_key(list_state.locator);
-
RGWObjState *astate = NULL;
RGWObjManifest *manifest = nullptr;
RGWObjectCtx octx(this->driver);
@@ -10224,8 +10633,7 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
}
// encode a suggested removal of that key
- list_state.ver.epoch = io_ctx.get_last_version();
- list_state.ver.pool = io_ctx.get_id();
+ list_state.ver = index_ver;
ldout_bitx(bitx, dpp, 10) << "INFO: " << __func__ << ": encoding remove of " << list_state.key << " on suggested_updates" << dendl_bitx;
cls_rgw_encode_suggestion(CEPH_RGW_REMOVE | suggest_flag, list_state, suggested_updates);
return -ENOENT;
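With the IoCtx parameter gone from check_disk_state(), the entry version is
threaded in from the listing op itself. The plumbing in brief, assuming the
rgw_rados_operate() overload with an epoch out-parameter used throughout this
diff:

    version_t epoch = 0;
    int r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y, 0, nullptr, &epoch);

    rgw_bucket_entry_ver index_ver;
    index_ver.pool = ioctx.get_id();  // pool id of the index shard
    index_ver.epoch = epoch;          // version of the op just issued
    // index_ver replaces the old io_ctx.get_last_version()/get_id() pair
    // when encoding suggested changes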
@@ -10341,6 +10749,7 @@ int RGWRados::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo
return r;
}
+ maybe_warn_about_blocking(dpp); // TODO: use AioThrottle
r = CLSRGWIssueGetDirHeader(index_pool, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0) {
ldpp_dout(dpp, 20) << "cls_bucket_head: CLSRGWIssueGetDirHeader() returned "