author    | Yuval Lifshitz <ylifshit@ibm.com>                | 2024-04-18 11:39:42 +0200
committer | Yuval Lifshitz <ylifshit@ibm.com>                | 2024-05-12 12:39:08 +0200
commit    | 15536cf7d75b40755a9a291498e25bf8c3d55fbf (patch)
tree      | 85e3140827df391d8e79b948dd16712eb4685f99
parent    | rgw/notifications: cleanup includes and unused parameters (diff)
download  | ceph-15536cf7d75b40755a9a291498e25bf8c3d55fbf.tar.xz
          | ceph-15536cf7d75b40755a9a291498e25bf8c3d55fbf.zip
rgw/notifications: start/stop endpoint managers in notification manager
Fixes: https://tracker.ceph.com/issues/65337
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
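In effect, the endpoint managers (HTTP, AMQP, Kafka) are no longer started from rgw::AppMain and torn down ad hoc; rgw::notify::init() now brings them up before starting the notification workers, and rgw::notify::shutdown() stops them and then stops the manager through an explicit stop() rather than the destructor. A minimal standalone sketch of that lifecycle, using plain std::thread and std::atomic stand-ins rather than the RGW types (the real manager runs boost coroutines on an io_context and uses a plain bool flag):

```cpp
// Standalone sketch of the new lifecycle (std::thread/std::atomic stand-ins,
// not the RGW types): the manager is cheap to construct, started explicitly
// with init(), and stopped cooperatively with stop() instead of in the dtor.
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

class Manager {
  std::atomic<bool> shutdown{false};   // processing loops poll this flag
  std::vector<std::thread> workers;
  const unsigned worker_count;

  void process_queues() {
    while (!shutdown) {                // was `while (true)` before the change
      // ... lock queues, process entries, push to endpoints ...
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    std::cout << "manager stopped. done processing all queues\n";
  }

public:
  explicit Manager(unsigned count) : worker_count(count) {}  // ctor does no work

  void init() {                        // start the notif-worker threads
    for (unsigned i = 0; i < worker_count; ++i)
      workers.emplace_back([this] { process_queues(); });
  }

  void stop() {                        // cooperative shutdown, then join
    shutdown = true;
    for (auto& w : workers) w.join();
  }
};

static std::unique_ptr<Manager> s_manager;  // module-level singleton, as in rgw_notify.cc

bool init() {
  if (s_manager) return false;              // already running
  // the endpoint managers (HTTP/AMQP/Kafka) are brought up here first
  s_manager = std::make_unique<Manager>(1);
  s_manager->init();
  return true;
}

void shutdown() {
  if (!s_manager) return;
  // the endpoint managers are shut down here first
  s_manager->stop();
  s_manager.reset();
}

int main() {
  init();
  std::this_thread::sleep_for(std::chrono::milliseconds(300));
  shutdown();
}
```

Splitting construction from init() keeps the constructor trivial and lets a failed startup be reported without half-started threads, while stop() blocks until every processing loop has observed the flag.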
-rw-r--r-- | src/rgw/driver/rados/rgw_notify.cc          | 131
-rw-r--r-- | src/rgw/driver/rados/rgw_notify.h           |   4
-rw-r--r-- | src/rgw/driver/rados/rgw_pubsub_push.cc     |  61
-rw-r--r-- | src/rgw/driver/rados/rgw_pubsub_push.h      |   6
-rw-r--r-- | src/rgw/driver/rados/rgw_rados.cc           |   6
-rw-r--r-- | src/rgw/rgw_amqp.cc                         |  14
-rw-r--r-- | src/rgw/rgw_appmain.cc                      |  26
-rw-r--r-- | src/rgw/rgw_kafka.cc                        |  14
-rw-r--r-- | src/rgw/rgw_lib.cc                          |   1
-rw-r--r-- | src/rgw/rgw_main.cc                         |   1
-rw-r--r-- | src/rgw/rgw_main.h                          |   1
-rw-r--r-- | src/test/rgw/bucket_notification/test_bn.py | 108
12 files changed, 294 insertions, 79 deletions
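Most of the churn below is the cooperative-shutdown path in rgw_notify.cc plus the same locking change repeated in rgw_amqp.cc and rgw_kafka.cc: the module-level manager pointer is now guarded by a std::shared_mutex, with init()/shutdown() taking a unique lock and every other entry point a shared lock. A self-contained sketch of that guard pattern, with illustrative names rather than the RGW API:

```cpp
// Standalone sketch of the reader/writer guard added around the AMQP/Kafka
// singletons (illustrative names, not the RGW API): init()/shutdown() take a
// unique lock to swap the pointer, every other entry point takes a shared lock.
#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>

struct EndpointManager {                  // stand-in for the real manager
  int publish(const std::string&, const std::string&) { return 0; }
  std::size_t connection_count() const { return 0; }
};

static std::unique_ptr<EndpointManager> s_manager;
static std::shared_mutex s_manager_mutex;

bool init() {
  std::unique_lock lock(s_manager_mutex); // exclusive: we mutate the pointer
  if (s_manager) return false;
  s_manager = std::make_unique<EndpointManager>();
  return true;
}

void shutdown() {
  std::unique_lock lock(s_manager_mutex); // exclusive
  s_manager.reset();
}

int publish(const std::string& topic, const std::string& message) {
  std::shared_lock lock(s_manager_mutex); // shared: publishers run in parallel
  if (!s_manager) return -1;              // "manager stopped"
  return s_manager->publish(topic, message);
}

std::size_t get_connection_count() {
  std::shared_lock lock(s_manager_mutex);
  return s_manager ? s_manager->connection_count() : 0;
}

int main() {
  init();
  publish("topic1", "hello");
  shutdown();
}
```

The shared lock keeps concurrent publishers cheap, while the unique lock guarantees shutdown() cannot free the manager underneath a caller that has already passed the null check.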
diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index 43481629bdf..c70693895bf 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -123,6 +123,7 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
 };
 
 class Manager : public DoutPrefixProvider {
+  bool shutdown = false;
   const size_t max_queue_size;
   const uint32_t queues_update_period_ms;
   const uint32_t queues_update_retry_ms;
@@ -311,7 +312,7 @@ private:
 
   // clean stale reservation from queue
   void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
-    while (true) {
+    while (!shutdown) {
      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
      const auto now = ceph::coarse_real_time::clock::now();
      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
@@ -346,6 +347,27 @@ private:
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);
     }
+    ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
+  }
+
+  // unlock (lose ownership) queue
+  int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+    librados::ObjectWriteOperation op;
+    op.assert_exists();
+    rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+    if (ret == -ENOENT) {
+      ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+        << ". was removed. nothing to unlock" << dendl;
+      return 0;
+    }
+    if (ret == -EBUSY) {
+      ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+        << ". already owned by another RGW. no need to unlock" << dendl;
+      return 0;
+    }
+    return ret;
   }
 
   // processing of a specific queue
@@ -361,7 +383,7 @@ private:
 
     CountersManager queue_counters_container(queue_name, this->get_cct());
 
-    while (true) {
+    while (!shutdown) {
      // if queue was empty the last time, sleep for idle timeout
      if (is_idle) {
        Timer timer(io_context);
@@ -446,7 +468,7 @@ private:
        if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
            || result == EntryProcessingResult::Migrating) {
          ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
-            << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+            << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " "
            << entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
          remove_entries = true;
          needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
@@ -588,6 +610,7 @@ private:
        queue_counters_container.set(l_rgw_persistent_topic_size, entries_size);
      }
     }
+    ldpp_dout(this, 5) << "INFO: manager stopped. done processing for queue: " << queue_name << dendl;
   }
 
   // lits of owned queues
@@ -598,6 +621,7 @@ private:
   void process_queues(spawn::yield_context yield) {
     auto has_error = false;
     owned_queues_t owned_queues;
+    size_t processed_queue_count = 0;
 
     // add randomness to the duration between queue checking
     // to make sure that different daemons are not synced
@@ -609,7 +633,8 @@ private:
 
     std::vector<std::string> queue_gc;
     std::mutex queue_gc_lock;
-    while (true) {
+    auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+    while (!shutdown) {
      Timer timer(io_context);
      const auto duration = (has_error ?
        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
@@ -640,7 +665,7 @@ private:
            failover_time,
            LOCK_FLAG_MAY_RENEW);
 
-      ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+      ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
      if (ret == -EBUSY) {
        // lock is already taken by another RGW
        ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -662,12 +687,21 @@ private:
      if (owned_queues.insert(queue_name).second) {
        ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
        // start processing this queue
-        spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
+        spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+          ++processed_queue_count;
          process_queue(queue_name, yield);
          // if queue processing ended, it means that the queue was removed or not owned anymore
+          const auto ret = unlock_queue(queue_name, yield);
+          if (ret < 0) {
+            ldpp_dout(this, 5) << "WARNING: failed to unlock queue: " << queue_name << " with error: " <<
+              ret << " (ownership would still move if not renewed)" << dendl;
+          } else {
+            ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " not locked (ownership can move)" << dendl;
+          }
          // mark it for deletion
          std::lock_guard lock_guard(queue_gc_lock);
          queue_gc.push_back(queue_name);
+          --processed_queue_count;
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
        }, make_stack_allocator());
      } else {
@@ -680,21 +714,55 @@ private:
        std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
          topics_persistency_tracker.erase(queue_name);
          owned_queues.erase(queue_name);
-          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
+          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " was removed" << dendl;
        });
        queue_gc.clear();
      }
     }
+    Timer timer(io_context);
+    while (processed_queue_count > 0) {
+      ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+      timer.expires_from_now(std::chrono::milliseconds(queues_update_retry_ms));
+      boost::system::error_code ec;
+      timer.async_wait(yield[ec]);
+    }
+    ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
   }
 
 public:
 
   ~Manager() {
+  }
+
+  void stop() {
+    shutdown = true;
     work_guard.reset();
-    io_context.stop();
     std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
   }
 
+  void init() {
+    spawn::spawn(io_context, [this](spawn::yield_context yield) {
+          process_queues(yield);
+        }, make_stack_allocator());
+
+    // start the worker threads to do the actual queue processing
+    const std::string WORKER_THREAD_NAME = "notif-worker";
+    for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
+      workers.emplace_back([this]() {
+        try {
+          io_context.run();
+        } catch (const std::exception& err) {
+          ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+          throw err;
+        }
+      });
+      const auto rc = ceph_pthread_setname(workers.back().native_handle(),
+        (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+      ceph_assert(rc == 0);
+    }
+    ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
+  }
+
   // ctor: start all threads
   Manager(CephContext* _cct, uint32_t _max_queue_size,
          uint32_t _queues_update_period_ms, uint32_t _queues_update_retry_ms,
          uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
@@ -714,28 +782,7 @@ public:
     reservations_cleanup_period_s(_reservations_cleanup_period_s),
     site(site),
     rados_store(*store)
-    {
-      spawn::spawn(io_context, [this](spawn::yield_context yield) {
-            process_queues(yield);
-          }, make_stack_allocator());
-
-      // start the worker threads to do the actual queue processing
-      const std::string WORKER_THREAD_NAME = "notif-worker";
-      for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
-        workers.emplace_back([this]() {
-          try {
-            io_context.run();
-          } catch (const std::exception& err) {
-            ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
-            throw(err);
-          }
-        });
-        const auto rc = ceph_pthread_setname(workers.back().native_handle(),
-          (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
-        ceph_assert(rc == 0);
-      }
-      ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
-    }
+    {}
 
   int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
     if (topic_queue == Q_LIST_OBJECT_NAME) {
@@ -771,10 +818,7 @@ public:
   }
 };
 
-// singleton manager
-// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
-static Manager* s_manager = nullptr;
+std::unique_ptr<Manager> s_manager;
 
 constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
 constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30seconds
@@ -785,24 +829,31 @@ constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
 constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
 constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
 
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
-          const SiteConfig& site, const DoutPrefixProvider *dpp) {
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+          const SiteConfig& site) {
   if (s_manager) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to init notification manager: already exists" << dendl;
+    return false;
+  }
+  if (!RGWPubSubEndpoint::init_all(dpp->get_cct())) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(cct, MAX_QUEUE_SIZE,
+  s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE,
      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
      WORKER_COUNT,
      store, site);
+  s_manager->init();
   return true;
 }
 
 void shutdown() {
-  delete s_manager;
-  s_manager = nullptr;
+  if (!s_manager) return;
+  RGWPubSubEndpoint::shutdown_all();
+  s_manager->stop();
+  s_manager.reset();
 }
 
 int add_persistent_topic(const std::string& topic_name, optional_yield y) {
@@ -842,7 +893,7 @@ int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
   if (!s_manager) {
     return -EAGAIN;
   }
-  return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
+  return remove_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
 }
 
 rgw::sal::Object* get_object_with_attributes(
diff --git a/src/rgw/driver/rados/rgw_notify.h b/src/rgw/driver/rados/rgw_notify.h
index 7014cda3ca3..e1566d3f71d 100644
--- a/src/rgw/driver/rados/rgw_notify.h
+++ b/src/rgw/driver/rados/rgw_notify.h
@@ -26,8 +26,8 @@ namespace rgw::notify {
 // initialize the notification manager
 // notification manager is dequeuing the 2-phase-commit queues
 // and send the notifications to the endpoints
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
-          const rgw::SiteConfig& site, const DoutPrefixProvider *dpp);
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+          const rgw::SiteConfig& site);
 
 // shutdown the notification manager
 void shutdown();
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index d2c0824c90c..16937ef23ec 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.cc
+++ b/src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -5,6 +5,7 @@
 #include <string>
 #include <sstream>
 #include <algorithm>
+#include <curl/curl.h>
 #include "common/Formatter.h"
 #include "common/iso_8601.h"
 #include "common/async/completion.h"
@@ -23,6 +24,8 @@
 #include <functional>
 #include "rgw_perf_counters.h"
 
+#define dout_subsys ceph_subsys_rgw_notification
+
 using namespace rgw;
 
 template<typename EventType>
@@ -52,6 +55,9 @@ bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_val
   return value;
 }
 
+static std::unique_ptr<RGWHTTPManager> s_http_manager;
+static std::shared_mutex s_http_manager_mutex;
+
 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
 private:
   CephContext* const cct;
@@ -83,6 +89,11 @@ public:
   }
 
   int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+    std::shared_lock lock(s_http_manager_mutex);
+    if (!s_http_manager) {
+      ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
+      return -ESRCH;
+    }
     bufferlist read_bl;
     RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
     const auto post_data = json_format_pubsub_event(event);
@@ -101,7 +112,10 @@ public:
     request.set_send_length(post_data.length());
     request.append_header("Content-Type", "application/json");
     if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
-    const auto rc = RGWHTTP::process(&request, y);
+    auto rc = s_http_manager->add_request(&request);
+    if (rc == 0) {
+      rc = request.wait(y);
+    }
     if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
     // TODO: use read_bl to process return code and handle according to ack level
     return rc;
@@ -398,3 +412,48 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
   return nullptr;
 }
 
+bool init_http_manager(CephContext* cct) {
+  std::unique_lock lock(s_http_manager_mutex);
+  if (s_http_manager) return false;
+  s_http_manager = std::make_unique<RGWHTTPManager>(cct);
+  return (s_http_manager->start() == 0);
+}
+
+void shutdown_http_manager() {
+  std::unique_lock lock(s_http_manager_mutex);
+  if (s_http_manager) {
+    s_http_manager->stop();
+    s_http_manager.reset();
+  }
+}
+
+bool RGWPubSubEndpoint::init_all(CephContext* cct) {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  if (!amqp::init(cct)) {
+    ldout(cct, 1) << "ERROR: failed to init amqp endpoint manager" << dendl;
+    return false;
+  }
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  if (!kafka::init(cct)) {
+    ldout(cct, 1) << "ERROR: failed to init kafka endpoint manager" << dendl;
+    return false;
+  }
+#endif
+  if (!init_http_manager(cct)) {
+    ldout(cct, 1) << "ERROR: failed to init http endpoint manager" << dendl;
+    return false;
+  }
+  return true;
+}
+
+void RGWPubSubEndpoint::shutdown_all() {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  amqp::shutdown();
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+  kafka::shutdown();
+#endif
+  shutdown_http_manager();
+}
+
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.h b/src/rgw/driver/rados/rgw_pubsub_push.h
index 040be3dd428..bacebfba44c 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.h
+++ b/src/rgw/driver/rados/rgw_pubsub_push.h
@@ -40,5 +40,11 @@ public:
     configuration_error(const std::string& what_arg) :
       std::logic_error("pubsub endpoint configuration error: " + what_arg) {}
   };
+
+  // init all supported endpoints
+  static bool init_all(CephContext* cct);
+  // shutdown all supported endpoints
+  static void shutdown_all();
+
 };
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index 95f05f149a0..45d50452e03 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -1356,10 +1356,8 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   index_completion_manager = new RGWIndexCompletionManager(this);
 
   if (run_notification_thread) {
-    ret = rgw::notify::init(cct, driver, *svc.site, dpp);
-    if (ret < 0 ) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
-      return ret;
+    if (!rgw::notify::init(dpp, driver, *svc.site)) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
     }
 
     using namespace rgw;
diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc
index 46ab9c575bd..ea824b8295a 100644
--- a/src/rgw/rgw_amqp.cc
+++ b/src/rgw/rgw_amqp.cc
@@ -966,8 +966,8 @@ public:
 
 // singleton manager
 // note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
 static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192;
@@ -977,6 +977,7 @@ static const unsigned IDLE_TIME_MS = 100;
 static const unsigned RECONNECT_TIME_MS = 100;
 
 bool init(CephContext* cct) {
+  std::unique_lock lock(s_manager_mutex);
   if (s_manager) {
     return false;
   }
@@ -987,12 +988,14 @@ bool init(CephContext* cct) {
 }
 
 void shutdown() {
+  std::unique_lock lock(s_manager_mutex);
   delete s_manager;
   s_manager = nullptr;
 }
 
 bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
    boost::optional<const std::string&> ca_location) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
 }
@@ -1000,6 +1003,7 @@ bool connect(connection_id_t& conn_id, const std::string& url,
 int publish(const connection_id_t& conn_id,
    const std::string& topic,
    const std::string& message) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
   return s_manager->publish(conn_id, topic, message);
 }
@@ -1008,41 +1012,49 @@ int publish_with_confirm(const connection_id_t& conn_id,
    const std::string& topic,
    const std::string& message,
    reply_callback_t cb) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
   return s_manager->publish_with_confirm(conn_id, topic, message, cb);
 }
 
 size_t get_connection_count() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
 
 size_t get_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_inflight();
 }
 
 size_t get_queued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_queued();
 }
 
 size_t get_dequeued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_dequeued();
 }
 
 size_t get_max_connections() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
   return s_manager->max_connections;
 }
 
 size_t get_max_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_INFLIGHT_DEFAULT;
   return s_manager->max_inflight;
 }
 
 size_t get_max_queue() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_QUEUE_DEFAULT;
   return s_manager->max_queue;
 }
diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc
index 8266985ece4..9969811083e 100644
--- a/src/rgw/rgw_appmain.cc
+++ b/src/rgw/rgw_appmain.cc
@@ -59,12 +59,6 @@
 #include "rgw_kmip_client_impl.h"
 #include "rgw_perf_counters.h"
 #include "rgw_signal.h"
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-#include "rgw_kafka.h"
-#endif
 #ifdef WITH_ARROW_FLIGHT
 #include "rgw_flight_frontend.h"
 #endif
@@ -554,20 +548,6 @@ void rgw::AppMain::init_tracepoints()
   tracing::rgw::tracer.init(dpp->get_cct(), "rgw");
 } /* init_tracepoints() */
 
-void rgw::AppMain::init_notification_endpoints()
-{
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  if (!rgw::amqp::init(dpp->get_cct())) {
-    derr << "ERROR: failed to initialize AMQP manager" << dendl;
-  }
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-  if (!rgw::kafka::init(dpp->get_cct())) {
-    derr << "ERROR: failed to initialize Kafka manager" << dendl;
-  }
-#endif
-} /* init_notification_endpoints */
-
 void rgw::AppMain::init_lua()
 {
   rgw::sal::Driver* driver = env.driver;
@@ -644,12 +624,6 @@ void rgw::AppMain::shutdown(std::function<void(void)> finalize_async_signals)
   rgw::curl::cleanup_curl();
   g_conf().remove_observer(implicit_tenant_context.get());
   implicit_tenant_context.reset(); // deletes
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-  rgw::amqp::shutdown();
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-  rgw::kafka::shutdown();
-#endif
   rgw_perf_stop(g_ceph_context);
   ratelimiter.reset(); // deletes--ensure this happens before we destruct
 } /* AppMain::shutdown */
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index ce496e414a7..ce1b273d8b7 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -688,14 +688,15 @@ public:
 
 // singleton manager
 // note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
 static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
 
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192;
 static const size_t MAX_QUEUE_DEFAULT = 8192;
 
 bool init(CephContext* cct) {
+  std::unique_lock lock(s_manager_mutex);
   if (s_manager) {
     return false;
   }
@@ -705,6 +706,7 @@ bool init(CephContext* cct) {
 }
 
 void shutdown() {
+  std::unique_lock lock(s_manager_mutex);
   delete s_manager;
   s_manager = nullptr;
 }
@@ -714,6 +716,7 @@ bool connect(std::string& broker, const std::string& url, bool use_ssl, bool ver
    boost::optional<const std::string&> mechanism,
    boost::optional<const std::string&> user_name,
    boost::optional<const std::string&> password) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
 }
@@ -721,6 +724,7 @@ bool connect(std::string& broker, const std::string& url, bool use_ssl, bool ver
 int publish(const std::string& conn_name,
    const std::string& topic,
    const std::string& message) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return STATUS_MANAGER_STOPPED;
   return s_manager->publish(conn_name, topic, message);
 }
@@ -729,41 +733,49 @@ int publish_with_confirm(const std::string& conn_name,
    const std::string& topic,
    const std::string& message,
    reply_callback_t cb) {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return STATUS_MANAGER_STOPPED;
   return s_manager->publish_with_confirm(conn_name, topic, message, cb);
 }
 
 size_t get_connection_count() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_connection_count();
 }
 
 size_t get_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_inflight();
 }
 
 size_t get_queued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_queued();
 }
 
 size_t get_dequeued() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return 0;
   return s_manager->get_dequeued();
 }
 
 size_t get_max_connections() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
   return s_manager->max_connections;
 }
 
 size_t get_max_inflight() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_INFLIGHT_DEFAULT;
   return s_manager->max_inflight;
 }
 
 size_t get_max_queue() {
+  std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return MAX_QUEUE_DEFAULT;
   return s_manager->max_queue;
 }
diff --git a/src/rgw/rgw_lib.cc b/src/rgw/rgw_lib.cc
index 665a03b6279..59443c52b02 100644
--- a/src/rgw/rgw_lib.cc
+++ b/src/rgw/rgw_lib.cc
@@ -548,7 +548,6 @@ namespace rgw {
       return r;
     }
 
-    main.init_notification_endpoints();
     main.init_lua();
 
     return 0;
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index 6d0dab8245a..5536e2810f6 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -168,7 +168,6 @@ int main(int argc, char *argv[])
     main.shutdown();
     return r;
   }
-  main.init_notification_endpoints();
 
 #if defined(HAVE_SYS_PRCTL_H)
   if (prctl(PR_SET_DUMPABLE, 1) == -1) {
diff --git a/src/rgw/rgw_main.h b/src/rgw/rgw_main.h
index caa6a082282..d106485855a 100644
--- a/src/rgw/rgw_main.h
+++ b/src/rgw/rgw_main.h
@@ -114,7 +114,6 @@ public:
   void init_opslog();
   int init_frontends2(RGWLib* rgwlib = nullptr);
   void init_tracepoints();
-  void init_notification_endpoints();
   void init_lua();
 
   bool have_http() {
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 72be3594d3b..6363d9b84ec 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -2957,8 +2957,9 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None):
         parsed_result = json.loads(result[0])
         entries = parsed_result['Topic Stats']['Entries']
         retries += 1
+        time_diff = time.time() - start_time
+        log.info('queue %s has %d entries after %ds', topic_name, entries, time_diff)
         if retries > 30:
-            time_diff = time.time() - start_time
             log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
             assert_equal(entries, 0)
         time.sleep(5)
@@ -4510,6 +4511,111 @@ def test_ps_s3_notification_push_kafka_security_sasl_scram():
     kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
 
 
+@attr('http_test')
+def test_persistent_ps_s3_reload():
+    """ do a realm reload while we send notifications """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+
+    # make sure there is nothing to migrate
+    print('delete all topics')
+    delete_all_topics(conn, '', get_config_cluster())
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+    print('start http server')
+    http_server = HTTPServerWithEvents((host, http_port), delay=2)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+
+    # create s3 topics
+    endpoint_address = 'http://'+host+':'+str(http_port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    # 2nd topic is unused
+    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+    topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
+    topic_arn2 = topic_conf2.set_config()
+
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster())
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    wait_for_queue_to_drain(topic_name1)
+
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('another-key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    # do a reload
+    print('do reload')
+    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    wait_for_queue_to_drain(topic_name1)
+    # verify events
+    keys = list(bucket.list())
+    http_server.verify_s3_events(keys, exact_match=False)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    # delete objects from the bucket
+    client_threads = []
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 @attr('data_path_v2_test')
 def test_persistent_ps_s3_data_path_v2_migration():
     """ test data path v2 persistent migration """
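One detail worth calling out from the rgw_notify.cc hunks above: stop() only flips the shutdown flag and joins the worker threads, so process_queues() has to drain its own work, exiting the control loop and then polling processed_queue_count until every spawned per-queue coroutine has finished (each one unlocking its queue on the way out). A standalone sketch of that drain step, using plain threads and atomics in place of the patch's coroutines:

```cpp
// Standalone sketch of the drain-before-exit step that process_queues() gains:
// after the control loop sees the shutdown flag it waits until every spawned
// per-queue processor has finished before returning, so stop() joins workers
// only once no queue is mid-processing.
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>

int main() {
  std::atomic<bool> shutdown{false};
  std::atomic<std::size_t> processed_queue_count{0};

  // stand-in for one spawned per-queue processor
  std::thread queue_worker([&] {
    ++processed_queue_count;
    while (!shutdown)
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    // ... unlock the queue so another daemon may take ownership ...
    --processed_queue_count;
  });

  // stand-in for the tail of process_queues()
  std::thread control([&] {
    while (!shutdown)
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    while (processed_queue_count > 0) {
      std::cout << "manager stopped. " << processed_queue_count
                << " queues are still being processed\n";
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    std::cout << "manager stopped. done processing all queues\n";
  });

  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  shutdown = true;   // what Manager::stop() sets before joining
  queue_worker.join();
  control.join();
}
```

The atomics are only needed because this sketch uses real threads; in the patch the counter is shared between coroutines running on the manager's io_context.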