diff options
-rw-r--r-- | doc/radosgw/notifications.rst | 6 | ||||
-rw-r--r-- | doc/radosgw/pubsub-module.rst | 5 | ||||
-rw-r--r-- | doc/radosgw/s3-notification-compatibility.rst | 2 | ||||
-rw-r--r-- | doc/radosgw/s3/bucketops.rst | 33 | ||||
-rw-r--r-- | examples/boto3/README.md | 1 | ||||
-rwxr-xr-x | examples/boto3/notification_filters.py | 4 | ||||
-rw-r--r-- | examples/boto3/service-2.sdk-extras.json | 17 | ||||
-rw-r--r-- | src/common/ceph_json.h | 36 | ||||
-rw-r--r-- | src/rgw/rgw_auth_s3.cc | 10 | ||||
-rw-r--r-- | src/rgw/rgw_auth_s3.h | 4 | ||||
-rw-r--r-- | src/rgw/rgw_common.cc | 2 | ||||
-rw-r--r-- | src/rgw/rgw_common.h | 5 | ||||
-rw-r--r-- | src/rgw/rgw_loadgen.cc | 4 | ||||
-rw-r--r-- | src/rgw/rgw_notify.cc | 5 | ||||
-rw-r--r-- | src/rgw/rgw_op.cc | 83 | ||||
-rw-r--r-- | src/rgw/rgw_pubsub.cc | 30 | ||||
-rw-r--r-- | src/rgw/rgw_pubsub.h | 51 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.cc | 6 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/tests_ps.py | 145 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/zone_ps.py | 20 |
20 files changed, 380 insertions, 89 deletions
diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 43bcced787c..ced68e0d935 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -18,7 +18,7 @@ user can only manage its own topics, and can only associate them with buckets it In order to send notifications for events for a specific bucket, a notification entity needs to be created. A notification can be created on a subset of event types, or for all event types (default). The notification may also filter out events based on prefix/suffix and/or regular expression matching of the keys. As well as, -on the metadata attributes attached to the object. +on the metadata attributes attached to the object, or the object tags. There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications. REST API has been defined to provide configuration and control interfaces for the bucket notification @@ -283,7 +283,8 @@ pushed or pulled using the pubsub sync module. "eTag":"", "versionId":"", "sequencer": "", - "metadata":[] + "metadata":[], + "tags":[] } }, "eventId":"", @@ -308,6 +309,7 @@ pushed or pulled using the pubsub sync module. - s3.object.version: object version in case of versioned bucket - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format) - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) +- s3.object.tags: any tags set on the object (an extension to the S3 notification API) - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) .. _PubSub Module : ../pubsub-module diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index a727ad72bcc..eb4158bab52 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -270,6 +270,7 @@ Detailed under: `Bucket Operations`_. 
the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access), and will have to be deleted explicitly with the subscription deletion API - Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored + - Filtering based on tags (which is an extension to S3) is not supported, and such rules will be ignored Non S3-Compliant Notifications @@ -496,7 +497,8 @@ the events will have an S3-compatible record format (JSON): "eTag":"", "versionId":"", "sequencer":"", - "metadata":[] + "metadata":[], + "tags":[] } }, "eventId":"", @@ -520,6 +522,7 @@ the events will have an S3-compatible record format (JSON): - s3.object.version: object version in case of versioned bucket - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format) - s3.object.metadata: not supported (an extension to the S3 notification API) +- s3.object.tags: not supported (an extension to the S3 notification API) - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) In case that the subscription was not created via a non S3-compatible notification, diff --git a/doc/radosgw/s3-notification-compatibility.rst b/doc/radosgw/s3-notification-compatibility.rst index 6cc6ac0283f..ffbceca64d4 100644 --- a/doc/radosgw/s3-notification-compatibility.rst +++ b/doc/radosgw/s3-notification-compatibility.rst @@ -49,6 +49,8 @@ Ceph's bucket notification API has the following extensions: - Filtering based on metadata attributes attached to the object + - Filtering based on object tags + - Filtering overlapping is allowed, so that same event could be sent as different notification diff --git a/doc/radosgw/s3/bucketops.rst b/doc/radosgw/s3/bucketops.rst index d520e379b37..378eb5f044a 100644 --- a/doc/radosgw/s3/bucketops.rst +++ b/doc/radosgw/s3/bucketops.rst @@ -514,7 +514,13 @@ Parameters are XML encoded in the body of the 
request, in the following format: <Name></Name> <Value></Value> </FilterRule> - </s3Metadata> + </S3Metadata> + <S3Tags> + <FilterRule> + <Name></Name> + <Value></Value> + </FilterRule> + </S3Tags> </Filter> </TopicConfiguration> </NotificationConfiguration> @@ -533,15 +539,19 @@ Parameters are XML encoded in the body of the request, in the following format: | ``Event`` | String | List of supported events see: `S3 Notification Compatibility`_. Multiple ``Event`` | No | | | | entities can be used. If omitted, all events are handled | | +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ -| ``Filter`` | Container | Holding ``S3Key`` and ``S3Metadata`` entities | No | +| ``Filter`` | Container | Holding ``S3Key``, ``S3Metadata`` and ``S3Tags`` entities | No | +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ | ``S3Key`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object key. | No | | | | At most, 3 entities may be in the list, with ``Name`` be ``prefix``, ``suffix`` or | | | | | ``regex``. All filter rules in the list must match for the filter to match. | | +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ | ``S3Metadata`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object metadata. | No | -| | | All filter rules in the list must match the ones defined on the object. The object, | | -| | | have other metadata entitied not listed in the filter. | | +| | | All filter rules in the list must match the metadata defined on the object. However, | | +| | | the object still matches if it has other metadata entries not listed in the filter. 
| | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``S3Tags`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object tags. | No | +| | | All filter rules in the list must match the tags defined on the object. However, | | +| | | the object still matches if it has other tags not listed in the filter. | | +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ | ``S3Key.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be: ``prefix``, ``suffix`` | Yes | | | | or ``regex``. The ``Value`` would hold the key prefix, key suffix or a regular | | @@ -549,7 +559,10 @@ Parameters are XML encoded in the body of the request, in the following format: +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ | ``S3Metadata.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the name of the metadata | Yes | | | | attribute (e.g. ``x-amz-meta-xxx``). The ``Value`` would be the expected value for | | -| | | this attribute | | ++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ +| ``S3Tags.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the tag key, | Yes | +| | | and ``Value`` would be the tag value. 
| | +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+ @@ -652,7 +665,13 @@ Response is XML encoded in the body of the request, in the following format: <Name></Name> <Value></Value> </FilterRule> - </s3Metadata> + </S3Metadata> + <S3Tags> + <FilterRule> + <Name></Name> + <Value></Value> + </FilterRule> + </S3Tags> </Filter> </TopicConfiguration> </NotificationConfiguration> @@ -684,4 +703,4 @@ HTTP Response | ``404`` | NoSuchKey | The notification does not exist (if provided) | +---------------+-----------------------+----------------------------------------------------------+ -.. _S3 Notification Compatibility: ../s3-notification-compatibility +.. _S3 Notification Compatibility: ../../s3-notification-compatibility diff --git a/examples/boto3/README.md b/examples/boto3/README.md index 2c285261d31..2abbd2812fe 100644 --- a/examples/boto3/README.md +++ b/examples/boto3/README.md @@ -1,5 +1,6 @@ # Introduction This directory contains examples on how to use AWS CLI/boto3 to exercise the RadosGW extensions to the S3 API. +This is an extension to the [AWS SDK](https://github.com/boto/botocore/blob/develop/botocore/data/s3/2006-03-01/service-2.json). # Users For the standard client to support these extensions, the: ``service-2.sdk-extras.json`` file should be placed under: ``~/.aws/models/s3/2006-03-01/`` directory. 
diff --git a/examples/boto3/notification_filters.py b/examples/boto3/notification_filters.py index c9fd27e5a15..a45393c74f9 100755 --- a/examples/boto3/notification_filters.py +++ b/examples/boto3/notification_filters.py @@ -35,6 +35,10 @@ topic_conf_list = [{'Id': notification_id, 'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'}, {'Name': 'x-amz-meta-hello', 'Value': 'world'}] }, + 'Tags': { + 'FilterRules': [{'Name': 'foo', 'Value': 'bar'}, + {'Name': 'hello', 'Value': 'world'}] + }, 'Key': { 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}] } diff --git a/examples/boto3/service-2.sdk-extras.json b/examples/boto3/service-2.sdk-extras.json index 671f802a498..f69912c0bbe 100644 --- a/examples/boto3/service-2.sdk-extras.json +++ b/examples/boto3/service-2.sdk-extras.json @@ -96,7 +96,13 @@ "shape":"S3MetadataFilter", "documentation":"<p/>", "locationName":"S3Metadata" + }, + "Tags":{ + "shape":"S3TagsFilter", + "documentation":"<p/>", + "locationName":"S3Tags" } + }, "documentation":"<p>Specifies object key name filtering rules. 
For information about key name filtering, see <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html\">Configuring Event Notifications</a> in the <i>Amazon Simple Storage Service Developer Guide</i>.</p>" }, @@ -121,6 +127,17 @@ } }, "documentation":"<p>A container for metadata filtering rules.</p>" + }, + "S3TagsFilter":{ + "type":"structure", + "members":{ + "FilterRules":{ + "shape":"FilterRuleList", + "documentation":"<p/>", + "locationName":"FilterRule" + } + }, + "documentation":"<p>A container for object tags filtering rules.</p>" } }, "documentation":"<p/>" diff --git a/src/common/ceph_json.h b/src/common/ceph_json.h index f7d0e11b8f4..05615c6dc71 100644 --- a/src/common/ceph_json.h +++ b/src/common/ceph_json.h @@ -3,7 +3,7 @@ #include <stdexcept> #include <include/types.h> - +#include <boost/container/flat_map.hpp> #include "json_spirit/json_spirit.h" @@ -259,6 +259,22 @@ void decode_json_obj(std::multimap<K, V>& m, JSONObj *obj) } } +template<class K, class V> +void decode_json_obj(boost::container::flat_map<K, V>& m, JSONObj *obj) +{ + m.clear(); + + JSONObjIter iter = obj->find_first(); + + for (; !iter.end(); ++iter) { + K key; + V val; + JSONObj *o = *iter; + JSONDecoder::decode_json("key", key, o); + JSONDecoder::decode_json("val", val, o); + m[key] = val; + } +} template<class C> void decode_json_obj(C& container, void (*cb)(C&, JSONObj *obj), JSONObj *obj) { @@ -397,6 +413,7 @@ static void encode_json(const char *name, const std::list<T>& l, ceph::Formatter } f->close_section(); } + template<class T> static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatter *f) { @@ -406,6 +423,7 @@ static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatte } f->close_section(); } + template<class T, class Compare = std::less<T> > static void encode_json(const char *name, const std::set<T, Compare>& l, ceph::Formatter *f) { @@ -426,7 +444,7 @@ static void encode_json(const char *name, const 
std::vector<T>& l, ceph::Formatt f->close_section(); } -template<class K, class V, class C = std::less<K> > +template<class K, class V, class C = std::less<K>> static void encode_json(const char *name, const std::map<K, V, C>& m, ceph::Formatter *f) { f->open_array_section(name); @@ -451,6 +469,20 @@ static void encode_json(const char *name, const std::multimap<K, V>& m, ceph::Fo } f->close_section(); } + +template<class K, class V> +static void encode_json(const char *name, const boost::container::flat_map<K, V>& m, ceph::Formatter *f) +{ + f->open_array_section(name); + for (auto i = m.begin(); i != m.end(); ++i) { + f->open_object_section("entry"); + encode_json("key", i->first, f); + encode_json("val", i->second, f); + f->close_section(); + } + f->close_section(); +} + template<class K, class V> void encode_json_map(const char *name, const std::map<K, V>& m, ceph::Formatter *f) { diff --git a/src/rgw/rgw_auth_s3.cc b/src/rgw/rgw_auth_s3.cc index 1f8050da720..93e46bc4621 100644 --- a/src/rgw/rgw_auth_s3.cc +++ b/src/rgw/rgw_auth_s3.cc @@ -56,7 +56,7 @@ static const auto signed_subresources = { */ static std::string -get_canon_amz_hdr(const std::map<std::string, std::string>& meta_map) +get_canon_amz_hdr(const meta_map_t& meta_map) { std::string dest; @@ -117,8 +117,8 @@ void rgw_create_s3_canonical_header( const char* const content_md5, const char* const content_type, const char* const date, - const std::map<std::string, std::string>& meta_map, - const std::map<std::string, std::string>& qs_map, + const meta_map_t& meta_map, + const meta_map_t& qs_map, const char* const request_uri, const std::map<std::string, std::string>& sub_resources, std::string& dest_str) @@ -157,7 +157,7 @@ static inline bool is_base64_for_content_md5(unsigned char c) { } static inline void get_v2_qs_map(const req_info& info, - std::map<std::string, std::string>& qs_map) { + meta_map_t& qs_map) { const auto& params = const_cast<RGWHTTPArgs&>(info.args).get_params(); for (const auto& elt : 
params) { std::string k = boost::algorithm::to_lower_copy(elt.first); @@ -190,7 +190,7 @@ bool rgw_create_s3_canonical_header(const req_info& info, const char *content_type = info.env->get("CONTENT_TYPE"); std::string date; - std::map<std::string, std::string> qs_map; + meta_map_t qs_map; if (qsr) { get_v2_qs_map(info, qs_map); // handle qs metadata diff --git a/src/rgw/rgw_auth_s3.h b/src/rgw/rgw_auth_s3.h index 67a01b4e9c8..9e79ee16213 100644 --- a/src/rgw/rgw_auth_s3.h +++ b/src/rgw/rgw_auth_s3.h @@ -421,8 +421,8 @@ void rgw_create_s3_canonical_header( const char *content_md5, const char *content_type, const char *date, - const std::map<std::string, std::string>& meta_map, - const std::map<std::string, std::string>& qs_map, + const meta_map_t& meta_map, + const meta_map_t& qs_map, const char *request_uri, const std::map<std::string, std::string>& sub_resources, std::string& dest_str); diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index 6e394bb0151..65421d6e130 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -435,7 +435,7 @@ std::ostream& operator<<(std::ostream& oss, const rgw_err &err) } void rgw_add_amz_meta_header( - std::map<std::string, std::string>& x_meta_map, + meta_map_t& x_meta_map, const std::string& k, const std::string& v) { diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index a8ea073512a..6af3de23af6 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1648,11 +1648,12 @@ namespace rgw { } } +using meta_map_t = boost::container::flat_map <std::string, std::string>; struct req_info { const RGWEnv *env; RGWHTTPArgs args; - map<string, string> x_meta_map; + meta_map_t x_meta_map; string host; const char *method; @@ -2456,7 +2457,7 @@ static inline uint64_t rgw_rounded_objsize_kb(uint64_t bytes) /* implement combining step, S3 header canonicalization; k is a * valid header and in lc form */ void rgw_add_amz_meta_header( - std::map<std::string, std::string>& x_meta_map, + meta_map_t& x_meta_map, const 
std::string& k, const std::string& v); diff --git a/src/rgw/rgw_loadgen.cc b/src/rgw/rgw_loadgen.cc index 4085f739707..a8e6fd7b2bf 100644 --- a/src/rgw/rgw_loadgen.cc +++ b/src/rgw/rgw_loadgen.cc @@ -18,7 +18,7 @@ void RGWLoadGenRequestEnv::set_date(utime_t& tm) int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key) { - map<string, string> meta_map; + meta_map_t meta_map; map<string, string> sub_resources; string canonical_header; @@ -29,7 +29,7 @@ int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key) content_type.c_str(), date_str.c_str(), meta_map, - map<string, string>{}, + meta_map_t{}, uri.c_str(), sub_resources, canonical_header); diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 11ca0256462..0c41b679a11 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -40,6 +40,8 @@ void populate_record_from_request(const req_state *s, record.bucket_id = s->bucket.bucket_id; // pass meta data record.x_meta_map = s->info.x_meta_map; + // pass tags + record.tags = s->tagset.get_tags(); } bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) { @@ -52,6 +54,9 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { return false; } + if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) { + return false; + } return true; } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 69faf288eb4..19ab3cc537a 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -980,6 +980,26 @@ int RGWGetObj::verify_permission() return 0; } +// cache the objects tags into the requests +// use inside try/catch as "decode()" may throw +void populate_tags_in_request(req_state* s, const std::map<std::string, bufferlist>& attrs) { + const auto attr_iter = attrs.find(RGW_ATTR_TAGS); + if (attr_iter != attrs.end()) { + auto bliter = attr_iter->second.cbegin(); + decode(s->tagset, bliter); + } +} + +// cache the objects metadata 
into the request +void populate_metadata_in_request(req_state* s, std::map<std::string, bufferlist>& attrs) { + for (auto& attr : attrs) { + if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) { + std::string_view key(attr.first); + key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1); + s->info.x_meta_map.emplace(key, attr.second.c_str()); + } + } +} int RGWOp::verify_op_mask() { @@ -4819,6 +4839,15 @@ void RGWDeleteObj::execute() if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) { op_ret = 0; } + + // cache the objects tags and metadata into the requests + // so it could be used in the notification mechanism + try { + populate_tags_in_request(s, attrs); + } catch (buffer::error& err) { + ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl; + } + populate_metadata_in_request(s, attrs); } else { op_ret = -EINVAL; } @@ -5110,33 +5139,33 @@ void RGWCopyObj::execute() } op_ret = store->getRados()->copy_obj(obj_ctx, - s->user->user_id, - &s->info, - source_zone, - dst_obj, - src_obj, - dest_bucket_info, - src_bucket_info, - s->dest_placement, - &src_mtime, - &mtime, - mod_ptr, - unmod_ptr, - high_precision_time, - if_match, - if_nomatch, - attrs_mod, - copy_if_newer, - attrs, RGWObjCategory::Main, - olh_epoch, - (delete_at ? *delete_at : real_time()), - (version_id.empty() ? NULL : &version_id), - &s->req_id, /* use req_id as tag */ - &etag, - copy_obj_progress_cb, (void *)this, - this, - s->yield); - + s->user->user_id, + &s->info, + source_zone, + dst_obj, + src_obj, + dest_bucket_info, + src_bucket_info, + s->dest_placement, + &src_mtime, + &mtime, + mod_ptr, + unmod_ptr, + high_precision_time, + if_match, + if_nomatch, + attrs_mod, + copy_if_newer, + attrs, RGWObjCategory::Main, + olh_epoch, + (delete_at ? *delete_at : real_time()), + (version_id.empty() ? 
NULL : &version_id), + &s->req_id, /* use req_id as tag */ + &etag, + copy_obj_progress_cb, (void *)this, + this, + s->yield); + const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 4fac3ae0727..3236d525f49 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -76,8 +76,8 @@ bool rgw_s3_key_filter::has_content() const { return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); } -bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) { - metadata.clear(); +bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { + kvl.clear(); XMLObjIter iter = obj->find("FilterRule"); XMLObj *o; @@ -89,13 +89,13 @@ bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) { while ((o = iter.get_next())) { RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); - metadata.emplace(key, value); + kvl.emplace(key, value); } return true; } -void rgw_s3_metadata_filter::dump_xml(Formatter *f) const { - for (const auto& key_value : metadata) { +void rgw_s3_key_value_filter::dump_xml(Formatter *f) const { + for (const auto& key_value : kvl) { f->open_object_section("FilterRule"); ::encode_xml("Name", key_value.first, f); ::encode_xml("Value", key_value.second, f); @@ -103,13 +103,14 @@ void rgw_s3_metadata_filter::dump_xml(Formatter *f) const { } } -bool rgw_s3_metadata_filter::has_content() const { - return !metadata.empty(); +bool rgw_s3_key_value_filter::has_content() const { + return !kvl.empty(); } bool rgw_s3_filter::decode_xml(XMLObj* obj) { RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); + RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj); return true; } @@ -120,11 +121,15 @@ void 
rgw_s3_filter::dump_xml(Formatter *f) const { if (metadata_filter.has_content()) { ::encode_xml("S3Metadata", metadata_filter, f); } + if (tag_filter.has_content()) { + ::encode_xml("S3Tags", tag_filter, f); + } } bool rgw_s3_filter::has_content() const { return key_filter.has_content() || - metadata_filter.has_content(); + metadata_filter.has_content() || + tag_filter.has_content(); } bool match(const rgw_s3_key_filter& filter, const std::string& key) { @@ -161,10 +166,10 @@ bool match(const rgw_s3_key_filter& filter, const std::string& key) { return true; } -bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) { - // all filter pairs must exist with the same value in the object's metadata - // object metadata may include items not in the filter - return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end()); +bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl) { + // all filter pairs must exist with the same value in the object's metadata/tags + // object metadata/tags may include items not in the filter + return std::includes(kvl.begin(), kvl.end(), filter.kvl.begin(), filter.kvl.end()); } bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { @@ -273,6 +278,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const { encode_json("versionId", object_versionId, f); encode_json("sequencer", object_sequencer, f); encode_json("metadata", x_meta_map, f); + encode_json("tags", tags, f); } } encode_json("eventId", id, f); diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index ee975700956..7c74701c90f 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -10,6 +10,7 @@ #include "rgw_zone.h" #include "rgw_rados.h" #include "rgw_notify_event_type.h" +#include <boost/container/flat_map.hpp> class XMLObj; @@ -41,10 +42,10 @@ struct rgw_s3_key_filter { }; WRITE_CLASS_ENCODER(rgw_s3_key_filter) -using Metadata = std::map<std::string, 
std::string>; +using KeyValueList = boost::container::flat_map<std::string, std::string>; -struct rgw_s3_metadata_filter { - Metadata metadata; +struct rgw_s3_key_value_filter { + KeyValueList kvl; bool has_content() const; @@ -53,20 +54,21 @@ struct rgw_s3_metadata_filter { void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(metadata, bl); + encode(kvl, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(metadata, bl); + decode(kvl, bl); DECODE_FINISH(bl); } }; -WRITE_CLASS_ENCODER(rgw_s3_metadata_filter) +WRITE_CLASS_ENCODER(rgw_s3_key_value_filter) struct rgw_s3_filter { rgw_s3_key_filter key_filter; - rgw_s3_metadata_filter metadata_filter; + rgw_s3_key_value_filter metadata_filter; + rgw_s3_key_value_filter tag_filter; bool has_content() const; @@ -74,16 +76,20 @@ struct rgw_s3_filter { void dump_xml(Formatter *f) const; void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(key_filter, bl); encode(metadata_filter, bl); + encode(tag_filter, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(key_filter, bl); decode(metadata_filter, bl); + if (struct_v >= 2) { + decode(tag_filter, bl); + } DECODE_FINISH(bl); } }; @@ -109,6 +115,12 @@ class rgw_pubsub_topic_filter; <Value></Value> </FilterRule> </S3Metadata> + <S3Tags> + <FilterRule> + <Name></Name> + <Value></Value> + </FilterRule> + </S3Tags> </Filter> <Id>notification1</Id> <Topic>arn:aws:sns:<region>:<account>:<topic></Topic> @@ -132,13 +144,13 @@ struct rgw_pubsub_s3_notification { rgw_pubsub_s3_notification() = default; // construct from rgw_pubsub_topic_filter (used by get/list notifications) - rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter); + explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter); }; // return true if the key matches the prefix/suffix/regex rules of the key 
filter bool match(const rgw_s3_key_filter& filter, const std::string& key); -// return true if the key matches the metadata rules of the metadata filter -bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata); +// return true if the key matches the metadata/tags rules of the metadata/tags filter +bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl); // return true if the event type matches (equal or contained in) one of the events in the list bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event); @@ -186,6 +198,7 @@ struct rgw_pubsub_s3_notifications { "versionId":"", "sequencer": "", "metadata": "" + "tags": "" } }, "eventId":"", @@ -238,10 +251,12 @@ struct rgw_pubsub_s3_record { // this is an rgw extension holding the internal bucket id std::string bucket_id; // meta data - std::map<std::string, std::string> x_meta_map; + KeyValueList x_meta_map; + // tags + KeyValueList tags; void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); encode(eventVersion, bl); encode(eventSource, bl); encode(awsRegion, bl); @@ -264,11 +279,12 @@ struct rgw_pubsub_s3_record { encode(id, bl); encode(bucket_id, bl); encode(x_meta_map, bl); + encode(tags, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(2, bl); + DECODE_START(3, bl); decode(eventVersion, bl); decode(eventSource, bl); decode(awsRegion, bl); @@ -293,6 +309,9 @@ struct rgw_pubsub_s3_record { decode(bucket_id, bl); decode(x_meta_map, bl); } + if (struct_v >= 3) { + decode(tags, bl); + } DECODE_FINISH(bl); } @@ -609,7 +628,7 @@ public: // read the list of topics associated with a bucket and populate into result // return 0 on success or if no topic was associated with the bucket, error code otherwise int get_topics(rgw_pubsub_bucket_topics *result); - // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket + // adds a topic + filter (event list, and 
possibly name metadata or tags filters) to a bucket // assigning a notification name is optional (needed for S3 compatible notifications) // if the topic already exist on the bucket, the filter event list may be updated // for S3 compliant notifications the version with: s3_filter and notif_name should be used diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index de34de6837b..eab3f7a5321 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -137,7 +137,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *_method, const headers.push_back(pair<string, string>("HTTP_DATE", date_str)); string canonical_header; - map<string, string> meta_map; + meta_map_t meta_map; map<string, string> sub_resources; rgw_create_s3_canonical_header(method.c_str(), NULL, NULL, date_str.c_str(), @@ -292,7 +292,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz headers.emplace_back(kv); } - map<string, string>& meta_map = new_info.x_meta_map; + meta_map_t& meta_map = new_info.x_meta_map; for (const auto& kv: meta_map) { headers.emplace_back(kv); } @@ -426,7 +426,7 @@ static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, } } -static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map) +static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map) { struct grant_type_to_header *t; diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 1470d9a6e54..38c11a9e844 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -20,10 +20,19 @@ from .tests import get_realm, \ gen_bucket_name, \ get_user, \ get_tenant -from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics +from .zone_ps import PSTopic, \ + PSTopicS3, \ + PSNotification, \ + PSSubscription, \ + 
PSNotificationS3, \ + print_connection_info, \ + delete_all_s3_topics, \ + put_object_tagging, \ + get_object_tagging from multisite import User from nose import SkipTest from nose.tools import assert_not_equal, assert_equal +import boto.s3.tagging # configure logging for the tests module log = logging.getLogger(__name__) @@ -2597,30 +2606,152 @@ def test_ps_s3_metadata_on_master(): topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX + meta_key = 'meta1' + meta_value = 'This is my metadata value' + meta_prefix = 'x-amz-meta-' topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, - 'Events': ['s3:ObjectCreated:*'] - }] + 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], + 'Filter': { + 'Metadata': { + 'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}] + } + } + }] s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket - key = bucket.new_key('foo') - key.set_metadata('meta1', 'This is my metadata value') + key_name = 'foo' + key = bucket.new_key(key_name) + key.set_metadata(meta_key, meta_value) key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') - keys = list(bucket.list()) + + # create objects in the bucket using COPY + bucket.copy_key('copy_of_foo', bucket.name, key.name) + # create objects in the bucket using multi-part upload + fp = tempfile.TemporaryFile(mode='w') + fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') + fp.close() + uploader = bucket.initiate_multipart_upload('multipart_foo', + metadata={meta_key: meta_value}) + fp = tempfile.TemporaryFile(mode='r') + uploader.upload_part_from_file(fp, 1) + uploader.complete_upload() + fp.close() print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - receiver.verify_s3_events(keys, exact_match=True) + event_count = 0 + for event in 
receiver.get_and_reset_events(): + assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key) + assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value) + event_count +=1 + + # only PUT and POST has the metadata value + assert_equal(event_count, 2) + + # delete objects + for key in bucket.list(): + key.delete() + print('wait for 5sec for the messages...') + time.sleep(5) + # check amqp receiver + event_count = 0 + for event in receiver.get_and_reset_events(): + assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key) + assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value) + event_count +=1 + + # all 3 object has metadata when deleted + assert_equal(event_count, 3) # cleanup stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() + # delete the bucket + zones[0].delete_bucket(bucket_name) + clean_rabbitmq(proc) + + +def test_ps_s3_tags_on_master(): + """ test s3 notification of tags on master """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + zones, _ = init_env(require_ps=False) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + + # create s3 topic + endpoint_address = 'amqp://' + hostname + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + 
NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], + 'Filter': { + 'Tags': { + 'FilterRules': [{'Name': 'hello', 'Value': 'world'}] + } + } + }] + + s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket with tags + tags = 'hello=world&ka=boom' + key_name1 = 'key1' + put_object_tagging(zones[0].conn, bucket_name, key_name1, tags) + tags = 'foo=bar&ka=boom' + key_name2 = 'key2' + put_object_tagging(zones[0].conn, bucket_name, key_name2, tags) + key_name3 = 'key3' + key = bucket.new_key(key_name3) + key.set_contents_from_string('bar') + # create objects in the bucket using COPY + bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1) + print('wait for 5sec for the messages...') + time.sleep(5) + expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}] + # check amqp receiver + for event in receiver.get_and_reset_events(): + obj_tags = event['s3']['object']['tags'] + assert_equal(obj_tags[0], expected_tags[0]) + + # delete the objects for key in bucket.list(): key.delete() + print('wait for 5sec for the messages...') + time.sleep(5) + # check amqp receiver + for event in receiver.get_and_reset_events(): + obj_tags = event['s3']['object']['tags'] + assert_equal(obj_tags[0], expected_tags[0]) + + # cleanup + stop_amqp_receiver(receiver, task) + s3_notification_conf.del_config() + topic_conf.del_config() # delete the bucket zones[0].delete_bucket(bucket_name) clean_rabbitmq(proc) diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index a67a2fee43b..ddefde179bf 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -14,6 +14,26 @@ from botocore.client import Config log = logging.getLogger('rgw_multi.tests') +def 
put_object_tagging(conn, bucket_name, key, tags): + client = boto3.client('s3', + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + config=Config(signature_version='s3')) + return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags) + + +def get_object_tagging(conn, bucket, object_key): + client = boto3.client('s3', + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key, + config=Config(signature_version='s3')) + return client.get_object_tagging( + Bucket=bucket, + Key=object_key + ) + class PSZone(Zone): # pylint: disable=too-many-ancestors """ PubSub zone class """ |