Diffstat (limited to 'src/crimson/osd')
-rw-r--r-- | src/crimson/osd/backfill_state.cc | 39 |
-rw-r--r-- | src/crimson/osd/backfill_state.h | 8 |
-rw-r--r-- | src/crimson/osd/osd_operation.h | 12 |
-rw-r--r-- | src/crimson/osd/pg.cc | 11 |
-rw-r--r-- | src/crimson/osd/pg.h | 5 |
-rw-r--r-- | src/crimson/osd/pg_recovery.cc | 19 |
-rw-r--r-- | src/crimson/osd/pg_recovery.h | 2 |
-rw-r--r-- | src/crimson/osd/replicated_backend.cc | 32 |
-rw-r--r-- | src/crimson/osd/replicated_recovery_backend.cc | 9 |
-rw-r--r-- | src/crimson/osd/shard_services.h | 1 |
10 files changed, 116 insertions, 22 deletions
diff --git a/src/crimson/osd/backfill_state.cc b/src/crimson/osd/backfill_state.cc
index 1392ee330ac..f957f072c93 100644
--- a/src/crimson/osd/backfill_state.cc
+++ b/src/crimson/osd/backfill_state.cc
@@ -342,6 +342,7 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
   do {
     if (!backfill_listener().budget_available()) {
+      DEBUGDPP("throttle failed, turning to Waiting", pg());
       post_event(RequestWaiting{});
       return;
     } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
@@ -380,16 +381,25 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
     }
   } while (!all_emptied(primary_bi, backfill_state().peer_backfill_info));
 
-  if (backfill_state().progress_tracker->tracked_objects_completed()
-      && Enqueuing::all_enqueued(peering_state(),
-                                 backfill_state().backfill_info,
-                                 backfill_state().peer_backfill_info)) {
-    backfill_state().last_backfill_started = hobject_t::get_max();
-    backfill_listener().update_peers_last_backfill(hobject_t::get_max());
+  if (should_rescan_primary(backfill_state().peer_backfill_info,
+                            primary_bi)) {
+    // need to grab one another chunk of the object namespace and restart
+    // the queueing.
+    DEBUGDPP("reached end for current local chunk", pg());
+    post_event(RequestPrimaryScanning{});
+    return;
+  } else {
+    if (backfill_state().progress_tracker->tracked_objects_completed()
+        && Enqueuing::all_enqueued(peering_state(),
+                                   backfill_state().backfill_info,
+                                   backfill_state().peer_backfill_info)) {
+      backfill_state().last_backfill_started = hobject_t::get_max();
+      backfill_listener().update_peers_last_backfill(hobject_t::get_max());
+    }
+    DEBUGDPP("reached end for both local and all peers "
+             "but still has in-flight operations", pg());
+    post_event(RequestWaiting{});
   }
-  DEBUGDPP("reached end for both local and all peers "
-           "but still has in-flight operations", pg());
-  post_event(RequestWaiting{});
 }
 
 // -- PrimaryScanning
@@ -676,6 +686,17 @@ void BackfillState::enqueue_standalone_push(
   backfill_machine.backfill_listener.enqueue_push(obj, v, peers);
 }
 
+void BackfillState::enqueue_standalone_delete(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
+{
+  progress_tracker->enqueue_drop(obj);
+  for (auto bt : peers) {
+    backfill_machine.backfill_listener.enqueue_drop(bt, obj, v);
+  }
+}
+
 std::ostream &operator<<(std::ostream &out, const BackfillState::PGFacade &pg) {
   return pg.print(out);
 }
diff --git a/src/crimson/osd/backfill_state.h b/src/crimson/osd/backfill_state.h
index 463be4a7a2e..517a02ea4df 100644
--- a/src/crimson/osd/backfill_state.h
+++ b/src/crimson/osd/backfill_state.h
@@ -62,6 +62,8 @@ struct BackfillState {
   struct CancelBackfill : sc::event<CancelBackfill> {
   };
 
+  struct ThrottleAcquired : sc::event<ThrottleAcquired> {
+  };
 private:
   // internal events
   struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
@@ -257,6 +259,7 @@ public:
       sc::transition<RequestDone, Done>,
       sc::custom_reaction<CancelBackfill>,
       sc::custom_reaction<Triggered>,
+      sc::transition<ThrottleAcquired, Enqueuing>,
       sc::transition<sc::event_base, Crashed>>;
     explicit Waiting(my_context);
     sc::result react(ObjectPushed);
@@ -289,6 +292,11 @@ public:
     const hobject_t &obj,
     const eversion_t &v,
     const std::vector<pg_shard_t> &peers);
 
+  void enqueue_standalone_delete(
+    const hobject_t &obj,
+    const eversion_t &v,
+    const std::vector<pg_shard_t> &peers);
+
   bool is_triggered() const {
     return backfill_machine.triggering_event() != nullptr;
diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h
index 6376dabd04d..394375c1129 100644
--- a/src/crimson/osd/osd_operation.h
+++ b/src/crimson/osd/osd_operation.h
@@ -341,6 +341,18 @@ public:
       with_throttle_while(std::forward<Args>(args)...), *this);
   }
 
+  // Returns std::nullopt if the throttle is acquired immediately,
+  // returns the future for the acquiring otherwise
+  std::optional<seastar::future<>>
+  try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
+    if (!max_in_progress || in_progress < max_in_progress) {
+      ++in_progress;
+      --pending;
+      return std::nullopt;
+    }
+    return acquire_throttle(params);
+  }
+
 private:
   void dump_detail(Formatter *f) const final;
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index bf521498abf..2746e730f2b 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -879,6 +879,17 @@ void PG::enqueue_push_for_backfill(
   backfill_state->enqueue_standalone_push(obj, v, peers);
 }
 
+void PG::enqueue_delete_for_backfill(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
+{
+  assert(recovery_handler);
+  assert(recovery_handler->backfill_state);
+  auto backfill_state = recovery_handler->backfill_state.get();
+  backfill_state->enqueue_standalone_delete(obj, v, peers);
+}
+
 PG::interruptible_future<
   std::tuple<PG::interruptible_future<>,
              PG::interruptible_future<>>>
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 6db73ee835b..06038c0aa00 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -904,6 +904,11 @@ private:
     const hobject_t &obj,
     const eversion_t &v,
     const std::vector<pg_shard_t> &peers);
+  void enqueue_delete_for_backfill(
+    const hobject_t &obj,
+    const eversion_t &v,
+    const std::vector<pg_shard_t> &peers);
+
   bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const;
   bool can_discard_op(const MOSDOp& m) const;
   void context_registry_on_change();
diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc
index ec3af0d2b00..5eef584c776 100644
--- a/src/crimson/osd/pg_recovery.cc
+++ b/src/crimson/osd/pg_recovery.cc
@@ -67,8 +67,6 @@ PGRecovery::start_recovery_ops(
   if (max_to_start > 0) {
     max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
   }
-  using interruptor =
-    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
   return interruptor::parallel_for_each(started,
                                         [] (auto&& ifut) {
     return std::move(ifut);
@@ -609,8 +607,21 @@ void PGRecovery::update_peers_last_backfill(
 
 bool PGRecovery::budget_available() const
 {
-  // TODO: the limits!
-  return true;
+  crimson::osd::scheduler::params_t params =
+    {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+  auto &ss = pg->get_shard_services();
+  auto futopt = ss.try_acquire_throttle_now(std::move(params));
+  if (!futopt) {
+    return true;
+  }
+  std::ignore = interruptor::make_interruptible(std::move(*futopt)
+  ).then_interruptible([this] {
+    assert(!backfill_state->is_triggered());
+    using BackfillState = crimson::osd::BackfillState;
+    backfill_state->process_event(
+      BackfillState::ThrottleAcquired{}.intrusive_from_this());
+  });
+  return false;
 }
 
 void PGRecovery::on_pg_clean()
diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h
index 657e6d3e888..5c7b5c5ef2b 100644
--- a/src/crimson/osd/pg_recovery.h
+++ b/src/crimson/osd/pg_recovery.h
@@ -25,6 +25,8 @@ class PGBackend;
 
 class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
 public:
+  using interruptor =
+    crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
   template <typename T = void>
   using interruptible_future = RecoveryBackend::interruptible_future<T>;
   PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index f09cd147ea9..6c8abecffaf 100644
--- a/src/crimson/osd/replicated_backend.cc
+++ b/src/crimson/osd/replicated_backend.cc
@@ -96,11 +96,18 @@ ReplicatedBackend::submit_transaction(
   bufferlist encoded_txn;
   encode(txn, encoded_txn);
 
+  bool is_delete = false;
   for (auto &le : log_entries) {
     le.mark_unrollbackable();
+    if (le.is_delete()) {
+      is_delete = true;
+    }
   }
 
+  co_await pg.update_snap_map(log_entries, txn);
+
   std::vector<pg_shard_t> to_push_clone;
+  std::vector<pg_shard_t> to_push_delete;
   auto sends = std::make_unique<std::vector<seastar::future<>>>();
   for (auto &pg_shard : pg_shards) {
     if (pg_shard == whoami) {
@@ -115,12 +122,17 @@ ReplicatedBackend::submit_transaction(
       m = new_repop_msg(
         pg_shard, hoid, encoded_txn, osd_op_p, min_epoch,
         map_epoch, log_entries, false, tid);
-      if (_new_clone && pg.is_missing_on_peer(pg_shard, hoid)) {
-        // The head is in the push queue but hasn't been pushed yet.
-        // We need to ensure that the newly created clone will be
-        // pushed as well, otherwise we might skip it.
-        // See: https://tracker.ceph.com/issues/68808
-        to_push_clone.push_back(pg_shard);
+      if (pg.is_missing_on_peer(pg_shard, hoid)) {
+        if (_new_clone) {
+          // The head is in the push queue but hasn't been pushed yet.
+          // We need to ensure that the newly created clone will be
+          // pushed as well, otherwise we might skip it.
+          // See: https://tracker.ceph.com/issues/68808
+          to_push_clone.push_back(pg_shard);
+        }
+        if (is_delete) {
+          to_push_delete.push_back(pg_shard);
+        }
       }
     }
     pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
@@ -130,8 +142,6 @@ ReplicatedBackend::submit_transaction(
         pg_shard.osd, std::move(m), map_epoch));
   }
 
-  co_await pg.update_snap_map(log_entries, txn);
-
   pg.log_operation(
     std::move(log_entries),
     osd_op_p.pg_trim_to,
@@ -157,7 +167,8 @@ ReplicatedBackend::submit_transaction(
       return seastar::now();
     }
     return peers->all_committed.get_shared_future();
-  }).then_interruptible([pending_txn, this, _new_clone,
+  }).then_interruptible([pending_txn, this, _new_clone, &hoid,
+                         to_push_delete=std::move(to_push_delete),
                          to_push_clone=std::move(to_push_clone)] {
     auto acked_peers = std::move(pending_txn->second.acked_peers);
     pending_trans.erase(pending_txn);
@@ -167,6 +178,9 @@ ReplicatedBackend::submit_transaction(
         _new_clone->obs.oi.version, to_push_clone);
     }
+    if (!to_push_delete.empty()) {
+      pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete);
+    }
     return seastar::make_ready_future<
       crimson::osd::acked_peers_t>(std::move(acked_peers));
   });
diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc
index 30e7a8a333d..0d6c9d38236 100644
--- a/src/crimson/osd/replicated_recovery_backend.cc
+++ b/src/crimson/osd/replicated_recovery_backend.cc
@@ -35,6 +35,15 @@ ReplicatedRecoveryBackend::recover_object(
   logger().debug("recover_object: loading obc: {}", soid);
   return pg.obc_loader.with_obc<RWState::RWREAD>(soid,
     [this, soid, need](auto head, auto obc) {
+    if (!obc->obs.exists) {
+      // XXX: this recovery must be triggered by backfills and the corresponding
+      // object must have been deleted by some client request after the object
+      // is enqueued for push but before the lock is acquired by the recovery.
+      //
+      // Abort the recovery in this case, a "recover_delete" must have been
+      // added for this object by the client request that deleted it.
+      return interruptor::now();
+    }
     logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
     auto& recovery_waiter = get_recovering(soid);
     recovery_waiter.obc = obc;
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index f4d4b4c2eb4..f1ed9b8d911 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -591,6 +591,7 @@ public:
   FORWARD_TO_OSD_SINGLETON(get_pool_info)
   FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+  FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
   FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
   FORWARD_TO_OSD_SINGLETON(send_incremental_map)