summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_pubsub_push.cc
diff options
context:
space:
mode:
authorYuval Lifshitz <yuvalif@yahoo.com>2019-08-12 18:48:15 +0200
committerYuval Lifshitz <yuvalif@yahoo.com>2019-09-10 17:54:05 +0200
commitdc31b030ea30f3b7bfdeb9fa92e303892d17138a (patch)
tree93208312bffafb7fe6e6994089479a8715267478 /src/rgw/rgw_pubsub_push.cc
parentrgw/pubsub: allow pubsub REST API on master (diff)
downloadceph-dc31b030ea30f3b7bfdeb9fa92e303892d17138a.tar.xz
ceph-dc31b030ea30f3b7bfdeb9fa92e303892d17138a.zip
rgw/pubsub: push notifications from ops
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_pubsub_push.cc')
-rw-r--r--src/rgw/rgw_pubsub_push.cc127
1 files changed, 111 insertions, 16 deletions
diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc
index b1e0430e23c..dd825e0570b 100644
--- a/src/rgw/rgw_pubsub_push.cc
+++ b/src/rgw/rgw_pubsub_push.cc
@@ -7,6 +7,7 @@
#include <algorithm>
#include "include/buffer_fwd.h"
#include "common/Formatter.h"
+#include "common/async/completion.h"
#include "rgw_common.h"
#include "rgw_data_sync.h"
#include "rgw_pubsub.h"
@@ -101,7 +102,7 @@ public:
} else {
ack_level = std::atoi(str_ack_level.c_str());
if (ack_level < 100 || ack_level >= 600) {
- throw configuration_error("HTTP/S: invalid http-ack-level " + str_ack_level);
+ throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
}
}
@@ -125,6 +126,19 @@ public:
return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
}
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ bufferlist read_bl;
+ RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
+ const auto post_data = json_format_pubsub_event(record);
+ request.set_post_data(post_data);
+ request.set_send_length(post_data.length());
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+ const auto rc = RGWHTTP::process(&request, y);
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ // TODO: use read_bl to process return code and handle according to ack level
+ return rc;
+ }
+
std::string to_str() const override {
std::string str("HTTP/S Endpoint");
str += "\nURI: " + endpoint;
@@ -143,6 +157,7 @@ private:
ACK_LEVEL_BROKER,
ACK_LEVEL_ROUTEABLE
};
+ CephContext* const cct;
const std::string endpoint;
const std::string topic;
amqp::connection_ptr_t conn;
@@ -162,17 +177,16 @@ private:
// This coroutine ends when it send the message and does not wait for an ack
class NoAckPublishCR : public RGWCoroutine {
private:
- RGWDataSyncEnv* const sync_env;
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
public:
- NoAckPublishCR(RGWDataSyncEnv* _sync_env,
+ NoAckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
const std::string& _message) :
- RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWCoroutine(cct),
topic(_topic), conn(_conn), message(_message) {}
// send message to endpoint, without waiting for reply
@@ -193,19 +207,18 @@ private:
// note that it does not wait for an ack fron the end client
class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
private:
- RGWDataSyncEnv* const sync_env;
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
const ack_level_t ack_level; // TODO not used for now
public:
- AckPublishCR(RGWDataSyncEnv* _sync_env,
+ AckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
const std::string& _message,
ack_level_t _ack_level) :
- RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWCoroutine(cct),
topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
// send message to endpoint, waiting for reply
@@ -255,10 +268,14 @@ public:
RGWPubSubAMQPEndpoint(const std::string& _endpoint,
const std::string& _topic,
const RGWHTTPArgs& args,
- CephContext* cct) :
+ CephContext* _cct) :
+ cct(_cct),
endpoint(_endpoint),
topic(_topic),
conn(amqp::connect(endpoint, get_exchange(args))) {
+ if (!conn) {
+ throw configuration_error("AMQP: failed to create connection to: " + endpoint);
+ }
bool exists;
// get ack level
str_ack_level = args.get("amqp-ack-level", &exists);
@@ -270,27 +287,105 @@ public:
} else if (str_ack_level == "routable") {
ack_level = ACK_LEVEL_ROUTEABLE;
} else {
- throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
+ throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
}
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags
// but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
}
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record));
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
} else {
// TODO: currently broker and routable are the same - this will require different flags
// but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level);
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+ }
+ }
+
+ // this allows waiting untill "finish()" is called from a different thread
+ // waiting could be blocking the waiting thread or yielding, depending
+ // with compilation flag support and whether the optional_yield is set
+ class Waiter {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ std::unique_ptr<Completion> completion = nullptr;
+ int ret;
+
+ mutable std::atomic<bool> done = false;
+ mutable std::mutex lock;
+ mutable std::condition_variable cond;
+
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ {
+ std::unique_lock l{lock};
+ completion = Completion::create(ctx.get_executor(), std::move(handler));
+ }
+ return init.result.get();
+ }
+
+ public:
+ int wait(optional_yield y) {
+ if (done) {
+ return ret;
+ }
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& io_ctx = y.get_io_context();
+ auto& yield_ctx = y.get_yield_context();
+ boost::system::error_code ec;
+ async_wait(io_ctx, yield_ctx[ec]);
+ return -ec.value();
+ }
+#endif
+ std::unique_lock l(lock);
+ cond.wait(l, [this]{return (done==true);});
+ return ret;
+ }
+
+ void finish(int r) {
+ std::unique_lock l{lock};
+ ret = r;
+ done = true;
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.notify_all();
+ }
+ }
+ };
+
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ ceph_assert(conn);
+ if (ack_level == ACK_LEVEL_NONE) {
+ return amqp::publish(conn, topic, json_format_pubsub_event(record));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
+ // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+ auto w = std::unique_ptr<Waiter>(new Waiter);
+ const auto rc = amqp::publish_with_confirm(conn,
+ topic,
+ json_format_pubsub_event(record),
+ std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ return rc;
+ }
+ return w->wait(y);
}
}
@@ -348,14 +443,14 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
if (version == AMQP_0_9_1) {
return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
} else if (version == AMQP_1_0) {
- throw configuration_error("amqp v1.0 not supported");
+ throw configuration_error("AMQP: v1.0 not supported");
return nullptr;
} else {
- throw configuration_error("unknown amqp version " + version);
+ throw configuration_error("AMQP: unknown version: " + version);
return nullptr;
}
} else if (schema == "amqps") {
- throw configuration_error("amqps not supported");
+ throw configuration_error("AMQP: ssl not supported");
return nullptr;
#endif
}