path: root/src/common/intrusive_timer.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>

#include <boost/intrusive/set.hpp>

#include "common/ceph_time.h"

namespace ceph::common {

/**
 * intrusive_timer
 *
 * SafeTimer (common/Timer.h) isn't well suited to high-usage
 * pathways for a few reasons:
 * - Usage generally requires allocation of a fresh context for each
 *   scheduled operation.  One could override Context::complete to avoid
 *   destroying the instance, but actually reusing the instance is tricky
 *   as SafeTimer doesn't guarantee cancellation if safe_callbacks is false.
 * - SafeTimer only guarantees cancellation if safe_callbacks is true, which
 *   it generally won't be if the user needs to call into SafeTimer while
 *   holding locks taken by callbacks.
 *
 * This implementation allows the user to repeatedly schedule and cancel
 * an object inheriting from the callback_t interface below while
 * guaranteeing cancellation, provided that the user holds the lock
 * associated with a particular callback while calling into intrusive_timer.
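 *
 * A minimal usage sketch (my_cb, refs, cb, and timer are illustrative
 * names, not part of this header):
 *
 *   struct my_cb : intrusive_timer::callback_t {
 *     std::mutex m;
 *     std::atomic<unsigned> refs{1};   // creator's reference
 *     void add_ref() final { ++refs; }
 *     void dec_ref() final { if (--refs == 0) delete this; }
 *     void lock() final { m.lock(); }
 *     void unlock() final { m.unlock(); }
 *     void invoke() final {
 *       // runs with m held and is_scheduled() == false, so a periodic
 *       // callback may re-arm itself here via schedule_after()
 *     }
 *   };
 *
 *   intrusive_timer timer;
 *   auto *cb = new my_cb;
 *   cb->lock();
 *   timer.schedule_after(*cb, std::chrono::seconds(1));
 *   cb->unlock();
 *   // ... later, under the same lock:
 *   cb->lock();
 *   if (cb->is_scheduled()) {
 *     timer.cancel(*cb);
 *   }
 *   cb->unlock();
 *   cb->dec_ref();   // drop the creator's reference when done
 *   timer.stop();    // join the timer thread before destroying it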
 */
class intrusive_timer {
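  /* coarse_real_clock is cheap to read; scheduling precision is bounded
   * by its granularity (typically a few milliseconds). */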
  using clock_t = ceph::coarse_real_clock;

public:
  /**
   * callback_t
   *
   * Objects inheriting from callback_t can be scheduled
   * via intrusive_timer.
   */
  class callback_t : public boost::intrusive::set_base_hook<> {
    friend class intrusive_timer;
    clock_t::time_point schedule_point;
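    /* Parity of incarnation encodes scheduling state: odd means
     * scheduled, even means idle.  schedule_after() bumps it to odd;
     * cancel() and the timer thread (just before invoke()) bump it
     * back to even, so a stale snapshot of incarnation also detects
     * cancellation between dequeue and invocation -- see _run(). */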
    unsigned incarnation = 0;

  public:
    /**
     * add_ref, dec_ref
     *
     * callback_t must remain live and all methods must remain
     * safe to call as long as calls to add_ref() outnumber calls
     * to dec_ref().
     */
    virtual void add_ref() = 0;
    virtual void dec_ref() = 0;

    /**
     * lock, unlock
     *
     * For any specific callback_t, must lock/unlock a lock held while
     * accessing intrusive_timer public methods for that callback_t
     * instance.
     */
    virtual void lock() = 0;
    virtual void unlock() = 0;

    /// Invokes callback, will be called with lock held
    virtual void invoke() = 0;

    /**
     * is_scheduled
     *
     * Return true iff callback is scheduled to be invoked.
     * May only be validly invoked while lock associated with
     * callback_t instance is held.
     */
    bool is_scheduled() const { return incarnation % 2 == 1; }
    virtual ~callback_t() = default;

    /// Order callback_t by schedule_point, breaking ties by address so
    /// distinct callbacks with equal schedule_points can coexist in events
    auto operator<=>(const callback_t &rhs) const {
      return std::make_pair(schedule_point, this) <=>
	std::make_pair(rhs.schedule_point, &rhs);
    }
  };

private:
  /// protects events, stopping
  std::mutex lock;

  /// cv and stopping wake t for newly scheduled events and signal it to halt
  std::condition_variable cv;
  bool stopping = false;

  /// queued events ordered by callback_t::schedule_point
  boost::intrusive::set<callback_t> events;

  /// thread responsible for calling scheduled callbacks
  std::thread t;

  /// peek front of queue, null if empty
  callback_t *peek() {
    return events.empty() ? nullptr : &*(events.begin());
  }

  /// entry point for t
  void _run() {
    std::unique_lock l(lock);
    while (true) {
      if (stopping) {
	return;
      }

      auto next = peek();
      if (!next) {
	cv.wait(l);
	continue;
      }

      if (next->schedule_point > clock_t::now()) {
	cv.wait_until(l, next->schedule_point);
	continue;
      }

      // we release the reference below
      events.erase(*next);

      /* cancel() and schedule_after() both hold both intrusive_timer::lock
       * and the callback_t lock (precondition of both) while mutating
       * next->incarnation, so this read is safe.  We're relying on the
       * fact that only this method in this thread will access
       * next->incarnation under only one of the two. */
      auto incarnation = next->incarnation;
      l.unlock();
      {
	/* Note that intrusive_timer::cancel may observe that
	 * callback_t::is_scheduled() returns true while
	 * callback_t::is_linked() is false since we drop
	 * intrusive_timer::lock between removing next from the
	 * queue and incrementing callback_t::incarnation here
	 * under the callback_t lock.  In that case, cancel()
	 * increments incarnation logically canceling the callback
	 * but leaves the reference for us to drop.
	 */
	std::unique_lock m(*next);
	if (next->incarnation == incarnation) {
	  /* As above, cancel() and schedule_after() hold both locks so this
	   * mutation and read are safe. */
	  ++next->incarnation;
	  next->invoke();
	}
	/* else, next was canceled between l.unlock() and next->lock().
	 * Note that if incarnation does not match, we do nothing to next
	 * other than drop our reference -- it might well have been
	 * rescheduled already! */
      }
      next->dec_ref();
      l.lock();
    }
  }

public:
  intrusive_timer() : t([this] { _run(); }) {}

  /**
   * schedule_after
   *
   * Schedule cb to run after the specified period.
   * The lock associated with cb must be held.
   * cb must not already be scheduled.
   *
   * @param cb [in] callback to schedule
   * @param after [in] period after which to schedule cb
   */
  template <typename T>
  void schedule_after(callback_t &cb, T after) {
    ceph_assert(!cb.is_scheduled());
    std::unique_lock l(lock);
    ceph_assert(!cb.is_linked());

    ++cb.incarnation;
    cb.schedule_point = clock_t::now() + after;

    cb.add_ref();
    events.insert(cb);

    cv.notify_one();
  }

  /**
   * cancel
   *
   * Cancel already scheduled cb.
   * The lock associated with cb must be held.
   *
   * @param cb [in] callback to cancel
   */
  void cancel(callback_t &cb) {
    ceph_assert(cb.is_scheduled());
    std::unique_lock l(lock);
    ++cb.incarnation;

    if (cb.is_linked()) {
      events.erase(cb);
      cb.dec_ref();
    }
  }

  /**
   * stop
   *
   * Stop and join the timer thread.  Must be called before destruction;
   * t is joined only here, not in a destructor.
   */
  void stop() {
    {
      std::unique_lock l(lock);
      stopping = true;
      cv.notify_one();
    }
    t.join();
  }
};

} // namespace ceph::common