// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_WORKQUEUE_H #define CEPH_WORKQUEUE_H #include "Cond.h" #include "include/unordered_map.h" #include "common/config_obs.h" #include "common/HeartbeatMap.h" #include class CephContext; /// Pool of threads that share work submitted to multiple work queues. class ThreadPool : public md_config_obs_t { CephContext *cct; string name; string thread_name; string lockname; Mutex _lock; Cond _cond; bool _stop; int _pause; int _draining; Cond _wait_cond; int ioprio_class, ioprio_priority; public: class TPHandle { friend class ThreadPool; CephContext *cct; heartbeat_handle_d *hb; ceph::coarse_mono_clock::rep grace; ceph::coarse_mono_clock::rep suicide_grace; public: TPHandle( CephContext *cct, heartbeat_handle_d *hb, time_t grace, time_t suicide_grace) : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {} void reset_tp_timeout(); void suspend_tp_timeout(); }; private: /// Basic interface to a work queue used by the worker threads. struct WorkQueue_ { string name; time_t timeout_interval, suicide_interval; WorkQueue_(string n, time_t ti, time_t sti) : name(std::move(n)), timeout_interval(ti), suicide_interval(sti) { } virtual ~WorkQueue_() {} /// Remove all work items from the queue. virtual void _clear() = 0; /// Check whether there is anything to do. virtual bool _empty() = 0; /// Get the next work item to process. virtual void *_void_dequeue() = 0; /** @brief Process the work item. * This function will be called several times in parallel * and must therefore be thread-safe. */ virtual void _void_process(void *item, TPHandle &handle) = 0; /** @brief Synchronously finish processing a work item. * This function is called after _void_process with the global thread pool lock held, * so at most one copy will execute simultaneously for a given thread pool. * It can be used for non-thread-safe finalization. */ virtual void _void_process_finish(void *) = 0; }; // track thread pool size changes unsigned _num_threads; string _thread_num_option; const char **_conf_keys; const char **get_tracked_conf_keys() const override { return _conf_keys; } void handle_conf_change(const ConfigProxy& conf, const std::set &changed) override; public: /** @brief Work queue that processes several submitted items at once. * The queue will automatically add itself to the thread pool on construction * and remove itself on destruction. */ template class BatchWorkQueue : public WorkQueue_ { ThreadPool *pool; virtual bool _enqueue(T *) = 0; virtual void _dequeue(T *) = 0; virtual void _dequeue(list *) = 0; virtual void _process_finish(const list &) {} // virtual methods from WorkQueue_ below void *_void_dequeue() override { list *out(new list); _dequeue(out); if (!out->empty()) { return (void *)out; } else { delete out; return 0; } } void _void_process(void *p, TPHandle &handle) override { _process(*((list*)p), handle); } void _void_process_finish(void *p) override { _process_finish(*(list*)p); delete (list *)p; } protected: virtual void _process(const list &items, TPHandle &handle) = 0; public: BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(std::move(n), ti, sti), pool(p) { pool->add_work_queue(this); } ~BatchWorkQueue() override { pool->remove_work_queue(this); } bool queue(T *item) { pool->_lock.Lock(); bool r = _enqueue(item); pool->_cond.SignalOne(); pool->_lock.Unlock(); return r; } void dequeue(T *item) { pool->_lock.Lock(); _dequeue(item); pool->_lock.Unlock(); } void clear() { pool->_lock.Lock(); _clear(); pool->_lock.Unlock(); } void lock() { pool->lock(); } void unlock() { pool->unlock(); } void wake() { pool->wake(); } void _wake() { pool->_wake(); } void drain() { pool->drain(this); } }; /** @brief Templated by-value work queue. * Skeleton implementation of a queue that processes items submitted by value. * This is useful if the items are single primitive values or very small objects * (a few bytes). The queue will automatically add itself to the thread pool on * construction and remove itself on destruction. */ template class WorkQueueVal : public WorkQueue_ { Mutex _lock; ThreadPool *pool; list to_process; list to_finish; virtual void _enqueue(T) = 0; virtual void _enqueue_front(T) = 0; bool _empty() override = 0; virtual U _dequeue() = 0; virtual void _process_finish(U) {} void *_void_dequeue() override { { Mutex::Locker l(_lock); if (_empty()) return 0; U u = _dequeue(); to_process.push_back(u); } return ((void*)1); // Not used } void _void_process(void *, TPHandle &handle) override { _lock.Lock(); assert(!to_process.empty()); U u = to_process.front(); to_process.pop_front(); _lock.Unlock(); _process(u, handle); _lock.Lock(); to_finish.push_back(u); _lock.Unlock(); } void _void_process_finish(void *) override { _lock.Lock(); assert(!to_finish.empty()); U u = to_finish.front(); to_finish.pop_front(); _lock.Unlock(); _process_finish(u); } void _clear() override {} public: WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p) : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) { pool->add_work_queue(this); } ~WorkQueueVal() override { pool->remove_work_queue(this); } void queue(T item) { Mutex::Locker l(pool->_lock); _enqueue(item); pool->_cond.SignalOne(); } void queue_front(T item) { Mutex::Locker l(pool->_lock); _enqueue_front(item); pool->_cond.SignalOne(); } void drain() { pool->drain(this); } protected: void lock() { pool->lock(); } void unlock() { pool->unlock(); } virtual void _process(U u, TPHandle &) = 0; }; /** @brief Template by-pointer work queue. * Skeleton implementation of a queue that processes items of a given type submitted as pointers. * This is useful when the work item are large or include dynamically allocated memory. The queue * will automatically add itself to the thread pool on construction and remove itself on * destruction. */ template class WorkQueue : public WorkQueue_ { ThreadPool *pool; /// Add a work item to the queue. virtual bool _enqueue(T *) = 0; /// Dequeue a previously submitted work item. virtual void _dequeue(T *) = 0; /// Dequeue a work item and return the original submitted pointer. virtual T *_dequeue() = 0; virtual void _process_finish(T *) {} // implementation of virtual methods from WorkQueue_ void *_void_dequeue() override { return (void *)_dequeue(); } void _void_process(void *p, TPHandle &handle) override { _process(static_cast(p), handle); } void _void_process_finish(void *p) override { _process_finish(static_cast(p)); } protected: /// Process a work item. Called from the worker threads. virtual void _process(T *t, TPHandle &) = 0; public: WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(std::move(n), ti, sti), pool(p) { pool->add_work_queue(this); } ~WorkQueue() override { pool->remove_work_queue(this); } bool queue(T *item) { pool->_lock.Lock(); bool r = _enqueue(item); pool->_cond.SignalOne(); pool->_lock.Unlock(); return r; } void dequeue(T *item) { pool->_lock.Lock(); _dequeue(item); pool->_lock.Unlock(); } void clear() { pool->_lock.Lock(); _clear(); pool->_lock.Unlock(); } Mutex &get_lock() { return pool->_lock; } void lock() { pool->lock(); } void unlock() { pool->unlock(); } /// wake up the thread pool (without lock held) void wake() { pool->wake(); } /// wake up the thread pool (with lock already held) void _wake() { pool->_wake(); } void _wait() { pool->_wait(); } void drain() { pool->drain(this); } }; template class PointerWQ : public WorkQueue_ { public: ~PointerWQ() override { m_pool->remove_work_queue(this); assert(m_processing == 0); } void drain() { { // if this queue is empty and not processing, don't wait for other // queues to finish processing Mutex::Locker l(m_pool->_lock); if (m_processing == 0 && m_items.empty()) { return; } } m_pool->drain(this); } void queue(T *item) { Mutex::Locker l(m_pool->_lock); m_items.push_back(item); m_pool->_cond.SignalOne(); } bool empty() { Mutex::Locker l(m_pool->_lock); return _empty(); } protected: PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) { } void register_work_queue() { m_pool->add_work_queue(this); } void _clear() override { assert(m_pool->_lock.is_locked()); m_items.clear(); } bool _empty() override { assert(m_pool->_lock.is_locked()); return m_items.empty(); } void *_void_dequeue() override { assert(m_pool->_lock.is_locked()); if (m_items.empty()) { return NULL; } ++m_processing; T *item = m_items.front(); m_items.pop_front(); return item; } void _void_process(void *item, ThreadPool::TPHandle &handle) override { process(reinterpret_cast(item)); } void _void_process_finish(void *item) override { assert(m_pool->_lock.is_locked()); assert(m_processing > 0); --m_processing; } virtual void process(T *item) = 0; void process_finish() { Mutex::Locker locker(m_pool->_lock); _void_process_finish(nullptr); } T *front() { assert(m_pool->_lock.is_locked()); if (m_items.empty()) { return NULL; } return m_items.front(); } void requeue(T *item) { Mutex::Locker pool_locker(m_pool->_lock); _void_process_finish(nullptr); m_items.push_front(item); } void signal() { Mutex::Locker pool_locker(m_pool->_lock); m_pool->_cond.SignalOne(); } Mutex &get_pool_lock() { return m_pool->_lock; } private: ThreadPool *m_pool; std::list m_items; uint32_t m_processing; }; private: vector work_queues; int next_work_queue = 0; // threads struct WorkThread : public Thread { ThreadPool *pool; // cppcheck-suppress noExplicitConstructor WorkThread(ThreadPool *p) : pool(p) {} void *entry() override { pool->worker(this); return 0; } }; set _threads; list _old_threads; ///< need to be joined int processing; void start_threads(); void join_old_threads(); void worker(WorkThread *wt); public: ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL); ~ThreadPool() override; /// return number of threads currently running int get_num_threads() { Mutex::Locker l(_lock); return _num_threads; } /// assign a work queue to this thread pool void add_work_queue(WorkQueue_* wq) { Mutex::Locker l(_lock); work_queues.push_back(wq); } /// remove a work queue from this thread pool void remove_work_queue(WorkQueue_* wq) { Mutex::Locker l(_lock); unsigned i = 0; while (work_queues[i] != wq) i++; for (i++; i < work_queues.size(); i++) work_queues[i-1] = work_queues[i]; assert(i == work_queues.size()); work_queues.resize(i-1); } /// take thread pool lock void lock() { _lock.Lock(); } /// release thread pool lock void unlock() { _lock.Unlock(); } /// wait for a kick on this thread pool void wait(Cond &c) { c.Wait(_lock); } /// wake up a waiter (with lock already held) void _wake() { _cond.Signal(); } /// wake up a waiter (without lock held) void wake() { Mutex::Locker l(_lock); _cond.Signal(); } void _wait() { _cond.Wait(_lock); } /// start thread pool thread void start(); /// stop thread pool thread void stop(bool clear_after=true); /// pause thread pool (if it not already paused) void pause(); /// pause initiation of new work void pause_new(); /// resume work in thread pool. must match each pause() call 1:1 to resume. void unpause(); /** @brief Wait until work completes. * If the parameter is NULL, blocks until all threads are idle. * If it is not NULL, blocks until the given work queue does not have * any items left to process. */ void drain(WorkQueue_* wq = 0); /// set io priority void set_ioprio(int cls, int priority); }; class GenContextWQ : public ThreadPool::WorkQueueVal*> { list*> _queue; public: GenContextWQ(const string &name, time_t ti, ThreadPool *tp) : ThreadPool::WorkQueueVal< GenContext*>(name, ti, ti*10, tp) {} void _enqueue(GenContext *c) override { _queue.push_back(c); } void _enqueue_front(GenContext *c) override { _queue.push_front(c); } bool _empty() override { return _queue.empty(); } GenContext *_dequeue() override { assert(!_queue.empty()); GenContext *c = _queue.front(); _queue.pop_front(); return c; } void _process(GenContext *c, ThreadPool::TPHandle &tp) override { c->complete(tp); } }; class C_QueueInWQ : public Context { GenContextWQ *wq; GenContext *c; public: C_QueueInWQ(GenContextWQ *wq, GenContext *c) : wq(wq), c(c) {} void finish(int) override { wq->queue(c); } }; /// Work queue that asynchronously completes contexts (executes callbacks). /// @see Finisher class ContextWQ : public ThreadPool::PointerWQ { public: ContextWQ(const string &name, time_t ti, ThreadPool *tp) : ThreadPool::PointerWQ(name, ti, 0, tp), m_lock("ContextWQ::m_lock") { this->register_work_queue(); } void queue(Context *ctx, int result = 0) { if (result != 0) { Mutex::Locker locker(m_lock); m_context_results[ctx] = result; } ThreadPool::PointerWQ::queue(ctx); } protected: void _clear() override { ThreadPool::PointerWQ::_clear(); Mutex::Locker locker(m_lock); m_context_results.clear(); } void process(Context *ctx) override { int result = 0; { Mutex::Locker locker(m_lock); ceph::unordered_map::iterator it = m_context_results.find(ctx); if (it != m_context_results.end()) { result = it->second; m_context_results.erase(it); } } ctx->complete(result); } private: Mutex m_lock; ceph::unordered_map m_context_results; }; class ShardedThreadPool { CephContext *cct; string name; string thread_name; string lockname; Mutex shardedpool_lock; Cond shardedpool_cond; Cond wait_cond; uint32_t num_threads; std::atomic stop_threads = { false }; std::atomic pause_threads = { false }; std::atomic drain_threads = { false }; uint32_t num_paused; uint32_t num_drained; public: class BaseShardedWQ { public: time_t timeout_interval, suicide_interval; BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {} virtual ~BaseShardedWQ() {} virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0; virtual void return_waiting_threads() = 0; virtual void stop_return_waiting_threads() = 0; virtual bool is_shard_empty(uint32_t thread_index) = 0; }; template class ShardedWQ: public BaseShardedWQ { ShardedThreadPool* sharded_pool; protected: virtual void _enqueue(T&&) = 0; virtual void _enqueue_front(T&&) = 0; public: ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti), sharded_pool(tp) { tp->set_wq(this); } ~ShardedWQ() override {} void queue(T&& item) { _enqueue(std::move(item)); } void queue_front(T&& item) { _enqueue_front(std::move(item)); } void drain() { sharded_pool->drain(); } }; private: BaseShardedWQ* wq; // threads struct WorkThreadSharded : public Thread { ShardedThreadPool *pool; uint32_t thread_index; WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p), thread_index(pthread_index) {} void *entry() override { pool->shardedthreadpool_worker(thread_index); return 0; } }; vector threads_shardedpool; void start_threads(); void shardedthreadpool_worker(uint32_t thread_index); void set_wq(BaseShardedWQ* swq) { wq = swq; } public: ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads); ~ShardedThreadPool(){}; /// start thread pool thread void start(); /// stop thread pool thread void stop(); /// pause thread pool (if it not already paused) void pause(); /// pause initiation of new work void pause_new(); /// resume work in thread pool. must match each pause() call 1:1 to resume. void unpause(); /// wait for all work to complete void drain(); }; #endif