// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef COMMON_CEPH_TIMER_H #define COMMON_CEPH_TIMER_H #include #include #include namespace ceph { /// Newly constructed timer should be suspended at point of /// construction. struct construct_suspended_t { }; constexpr construct_suspended_t construct_suspended { }; namespace timer_detail { using boost::intrusive::member_hook; using boost::intrusive::set_member_hook; using boost::intrusive::link_mode; using boost::intrusive::normal_link; using boost::intrusive::set; using boost::intrusive::constant_time_size; using boost::intrusive::compare; // Compared to the SafeTimer this does fewer allocations (you // don't have to allocate a new Context every time you // want to cue the next tick.) // // It also does not share a lock with the caller. If you call // cancel event, it either cancels the event (and returns true) or // you missed it. If this does not work for you, you can set up a // flag and mutex of your own. // // You get to pick your clock. I like mono_clock, since I usually // want to wait FOR a given duration. real_clock is worthwhile if // you want to wait UNTIL a specific moment of wallclock time. If // you want you can set up a timer that executes a function after // you use up ten seconds of CPU time. template class timer { using sh = set_member_hook >; struct event { typename TC::time_point t; uint64_t id; std::function f; sh schedule_link; sh event_link; event() : t(TC::time_point::min()), id(0) {} event(uint64_t _id) : t(TC::time_point::min()), id(_id) {} event(typename TC::time_point _t, uint64_t _id, std::function&& _f) : t(_t), id(_id), f(_f) {} event(typename TC::time_point _t, uint64_t _id, const std::function& _f) : t(_t), id(_id), f(_f) {} bool operator <(const event& e) { return t == e.t ? id < e.id : t < e.t; } }; struct SchedCompare { bool operator()(const event& e1, const event& e2) const { return e1.t == e2.t ? e1.id < e2.id : e1.t < e2.t; } }; struct EventCompare { bool operator()(const event& e1, const event& e2) const { return e1.id < e2.id; } }; using schedule_type = set, constant_time_size, compare >; schedule_type schedule; using event_set_type = set, constant_time_size, compare >; event_set_type events; std::mutex lock; using lock_guard = std::lock_guard; using unique_lock = std::unique_lock; std::condition_variable cond; event* running{ nullptr }; uint64_t next_id{ 0 }; bool suspended; std::thread thread; void timer_thread() { unique_lock l(lock); while (!suspended) { typename TC::time_point now = TC::now(); while (!schedule.empty()) { auto p = schedule.begin(); // Should we wait for the future? if (p->t > now) break; event& e = *p; schedule.erase(e); events.erase(e); // Since we have only one thread it is impossible to have more // than one running event running = &e; l.unlock(); e.f(); l.lock(); if (running) { running = nullptr; delete &e; } // Otherwise the event requeued itself } if (schedule.empty()) cond.wait(l); else cond.wait_until(l, schedule.begin()->t); } } public: timer() { lock_guard l(lock); suspended = false; thread = std::thread(&timer::timer_thread, this); } // Create a suspended timer, jobs will be executed in order when // it is resumed. timer(construct_suspended_t) { lock_guard l(lock); suspended = true; } timer(const timer &) = delete; timer& operator=(const timer &) = delete; ~timer() { suspend(); cancel_all_events(); } // Suspend operation of the timer (and let its thread die). void suspend() { unique_lock l(lock); if (suspended) return; suspended = true; cond.notify_one(); l.unlock(); thread.join(); } // Resume operation of the timer. (Must have been previously // suspended.) void resume() { unique_lock l(lock); if (!suspended) return; suspended = false; ceph_assert(!thread.joinable()); thread = std::thread(&timer::timer_thread, this); } // Schedule an event in the relative future template uint64_t add_event(typename TC::duration duration, Callable&& f, Args&&... args) { typename TC::time_point when = TC::now(); when += duration; return add_event(when, std::forward(f), std::forward(args)...); } // Schedule an event in the absolute future template uint64_t add_event(typename TC::time_point when, Callable&& f, Args&&... args) { std::lock_guard l(lock); event& e = *(new event( when, ++next_id, std::forward >( std::bind(std::forward(f), std::forward(args)...)))); auto i = schedule.insert(e); events.insert(e); /* If the event we have just inserted comes before everything * else, we need to adjust our timeout. */ if (i.first == schedule.begin()) cond.notify_one(); // Previously each event was a context, identified by a // pointer, and each context to be called only once. Since you // can queue the same function pointer, member function, // lambda, or functor up multiple times, identifying things by // function for the purposes of cancellation is no longer // suitable. Thus: return e.id; } // Adjust the timeout of a currently-scheduled event (relative) bool adjust_event(uint64_t id, typename TC::duration duration) { return adjust_event(id, TC::now() + duration); } // Adjust the timeout of a currently-scheduled event (absolute) bool adjust_event(uint64_t id, typename TC::time_point when) { std::lock_guard l(lock); event key(id); typename event_set_type::iterator it = events.find(key); if (it == events.end()) return false; event& e = *it; schedule.erase(e); e.t = when; schedule.insert(e); return true; } // Cancel an event. If the event has already come and gone (or you // never submitted it) you will receive false. Otherwise you will // receive true and it is guaranteed the event will not execute. bool cancel_event(const uint64_t id) { std::lock_guard l(lock); event dummy(id); auto p = events.find(dummy); if (p == events.end()) { return false; } event& e = *p; events.erase(e); schedule.erase(e); delete &e; return true; } // Reschedules a currently running event in the relative // future. Must be called only from an event executed by this // timer. If you have a function that can be called either from // this timer or some other way, it is your responsibility to make // sure it can tell the difference only does not call // reschedule_me in the non-timer case. // // Returns an event id. If you had an event_id from the first // scheduling, replace it with this return value. uint64_t reschedule_me(typename TC::duration duration) { return reschedule_me(TC::now() + duration); } // Reschedules a currently running event in the absolute // future. Must be called only from an event executed by this // timer. if you have a function that can be called either from // this timer or some other way, it is your responsibility to make // sure it can tell the difference only does not call // reschedule_me in the non-timer case. // // Returns an event id. If you had an event_id from the first // scheduling, replace it with this return value. uint64_t reschedule_me(typename TC::time_point when) { if (std::this_thread::get_id() != thread.get_id()) throw std::make_error_condition(std::errc::operation_not_permitted); std::lock_guard l(lock); running->t = when; uint64_t id = ++next_id; running->id = id; schedule.insert(*running); events.insert(*running); // Hacky, but keeps us from being deleted running = nullptr; // Same function, but you get a new ID. return id; } // Remove all events from the queue. void cancel_all_events() { std::lock_guard l(lock); while (!events.empty()) { auto p = events.begin(); event& e = *p; schedule.erase(e); events.erase(e); delete &e; } } }; // timer }; // timer_detail using timer_detail::timer; }; // ceph #endif