summaryrefslogtreecommitdiffstats
path: root/src/rgw/driver/rados/rgw_reshard.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/driver/rados/rgw_reshard.cc')
-rw-r--r--src/rgw/driver/rados/rgw_reshard.cc110
1 files changed, 72 insertions, 38 deletions
diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc
index 3ff4915dd75..2625b979233 100644
--- a/src/rgw/driver/rados/rgw_reshard.cc
+++ b/src/rgw/driver/rados/rgw_reshard.cc
@@ -237,7 +237,8 @@ public:
return 0;
}
- int flush(bool process_log = false) {
+ int flush(bool process_log = false, RGWBucketReshard *br = nullptr,
+ const DoutPrefixProvider *dpp = nullptr) {
if (entries.size() == 0) {
return 0;
}
@@ -292,6 +293,13 @@ public:
}
entries.clear();
stats.clear();
+
+ if (br != nullptr) {
+ ret = br->renew_lock_if_needed(dpp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
return 0;
}
@@ -353,10 +361,11 @@ public:
return 0;
}
- int finish(bool process_log = false) {
+ int finish(bool process_log = false, RGWBucketReshard *br = nullptr,
+ const DoutPrefixProvider *dpp = nullptr) {
int ret = 0;
for (auto& shard : target_shards) {
- int r = shard.flush(process_log);
+ int r = shard.flush(process_log, br, dpp);
if (r < 0) {
derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
ret = r;
@@ -934,7 +943,7 @@ int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp, optional_yield y)
return ret;
}
- if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress ||
+ if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress &&
bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) {
ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
ret = -EINVAL;
@@ -1032,13 +1041,55 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) {
return 0;
}
+int RGWBucketReshard::renew_lock_if_needed(const DoutPrefixProvider *dpp) {
+ int ret = 0;
+ Clock::time_point now = Clock::now();
+ if (reshard_lock.should_renew(now)) {
+ // assume outer locks have timespans at least the size of ours, so
+ // can call inside conditional
+ if (outer_reshard_lock) {
+ ret = outer_reshard_lock->renew(now);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ ret = reshard_lock.renew(now);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
+ return ret;
+ }
+ }
+ return 0;
+}
+
+int RGWBucketReshard::calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+ int& shard, const DoutPrefixProvider *dpp) {
+ int target_shard_id, ret;
+
+ rgw_obj obj(bucket_info.bucket, key);
+ RGWMPObj mp;
+ if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+ // place the multipart .meta object on the same shard as its head object
+ obj.index_hash_source = mp.get_key();
+ }
+ ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
+ obj.get_hash_object(), &target_shard_id);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+ return ret;
+ }
+ shard = (target_shard_id > 0 ? target_shard_id : 0);
+
+ return 0;
+}
+
int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current,
- int& max_op_entries,
- BucketReshardManager& target_shards_mgr,
- bool verbose_json_out,
- ostream *out,
- Formatter *formatter, rgw::BucketReshardState reshard_stage,
- const DoutPrefixProvider *dpp, optional_yield y)
+ int& max_op_entries,
+ BucketReshardManager& target_shards_mgr,
+ bool verbose_json_out,
+ ostream *out,
+ Formatter *formatter, rgw::BucketReshardState reshard_stage,
+ const DoutPrefixProvider *dpp, optional_yield y)
{
list<rgw_cls_bi_entry> entries;
@@ -1065,6 +1116,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
uint64_t stage_entries = 0;
stage.append(":");
if (!verbose_json_out && out) {
+ (*out) << "start time: " << real_clock::now() << std::endl;
(*out) << stage;
}
@@ -1103,7 +1155,6 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
marker = entry.idx;
- int target_shard_id;
cls_rgw_obj_key cls_key;
RGWObjCategory category;
rgw_bucket_category_stats stats;
@@ -1116,43 +1167,24 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
continue;
}
- rgw_obj obj(bucket_info.bucket, key);
- RGWMPObj mp;
- if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
- // place the multipart .meta object on the same shard as its head object
- obj.index_hash_source = mp.get_key();
- }
- ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
- obj.get_hash_object(), &target_shard_id);
+
+ int shard_index;
+ ret = calc_target_shard(bucket_info, key, shard_index, dpp);
if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
return ret;
}
- int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
ret = target_shards_mgr.add_entry(shard_index, entry, account,
category, stats, process_log);
if (ret < 0) {
return ret;
}
- Clock::time_point now = Clock::now();
- if (reshard_lock.should_renew(now)) {
- // assume outer locks have timespans at least the size of ours, so
- // can call inside conditional
- if (outer_reshard_lock) {
- ret = outer_reshard_lock->renew(now);
- if (ret < 0) {
- return ret;
- }
- }
- ret = reshard_lock.renew(now);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
- return ret;
- }
+ ret = renew_lock_if_needed(dpp);
+ if (ret < 0) {
+ return ret;
}
+
if (verbose_json_out) {
formatter->close_section();
formatter->flush(*out);
@@ -1168,13 +1200,15 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
formatter->flush(*out);
} else if (out) {
(*out) << " " << stage_entries << std::endl;
+ (*out) << "end time: " << real_clock::now() << std::endl;
}
- int ret = target_shards_mgr.finish(process_log);
+ int ret = target_shards_mgr.finish(process_log, this, dpp);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
return -EIO;
}
+
return 0;
}