diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2019-11-19 19:59:59 +0100 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2020-01-28 19:20:39 +0100 |
commit | da6e8632fa4089308834b9b87e07d2236da5f1a2 (patch) | |
tree | 3fa5be4a8611e555978d69f8e0e2b1c7c7c213b3 /src | |
parent | rgw: find pipe rules param for specific obj changes (diff) | |
download | ceph-da6e8632fa4089308834b9b87e07d2236da5f1a2.tar.xz ceph-da6e8632fa4089308834b9b87e07d2236da5f1a2.zip |
rgw: filter fetch remote obj after reading remote metadata
In the sync case: find the appropriate pipe params that match this
remote object (if matches tags / prefix), and adjust fetch if needed --
abort if doesn't match tags. Later will adjust acls as needed (if user
sync and not system).
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/rgw/rgw_cr_rados.cc | 1 | ||||
-rw-r--r-- | src/rgw/rgw_cr_rados.h | 21 | ||||
-rw-r--r-- | src/rgw/rgw_data_sync.cc | 75 | ||||
-rw-r--r-- | src/rgw/rgw_putobj_processor.h | 3 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 62 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 27 | ||||
-rw-r--r-- | src/rgw/rgw_sync_policy.cc | 34 | ||||
-rw-r--r-- | src/rgw/rgw_sync_policy.h | 10 |
8 files changed, 203 insertions, 30 deletions
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index b34337ff431..5ad6bf287ff 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -621,6 +621,7 @@ int RGWAsyncFetchRemoteObj::_send_request() NULL, /* void (*progress_cb)(off_t, void *), */ NULL, /* void *progress_data*); */ dpp, + filter.get(), &zones_trace, &bytes_transferred); diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 5b689e6d493..f5c8f67b2b8 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -931,6 +931,7 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { real_time src_mtime; bool copy_if_newer; + std::shared_ptr<RGWFetchObjFilter> filter; rgw_zone_set zones_trace; PerfCounters* counters; const DoutPrefixProvider *dpp; @@ -946,7 +947,9 @@ public: const rgw_obj_key& _key, const std::optional<rgw_obj_key>& _dest_key, std::optional<uint64_t> _versioned_epoch, - bool _if_newer, rgw_zone_set *_zones_trace, + bool _if_newer, + std::shared_ptr<RGWFetchObjFilter> _filter, + rgw_zone_set *_zones_trace, PerfCounters* counters, const DoutPrefixProvider *dpp) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), @@ -956,7 +959,9 @@ public: key(_key), dest_key(_dest_key), versioned_epoch(_versioned_epoch), - copy_if_newer(_if_newer), counters(counters), + copy_if_newer(_if_newer), + filter(_filter), + counters(counters), dpp(dpp) { if (_zones_trace) { @@ -983,6 +988,8 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { bool copy_if_newer; + std::shared_ptr<RGWFetchObjFilter> filter; + RGWAsyncFetchRemoteObj *req; rgw_zone_set *zones_trace; PerfCounters* counters; @@ -997,7 +1004,9 @@ public: const rgw_obj_key& _key, const std::optional<rgw_obj_key>& _dest_key, std::optional<uint64_t> _versioned_epoch, - bool _if_newer, rgw_zone_set *_zones_trace, + bool _if_newer, + std::shared_ptr<RGWFetchObjFilter> _filter, + rgw_zone_set *_zones_trace, PerfCounters* counters, const DoutPrefixProvider *dpp) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), @@ -1008,7 +1017,9 @@ public: key(_key), dest_key(_dest_key), versioned_epoch(_versioned_epoch), - copy_if_newer(_if_newer), req(NULL), + copy_if_newer(_if_newer), + filter(_filter), + req(NULL), zones_trace(_zones_trace), counters(counters), dpp(dpp) {} @@ -1026,7 +1037,7 @@ public: int send_request() override { req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, src_bucket, dest_placement_rule, dest_bucket_info, - key, dest_key, versioned_epoch, copy_if_newer, + key, dest_key, versioned_epoch, copy_if_newer, filter, zones_trace, counters, dpp); async_rados->queue(req); return 0; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e0665db8811..b1a6aeca3e3 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1971,13 +1971,78 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl return 0; } +class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default { + rgw_bucket_sync_pipe sync_pipe; + +public: + RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) { + } + + int filter(CephContext *cct, + const rgw_obj_key& source_key, + const RGWBucketInfo& dest_bucket_info, + std::optional<rgw_placement_rule> dest_placement_rule, + const map<string, bufferlist>& obj_attrs, + const rgw_placement_rule **prule) override; +}; + +int RGWFetchObjFilter_Sync::filter(CephContext *cct, + const rgw_obj_key& source_key, + const RGWBucketInfo& dest_bucket_info, + std::optional<rgw_placement_rule> dest_placement_rule, + const map<string, bufferlist>& obj_attrs, + const rgw_placement_rule **prule) +{ + rgw_sync_pipe_params params; + + RGWObjTags obj_tags; + + auto iter = obj_attrs.find(RGW_ATTR_TAGS); + if (iter != obj_attrs.end()) { + try{ + auto it = iter->second.cbegin(); + obj_tags.decode(it); + } catch (buffer::error &err) { + ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl; + } + } + + if (!sync_pipe.info.handler.find_obj_params(source_key, + obj_tags.get_tags(), + ¶ms)) { + return -ERR_PRECONDITION_FAILED; + } + + if (!dest_placement_rule && + params.dest.storage_class) { + dest_rule.storage_class = *params.dest.storage_class; + dest_rule.inherit_from(dest_bucket_info.placement_rule); + dest_placement_rule = dest_rule; + *prule = &dest_rule; + } + + return RGWFetchObjFilter_Default::filter(cct, + source_key, + dest_bucket_info, + dest_placement_rule, + obj_attrs, + prule); +} + + RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket, + + auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe); + + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, + sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, std::nullopt, versioned_epoch, - true, zones_trace, sync_env->counters, sync_env->dpp); + true, + std::static_pointer_cast<RGWFetchObjFilter>(filter), + zones_trace, sync_env->counters, sync_env->dpp); } RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, @@ -2054,10 +2119,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck } } + auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe); + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, dest_key, versioned_epoch, - true, zones_trace, nullptr, sync_env->dpp); + true, + std::static_pointer_cast<RGWFetchObjFilter>(filter), + zones_trace, nullptr, sync_env->dpp); } RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index 499b6b09776..76ea90e3e5f 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -154,6 +154,9 @@ class ManifestObjectProcessor : public HeadObjectProcessor, } } + void set_tail_placement(const rgw_placement_rule& tpr) { + tail_placement_rule = tpr; + } void set_tail_placement(const rgw_placement_rule&& tpr) { tail_placement_rule = tpr; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index c68c9addb5b..b211cf4033b 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3616,6 +3616,28 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx, return 0; } +int RGWFetchObjFilter_Default::filter(CephContext *cct, + const rgw_obj_key& source_key, + const RGWBucketInfo& dest_bucket_info, + std::optional<rgw_placement_rule> dest_placement_rule, + const map<string, bufferlist>& obj_attrs, + const rgw_placement_rule **prule) +{ + const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr); + if (!ptail_rule) { + auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS); + if (iter != obj_attrs.end()) { + dest_rule.storage_class = iter->second.to_str(); + dest_rule.inherit_from(dest_bucket_info.placement_rule); + ptail_rule = &dest_rule; + } else { + ptail_rule = &dest_bucket_info.placement_rule; + } + } + *prule = ptail_rule; + return 0; +} + int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, const rgw_user& user_id, req_info *info, @@ -3643,6 +3665,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, void (*progress_cb)(off_t, void *), void *progress_data, const DoutPrefixProvider *dpp, + RGWFetchObjFilter *filter, rgw_zone_set *zones_trace, std::optional<uint64_t>* bytes_transferred) { @@ -3658,8 +3681,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr); - AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, ptail_rule, user_id, + AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, nullptr, user_id, obj_ctx, dest_obj, olh_epoch, tag, dpp, null_yield); RGWRESTConn *conn; auto& zone_conn_map = svc.zone->get_zone_conn_map(); @@ -3685,25 +3707,30 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, conn = iter->second; } - string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_oid(); - boost::optional<RGWPutObj_Compress> compressor; CompressorRef plugin; - rgw_placement_rule dest_rule; + RGWFetchObjFilter_Default source_filter; + if (!filter) { + filter = &source_filter; + } + RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data, [&](const map<string, bufferlist>& obj_attrs) { - if (!ptail_rule) { - auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS); - if (iter != obj_attrs.end()) { - dest_rule.storage_class = iter->second.to_str(); - dest_rule.inherit_from(dest_bucket_info.placement_rule); - processor.set_tail_placement(std::move(dest_rule)); - ptail_rule = &dest_rule; - } else { - ptail_rule = &dest_bucket_info.placement_rule; - } + const rgw_placement_rule *ptail_rule; + int ret = filter->filter(cct, + src_obj.key, + dest_bucket_info, + dest_placement_rule, + obj_attrs, + &ptail_rule); + if (ret < 0) { + ldout(cct, 5) << "Aborting fetch: source object filter returned ret=" << ret << dendl; + return ret; } + + processor.set_tail_placement(*ptail_rule); + const auto& compression_type = svc.zone->get_zone_params().get_compression_type(*ptail_rule); if (compression_type != "none") { plugin = Compressor::create(cct, compression_type); @@ -3713,7 +3740,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, } } - int ret = processor.prepare(null_yield); + ret = processor.prepare(null_yield); if (ret < 0) { return ret; } @@ -3992,7 +4019,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx, dest_placement, src_mtime, mtime, mod_ptr, unmod_ptr, high_precision_time, if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category, - olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp); + olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp, + nullptr /* filter */); } map<string, bufferlist> src_attrs; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index e730a2f45ab..e9451ea1909 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -202,6 +202,32 @@ struct RGWObjState { } }; +class RGWFetchObjFilter { +public: + virtual ~RGWFetchObjFilter() {} + + virtual int filter(CephContext *cct, + const rgw_obj_key& source_key, + const RGWBucketInfo& dest_bucket_info, + std::optional<rgw_placement_rule> dest_placement_rule, + const map<string, bufferlist>& obj_attrs, + const rgw_placement_rule **prule) = 0; +}; + +class RGWFetchObjFilter_Default : public RGWFetchObjFilter { +protected: + rgw_placement_rule dest_rule; +public: + RGWFetchObjFilter_Default() {} + + int filter(CephContext *cct, + const rgw_obj_key& source_key, + const RGWBucketInfo& dest_bucket_info, + std::optional<rgw_placement_rule> dest_placement_rule, + const map<string, bufferlist>& obj_attrs, + const rgw_placement_rule **prule) override; +}; + class RGWObjectCtx { rgw::sal::RGWRadosStore *store; ceph::shared_mutex lock = ceph::make_shared_mutex("RGWObjectCtx"); @@ -1101,6 +1127,7 @@ public: void (*progress_cb)(off_t, void *), void *progress_data, const DoutPrefixProvider *dpp, + RGWFetchObjFilter *filter, rgw_zone_set *zones_trace= nullptr, std::optional<uint64_t>* bytes_transferred = 0); /** diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index 01a1fb6ccde..9047b4b1862 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -121,18 +121,42 @@ bool rgw_sync_pipe_filter::check_tag(const string& s) const return true; } - for (auto& t : tags) { - if (t == s) { + auto iter = tags.find(rgw_sync_pipe_filter_tag(s)); + return (iter != tags.end()); +} + +bool rgw_sync_pipe_filter::check_tag(const string& k, const string& v) const +{ + if (tags.empty()) { /* tag filter wasn't defined */ + return true; + } + + auto iter = tags.find(rgw_sync_pipe_filter_tag(k, v)); + return (iter != tags.end()); +} + +bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& _tags) const +{ + if (tags.empty()) { + return true; + } + + for (auto& t : _tags) { + if (check_tag(t)) { return true; } } return false; } -bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& tags) const +bool rgw_sync_pipe_filter::check_tags(const RGWObjTags::tag_map_t& _tags) const { - for (auto& t : tags) { - if (check_tag(t)) { + if (tags.empty()) { + return true; + } + + for (auto& item : _tags) { + if (check_tag(item.first, item.second)) { return true; } } diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 07623583a3b..700025486fd 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -180,6 +180,14 @@ struct rgw_sync_pipe_filter_tag { string key; string value; + rgw_sync_pipe_filter_tag() {} + rgw_sync_pipe_filter_tag(const string& s) { + from_str(s); + } + rgw_sync_pipe_filter_tag(const string& _key, + const string& _value) : key(_key), + value(_value) {} + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(key, bl); @@ -231,7 +239,9 @@ struct rgw_sync_pipe_filter { bool is_subset_of(const rgw_sync_pipe_filter& f) const; bool check_tag(const string& s) const; + bool check_tag(const string& k, const string& v) const; bool check_tags(const std::vector<string>& tags) const; + bool check_tags(const RGWObjTags::tag_map_t& tags) const; }; WRITE_CLASS_ENCODER(rgw_sync_pipe_filter) |