diff options
author | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-03-31 21:52:27 +0200 |
---|---|---|
committer | Yuval Lifshitz <yuvalif@yahoo.com> | 2019-03-31 21:52:27 +0200 |
commit | 376a3df0d12b544a0378a1467fcf4b2c32fd8afa (patch) | |
tree | 4386ef31a9e33d699d36a957bf2752819d1ceb58 /src/rgw/rgw_amqp.cc | |
parent | rgw/pubsub: add s3-compatible API documentation (diff) | |
download | ceph-376a3df0d12b544a0378a1467fcf4b2c32fd8afa.tar.xz ceph-376a3df0d12b544a0378a1467fcf4b2c32fd8afa.zip |
rgw/pubsub: avoid static creation of amqp manager
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_amqp.cc')
-rw-r--r-- | src/rgw/rgw_amqp.cc | 61 |
1 files changed, 33 insertions, 28 deletions
diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index f9acabb2c5f..4143eaae977 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "rgw_amqp.h" +#include <atomic> #include <amqp.h> #include <amqp_tcp_socket.h> #include <amqp_framing.h> @@ -533,6 +534,7 @@ private: MessageQueue messages; std::atomic<size_t> queued; std::atomic<size_t> dequeued; + CephContext* const cct; mutable std::mutex connections_lock; std::thread runner; @@ -634,7 +636,6 @@ private: for (;conn_it != end_it;) { auto& conn = conn_it->second; - ldout(conn->cct, 20) << "AMQP run: start processing connection" << dendl; // delete the connection if marked for deletion if (conn->marked_for_deletion) { ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl; @@ -668,7 +669,6 @@ private: if (rc == AMQP_STATUS_TIMEOUT) { // TODO mark connection as idle - ldout(conn->cct, 20) << "AMQP run: connection is idle" << dendl; INCREMENT_AND_CONTINUE(conn_it); } @@ -777,7 +777,8 @@ public: Manager(size_t _max_connections, size_t _max_inflight, size_t _max_queue, - long _usec_timeout) : + long _usec_timeout, + CephContext* _cct) : max_connections(_max_connections), max_inflight(_max_inflight), max_queue(_max_queue), @@ -788,6 +789,7 @@ public: messages(max_queue), queued(0), dequeued(0), + cct(_cct), runner(&Manager::run, this) { // The hashmap has "max connections" as the initial number of buckets, // and allows for 10 collisions per bucket before rehash. @@ -818,7 +820,7 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) { + connection_ptr_t connect(const std::string& url, const std::string& exchange) { if (stopped) { // TODO: increment counter ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; @@ -939,33 +941,36 @@ public: // singleton manager // note that the manager itself is not a singleton, and multiple instances may co-exist +// TODO make the pointer atomic in allocation and deallocation to avoid race conditions static Manager* s_manager = nullptr; static const size_t MAX_CONNECTIONS_DEFAULT = 256; static const size_t MAX_INFLIGHT_DEFAULT = 8192; static const size_t MAX_QUEUE_DEFAULT = 8192; -class SingletonManager { - std::mutex manager_creation_lock; - public: - SingletonManager(CephContext* cct=nullptr) { - // TODO get parameters from conf - std::lock_guard<std::mutex> l(manager_creation_lock); - if (s_manager == nullptr) { - s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100); - } - } -}; +bool init(CephContext* cct) { + if (s_manager) { + return false; + } + // TODO: take conf from CephContext + s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct); + return true; +} + +void shutdown() { + delete s_manager; + s_manager = nullptr; +} -connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) { - static const SingletonManager singleton(cct); - return s_manager->connect(url, exchange, cct); +connection_ptr_t connect(const std::string& url, const std::string& exchange) { + if (!s_manager) return nullptr; + return s_manager->connect(url, exchange); } int publish(connection_ptr_t& conn, const std::string& topic, const std::string& message) { - if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED; + if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; return s_manager->publish(conn, topic, message); } @@ -973,47 +978,47 @@ int publish_with_confirm(connection_ptr_t& conn, const std::string& topic, const std::string& message, reply_callback_t cb) { - if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED; + if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED; return s_manager->publish_with_confirm(conn, topic, message, cb); } size_t get_connection_count() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_connection_count(); } size_t get_inflight() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_inflight(); } size_t get_queued() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_queued(); } size_t get_dequeued() { - if (s_manager == nullptr) return 0; + if (!s_manager) return 0; return s_manager->get_dequeued(); } size_t get_max_connections() { - if (s_manager == nullptr) return MAX_CONNECTIONS_DEFAULT; + if (!s_manager) return MAX_CONNECTIONS_DEFAULT; return s_manager->max_connections; } size_t get_max_inflight() { - if (s_manager == nullptr) return MAX_INFLIGHT_DEFAULT; + if (!s_manager) return MAX_INFLIGHT_DEFAULT; return s_manager->max_inflight; } size_t get_max_queue() { - if (s_manager == nullptr) return MAX_QUEUE_DEFAULT; + if (!s_manager) return MAX_QUEUE_DEFAULT; return s_manager->max_queue; } bool disconnect(connection_ptr_t& conn) { - if (s_manager == nullptr) return false; + if (!s_manager) return false; return s_manager->disconnect(conn); } |