diff options
author | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-03-13 18:54:16 +0100 |
---|---|---|
committer | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-03-21 07:57:21 +0100 |
commit | 2bd353233112ead32181b94a2af1b04e3fa9e1de (patch) | |
tree | a89bb7190f40452aba80e0c3ee3ff80f1599a675 /src/rgw/rgw_pubsub_push.cc | |
parent | rgw/pubsub: initial version of S3 compliant API (diff) | |
download | ceph-2bd353233112ead32181b94a2af1b04e3fa9e1de.tar.xz ceph-2bd353233112ead32181b94a2af1b04e3fa9e1de.zip |
rgw: pubsub support s3 records. refactor ARN
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_pubsub_push.cc')
-rw-r--r-- | src/rgw/rgw_pubsub_push.cc | 176 |
1 files changed, 95 insertions, 81 deletions
diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index efc1823e748..5c4ca84db9f 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -21,10 +21,11 @@ using namespace rgw; -std::string json_format_pubsub_event(const rgw_pubsub_event& event) { +template<typename EventType> +std::string json_format_pubsub_event(const EventType& event) { std::stringstream ss; JSONFormatter f(false); - encode_json("event", event, &f); + encode_json(EventType::json_type_single, event, &f); f.flush(ss); return ss.str(); } @@ -88,39 +89,42 @@ private: public: RGWPubSubHTTPEndpoint(const std::string& _endpoint, - const RGWHTTPArgs& args) : - endpoint(_endpoint) { - bool exists; - - str_ack_level = args.get("http-ack-level", &exists); - if (!exists || str_ack_level == "any") { - // "any" is default - ack_level = ACK_LEVEL_ANY; - } else if (str_ack_level == "non-error") { - ack_level = ACK_LEVEL_NON_ERROR; - } else { - ack_level = std::atoi(str_ack_level.c_str()); - if (ack_level < 100 || ack_level >= 600) { - throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level); - } - } + const RGWHTTPArgs& args) : endpoint(_endpoint) { + bool exists; - auto str_verify_ssl = args.get("verify-ssl", &exists); - boost::algorithm::to_lower(str_verify_ssl); - // verify server certificate by default - if (!exists || str_verify_ssl == "true") { - verify_ssl = true; - } else if (str_verify_ssl == "false") { - verify_ssl = false; - } else { - throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl); + str_ack_level = args.get("http-ack-level", &exists); + if (!exists || str_ack_level == "any") { + // "any" is default + ack_level = ACK_LEVEL_ANY; + } else if (str_ack_level == "non-error") { + ack_level = ACK_LEVEL_NON_ERROR; + } else { + ack_level = std::atoi(str_ack_level.c_str()); + if (ack_level < 100 || ack_level >= 600) { + throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level); } } + auto str_verify_ssl = args.get("verify-ssl", &exists); + boost::algorithm::to_lower(str_verify_ssl); + // verify server certificate by default + if (!exists || str_verify_ssl == "true") { + verify_ssl = true; + } else if (str_verify_ssl == "false") { + verify_ssl = false; + } else { + throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl); + } + } + RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl); } + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl); + } + std::string to_str() const override { std::string str("HTTP Endpoint"); str += "\nURI: " + endpoint; @@ -133,26 +137,26 @@ public: #ifdef WITH_RADOSGW_AMQP_ENDPOINT class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { - private: - enum ack_level_t { - ACK_LEVEL_NONE, - ACK_LEVEL_BROKER, - ACK_LEVEL_ROUTEABLE - }; - const std::string endpoint; - const std::string topic; - amqp::connection_ptr_t conn; - ack_level_t ack_level; - std::string str_ack_level; - - static std::string get_exchange(const RGWHTTPArgs& args) { - bool exists; - const auto exchange = args.get("amqp-exchange", &exists); - if (!exists) { - throw configuration_error("AMQP: missing amqp-exchange"); - } - return exchange; +private: + enum ack_level_t { + ACK_LEVEL_NONE, + ACK_LEVEL_BROKER, + ACK_LEVEL_ROUTEABLE + }; + const std::string endpoint; + const std::string topic; + amqp::connection_ptr_t conn; + ack_level_t ack_level; + std::string str_ack_level; + + static std::string get_exchange(const RGWHTTPArgs& args) { + bool exists; + const auto exchange = args.get("amqp-exchange", &exists); + if (!exists) { + throw configuration_error("AMQP: missing amqp-exchange"); } + return exchange; + } // NoAckPublishCR implements async amqp publishing via coroutine // This coroutine ends when it send the message and does not wait for an ack @@ -247,45 +251,55 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { } }; - public: - RGWPubSubAMQPEndpoint(const std::string& _endpoint, - const std::string& _topic, - const RGWHTTPArgs& args) : - endpoint(_endpoint), - topic(_topic), - conn(amqp::connect(endpoint, get_exchange(args))) { - bool exists; - // get ack level - str_ack_level = args.get("amqp-ack-level", &exists); - if (!exists || str_ack_level == "broker") { - // "broker" is default - ack_level = ACK_LEVEL_BROKER; - } else if (str_ack_level == "none") { - ack_level = ACK_LEVEL_NONE; - } else if (str_ack_level == "routable") { - ack_level = ACK_LEVEL_ROUTEABLE; - } else { - throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level); - } +public: + RGWPubSubAMQPEndpoint(const std::string& _endpoint, + const std::string& _topic, + const RGWHTTPArgs& args) : + endpoint(_endpoint), + topic(_topic), + conn(amqp::connect(endpoint, get_exchange(args))) { + bool exists; + // get ack level + str_ack_level = args.get("amqp-ack-level", &exists); + if (!exists || str_ack_level == "broker") { + // "broker" is default + ack_level = ACK_LEVEL_BROKER; + } else if (str_ack_level == "none") { + ack_level = ACK_LEVEL_NONE; + } else if (str_ack_level == "routable") { + ack_level = ACK_LEVEL_ROUTEABLE; + } else { + throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level); } + } - RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { - if (ack_level == ACK_LEVEL_NONE) { - return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event)); - } else { - // TODO: currently broker and routable are the same - this will require different flags - // but the same mechanism - return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level); - } + RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { + if (ack_level == ACK_LEVEL_NONE) { + return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event)); + } else { + // TODO: currently broker and routable are the same - this will require different flags + // but the same mechanism + return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level); } - - std::string to_str() const override { - std::string str("AMQP(0.9.1) Endpoint"); - str += "\nURI: " + endpoint; - str += "\nTopic: " + topic; - str += "\nAck Level: " + str_ack_level; - return str; + } + + RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + if (ack_level == ACK_LEVEL_NONE) { + return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record)); + } else { + // TODO: currently broker and routable are the same - this will require different flags + // but the same mechanism + return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level); } + } + + std::string to_str() const override { + std::string str("AMQP(0.9.1) Endpoint"); + str += "\nURI: " + endpoint; + str += "\nTopic: " + topic; + str += "\nAck Level: " + str_ack_level; + return str; + } }; static const std::string AMQP_0_9_1("0-9-1"); |