summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2019-11-06 03:12:05 +0100
committerYehuda Sadeh <yehuda@redhat.com>2020-01-28 19:20:38 +0100
commitfb134d93d380a0135dc45b09486b9548b401a7ac (patch)
tree5f4e5e22e36720ca8deab1a769c7a11fb697cec8
parentrgw: identify potential related (for sync) buckets on bucket update (diff)
downloadceph-fb134d93d380a0135dc45b09486b9548b401a7ac.tar.xz
ceph-fb134d93d380a0135dc45b09486b9548b401a7ac.zip
rgw: manage bucket sync deps index
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r--src/cls/version/cls_version_types.h9
-rw-r--r--src/rgw/rgw_admin.cc46
-rw-r--r--src/rgw/rgw_bucket.cc3
-rw-r--r--src/rgw/rgw_bucket_sync.cc39
-rw-r--r--src/rgw/rgw_bucket_sync.h20
-rw-r--r--src/rgw/rgw_service.cc6
-rw-r--r--src/rgw/services/svc_bucket.h1
-rw-r--r--src/rgw/services/svc_bucket_sobj.cc21
-rw-r--r--src/rgw/services/svc_bucket_sobj.h1
-rw-r--r--src/rgw/services/svc_bucket_sync.h11
-rw-r--r--src/rgw/services/svc_bucket_sync_sobj.cc570
-rw-r--r--src/rgw/services/svc_bucket_sync_sobj.h38
-rw-r--r--src/rgw/services/svc_zone.cc12
-rw-r--r--src/rgw/services/svc_zone.h5
14 files changed, 755 insertions, 27 deletions
diff --git a/src/cls/version/cls_version_types.h b/src/cls/version/cls_version_types.h
index 852183f30e5..ffcb73fa115 100644
--- a/src/cls/version/cls_version_types.h
+++ b/src/cls/version/cls_version_types.h
@@ -36,15 +36,20 @@ struct obj_version {
tag.clear();
}
- bool empty() {
+ bool empty() const {
return tag.empty();
}
- bool compare(struct obj_version *v) {
+ bool compare(struct obj_version *v) const {
return (ver == v->ver &&
tag.compare(v->tag) == 0);
}
+ bool operator==(const struct obj_version& v) const {
+ return (ver == v.ver &&
+ tag.compare(v.tag) == 0);
+ }
+
void dump(Formatter *f) const;
void decode_json(JSONObj *obj);
static void generate_test_instances(list<obj_version*>& o);
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 1c5d5b574b8..11ceacaaaa8 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -2331,6 +2331,16 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse
}
}
+static std::vector<string> convert_bucket_set_to_str_vec(const std::set<rgw_bucket>& bs)
+{
+ std::vector<string> result;
+ result.reserve(bs.size());
+ for (auto& b : bs) {
+ result.push_back(b.get_key());
+ }
+ return std::move(result);
+}
+
static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
{
std::optional<string> zone_id;
@@ -2369,6 +2379,12 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt));
}
+ ret = bucket_handler->init(null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: failed to init bucket sync policy handler: " << cpp_strerror(-ret) << " (ret=" << ret << ")" << std::endl;
+ return ret;
+ }
+
handler = bucket_handler;
}
@@ -2377,10 +2393,40 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
handler->get_pipes(&sources, &dests);
+ auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints());
+ auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints());
+
+ RGWBucketSyncFlowManager::pipe_set *resolved_sources;
+ RGWBucketSyncFlowManager::pipe_set *resolved_dests;
+
+ for (auto& b : handler->get_source_hints()) {
+ RGWBucketInfo hint_bucket_info;
+ rgw_bucket hint_bucket;
+ int ret = init_bucket(b, hint_bucket_info, hint_bucket);
+ if (ret < 0) {
+ ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
+ continue;
+ }
+
+ RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
+ hint_bucket_handler.reset(handler->alloc_child(hint_bucket_indo));
+
+ }
+
{
Formatter::ObjectSection os(*formatter, "result");
encode_json("sources", *sources, formatter);
encode_json("dests", *dests, formatter);
+ {
+ Formatter::ObjectSection hints_section(*formatter, "hints");
+ encode_json("sources", source_hints_vec, formatter);
+ encode_json("dests", target_hints_vec, formatter);
+ }
+ {
+ Formatter::ObjectSection resolved_hints_section(*formatter, "resolved-hints");
+ encode_json("resolved-hints", *sources, formatter);
+ encode_json("dests", *dests, formatter);
+ }
}
formatter->flush(cout);
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index 16adc9df3b4..a433b30ba18 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -2926,7 +2926,7 @@ public:
if (ret < 0 && ret != -ENOENT)
return ret;
- return svc.bucket->remove_bucket_instance_info(ctx, entry, &bci.info.objv_tracker, y);
+ return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y);
}
int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
@@ -3267,6 +3267,7 @@ int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket& bucket,
return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) {
return svc.bucket->remove_bucket_instance_info(ctx,
RGWSI_Bucket::get_bi_meta_key(bucket),
+ info,
&info.objv_tracker,
y);
});
diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc
index 184a31becff..d6e07df72d5 100644
--- a/src/rgw/rgw_bucket_sync.cc
+++ b/src/rgw/rgw_bucket_sync.cc
@@ -6,6 +6,7 @@
#include "rgw_zone.h"
#include "services/svc_zone.h"
+#include "services/svc_bucket_sync.h"
#define dout_subsys ceph_subsys_rgw
@@ -344,6 +345,7 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
RGWBucketSyncFlowManager::pipe_set *source_pipes,
RGWBucketSyncFlowManager::pipe_set *dest_pipes,
+ std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const
{
@@ -352,7 +354,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
entity.bucket = effective_bucket.value_or(rgw_bucket());
if (parent) {
- parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled);
+ parent->reflect(effective_bucket, source_pipes, dest_pipes, filter_peer_bucket, only_enabled);
}
for (auto& item : flow_groups) {
@@ -369,6 +371,9 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
if (!pipe.dest.match_bucket(effective_bucket)) {
continue;
}
+ if (!pipe.source.match_bucket(filter_peer_bucket)) {
+ continue;
+ }
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
@@ -382,6 +387,9 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
if (!pipe.source.match_bucket(effective_bucket)) {
continue;
}
+ if (!pipe.dest.match_bucket(filter_peer_bucket)) {
+ continue;
+ }
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
@@ -449,7 +457,9 @@ void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc,
RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
RGWSI_SyncModules *sync_modules_svc,
- std::optional<string> effective_zone) : zone_svc(_zone_svc) {
+ RGWSI_Bucket_Sync *_bucket_sync_svc,
+ std::optional<string> effective_zone) : zone_svc(_zone_svc) ,
+ bucket_sync_svc(_bucket_sync_svc) {
zone_name = effective_zone.value_or(zone_svc->zone_name());
flow_mgr.reset(new RGWBucketSyncFlowManager(zone_name,
nullopt,
@@ -459,8 +469,6 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
if (sync_policy.empty()) {
RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy);
}
-
- init();
}
RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
@@ -471,10 +479,10 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy
}
bucket = _bucket_info.bucket;
zone_svc = parent->zone_svc;
+ bucket_sync_svc = parent->bucket_sync_svc;
flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name,
_bucket_info.bucket,
parent->flow_mgr.get()));
- init();
}
RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
@@ -485,10 +493,10 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy
}
bucket = _bucket;
zone_svc = parent->zone_svc;
+ bucket_sync_svc = parent->bucket_sync_svc;
flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name,
_bucket,
parent->flow_mgr.get()));
- init();
}
RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info) const
@@ -502,8 +510,19 @@ RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bu
return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
}
-void RGWBucketSyncPolicyHandler::init()
+int RGWBucketSyncPolicyHandler::init(std::optional<rgw_bucket> filter_peer_bucket,
+ optional_yield y)
{
+ int r = bucket_sync_svc->get_bucket_sync_hints(bucket.value_or(rgw_bucket()),
+ &source_hints,
+ &target_hints,
+ y);
+ if (r < 0) {
+ ldout(bucket_sync_svc->ctx(), 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket="
+ << bucket << " returned r=" << r << dendl;
+ return r;
+ }
+
flow_mgr->init(sync_policy);
reflect(&sources_by_name,
@@ -512,7 +531,10 @@ void RGWBucketSyncPolicyHandler::init()
&targets,
&source_zones,
&target_zones,
+ filter_peer_bucket,
true);
+
+ return 0;
}
void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
@@ -521,6 +543,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
std::set<string> *psource_zones,
std::set<string> *ptarget_zones,
+ std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const
{
RGWBucketSyncFlowManager::pipe_set _sources_by_name;
@@ -530,7 +553,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
std::set<string> _source_zones;
std::set<string> _target_zones;
- flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
+ flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, filter_peer_bucket, only_enabled);
for (auto& pipe : _sources_by_name.pipes) {
if (!pipe.source.zone) {
diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h
index 9a0c447ace1..4be4dda26c6 100644
--- a/src/rgw/rgw_bucket_sync.h
+++ b/src/rgw/rgw_bucket_sync.h
@@ -21,6 +21,7 @@
class RGWSI_Zone;
class RGWSI_SyncModules;
+class RGWSI_Bucket_Sync;
struct rgw_sync_group_pipe_map;
struct rgw_sync_bucket_pipes;
@@ -167,6 +168,7 @@ public:
void reflect(std::optional<rgw_bucket> effective_bucket,
pipe_set *flow_by_source,
pipe_set *flow_by_dest,
+ std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const;
};
@@ -174,6 +176,7 @@ public:
class RGWBucketSyncPolicyHandler {
const RGWBucketSyncPolicyHandler *parent{nullptr};
RGWSI_Zone *zone_svc;
+ RGWSI_Bucket_Sync *bucket_sync_svc;
string zone_name;
std::optional<RGWBucketInfo> bucket_info;
std::optional<rgw_bucket> bucket;
@@ -189,6 +192,9 @@ class RGWBucketSyncPolicyHandler {
std::set<string> source_zones; /* source zones by name */
std::set<string> target_zones; /* target zones by name */
+ std::set<rgw_bucket> source_hints;
+ std::set<rgw_bucket> target_hints;
+
bool bucket_is_sync_source() const {
return !targets.empty();
}
@@ -197,8 +203,6 @@ class RGWBucketSyncPolicyHandler {
return !sources.empty();
}
- void init();
-
RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
const RGWBucketInfo& _bucket_info);
@@ -208,18 +212,22 @@ class RGWBucketSyncPolicyHandler {
public:
RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
RGWSI_SyncModules *sync_modules_svc,
+ RGWSI_Bucket_Sync *bucket_sync_svc,
std::optional<string> effective_zone = std::nullopt);
RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info) const;
RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
std::optional<rgw_sync_policy_info> sync_policy) const;
+ int init(std::optional<rgw_bucket> filter_peer_bucket, optional_yield y);
+
void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
map<string, RGWBucketSyncFlowManager::pipe_set> *psources,
map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
std::set<string> *psource_zones,
std::set<string> *ptarget_zones,
+ std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const;
const std::set<string>& get_source_zones() const {
@@ -247,6 +255,14 @@ public:
*targets = &targets_by_name;
}
+ const std::set<rgw_bucket>& get_source_hints() const {
+ return source_hints;
+ }
+
+ const std::set<rgw_bucket>& get_target_hints() const {
+ return target_hints;
+ }
+
bool bucket_exports_data() const;
bool bucket_imports_data() const;
};
diff --git a/src/rgw/rgw_service.cc b/src/rgw/rgw_service.cc
index c81817f546c..00803b81a85 100644
--- a/src/rgw/rgw_service.cc
+++ b/src/rgw/rgw_service.cc
@@ -82,7 +82,9 @@ int RGWServices_Def::init(CephContext *cct,
bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
bi_rados.get(), meta.get(), meta_be_sobj.get(),
sync_modules.get(), bucket_sync_sobj.get());
- bucket_sync_sobj->init(zone.get(), sysobj_cache.get(),
+ bucket_sync_sobj->init(zone.get(),
+ sysobj.get(),
+ sysobj_cache.get(),
bucket_sobj.get());
cls->init(zone.get(), rados.get());
datalog_rados->init(zone.get(), cls.get());
@@ -93,7 +95,7 @@ int RGWServices_Def::init(CephContext *cct,
notify->init(zone.get(), rados.get(), finisher.get());
otp->init(zone.get(), meta.get(), meta_be_otp.get());
rados->init();
- zone->init(sysobj.get(), rados.get(), sync_modules.get());
+ zone->init(sysobj.get(), rados.get(), sync_modules.get(), bucket_sync_sobj.get());
zone_utils->init(rados.get(), zone.get());
quota->init(zone.get());
sync_modules->init(zone.get());
diff --git a/src/rgw/services/svc_bucket.h b/src/rgw/services/svc_bucket.h
index b47b8f711af..7e39302f43c 100644
--- a/src/rgw/services/svc_bucket.h
+++ b/src/rgw/services/svc_bucket.h
@@ -86,6 +86,7 @@ public:
virtual int remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
const string& key,
+ const RGWBucketInfo& bucket_info,
RGWObjVersionTracker *objv_tracker,
optional_yield y) = 0;
diff --git a/src/rgw/services/svc_bucket_sobj.cc b/src/rgw/services/svc_bucket_sobj.cc
index 0b87485fdeb..b276969e72d 100644
--- a/src/rgw/services/svc_bucket_sobj.cc
+++ b/src/rgw/services/svc_bucket_sobj.cc
@@ -533,7 +533,8 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
if (ret >= 0) {
int r = svc.bucket_sync->handle_bi_update(info,
- orig_info.value_or(nullptr));
+ orig_info.value_or(nullptr),
+ y);
if (r < 0) {
return r;
}
@@ -558,11 +559,27 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
int RGWSI_Bucket_SObj::remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
const string& key,
+ const RGWBucketInfo& info,
RGWObjVersionTracker *objv_tracker,
optional_yield y)
{
RGWSI_MBSObj_RemoveParams params;
- return svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y);
+ int ret = svc.meta_be->remove_entry(ctx.get(), key, params, objv_tracker, y);
+
+ if (ret < 0 &&
+ ret != -ENOENT) {
+ return ret;
+ }
+
+ int r = svc.bucket_sync->handle_bi_removal(info, y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update bucket instance sync index: r=" << r << dendl;
+ /* returning success as index is just keeping hints, so will keep extra hints,
+ * but bucket removal succeeded
+ */
+ }
+
+ return 0;
}
int RGWSI_Bucket_SObj::read_bucket_stats(const RGWBucketInfo& bucket_info,
diff --git a/src/rgw/services/svc_bucket_sobj.h b/src/rgw/services/svc_bucket_sobj.h
index 80695ba2f73..744f4a8931e 100644
--- a/src/rgw/services/svc_bucket_sobj.h
+++ b/src/rgw/services/svc_bucket_sobj.h
@@ -153,6 +153,7 @@ public:
int remove_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
const string& key,
+ const RGWBucketInfo& bucket_info,
RGWObjVersionTracker *objv_tracker,
optional_yield y) override;
diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h
index ed709620276..250e874f006 100644
--- a/src/rgw/services/svc_bucket_sync.h
+++ b/src/rgw/services/svc_bucket_sync.h
@@ -35,8 +35,17 @@ public:
std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) = 0;
+
virtual int handle_bi_update(RGWBucketInfo& bucket_info,
- RGWBucketInfo *orig_bucket_info) = 0;
+ RGWBucketInfo *orig_bucket_info,
+ optional_yield y) = 0;
+ virtual int handle_bi_removal(const RGWBucketInfo& bucket_info,
+ optional_yield y) = 0;
+
+ virtual int get_bucket_sync_hints(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests,
+ optional_yield y) = 0;
};
diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc
index 935e0ea776a..c849011e236 100644
--- a/src/rgw/services/svc_bucket_sync_sobj.cc
+++ b/src/rgw/services/svc_bucket_sync_sobj.cc
@@ -4,20 +4,28 @@
#include "svc_bucket_sobj.h"
#include "rgw/rgw_bucket_sync.h"
+#include "rgw/rgw_zone.h"
#define dout_subsys ceph_subsys_rgw
+static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints";
+static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints";
RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
}
void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj *_sysobj_svc,
RGWSI_SysObj_Cache *_cache_svc,
RGWSI_Bucket_SObj *bucket_sobj_svc)
{
svc.zone = _zone_svc;
+ svc.sysobj = _sysobj_svc;
svc.cache = _cache_svc;
svc.bucket_sobj = bucket_sobj_svc;
+
+ hint_index_mgr.init(svc.zone,
+ svc.sysobj);
}
int RGWSI_Bucket_Sync_SObj::do_start()
@@ -79,6 +87,12 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info));
+ r = e.handler->init(y);
+ if (r < 0) {
+ ldout(cct, 20) << "ERROR: failed to init bucket sync policy handler: r=" << r << dendl;
+ return r;
+ }
+
if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
}
@@ -88,7 +102,7 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
return 0;
}
-static void diff_sets(std::set<rgw_bucket>& orig_set,
+static bool diff_sets(std::set<rgw_bucket>& orig_set,
std::set<rgw_bucket>& new_set,
vector<rgw_bucket> *added,
vector<rgw_bucket> *removed)
@@ -118,10 +132,499 @@ static void diff_sets(std::set<rgw_bucket>& orig_set,
for (; niter != new_set.end(); ++niter) {
added->push_back(*niter);
}
+
+ return !(removed->empty() && added->empty());
+}
+
+
+class RGWSI_BS_SObj_HintIndexObj
+{
+ friend class RGWSI_Bucket_Sync_SObj;
+
+ CephContext *cct;
+ struct {
+ RGWSI_SysObj *sysobj;
+ } svc;
+
+ RGWSysObjectCtx obj_ctx;
+ rgw_raw_obj obj;
+ RGWSysObj sysobj;
+
+ RGWObjVersionTracker ot;
+
+ bool has_data{false};
+
+public:
+ struct bi_entry {
+ rgw_bucket bucket;
+ map<rgw_bucket /* info_source */, obj_version> sources;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(bucket, bl);
+ encode(sources, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(bucket, bl);
+ decode(sources, bl);
+ DECODE_FINISH(bl);
+ }
+
+ bool add(const rgw_bucket& info_source,
+ const obj_version& info_source_ver) {
+ auto& ver = sources[info_source];
+
+ if (ver == info_source_ver) { /* already updated */
+ return false;
+ }
+
+ if (info_source_ver.tag == ver.tag &&
+ info_source_ver.ver < ver.ver) {
+ return false;
+ }
+
+ ver = info_source_ver;
+
+ return true;
+ }
+
+ bool remove(const rgw_bucket& info_source,
+ const obj_version& info_source_ver) {
+ auto iter = sources.find(info_source);
+ if (iter == sources.end()) {
+ return false;
+ }
+
+ auto& ver = iter->second;
+
+ if (info_source_ver.tag == ver.tag &&
+ info_source_ver.ver < ver.ver) {
+ return false;
+ }
+
+ sources.erase(info_source);
+ return true;
+ }
+
+ bool empty() const {
+ return sources.empty();
+ }
+ };
+
+ struct single_instance_info {
+ map<rgw_bucket, bi_entry> entries;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+
+ bool add_entry(const rgw_bucket& info_source,
+ const obj_version& info_source_ver,
+ const rgw_bucket& bucket) {
+ auto& entry = entries[bucket];
+
+ if (!entry.add(info_source, info_source_ver)) {
+ return false;
+ }
+
+ entry.bucket = bucket;
+
+ return true;
+ }
+
+ bool remove_entry(const rgw_bucket& info_source,
+ const obj_version& info_source_ver,
+ const rgw_bucket& bucket) {
+ auto iter = entries.find(bucket);
+ if (iter == entries.end()) {
+ return false;
+ }
+
+ if (!iter->second.remove(info_source, info_source_ver)) {
+ return false;
+ }
+
+ if (iter->second.empty()) {
+ entries.erase(iter);
+ }
+
+ return true;
+ }
+
+ void clear() {
+ entries.clear();
+ }
+
+ bool empty() const {
+ return entries.empty();
+ }
+
+ void get_entities(std::set<rgw_bucket> *result) const {
+ for (auto& iter : entries) {
+ result->insert(iter.first);
+ }
+ }
+ };
+
+ struct info_map {
+ map<rgw_bucket, single_instance_info> instances;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(instances, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(instances, bl);
+ DECODE_FINISH(bl);
+ }
+
+ bool empty() const {
+ return instances.empty();
+ }
+
+ void clear() {
+ instances.clear();
+ }
+
+ void get_entities(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *result) const {
+ auto iter = instances.find(bucket);
+ if (iter == instances.end()) {
+ return;
+ }
+ iter->second.get_entities(result);
+ }
+ } info;
+
+ RGWSI_BS_SObj_HintIndexObj(RGWSI_SysObj *_sysobj_svc,
+ const rgw_raw_obj& _obj) : cct(_sysobj_svc->ctx()),
+ obj_ctx(_sysobj_svc->init_obj_ctx()),
+ obj(_obj),
+ sysobj(obj_ctx.get_obj(obj))
+ {
+ svc.sysobj = _sysobj_svc;
+ }
+
+ int update(const rgw_bucket& entity,
+ const RGWBucketInfo& info_source,
+ std::optional<std::vector<rgw_bucket> > add,
+ std::optional<std::vector<rgw_bucket> > remove,
+ optional_yield y);
+
+private:
+ void update_entries(const rgw_bucket& info_source,
+ const obj_version& info_source_ver,
+ std::optional<std::vector<rgw_bucket> > add,
+ std::optional<std::vector<rgw_bucket> > remove,
+ single_instance_info *instance);
+
+ int read(optional_yield y);
+ int flush(optional_yield y);
+
+ void invalidate() {
+ has_data = false;
+ info.clear();
+ }
+
+ void get_entities(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *result) const {
+ info.get_entities(bucket, result);
+ }
+};
+WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry)
+WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info)
+WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map)
+
+int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity,
+ const RGWBucketInfo& info_source,
+ std::optional<std::vector<rgw_bucket> > add,
+ std::optional<std::vector<rgw_bucket> > remove,
+ optional_yield y)
+{
+ int r = 0;
+
+ auto& info_source_ver = info_source.objv_tracker.read_version;
+
+#define MAX_RETRIES 25
+
+ for (int i = 0; i < MAX_RETRIES; ++i) {
+ if (!has_data) {
+ r = read(y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: cannot update hint index: failed to read: r=" << r << dendl;
+ return r;
+ }
+ }
+
+ auto& instance = info.instances[entity];
+
+ update_entries(info_source.bucket,
+ info_source_ver,
+ add, remove,
+ &instance);
+
+ if (instance.empty()) {
+ info.instances.erase(entity);
+ }
+
+ r = flush(y);
+ if (r >= 0) {
+ return 0;
+ }
+
+ if (r != -ECANCELED) {
+ ldout(cct, 0) << "ERROR: failed to flush hint index: obj=" << obj << " r=" << r << dendl;
+ return r;
+ }
+ }
+ ldout(cct, 0) << "ERROR: failed to flush hint index: too many retries (obj=" << obj << "), likely a bug" << dendl;
+
+ return -EIO;
+}
+
+void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source,
+ const obj_version& info_source_ver,
+ std::optional<std::vector<rgw_bucket> > add,
+ std::optional<std::vector<rgw_bucket> > remove,
+ single_instance_info *instance)
+{
+ if (remove) {
+ for (auto& bucket : *remove) {
+ instance->remove_entry(info_source, info_source_ver, bucket);
+ }
+ }
+
+ if (add) {
+ for (auto& bucket : *add) {
+ instance->add_entry(info_source, info_source_ver, bucket);
+ }
+ }
+}
+
+int RGWSI_BS_SObj_HintIndexObj::read(optional_yield y) {
+ RGWObjVersionTracker _ot;
+ bufferlist bl;
+ int r = sysobj.rop()
+ .set_objv_tracker(&_ot) /* forcing read of current version */
+ .read(&bl, y);
+ if (r < 0 && r != -ENOENT) {
+ ldout(cct, 0) << "ERROR: failed reading data (obj=" << obj << "), r=" << r << dendl;
+ return r;
+ }
+
+ ot = _ot;
+
+ if (r >= 0) {
+ auto iter = bl.cbegin();
+ try {
+ decode(info, iter);
+ has_data = true;
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to decode entries, ignoring" << dendl;
+ info.clear();
+ }
+ } else {
+ info.clear();
+ }
+
+ return 0;
+}
+
+int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) {
+ int r;
+
+ if (!info.empty()) {
+ bufferlist bl;
+ encode(info, bl);
+
+ r = sysobj.wop()
+ .set_objv_tracker(&ot) /* forcing read of current version */
+ .write(bl, y);
+
+ } else { /* remove */
+ r = sysobj.wop()
+ .set_objv_tracker(&ot)
+ .remove(y);
+ }
+
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const
+{
+ rgw_bucket b = bucket;
+ b.bucket_id.clear();
+ return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
+ bucket_sync_sources_oid_prefix + "." + b.get_key());
+}
+
+rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const
+{
+ rgw_bucket b = bucket;
+ b.bucket_id.clear();
+ return rgw_raw_obj(svc.zone->get_zone_params().log_pool,
+ bucket_sync_targets_oid_prefix + "." + b.get_key());
+}
+
+int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
+ std::vector<rgw_bucket>& added_dests,
+ std::vector<rgw_bucket>& removed_dests,
+ std::vector<rgw_bucket>& added_sources,
+ std::vector<rgw_bucket>& removed_sources,
+ optional_yield y)
+{
+ std::vector<rgw_bucket> self_entity;
+ self_entity.push_back(bucket_info.bucket);
+
+ if (!added_dests.empty() ||
+ !removed_dests.empty()) {
+ /* update our dests */
+ RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+ hint_index_mgr.get_dests_obj(bucket_info.bucket));
+ int r = index.update(bucket_info.bucket,
+ bucket_info,
+ added_dests,
+ removed_dests,
+ y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
+ return r;
+ }
+
+ /* update added dest buckets */
+ for (auto& dest_bucket : added_dests) {
+ RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+ hint_index_mgr.get_sources_obj(dest_bucket));
+ int r = dep_index.update(dest_bucket,
+ bucket_info,
+ self_entity,
+ std::nullopt,
+ y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
+ return r;
+ }
+ }
+ /* update removed dest buckets */
+ for (auto& dest_bucket : removed_dests) {
+ RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+ hint_index_mgr.get_sources_obj(dest_bucket));
+ int r = dep_index.update(dest_bucket,
+ bucket_info,
+ std::nullopt,
+ self_entity,
+ y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
+ return r;
+ }
+ }
+ }
+
+ if (!added_dests.empty() ||
+ !removed_dests.empty()) {
+ RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+ hint_index_mgr.get_sources_obj(bucket_info.bucket));
+ /* update our sources */
+ int r = index.update(bucket_info.bucket,
+ bucket_info,
+ added_sources,
+ removed_sources,
+ y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
+ return r;
+ }
+
+ /* update added sources buckets */
+ for (auto& source_bucket : added_sources) {
+ RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+ hint_index_mgr.get_dests_obj(source_bucket));
+ int r = dep_index.update(source_bucket,
+ bucket_info,
+ self_entity,
+ std::nullopt,
+ y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
+ return r;
+ }
+ }
+ /* update removed dest buckets */
+ for (auto& source_bucket : removed_sources) {
+ RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
+ hint_index_mgr.get_dests_obj(source_bucket));
+ int r = dep_index.update(source_bucket,
+ bucket_info,
+ std::nullopt,
+ self_entity,
+ y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
+ return r;
+ }
+ }
+ }
+
+ return 0;
+}
+
+int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info,
+ optional_yield y)
+{
+ std::set<rgw_bucket> sources_set;
+ std::set<rgw_bucket> dests_set;
+
+ if (bucket_info.sync_policy) {
+ bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
+ &sources_set,
+ &dests_set);
+ }
+
+ std::vector<rgw_bucket> removed_sources;
+ removed_sources.reserve(sources_set.size());
+ for (auto& e : sources_set) {
+ removed_sources.push_back(e);
+ }
+
+ std::vector<rgw_bucket> removed_dests;
+ removed_dests.reserve(dests_set.size());
+ for (auto& e : dests_set) {
+ removed_dests.push_back(e);
+ }
+
+ std::vector<rgw_bucket> added_sources;
+ std::vector<rgw_bucket> added_dests;
+
+ return do_update_hints(bucket_info,
+ added_dests,
+ removed_dests,
+ added_sources,
+ removed_sources,
+ y);
}
int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
- RGWBucketInfo *orig_bucket_info)
+ RGWBucketInfo *orig_bucket_info,
+ optional_yield y)
{
std::set<rgw_bucket> orig_sources;
std::set<rgw_bucket> orig_dests;
@@ -143,16 +646,73 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
std::vector<rgw_bucket> removed_sources;
std::vector<rgw_bucket> added_sources;
- diff_sets(orig_sources, sources, &added_sources, &removed_sources);
+ bool found = diff_sets(orig_sources, sources, &added_sources, &removed_sources);
ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl;
ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl;
std::vector<rgw_bucket> removed_dests;
std::vector<rgw_bucket> added_dests;
- diff_sets(orig_dests, dests, &added_dests, &removed_dests);
+ found = found || diff_sets(orig_dests, dests, &added_dests, &removed_dests);
+
ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl;
ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl;
- return 0;
+ if (!found) {
+ return 0;
+ }
+
+ return do_update_hints(bucket_info,
+ added_dests,
+ removed_dests,
+ added_sources,
+ removed_sources,
+ y);
+}
+
+int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests,
+ optional_yield y)
+{
+ if (!sources && !dests) {
+ return 0;
+ }
+
+ if (sources) {
+ RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+ hint_index_mgr.get_sources_obj(bucket));
+ int r = index.read(y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl;
+ return r;
+ }
+ index.get_entities(bucket, dests);
+
+ if (!bucket.bucket_id.empty()) {
+ rgw_bucket b = bucket;
+ b.bucket_id.clear();
+ index.get_entities(bucket, dests);
+ }
+ }
+
+ if (dests) {
+ RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
+ hint_index_mgr.get_dests_obj(bucket));
+ int r = index.read(y);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl;
+ return r;
+ }
+
+ index.get_entities(bucket, dests);
+
+ if (!bucket.bucket_id.empty()) {
+ rgw_bucket b = bucket;
+ b.bucket_id.clear();
+ index.get_entities(bucket, dests);
+ }
+ }
+
+ return 0;
}
diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h
index 0271d9faff6..a0305e12b4d 100644
--- a/src/rgw/services/svc_bucket_sync_sobj.h
+++ b/src/rgw/services/svc_bucket_sync_sobj.h
@@ -38,10 +38,37 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl<bucket_sync_policy_cache_entry>;
unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
+ class HintIndexManager {
+ struct {
+ RGWSI_Zone *zone;
+ RGWSI_SysObj *sysobj;
+ } svc;
+
+ public:
+ HintIndexManager() {}
+
+ void init(RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj *_sysobj_svc) {
+ svc.zone = _zone_svc;
+ svc.sysobj = _sysobj_svc;
+ }
+
+ rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const;
+ rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const;
+ } hint_index_mgr;
+
int do_start() override;
+
+ int do_update_hints(const RGWBucketInfo& bucket_info,
+ std::vector<rgw_bucket>& added_dests,
+ std::vector<rgw_bucket>& removed_dests,
+ std::vector<rgw_bucket>& added_sources,
+ std::vector<rgw_bucket>& removed_sources,
+ optional_yield y);
public:
struct Svc {
RGWSI_Zone *zone{nullptr};
+ RGWSI_SysObj *sysobj{nullptr};
RGWSI_SysObj_Cache *cache{nullptr};
RGWSI_Bucket_SObj *bucket_sobj{nullptr};
} svc;
@@ -50,6 +77,7 @@ public:
~RGWSI_Bucket_Sync_SObj();
void init(RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj *_sysobj_svc,
RGWSI_SysObj_Cache *_cache_svc,
RGWSI_Bucket_SObj *_bucket_sobj_svc);
@@ -61,6 +89,14 @@ public:
optional_yield y) override;
int handle_bi_update(RGWBucketInfo& bucket_info,
- RGWBucketInfo *orig_bucket_info) override;
+ RGWBucketInfo *orig_bucket_info,
+ optional_yield y) override;
+ int handle_bi_removal(const RGWBucketInfo& bucket_info,
+ optional_yield y) override;
+
+ int get_bucket_sync_hints(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests,
+ optional_yield y) override;
};
diff --git a/src/rgw/services/svc_zone.cc b/src/rgw/services/svc_zone.cc
index c86d28c591f..511984f9a29 100644
--- a/src/rgw/services/svc_zone.cc
+++ b/src/rgw/services/svc_zone.cc
@@ -23,11 +23,13 @@ RGWSI_Zone::RGWSI_Zone(CephContext *cct) : RGWServiceInstance(cct)
void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc,
RGWSI_RADOS * _rados_svc,
- RGWSI_SyncModules * _sync_modules_svc)
+ RGWSI_SyncModules * _sync_modules_svc,
+ RGWSI_Bucket_Sync *_bucket_sync_svc)
{
sysobj_svc = _sysobj_svc;
rados_svc = _rados_svc;
sync_modules_svc = _sync_modules_svc;
+ bucket_sync_svc = _bucket_sync_svc;
realm = new RGWRealm();
zonegroup = new RGWZoneGroup();
@@ -164,7 +166,13 @@ int RGWSI_Zone::do_start()
zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id());
for (auto ziter : zonegroup->zones) {
- sync_policy_handlers[ziter.second.id].reset(new RGWBucketSyncPolicyHandler(this, sync_modules_svc, ziter.second.name));
+ auto zone_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc, bucket_sync_svc, ziter.second.name);
+ ret = zone_handler->init(null_yield);
+ if (ret < 0) {
+ lderr(cct) << "ERROR: could not initialize zone policy handler for zone=" << ziter.second.name << dendl;
+ return ret;
+ }
+ sync_policy_handlers[ziter.second.id].reset(zone_handler);
}
sync_policy_handler = sync_policy_handlers[zone_id()]; /* we made sure earlier that zonegroup->zones has our zone */
diff --git a/src/rgw/services/svc_zone.h b/src/rgw/services/svc_zone.h
index 50403343d36..e20575f1632 100644
--- a/src/rgw/services/svc_zone.h
+++ b/src/rgw/services/svc_zone.h
@@ -9,6 +9,7 @@
class RGWSI_RADOS;
class RGWSI_SysObj;
class RGWSI_SyncModules;
+class RGWSI_Bucket_Sync;
class RGWRealm;
class RGWZoneGroup;
@@ -30,6 +31,7 @@ class RGWSI_Zone : public RGWServiceInstance
RGWSI_SysObj *sysobj_svc{nullptr};
RGWSI_RADOS *rados_svc{nullptr};
RGWSI_SyncModules *sync_modules_svc{nullptr};
+ RGWSI_Bucket_Sync *bucket_sync_svc{nullptr};
RGWRealm *realm{nullptr};
RGWZoneGroup *zonegroup{nullptr};
@@ -55,7 +57,8 @@ class RGWSI_Zone : public RGWServiceInstance
void init(RGWSI_SysObj *_sysobj_svc,
RGWSI_RADOS *_rados_svc,
- RGWSI_SyncModules *_sync_modules_svc);
+ RGWSI_SyncModules *_sync_modules_svc,
+ RGWSI_Bucket_Sync *_bucket_sync_svc);
int do_start() override;
void shutdown() override;