summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2019-10-30 22:51:12 +0100
committerYehuda Sadeh <yehuda@redhat.com>2020-01-28 19:20:37 +0100
commitc4deb504921324430c74daf6013aad44502a07da (patch)
treebd34e8f5f017e66e1c0c45bc51ff806a87e0d1be
parentrgw: sync: bucket sync manager adjustments for new system (diff)
downloadceph-c4deb504921324430c74daf6013aad44502a07da.tar.xz
ceph-c4deb504921324430c74daf6013aad44502a07da.zip
rgw: data sync: more work towards new policy integration
RGWGetBucketPeersCR will be able to return correct buckets that need to sync, either by a target bucket, and/or a source zone and a source bucket (partially implemented). Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r--src/rgw/rgw_bucket_sync.h14
-rw-r--r--src/rgw/rgw_data_sync.cc351
-rw-r--r--src/rgw/rgw_data_sync.h3
3 files changed, 224 insertions, 144 deletions
diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h
index c62f4cf2edb..2368ffb097d 100644
--- a/src/rgw/rgw_bucket_sync.h
+++ b/src/rgw/rgw_bucket_sync.h
@@ -113,6 +113,20 @@ public:
struct pipe_set {
std::set<rgw_sync_bucket_pipe> pipes;
+ using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
+
+ void insert(const rgw_sync_bucket_pipe& pipe) {
+ pipes.insert(pipe);
+ }
+
+ iterator begin() {
+ return pipes.begin();
+ }
+
+ iterator end() {
+ return pipes.end();
+ }
+
void dump(ceph::Formatter *f) const;
};
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index f86d08582c7..2e0e3ad5f2a 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -48,6 +48,7 @@ static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";
static string bucket_sync_sources_oid_prefix = "bucket.sync-sources";
+static string bucket_sync_targets_oid_prefix = "bucket.sync-targets";
void rgw_datalog_info::decode_json(JSONObj *obj) {
@@ -67,29 +68,6 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("entries", entries, obj);
};
-#warning FIXME
-#if 0
-void rgw_sync_flow_rule::get_zone_peers(const string& zone_id,
- std::set<string> *sources,
- std::set<string> *targets) const
-{
- sources->clear();
- targets->clear();
-
- if (directional) {
- if (directional->target_zone == zone_id) {
- sources->insert(directional->source_zone);
- } else if (directional->source_zone == zone_id) {
- targets->insert(directional->target_zone);
- }
- } else if (symmetrical &&
- symmetrical->find(zone_id) != symmetrical->end()) {
- *sources = *symmetrical;
- sources->erase(zone_id);
- *targets = *sources;
- }
-}
-#endif
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
static constexpr int MAX_CONCURRENT_SHARDS = 16;
@@ -1071,6 +1049,26 @@ public:
int operate() override;
};
+class RGWRunBucketsSyncBySourceCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+
+ rgw_bucket_shard source;
+
+ RGWSyncTraceNodeRef tn;
+
+public:
+ RGWRunBucketsSyncBySourceCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source, const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source(_source),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "source",
+ SSTR(bucket_shard_str{_source} ))) {
+ }
+ ~RGWRunBucketsSyncBySourceCR() override {
+ }
+
+ int operate() override;
+};
+
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
@@ -1124,7 +1122,7 @@ public:
sync_pair.source_bs = bs;
sync_pair.dest_bs = bs;
#warning init pipe fields
- call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn));
+ call(new RGWRunBucketsSyncBySourceCR(sc, bs, tn));
}
} while (marker_tracker && marker_tracker->need_retry(raw_key));
@@ -3351,7 +3349,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
return 0;
}
-class RGWBucketSyncSourcesManager {
+class RGWBucketSyncPeersManager {
public:
static string sync_sources_oid(const rgw_bucket bucket) {
return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
@@ -3360,6 +3358,13 @@ public:
static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
}
+ static string sync_targets_oid(const rgw_bucket bucket) {
+ return bucket_sync_targets_oid_prefix + "." + bucket.get_key();
+ }
+
+ static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
+ return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(bucket));
+ }
};
struct rgw_bucket_sync_source_local_info {
@@ -3416,25 +3421,19 @@ struct rgw_bucket_sync_sources_local_info {
};
WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
-class RGWGetBucketSourcePeersCR : public RGWCoroutine {
+class RGWGetBucketPeersCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- rgw_bucket bucket;
+ std::optional<rgw_bucket> target_bucket;
std::optional<string> source_zone;
+ std::optional<rgw_bucket> source_bucket;
- map<string, RGWBucketSyncFlowManager::pipe_set> *sources;
+ RGWBucketSyncFlowManager::pipe_set *sources;
map<rgw_bucket, RGWBucketInfo> *sources_info;
map<rgw_bucket, RGWBucketInfo>::iterator siiter;
RGWBucketInfo *pbucket_info;
- rgw_raw_obj sources_obj;
-
- bool found_binfo{false};
- map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
- map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter_end;
- set<rgw_sync_bucket_pipe>::iterator piter;
-
- std::optional<rgw_bucket> source_bucket;
+ RGWBucketSyncFlowManager::pipe_set::iterator siter;
rgw_bucket_sync_sources_local_info sources_local_info;
rgw_bucket_sync_sources_local_info expected_local_info;
@@ -3444,26 +3443,79 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine {
RGWSyncTraceNodeRef tn;
+ using pipe_const_iter = map<string, RGWBucketSyncFlowManager::pipe_set>::const_iterator;
+
+ static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<string, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<string> zone) {
+ if (!zone) {
+ return { m.begin(), m.end() };
+ }
+
+ auto b = m.find(*zone);
+ if (b == m.end()) {
+ return { b, b };
+ }
+ return { b, std::next(b) };
+ }
+
+ static RGWBucketSyncFlowManager::pipe_set filter_sources(std::optional<string> source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const map<string, RGWBucketSyncFlowManager::pipe_set>& all_sources) {
+ RGWBucketSyncFlowManager::pipe_set result;
+
+ auto iters = get_pipe_iters(all_sources, source_zone);
+ for (auto i = iters.first; i != iters.second; ++i) {
+ for (auto& peer : i->second.pipes) {
+ if (source_bucket &&
+ peer.source.bucket &&
+ *source_bucket != *peer.source.bucket) {
+ continue;
+ }
+ result.insert(peer);
+ }
+ }
+ return result;
+ }
+
+ static RGWBucketSyncFlowManager::pipe_set filter_targets(std::optional<string> target_zone,
+ std::optional<rgw_bucket> target_bucket,
+ const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets) {
+ RGWBucketSyncFlowManager::pipe_set result;
+
+ auto iters = get_pipe_iters(all_targets, target_zone);
+ for (auto i = iters.first; i != iters.second; ++i) {
+ for (auto& peer : i->second.pipes) {
+ if (target_bucket &&
+ peer.dest.bucket &&
+ *target_bucket != *peer.dest.bucket) {
+ continue;
+ }
+ result.insert(peer);
+ }
+ }
+ return result;
+ }
+
public:
- RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
- const rgw_bucket& _bucket,
- std::optional<string> _source_zone,
- map<string, RGWBucketSyncFlowManager::pipe_set> *_sources,
- map<rgw_bucket, RGWBucketInfo> *_sources_info,
- RGWBucketInfo *_pbucket_info,
- const RGWSyncTraceNodeRef& _tn_parent)
+ RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
+ std::optional<rgw_bucket> _target_bucket,
+ std::optional<string> _source_zone,
+ std::optional<rgw_bucket> _source_bucket,
+ RGWBucketSyncFlowManager::pipe_set *_sources,
+ map<rgw_bucket, RGWBucketInfo> *_sources_info,
+ RGWBucketInfo *_pbucket_info,
+ const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket(_bucket),
+ target_bucket(_target_bucket),
source_zone(_source_zone),
sources(_sources),
sources_info(_sources_info),
pbucket_info(_pbucket_info),
- sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
policy(make_shared<rgw_bucket_get_sync_policy_result>()),
- tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
- SSTR(bucket))) {
- }
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
+ SSTR( "target=" << target_bucket.value_or(rgw_bucket())
+ << ":source=" << target_bucket.value_or(rgw_bucket())
+ << ":source_zone=" << source_zone.value_or("*")))) {}
int operate() override;
};
@@ -3477,10 +3529,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
rgw_raw_obj sources_obj;
- map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+ RGWBucketSyncFlowManager::pipe_set sources;
map<rgw_bucket, RGWBucketInfo> sources_info;
- map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
- set<rgw_sync_bucket_pipe>::iterator piter;
+ RGWBucketSyncFlowManager::pipe_set::iterator siter;
rgw_bucket_sync_pair_info sync_pair;
@@ -3491,6 +3542,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
std::vector<RGWDataSyncCtx> scs;
RGWDataSyncCtx *cur_sc{nullptr};
+ RGWRESTConn *conn{nullptr};
+ string last_zone;
+
int ret{0};
public:
@@ -3502,7 +3556,7 @@ public:
sync_env(_sync_env),
bucket(_bucket),
source_zone(_source_zone),
- sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
+ sources_obj(RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
SSTR(bucket))) {
}
@@ -3539,7 +3593,7 @@ int RGWRunBucketSourcesSyncCR::operate()
}
tn->log(10, "took lease");
- yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, source_zone, &sources, &sources_info, &bucket_info, tn));
+ yield call(new RGWGetBucketPeersCR(sync_env, bucket, source_zone, std::nullopt, &sources, &sources_info, &bucket_info, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
@@ -3550,32 +3604,39 @@ int RGWRunBucketSourcesSyncCR::operate()
for (siter = sources.begin(); siter != sources.end(); ++siter) {
scs.emplace_back();
cur_sc = &scs.back();
+ if (!siter->source.zone ||
+ !siter->source.bucket) {
+ continue;
+ }
+
{
- auto& szone = siter->first;
- auto conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
- if (!conn) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
- continue;
+ auto& szone = *siter->source.zone;
+ if (last_zone != szone) {
+ conn = sync_env->svc->zone->get_zone_conn_by_id(szone);
+ if (!conn) {
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl;
+ continue;
+ }
+ last_zone = szone;
}
- cur_sc->init(sync_env, conn, siter->first);
+ cur_sc->init(sync_env, conn, szone);
}
- for (piter = siter->second.pipes.begin(); piter != siter->second.pipes.end(); ++piter) {
- sync_pair.source_bs.bucket = *piter->source.bucket;
- sync_pair.dest_bs.bucket = bucket_info.bucket;
- yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
- while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- bool again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- /* we have reported this error */
- }
- /* not waiting for child here */
+ sync_pair.source_bs.bucket = *siter->source.bucket;
+ sync_pair.dest_bs.bucket = bucket_info.bucket;
+
+ yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
+ while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ bool again = true;
+ while (again) {
+ again = collect(&ret, nullptr);
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ /* we have reported this error */
}
+ /* not waiting for child here */
}
}
}
@@ -3649,19 +3710,21 @@ int RGWSyncGetBucketInfoCR::operate()
return 0;
}
-int RGWGetBucketSourcePeersCR::operate()
+int RGWGetBucketPeersCR::operate()
{
reenter(this) {
- yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
- sync_env->svc->sysobj,
- RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
- &sources_local_info));
- if (retcode < 0 &&
- retcode != -ENOENT) {
- return set_cr_error(retcode);
+ if (target_bucket) {
+ yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+ sync_env->svc->sysobj,
+ RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket),
+ &sources_local_info));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
}
- get_policy_params.bucket = bucket;
+ get_policy_params.bucket = *target_bucket;
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
sync_env->store,
get_policy_params,
@@ -3675,47 +3738,36 @@ int RGWGetBucketSourcePeersCR::operate()
{
auto& handler = policy->policy_handler;
- *sources = handler->get_sources();
+ if (sources) {
+ *sources = filter_sources(source_zone,
+ source_bucket,
+ handler->get_sources());
+ }
+
auto& binfo = handler->get_bucket_info();
if (binfo) {
*pbucket_info = *binfo;
- found_binfo = true;
+ if (sources_info) {
+ (*sources_info)[binfo->bucket] = *binfo;
+ }
}
}
- if (sources_info) {
- if (found_binfo) {
- (*sources_info)[bucket] = *pbucket_info;
- }
-
- siter_end = sources->end();
- if (source_zone) {
- siter = sources->find(*source_zone);
- if (siter != sources->end()) {
- siter_end = siter;
- ++siter_end;
+ if (sources && sources_info) {
+ for (siter = sources->begin(); siter != sources->end(); ++siter) {
+ source_bucket = siter->source.bucket;
+ if (!source_bucket) {
+ continue;
}
- } else {
- siter = sources->begin();
- }
- for (; siter != siter_end; ++siter) {
- for (piter = siter->second.pipes.begin();
- piter != siter->second.pipes.end();
- ++piter) {
- source_bucket = piter->source.bucket;
- if (!source_bucket) {
- continue;
- }
- if (sources_info->find(*source_bucket) != sources_info->end()) {
- continue;
- }
-
- (*sources_info)[*source_bucket] = RGWBucketInfo(); /* reserve space for it, will fetch it later when map cannot change */
+ if (sources_info->find(*source_bucket) != sources_info->end()) {
+ continue;
}
+
+ sources_info->emplace(*source_bucket, RGWBucketInfo()); /* reserve space for it, will fetch it later when map cannot change */
}
for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) {
- if (siiter->first != bucket) {
+ if (siiter->second.bucket.name.empty()) {
yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
}
}
@@ -3727,6 +3779,15 @@ int RGWGetBucketSourcePeersCR::operate()
return 0;
}
+int RGWRunBucketsSyncBySourceCR::operate()
+{
+ reenter(this) {
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
@@ -3860,55 +3921,59 @@ int RGWBucketPipeSyncStatusManager::init()
error_logger, store->getRados()->get_sync_tracer(),
sync_module, nullptr);
- map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+ RGWBucketSyncFlowManager::pipe_set sources;
map<rgw_bucket, RGWBucketInfo> sources_info;
RGWBucketInfo dest_bucket_info;
- ret = cr_mgr.run(new RGWGetBucketSourcePeersCR(&sync_env,
- dest_bucket,
- source_zone,
- &sources,
- &sources_info,
- &dest_bucket_info,
- sync_env.sync_tracer->root_node));
+ ret = cr_mgr.run(new RGWGetBucketPeersCR(&sync_env,
+ dest_bucket,
+ source_zone,
+ std::nullopt,
+ &sources,
+ &sources_info,
+ &dest_bucket_info,
+ sync_env.sync_tracer->root_node));
if (ret < 0) {
ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
return ret;
}
- for (auto siter : sources) {
- if (source_zone && siter.first != *source_zone) {
+ string last_zone;
+
+ for (auto& peer : sources) {
+ if (!peer.source.zone) {
continue;
}
- auto& szone = siter.first;
+ auto szone = *peer.source.zone;
- conn = store->svc()->zone->get_zone_conn_by_id(szone);
- if (!conn) {
- ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
- return -EINVAL;
+ if (last_zone != szone) {
+ conn = store->svc()->zone->get_zone_conn_by_id(szone);
+ if (!conn) {
+ ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
+ return -EINVAL;
+ }
+ last_zone = szone;
}
- for (auto& pipe : siter.second.pipes) {
- auto& source_bucket = pipe.source.bucket;
+ auto& source_bucket = peer.source.bucket;
- if (!source_bucket) {
- continue;
- }
+ if (!source_bucket) {
+ continue;
+ }
- auto iter = sources_info.find(*source_bucket);
- if (iter == sources_info.end()) {
- ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
- return -EIO;
+ auto iter = sources_info.find(*source_bucket);
+ if (iter == sources_info.end()) {
+ ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl;
+ return -EIO;
}
- auto& source_bucket_info = iter->second;
+ auto& source_bucket_info = iter->second;
- source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
- szone, conn,
- source_bucket_info,
- dest_bucket_info.bucket));
- }
+ source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
+ szone, conn,
+ source_bucket_info,
+ dest_bucket_info.bucket));
}
return 0;
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index fc1e0602452..483a3b21781 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -633,7 +633,8 @@ public:
int init_sync_status();
static string status_oid(const string& source_zone, const rgw_bucket_sync_pair_info& bs);
- static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
+ static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* specific source obj sync status,
+ can be used by sync modules */
// implements DoutPrefixProvider
CephContext *get_cct() const override;