summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_amqp.cc
diff options
context:
space:
mode:
authorYuval Lifshitz <yuvalif@yahoo.com>2019-03-31 21:52:27 +0200
committerYuval Lifshitz <yuvalif@yahoo.com>2019-03-31 21:52:27 +0200
commit376a3df0d12b544a0378a1467fcf4b2c32fd8afa (patch)
tree4386ef31a9e33d699d36a957bf2752819d1ceb58 /src/rgw/rgw_amqp.cc
parentrgw/pubsub: add s3-compatible API documentation (diff)
downloadceph-376a3df0d12b544a0378a1467fcf4b2c32fd8afa.tar.xz
ceph-376a3df0d12b544a0378a1467fcf4b2c32fd8afa.zip
rgw/pubsub: avoid static creation of amqp manager
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_amqp.cc')
-rw-r--r--src/rgw/rgw_amqp.cc61
1 files changed, 33 insertions, 28 deletions
diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc
index f9acabb2c5f..4143eaae977 100644
--- a/src/rgw/rgw_amqp.cc
+++ b/src/rgw/rgw_amqp.cc
@@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "rgw_amqp.h"
+#include <atomic>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
@@ -533,6 +534,7 @@ private:
MessageQueue messages;
std::atomic<size_t> queued;
std::atomic<size_t> dequeued;
+ CephContext* const cct;
mutable std::mutex connections_lock;
std::thread runner;
@@ -634,7 +636,6 @@ private:
for (;conn_it != end_it;) {
auto& conn = conn_it->second;
- ldout(conn->cct, 20) << "AMQP run: start processing connection" << dendl;
// delete the connection if marked for deletion
if (conn->marked_for_deletion) {
ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
@@ -668,7 +669,6 @@ private:
if (rc == AMQP_STATUS_TIMEOUT) {
// TODO mark connection as idle
- ldout(conn->cct, 20) << "AMQP run: connection is idle" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
@@ -777,7 +777,8 @@ public:
Manager(size_t _max_connections,
size_t _max_inflight,
size_t _max_queue,
- long _usec_timeout) :
+ long _usec_timeout,
+ CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
@@ -788,6 +789,7 @@ public:
messages(max_queue),
queued(0),
dequeued(0),
+ cct(_cct),
runner(&Manager::run, this) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
@@ -818,7 +820,7 @@ public:
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) {
+ connection_ptr_t connect(const std::string& url, const std::string& exchange) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
@@ -939,33 +941,36 @@ public:
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
-class SingletonManager {
- std::mutex manager_creation_lock;
- public:
- SingletonManager(CephContext* cct=nullptr) {
- // TODO get parameters from conf
- std::lock_guard<std::mutex> l(manager_creation_lock);
- if (s_manager == nullptr) {
- s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100);
- }
- }
-};
+bool init(CephContext* cct) {
+ if (s_manager) {
+ return false;
+ }
+ // TODO: take conf from CephContext
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+ return true;
+}
+
+void shutdown() {
+ delete s_manager;
+ s_manager = nullptr;
+}
-connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) {
- static const SingletonManager singleton(cct);
- return s_manager->connect(url, exchange, cct);
+connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+ if (!s_manager) return nullptr;
+ return s_manager->connect(url, exchange);
}
int publish(connection_ptr_t& conn,
const std::string& topic,
const std::string& message) {
- if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish(conn, topic, message);
}
@@ -973,47 +978,47 @@ int publish_with_confirm(connection_ptr_t& conn,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
- if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish_with_confirm(conn, topic, message, cb);
}
size_t get_connection_count() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_connection_count();
}
size_t get_inflight() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_inflight();
}
size_t get_queued() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_queued();
}
size_t get_dequeued() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_dequeued();
}
size_t get_max_connections() {
- if (s_manager == nullptr) return MAX_CONNECTIONS_DEFAULT;
+ if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
return s_manager->max_connections;
}
size_t get_max_inflight() {
- if (s_manager == nullptr) return MAX_INFLIGHT_DEFAULT;
+ if (!s_manager) return MAX_INFLIGHT_DEFAULT;
return s_manager->max_inflight;
}
size_t get_max_queue() {
- if (s_manager == nullptr) return MAX_QUEUE_DEFAULT;
+ if (!s_manager) return MAX_QUEUE_DEFAULT;
return s_manager->max_queue;
}
bool disconnect(connection_ptr_t& conn) {
- if (s_manager == nullptr) return false;
+ if (!s_manager) return false;
return s_manager->disconnect(conn);
}