summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_pubsub_push.cc
diff options
context:
space:
mode:
authorYuval Lifshitz <yuvalif@yahoo.com>2019-03-13 18:54:16 +0100
committerYuval Lifshitz <yuvalif@yahoo.com>2019-03-21 07:57:21 +0100
commit2bd353233112ead32181b94a2af1b04e3fa9e1de (patch)
treea89bb7190f40452aba80e0c3ee3ff80f1599a675 /src/rgw/rgw_pubsub_push.cc
parentrgw/pubsub: initial version of S3 compliant API (diff)
downloadceph-2bd353233112ead32181b94a2af1b04e3fa9e1de.tar.xz
ceph-2bd353233112ead32181b94a2af1b04e3fa9e1de.zip
rgw: pubsub support s3 records. refactor ARN
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_pubsub_push.cc')
-rw-r--r--src/rgw/rgw_pubsub_push.cc176
1 files changed, 95 insertions, 81 deletions
diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc
index efc1823e748..5c4ca84db9f 100644
--- a/src/rgw/rgw_pubsub_push.cc
+++ b/src/rgw/rgw_pubsub_push.cc
@@ -21,10 +21,11 @@
using namespace rgw;
-std::string json_format_pubsub_event(const rgw_pubsub_event& event) {
+template<typename EventType>
+std::string json_format_pubsub_event(const EventType& event) {
std::stringstream ss;
JSONFormatter f(false);
- encode_json("event", event, &f);
+ encode_json(EventType::json_type_single, event, &f);
f.flush(ss);
return ss.str();
}
@@ -88,39 +89,42 @@ private:
public:
RGWPubSubHTTPEndpoint(const std::string& _endpoint,
- const RGWHTTPArgs& args) :
- endpoint(_endpoint) {
- bool exists;
-
- str_ack_level = args.get("http-ack-level", &exists);
- if (!exists || str_ack_level == "any") {
- // "any" is default
- ack_level = ACK_LEVEL_ANY;
- } else if (str_ack_level == "non-error") {
- ack_level = ACK_LEVEL_NON_ERROR;
- } else {
- ack_level = std::atoi(str_ack_level.c_str());
- if (ack_level < 100 || ack_level >= 600) {
- throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level);
- }
- }
+ const RGWHTTPArgs& args) : endpoint(_endpoint) {
+ bool exists;
- auto str_verify_ssl = args.get("verify-ssl", &exists);
- boost::algorithm::to_lower(str_verify_ssl);
- // verify server certificate by default
- if (!exists || str_verify_ssl == "true") {
- verify_ssl = true;
- } else if (str_verify_ssl == "false") {
- verify_ssl = false;
- } else {
- throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl);
+ str_ack_level = args.get("http-ack-level", &exists);
+ if (!exists || str_ack_level == "any") {
+ // "any" is default
+ ack_level = ACK_LEVEL_ANY;
+ } else if (str_ack_level == "non-error") {
+ ack_level = ACK_LEVEL_NON_ERROR;
+ } else {
+ ack_level = std::atoi(str_ack_level.c_str());
+ if (ack_level < 100 || ack_level >= 600) {
+ throw configuration_error("HTTP: invalid http-ack-level " + str_ack_level);
}
}
+ auto str_verify_ssl = args.get("verify-ssl", &exists);
+ boost::algorithm::to_lower(str_verify_ssl);
+ // verify server certificate by default
+ if (!exists || str_verify_ssl == "true") {
+ verify_ssl = true;
+ } else if (str_verify_ssl == "false") {
+ verify_ssl = false;
+ } else {
+ throw configuration_error("HTTP: verify-ssl must be true/false, not: " + str_verify_ssl);
+ }
+ }
+
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
}
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
+ }
+
std::string to_str() const override {
std::string str("HTTP Endpoint");
str += "\nURI: " + endpoint;
@@ -133,26 +137,26 @@ public:
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
- private:
- enum ack_level_t {
- ACK_LEVEL_NONE,
- ACK_LEVEL_BROKER,
- ACK_LEVEL_ROUTEABLE
- };
- const std::string endpoint;
- const std::string topic;
- amqp::connection_ptr_t conn;
- ack_level_t ack_level;
- std::string str_ack_level;
-
- static std::string get_exchange(const RGWHTTPArgs& args) {
- bool exists;
- const auto exchange = args.get("amqp-exchange", &exists);
- if (!exists) {
- throw configuration_error("AMQP: missing amqp-exchange");
- }
- return exchange;
+private:
+ enum ack_level_t {
+ ACK_LEVEL_NONE,
+ ACK_LEVEL_BROKER,
+ ACK_LEVEL_ROUTEABLE
+ };
+ const std::string endpoint;
+ const std::string topic;
+ amqp::connection_ptr_t conn;
+ ack_level_t ack_level;
+ std::string str_ack_level;
+
+ static std::string get_exchange(const RGWHTTPArgs& args) {
+ bool exists;
+ const auto exchange = args.get("amqp-exchange", &exists);
+ if (!exists) {
+ throw configuration_error("AMQP: missing amqp-exchange");
}
+ return exchange;
+ }
// NoAckPublishCR implements async amqp publishing via coroutine
// This coroutine ends when it send the message and does not wait for an ack
@@ -247,45 +251,55 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
}
};
- public:
- RGWPubSubAMQPEndpoint(const std::string& _endpoint,
- const std::string& _topic,
- const RGWHTTPArgs& args) :
- endpoint(_endpoint),
- topic(_topic),
- conn(amqp::connect(endpoint, get_exchange(args))) {
- bool exists;
- // get ack level
- str_ack_level = args.get("amqp-ack-level", &exists);
- if (!exists || str_ack_level == "broker") {
- // "broker" is default
- ack_level = ACK_LEVEL_BROKER;
- } else if (str_ack_level == "none") {
- ack_level = ACK_LEVEL_NONE;
- } else if (str_ack_level == "routable") {
- ack_level = ACK_LEVEL_ROUTEABLE;
- } else {
- throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
- }
+public:
+ RGWPubSubAMQPEndpoint(const std::string& _endpoint,
+ const std::string& _topic,
+ const RGWHTTPArgs& args) :
+ endpoint(_endpoint),
+ topic(_topic),
+ conn(amqp::connect(endpoint, get_exchange(args))) {
+ bool exists;
+ // get ack level
+ str_ack_level = args.get("amqp-ack-level", &exists);
+ if (!exists || str_ack_level == "broker") {
+ // "broker" is default
+ ack_level = ACK_LEVEL_BROKER;
+ } else if (str_ack_level == "none") {
+ ack_level = ACK_LEVEL_NONE;
+ } else if (str_ack_level == "routable") {
+ ack_level = ACK_LEVEL_ROUTEABLE;
+ } else {
+ throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
}
+ }
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
- if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
- } else {
- // TODO: currently broker and routable are the same - this will require different flags
- // but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
- }
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+ if (ack_level == ACK_LEVEL_NONE) {
+ return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags
+ // but the same mechanism
+ return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
}
-
- std::string to_str() const override {
- std::string str("AMQP(0.9.1) Endpoint");
- str += "\nURI: " + endpoint;
- str += "\nTopic: " + topic;
- str += "\nAck Level: " + str_ack_level;
- return str;
+ }
+
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ if (ack_level == ACK_LEVEL_NONE) {
+ return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags
+ // but the same mechanism
+ return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level);
}
+ }
+
+ std::string to_str() const override {
+ std::string str("AMQP(0.9.1) Endpoint");
+ str += "\nURI: " + endpoint;
+ str += "\nTopic: " + topic;
+ str += "\nAck Level: " + str_ack_level;
+ return str;
+ }
};
static const std::string AMQP_0_9_1("0-9-1");