diff options
-rw-r--r-- | src/cls/rgw/cls_rgw.cc | 1 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 7 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.h | 2 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_rados.cc | 2 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_rados.h | 2 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_reshard.cc | 110 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_reshard.h | 5 |
7 files changed, 84 insertions, 45 deletions
diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index af520b6bb0e..8f0190d4218 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -2914,7 +2914,6 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist * } rgw_cls_bi_entry& entry = op.entry; - if (entry.type == BIIndexType::ReshardDeleted) { int r = cls_cxx_map_remove_key(hctx, entry.idx); if (r < 0) { diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index a9b1a5bdb1c..c5ac99eada0 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -471,12 +471,12 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid, } int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid, - std::set<std::string> log_entries_wanted, + std::set<std::string>& log_entries_wanted, std::list<rgw_cls_bi_entry> *entries) { bufferlist in, out; struct rgw_cls_bi_get_vals_op call; - call.log_entries_wanted = log_entries_wanted; + call.log_entries_wanted = std::move(log_entries_wanted); encode(call, in); int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET_VALS, in, out); if (r < 0) @@ -490,7 +490,8 @@ int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid, return -EIO; } - entries->swap(op_ret.entries); + if (entries) + entries->swap(op_ret.entries); return 0; } diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index c5336030c07..86c40dc9278 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -384,7 +384,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid, BIIndexType index_type, const cls_rgw_obj_key& key, rgw_cls_bi_entry *entry); int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid, - std::set<std::string> log_entries_wanted, + std::set<std::string>& log_entries_wanted, std::list<rgw_cls_bi_entry> *entries); int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry); void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 1df24538067..4be7264d32f 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -9239,7 +9239,7 @@ int RGWRados::bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_ return cls_rgw_bi_get(ref.ioctx, ref.obj.oid, index_type, key, entry); } -int RGWRados::bi_get_vals(BucketShard& bs, set<string> log_entries_wanted, +int RGWRados::bi_get_vals(BucketShard& bs, set<string>& log_entries_wanted, list<rgw_cls_bi_entry> *entries, optional_yield y) { auto& ref = bs.bucket_obj; diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 4321cddf40e..a2c55b585d4 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -1522,7 +1522,7 @@ public: int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y); int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y); int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y); - int bi_get_vals(BucketShard& bs, std::set<std::string> log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y); + int bi_get_vals(BucketShard& bs, std::set<std::string>& log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y); void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y); int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y); int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y); diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index 3ff4915dd75..2625b979233 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -237,7 +237,8 @@ public: return 0; } - int flush(bool process_log = false) { + int flush(bool process_log = false, RGWBucketReshard *br = nullptr, + const DoutPrefixProvider *dpp = nullptr) { if (entries.size() == 0) { return 0; } @@ -292,6 +293,13 @@ public: } entries.clear(); stats.clear(); + + if (br != nullptr) { + ret = br->renew_lock_if_needed(dpp); + if (ret < 0) { + return ret; + } + } return 0; } @@ -353,10 +361,11 @@ public: return 0; } - int finish(bool process_log = false) { + int finish(bool process_log = false, RGWBucketReshard *br = nullptr, + const DoutPrefixProvider *dpp = nullptr) { int ret = 0; for (auto& shard : target_shards) { - int r = shard.flush(process_log); + int r = shard.flush(process_log, br, dpp); if (r < 0) { derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl; ret = r; @@ -934,7 +943,7 @@ int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp, optional_yield y) return ret; } - if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress || + if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress && bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) { ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl; ret = -EINVAL; @@ -1032,13 +1041,55 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) { return 0; } +int RGWBucketReshard::renew_lock_if_needed(const DoutPrefixProvider *dpp) { + int ret = 0; + Clock::time_point now = Clock::now(); + if (reshard_lock.should_renew(now)) { + // assume outer locks have timespans at least the size of ours, so + // can call inside conditional + if (outer_reshard_lock) { + ret = outer_reshard_lock->renew(now); + if (ret < 0) { + return ret; + } + } + ret = reshard_lock.renew(now); + if (ret < 0) { + ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl; + return ret; + } + } + return 0; +} + +int RGWBucketReshard::calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, + int& shard, const DoutPrefixProvider *dpp) { + int target_shard_id, ret; + + rgw_obj obj(bucket_info.bucket, key); + RGWMPObj mp; + if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) { + // place the multipart .meta object on the same shard as its head object + obj.index_hash_source = mp.get_key(); + } + ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal, + obj.get_hash_object(), &target_shard_id); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; + return ret; + } + shard = (target_shard_id > 0 ? target_shard_id : 0); + + return 0; +} + int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current, - int& max_op_entries, - BucketReshardManager& target_shards_mgr, - bool verbose_json_out, - ostream *out, - Formatter *formatter, rgw::BucketReshardState reshard_stage, - const DoutPrefixProvider *dpp, optional_yield y) + int& max_op_entries, + BucketReshardManager& target_shards_mgr, + bool verbose_json_out, + ostream *out, + Formatter *formatter, rgw::BucketReshardState reshard_stage, + const DoutPrefixProvider *dpp, optional_yield y) { list<rgw_cls_bi_entry> entries; @@ -1065,6 +1116,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& uint64_t stage_entries = 0; stage.append(":"); if (!verbose_json_out && out) { + (*out) << "start time: " << real_clock::now() << std::endl; (*out) << stage; } @@ -1103,7 +1155,6 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& marker = entry.idx; - int target_shard_id; cls_rgw_obj_key cls_key; RGWObjCategory category; rgw_bucket_category_stats stats; @@ -1116,43 +1167,24 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl; continue; } - rgw_obj obj(bucket_info.bucket, key); - RGWMPObj mp; - if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) { - // place the multipart .meta object on the same shard as its head object - obj.index_hash_source = mp.get_key(); - } - ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal, - obj.get_hash_object(), &target_shard_id); + + int shard_index; + ret = calc_target_shard(bucket_info, key, shard_index, dpp); if (ret < 0) { - ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; return ret; } - int shard_index = (target_shard_id > 0 ? target_shard_id : 0); - ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats, process_log); if (ret < 0) { return ret; } - Clock::time_point now = Clock::now(); - if (reshard_lock.should_renew(now)) { - // assume outer locks have timespans at least the size of ours, so - // can call inside conditional - if (outer_reshard_lock) { - ret = outer_reshard_lock->renew(now); - if (ret < 0) { - return ret; - } - } - ret = reshard_lock.renew(now); - if (ret < 0) { - ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl; - return ret; - } + ret = renew_lock_if_needed(dpp); + if (ret < 0) { + return ret; } + if (verbose_json_out) { formatter->close_section(); formatter->flush(*out); @@ -1168,13 +1200,15 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& formatter->flush(*out); } else if (out) { (*out) << " " << stage_entries << std::endl; + (*out) << "end time: " << real_clock::now() << std::endl; } - int ret = target_shards_mgr.finish(process_log); + int ret = target_shards_mgr.finish(process_log, this, dpp); if (ret < 0) { ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl; return -EIO; } + return 0; } diff --git a/src/rgw/driver/rados/rgw_reshard.h b/src/rgw/driver/rados/rgw_reshard.h index ea5bb6e713d..3d056e50f46 100644 --- a/src/rgw/driver/rados/rgw_reshard.h +++ b/src/rgw/driver/rados/rgw_reshard.h @@ -84,6 +84,9 @@ class RGWBucketReshard { // using an initializer_list as an array in contiguous memory // allocated in at once static const std::initializer_list<uint16_t> reshard_primes; + + int calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, + int& shard, const DoutPrefixProvider *dpp); int reshard_process(const rgw::bucket_index_layout_generation& current, int& max_entries, BucketReshardManager& target_shards_mgr, @@ -91,6 +94,7 @@ class RGWBucketReshard { std::ostream *out, Formatter *formatter, rgw::BucketReshardState reshard_stage, const DoutPrefixProvider *dpp, optional_yield y); + int do_reshard(const rgw::bucket_index_layout_generation& current, const rgw::bucket_index_layout_generation& target, int max_entries, bool support_logrecord, @@ -115,6 +119,7 @@ public: RGWReshard *reshard_log = nullptr); int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status); int cancel(const DoutPrefixProvider* dpp, optional_yield y); + int renew_lock_if_needed(const DoutPrefixProvider *dpp); static int clear_resharding(rgw::sal::RadosStore* store, RGWBucketInfo& bucket_info, |