diff options
Diffstat (limited to 'src/crimson/osd/recovery_backend.h')
-rw-r--r-- | src/crimson/osd/recovery_backend.h | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index 4c9c67770ab..f5a365c1558 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -125,7 +125,7 @@ public: public boost::intrusive_ref_counter< WaitForObjectRecovery, boost::thread_unsafe_counter>, public crimson::BlockerT<WaitForObjectRecovery> { - seastar::shared_promise<> readable, recovered, pulled; + std::optional<seastar::shared_promise<>> readable, recovered, pulled; std::map<pg_shard_t, seastar::shared_promise<>> pushes; public: static constexpr const char* type_name = "WaitForObjectRecovery"; @@ -135,13 +135,19 @@ public: std::map<pg_shard_t, push_info_t> pushing; seastar::future<> wait_for_readable() { - return readable.get_shared_future(); + if (!readable) { + readable = seastar::shared_promise<>(); + } + return readable->get_shared_future(); } seastar::future<> wait_for_pushes(pg_shard_t shard) { return pushes[shard].get_shared_future(); } seastar::future<> wait_for_recovered() { - return recovered.get_shared_future(); + if (!recovered) { + recovered = seastar::shared_promise<>(); + } + return recovered->get_shared_future(); } template <typename T, typename F> auto wait_track_blocking(T &trigger, F &&fut) { @@ -154,37 +160,72 @@ public: template <typename T> seastar::future<> wait_for_recovered(T &trigger) { WaitForObjectRecoveryRef ref = this; - return wait_track_blocking(trigger, recovered.get_shared_future()); + if (!recovered) { + recovered = seastar::shared_promise<>(); + } + return wait_track_blocking(trigger, recovered->get_shared_future()); } seastar::future<> wait_for_pull() { - return pulled.get_shared_future(); + if (!pulled) { + pulled = seastar::shared_promise<>(); + } + return pulled->get_shared_future(); } void set_readable() { - readable.set_value(); + if (readable) { + readable->set_value(); + readable.reset(); + } } void set_recovered() { - recovered.set_value(); + if (recovered) { + recovered->set_value(); + recovered.reset(); + } } void set_pushed(pg_shard_t shard) { - pushes[shard].set_value(); + auto it = pushes.find(shard); + if (it != pushes.end()) { + auto &push_promise = it->second; + push_promise.set_value(); + pushes.erase(it); + } } void set_pulled() { - pulled.set_value(); + if (pulled) { + pulled->set_value(); + pulled.reset(); + } } void set_push_failed(pg_shard_t shard, std::exception_ptr e) { - pushes.at(shard).set_exception(e); + auto it = pushes.find(shard); + if (it != pushes.end()) { + auto &push_promise = it->second; + push_promise.set_exception(e); + pushes.erase(it); + } } void interrupt(std::string_view why) { - readable.set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); - recovered.set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); - pulled.set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); + if (readable) { + readable->set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + readable.reset(); + } + if (recovered) { + recovered->set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + recovered.reset(); + } + if (pulled) { + pulled->set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + pulled.reset(); + } for (auto& [pg_shard, pr] : pushes) { - pr.set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); + pr.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); } + pushes.clear(); } void stop(); void dump_detail(Formatter* f) const { |