summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2019-11-19 19:59:59 +0100
committerYehuda Sadeh <yehuda@redhat.com>2020-01-28 19:20:39 +0100
commitda6e8632fa4089308834b9b87e07d2236da5f1a2 (patch)
tree3fa5be4a8611e555978d69f8e0e2b1c7c7c213b3 /src
parentrgw: find pipe rules param for specific obj changes (diff)
downloadceph-da6e8632fa4089308834b9b87e07d2236da5f1a2.tar.xz
ceph-da6e8632fa4089308834b9b87e07d2236da5f1a2.zip
rgw: filter fetch remote obj after reading remote metadata
In the sync case: find the appropriate pipe params that match this remote object (if matches tags / prefix), and adjust fetch if needed -- abort if doesn't match tags. Later will adjust acls as needed (if user sync and not system). Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Diffstat (limited to 'src')
-rw-r--r--src/rgw/rgw_cr_rados.cc1
-rw-r--r--src/rgw/rgw_cr_rados.h21
-rw-r--r--src/rgw/rgw_data_sync.cc75
-rw-r--r--src/rgw/rgw_putobj_processor.h3
-rw-r--r--src/rgw/rgw_rados.cc62
-rw-r--r--src/rgw/rgw_rados.h27
-rw-r--r--src/rgw/rgw_sync_policy.cc34
-rw-r--r--src/rgw/rgw_sync_policy.h10
8 files changed, 203 insertions, 30 deletions
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index b34337ff431..5ad6bf287ff 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -621,6 +621,7 @@ int RGWAsyncFetchRemoteObj::_send_request()
NULL, /* void (*progress_cb)(off_t, void *), */
NULL, /* void *progress_data*); */
dpp,
+ filter.get(),
&zones_trace,
&bytes_transferred);
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 5b689e6d493..f5c8f67b2b8 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -931,6 +931,7 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
real_time src_mtime;
bool copy_if_newer;
+ std::shared_ptr<RGWFetchObjFilter> filter;
rgw_zone_set zones_trace;
PerfCounters* counters;
const DoutPrefixProvider *dpp;
@@ -946,7 +947,9 @@ public:
const rgw_obj_key& _key,
const std::optional<rgw_obj_key>& _dest_key,
std::optional<uint64_t> _versioned_epoch,
- bool _if_newer, rgw_zone_set *_zones_trace,
+ bool _if_newer,
+ std::shared_ptr<RGWFetchObjFilter> _filter,
+ rgw_zone_set *_zones_trace,
PerfCounters* counters, const DoutPrefixProvider *dpp)
: RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
@@ -956,7 +959,9 @@ public:
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), counters(counters),
+ copy_if_newer(_if_newer),
+ filter(_filter),
+ counters(counters),
dpp(dpp)
{
if (_zones_trace) {
@@ -983,6 +988,8 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
bool copy_if_newer;
+ std::shared_ptr<RGWFetchObjFilter> filter;
+
RGWAsyncFetchRemoteObj *req;
rgw_zone_set *zones_trace;
PerfCounters* counters;
@@ -997,7 +1004,9 @@ public:
const rgw_obj_key& _key,
const std::optional<rgw_obj_key>& _dest_key,
std::optional<uint64_t> _versioned_epoch,
- bool _if_newer, rgw_zone_set *_zones_trace,
+ bool _if_newer,
+ std::shared_ptr<RGWFetchObjFilter> _filter,
+ rgw_zone_set *_zones_trace,
PerfCounters* counters, const DoutPrefixProvider *dpp)
: RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
@@ -1008,7 +1017,9 @@ public:
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), req(NULL),
+ copy_if_newer(_if_newer),
+ filter(_filter),
+ req(NULL),
zones_trace(_zones_trace), counters(counters), dpp(dpp) {}
@@ -1026,7 +1037,7 @@ public:
int send_request() override {
req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
- key, dest_key, versioned_epoch, copy_if_newer,
+ key, dest_key, versioned_epoch, copy_if_newer, filter,
zones_trace, counters, dpp);
async_rados->queue(req);
return 0;
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index e0665db8811..b1a6aeca3e3 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1971,13 +1971,78 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
return 0;
}
+class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
+ rgw_bucket_sync_pipe sync_pipe;
+
+public:
+ RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) {
+ }
+
+ int filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule) override;
+};
+
+int RGWFetchObjFilter_Sync::filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule)
+{
+ rgw_sync_pipe_params params;
+
+ RGWObjTags obj_tags;
+
+ auto iter = obj_attrs.find(RGW_ATTR_TAGS);
+ if (iter != obj_attrs.end()) {
+ try{
+ auto it = iter->second.cbegin();
+ obj_tags.decode(it);
+ } catch (buffer::error &err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ }
+ }
+
+ if (!sync_pipe.info.handler.find_obj_params(source_key,
+ obj_tags.get_tags(),
+ &params)) {
+ return -ERR_PRECONDITION_FAILED;
+ }
+
+ if (!dest_placement_rule &&
+ params.dest.storage_class) {
+ dest_rule.storage_class = *params.dest.storage_class;
+ dest_rule.inherit_from(dest_bucket_info.placement_rule);
+ dest_placement_rule = dest_rule;
+ *prule = &dest_rule;
+ }
+
+ return RGWFetchObjFilter_Default::filter(cct,
+ source_key,
+ dest_bucket_info,
+ dest_placement_rule,
+ obj_attrs,
+ prule);
+}
+
+
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket,
+
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
+ return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+ sync_pipe.info.source_bs.bucket,
std::nullopt, sync_pipe.dest_bucket_info,
key, std::nullopt, versioned_epoch,
- true, zones_trace, sync_env->counters, sync_env->dpp);
+ true,
+ std::static_pointer_cast<RGWFetchObjFilter>(filter),
+ zones_trace, sync_env->counters, sync_env->dpp);
}
RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
@@ -2054,10 +2119,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
}
}
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
key, dest_key, versioned_epoch,
- true, zones_trace, nullptr, sync_env->dpp);
+ true,
+ std::static_pointer_cast<RGWFetchObjFilter>(filter),
+ zones_trace, nullptr, sync_env->dpp);
}
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h
index 499b6b09776..76ea90e3e5f 100644
--- a/src/rgw/rgw_putobj_processor.h
+++ b/src/rgw/rgw_putobj_processor.h
@@ -154,6 +154,9 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
}
}
+ void set_tail_placement(const rgw_placement_rule& tpr) {
+ tail_placement_rule = tpr;
+ }
void set_tail_placement(const rgw_placement_rule&& tpr) {
tail_placement_rule = tpr;
}
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index c68c9addb5b..b211cf4033b 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -3616,6 +3616,28 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
return 0;
}
+int RGWFetchObjFilter_Default::filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule)
+{
+ const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
+ if (!ptail_rule) {
+ auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
+ if (iter != obj_attrs.end()) {
+ dest_rule.storage_class = iter->second.to_str();
+ dest_rule.inherit_from(dest_bucket_info.placement_rule);
+ ptail_rule = &dest_rule;
+ } else {
+ ptail_rule = &dest_bucket_info.placement_rule;
+ }
+ }
+ *prule = ptail_rule;
+ return 0;
+}
+
int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
const rgw_user& user_id,
req_info *info,
@@ -3643,6 +3665,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
void (*progress_cb)(off_t, void *),
void *progress_data,
const DoutPrefixProvider *dpp,
+ RGWFetchObjFilter *filter,
rgw_zone_set *zones_trace,
std::optional<uint64_t>* bytes_transferred)
{
@@ -3658,8 +3681,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
- const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
- AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, ptail_rule, user_id,
+ AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, nullptr, user_id,
obj_ctx, dest_obj, olh_epoch, tag, dpp, null_yield);
RGWRESTConn *conn;
auto& zone_conn_map = svc.zone->get_zone_conn_map();
@@ -3685,25 +3707,30 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
conn = iter->second;
}
- string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_oid();
-
boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;
- rgw_placement_rule dest_rule;
+ RGWFetchObjFilter_Default source_filter;
+ if (!filter) {
+ filter = &source_filter;
+ }
+
RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data,
[&](const map<string, bufferlist>& obj_attrs) {
- if (!ptail_rule) {
- auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
- if (iter != obj_attrs.end()) {
- dest_rule.storage_class = iter->second.to_str();
- dest_rule.inherit_from(dest_bucket_info.placement_rule);
- processor.set_tail_placement(std::move(dest_rule));
- ptail_rule = &dest_rule;
- } else {
- ptail_rule = &dest_bucket_info.placement_rule;
- }
+ const rgw_placement_rule *ptail_rule;
+ int ret = filter->filter(cct,
+ src_obj.key,
+ dest_bucket_info,
+ dest_placement_rule,
+ obj_attrs,
+ &ptail_rule);
+ if (ret < 0) {
+ ldout(cct, 5) << "Aborting fetch: source object filter returned ret=" << ret << dendl;
+ return ret;
}
+
+ processor.set_tail_placement(*ptail_rule);
+
const auto& compression_type = svc.zone->get_zone_params().get_compression_type(*ptail_rule);
if (compression_type != "none") {
plugin = Compressor::create(cct, compression_type);
@@ -3713,7 +3740,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
}
}
- int ret = processor.prepare(null_yield);
+ ret = processor.prepare(null_yield);
if (ret < 0) {
return ret;
}
@@ -3992,7 +4019,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
dest_placement, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
- olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp);
+ olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp,
+ nullptr /* filter */);
}
map<string, bufferlist> src_attrs;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index e730a2f45ab..e9451ea1909 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -202,6 +202,32 @@ struct RGWObjState {
}
};
+class RGWFetchObjFilter {
+public:
+ virtual ~RGWFetchObjFilter() {}
+
+ virtual int filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule) = 0;
+};
+
+class RGWFetchObjFilter_Default : public RGWFetchObjFilter {
+protected:
+ rgw_placement_rule dest_rule;
+public:
+ RGWFetchObjFilter_Default() {}
+
+ int filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule) override;
+};
+
class RGWObjectCtx {
rgw::sal::RGWRadosStore *store;
ceph::shared_mutex lock = ceph::make_shared_mutex("RGWObjectCtx");
@@ -1101,6 +1127,7 @@ public:
void (*progress_cb)(off_t, void *),
void *progress_data,
const DoutPrefixProvider *dpp,
+ RGWFetchObjFilter *filter,
rgw_zone_set *zones_trace= nullptr,
std::optional<uint64_t>* bytes_transferred = 0);
/**
diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc
index 01a1fb6ccde..9047b4b1862 100644
--- a/src/rgw/rgw_sync_policy.cc
+++ b/src/rgw/rgw_sync_policy.cc
@@ -121,18 +121,42 @@ bool rgw_sync_pipe_filter::check_tag(const string& s) const
return true;
}
- for (auto& t : tags) {
- if (t == s) {
+ auto iter = tags.find(rgw_sync_pipe_filter_tag(s));
+ return (iter != tags.end());
+}
+
+bool rgw_sync_pipe_filter::check_tag(const string& k, const string& v) const
+{
+ if (tags.empty()) { /* tag filter wasn't defined */
+ return true;
+ }
+
+ auto iter = tags.find(rgw_sync_pipe_filter_tag(k, v));
+ return (iter != tags.end());
+}
+
+bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& _tags) const
+{
+ if (tags.empty()) {
+ return true;
+ }
+
+ for (auto& t : _tags) {
+ if (check_tag(t)) {
return true;
}
}
return false;
}
-bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& tags) const
+bool rgw_sync_pipe_filter::check_tags(const RGWObjTags::tag_map_t& _tags) const
{
- for (auto& t : tags) {
- if (check_tag(t)) {
+ if (tags.empty()) {
+ return true;
+ }
+
+ for (auto& item : _tags) {
+ if (check_tag(item.first, item.second)) {
return true;
}
}
diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h
index 07623583a3b..700025486fd 100644
--- a/src/rgw/rgw_sync_policy.h
+++ b/src/rgw/rgw_sync_policy.h
@@ -180,6 +180,14 @@ struct rgw_sync_pipe_filter_tag {
string key;
string value;
+ rgw_sync_pipe_filter_tag() {}
+ rgw_sync_pipe_filter_tag(const string& s) {
+ from_str(s);
+ }
+ rgw_sync_pipe_filter_tag(const string& _key,
+ const string& _value) : key(_key),
+ value(_value) {}
+
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(key, bl);
@@ -231,7 +239,9 @@ struct rgw_sync_pipe_filter {
bool is_subset_of(const rgw_sync_pipe_filter& f) const;
bool check_tag(const string& s) const;
+ bool check_tag(const string& k, const string& v) const;
bool check_tags(const std::vector<string>& tags) const;
+ bool check_tags(const RGWObjTags::tag_map_t& tags) const;
};
WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)