diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2019-10-30 22:51:12 +0100 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2020-01-28 19:20:37 +0100 |
commit | c4deb504921324430c74daf6013aad44502a07da (patch) | |
tree | bd34e8f5f017e66e1c0c45bc51ff806a87e0d1be | |
parent | rgw: sync: bucket sync manager adjustments for new system (diff) | |
download | ceph-c4deb504921324430c74daf6013aad44502a07da.tar.xz ceph-c4deb504921324430c74daf6013aad44502a07da.zip |
rgw: data sync: more work towards new policy integration
RGWGetBucketPeersCR will be able to return the correct set of buckets
that need to sync, selected by a target bucket and/or by a source
zone and a source bucket (partially implemented).
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r-- | src/rgw/rgw_bucket_sync.h | 14 | ||||
-rw-r--r-- | src/rgw/rgw_data_sync.cc | 351 | ||||
-rw-r--r-- | src/rgw/rgw_data_sync.h | 3 |
3 files changed, 224 insertions, 144 deletions
diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index c62f4cf2edb..2368ffb097d 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -113,6 +113,20 @@ public: struct pipe_set { std::set<rgw_sync_bucket_pipe> pipes; + using iterator = std::set<rgw_sync_bucket_pipe>::iterator; + + void insert(const rgw_sync_bucket_pipe& pipe) { + pipes.insert(pipe); + } + + iterator begin() { + return pipes.begin(); + } + + iterator end() { + return pipes.end(); + } + void dump(ceph::Formatter *f) const; }; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f86d08582c7..2e0e3ad5f2a 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -48,6 +48,7 @@ static string datalog_sync_full_sync_index_prefix = "data.full-sync.index"; static string bucket_status_oid_prefix = "bucket.sync-status"; static string object_status_oid_prefix = "bucket.sync-status"; static string bucket_sync_sources_oid_prefix = "bucket.sync-sources"; +static string bucket_sync_targets_oid_prefix = "bucket.sync-targets"; void rgw_datalog_info::decode_json(JSONObj *obj) { @@ -67,29 +68,6 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) { JSONDecoder::decode_json("entries", entries, obj); }; -#warning FIXME -#if 0 -void rgw_sync_flow_rule::get_zone_peers(const string& zone_id, - std::set<string> *sources, - std::set<string> *targets) const -{ - sources->clear(); - targets->clear(); - - if (directional) { - if (directional->target_zone == zone_id) { - sources->insert(directional->source_zone); - } else if (directional->source_zone == zone_id) { - targets->insert(directional->target_zone); - } - } else if (symmetrical && - symmetrical->find(zone_id) != symmetrical->end()) { - *sources = *symmetrical; - sources->erase(zone_id); - *targets = *sources; - } -} -#endif class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR { static constexpr int MAX_CONCURRENT_SHARDS = 16; @@ -1071,6 +1049,26 @@ public: int operate() override; }; 
+class RGWRunBucketsSyncBySourceCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env; + + rgw_bucket_shard source; + + RGWSyncTraceNodeRef tn; + +public: + RGWRunBucketsSyncBySourceCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source, const RGWSyncTraceNodeRef& _tn_parent) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source(_source), + tn(sync_env->sync_tracer->add_node(_tn_parent, "source", + SSTR(bucket_shard_str{_source} ))) { + } + ~RGWRunBucketsSyncBySourceCR() override { + } + + int operate() override; +}; + class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; @@ -1124,7 +1122,7 @@ public: sync_pair.source_bs = bs; sync_pair.dest_bs = bs; #warning init pipe fields - call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn)); + call(new RGWRunBucketsSyncBySourceCR(sc, bs, tn)); } } while (marker_tracker && marker_tracker->need_retry(raw_key)); @@ -3351,7 +3349,7 @@ int RGWBucketShardIncrementalSyncCR::operate() return 0; } -class RGWBucketSyncSourcesManager { +class RGWBucketSyncPeersManager { public: static string sync_sources_oid(const rgw_bucket bucket) { return bucket_sync_sources_oid_prefix + "." + bucket.get_key(); @@ -3360,6 +3358,13 @@ public: static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket)); } + static string sync_targets_oid(const rgw_bucket bucket) { + return bucket_sync_targets_oid_prefix + "." 
+ bucket.get_key(); + } + + static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { + return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(bucket)); + } }; struct rgw_bucket_sync_source_local_info { @@ -3416,25 +3421,19 @@ struct rgw_bucket_sync_sources_local_info { }; WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info) -class RGWGetBucketSourcePeersCR : public RGWCoroutine { +class RGWGetBucketPeersCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - rgw_bucket bucket; + std::optional<rgw_bucket> target_bucket; std::optional<string> source_zone; + std::optional<rgw_bucket> source_bucket; - map<string, RGWBucketSyncFlowManager::pipe_set> *sources; + RGWBucketSyncFlowManager::pipe_set *sources; map<rgw_bucket, RGWBucketInfo> *sources_info; map<rgw_bucket, RGWBucketInfo>::iterator siiter; RGWBucketInfo *pbucket_info; - rgw_raw_obj sources_obj; - - bool found_binfo{false}; - map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter; - map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter_end; - set<rgw_sync_bucket_pipe>::iterator piter; - - std::optional<rgw_bucket> source_bucket; + RGWBucketSyncFlowManager::pipe_set::iterator siter; rgw_bucket_sync_sources_local_info sources_local_info; rgw_bucket_sync_sources_local_info expected_local_info; @@ -3444,26 +3443,79 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; + using pipe_const_iter = map<string, RGWBucketSyncFlowManager::pipe_set>::const_iterator; + + static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<string, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<string> zone) { + if (!zone) { + return { m.begin(), m.end() }; + } + + auto b = m.find(*zone); + if (b == m.end()) { + return { b, b }; + } + return { b, std::next(b) }; + } + + static RGWBucketSyncFlowManager::pipe_set filter_sources(std::optional<string> source_zone, + std::optional<rgw_bucket> source_bucket, + const map<string, 
RGWBucketSyncFlowManager::pipe_set>& all_sources) { + RGWBucketSyncFlowManager::pipe_set result; + + auto iters = get_pipe_iters(all_sources, source_zone); + for (auto i = iters.first; i != iters.second; ++i) { + for (auto& peer : i->second.pipes) { + if (source_bucket && + peer.source.bucket && + *source_bucket != *peer.source.bucket) { + continue; + } + result.insert(peer); + } + } + return result; + } + + static RGWBucketSyncFlowManager::pipe_set filter_targets(std::optional<string> target_zone, + std::optional<rgw_bucket> target_bucket, + const map<string, RGWBucketSyncFlowManager::pipe_set>& all_targets) { + RGWBucketSyncFlowManager::pipe_set result; + + auto iters = get_pipe_iters(all_targets, target_zone); + for (auto i = iters.first; i != iters.second; ++i) { + for (auto& peer : i->second.pipes) { + if (target_bucket && + peer.dest.bucket && + *target_bucket != *peer.dest.bucket) { + continue; + } + result.insert(peer); + } + } + return result; + } + public: - RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env, - const rgw_bucket& _bucket, - std::optional<string> _source_zone, - map<string, RGWBucketSyncFlowManager::pipe_set> *_sources, - map<rgw_bucket, RGWBucketInfo> *_sources_info, - RGWBucketInfo *_pbucket_info, - const RGWSyncTraceNodeRef& _tn_parent) + RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env, + std::optional<rgw_bucket> _target_bucket, + std::optional<string> _source_zone, + std::optional<rgw_bucket> _source_bucket, + RGWBucketSyncFlowManager::pipe_set *_sources, + map<rgw_bucket, RGWBucketInfo> *_sources_info, + RGWBucketInfo *_pbucket_info, + const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket(_bucket), + target_bucket(_target_bucket), source_zone(_source_zone), sources(_sources), sources_info(_sources_info), pbucket_info(_pbucket_info), - sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)), policy(make_shared<rgw_bucket_get_sync_policy_result>()), - 
tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources", - SSTR(bucket))) { - } + tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers", + SSTR( "target=" << target_bucket.value_or(rgw_bucket()) + << ":source=" << target_bucket.value_or(rgw_bucket()) + << ":source_zone=" << source_zone.value_or("*")))) {} int operate() override; }; @@ -3477,10 +3529,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { rgw_raw_obj sources_obj; - map<string, RGWBucketSyncFlowManager::pipe_set> sources; + RGWBucketSyncFlowManager::pipe_set sources; map<rgw_bucket, RGWBucketInfo> sources_info; - map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter; - set<rgw_sync_bucket_pipe>::iterator piter; + RGWBucketSyncFlowManager::pipe_set::iterator siter; rgw_bucket_sync_pair_info sync_pair; @@ -3491,6 +3542,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { std::vector<RGWDataSyncCtx> scs; RGWDataSyncCtx *cur_sc{nullptr}; + RGWRESTConn *conn{nullptr}; + string last_zone; + int ret{0}; public: @@ -3502,7 +3556,7 @@ public: sync_env(_sync_env), bucket(_bucket), source_zone(_source_zone), - sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)), + sources_obj(RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, bucket)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", SSTR(bucket))) { } @@ -3539,7 +3593,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, source_zone, &sources, &sources_info, &bucket_info, tn)); + yield call(new RGWGetBucketPeersCR(sync_env, bucket, source_zone, std::nullopt, &sources, &sources_info, &bucket_info, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3550,32 +3604,39 @@ int RGWRunBucketSourcesSyncCR::operate() for (siter = sources.begin(); siter != sources.end(); ++siter) { 
scs.emplace_back(); cur_sc = &scs.back(); + if (!siter->source.zone || + !siter->source.bucket) { + continue; + } + { - auto& szone = siter->first; - auto conn = sync_env->svc->zone->get_zone_conn_by_id(szone); - if (!conn) { - ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl; - continue; + auto& szone = *siter->source.zone; + if (last_zone != szone) { + conn = sync_env->svc->zone->get_zone_conn_by_id(szone); + if (!conn) { + ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << szone << " does not exist" << dendl; + continue; + } + last_zone = szone; } - cur_sc->init(sync_env, conn, siter->first); + cur_sc->init(sync_env, conn, szone); } - for (piter = siter->second.pipes.begin(); piter != siter->second.pipes.end(); ++piter) { - sync_pair.source_bs.bucket = *piter->source.bucket; - sync_pair.dest_bs.bucket = bucket_info.bucket; - yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false); - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - /* we have reported this error */ - } - /* not waiting for child here */ + sync_pair.source_bs.bucket = *siter->source.bucket; + sync_pair.dest_bs.bucket = bucket_info.bucket; + + yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false); + while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { + set_status() << "num_spawned() > spawn_window"; + yield wait_for_child(); + bool again = true; + while (again) { + again = collect(&ret, nullptr); + if (ret < 0) { + tn->log(10, "a sync operation returned error"); + /* we have reported this error */ } + /* not waiting for child here */ } } } @@ -3649,19 +3710,21 @@ int RGWSyncGetBucketInfoCR::operate() return 0; } -int RGWGetBucketSourcePeersCR::operate() +int 
RGWGetBucketPeersCR::operate() { reenter(this) { - yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados, - sync_env->svc->sysobj, - RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket), - &sources_local_info)); - if (retcode < 0 && - retcode != -ENOENT) { - return set_cr_error(retcode); + if (target_bucket) { + yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados, + sync_env->svc->sysobj, + RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket), + &sources_local_info)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); + } } - get_policy_params.bucket = bucket; + get_policy_params.bucket = *target_bucket; yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, sync_env->store, get_policy_params, @@ -3675,47 +3738,36 @@ int RGWGetBucketSourcePeersCR::operate() { auto& handler = policy->policy_handler; - *sources = handler->get_sources(); + if (sources) { + *sources = filter_sources(source_zone, + source_bucket, + handler->get_sources()); + } + auto& binfo = handler->get_bucket_info(); if (binfo) { *pbucket_info = *binfo; - found_binfo = true; + if (sources_info) { + (*sources_info)[binfo->bucket] = *binfo; + } } } - if (sources_info) { - if (found_binfo) { - (*sources_info)[bucket] = *pbucket_info; - } - - siter_end = sources->end(); - if (source_zone) { - siter = sources->find(*source_zone); - if (siter != sources->end()) { - siter_end = siter; - ++siter_end; + if (sources && sources_info) { + for (siter = sources->begin(); siter != sources->end(); ++siter) { + source_bucket = siter->source.bucket; + if (!source_bucket) { + continue; } - } else { - siter = sources->begin(); - } - for (; siter != siter_end; ++siter) { - for (piter = siter->second.pipes.begin(); - piter != siter->second.pipes.end(); - ++piter) { - source_bucket = piter->source.bucket; - if (!source_bucket) { - continue; - } - if 
(sources_info->find(*source_bucket) != sources_info->end()) { - continue; - } - - (*sources_info)[*source_bucket] = RGWBucketInfo(); /* reserve space for it, will fetch it later when map cannot change */ + if (sources_info->find(*source_bucket) != sources_info->end()) { + continue; } + + sources_info->emplace(*source_bucket, RGWBucketInfo()); /* reserve space for it, will fetch it later when map cannot change */ } for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) { - if (siiter->first != bucket) { + if (siiter->second.bucket.name.empty()) { yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn)); } } @@ -3727,6 +3779,15 @@ int RGWGetBucketSourcePeersCR::operate() return 0; } +int RGWRunBucketsSyncBySourceCR::operate() +{ + reenter(this) { + return set_cr_done(); + } + + return 0; +} + int RGWRunBucketSyncCoroutine::operate() { reenter(this) { @@ -3860,55 +3921,59 @@ int RGWBucketPipeSyncStatusManager::init() error_logger, store->getRados()->get_sync_tracer(), sync_module, nullptr); - map<string, RGWBucketSyncFlowManager::pipe_set> sources; + RGWBucketSyncFlowManager::pipe_set sources; map<rgw_bucket, RGWBucketInfo> sources_info; RGWBucketInfo dest_bucket_info; - ret = cr_mgr.run(new RGWGetBucketSourcePeersCR(&sync_env, - dest_bucket, - source_zone, - &sources, - &sources_info, - &dest_bucket_info, - sync_env.sync_tracer->root_node)); + ret = cr_mgr.run(new RGWGetBucketPeersCR(&sync_env, + dest_bucket, + source_zone, + std::nullopt, + &sources, + &sources_info, + &dest_bucket_info, + sync_env.sync_tracer->root_node)); if (ret < 0) { ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl; return ret; } - for (auto siter : sources) { - if (source_zone && siter.first != *source_zone) { + string last_zone; + + for (auto& peer : sources) { + if (!peer.source.zone) { continue; } - auto& szone = siter.first; + auto szone = *peer.source.zone; - conn = 
store->svc()->zone->get_zone_conn_by_id(szone); - if (!conn) { - ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl; - return -EINVAL; + if (last_zone != szone) { + conn = store->svc()->zone->get_zone_conn_by_id(szone); + if (!conn) { + ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl; + return -EINVAL; + } + last_zone = szone; } - for (auto& pipe : siter.second.pipes) { - auto& source_bucket = pipe.source.bucket; + auto& source_bucket = peer.source.bucket; - if (!source_bucket) { - continue; - } + if (!source_bucket) { + continue; + } - auto iter = sources_info.find(*source_bucket); - if (iter == sources_info.end()) { - ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl; - return -EIO; + auto iter = sources_info.find(*source_bucket); + if (iter == sources_info.end()) { + ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". 
Likely a bug" << dendl; + return -EIO; } - auto& source_bucket_info = iter->second; + auto& source_bucket_info = iter->second; - source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env, - szone, conn, - source_bucket_info, - dest_bucket_info.bucket)); - } + source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env, + szone, conn, + source_bucket_info, + dest_bucket_info.bucket)); } return 0; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index fc1e0602452..483a3b21781 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -633,7 +633,8 @@ public: int init_sync_status(); static string status_oid(const string& source_zone, const rgw_bucket_sync_pair_info& bs); - static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */ + static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* specific source obj sync status, + can be used by sync modules */ // implements DoutPrefixProvider CephContext *get_cct() const override; |