summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd
diff options
context:
space:
mode:
authorRadosław Zarzyński <rzarzyns@redhat.com>2022-04-22 16:50:47 +0200
committerRadosław Zarzyński <rzarzyns@redhat.com>2022-05-05 04:06:32 +0200
commitb7e2b11595c6e2cd2c638135ef6f044c40efaa87 (patch)
tree5391ed1514ed84570d07e361b8cc2c6078e040a5 /src/crimson/osd
parentcrimson/osd: bring AggregateBlockingEvent. (diff)
downloadceph-b7e2b11595c6e2cd2c638135ef6f044c40efaa87.tar.xz
ceph-b7e2b11595c6e2cd2c638135ef6f044c40efaa87.zip
crimson/osd: migrate AggregateBlocker-related ops to new tracking infra
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
Diffstat (limited to 'src/crimson/osd')
-rw-r--r--src/crimson/osd/osd_operations/background_recovery.cc32
-rw-r--r--src/crimson/osd/osd_operations/background_recovery.h9
-rw-r--r--src/crimson/osd/pg_recovery.cc111
-rw-r--r--src/crimson/osd/pg_recovery.h29
-rw-r--r--src/crimson/osd/recovery_backend.h13
5 files changed, 118 insertions, 76 deletions
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc
index d3f57d7dee6..0c40e270534 100644
--- a/src/crimson/osd/osd_operations/background_recovery.cc
+++ b/src/crimson/osd/osd_operations/background_recovery.cc
@@ -18,6 +18,22 @@ namespace {
}
}
+namespace crimson {
+ template <>
+ struct EventBackendRegistry<osd::UrgentRecovery> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+ };
+
+ template <>
+ struct EventBackendRegistry<osd::PglogBasedRecovery> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+ };
+}
+
namespace crimson::osd {
template <class T>
@@ -97,9 +113,10 @@ UrgentRecovery::do_recovery()
{
logger().debug("{}: {}", __func__, *this);
if (!pg->has_reset_since(epoch_started)) {
- return with_blocking_future_interruptible<interruptor::condition>(
- pg->get_recovery_handler()->recover_missing(soid, need)
- ).then_interruptible([] {
+ return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
+ interruptor>([this] (auto&& trigger) {
+ return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+ }).then_interruptible([] {
return seastar::make_ready_future<bool>(false);
});
}
@@ -143,9 +160,12 @@ PglogBasedRecovery::do_recovery()
if (pg->has_reset_since(epoch_started)) {
return seastar::make_ready_future<bool>(false);
}
- return with_blocking_future_interruptible<interruptor::condition>(
- pg->get_recovery_handler()->start_recovery_ops(
- crimson::common::local_conf()->osd_recovery_max_single_start));
+ return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
+ interruptor>([this] (auto&& trigger) {
+ return pg->get_recovery_handler()->start_recovery_ops(
+ trigger,
+ crimson::common::local_conf()->osd_recovery_max_single_start);
+ });
}
BackfillRecovery::BackfillRecoveryPipeline &BackfillRecovery::bp(PG &pg)
diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h
index 8f726c51a63..1f293e12ba7 100644
--- a/src/crimson/osd/osd_operations/background_recovery.h
+++ b/src/crimson/osd/osd_operations/background_recovery.h
@@ -7,6 +7,7 @@
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/recovery_backend.h"
#include "crimson/common/type_helpers.h"
namespace crimson::osd {
@@ -63,6 +64,10 @@ public:
epoch_t epoch_started);
void print(std::ostream&) const final;
+ std::tuple<
+ RecoveryBackend::RecoveryBlockingEvent
+ > tracking_events;
+
private:
void dump_detail(Formatter* f) const final;
interruptible_future<bool> do_recovery() override;
@@ -78,6 +83,10 @@ public:
epoch_t epoch_started,
float delay = 0);
+ std::tuple<
+ RecoveryBackend::RecoveryBlockingEvent
+ > tracking_events;
+
private:
interruptible_future<bool> do_recovery() override;
};
diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc
index f4a4058dc90..62c8730fedb 100644
--- a/src/crimson/osd/pg_recovery.cc
+++ b/src/crimson/osd/pg_recovery.cc
@@ -34,8 +34,10 @@ void PGRecovery::start_pglogbased_recovery()
float(0.001));
}
-PGRecovery::blocking_interruptible_future<bool>
-PGRecovery::start_recovery_ops(size_t max_to_start)
+PGRecovery::interruptible_future<bool>
+PGRecovery::start_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
+ size_t max_to_start)
{
assert(pg->is_primary());
assert(pg->is_peered());
@@ -49,16 +51,20 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
assert(!pg->is_backfilling());
assert(!pg->get_peering_state().is_deleting());
- std::vector<blocking_interruptible_future<>> started;
+ std::vector<RecoveryBackend::interruptible_future<>> new_started;
+ std::vector<interruptible_future<>> started;
+ new_started.reserve(max_to_start);
started.reserve(max_to_start);
- max_to_start -= start_primary_recovery_ops(max_to_start, &started);
+ max_to_start -= start_primary_recovery_ops(trigger, max_to_start, &started);
if (max_to_start > 0) {
- max_to_start -= start_replica_recovery_ops(max_to_start, &started);
+ max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
}
- return crimson::join_blocking_interruptible_futures<
- ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible<
- ::crimson::osd::IOInterruptCondition>(
- [this] {
+ using interruptor =
+ crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
+ return interruptor::parallel_for_each(std::move(new_started),
+ [] (auto&& ifut) {
+ return std::move(ifut);
+ }).then_interruptible([this] {
bool done = !pg->get_peering_state().needs_recovery();
if (done) {
logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
@@ -93,8 +99,9 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
}
size_t PGRecovery::start_primary_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
size_t max_to_start,
- std::vector<PGRecovery::blocking_interruptible_future<>> *out)
+ std::vector<PGRecovery::interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
@@ -149,13 +156,13 @@ size_t PGRecovery::start_primary_recovery_ops(
// TODO: handle lost/unfound
if (pg->get_recovery_backend()->is_recovering(soid)) {
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking<
- ::crimson::osd::IOInterruptCondition>());
+ out->emplace_back(recovery_waiter.wait_for_recovered(
+ *trigger.create_part_trigger()));
++started;
} else if (pg->get_recovery_backend()->is_recovering(head)) {
++skipped;
} else {
- out->push_back(recover_missing(soid, item.need));
+ out->emplace_back(recover_missing(trigger, soid, item.need));
++started;
}
@@ -169,8 +176,9 @@ size_t PGRecovery::start_primary_recovery_ops(
}
size_t PGRecovery::start_replica_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
size_t max_to_start,
- std::vector<PGRecovery::blocking_interruptible_future<>> *out)
+ std::vector<PGRecovery::interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
@@ -217,8 +225,8 @@ size_t PGRecovery::start_replica_recovery_ops(
if (pg->get_recovery_backend()->is_recovering(soid)) {
logger().debug("{}: already recovering object {}", __func__, soid);
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking<
- ::crimson::osd::IOInterruptCondition>());
+ out->emplace_back(recovery_waiter.wait_for_recovered(
+ *trigger.create_part_trigger()));
started++;
continue;
}
@@ -227,8 +235,9 @@ size_t PGRecovery::start_replica_recovery_ops(
logger().debug("{}: soid {} is a delete, removing", __func__, soid);
map<hobject_t,pg_missing_item>::const_iterator r =
pm.get_items().find(soid);
- started += prep_object_replica_deletes(
- soid, r->second.need, out);
+ started++;
+ out->emplace_back(
+ prep_object_replica_deletes(trigger, soid, r->second.need));
continue;
}
@@ -248,23 +257,27 @@ size_t PGRecovery::start_replica_recovery_ops(
logger().debug("{}: recover_object_replicas({})", __func__,soid);
map<hobject_t,pg_missing_item>::const_iterator r = pm.get_items().find(
soid);
- started += prep_object_replica_pushes(
- soid, r->second.need, out);
+ started++;
+ out->emplace_back(
+ prep_object_replica_pushes(trigger, soid, r->second.need));
}
}
return started;
}
-PGRecovery::blocking_interruptible_future<>
+PGRecovery::interruptible_future<>
PGRecovery::recover_missing(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t &soid, eversion_t need)
{
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
- return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_delete(soid, need));
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
+ pg->get_recovery_backend()->recover_delete(soid, need));
} else {
- return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
pg->get_recovery_backend()->recover_object(soid, need)
.handle_exception_interruptible(
[=, soid = std::move(soid)] (auto e) {
@@ -275,41 +288,37 @@ PGRecovery::recover_missing(
}
}
-size_t PGRecovery::prep_object_replica_deletes(
+RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
+ eversion_t need)
{
- in_progress->push_back(
- pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
- [=] {
- object_stat_sum_t stat_diff;
- stat_diff.num_objects_recovered = 1;
- on_global_recover(soid, stat_diff, true);
- return seastar::make_ready_future<>();
- })
- )
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
+ pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
+ [=] {
+ object_stat_sum_t stat_diff;
+ stat_diff.num_objects_recovered = 1;
+ on_global_recover(soid, stat_diff, true);
+ return seastar::make_ready_future<>();
+ })
);
- return 1;
}
-size_t PGRecovery::prep_object_replica_pushes(
+RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
+ eversion_t need)
{
- in_progress->push_back(
- pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_object(soid, need)
- .handle_exception_interruptible(
- [=, soid = std::move(soid)] (auto e) {
- on_failed_recover({ pg->get_pg_whoami() }, soid, need);
- return seastar::make_ready_future<>();
- })
- )
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
+ pg->get_recovery_backend()->recover_object(soid, need)
+ .handle_exception_interruptible(
+ [=, soid = std::move(soid)] (auto e) {
+ on_failed_recover({ pg->get_pg_whoami() }, soid, need);
+ return seastar::make_ready_future<>();
+ })
);
- return 1;
}
void PGRecovery::on_local_recover(
diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h
index cdb07cc5c39..7840d85be08 100644
--- a/src/crimson/osd/pg_recovery.h
+++ b/src/crimson/osd/pg_recovery.h
@@ -20,14 +20,14 @@ class PGBackend;
class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
public:
template <typename T = void>
- using blocking_interruptible_future =
- ::crimson::blocking_interruptible_future<
- ::crimson::osd::IOInterruptCondition, T>;
+ using interruptible_future = RecoveryBackend::interruptible_future<T>;
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
virtual ~PGRecovery() {}
void start_pglogbased_recovery();
- blocking_interruptible_future<bool> start_recovery_ops(size_t max_to_start);
+ interruptible_future<bool> start_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
+ size_t max_to_start);
void on_backfill_reserved();
void dispatch_backfill_event(
boost::intrusive_ptr<const boost::statechart::event_base> evt);
@@ -36,25 +36,28 @@ public:
private:
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
size_t max_to_start,
- std::vector<blocking_interruptible_future<>> *out);
+ std::vector<interruptible_future<>> *out);
size_t start_replica_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
size_t max_to_start,
- std::vector<blocking_interruptible_future<>> *out);
+ std::vector<interruptible_future<>> *out);
std::vector<pg_shard_t> get_replica_recovery_order() const {
return pg->get_replica_recovery_order();
}
- blocking_interruptible_future<> recover_missing(
+ RecoveryBackend::interruptible_future<> recover_missing(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
const hobject_t &soid, eversion_t need);
- size_t prep_object_replica_deletes(
+ RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<blocking_interruptible_future<>> *in_progress);
- size_t prep_object_replica_pushes(
+ eversion_t need);
+ RecoveryBackend::interruptible_future<> prep_object_replica_pushes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<blocking_interruptible_future<>> *in_progress);
+ eversion_t need);
void on_local_recover(
const hobject_t& soid,
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
index eb621487a18..cc48dd7d0de 100644
--- a/src/crimson/osd/recovery_backend.h
+++ b/src/crimson/osd/recovery_backend.h
@@ -25,7 +25,7 @@ namespace crimson::osd{
class PGBackend;
class RecoveryBackend {
-protected:
+public:
class WaitForObjectRecovery;
public:
template <typename T = void>
@@ -119,6 +119,7 @@ protected:
object_stat_sum_t stat;
};
+public:
class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> {
seastar::shared_promise<> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
@@ -138,11 +139,8 @@ protected:
seastar::future<> wait_for_recovered() {
return recovered.get_shared_future();
}
- template <typename InterruptCond>
- crimson::blocking_interruptible_future<InterruptCond>
- wait_for_recovered_blocking() {
- return make_blocking_interruptible_future<InterruptCond>(
- recovered.get_shared_future());
+ seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) {
+ return trigger.maybe_record_blocking(recovered.get_shared_future(), *this);
}
seastar::future<> wait_for_pull() {
return pulled.get_shared_future();
@@ -178,6 +176,9 @@ protected:
void dump_detail(Formatter* f) const {
}
};
+ using RecoveryBlockingEvent =
+ crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>;
+protected:
std::map<hobject_t, WaitForObjectRecovery> recovering;
hobject_t get_temp_recovery_object(
const hobject_t& target,