summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/recovery_backend.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/recovery_backend.h')
-rw-r--r--src/crimson/osd/recovery_backend.h77
1 files changed, 59 insertions, 18 deletions
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
index 4c9c67770ab..f5a365c1558 100644
--- a/src/crimson/osd/recovery_backend.h
+++ b/src/crimson/osd/recovery_backend.h
@@ -125,7 +125,7 @@ public:
public boost::intrusive_ref_counter<
WaitForObjectRecovery, boost::thread_unsafe_counter>,
public crimson::BlockerT<WaitForObjectRecovery> {
- seastar::shared_promise<> readable, recovered, pulled;
+ std::optional<seastar::shared_promise<>> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
public:
static constexpr const char* type_name = "WaitForObjectRecovery";
@@ -135,13 +135,19 @@ public:
std::map<pg_shard_t, push_info_t> pushing;
seastar::future<> wait_for_readable() {
- return readable.get_shared_future();
+ if (!readable) {
+ readable = seastar::shared_promise<>();
+ }
+ return readable->get_shared_future();
}
seastar::future<> wait_for_pushes(pg_shard_t shard) {
return pushes[shard].get_shared_future();
}
seastar::future<> wait_for_recovered() {
- return recovered.get_shared_future();
+ if (!recovered) {
+ recovered = seastar::shared_promise<>();
+ }
+ return recovered->get_shared_future();
}
template <typename T, typename F>
auto wait_track_blocking(T &trigger, F &&fut) {
@@ -154,37 +160,72 @@ public:
template <typename T>
seastar::future<> wait_for_recovered(T &trigger) {
WaitForObjectRecoveryRef ref = this;
- return wait_track_blocking(trigger, recovered.get_shared_future());
+ if (!recovered) {
+ recovered = seastar::shared_promise<>();
+ }
+ return wait_track_blocking(trigger, recovered->get_shared_future());
}
seastar::future<> wait_for_pull() {
- return pulled.get_shared_future();
+ if (!pulled) {
+ pulled = seastar::shared_promise<>();
+ }
+ return pulled->get_shared_future();
}
void set_readable() {
- readable.set_value();
+ if (readable) {
+ readable->set_value();
+ readable.reset();
+ }
}
void set_recovered() {
- recovered.set_value();
+ if (recovered) {
+ recovered->set_value();
+ recovered.reset();
+ }
}
void set_pushed(pg_shard_t shard) {
- pushes[shard].set_value();
+ auto it = pushes.find(shard);
+ if (it != pushes.end()) {
+ auto &push_promise = it->second;
+ push_promise.set_value();
+ pushes.erase(it);
+ }
}
void set_pulled() {
- pulled.set_value();
+ if (pulled) {
+ pulled->set_value();
+ pulled.reset();
+ }
}
void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
- pushes.at(shard).set_exception(e);
+ auto it = pushes.find(shard);
+ if (it != pushes.end()) {
+ auto &push_promise = it->second;
+ push_promise.set_exception(e);
+ pushes.erase(it);
+ }
}
void interrupt(std::string_view why) {
- readable.set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- recovered.set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- pulled.set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
+ if (readable) {
+ readable->set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ readable.reset();
+ }
+ if (recovered) {
+ recovered->set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ recovered.reset();
+ }
+ if (pulled) {
+ pulled->set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ pulled.reset();
+ }
for (auto& [pg_shard, pr] : pushes) {
- pr.set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
+ pr.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
}
+ pushes.clear();
}
void stop();
void dump_detail(Formatter* f) const {