diff options
author | Radosław Zarzyński <rzarzyns@redhat.com> | 2022-04-22 16:50:47 +0200 |
---|---|---|
committer | Radosław Zarzyński <rzarzyns@redhat.com> | 2022-05-05 04:06:32 +0200 |
commit | b7e2b11595c6e2cd2c638135ef6f044c40efaa87 (patch) | |
tree | 5391ed1514ed84570d07e361b8cc2c6078e040a5 /src/crimson/osd | |
parent | crimson/osd: bring AggregateBlockingEvent. (diff) | |
download | ceph-b7e2b11595c6e2cd2c638135ef6f044c40efaa87.tar.xz ceph-b7e2b11595c6e2cd2c638135ef6f044c40efaa87.zip |
crimson/osd: migrate AggregateBlocker-related ops to new tracking infra
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
Diffstat (limited to 'src/crimson/osd')
-rw-r--r-- | src/crimson/osd/osd_operations/background_recovery.cc | 32 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/background_recovery.h | 9 | ||||
-rw-r--r-- | src/crimson/osd/pg_recovery.cc | 111 | ||||
-rw-r--r-- | src/crimson/osd/pg_recovery.h | 29 | ||||
-rw-r--r-- | src/crimson/osd/recovery_backend.h | 13 |
5 files changed, 118 insertions, 76 deletions
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index d3f57d7dee6..0c40e270534 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -18,6 +18,22 @@ namespace { } } +namespace crimson { + template <> + struct EventBackendRegistry<osd::UrgentRecovery> { + static std::tuple<> get_backends() { + return {}; + } + }; + + template <> + struct EventBackendRegistry<osd::PglogBasedRecovery> { + static std::tuple<> get_backends() { + return {}; + } + }; +} + namespace crimson::osd { template <class T> @@ -97,9 +113,10 @@ UrgentRecovery::do_recovery() { logger().debug("{}: {}", __func__, *this); if (!pg->has_reset_since(epoch_started)) { - return with_blocking_future_interruptible<interruptor::condition>( - pg->get_recovery_handler()->recover_missing(soid, need) - ).then_interruptible([] { + return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->get_recovery_handler()->recover_missing(trigger, soid, need); + }).then_interruptible([] { return seastar::make_ready_future<bool>(false); }); } @@ -143,9 +160,12 @@ PglogBasedRecovery::do_recovery() if (pg->has_reset_since(epoch_started)) { return seastar::make_ready_future<bool>(false); } - return with_blocking_future_interruptible<interruptor::condition>( - pg->get_recovery_handler()->start_recovery_ops( - crimson::common::local_conf()->osd_recovery_max_single_start)); + return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->get_recovery_handler()->start_recovery_ops( + trigger, + crimson::common::local_conf()->osd_recovery_max_single_start); + }); } BackfillRecovery::BackfillRecoveryPipeline &BackfillRecovery::bp(PG &pg) diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h index 8f726c51a63..1f293e12ba7 100644 --- a/src/crimson/osd/osd_operations/background_recovery.h +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -7,6 +7,7 @@ #include "crimson/net/Connection.h" #include "crimson/osd/osd_operation.h" +#include "crimson/osd/recovery_backend.h" #include "crimson/common/type_helpers.h" namespace crimson::osd { @@ -63,6 +64,10 @@ public: epoch_t epoch_started); void print(std::ostream&) const final; + std::tuple< + RecoveryBackend::RecoveryBlockingEvent + > tracking_events; + private: void dump_detail(Formatter* f) const final; interruptible_future<bool> do_recovery() override; @@ -78,6 +83,10 @@ public: epoch_t epoch_started, float delay = 0); + std::tuple< + RecoveryBackend::RecoveryBlockingEvent + > tracking_events; + private: interruptible_future<bool> do_recovery() override; }; diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index f4a4058dc90..62c8730fedb 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -34,8 +34,10 @@ void PGRecovery::start_pglogbased_recovery() float(0.001)); } -PGRecovery::blocking_interruptible_future<bool> -PGRecovery::start_recovery_ops(size_t max_to_start) +PGRecovery::interruptible_future<bool> +PGRecovery::start_recovery_ops( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, + size_t max_to_start) { assert(pg->is_primary()); assert(pg->is_peered()); @@ -49,16 +51,20 @@ PGRecovery::start_recovery_ops(size_t max_to_start) assert(!pg->is_backfilling()); assert(!pg->get_peering_state().is_deleting()); - std::vector<blocking_interruptible_future<>> started; + std::vector<RecoveryBackend::interruptible_future<>> new_started; + std::vector<interruptible_future<>> started; + new_started.reserve(max_to_start); started.reserve(max_to_start); - max_to_start -= start_primary_recovery_ops(max_to_start, &started); + max_to_start -= start_primary_recovery_ops(trigger, max_to_start, &started); if (max_to_start > 0) { - max_to_start -= start_replica_recovery_ops(max_to_start, &started); + max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started); } - return crimson::join_blocking_interruptible_futures< - ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible< - ::crimson::osd::IOInterruptCondition>( - [this] { + using interruptor = + crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>; + return interruptor::parallel_for_each(std::move(new_started), + [] (auto&& ifut) { + return std::move(ifut); + }).then_interruptible([this] { bool done = !pg->get_peering_state().needs_recovery(); if (done) { logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}", @@ -93,8 +99,9 @@ PGRecovery::start_recovery_ops(size_t max_to_start) } size_t PGRecovery::start_primary_recovery_ops( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, size_t max_to_start, - std::vector<PGRecovery::blocking_interruptible_future<>> *out) + std::vector<PGRecovery::interruptible_future<>> *out) { if (!pg->is_recovering()) { return 0; @@ -149,13 +156,13 @@ size_t PGRecovery::start_primary_recovery_ops( // TODO: handle lost/unfound if (pg->get_recovery_backend()->is_recovering(soid)) { auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); - out->push_back(recovery_waiter.wait_for_recovered_blocking< - ::crimson::osd::IOInterruptCondition>()); + out->emplace_back(recovery_waiter.wait_for_recovered( + *trigger.create_part_trigger())); ++started; } else if (pg->get_recovery_backend()->is_recovering(head)) { ++skipped; } else { - out->push_back(recover_missing(soid, item.need)); + out->emplace_back(recover_missing(trigger, soid, item.need)); ++started; } @@ -169,8 +176,9 @@ size_t PGRecovery::start_primary_recovery_ops( } size_t PGRecovery::start_replica_recovery_ops( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, size_t max_to_start, - std::vector<PGRecovery::blocking_interruptible_future<>> *out) + std::vector<PGRecovery::interruptible_future<>> *out) { if (!pg->is_recovering()) { return 0; @@ -217,8 +225,8 @@ size_t PGRecovery::start_replica_recovery_ops( if (pg->get_recovery_backend()->is_recovering(soid)) { logger().debug("{}: already recovering object {}", __func__, soid); auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); - out->push_back(recovery_waiter.wait_for_recovered_blocking< - ::crimson::osd::IOInterruptCondition>()); + out->emplace_back(recovery_waiter.wait_for_recovered( + *trigger.create_part_trigger())); started++; continue; } @@ -227,8 +235,9 @@ size_t PGRecovery::start_replica_recovery_ops( logger().debug("{}: soid {} is a delete, removing", __func__, soid); map<hobject_t,pg_missing_item>::const_iterator r = pm.get_items().find(soid); - started += prep_object_replica_deletes( - soid, r->second.need, out); + started++; + out->emplace_back( + prep_object_replica_deletes(trigger, soid, r->second.need)); continue; } @@ -248,23 +257,27 @@ size_t PGRecovery::start_replica_recovery_ops( logger().debug("{}: recover_object_replicas({})", __func__,soid); map<hobject_t,pg_missing_item>::const_iterator r = pm.get_items().find( soid); - started += prep_object_replica_pushes( - soid, r->second.need, out); + started++; + out->emplace_back( + prep_object_replica_pushes(trigger, soid, r->second.need)); } } return started; } -PGRecovery::blocking_interruptible_future<> +PGRecovery::interruptible_future<> PGRecovery::recover_missing( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, const hobject_t &soid, eversion_t need) { if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) { - return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( - pg->get_recovery_backend()->recover_delete(soid, need)); + return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + trigger, + pg->get_recovery_backend()->recover_delete(soid, need)); } else { - return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( + return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + trigger, pg->get_recovery_backend()->recover_object(soid, need) .handle_exception_interruptible( [=, soid = std::move(soid)] (auto e) { @@ -275,41 +288,37 @@ PGRecovery::recover_missing( } } -size_t PGRecovery::prep_object_replica_deletes( +RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, const hobject_t& soid, - eversion_t need, - std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress) + eversion_t need) { - in_progress->push_back( - pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( - pg->get_recovery_backend()->push_delete(soid, need).then_interruptible( - [=] { - object_stat_sum_t stat_diff; - stat_diff.num_objects_recovered = 1; - on_global_recover(soid, stat_diff, true); - return seastar::make_ready_future<>(); - }) - ) + return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + trigger, + pg->get_recovery_backend()->push_delete(soid, need).then_interruptible( + [=] { + object_stat_sum_t stat_diff; + stat_diff.num_objects_recovered = 1; + on_global_recover(soid, stat_diff, true); + return seastar::make_ready_future<>(); + }) ); - return 1; } -size_t PGRecovery::prep_object_replica_pushes( +RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, const hobject_t& soid, - eversion_t need, - std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress) + eversion_t need) { - in_progress->push_back( - pg->get_recovery_backend()->add_recovering(soid).make_blocking_future( - pg->get_recovery_backend()->recover_object(soid, need) - .handle_exception_interruptible( - [=, soid = std::move(soid)] (auto e) { - on_failed_recover({ pg->get_pg_whoami() }, soid, need); - return seastar::make_ready_future<>(); - }) - ) + return pg->get_recovery_backend()->add_recovering(soid).track_blocking( + trigger, + pg->get_recovery_backend()->recover_object(soid, need) + .handle_exception_interruptible( + [=, soid = std::move(soid)] (auto e) { + on_failed_recover({ pg->get_pg_whoami() }, soid, need); + return seastar::make_ready_future<>(); + }) ); - return 1; } void PGRecovery::on_local_recover( diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h index cdb07cc5c39..7840d85be08 100644 --- a/src/crimson/osd/pg_recovery.h +++ b/src/crimson/osd/pg_recovery.h @@ -20,14 +20,14 @@ class PGBackend; class PGRecovery : public crimson::osd::BackfillState::BackfillListener { public: template <typename T = void> - using blocking_interruptible_future = - ::crimson::blocking_interruptible_future< - ::crimson::osd::IOInterruptCondition, T>; + using interruptible_future = RecoveryBackend::interruptible_future<T>; PGRecovery(PGRecoveryListener* pg) : pg(pg) {} virtual ~PGRecovery() {} void start_pglogbased_recovery(); - blocking_interruptible_future<bool> start_recovery_ops(size_t max_to_start); + interruptible_future<bool> start_recovery_ops( + RecoveryBackend::RecoveryBlockingEvent::TriggerI&, + size_t max_to_start); void on_backfill_reserved(); void dispatch_backfill_event( boost::intrusive_ptr<const boost::statechart::event_base> evt); @@ -36,25 +36,28 @@ public: private: PGRecoveryListener* pg; size_t start_primary_recovery_ops( + RecoveryBackend::RecoveryBlockingEvent::TriggerI&, size_t max_to_start, - std::vector<blocking_interruptible_future<>> *out); + std::vector<interruptible_future<>> *out); size_t start_replica_recovery_ops( + RecoveryBackend::RecoveryBlockingEvent::TriggerI&, size_t max_to_start, - std::vector<blocking_interruptible_future<>> *out); + std::vector<interruptible_future<>> *out); std::vector<pg_shard_t> get_replica_recovery_order() const { return pg->get_replica_recovery_order(); } - blocking_interruptible_future<> recover_missing( + RecoveryBackend::interruptible_future<> recover_missing( + RecoveryBackend::RecoveryBlockingEvent::TriggerI&, const hobject_t &soid, eversion_t need); - size_t prep_object_replica_deletes( + RecoveryBackend::interruptible_future<> prep_object_replica_deletes( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, const hobject_t& soid, - eversion_t need, - std::vector<blocking_interruptible_future<>> *in_progress); - size_t prep_object_replica_pushes( + eversion_t need); + RecoveryBackend::interruptible_future<> prep_object_replica_pushes( + RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger, const hobject_t& soid, - eversion_t need, - std::vector<blocking_interruptible_future<>> *in_progress); + eversion_t need); void on_local_recover( const hobject_t& soid, diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index eb621487a18..cc48dd7d0de 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -25,7 +25,7 @@ namespace crimson::osd{ class PGBackend; class RecoveryBackend { -protected: +public: class WaitForObjectRecovery; public: template <typename T = void> @@ -119,6 +119,7 @@ protected: object_stat_sum_t stat; }; +public: class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> { seastar::shared_promise<> readable, recovered, pulled; std::map<pg_shard_t, seastar::shared_promise<>> pushes; @@ -138,11 +139,8 @@ protected: seastar::future<> wait_for_recovered() { return recovered.get_shared_future(); } - template <typename InterruptCond> - crimson::blocking_interruptible_future<InterruptCond> - wait_for_recovered_blocking() { - return make_blocking_interruptible_future<InterruptCond>( - recovered.get_shared_future()); + seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) { + return trigger.maybe_record_blocking(recovered.get_shared_future(), *this); } seastar::future<> wait_for_pull() { return pulled.get_shared_future(); @@ -178,6 +176,9 @@ protected: void dump_detail(Formatter* f) const { } }; + using RecoveryBlockingEvent = + crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>; +protected: std::map<hobject_t, WaitForObjectRecovery> recovering; hobject_t get_temp_recovery_object( const hobject_t& target, |