diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2019-11-06 03:12:05 +0100 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2020-01-28 19:20:38 +0100 |
commit | fb134d93d380a0135dc45b09486b9548b401a7ac (patch) | |
tree | 5f4e5e22e36720ca8deab1a769c7a11fb697cec8 | |
parent | rgw: identify potential related (for sync) buckets on bucket update (diff) | |
download | ceph-fb134d93d380a0135dc45b09486b9548b401a7ac.tar.xz ceph-fb134d93d380a0135dc45b09486b9548b401a7ac.zip |
rgw: manage bucket sync deps index
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r-- | src/cls/version/cls_version_types.h | 9 | ||||
-rw-r--r-- | src/rgw/rgw_admin.cc | 46 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.cc | 3 | ||||
-rw-r--r-- | src/rgw/rgw_bucket_sync.cc | 39 | ||||
-rw-r--r-- | src/rgw/rgw_bucket_sync.h | 20 | ||||
-rw-r--r-- | src/rgw/rgw_service.cc | 6 | ||||
-rw-r--r-- | src/rgw/services/svc_bucket.h | 1 | ||||
-rw-r--r-- | src/rgw/services/svc_bucket_sobj.cc | 21 | ||||
-rw-r--r-- | src/rgw/services/svc_bucket_sobj.h | 1 | ||||
-rw-r--r-- | src/rgw/services/svc_bucket_sync.h | 11 | ||||
-rw-r--r-- | src/rgw/services/svc_bucket_sync_sobj.cc | 570 | ||||
-rw-r--r-- | src/rgw/services/svc_bucket_sync_sobj.h | 38 | ||||
-rw-r--r-- | src/rgw/services/svc_zone.cc | 12 | ||||
-rw-r--r-- | src/rgw/services/svc_zone.h | 5 |
14 files changed, 755 insertions, 27 deletions
diff --git a/src/cls/version/cls_version_types.h b/src/cls/version/cls_version_types.h index 852183f30e5..ffcb73fa115 100644 --- a/src/cls/version/cls_version_types.h +++ b/src/cls/version/cls_version_types.h @@ -36,15 +36,20 @@ struct obj_version { tag.clear(); } - bool empty() { + bool empty() const { return tag.empty(); } - bool compare(struct obj_version *v) { + bool compare(struct obj_version *v) const { return (ver == v->ver && tag.compare(v->tag) == 0); } + bool operator==(const struct obj_version& v) const { + return (ver == v.ver && + tag.compare(v.tag) == 0); + } + void dump(Formatter *f) const; void decode_json(JSONObj *obj); static void generate_test_instances(list<obj_version*>& o); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 1c5d5b574b8..11ceacaaaa8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2331,6 +2331,16 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse } } +static std::vector<string> convert_bucket_set_to_str_vec(const std::set<rgw_bucket>& bs) +{ + std::vector<string> result; + result.reserve(bs.size()); + for (auto& b : bs) { + result.push_back(b.get_key()); + } + return std::move(result); +} + static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter) { std::optional<string> zone_id; @@ -2369,6 +2379,12 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt)); } + ret = bucket_handler->init(null_yield); + if (ret < 0) { + cerr << "ERROR: failed to init bucket sync policy handler: " << cpp_strerror(-ret) << " (ret=" << ret << ")" << std::endl; + return ret; + } + handler = bucket_handler; } @@ -2377,10 +2393,40 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu handler->get_pipes(&sources, &dests); + auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints()); + auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints()); + + RGWBucketSyncFlowManager::pipe_set *resolved_sources; + RGWBucketSyncFlowManager::pipe_set *resolved_dests; + + for (auto& b : handler->get_source_hints()) { + RGWBucketInfo hint_bucket_info; + rgw_bucket hint_bucket; + int ret = init_bucket(b, hint_bucket_info, hint_bucket); + if (ret < 0) { + ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl; + continue; + } + + RGWBucketSyncPolicyHandlerRef hint_bucket_handler; + hint_bucket_handler.reset(handler->alloc_child(hint_bucket_indo)); + + } + { Formatter::ObjectSection os(*formatter, "result"); encode_json("sources", *sources, formatter); encode_json("dests", *dests, formatter); + { + Formatter::ObjectSection hints_section(*formatter, "hints"); + encode_json("sources", source_hints_vec, formatter); + encode_json("dests", target_hints_vec, formatter); + } + { + Formatter::ObjectSection resolved_hints_section(*formatter, "resolved-hints"); + encode_json("resolved-hints", *sources, formatter); + encode_json("dests", *dests, formatter); + } } formatter->flush(cout); diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 16adc9df3b4..a433b30ba18 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -2926,7 +2926,7 @@ public: if (ret < 0 && ret != -ENOENT) return ret; - return svc.bucket->remove_bucket_instance_info(ctx, entry, &bci.info.objv_tracker, y); + return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y); } int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) { @@ -3267,6 +3267,7 @@ int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket& bucket, return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) { return svc.bucket->remove_bucket_instance_info(ctx, RGWSI_Bucket::get_bi_meta_key(bucket), + info, &info.objv_tracker, y); }); diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 184a31becff..d6e07df72d5 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -6,6 +6,7 @@ #include "rgw_zone.h" #include "services/svc_zone.h" +#include "services/svc_bucket_sync.h" #define dout_subsys ceph_subsys_rgw @@ -344,6 +345,7 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket, RGWBucketSyncFlowManager::pipe_set *source_pipes, RGWBucketSyncFlowManager::pipe_set *dest_pipes, + std::optional<rgw_bucket> filter_peer_bucket, bool only_enabled) const { @@ -352,7 +354,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke entity.bucket = effective_bucket.value_or(rgw_bucket()); if (parent) { - parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled); + parent->reflect(effective_bucket, source_pipes, dest_pipes, filter_peer_bucket, only_enabled); } for (auto& item : flow_groups) { @@ -369,6 +371,9 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke if (!pipe.dest.match_bucket(effective_bucket)) { continue; } + if (!pipe.source.match_bucket(filter_peer_bucket)) { + continue; + } pipe.source.apply_bucket(effective_bucket); pipe.dest.apply_bucket(effective_bucket); @@ -382,6 +387,9 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke if (!pipe.source.match_bucket(effective_bucket)) { continue; } + if (!pipe.dest.match_bucket(filter_peer_bucket)) { + continue; + } pipe.source.apply_bucket(effective_bucket); pipe.dest.apply_bucket(effective_bucket); @@ -449,7 +457,9 @@ void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc, RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, RGWSI_SyncModules *sync_modules_svc, - std::optional<string> effective_zone) : zone_svc(_zone_svc) { + RGWSI_Bucket_Sync *_bucket_sync_svc, + std::optional<string> effective_zone) : zone_svc(_zone_svc) , + bucket_sync_svc(_bucket_sync_svc) { zone_name = effective_zone.value_or(zone_svc->zone_name()); flow_mgr.reset(new RGWBucketSyncFlowManager(zone_name, nullopt, @@ -459,8 +469,6 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, if (sync_policy.empty()) { RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy); } - - init(); } RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, @@ -471,10 +479,10 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy } bucket = _bucket_info.bucket; zone_svc = parent->zone_svc; + bucket_sync_svc = parent->bucket_sync_svc; flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name, _bucket_info.bucket, parent->flow_mgr.get())); - init(); } RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, @@ -485,10 +493,10 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy } bucket = _bucket; zone_svc = parent->zone_svc; + bucket_sync_svc = parent->bucket_sync_svc; flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name, _bucket, parent->flow_mgr.get())); - init(); } RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info) const @@ -502,8 +510,19 @@ RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bu return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy); } -void RGWBucketSyncPolicyHandler::init() +int RGWBucketSyncPolicyHandler::init(std::optional<rgw_bucket> filter_peer_bucket, + optional_yield y) { + int r = bucket_sync_svc->get_bucket_sync_hints(bucket.value_or(rgw_bucket()), + &source_hints, + &target_hints, + y); + if (r < 0) { + ldout(bucket_sync_svc->ctx(), 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket=" + << bucket << " returned r=" << r << dendl; + return r; + } + flow_mgr->init(sync_policy); reflect(&sources_by_name, @@ -512,7 +531,10 @@ void RGWBucketSyncPolicyHandler::init() &targets, &source_zones, &target_zones, + filter_peer_bucket, true); + + return 0; } void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name, @@ -521,6 +543,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets, std::set<string> *psource_zones, std::set<string> *ptarget_zones, + std::optional<rgw_bucket> filter_peer_bucket, bool only_enabled) const { RGWBucketSyncFlowManager::pipe_set _sources_by_name; @@ -530,7 +553,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso std::set<string> _source_zones; std::set<string> _target_zones; - flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled); + flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, filter_peer_bucket, only_enabled); for (auto& pipe : _sources_by_name.pipes) { if (!pipe.source.zone) { diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 9a0c447ace1..4be4dda26c6 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -21,6 +21,7 @@ class RGWSI_Zone; class RGWSI_SyncModules; +class RGWSI_Bucket_Sync; struct rgw_sync_group_pipe_map; struct rgw_sync_bucket_pipes; @@ -167,6 +168,7 @@ public: void reflect(std::optional<rgw_bucket> effective_bucket, pipe_set *flow_by_source, pipe_set *flow_by_dest, + std::optional<rgw_bucket> filter_peer_bucket, bool only_enabled) const; }; @@ -174,6 +176,7 @@ public: class RGWBucketSyncPolicyHandler { const RGWBucketSyncPolicyHandler *parent{nullptr}; RGWSI_Zone *zone_svc; + RGWSI_Bucket_Sync *bucket_sync_svc; string zone_name; std::optional<RGWBucketInfo> bucket_info; std::optional<rgw_bucket> bucket; @@ -189,6 +192,9 @@ class RGWBucketSyncPolicyHandler { std::set<string> source_zones; /* source zones by name */ std::set<string> target_zones; /* target zones by name */ + std::set<rgw_bucket> source_hints; + std::set<rgw_bucket> target_hints; + bool bucket_is_sync_source() const { return !targets.empty(); } @@ -197,8 +203,6 @@ class RGWBucketSyncPolicyHandler { return !sources.empty(); } - void init(); - RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, const RGWBucketInfo& _bucket_info); @@ -208,18 +212,22 @@ class RGWBucketSyncPolicyHandler { public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, RGWSI_SyncModules *sync_modules_svc, + RGWSI_Bucket_Sync *bucket_sync_svc, std::optional<string> effective_zone = std::nullopt); RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info) const; RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket, std::optional<rgw_sync_policy_info> sync_policy) const; + int init(std::optional<rgw_bucket> filter_peer_bucket, optional_yield y); + void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name, RGWBucketSyncFlowManager::pipe_set *ptargets_by_name, map<string, RGWBucketSyncFlowManager::pipe_set> *psources, map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets, std::set<string> *psource_zones, std::set<string> *ptarget_zones, + std::optional<rgw_bucket> filter_peer_bucket, bool only_enabled) const; const std::set<string>& get_source_zones() const { @@ -247,6 +255,14 @@ public: *targets = &targets_by_name; } + const std::set<rgw_bucket>& get_source_hints() const { + return source_hints; + } + + const std::set<rgw_bucket>& get_target_hints() const { + return target_hints; + } + bool bucket_exports_data() const; bool bucket_imports_data() const; }; diff --git a/src/rgw/rgw_service.cc b/src/rgw/rgw_service.cc index c81817f546c..00803b81a85 100644 --- a/src/rgw/rgw_service.cc +++ b/src/rgw/rgw_service.cc @@ -82,7 +82,9 @@ int RGWServices_Def::init(CephContext *cct, bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(), bi_rados.get(), meta.get(), meta_be_sobj.get(), sync_modules.get(), bucket_sync_sobj.get()); - bucket_sync_sobj->init(zone.get(), sysobj_cache.get(), + bucket_sync_sobj->init(zone.get(), + sysobj.get(), + sysobj_cache.get(), bucket_sobj.get()); cls->init(zone.get(), rados.get()); datalog_rados->init(zone.get(), cls.get()); @@ -93,7 +95,7 @@ int RGWServices_Def::init(CephContext *cct, notify->init(zone.get(), rados.get(), finisher.get()); otp->init(zone.get(), meta.get(), meta_be_otp.get()); rados->init(); - zone->init(sysobj.get(), rados.get(), sync_modules.get()); + zone->init(sysobj.get(), rados.get(), sync_modules.get(), bucket_sync_sobj.get()); zone_utils->init(rados.get(), zone.get()); quota->init(zone.get()); sync_modules->init(zone.get()); diff --git a/src/rgw/services/svc_bucket.h b/src/rgw/services/svc_bucket.h index b47b8f711af..7e39302f43c 100644 --- a/src/rgw/services/svc_bucket.h +++ b/src/rgw/services/svc_bucket.h @@ -86,6 +86,7 @@ public: virtual int remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, const string& key, + const RGWBucketInfo& bucket_info, RGWObjVersionTracker *objv_tracker, optional_yield y) = 0; diff --git a/src/rgw/services/svc_bucket_sobj.cc b/src/rgw/services/svc_bucket_sobj.cc index 0b87485fdeb..b276969e72d 100644 --- a/src/rgw/services/svc_bucket_sobj.cc +++ b/src/rgw/services/svc_bucket_sobj.cc @@ -533,7 +533,8 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, if (ret >= 0) { int r = svc.bucket_sync->handle_bi_update(info, - orig_info.value_or(nullptr)); + orig_info.value_or(nullptr), + y); if (r < 0) { return r; } @@ -558,11 +559,27 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, int RGWSI_Bucket_SObj::remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, const string& key, + const RGWBucketInfo& info, RGWObjVersionTracker *objv_tracker, optional_yield y) { RGWSI_MBSObj_RemoveParams params; - return svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y); + int ret = svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y); + + if (ret < 0 && + ret != -ENOENT) { + return ret; + } + + int r = svc.bucket_sync->handle_bi_removal(info, y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update bucket instance sync index: r=" << r << dendl; + /* returning success as index is just keeping hints, so will keep extra hints, + * but bucket removal succeeded + */ + } + + return 0; } int RGWSI_Bucket_SObj::read_bucket_stats(const RGWBucketInfo& bucket_info, diff --git a/src/rgw/services/svc_bucket_sobj.h b/src/rgw/services/svc_bucket_sobj.h index 80695ba2f73..744f4a8931e 100644 --- a/src/rgw/services/svc_bucket_sobj.h +++ b/src/rgw/services/svc_bucket_sobj.h @@ -153,6 +153,7 @@ public: int remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, const string& key, + const RGWBucketInfo& bucket_info, RGWObjVersionTracker *objv_tracker, optional_yield y) override; diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h index ed709620276..250e874f006 100644 --- a/src/rgw/services/svc_bucket_sync.h +++ b/src/rgw/services/svc_bucket_sync.h @@ -35,8 +35,17 @@ public: std::optional<rgw_bucket> bucket, RGWBucketSyncPolicyHandlerRef *handler, optional_yield y) = 0; + virtual int handle_bi_update(RGWBucketInfo& bucket_info, - RGWBucketInfo *orig_bucket_info) = 0; + RGWBucketInfo *orig_bucket_info, + optional_yield y) = 0; + virtual int handle_bi_removal(const RGWBucketInfo& bucket_info, + optional_yield y) = 0; + + virtual int get_bucket_sync_hints(const rgw_bucket& bucket, + std::set<rgw_bucket> *sources, + std::set<rgw_bucket> *dests, + optional_yield y) = 0; }; diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc index 935e0ea776a..c849011e236 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.cc +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -4,20 +4,28 @@ #include "svc_bucket_sobj.h" #include "rgw/rgw_bucket_sync.h" +#include "rgw/rgw_zone.h" #define dout_subsys ceph_subsys_rgw +static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints"; +static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints"; RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() { } void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc, + RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc, RGWSI_Bucket_SObj *bucket_sobj_svc) { svc.zone = _zone_svc; + svc.sysobj = _sysobj_svc; svc.cache = _cache_svc; svc.bucket_sobj = bucket_sobj_svc; + + hint_index_mgr.init(svc.zone, + svc.sysobj); } int RGWSI_Bucket_Sync_SObj::do_start() @@ -79,6 +87,12 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info)); + r = e.handler->init(y); + if (r < 0) { + ldout(cct, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl; + return r; + } + if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) { ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; } @@ -88,7 +102,7 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, return 0; } -static void diff_sets(std::set<rgw_bucket>& orig_set, +static bool diff_sets(std::set<rgw_bucket>& orig_set, std::set<rgw_bucket>& new_set, vector<rgw_bucket> *added, vector<rgw_bucket> *removed) @@ -118,10 +132,499 @@ static void diff_sets(std::set<rgw_bucket>& orig_set, for (; niter != new_set.end(); ++niter) { added->push_back(*niter); } + + return !(removed->empty() && added->empty()); +} + + +class RGWSI_BS_SObj_HintIndexObj +{ + friend class RGWSI_Bucket_Sync_SObj; + + CephContext *cct; + struct { + RGWSI_SysObj *sysobj; + } svc; + + RGWSysObjectCtx obj_ctx; + rgw_raw_obj obj; + RGWSysObj sysobj; + + RGWObjVersionTracker ot; + + bool has_data{false}; + +public: + struct bi_entry { + rgw_bucket bucket; + map<rgw_bucket /* info_source */, obj_version> sources; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(bucket, bl); + encode(sources, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(bucket, bl); + decode(sources, bl); + DECODE_FINISH(bl); + } + + bool add(const rgw_bucket& info_source, + const obj_version& info_source_ver) { + auto& ver = sources[info_source]; + + if (ver == info_source_ver) { /* already updated */ + return false; + } + + if (info_source_ver.tag == ver.tag && + info_source_ver.ver < ver.ver) { + return false; + } + + ver = info_source_ver; + + return true; + } + + bool remove(const rgw_bucket& info_source, + const obj_version& info_source_ver) { + auto iter = sources.find(info_source); + if (iter == sources.end()) { + return false; + } + + auto& ver = iter->second; + + if (info_source_ver.tag == ver.tag && + info_source_ver.ver < ver.ver) { + return false; + } + + sources.erase(info_source); + return true; + } + + bool empty() const { + return sources.empty(); + } + }; + + struct single_instance_info { + map<rgw_bucket, bi_entry> entries; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(entries, bl); + DECODE_FINISH(bl); + } + + bool add_entry(const rgw_bucket& info_source, + const obj_version& info_source_ver, + const rgw_bucket& bucket) { + auto& entry = entries[bucket]; + + if (!entry.add(info_source, info_source_ver)) { + return false; + } + + entry.bucket = bucket; + + return true; + } + + bool remove_entry(const rgw_bucket& info_source, + const obj_version& info_source_ver, + const rgw_bucket& bucket) { + auto iter = entries.find(bucket); + if (iter == entries.end()) { + return false; + } + + if (!iter->second.remove(info_source, info_source_ver)) { + return false; + } + + if (iter->second.empty()) { + entries.erase(iter); + } + + return true; + } + + void clear() { + entries.clear(); + } + + bool empty() const { + return entries.empty(); + } + + void get_entities(std::set<rgw_bucket> *result) const { + for (auto& iter : entries) { + result->insert(iter.first); + } + } + }; + + struct info_map { + map<rgw_bucket, single_instance_info> instances; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(instances, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(instances, bl); + DECODE_FINISH(bl); + } + + bool empty() const { + return instances.empty(); + } + + void clear() { + instances.clear(); + } + + void get_entities(const rgw_bucket& bucket, + std::set<rgw_bucket> *result) const { + auto iter = instances.find(bucket); + if (iter == instances.end()) { + return; + } + iter->second.get_entities(result); + } + } info; + + RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc, + const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()), + obj_ctx(_sysobj_svc->init_obj_ctx()), + obj(_obj), + sysobj(obj_ctx.get_obj(obj)) + { + svc.sysobj = _sysobj_svc; + } + + int update(const rgw_bucket& entity, + const RGWBucketInfo& info_source, + std::optional<std::vector<rgw_bucket> > add, + std::optional<std::vector<rgw_bucket> > remove, + optional_yield y); + +private: + void update_entries(const rgw_bucket& info_source, + const obj_version& info_source_ver, + std::optional<std::vector<rgw_bucket> > add, + std::optional<std::vector<rgw_bucket> > remove, + single_instance_info *instance); + + int read(optional_yield y); + int flush(optional_yield y); + + void invalidate() { + has_data = false; + info.clear(); + } + + void get_entities(const rgw_bucket& bucket, + std::set<rgw_bucket> *result) const { + info.get_entities(bucket, result); + } +}; +WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry) +WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info) +WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map) + +int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity, + const RGWBucketInfo& info_source, + std::optional<std::vector<rgw_bucket> > add, + std::optional<std::vector<rgw_bucket> > remove, + optional_yield y) +{ + int r = 0; + + auto& info_source_ver = info_source.objv_tracker.read_version; + +#define MAX_RETRIES 25 + + for (int i = 0; i < MAX_RETRIES; ++i) { + if (!has_data) { + r = read(y); + if (r < 0) { + ldout(cct, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl; + return r; + } + } + + auto& instance = info.instances[entity]; + + update_entries(info_source.bucket, + info_source_ver, + add, remove, + &instance); + + if (instance.empty()) { + info.instances.erase(entity); + } + + r = flush(y); + if (r >= 0) { + return 0; + } + + if (r != -ECANCELED) { + ldout(cct, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl; + return r; + } + } + ldout(cct, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl; + + return -EIO; +} + +void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source, + const obj_version& info_source_ver, + std::optional<std::vector<rgw_bucket> > add, + std::optional<std::vector<rgw_bucket> > remove, + single_instance_info *instance) +{ + if (remove) { + for (auto& bucket : *remove) { + instance->remove_entry(info_source, info_source_ver, bucket); + } + } + + if (add) { + for (auto& bucket : *add) { + instance->add_entry(info_source, info_source_ver, bucket); + } + } +} + +int RGWSI_BS_SObj_HintIndexObj::read(optional_yield y) { + RGWObjVersionTracker _ot; + bufferlist bl; + int r = sysobj.rop() + .set_objv_tracker(&_ot) /* forcing read of current version */ + .read(&bl, y); + if (r < 0 && r != -ENOENT) { + ldout(cct, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl; + return r; + } + + ot = _ot; + + if (r >= 0) { + auto iter = bl.cbegin(); + try { + decode(info, iter); + has_data = true; + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl; + info.clear(); + } + } else { + info.clear(); + } + + return 0; +} + +int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) { + int r; + + if (!info.empty()) { + bufferlist bl; + encode(info, bl); + + r = sysobj.wop() + .set_objv_tracker(&ot) /* forcing read of current version */ + .write(bl, y); + + } else { /* remove */ + r = sysobj.wop() + .set_objv_tracker(&ot) + .remove(y); + } + + if (r < 0) { + return r; + } + + return 0; +} + +rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const +{ + rgw_bucket b = bucket; + b.bucket_id.clear(); + return rgw_raw_obj(svc.zone->get_zone_params().log_pool, + bucket_sync_sources_oid_prefix + "." + b.get_key()); +} + +rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const +{ + rgw_bucket b = bucket; + b.bucket_id.clear(); + return rgw_raw_obj(svc.zone->get_zone_params().log_pool, + bucket_sync_targets_oid_prefix + "." + b.get_key()); +} + +int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info, + std::vector<rgw_bucket>& added_dests, + std::vector<rgw_bucket>& removed_dests, + std::vector<rgw_bucket>& added_sources, + std::vector<rgw_bucket>& removed_sources, + optional_yield y) +{ + std::vector<rgw_bucket> self_entity; + self_entity.push_back(bucket_info.bucket); + + if (!added_dests.empty() || + !removed_dests.empty()) { + /* update our dests */ + RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, + hint_index_mgr.get_dests_obj(bucket_info.bucket)); + int r = index.update(bucket_info.bucket, + bucket_info, + added_dests, + removed_dests, + y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; + return r; + } + + /* update added dest buckets */ + for (auto& dest_bucket : added_dests) { + RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, + hint_index_mgr.get_sources_obj(dest_bucket)); + int r = dep_index.update(dest_bucket, + bucket_info, + self_entity, + std::nullopt, + y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; + return r; + } + } + /* update removed dest buckets */ + for (auto& dest_bucket : removed_dests) { + RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, + hint_index_mgr.get_sources_obj(dest_bucket)); + int r = dep_index.update(dest_bucket, + bucket_info, + std::nullopt, + self_entity, + y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; + return r; + } + } + } + + if (!added_dests.empty() || + !removed_dests.empty()) { + RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, + hint_index_mgr.get_sources_obj(bucket_info.bucket)); + /* update our sources */ + int r = index.update(bucket_info.bucket, + bucket_info, + added_sources, + removed_sources, + y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; + return r; + } + + /* update added sources buckets */ + for (auto& source_bucket : added_sources) { + RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, + hint_index_mgr.get_dests_obj(source_bucket)); + int r = dep_index.update(source_bucket, + bucket_info, + self_entity, + std::nullopt, + y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; + return r; + } + } + /* update removed dest buckets */ + for (auto& source_bucket : removed_sources) { + RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, + hint_index_mgr.get_dests_obj(source_bucket)); + int r = dep_index.update(source_bucket, + bucket_info, + std::nullopt, + self_entity, + y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; + return r; + } + } + } + + return 0; +} + +int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info, + optional_yield y) +{ + std::set<rgw_bucket> sources_set; + std::set<rgw_bucket> dests_set; + + if (bucket_info.sync_policy) { + bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket, + &sources_set, + &dests_set); + } + + std::vector<rgw_bucket> removed_sources; + removed_sources.reserve(sources_set.size()); + for (auto& e : sources_set) { + removed_sources.push_back(e); + } + + std::vector<rgw_bucket> removed_dests; + removed_dests.reserve(dests_set.size()); + for (auto& e : dests_set) { + removed_dests.push_back(e); + } + + std::vector<rgw_bucket> added_sources; + std::vector<rgw_bucket> added_dests; + + return do_update_hints(bucket_info, + added_dests, + removed_dests, + added_sources, + removed_sources, + y); } int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info, - RGWBucketInfo *orig_bucket_info) + RGWBucketInfo *orig_bucket_info, + optional_yield y) { std::set<rgw_bucket> orig_sources; std::set<rgw_bucket> orig_dests; @@ -143,16 +646,73 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info, std::vector<rgw_bucket> removed_sources; std::vector<rgw_bucket> added_sources; - diff_sets(orig_sources, sources, &added_sources, &removed_sources); + bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources); ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl; ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl; std::vector<rgw_bucket> removed_dests; std::vector<rgw_bucket> added_dests; - diff_sets(orig_dests, dests, &added_dests, &removed_dests); + found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests); + ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl; ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl; - return 0; + if (!found) { + return 0; + } + + return do_update_hints(bucket_info, + added_dests, + removed_dests, + added_sources, + removed_sources, + y); +} + +int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket, + std::set<rgw_bucket> *sources, + std::set<rgw_bucket> *dests, + optional_yield y) +{ + if (!sources && !dests) { + return 0; + } + + if (sources) { + RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, + hint_index_mgr.get_sources_obj(bucket)); + int r = index.read(y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl; + return r; + } + index.get_entities(bucket, dests); + + if (!bucket.bucket_id.empty()) { + rgw_bucket b = bucket; + b.bucket_id.clear(); + index.get_entities(bucket, dests); + } + } + + if (dests) { + RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, + hint_index_mgr.get_dests_obj(bucket)); + int r = index.read(y); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl; + return r; + } + + index.get_entities(bucket, dests); + + if (!bucket.bucket_id.empty()) { + rgw_bucket b = bucket; + b.bucket_id.clear(); + index.get_entities(bucket, dests); + } + } + + return 0; } diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h index 0271d9faff6..a0305e12b4d 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.h +++ b/src/rgw/services/svc_bucket_sync_sobj.h @@ -38,10 +38,37 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl<bucket_sync_policy_cache_entry>; unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache; + class HintIndexManager { + struct { + RGWSI_Zone *zone; + RGWSI_SysObj *sysobj; + } svc; + + public: + HintIndexManager() {} + + void init(RGWSI_Zone *_zone_svc, + RGWSI_SysObj *_sysobj_svc) { + svc.zone = _zone_svc; + svc.sysobj = _sysobj_svc; + } + + rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const; + rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const; + } hint_index_mgr; + int do_start() override; + + int do_update_hints(const RGWBucketInfo& bucket_info, + std::vector<rgw_bucket>& added_dests, + std::vector<rgw_bucket>& removed_dests, + std::vector<rgw_bucket>& added_sources, + std::vector<rgw_bucket>& removed_sources, + optional_yield y); public: struct Svc { RGWSI_Zone *zone{nullptr}; + RGWSI_SysObj *sysobj{nullptr}; RGWSI_SysObj_Cache *cache{nullptr}; RGWSI_Bucket_SObj *bucket_sobj{nullptr}; } svc; @@ -50,6 +77,7 @@ public: ~RGWSI_Bucket_Sync_SObj(); void init(RGWSI_Zone *_zone_svc, + RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc, RGWSI_Bucket_SObj *_bucket_sobj_svc); @@ -61,6 +89,14 @@ public: optional_yield y) override; int handle_bi_update(RGWBucketInfo& bucket_info, - RGWBucketInfo *orig_bucket_info) override; + RGWBucketInfo *orig_bucket_info, + optional_yield y) override; + int handle_bi_removal(const RGWBucketInfo& bucket_info, + optional_yield y) override; + + int get_bucket_sync_hints(const rgw_bucket& bucket, + std::set<rgw_bucket> *sources, + std::set<rgw_bucket> *dests, + optional_yield y) override; }; diff --git a/src/rgw/services/svc_zone.cc b/src/rgw/services/svc_zone.cc index c86d28c591f..511984f9a29 100644 --- a/src/rgw/services/svc_zone.cc +++ b/src/rgw/services/svc_zone.cc @@ -23,11 +23,13 @@ RGWSI_Zone::RGWSI_Zone(CephContext *cct) : RGWServiceInstance(cct) void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc, RGWSI_RADOS * _rados_svc, - RGWSI_SyncModules * _sync_modules_svc) + RGWSI_SyncModules * _sync_modules_svc, + RGWSI_Bucket_Sync *_bucket_sync_svc) { sysobj_svc = _sysobj_svc; rados_svc = _rados_svc; sync_modules_svc = _sync_modules_svc; + bucket_sync_svc = _bucket_sync_svc; realm = new RGWRealm(); zonegroup = new RGWZoneGroup(); @@ -164,7 +166,13 @@ int RGWSI_Zone::do_start() zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id()); for (auto ziter : zonegroup->zones) { - sync_policy_handlers[ziter.second.id].reset(new RGWBucketSyncPolicyHandler(this, sync_modules_svc, ziter.second.name)); + auto zone_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc, bucket_sync_svc, ziter.second.name); + ret = zone_handler->init(null_yield); + if (ret < 0) { + lderr(cct) << "ERROR: could not initialize zone policy handler for zone=" << ziter.second.name << dendl; + return ret; + } + sync_policy_handlers[ziter.second.id].reset(zone_handler); } sync_policy_handler = sync_policy_handlers[zone_id()]; /* we made sure earlier that zonegroup->zones has our zone */ diff --git a/src/rgw/services/svc_zone.h b/src/rgw/services/svc_zone.h index 50403343d36..e20575f1632 100644 --- a/src/rgw/services/svc_zone.h +++ b/src/rgw/services/svc_zone.h @@ -9,6 +9,7 @@ class RGWSI_RADOS; class RGWSI_SysObj; class RGWSI_SyncModules; +class RGWSI_Bucket_Sync; class RGWRealm; class RGWZoneGroup; @@ -30,6 +31,7 @@ class RGWSI_Zone : public RGWServiceInstance RGWSI_SysObj *sysobj_svc{nullptr}; RGWSI_RADOS *rados_svc{nullptr}; RGWSI_SyncModules *sync_modules_svc{nullptr}; + RGWSI_Bucket_Sync *bucket_sync_svc{nullptr}; RGWRealm *realm{nullptr}; RGWZoneGroup *zonegroup{nullptr}; @@ -55,7 +57,8 @@ class RGWSI_Zone : public RGWServiceInstance void init(RGWSI_SysObj *_sysobj_svc, RGWSI_RADOS *_rados_svc, - RGWSI_SyncModules *_sync_modules_svc); + RGWSI_SyncModules *_sync_modules_svc, + RGWSI_Bucket_Sync *_bucket_sync_svc); int do_start() override; void shutdown() override; |