diff options
Diffstat (limited to 'src/rgw/driver/rados/rgw_rados.cc')
-rw-r--r-- | src/rgw/driver/rados/rgw_rados.cc | 583 |
1 files changed, 496 insertions, 87 deletions
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index a75078d1151..69075c506f1 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -23,6 +23,7 @@ #include "common/BackTrace.h" #include "common/ceph_time.h" +#include "rgw_asio_thread.h" #include "rgw_cksum.h" #include "rgw_sal.h" #include "rgw_zone.h" @@ -36,6 +37,7 @@ #include "rgw_cr_rest.h" #include "rgw_datalog.h" #include "rgw_putobj_processor.h" +#include "rgw_lc_tier.h" #include "cls/rgw/cls_rgw_ops.h" #include "cls/rgw/cls_rgw_client.h" @@ -1928,11 +1930,58 @@ int RGWRados::Bucket::List::list_objects_ordered( ": finished due to getting past requested namespace \"" << params.ns << "\"" << dendl; goto done; - } + } else if (!obj.ns.empty()) { + // We're in the namespace range and we're enforcing an empty + // namespace, therefore we can skip past a congtiguous chunk + // of namespaced entries. Namespaces are demarcated in the + // index key by underscores before and after the namespace + // name (e.g., "_somenamespace_somekey"). Also, regular + // entries might begin with an underscore, in which case + // they're escaped with another underscore (e.g., "_foobar" + // is encoded as "__foobar"). We also have to account for + // the fact that in lexical ordering there are characters + // both before underscore (e.g., uppercase letters) and + // after (e.g., lowercase letters). So that means there can + // be five distinct and meaningful regions in the lexical + // ordering of entries, which we'll use examples to help + // illustrate: + + // 1. FOOBAR (regular pre-underscore) + // 2. _BAZ_foobar (namespaced, with namespace pre-underscore) + // 3. __foobar (regular with escaped underscore) + // 4. _baz_foobar (namespaced, with namespace post-underscore) + // 5. foobar (regular, post-underscore) + + // So if we're skipping namespaces and recognize we're in + // region 2, we must skip to region 3. And if we recognize + // we're in region 4, we skip to region 5. + rgw_obj_index_key potential_marker; + if (obj.ns[0] < '_') { + // We're in region 2, so need to skip to region 3. The + // caret (^) is the ASCII character that preceeds + // underscore, so we'll set the marker to the + // caret/circumflex followed by 0xFF, so the key after can + // be in the double underscore range. + potential_marker = rgw_obj_index_key("_^\xFF"); + } else { + // we're passed the escaped underscore region (i.e., + // starting with two underscores), so we can skip past the + // underscore region + potential_marker = rgw_obj_index_key("_\xFF"); + } + + if (cur_marker < potential_marker) { + ldpp_dout(dpp, 20) << __func__ << + ": skipping past region of namespaced entries, starting with \"" << + entry.key << "\"" << dendl; + cur_marker = potential_marker; + break; // leave inner loop (for) and allow another cls call + } + } - /* we're skipping past namespaced objects */ + // we're skipping past namespaced objects ldpp_dout(dpp, 20) << __func__ << - ": skipping past namespaced objects, including \"" << entry.key << + ": skipping past individual namespaced entry \"" << entry.key << "\"" << dendl; continue; } @@ -1953,7 +2002,7 @@ int RGWRados::Bucket::List::list_objects_ordered( if (params.access_list_filter && !params.access_list_filter(obj.name, index_key.name)) { ldpp_dout(dpp, 20) << __func__ << - ": skipping past namespaced objects, including \"" << entry.key << + ": skipping past filtered out entry \"" << entry.key << "\"" << dendl; continue; } @@ -3211,6 +3260,30 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si op.setxattr(RGW_ATTR_STORAGE_CLASS, bl); } + /* For temporary restored copies, storage-class returned + * in GET/list-objects should correspond to original + * cloudtier storage class. For GET its handled in its REST + * response by verifying RESTORE_TYPE in attrs. But the same + * cannot be done for list-objects response and hence this + * needs to be updated in bi entry itself. + */ + auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE); + if (attr_iter != attrs.end()) { + rgw::sal::RGWRestoreType rt; + bufferlist bl = attr_iter->second; + auto iter = bl.cbegin(); + decode(rt, iter); + + if (rt == rgw::sal::RGWRestoreType::Temporary) { + // temporary restore; set storage-class to cloudtier storage class + auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS); + + if (c_iter != attrs.end()) { + storage_class = rgw_bl_str(c_iter->second); + } + } + } + if (!op.size()) return 0; @@ -3247,7 +3320,7 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si auto& ioctx = ref.ioctx; tracepoint(rgw_rados, operate_enter, req_id.c_str()); - r = rgw_rados_operate(rctx.dpp, ref.ioctx, ref.obj.oid, &op, rctx.y, 0, &trace); + r = rgw_rados_operate(rctx.dpp, ref.ioctx, ref.obj.oid, &op, rctx.y, 0, &trace, &epoch); tracepoint(rgw_rados, operate_exit, req_id.c_str()); if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under, or -ENOENT if was removed, or -EEXIST if it did not exist @@ -3259,7 +3332,6 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si goto done_cancel; } - epoch = ioctx.get_last_version(); poolid = ioctx.get_id(); r = target->complete_atomic_modification(rctx.dpp, rctx.y); @@ -3318,12 +3390,17 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si return 0; done_cancel: - int ret = index_op->cancel(rctx.dpp, meta.remove_objs, rctx.y, log_op); - if (ret < 0) { - ldpp_dout(rctx.dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl; - } + // if r == -ETIMEDOUT, rgw can't determine whether or not the rados op succeeded + // we shouldn't be calling index_op->cancel() in this case + // Instead, we should leave that pending entry in the index so than bucket listing can recover with check_disk_state() and cls_rgw_suggest_changes() + if (r != -ETIMEDOUT) { + int ret = index_op->cancel(rctx.dpp, meta.remove_objs, rctx.y, log_op); + if (ret < 0) { + ldpp_dout(rctx.dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl; + } - meta.canceled = true; + meta.canceled = true; + } /* we lost in a race. There are a few options: * - existing object was rewritten (ECANCELED) @@ -5064,7 +5141,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, int RGWRados::transition_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, - const rgw_obj& obj, + rgw_obj obj, const rgw_placement_rule& placement_rule, const real_time& mtime, uint64_t olh_epoch, @@ -5095,6 +5172,11 @@ int RGWRados::transition_obj(RGWObjectCtx& obj_ctx, return -ECANCELED; } + // bi expects empty instance for the entries created when bucket versioning + // is not enabled or suspended. + if (obj.key.instance == "null") { + obj.key.instance.clear(); + } attrs.erase(RGW_ATTR_ID_TAG); attrs.erase(RGW_ATTR_TAIL_TAG); @@ -5126,6 +5208,187 @@ int RGWRados::transition_obj(RGWObjectCtx& obj_ctx, return 0; } +int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, + RGWObjectCtx& obj_ctx, + RGWBucketInfo& dest_bucket_info, + const rgw_obj& dest_obj, + rgw_placement_rule& dest_placement, + RGWObjTier& tier_config, + real_time& mtime, + uint64_t olh_epoch, + std::optional<uint64_t> days, + const DoutPrefixProvider *dpp, + optional_yield y, + bool log_op){ + + //XXX: read below from attrs .. check transition_obj() + ACLOwner owner; + rgw::sal::Attrs attrs; + const req_context rctx{dpp, y, nullptr}; + int ret = 0; + bufferlist t, t_tier; + string tag; + append_rand_alpha(cct, tag, tag, 32); + auto aio = rgw::make_throttle(cct->_conf->rgw_put_obj_min_window_size, y); + using namespace rgw::putobj; + jspan_context no_trace{false, false}; + rgw::putobj::AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info, nullptr, + owner, obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace); + + void (*progress_cb)(off_t, void *) = NULL; + void *progress_data = NULL; + bool cb_processed = false; + RGWFetchObjFilter *filter; + RGWFetchObjFilter_Default source_filter; + if (!filter) { + filter = &source_filter; + } + boost::optional<RGWPutObj_Compress> compressor; + CompressorRef plugin; + RGWRadosPutObj cb(dpp, cct, plugin, compressor, &processor, progress_cb, progress_data, + [&](map<string, bufferlist> obj_attrs) { + // XXX: do we need filter() like in fetch_remote_obj() cb + dest_placement.inherit_from(dest_bucket_info.placement_rule); + /* For now we always restore to STANDARD storage-class. + * Later we will add support to take restore-target-storage-class + * for permanent restore + */ + dest_placement.storage_class = RGW_STORAGE_CLASS_STANDARD; + + processor.set_tail_placement(dest_placement); + + ret = processor.prepare(rctx.y); + if (ret < 0) { + return ret; + } + cb_processed = true; + return 0; + }); + + uint64_t accounted_size = 0; + string etag; + real_time set_mtime; + std::map<std::string, std::string> headers; + ldpp_dout(dpp, 20) << "Fetching from cloud, object:" << dest_obj << dendl; + ret = rgw_cloud_tier_get_object(tier_ctx, false, headers, + &set_mtime, etag, accounted_size, + attrs, &cb); + + if (ret < 0) { + ldpp_dout(dpp, 20) << "Fetching from cloud failed, object:" << dest_obj << dendl; + return ret; + } + + if (!cb_processed) { + ldpp_dout(dpp, 20) << "Callback not processed, object:" << dest_obj << dendl; + return -EIO; + } + + ret = cb.flush(); + if (ret < 0) { + return ret; + } + + if (cb.get_data_len() != accounted_size) { + ret = -EIO; + ldpp_dout(dpp, -1) << "ERROR: object truncated during fetching, expected " + << accounted_size << " bytes but received " << cb.get_data_len() << dendl; + return ret; + } + + { + bufferlist bl; + encode(rgw::sal::RGWRestoreStatus::CloudRestored, bl); + attrs[RGW_ATTR_RESTORE_STATUS] = std::move(bl); + } + + ceph::real_time restore_time = real_clock::now(); + { + bufferlist bl; + encode(restore_time, bl); + attrs[RGW_ATTR_RESTORE_TIME] = std::move(bl); + } + + real_time delete_at = real_time(); + if (days) { //temp copy; do not change mtime and set expiry date + int expiry_days = days.value(); + constexpr int32_t secs_in_a_day = 24 * 60 * 60; + ceph::real_time expiration_date ; + + if (cct->_conf->rgw_restore_debug_interval > 0) { + expiration_date = restore_time + make_timespan(double(expiry_days)*cct->_conf->rgw_restore_debug_interval); + ldpp_dout(dpp, 20) << "Setting expiration time to rgw_restore_debug_interval: " << double(expiry_days)*cct->_conf->rgw_restore_debug_interval << ", days:" << expiry_days << dendl; + } else { + expiration_date = restore_time + make_timespan(double(expiry_days) * secs_in_a_day); + } + delete_at = expiration_date; + + { + bufferlist bl; + encode(expiration_date, bl); + attrs[RGW_ATTR_RESTORE_EXPIRY_DATE] = std::move(bl); + } + { + bufferlist bl; + bl.clear(); + using ceph::encode; + encode(rgw::sal::RGWRestoreType::Temporary, bl); + attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl); + ldpp_dout(dpp, 20) << "Temporary restore, object:" << dest_obj << dendl; + } + { + string sc = tier_ctx.storage_class; + bufferlist bl; + bl.append(sc.c_str(), sc.size()); + attrs[RGW_ATTR_CLOUDTIER_STORAGE_CLASS] = std::move(bl); + ldpp_dout(dpp, 20) << "Setting RGW_ATTR_CLOUDTIER_STORAGE_CLASS: " << tier_ctx.storage_class << dendl; + } + //set same old mtime as that of transition time + set_mtime = mtime; + + // set tier-config only for temp restored objects, as + // permanent copies will be treated as regular objects + { + t.append("cloud-s3"); + encode(tier_config, t_tier); + attrs[RGW_ATTR_CLOUD_TIER_TYPE] = t; + attrs[RGW_ATTR_CLOUD_TIER_CONFIG] = t_tier; + } + + } else { // permanent restore + { + bufferlist bl; + bl.clear(); + using ceph::encode; + encode(rgw::sal::RGWRestoreType::Permanent, bl); + attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl); + ldpp_dout(dpp, 20) << "Permanent restore, object:" << dest_obj << dendl; + } + //set mtime to now() + set_mtime = real_clock::now(); + } + + { + string sc = dest_placement.get_storage_class(); //"STANDARD"; + bufferlist bl; + bl.append(sc.c_str(), sc.size()); + attrs[RGW_ATTR_STORAGE_CLASS] = std::move(bl); + } + + // XXX: handle COMPLETE_RETRY like in fetch_remote_obj + bool canceled = false; + rgw_zone_set zone_set{}; + ret = processor.complete(accounted_size, etag, &mtime, set_mtime, + attrs, rgw::cksum::no_cksum, delete_at , nullptr, nullptr, nullptr, + (rgw_zone_set *)&zone_set, &canceled, rctx, log_op ? rgw::sal::FLAG_LOG_OP : 0); + if (ret < 0) { + return ret; + } + + // XXX: handle olh_epoch for versioned objects like in fetch_remote_obj + return ret; +} + int RGWRados::check_bucket_empty(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y) { constexpr uint NUM_ENTRIES = 1000u; @@ -5222,7 +5485,7 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& ob } /* if the bucket is not synced we can remove the meta file */ - if (!svc.zone->is_syncing_bucket_meta(bucket)) { + if (!svc.zone->is_syncing_bucket_meta()) { RGWObjVersionTracker objv_tracker; r = ctl.bucket->remove_bucket_instance_info(bucket, bucket_info, y, dpp); if (r < 0) { @@ -5230,6 +5493,7 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& ob } /* remove bucket index objects asynchronously by best effort */ + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle (void) CLSRGWIssueBucketIndexClean(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); @@ -5332,12 +5596,12 @@ int RGWRados::Object::complete_atomic_modification(const DoutPrefixProvider *dpp if (store->gc == nullptr) { ldpp_dout(dpp, 0) << "deleting objects inline since gc isn't initialized" << dendl; //Delete objects inline just in case gc hasn't been initialised, prevents crashes - store->delete_objs_inline(dpp, chain, tag); + store->delete_objs_inline(dpp, chain, tag, y); } else { auto [ret, leftover_chain] = store->gc->send_split_chain(chain, tag, y); // do it synchronously if (ret < 0 && leftover_chain) { //Delete objects inline if send chain to gc fails - store->delete_objs_inline(dpp, *leftover_chain, tag); + store->delete_objs_inline(dpp, *leftover_chain, tag, y); } } return 0; @@ -5366,35 +5630,44 @@ std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWRados::send_chain_to_gc(cls return gc->send_split_chain(chain, tag, y); } -void RGWRados::delete_objs_inline(const DoutPrefixProvider *dpp, cls_rgw_obj_chain& chain, const string& tag) +void RGWRados::delete_objs_inline(const DoutPrefixProvider *dpp, cls_rgw_obj_chain& chain, + const string& tag, optional_yield y) { - string last_pool; - std::unique_ptr<IoCtx> ctx(new IoCtx); - int ret = 0; - for (auto liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) { - cls_rgw_obj& obj = *liter; - if (obj.pool != last_pool) { - ctx.reset(new IoCtx); - ret = rgw_init_ioctx(dpp, get_rados_handle(), obj.pool, *ctx); - if (ret < 0) { - last_pool = ""; - ldpp_dout(dpp, 0) << "ERROR: failed to create ioctx pool=" << - obj.pool << dendl; - continue; - } - last_pool = obj.pool; - } - ctx->locator_set_key(obj.loc); - const string& oid = obj.key.name; /* just stored raw oid there */ - ldpp_dout(dpp, 5) << "delete_objs_inline: removing " << obj.pool << - ":" << obj.key.name << dendl; + if (chain.objs.empty()) { + return; + } + + // initialize an IoCtx for the first object's pool. RGWObjManifest uses the + // same pool for all tail objects + auto obj = chain.objs.begin(); + + librados::IoCtx ioctx; + int ret = rgw_init_ioctx(dpp, get_rados_handle(), obj->pool, ioctx); + if (ret < 0) { + return; + } + + // issue deletions in parallel, up to max_aio at a time + auto aio = rgw::make_throttle(cct->_conf->rgw_multi_obj_del_max_aio, y); + static constexpr uint64_t cost = 1; // 1 throttle unit per request + static constexpr uint64_t id = 0; // ids unused + + for (; obj != chain.objs.end(); ++obj) { ObjectWriteOperation op; cls_refcount_put(op, tag, true); - ret = ctx->operate(oid, &op); - if (ret < 0) { - ldpp_dout(dpp, 5) << "delete_objs_inline: refcount put returned error " << ret << dendl; - } + + rgw_raw_obj raw; + raw.pool = std::move(obj->pool); + raw.oid = std::move(obj->key.name); + raw.loc = std::move(obj->loc); + + auto completed = aio->get(std::move(raw), rgw::Aio::librados_op( + ioctx, std::move(op), y), cost, id); + std::ignore = rgw::check_for_errors(completed); } + + auto completed = aio->drain(); + std::ignore = rgw::check_for_errors(completed); } static void accumulate_raw_stats(const rgw_bucket_dir_header& header, @@ -5435,6 +5708,7 @@ int RGWRados::bucket_check_index(const DoutPrefixProvider *dpp, RGWBucketInfo& b bucket_objs_ret.emplace(iter.first, rgw_cls_check_index_ret()); } + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle ret = CLSRGWIssueBucketCheck(index_pool, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); if (ret < 0) { return ret; @@ -5459,6 +5733,7 @@ int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo& return r; } + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle return CLSRGWIssueBucketRebuild(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } @@ -5596,7 +5871,9 @@ int RGWRados::bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp, return 0; } -int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry) +int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, + const RGWBucketInfo& bucket_info, + const cls_rgw_bucket_instance_entry& entry) { librados::IoCtx index_pool; map<int, string> bucket_objs; @@ -5609,6 +5886,7 @@ int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketI return r; } + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << @@ -5855,20 +6133,29 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi store->remove_rgw_head_obj(op); + if (params.check_objv != nullptr) { + cls_version_check(op, *params.check_objv, VER_COND_EQ); + } + auto& ioctx = ref.ioctx; - r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y); + version_t epoch = 0; + r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y, 0, nullptr, &epoch); /* raced with another operation, object state is indeterminate */ const bool need_invalidate = (r == -ECANCELED); int64_t poolid = ioctx.get_id(); - if (r >= 0) { + if (r == -ETIMEDOUT) { + // rgw can't determine whether or not the delete succeeded, shouldn't be calling either of complete_del() or cancel() + // leaving that pending entry in the index so that bucket listing can recover with check_disk_state() and cls_rgw_suggest_changes() + ldpp_dout(dpp, 0) << "ERROR: rgw_rados_operate returned r=" << r << dendl; + } else if (r >= 0 || r == -ENOENT) { tombstone_cache_t *obj_tombstone_cache = store->get_tombstone_cache(); if (obj_tombstone_cache) { tombstone_entry entry{*state}; obj_tombstone_cache->add(obj, entry); } - r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y, log_op); + r = index_op.complete_del(dpp, poolid, epoch, state->mtime, params.remove_objs, y, log_op); int ret = target->complete_atomic_modification(dpp, y); if (ret < 0) { @@ -6049,7 +6336,7 @@ int RGWRados::get_obj_state_impl(const DoutPrefixProvider *dpp, RGWObjectCtx *oc int r = -ENOENT; if (!assume_noent) { - r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), NULL, y); + r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), &s->objv_tracker, y); } if (r == -ENOENT) { @@ -6589,7 +6876,8 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu struct timespec mtime_ts = real_clock::to_timespec(mtime); op.mtime2(&mtime_ts); auto& ioctx = ref.ioctx; - r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y); + version_t epoch = 0; + r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y, 0, nullptr, &epoch); if (state) { if (r >= 0) { ACLOwner owner; @@ -6620,11 +6908,29 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu iter != state->attrset.end()) { storage_class = rgw_bl_str(iter->second); } - uint64_t epoch = ioctx.get_last_version(); int64_t poolid = ioctx.get_id(); + + // Retain Object category as CloudTiered while restore is in + // progress or failed + RGWObjCategory category = RGWObjCategory::Main; + auto r_iter = attrs.find(RGW_ATTR_RESTORE_STATUS); + if (r_iter != attrs.end()) { + rgw::sal::RGWRestoreStatus st = rgw::sal::RGWRestoreStatus::None; + auto iter = r_iter->second.cbegin(); + + try { + using ceph::decode; + decode(st, iter); + + if (st != rgw::sal::RGWRestoreStatus::CloudRestored) { + category = RGWObjCategory::CloudTiered; + } + } catch (buffer::error& err) { + } + } r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size, mtime, etag, content_type, storage_class, owner, - RGWObjCategory::Main, nullptr, y, nullptr, false, log_op); + category, nullptr, y, nullptr, false, log_op); } else { int ret = index_op.cancel(dpp, nullptr, y, log_op); if (ret < 0) { @@ -6656,13 +6962,13 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu } return 0; -} +} /* RGWRados::set_attrs() */ -static int get_part_obj_state(const DoutPrefixProvider* dpp, optional_yield y, - RGWRados* store, RGWBucketInfo& bucket_info, - RGWObjectCtx* rctx, RGWObjManifest* manifest, - int part_num, int* parts_count, bool prefetch, - RGWObjState** pstate, RGWObjManifest** pmanifest) +int RGWRados::get_part_obj_state(const DoutPrefixProvider* dpp, optional_yield y, + RGWRados* store, RGWBucketInfo& bucket_info, + RGWObjectCtx* rctx, RGWObjManifest* manifest, + int part_num, int* parts_count, bool prefetch, + RGWObjState** pstate, RGWObjManifest** pmanifest) { if (!manifest) { return -ERR_INVALID_PART; @@ -6741,6 +7047,9 @@ static int get_part_obj_state(const DoutPrefixProvider* dpp, optional_yield y, // update the object size sm->state.size = part_manifest.get_obj_size(); + if (!sm->state.attrset.count(RGW_ATTR_COMPRESSION)) { + sm->state.accounted_size = sm->state.size; + } *pmanifest = &part_manifest; return 0; @@ -6782,9 +7091,20 @@ int RGWRados::Object::Read::prepare(optional_yield y, const DoutPrefixProvider * return -ENOENT; } + if (params.objv_tracker) { + *params.objv_tracker = astate->objv_tracker; + } + RGWBucketInfo& bucket_info = source->get_bucket_info(); if (params.part_num) { + map<string, bufferlist> src_attrset; + for (auto& iter : astate->attrset) { + if (boost::algorithm::starts_with(iter.first, RGW_ATTR_CRYPT_PREFIX)) { + ldpp_dout(dpp, 4) << "get src crypt attr: " << iter.first << dendl; + src_attrset[iter.first] = iter.second; + } + } int parts_count = 0; // use the manifest to redirect to the requested part number r = get_part_obj_state(dpp, y, store, bucket_info, &source->get_ctx(), @@ -6807,6 +7127,13 @@ int RGWRados::Object::Read::prepare(optional_yield y, const DoutPrefixProvider * } else { params.parts_count = parts_count; } + + for (auto& iter : src_attrset) { + ldpp_dout(dpp, 4) << "copy crypt attr: " << iter.first << dendl; + if (astate->attrset.find(iter.first) == astate->attrset.end()) { + astate->attrset[iter.first] = std::move(iter.second); + } + } } state.obj = astate->obj; @@ -6966,6 +7293,10 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(const DoutPrefixProvider *dpp, *pbs = bs; } + if (target->bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) { + store->check_reshard_logrecord_status(target->bucket_info, y, dpp); + } + return 0; } @@ -7221,7 +7552,7 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end, state.cur_ioctx->locator_set_key(read_obj.loc); - r = state.cur_ioctx->operate(read_obj.oid, &op, NULL); + r = rgw_rados_operate(dpp, *state.cur_ioctx, read_obj.oid, &op, nullptr, y); ldpp_dout(dpp, 20) << "rados->read r=" << r << " bl.length=" << bl.length() << dendl; if (r < 0) { @@ -7659,9 +7990,76 @@ int RGWRados::guard_reshard(const DoutPrefixProvider *dpp, return r; } + if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) { + check_reshard_logrecord_status(bucket_info, y, dpp); + } + return 0; } +int RGWRados::check_reshard_logrecord_status(RGWBucketInfo& bucket_info, optional_yield y, + const DoutPrefixProvider *dpp) +{ + real_time now = real_clock::now(); + double r = rand() / (double)RAND_MAX; + double reshard_progress_judge_interval = cct->_conf.get_val<uint64_t>("rgw_reshard_progress_judge_interval"); + // avoid getting reshard_lock simultaneously by mass differrent operation + reshard_progress_judge_interval += + reshard_progress_judge_interval * cct->_conf.get_val<double>("rgw_reshard_progress_judge_ratio") * r; + if (now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval)) { + + map<string, bufferlist> bucket_attrs; + int ret = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.bucket.name, + bucket_info, nullptr, y, dpp, &bucket_attrs); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + " ERROR: failed to refresh bucket info : " << cpp_strerror(-ret) << dendl; + return ret; + } + if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord && + now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval)) + return recover_reshard_logrecord(bucket_info, bucket_attrs, y, dpp); + } + return 0; +} + +int RGWRados::recover_reshard_logrecord(RGWBucketInfo& bucket_info, + map<string, bufferlist>& bucket_attrs, + optional_yield y, + const DoutPrefixProvider *dpp) +{ + RGWBucketReshardLock reshard_lock(this->driver, bucket_info, true); + int ret = reshard_lock.lock(dpp); + if (ret < 0) { + ldpp_dout(dpp, 20) << __func__ << + " INFO: failed to take reshard lock for bucket " << + bucket_info.bucket.bucket_id << "; expected if resharding underway" << dendl; + // update the judge time + bucket_info.layout.judge_reshard_lock_time = real_clock::now(); + ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << + " ERROR: error putting bucket instance info: " << cpp_strerror(-ret) << dendl; + } + } else { + ldpp_dout(dpp,20) << __func__ << ": reshard lock success, " << + "that means the reshard has failed for bucekt " << bucket_info.bucket.bucket_id << dendl; + // clear the RESHARD_IN_PROGRESS status after reshard failed, set bucket instance status + // to CLS_RGW_RESHARD_NONE, also clear the reshard log entries + ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y); + reshard_lock.unlock(); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + " ERROR: failed to clear resharding flags for bucket " << + bucket_info.bucket.bucket_id << dendl; + } else { + ldpp_dout(dpp, 5) << __func__ << + " INFO: apparently successfully cleared resharding flags for " + "bucket " << bucket_info.bucket.bucket_id << dendl; + } // if clear resharding succeeded + } // if taking of lock succeeded + return 0; +} int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, const rgw_obj& obj_instance, @@ -7727,7 +8125,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, return ret; } - if (!entry.resharding_in_progress()) { + if (!entry.resharding()) { ret = fetch_new_bucket_info("get_bucket_resharding_succeeded"); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << @@ -8708,13 +9106,9 @@ int RGWRados::raw_obj_stat(const DoutPrefixProvider *dpp, if (first_chunk) { op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, NULL); } - bufferlist outbl; - r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y); - - if (epoch) { - *epoch = ref.ioctx.get_last_version(); - } + bufferlist outbl; + r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, &outbl, y, 0, nullptr, epoch); if (r < 0) return r; @@ -9202,7 +9596,8 @@ int RGWRados::bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, const string& obj_name_filter, const string& marker, uint32_t max, - list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y) + list<rgw_cls_bi_entry> *entries, bool *is_truncated, + bool reshardlog, optional_yield y) { rgw_obj obj(bucket, obj_name_filter); BucketShard bs(this); @@ -9213,7 +9608,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, } auto& ref = bs.bucket_obj; - ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated); + ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog); if (ret == -ENOENT) { *is_truncated = false; } @@ -9224,10 +9619,10 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, } int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const string& marker, uint32_t max, - list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y) + list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y) { auto& ref = bs.bucket_obj; - int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated); + int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog); if (ret < 0) return ret; @@ -9236,7 +9631,7 @@ int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const stri int RGWRados::bi_list(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, const string& obj_name_filter, const string& marker, uint32_t max, - list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y) + list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y) { BucketShard bs(this); int ret = bs.init(dpp, bucket_info, @@ -9247,7 +9642,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, return ret; } - return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, y); + return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, reshardlog, y); } int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs) @@ -9265,6 +9660,18 @@ int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs) return 0; } +int RGWRados::trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y) +{ + librados::IoCtx index_pool; + map<int, string> bucket_objs; + + int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr); + if (r < 0) { + return r; + } + return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); +} + int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y) { return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y); @@ -9291,13 +9698,6 @@ int RGWRados::process_gc(bool expired_only, optional_yield y) return gc->process(expired_only, y); } -int RGWRados::list_lc_progress(string& marker, uint32_t max_entries, - vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>>& progress_map, - int& index) -{ - return lc->list_lc_progress(marker, max_entries, progress_map, index); -} - int RGWRados::process_lc(const std::unique_ptr<rgw::sal::Bucket>& optional_bucket) { RGWLC lc; @@ -9430,6 +9830,7 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(const DoutPrefixProvider *dpp, RGWB if (r < 0) return r; + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle return CLSRGWIssueSetTagTimeout(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)(); } @@ -9559,8 +9960,15 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp, num_entries << " total entries" << dendl; auto& ioctx = index_pool; + + // XXX: check_disk_state() relies on ioctx.get_last_version() but that + // returns 0 because CLSRGWIssueBucketList doesn't make any synchonous calls + rgw_bucket_entry_ver index_ver; + index_ver.pool = ioctx.get_id(); + std::map<int, rgw_cls_list_ret> shard_list_results; cls_rgw_obj_key start_after_key(start_after.name, start_after.instance); + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter, num_entries_per_shard, list_versions, shard_oids, shard_list_results, @@ -9682,12 +10090,10 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp, /* there are uncommitted ops. We need to check the current * state, and if the tags are old we need to do clean-up as * well. */ - librados::IoCtx sub_ctx; - sub_ctx.dup(ioctx); ldout_bitx(bitx, dpp, 20) << "INFO: " << __func__ << " calling check_disk_state bucket=" << bucket_info.bucket << " entry=" << dirent.key << dendl_bitx; - r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent, + r = check_disk_state(dpp, bucket_info, index_ver, dirent, dirent, updates[tracker.oid_name], y); if (r < 0 && r != -ENOENT) { ldpp_dout(dpp, 0) << __func__ << @@ -9755,6 +10161,8 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp, for (auto& miter : updates) { if (miter.second.length()) { ObjectWriteOperation o; + o.assert_exists(); + cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING); cls_rgw_suggest_changes(o, miter.second); // we don't care if we lose suggested updates, send them off blindly AioCompletion *c = @@ -9907,6 +10315,9 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp, } } + rgw_bucket_entry_ver index_ver; + index_ver.pool = ioctx.get_id(); + uint32_t count = 0u; std::map<std::string, bufferlist> updates; rgw_obj_index_key last_added_entry; @@ -9921,7 +10332,7 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp, cls_rgw_bucket_list_op(op, marker, prefix, empty_delimiter, num_entries, list_versions, &result); - r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y); + r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, y, 0, nullptr, &index_ver.epoch); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << ": error in rgw_rados_operate (bucket list op), r=" << r << dendl; @@ -9938,12 +10349,10 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp, force_check) { /* there are uncommitted ops. We need to check the current state, * and if the tags are old we need to do cleanup as well. */ - librados::IoCtx sub_ctx; - sub_ctx.dup(ioctx); ldout_bitx(bitx, dpp, 20) << "INFO: " << __func__ << ": calling check_disk_state bucket=" << bucket_info.bucket << " entry=" << dirent.key << dendl_bitx; - r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent, updates[oid], y); + r = check_disk_state(dpp, bucket_info, index_ver, dirent, dirent, updates[oid], y); if (r < 0 && r != -ENOENT) { ldpp_dout(dpp, 0) << "ERROR: " << __func__ << ": error in check_disk_state, r=" << r << dendl; @@ -9990,6 +10399,8 @@ check_updates: for (; miter != updates.end(); ++miter) { if (miter->second.length()) { ObjectWriteOperation o; + o.assert_exists(); + cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING); cls_rgw_suggest_changes(o, miter->second); // we don't care if we lose suggested updates, send them off blindly AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr); @@ -10173,8 +10584,8 @@ int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, } int RGWRados::check_disk_state(const DoutPrefixProvider *dpp, - librados::IoCtx io_ctx, RGWBucketInfo& bucket_info, + const rgw_bucket_entry_ver& index_ver, rgw_bucket_dir_entry& list_state, rgw_bucket_dir_entry& object, bufferlist& suggested_updates, @@ -10202,8 +10613,6 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp, ldpp_dout(dpp, 0) << "WARNING: generated locator (" << loc << ") is different from listed locator (" << list_state.locator << ")" << dendl; } - io_ctx.locator_set_key(list_state.locator); - RGWObjState *astate = NULL; RGWObjManifest *manifest = nullptr; RGWObjectCtx octx(this->driver); @@ -10224,8 +10633,7 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp, } // encode a suggested removal of that key - list_state.ver.epoch = io_ctx.get_last_version(); - list_state.ver.pool = io_ctx.get_id(); + list_state.ver = index_ver; ldout_bitx(bitx, dpp, 10) << "INFO: " << __func__ << ": encoding remove of " << list_state.key << " on suggested_updates" << dendl_bitx; cls_rgw_encode_suggestion(CEPH_RGW_REMOVE | suggest_flag, list_state, suggested_updates); return -ENOENT; @@ -10341,6 +10749,7 @@ int RGWRados::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo return r; } + maybe_warn_about_blocking(dpp); // TODO: use AioTrottle r = CLSRGWIssueGetDirHeader(index_pool, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) { ldpp_dout(dpp, 20) << "cls_bucket_head: CLSRGWIssueGetDirHeader() returned " |