diff options
Diffstat (limited to 'src/common/intrusive_timer.h')
-rw-r--r-- | src/common/intrusive_timer.h | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/src/common/intrusive_timer.h b/src/common/intrusive_timer.h new file mode 100644 index 00000000000..b32286a2096 --- /dev/null +++ b/src/common/intrusive_timer.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <mutex> +#include <condition_variable> + +#include <boost/intrusive/set.hpp> + +#include "common/ceph_time.h" + +namespace ceph::common { + +/** + * intrusive_timer + * + * SafeTimer (common/Timer.h) isn't well suited to usage in high + * usage pathways for a few reasons: + * - Usage generally requires allocation of a fresh context for each + * scheduled operation. One could override Context::complete to avoid + * destroying the instance, but actually reusing the instance is tricky + * as SafeTimer doesn't guarrantee cancelation if safe_callbacks is false. + * - SafeTimer only guarrantees cancelation if safe_timer is true, which + * it generally won't be if the user needs to call into SafeTimer while + * holding locks taken by callbacks. + * + * This implementation allows the user to repeatedly schedule and cancel + * an object inheriting from the callback_t interface below while + * guarranteeing cancelation provided that the user holds the lock + * associated with a particular callback while calling into intrusive_timer. + */ +class intrusive_timer { + using clock_t = ceph::coarse_real_clock; + +public: + /** + * callback_t + * + * Objects inheriting from callback_t can be scheduled + * via intrusive_timer. + */ + class callback_t : public boost::intrusive::set_base_hook<> { + friend class intrusive_timer; + clock_t::time_point schedule_point; + unsigned incarnation = 0; + + public: + /** + * add_ref, dec_ref + * + * callback_t must remain live and all methods must remain + * safe to call as long as calls to add_ref() outnumber calls + * to dec_ref(). + */ + virtual void add_ref() = 0; + virtual void dec_ref() = 0; + + /** + * lock, unlock + * + * For any specific callback_t, must lock/unlock a lock held while + * accessing intrusive_timer public methods for that callback_t + * instance. + */ + virtual void lock() = 0; + virtual void unlock() = 0; + + /// Invokes callback, will be called with lock held + virtual void invoke() = 0; + + /** + * is_scheduled + * + * Return true iff callback is scheduled to be invoked. + * May only be validly invoked while lock associated with + * callback_t instance is held. + */ + bool is_scheduled() const { return incarnation % 2 == 1; } + virtual ~callback_t() = default; + + /// Order callback_t by schedule_point + auto operator<=>(const callback_t &rhs) const { + return std::make_pair(schedule_point, this) <=> + std::make_pair(rhs.schedule_point, &rhs); + } + }; + +private: + /// protects events, stopping + std::mutex lock; + + /// stopping, cv used to signal that t should halt + std::condition_variable cv; + bool stopping = false; + + /// queued events ordered by callback_t::schedule_point + boost::intrusive::set<callback_t> events; + + /// thread responsible for calling scheduled callbacks + std::thread t; + + /// peek front of queue, null if empty + callback_t *peek() { + return events.empty() ? nullptr : &*(events.begin()); + } + + /// entry point for t + void _run() { + std::unique_lock l(lock); + while (true) { + if (stopping) { + return; + } + + auto next = peek(); + if (!next) { + cv.wait(l); + continue; + } + + if (next->schedule_point > clock_t::now()) { + cv.wait_until(l, next->schedule_point); + continue; + } + + // we release the reference below + events.erase(*next); + + /* cancel() and schedule_after() both hold both intrusive_timer::lock + * and the callback_t lock (precondition of both) while mutating + * next->incarnation, so this read is safe. We're relying on the + * fact that only this method in this thread will access + * next->incarnation under only one of the two. */ + auto incarnation = next->incarnation; + l.unlock(); + { + /* Note that intrusive_timer::cancel may observe that + * callback_t::is_scheduled() returns true while + * callback_t::is_linked() is false since we drop + * intrusive_timer::lock between removing next from the + * queue and incrementing callback_t::incarnation here + * under the callback_t lock. In that case, cancel() + * increments incarnation logically canceling the callback + * but leaves the reference for us to drop. + */ + std::unique_lock m(*next); + if (next->incarnation == incarnation) { + /* As above, cancel() and schedule_after() hold both locks so this + * mutation and read are safe. */ + ++next->incarnation; + next->invoke(); + } + /* else, next was canceled between l.unlock() and next->lock(). + * Note that if incarnation does not match, we do nothing to next + * other than drop our reference -- it might well have been + * rescheduled already! */ + } + next->dec_ref(); + l.lock(); + } + } + +public: + intrusive_timer() : t([this] { _run(); }) {} + + /** + * schedule_after + * + * Schedule cb to run after the specified period. + * The lock associated with cb must be held. + * cb must not already be scheduled. + * + * @param cb [in] callback to schedule + * @param after [in] period after which to schedule cb + */ + template <typename T> + void schedule_after(callback_t &cb, T after) { + ceph_assert(!cb.is_scheduled()); + std::unique_lock l(lock); + ceph_assert(!cb.is_linked()); + + ++cb.incarnation; + cb.schedule_point = clock_t::now() + after; + + cb.add_ref(); + events.insert(cb); + + cv.notify_one(); + } + + /** + * cancel + * + * Cancel already scheduled cb. + * The lock associated with cb must be held. + * + * @param cb [in] callback to cancel + */ + void cancel(callback_t &cb) { + ceph_assert(cb.is_scheduled()); + std::unique_lock l(lock); + ++cb.incarnation; + + if (cb.is_linked()) { + events.erase(cb); + cb.dec_ref(); + } + } + + /// Stop intrusive_timer + void stop() { + { + std::unique_lock l(lock); + stopping = true; + cv.notify_one(); + } + t.join(); + } +}; + +} |