summaryrefslogtreecommitdiffstats
path: root/src/libcephfs_proxy/proxy_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcephfs_proxy/proxy_manager.c')
-rw-r--r--src/libcephfs_proxy/proxy_manager.c247
1 files changed, 247 insertions, 0 deletions
diff --git a/src/libcephfs_proxy/proxy_manager.c b/src/libcephfs_proxy/proxy_manager.c
new file mode 100644
index 00000000000..ea57083e700
--- /dev/null
+++ b/src/libcephfs_proxy/proxy_manager.c
@@ -0,0 +1,247 @@
+
+#include <signal.h>
+
+#include "proxy_manager.h"
+#include "proxy_helpers.h"
+#include "proxy_list.h"
+#include "proxy_log.h"
+
+static void proxy_manager_signal_handler(int32_t signum, siginfo_t *info,
+ void *ctx)
+{
+}
+
+static void proxy_worker_register(proxy_worker_t *worker)
+{
+ proxy_manager_t *manager;
+
+ manager = worker->manager;
+
+ proxy_mutex_lock(&manager->mutex);
+
+ list_add_tail(&worker->list, &manager->workers);
+
+ proxy_mutex_unlock(&manager->mutex);
+}
+
+static void proxy_worker_deregister(proxy_worker_t *worker)
+{
+ proxy_manager_t *manager;
+
+ manager = worker->manager;
+
+ proxy_mutex_lock(&manager->mutex);
+
+ list_del_init(&worker->list);
+ if (list_empty(&manager->workers)) {
+ proxy_condition_signal(&manager->condition);
+ }
+
+ proxy_mutex_unlock(&manager->mutex);
+}
+
+static void proxy_worker_finished(proxy_worker_t *worker)
+{
+ proxy_manager_t *manager;
+
+ manager = worker->manager;
+
+ proxy_mutex_lock(&manager->mutex);
+
+ if (list_empty(&manager->finished)) {
+ proxy_condition_signal(&manager->condition);
+ }
+
+ list_move_tail(&worker->list, &manager->finished);
+
+ proxy_mutex_unlock(&manager->mutex);
+}
+
+static void *proxy_worker_start(void *arg)
+{
+ proxy_worker_t *worker;
+
+ worker = arg;
+
+ worker->start(worker);
+
+ proxy_worker_finished(worker);
+
+ return NULL;
+}
+
+static void *proxy_manager_start(void *arg)
+{
+ proxy_manager_t *manager;
+ proxy_worker_t *worker;
+
+ manager = arg;
+
+ proxy_mutex_lock(&manager->mutex);
+
+ while (true) {
+ while (!list_empty(&manager->finished)) {
+ worker = list_first_entry(&manager->finished,
+ proxy_worker_t, list);
+ list_del_init(&worker->list);
+
+ proxy_mutex_unlock(&manager->mutex);
+
+ proxy_thread_join(worker->tid);
+
+ if (worker->destroy != NULL) {
+ worker->destroy(worker);
+ }
+
+ proxy_mutex_lock(&manager->mutex);
+ }
+
+ if (manager->stop && list_empty(&manager->workers)) {
+ break;
+ }
+
+ proxy_condition_wait(&manager->condition, &manager->mutex);
+ }
+
+ manager->done = true;
+ proxy_condition_signal(&manager->condition);
+
+ proxy_mutex_unlock(&manager->mutex);
+
+ return NULL;
+}
+
+static int32_t proxy_manager_init(proxy_manager_t *manager)
+{
+ int32_t err;
+
+ list_init(&manager->workers);
+ list_init(&manager->finished);
+
+ manager->stop = false;
+ manager->done = false;
+
+ manager->main_tid = pthread_self();
+
+ err = proxy_mutex_init(&manager->mutex);
+ if (err < 0) {
+ return err;
+ }
+
+ err = proxy_condition_init(&manager->condition);
+ if (err < 0) {
+ pthread_mutex_destroy(&manager->mutex);
+ }
+
+ return err;
+}
+
+static void proxy_manager_destroy(proxy_manager_t *manager)
+{
+ pthread_cond_destroy(&manager->condition);
+ pthread_mutex_destroy(&manager->mutex);
+}
+
+static int32_t proxy_manager_setup_signals(struct sigaction *old)
+{
+ struct sigaction action;
+
+ /* The CONT signal will be used to wake threads blocked in I/O. */
+ memset(&action, 0, sizeof(action));
+ action.sa_flags = SA_SIGINFO;
+ action.sa_sigaction = proxy_manager_signal_handler;
+
+ return proxy_signal_set(SIGCONT, &action, old);
+}
+
+static void proxy_manager_restore_signals(struct sigaction *action)
+{
+ proxy_signal_set(SIGCONT, action, NULL);
+}
+
+static void proxy_manager_terminate(proxy_manager_t *manager)
+{
+ proxy_worker_t *worker;
+
+ proxy_mutex_lock(&manager->mutex);
+
+ list_for_each_entry(worker, &manager->workers, list) {
+ worker->stop = true;
+ proxy_thread_kill(worker->tid, SIGCONT);
+ }
+
+ while (!manager->done) {
+ proxy_condition_wait(&manager->condition, &manager->mutex);
+ }
+
+ proxy_mutex_unlock(&manager->mutex);
+
+ proxy_thread_join(manager->tid);
+}
+
+int32_t proxy_manager_run(proxy_manager_t *manager, proxy_manager_start_t start)
+{
+ struct sigaction old_action;
+ int32_t err;
+
+ err = proxy_manager_init(manager);
+ if (err < 0) {
+ return err;
+ }
+
+ err = proxy_manager_setup_signals(&old_action);
+ if (err < 0) {
+ goto done_destroy;
+ }
+
+ err = proxy_thread_create(&manager->tid, proxy_manager_start, manager);
+ if (err < 0) {
+ goto done_signal;
+ }
+
+ err = start(manager);
+
+ proxy_manager_terminate(manager);
+
+done_signal:
+ proxy_manager_restore_signals(&old_action);
+
+done_destroy:
+ proxy_manager_destroy(manager);
+
+ return err;
+}
+
+void proxy_manager_shutdown(proxy_manager_t *manager)
+{
+ proxy_mutex_lock(&manager->mutex);
+
+ manager->stop = true;
+ proxy_condition_signal(&manager->condition);
+
+ proxy_mutex_unlock(&manager->mutex);
+
+ /* Wake the thread if it was blocked in an I/O operation. */
+ proxy_thread_kill(manager->main_tid, SIGCONT);
+}
+
+int32_t proxy_manager_launch(proxy_manager_t *manager, proxy_worker_t *worker,
+ proxy_worker_start_t start,
+ proxy_worker_destroy_t destroy)
+{
+ int32_t err;
+
+ worker->manager = manager;
+ worker->start = start;
+ worker->destroy = destroy;
+ worker->stop = false;
+
+ proxy_worker_register(worker);
+
+ err = proxy_thread_create(&worker->tid, proxy_worker_start, worker);
+ if (err < 0) {
+ proxy_worker_deregister(worker);
+ }
+
+ return err;
+}