author     Yuval Lifshitz <ylifshit@ibm.com>  2024-04-18 11:39:42 +0200
committer  Yuval Lifshitz <ylifshit@ibm.com>  2024-05-12 12:39:08 +0200
commit     15536cf7d75b40755a9a291498e25bf8c3d55fbf (patch)
tree       85e3140827df391d8e79b948dd16712eb4685f99
parent     rgw/notifications: cleanup includes and unused parameters (diff)
download   ceph-15536cf7d75b40755a9a291498e25bf8c3d55fbf.tar.xz
           ceph-15536cf7d75b40755a9a291498e25bf8c3d55fbf.zip
rgw/notifications: start/stop endpoint managers in notification manager
Fixes: https://tracker.ceph.com/issues/65337

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
-rw-r--r--  src/rgw/driver/rados/rgw_notify.cc             131
-rw-r--r--  src/rgw/driver/rados/rgw_notify.h                4
-rw-r--r--  src/rgw/driver/rados/rgw_pubsub_push.cc         61
-rw-r--r--  src/rgw/driver/rados/rgw_pubsub_push.h           6
-rw-r--r--  src/rgw/driver/rados/rgw_rados.cc                6
-rw-r--r--  src/rgw/rgw_amqp.cc                             14
-rw-r--r--  src/rgw/rgw_appmain.cc                          26
-rw-r--r--  src/rgw/rgw_kafka.cc                            14
-rw-r--r--  src/rgw/rgw_lib.cc                               1
-rw-r--r--  src/rgw/rgw_main.cc                              1
-rw-r--r--  src/rgw/rgw_main.h                               1
-rw-r--r--  src/test/rgw/bucket_notification/test_bn.py    108
12 files changed, 294 insertions, 79 deletions
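
For orientation before the per-file hunks: this patch moves endpoint-manager startup/teardown out of rgw_appmain and into the notification layer, and replaces the raw Manager pointer with a std::unique_ptr plus an explicit init()/stop() pair. The following standalone C++ sketch shows the resulting ordering under simplified, stand-in types (EndpointManagers and Manager here are not the actual Ceph classes); endpoints come up before the manager starts its workers, and are torn down before the manager drains and joins them.

    // Minimal sketch of the lifecycle introduced by this patch (stand-in types).
    #include <iostream>
    #include <memory>

    struct EndpointManagers {                 // stands in for amqp/kafka/http managers
      static bool init_all()     { std::cout << "endpoints up\n"; return true; }
      static void shutdown_all() { std::cout << "endpoints down\n"; }
    };

    struct Manager {                          // stands in for the notification Manager
      void init() { std::cout << "manager workers started\n"; }
      void stop() { std::cout << "manager workers drained and joined\n"; }
    };

    static std::unique_ptr<Manager> s_manager;

    bool init() {
      if (s_manager) return false;            // already initialized
      if (!EndpointManagers::init_all()) return false;
      s_manager = std::make_unique<Manager>();
      s_manager->init();
      return true;
    }

    void shutdown() {
      if (!s_manager) return;
      EndpointManagers::shutdown_all();       // stop endpoints first
      s_manager->stop();                      // then drain queues and join workers
      s_manager.reset();
    }

    int main() { init(); shutdown(); }
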
diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index 43481629bdf..c70693895bf 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -123,6 +123,7 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
};
class Manager : public DoutPrefixProvider {
+ bool shutdown = false;
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queues_update_retry_ms;
@@ -311,7 +312,7 @@ private:
// clean stale reservation from queue
void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
- while (true) {
+ while (!shutdown) {
ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
const auto now = ceph::coarse_real_time::clock::now();
const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
@@ -346,6 +347,27 @@ private:
boost::system::error_code ec;
timer.async_wait(yield[ec]);
}
+ ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
+ }
+
+ // unlock (lose ownership of) the queue
+ int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
+ auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ if (ret == -ENOENT) {
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. nothing to unlock" << dendl;
+ return 0;
+ }
+ if (ret == -EBUSY) {
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". already owned by another RGW. no need to unlock" << dendl;
+ return 0;
+ }
+ return ret;
}
// processing of a specific queue
@@ -361,7 +383,7 @@ private:
CountersManager queue_counters_container(queue_name, this->get_cct());
- while (true) {
+ while (!shutdown) {
// if queue was empty the last time, sleep for idle timeout
if (is_idle) {
Timer timer(io_context);
@@ -446,7 +468,7 @@ private:
if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
|| result == EntryProcessingResult::Migrating) {
ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
- << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+ << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " "
<< entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
remove_entries = true;
needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
@@ -588,6 +610,7 @@ private:
queue_counters_container.set(l_rgw_persistent_topic_size, entries_size);
}
}
+ ldpp_dout(this, 5) << "INFO: manager stopped. done processing for queue: " << queue_name << dendl;
}
// list of owned queues
@@ -598,6 +621,7 @@ private:
void process_queues(spawn::yield_context yield) {
auto has_error = false;
owned_queues_t owned_queues;
+ size_t processed_queue_count = 0;
// add randomness to the duration between queue checking
// to make sure that different daemons are not synced
@@ -609,7 +633,8 @@ private:
std::vector<std::string> queue_gc;
std::mutex queue_gc_lock;
- while (true) {
+ auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ while (!shutdown) {
Timer timer(io_context);
const auto duration = (has_error ?
std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
@@ -640,7 +665,7 @@ private:
failover_time,
LOCK_FLAG_MAY_RENEW);
- ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -EBUSY) {
// lock is already taken by another RGW
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -662,12 +687,21 @@ private:
if (owned_queues.insert(queue_name).second) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
// start processing this queue
- spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
+ spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+ ++processed_queue_count;
process_queue(queue_name, yield);
// if queue processing ended, it means that the queue was removed or not owned anymore
+ const auto ret = unlock_queue(queue_name, yield);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: failed to unlock queue: " << queue_name << " with error: " <<
+ ret << " (ownership would still move if not renewed)" << dendl;
+ } else {
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " not locked (ownership can move)" << dendl;
+ }
// mark it for deletion
std::lock_guard lock_guard(queue_gc_lock);
queue_gc.push_back(queue_name);
+ --processed_queue_count;
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
}, make_stack_allocator());
} else {
@@ -680,21 +714,55 @@ private:
std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
topics_persistency_tracker.erase(queue_name);
owned_queues.erase(queue_name);
- ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " was removed" << dendl;
});
queue_gc.clear();
}
}
+ Timer timer(io_context);
+ while (processed_queue_count > 0) {
+ ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+ timer.expires_from_now(std::chrono::milliseconds(queues_update_retry_ms));
+ boost::system::error_code ec;
+ timer.async_wait(yield[ec]);
+ }
+ ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
}
public:
~Manager() {
+ }
+
+ void stop() {
+ shutdown = true;
work_guard.reset();
- io_context.stop();
std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
}
+ void init() {
+ spawn::spawn(io_context, [this](spawn::yield_context yield) {
+ process_queues(yield);
+ }, make_stack_allocator());
+
+ // start the worker threads to do the actual queue processing
+ const std::string WORKER_THREAD_NAME = "notif-worker";
+ for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
+ workers.emplace_back([this]() {
+ try {
+ io_context.run();
+ } catch (const std::exception& err) {
+ ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+ throw err;
+ }
+ });
+ const auto rc = ceph_pthread_setname(workers.back().native_handle(),
+ (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+ ceph_assert(rc == 0);
+ }
+ ldpp_dout(this, 10) << "INFO: started notification manager with: " << worker_count << " workers" << dendl;
+ }
+
// ctor: start all threads
Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
@@ -714,28 +782,7 @@ public:
reservations_cleanup_period_s(_reservations_cleanup_period_s),
site(site),
rados_store(*store)
- {
- spawn::spawn(io_context, [this](spawn::yield_context yield) {
- process_queues(yield);
- }, make_stack_allocator());
-
- // start the worker threads to do the actual queue processing
- const std::string WORKER_THREAD_NAME = "notif-worker";
- for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
- workers.emplace_back([this]() {
- try {
- io_context.run();
- } catch (const std::exception& err) {
- ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
- throw(err);
- }
- });
- const auto rc = ceph_pthread_setname(workers.back().native_handle(),
- (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
- ceph_assert(rc == 0);
- }
- ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
- }
+ {}
int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (topic_queue == Q_LIST_OBJECT_NAME) {
@@ -771,10 +818,7 @@ public:
}
};
-// singleton manager
-// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
-static Manager* s_manager = nullptr;
+std::unique_ptr<Manager> s_manager;
constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
@@ -785,24 +829,31 @@ constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
- const SiteConfig& site, const DoutPrefixProvider *dpp) {
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+ const SiteConfig& site) {
if (s_manager) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to init notification manager: already exists" << dendl;
+ return false;
+ }
+ if (!RGWPubSubEndpoint::init_all(dpp->get_cct())) {
return false;
}
// TODO: take conf from CephContext
- s_manager = new Manager(cct, MAX_QUEUE_SIZE,
+ s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE,
Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
WORKER_COUNT,
store, site);
+ s_manager->init();
return true;
}
void shutdown() {
- delete s_manager;
- s_manager = nullptr;
+ if (!s_manager) return;
+ RGWPubSubEndpoint::shutdown_all();
+ s_manager->stop();
+ s_manager.reset();
}
int add_persistent_topic(const std::string& topic_name, optional_yield y) {
@@ -842,7 +893,7 @@ int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
- return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
+ return remove_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
}
rgw::sal::Object* get_object_with_attributes(
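
The core of the rgw_notify.cc change above is cooperative shutdown: the formerly endless queue loops now poll a shutdown flag, and stop() waits for in-flight queue processing to drain before joining the worker threads, instead of calling io_context.stop(). The sketch below illustrates that pattern in a self-contained way using plain std::thread and std::atomic rather than the boost::asio coroutines of the real code; class and member names are simplified stand-ins.

    // Cooperative shutdown sketch: flag, drain, join (stand-in types).
    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <thread>
    #include <vector>

    class Manager {
      std::atomic<bool> shutdown_{false};
      std::atomic<int> processed_queue_count_{0};
      std::vector<std::thread> workers_;

      void process_queue(unsigned id) {
        ++processed_queue_count_;
        while (!shutdown_) {                  // was: while (true)
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "queue " << id << " done\n";
        --processed_queue_count_;             // this queue finished processing
      }

    public:
      void init(unsigned worker_count) {
        for (unsigned i = 0; i < worker_count; ++i)
          workers_.emplace_back([this, i] { process_queue(i); });
      }

      void stop() {
        shutdown_ = true;                     // ask loops to exit at their next check
        while (processed_queue_count_ > 0)    // wait for in-flight processing to drain
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
        for (auto& w : workers_) w.join();    // join instead of stopping the io_context
        std::cout << "all queues drained, workers joined\n";
      }
    };

    int main() {
      Manager m;
      m.init(2);
      m.stop();
    }
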
diff --git a/src/rgw/driver/rados/rgw_notify.h b/src/rgw/driver/rados/rgw_notify.h
index 7014cda3ca3..e1566d3f71d 100644
--- a/src/rgw/driver/rados/rgw_notify.h
+++ b/src/rgw/driver/rados/rgw_notify.h
@@ -26,8 +26,8 @@ namespace rgw::notify {
// initialize the notification manager
// notification manager is dequeuing the 2-phase-commit queues
// and send the notifications to the endpoints
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
- const rgw::SiteConfig& site, const DoutPrefixProvider *dpp);
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+ const rgw::SiteConfig& site);
// shutdown the notification manager
void shutdown();
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index d2c0824c90c..16937ef23ec 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.cc
+++ b/src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -5,6 +5,7 @@
#include <string>
#include <sstream>
#include <algorithm>
+#include <curl/curl.h>
#include "common/Formatter.h"
#include "common/iso_8601.h"
#include "common/async/completion.h"
@@ -23,6 +24,8 @@
#include <functional>
#include "rgw_perf_counters.h"
+#define dout_subsys ceph_subsys_rgw_notification
+
using namespace rgw;
template<typename EventType>
@@ -52,6 +55,9 @@ bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_val
return value;
}
+static std::unique_ptr<RGWHTTPManager> s_http_manager;
+static std::shared_mutex s_http_manager_mutex;
+
class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
CephContext* const cct;
@@ -83,6 +89,11 @@ public:
}
int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+ std::shared_lock lock(s_http_manager_mutex);
+ if (!s_http_manager) {
+ ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
+ return -ESRCH;
+ }
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
const auto post_data = json_format_pubsub_event(event);
@@ -101,7 +112,10 @@ public:
request.set_send_length(post_data.length());
request.append_header("Content-Type", "application/json");
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
- const auto rc = RGWHTTP::process(&request, y);
+ auto rc = s_http_manager->add_request(&request);
+ if (rc == 0) {
+ rc = request.wait(y);
+ }
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
// TODO: use read_bl to process return code and handle according to ack level
return rc;
@@ -398,3 +412,48 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
return nullptr;
}
+bool init_http_manager(CephContext* cct) {
+ std::unique_lock lock(s_http_manager_mutex);
+ if (s_http_manager) return false;
+ s_http_manager = std::make_unique<RGWHTTPManager>(cct);
+ return (s_http_manager->start() == 0);
+}
+
+void shutdown_http_manager() {
+ std::unique_lock lock(s_http_manager_mutex);
+ if (s_http_manager) {
+ s_http_manager->stop();
+ s_http_manager.reset();
+ }
+}
+
+bool RGWPubSubEndpoint::init_all(CephContext* cct) {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ if (!amqp::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to init amqp endpoint manager" << dendl;
+ return false;
+ }
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ if (!kafka::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to init kafka endpoint manager" << dendl;
+ return false;
+ }
+#endif
+ if (!init_http_manager(cct)) {
+ ldout(cct, 1) << "ERROR: failed to init http endpoint manager" << dendl;
+ return false;
+ }
+ return true;
+}
+
+void RGWPubSubEndpoint::shutdown_all() {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ amqp::shutdown();
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ kafka::shutdown();
+#endif
+ shutdown_http_manager();
+}
+
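
rgw_pubsub_push.cc (and, in the same spirit, rgw_amqp.cc and rgw_kafka.cc below) guards its process-wide manager with a std::shared_mutex: init/shutdown take a unique_lock, while the send path takes a shared_lock and fails cleanly if the manager is gone. The following is a minimal, self-contained illustration of that locking scheme with hypothetical stand-in types, not the actual RGWHTTPManager API. Readers overlap freely; only startup and teardown serialize, which suits a send-dominated steady state.

    // Shared-mutex-guarded singleton sketch (stand-in types).
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <shared_mutex>

    struct HttpManager {                      // stands in for the HTTP endpoint manager
      int add_request(const char* what) { std::cout << "sending: " << what << "\n"; return 0; }
    };

    static std::unique_ptr<HttpManager> s_http_manager;
    static std::shared_mutex s_http_manager_mutex;

    bool init_http_manager() {
      std::unique_lock lock(s_http_manager_mutex);  // exclusive: create
      if (s_http_manager) return false;
      s_http_manager = std::make_unique<HttpManager>();
      return true;
    }

    void shutdown_http_manager() {
      std::unique_lock lock(s_http_manager_mutex);  // exclusive: destroy
      s_http_manager.reset();
    }

    int send(const char* event) {
      std::shared_lock lock(s_http_manager_mutex);  // shared: many senders at once
      if (!s_http_manager) return -3;               // analogous to -ESRCH in the patch
      return s_http_manager->add_request(event);
    }

    int main() {
      init_http_manager();
      send("s3:ObjectCreated");
      shutdown_http_manager();
      std::cout << "after shutdown: " << send("dropped") << "\n";
    }
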
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.h b/src/rgw/driver/rados/rgw_pubsub_push.h
index 040be3dd428..bacebfba44c 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.h
+++ b/src/rgw/driver/rados/rgw_pubsub_push.h
@@ -40,5 +40,11 @@ public:
configuration_error(const std::string& what_arg) :
std::logic_error("pubsub endpoint configuration error: " + what_arg) {}
};
+
+ // init all supported endpoints
+ static bool init_all(CephContext* cct);
+ // shutdown all supported endpoints
+ static void shutdown_all();
+
};
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index 95f05f149a0..45d50452e03 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -1356,10 +1356,8 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
index_completion_manager = new RGWIndexCompletionManager(this);
if (run_notification_thread) {
- ret = rgw::notify::init(cct, driver, *svc.site, dpp);
- if (ret < 0 ) {
- ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
- return ret;
+ if (!rgw::notify::init(dpp, driver, *svc.site)) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
}
using namespace rgw;
diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc
index 46ab9c575bd..ea824b8295a 100644
--- a/src/rgw/rgw_amqp.cc
+++ b/src/rgw/rgw_amqp.cc
@@ -966,8 +966,8 @@ public:
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
@@ -977,6 +977,7 @@ static const unsigned IDLE_TIME_MS = 100;
static const unsigned RECONNECT_TIME_MS = 100;
bool init(CephContext* cct) {
+ std::unique_lock lock(s_manager_mutex);
if (s_manager) {
return false;
}
@@ -987,12 +988,14 @@ bool init(CephContext* cct) {
}
void shutdown() {
+ std::unique_lock lock(s_manager_mutex);
delete s_manager;
s_manager = nullptr;
}
bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
boost::optional<const std::string&> ca_location) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
}
@@ -1000,6 +1003,7 @@ bool connect(connection_id_t& conn_id, const std::string& url, const std::string
int publish(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish(conn_id, topic, message);
}
@@ -1008,41 +1012,49 @@ int publish_with_confirm(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish_with_confirm(conn_id, topic, message, cb);
}
size_t get_connection_count() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_connection_count();
}
size_t get_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_inflight();
}
size_t get_queued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_queued();
}
size_t get_dequeued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_dequeued();
}
size_t get_max_connections() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
return s_manager->max_connections;
}
size_t get_max_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_INFLIGHT_DEFAULT;
return s_manager->max_inflight;
}
size_t get_max_queue() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_QUEUE_DEFAULT;
return s_manager->max_queue;
}
diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc
index 8266985ece4..9969811083e 100644
--- a/src/rgw/rgw_appmain.cc
+++ b/src/rgw/rgw_appmain.cc
@@ -59,12 +59,6 @@
#include "rgw_kmip_client_impl.h"
#include "rgw_perf_counters.h"
#include "rgw_signal.h"
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-#include "rgw_kafka.h"
-#endif
#ifdef WITH_ARROW_FLIGHT
#include "rgw_flight_frontend.h"
#endif
@@ -554,20 +548,6 @@ void rgw::AppMain::init_tracepoints()
tracing::rgw::tracer.init(dpp->get_cct(), "rgw");
} /* init_tracepoints() */
-void rgw::AppMain::init_notification_endpoints()
-{
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- if (!rgw::amqp::init(dpp->get_cct())) {
- derr << "ERROR: failed to initialize AMQP manager" << dendl;
- }
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
- if (!rgw::kafka::init(dpp->get_cct())) {
- derr << "ERROR: failed to initialize Kafka manager" << dendl;
- }
-#endif
-} /* init_notification_endpoints */
-
void rgw::AppMain::init_lua()
{
rgw::sal::Driver* driver = env.driver;
@@ -644,12 +624,6 @@ void rgw::AppMain::shutdown(std::function<void(void)> finalize_async_signals)
rgw::curl::cleanup_curl();
g_conf().remove_observer(implicit_tenant_context.get());
implicit_tenant_context.reset(); // deletes
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- rgw::amqp::shutdown();
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
- rgw::kafka::shutdown();
-#endif
rgw_perf_stop(g_ceph_context);
ratelimiter.reset(); // deletes--ensure this happens before we destruct
} /* AppMain::shutdown */
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index ce496e414a7..ce1b273d8b7 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -688,14 +688,15 @@ public:
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
bool init(CephContext* cct) {
+ std::unique_lock lock(s_manager_mutex);
if (s_manager) {
return false;
}
@@ -705,6 +706,7 @@ bool init(CephContext* cct) {
}
void shutdown() {
+ std::unique_lock lock(s_manager_mutex);
delete s_manager;
s_manager = nullptr;
}
@@ -714,6 +716,7 @@ bool connect(std::string& broker, const std::string& url, bool use_ssl, bool ver
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
boost::optional<const std::string&> password) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
}
@@ -721,6 +724,7 @@ bool connect(std::string& broker, const std::string& url, bool use_ssl, bool ver
int publish(const std::string& conn_name,
const std::string& topic,
const std::string& message) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return STATUS_MANAGER_STOPPED;
return s_manager->publish(conn_name, topic, message);
}
@@ -729,41 +733,49 @@ int publish_with_confirm(const std::string& conn_name,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return STATUS_MANAGER_STOPPED;
return s_manager->publish_with_confirm(conn_name, topic, message, cb);
}
size_t get_connection_count() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_connection_count();
}
size_t get_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_inflight();
}
size_t get_queued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_queued();
}
size_t get_dequeued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_dequeued();
}
size_t get_max_connections() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
return s_manager->max_connections;
}
size_t get_max_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_INFLIGHT_DEFAULT;
return s_manager->max_inflight;
}
size_t get_max_queue() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_QUEUE_DEFAULT;
return s_manager->max_queue;
}
diff --git a/src/rgw/rgw_lib.cc b/src/rgw/rgw_lib.cc
index 665a03b6279..59443c52b02 100644
--- a/src/rgw/rgw_lib.cc
+++ b/src/rgw/rgw_lib.cc
@@ -548,7 +548,6 @@ namespace rgw {
return r;
}
- main.init_notification_endpoints();
main.init_lua();
return 0;
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index 6d0dab8245a..5536e2810f6 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -168,7 +168,6 @@ int main(int argc, char *argv[])
main.shutdown();
return r;
}
- main.init_notification_endpoints();
#if defined(HAVE_SYS_PRCTL_H)
if (prctl(PR_SET_DUMPABLE, 1) == -1) {
diff --git a/src/rgw/rgw_main.h b/src/rgw/rgw_main.h
index caa6a082282..d106485855a 100644
--- a/src/rgw/rgw_main.h
+++ b/src/rgw/rgw_main.h
@@ -114,7 +114,6 @@ public:
void init_opslog();
int init_frontends2(RGWLib* rgwlib = nullptr);
void init_tracepoints();
- void init_notification_endpoints();
void init_lua();
bool have_http() {
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 72be3594d3b..6363d9b84ec 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -2957,8 +2957,9 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
parsed_result = json.loads(result[0])
entries = parsed_result['Topic Stats']['Entries']
retries += 1
+ time_diff = time.time() - start_time
+ log.info('queue %s has %d entries after %ds', topic_name, entries, time_diff)
if retries > 30:
- time_diff = time.time() - start_time
log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
assert_equal(entries, 0)
time.sleep(5)
@@ -4510,6 +4511,111 @@ def test_ps_s3_notification_push_kafka_security_sasl_scram():
kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
+@attr('http_test')
+def test_persistent_ps_s3_reload():
+ """ do a realm reload while we send notifications """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # make sure there is nothing to migrate
+ print('delete all topics')
+ delete_all_topics(conn, '', get_config_cluster())
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ print('start http server')
+ http_server = HTTPServerWithEvents((host, http_port), delay=2)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+
+ # create s3 topics
+ endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # 2nd topic is unused
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+ topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ wait_for_queue_to_drain(topic_name1)
+
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('another-key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # do a reload
+ print('do reload')
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ wait_for_queue_to_drain(topic_name1)
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=False)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete objects from the bucket
+ client_threads = []
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
@attr('data_path_v2_test')
def test_persistent_ps_s3_data_path_v2_migration():
""" test data path v2 persistent migration """