diff options
author | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-08-12 18:48:15 +0200 |
---|---|---|
committer | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-09-10 17:54:05 +0200 |
commit | dc31b030ea30f3b7bfdeb9fa92e303892d17138a (patch) | |
tree | 93208312bffafb7fe6e6994089479a8715267478 /src/rgw/rgw_pubsub_push.cc | |
parent | rgw/pubsub: allow pubsub REST API on master (diff) | |
download | ceph-dc31b030ea30f3b7bfdeb9fa92e303892d17138a.tar.xz ceph-dc31b030ea30f3b7bfdeb9fa92e303892d17138a.zip |
rgw/pubsub: push notifications from ops
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_pubsub_push.cc')
-rw-r--r-- | src/rgw/rgw_pubsub_push.cc | 127 |
1 files changed, 111 insertions, 16 deletions
diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index b1e0430e23c..dd825e0570b 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -7,6 +7,7 @@ #include <algorithm> #include "include/buffer_fwd.h" #include "common/Formatter.h" +#include "common/async/completion.h" #include "rgw_common.h" #include "rgw_data_sync.h" #include "rgw_pubsub.h" @@ -101,7 +102,7 @@ public: } else { ack_level = std::atoi(str_ack_level.c_str()); if (ack_level < 100 || ack_level >= 600) { - throw configuration_error("HTTP/S: invalid http-ack-level " + str_ack_level); + throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level); } } @@ -125,6 +126,19 @@ public: return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl); } + int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { + bufferlist read_bl; + RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); + const auto post_data = json_format_pubsub_event(record); + request.set_post_data(post_data); + request.set_send_length(post_data.length()); + if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending); + const auto rc = RGWHTTP::process(&request, y); + if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending); + // TODO: use read_bl to process return code and handle according to ack level + return rc; + } + std::string to_str() const override { std::string str("HTTP/S Endpoint"); str += "\nURI: " + endpoint; @@ -143,6 +157,7 @@ private: ACK_LEVEL_BROKER, ACK_LEVEL_ROUTEABLE }; + CephContext* const cct; const std::string endpoint; const std::string topic; amqp::connection_ptr_t conn; @@ -162,17 +177,16 @@ private: // This coroutine ends when it send the message and does not wait for an ack class NoAckPublishCR : public RGWCoroutine { private: - RGWDataSyncEnv* const sync_env; const std::string topic; amqp::connection_ptr_t conn; const std::string message; public: - NoAckPublishCR(RGWDataSyncEnv* _sync_env, + NoAckPublishCR(CephContext* cct, const std::string& _topic, amqp::connection_ptr_t& _conn, const std::string& _message) : - RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + RGWCoroutine(cct), topic(_topic), conn(_conn), message(_message) {} // send message to endpoint, without waiting for reply @@ -193,19 +207,18 @@ private: // note that it does not wait for an ack fron the end client class AckPublishCR : public RGWCoroutine, public RGWIOProvider { private: - RGWDataSyncEnv* const sync_env; const std::string topic; amqp::connection_ptr_t conn; const std::string message; const ack_level_t ack_level; // TODO not used for now public: - AckPublishCR(RGWDataSyncEnv* _sync_env, + AckPublishCR(CephContext* cct, const std::string& _topic, amqp::connection_ptr_t& _conn, const std::string& _message, ack_level_t _ack_level) : - RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + RGWCoroutine(cct), topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {} // send message to endpoint, waiting for reply @@ -255,10 +268,14 @@ public: RGWPubSubAMQPEndpoint(const std::string& _endpoint, const std::string& _topic, const RGWHTTPArgs& args, - CephContext* cct) : + CephContext* _cct) : + cct(_cct), endpoint(_endpoint), topic(_topic), conn(amqp::connect(endpoint, get_exchange(args))) { + if (!conn) { + throw configuration_error("AMQP: failed to create connection to: " + endpoint); + } bool exists; // get ack level str_ack_level = args.get("amqp-ack-level", &exists); @@ -270,27 +287,105 @@ public: } else if (str_ack_level == "routable") { ack_level = ACK_LEVEL_ROUTEABLE; } else { - throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level); + throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); } } RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { + ceph_assert(conn); if (ack_level == ACK_LEVEL_NONE) { - return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event)); + return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } else { // TODO: currently broker and routable are the same - this will require different flags // but the same mechanism - return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level); + return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level); } } RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override { + ceph_assert(conn); if (ack_level == ACK_LEVEL_NONE) { - return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record)); + return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); } else { // TODO: currently broker and routable are the same - this will require different flags // but the same mechanism - return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level); + return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level); + } + } + + // this allows waiting untill "finish()" is called from a different thread + // waiting could be blocking the waiting thread or yielding, depending + // with compilation flag support and whether the optional_yield is set + class Waiter { + using Signature = void(boost::system::error_code); + using Completion = ceph::async::Completion<Signature>; + std::unique_ptr<Completion> completion = nullptr; + int ret; + + mutable std::atomic<bool> done = false; + mutable std::mutex lock; + mutable std::condition_variable cond; + + template <typename ExecutionContext, typename CompletionToken> + auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { + boost::asio::async_completion<CompletionToken, Signature> init(token); + auto& handler = init.completion_handler; + { + std::unique_lock l{lock}; + completion = Completion::create(ctx.get_executor(), std::move(handler)); + } + return init.result.get(); + } + + public: + int wait(optional_yield y) { + if (done) { + return ret; + } +#ifdef HAVE_BOOST_CONTEXT + if (y) { + auto& io_ctx = y.get_io_context(); + auto& yield_ctx = y.get_yield_context(); + boost::system::error_code ec; + async_wait(io_ctx, yield_ctx[ec]); + return -ec.value(); + } +#endif + std::unique_lock l(lock); + cond.wait(l, [this]{return (done==true);}); + return ret; + } + + void finish(int r) { + std::unique_lock l{lock}; + ret = r; + done = true; + if (completion) { + boost::system::error_code ec(-ret, boost::system::system_category()); + Completion::post(std::move(completion), ec); + } else { + cond.notify_all(); + } + } + }; + + int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override { + ceph_assert(conn); + if (ack_level == ACK_LEVEL_NONE) { + return amqp::publish(conn, topic, json_format_pubsub_event(record)); + } else { + // TODO: currently broker and routable are the same - this will require different flags but the same mechanism + // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine + auto w = std::unique_ptr<Waiter>(new Waiter); + const auto rc = amqp::publish_with_confirm(conn, + topic, + json_format_pubsub_event(record), + std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); + if (rc < 0) { + // failed to publish, does not wait for reply + return rc; + } + return w->wait(y); } } @@ -348,14 +443,14 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, if (version == AMQP_0_9_1) { return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct)); } else if (version == AMQP_1_0) { - throw configuration_error("amqp v1.0 not supported"); + throw configuration_error("AMQP: v1.0 not supported"); return nullptr; } else { - throw configuration_error("unknown amqp version " + version); + throw configuration_error("AMQP: unknown version: " + version); return nullptr; } } else if (schema == "amqps") { - throw configuration_error("amqps not supported"); + throw configuration_error("AMQP: ssl not supported"); return nullptr; #endif } |