1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 IBM, Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include "mds/QuiesceDb.h"
#include <functional>
#include <optional>
#include <map>
#include <mutex>
#include <thread>
class QuiesceAgent {
public:
struct ControlInterface {
QuiesceInterface::RequestSubmit submit_request;
QuiesceInterface::RequestCancel cancel_request;
QuiesceInterface::AgentAck agent_ack;
// TODO: do we need a "cancel all"?
};
QuiesceAgent(const ControlInterface& quiesce_control)
: quiesce_control(quiesce_control)
, stop_agent_thread(false)
, agent_thread(this) {
agent_thread.create("quiesce.agt");
};
virtual ~QuiesceAgent() {
shutdown();
}
/// @brief WARNING: will reset syncrhonously
/// this may call cancel on active roots
/// which may lead to a deadlock if the MDS
/// lock is being held when calling this.
/// Consider `reset_async` if you're holding
/// the MDS lock.
void reset() {
std::unique_lock l(agent_mutex);
// prevent any pending change
pending.clear();
// let the system settle
await_idle_locked(l);
// we are idle, hence the current holds
// our only tracked set
TrackedRoots current_roots = current.clear();
l.unlock();
// do this outside of the lock
current_roots.clear();
}
void reset_async() {
set_pending_roots({0, 0}, {});
}
void shutdown()
{
std::unique_lock l(agent_mutex);
stop_agent_thread = true;
agent_cond.notify_all();
l.unlock();
if (agent_thread.is_started()) {
agent_thread.join();
}
current.clear();
pending.clear();
}
bool db_update(QuiesceMap& map);
struct TrackedRoot {
std::optional<QuiesceInterface::RequestHandle> quiesce_request;
// we could have hidden the request handle
// inside the cancel functor, but then we'd lose
// the ability to identify individual requests
// when looking at the tracked root.
QuiesceInterface::RequestCancel cancel;
std::optional<int> quiesce_result;
std::optional<int> cancel_result;
QuiesceState committed_state;
QuiesceTimePoint expires_at;
TrackedRoot(QuiesceState state, QuiesceTimeInterval ttl)
: committed_state(state)
, expires_at(interval_saturate_add_now(ttl))
, busy_lock(false)
{
}
TrackedRoot() : TrackedRoot(QS__INVALID, QuiesceTimeInterval::zero()) {}
bool should_quiesce() const
{
return committed_state == QS_QUIESCING || committed_state == QS_QUIESCED;
}
bool should_release() const {
return committed_state == QS_RELEASING || committed_state == QS_RELEASED;
}
~TrackedRoot();
void update_committed(QuiesceMap::RootInfo const & info) {
committed_state = info.state;
expires_at = interval_saturate_add_now(info.ttl);
}
QuiesceTimeInterval get_ttl() const
{
auto now = QuiesceClock::now();
if (expires_at.time_since_epoch() == QuiesceTimeInterval::max()) {
return QuiesceTimeInterval::max();
}
if (expires_at > now) {
return expires_at - now;
} else {
return QuiesceTimeInterval::zero();
}
}
QuiesceState get_actual_state() const {
QuiesceState result = QS_QUIESCING;
bool did_quiesce = quiesce_result == 0;
bool did_cancel = cancel_result == 0;
if (did_quiesce) {
if (cancel_result.has_value()) {
result = did_cancel ? QS_RELEASED : QS_EXPIRED;
} else {
result = QS_QUIESCED;
}
} else {
if (quiesce_result.has_value()) {
result = QS_FAILED;
} else if (should_release()) {
// we must have lost track of this root,
// probably, due to expiration. But even if due to an error,
// this is our best guess for the situation
result = QS_EXPIRED;
}
}
return result;
}
void lock() const {
while (busy_lock.test_and_set(std::memory_order_acquire))
; // spin
}
void unlock() const {
busy_lock.clear(std::memory_order_release);
}
private:
mutable std::atomic_flag busy_lock;
};
using TrackedRootRef = std::shared_ptr<TrackedRoot>;
using TrackedRoots = std::unordered_map<QuiesceRoot, TrackedRootRef>;
TrackedRoots tracked_roots() {
std::lock_guard l(agent_mutex);
return current.roots;
}
TrackedRootRef get_tracked_root(QuiesceRoot root) {
std::lock_guard l(agent_mutex);
if (auto it = current.roots.find(root); it != current.roots.end()) {
return it->second;
} else {
return nullptr;
}
}
QuiesceDbVersion get_current_version() {
std::lock_guard l(agent_mutex);
return current.db_version;
}
protected:
ControlInterface quiesce_control;
struct TrackedRootsVersion {
TrackedRoots roots;
QuiesceDbVersion db_version = {0, 0};
bool armed = false;
TrackedRoots clear() {
armed = false;
db_version = {0, 0};
TrackedRoots old = std::move(roots);
roots.clear();
return old;
}
};
template <class CharT, class Traits>
friend std::basic_ostream<CharT, Traits>&
operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
TrackedRootsVersion current;
TrackedRootsVersion pending;
std::mutex agent_mutex;
std::condition_variable agent_cond;
bool stop_agent_thread;
bool upkeep_needed;
template<class L>
QuiesceDbVersion await_idle_locked(L &lock) {
return await_phase_locked(lock, false, false);
}
template <class L>
QuiesceDbVersion await_phase_locked(L& lock, bool pending_armed, bool current_armed)
{
agent_cond.wait(lock, [=, this] {
return ( !upkeep_needed
&& current.armed == current_armed
&& pending.armed == pending_armed);
});
return std::max(current.db_version, pending.db_version);
}
virtual void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
void set_upkeep_needed();
class AgentThread : public Thread {
public:
explicit AgentThread(QuiesceAgent* qa)
: qa(qa)
{
}
void* entry() override
{
return qa->agent_thread_main();
}
private:
QuiesceAgent* qa;
} agent_thread;
void* agent_thread_main();
virtual void _agent_thread_will_work() { }
virtual void _agent_thread_did_work() { }
};
|