diff options
Diffstat (limited to 'src/crimson')
20 files changed, 269 insertions, 66 deletions
diff --git a/src/crimson/os/alienstore/CMakeLists.txt b/src/crimson/os/alienstore/CMakeLists.txt index e446d4099f7..0fa69984c53 100644 --- a/src/crimson/os/alienstore/CMakeLists.txt +++ b/src/crimson/os/alienstore/CMakeLists.txt @@ -50,6 +50,7 @@ set(alien_store_srcs ${PROJECT_SOURCE_DIR}/src/os/bluestore/Allocator.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/AvlAllocator.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/BtreeAllocator.cc + ${PROJECT_SOURCE_DIR}/src/os/bluestore/Btree2Allocator.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapFreelistManager.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueFS.cc ${PROJECT_SOURCE_DIR}/src/os/bluestore/bluefs_types.cc diff --git a/src/crimson/os/seastore/async_cleaner.h b/src/crimson/os/seastore/async_cleaner.h index 485497b9a9c..424247c5bdc 100644 --- a/src/crimson/os/seastore/async_cleaner.h +++ b/src/crimson/os/seastore/async_cleaner.h @@ -350,12 +350,12 @@ public: sea_time_point modify_time) = 0; /** - * get_extent_if_live + * get_extents_if_live * * Returns extent at specified location if still referenced by * lba_manager and not removed by t. * - * See TransactionManager::get_extent_if_live and + * See TransactionManager::get_extents_if_live and * LBAManager::get_physical_extent_if_live. 
*/ using get_extents_if_live_iertr = base_iertr; diff --git a/src/crimson/os/seastore/btree/fixed_kv_btree.h b/src/crimson/os/seastore/btree/fixed_kv_btree.h index 097072a7434..0671b55a4fb 100644 --- a/src/crimson/os/seastore/btree/fixed_kv_btree.h +++ b/src/crimson/os/seastore/btree/fixed_kv_btree.h @@ -1383,6 +1383,9 @@ private: "looking up root on {}", c.trans, *root_block); + + // checking the lba root node must be atomic with creating + // and linking the absent root node auto [found, fut] = get_root_node(c); auto on_found_internal = diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h index 0e4e14f5574..4441df86d4e 100644 --- a/src/crimson/os/seastore/cache.h +++ b/src/crimson/os/seastore/cache.h @@ -437,6 +437,11 @@ public: * Mostly the same as Cache::get_extent(), with the only difference * that get_absent_extent won't search the transaction's context for * the specific CachedExtent + * + * The extent in query is supposed to be absent in Cache. + * + * The caller is responsible for calling get_extent_viewable_by_trans() + * *atomically* prior to calling this method. 
*/ template <typename T> get_extent_iertr::future<TCachedExtentRef<T>> get_absent_extent( @@ -471,18 +476,46 @@ public: CachedExtentRef extent) { assert(extent->is_valid()); - auto p_extent = extent->get_transactional_view(t); - if (!p_extent->is_pending_in_trans(t.get_trans_id())) { - t.add_to_read_set(p_extent); - if (!p_extent->is_mutation_pending()) { - touch_extent(*p_extent); + CachedExtent* p_extent; + if (extent->is_stable()) { + p_extent = extent->get_transactional_view(t); + if (p_extent != extent.get()) { + assert(!extent->is_stable_writting()); + assert(p_extent->is_pending_in_trans(t.get_trans_id())); + assert(!p_extent->is_stable_writting()); + if (p_extent->is_mutable()) { + assert(p_extent->is_fully_loaded()); + assert(!p_extent->is_pending_io()); + return get_extent_ertr::make_ready_future<CachedExtentRef>( + CachedExtentRef(p_extent)); + } else { + assert(p_extent->is_exist_clean()); + } + } else { + // stable from trans-view + assert(!p_extent->is_pending_in_trans(t.get_trans_id())); + if (t.maybe_add_to_read_set(p_extent)) { + touch_extent(*p_extent); + } + } + } else { + assert(!extent->is_stable_writting()); + assert(extent->is_pending_in_trans(t.get_trans_id())); + if (extent->is_mutable()) { + assert(extent->is_fully_loaded()); + assert(!extent->is_pending_io()); + return get_extent_ertr::make_ready_future<CachedExtentRef>(extent); + } else { + assert(extent->is_exist_clean()); + p_extent = extent.get(); } } + + assert(p_extent->is_stable() || p_extent->is_exist_clean()); // user should not see RETIRED_PLACEHOLDER extents ceph_assert(p_extent->get_type() != extent_types_t::RETIRED_PLACEHOLDER); if (!p_extent->is_fully_loaded()) { assert(!p_extent->is_mutable()); - touch_extent(*p_extent); LOG_PREFIX(Cache::get_extent_viewable_by_trans); SUBDEBUG(seastore_cache, "{} {}~{} is present without been fully loaded, reading ... -- {}", @@ -784,6 +817,9 @@ public: * and read in the extent at location offset~length. 
* * The extent in query is supposed to be absent in Cache. + * + * The caller is responsible for calling get_extent_viewable_by_trans() + * *atomically* prior to calling this method. */ template <typename Func> get_extent_by_type_ret get_absent_extent_by_type( diff --git a/src/crimson/os/seastore/cached_extent.h b/src/crimson/os/seastore/cached_extent.h index 1d6ffc740cb..e5e32b6713b 100644 --- a/src/crimson/os/seastore/cached_extent.h +++ b/src/crimson/os/seastore/cached_extent.h @@ -343,6 +343,8 @@ public: : "nullopt"; out << "CachedExtent(addr=" << this << ", type=" << get_type() + << ", trans=" << pending_for_transaction + << ", pending_io=" << is_pending_io() << ", version=" << version << ", dirty_from_or_retired_at=" << dirty_from_or_retired_at << ", modify_time=" << sea_time_point_printer_t{modify_time} @@ -440,14 +442,18 @@ public: state == extent_state_t::DIRTY; } + bool is_stable_writting() const { + // MUTATION_PENDING and under-io extents are already stable and visible, + // see prepare_record(). + // + // XXX: It might be good to mark this case as DIRTY from the definition, + // which would probably make things simpler. + return is_mutation_pending() && is_pending_io(); + } + /// Returns true if extent is stable and shared among transactions bool is_stable() const { - return is_stable_written() || - // MUTATION_PENDING and under-io extents are to-be-stable extents, - // for the sake of caveats that checks the correctness of extents - // states, we consider them stable. 
- (is_mutation_pending() && - is_pending_io()); + return is_stable_written() || is_stable_writting(); } bool is_data_stable() const { diff --git a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc index f3e37ff9bef..80bfd7a2e20 100644 --- a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc +++ b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc @@ -83,7 +83,7 @@ BtreeLBAMappingRef LBALeafNode::get_mapping( this, iter.get_offset(), val, - lba_node_meta_t{laddr, val.len, 0}); + lba_node_meta_t{laddr, laddr + val.len, 0}); } } diff --git a/src/crimson/os/seastore/transaction.h b/src/crimson/os/seastore/transaction.h index d7cdb30098a..2fa468fb48e 100644 --- a/src/crimson/os/seastore/transaction.h +++ b/src/crimson/os/seastore/transaction.h @@ -137,14 +137,37 @@ public: } } + // Returns true if added, false if already added or weak + bool maybe_add_to_read_set(CachedExtentRef ref) { + if (is_weak()) { + return false; + } + + assert(ref->is_valid()); + + auto it = ref->transactions.lower_bound( + this, read_set_item_t<Transaction>::trans_cmp_t()); + if (it != ref->transactions.end() && it->t == this) { + return false; + } + + auto [iter, inserted] = read_set.emplace(this, ref); + ceph_assert(inserted); + ref->transactions.insert_before( + it, const_cast<read_set_item_t<Transaction>&>(*iter)); + return true; + } + void add_to_read_set(CachedExtentRef ref) { - if (is_weak()) return; + if (is_weak()) { + return; + } assert(ref->is_valid()); auto it = ref->transactions.lower_bound( this, read_set_item_t<Transaction>::trans_cmp_t()); - if (it != ref->transactions.end() && it->t == this) return; + assert(it == ref->transactions.end() || it->t != this); auto [iter, inserted] = read_set.emplace(this, ref); ceph_assert(inserted); diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index b9768909735..1b309a03683 100644 --- 
a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -680,7 +680,7 @@ TransactionManager::get_extents_if_live( laddr_t laddr, extent_len_t len) { - LOG_PREFIX(TransactionManager::get_extent_if_live); + LOG_PREFIX(TransactionManager::get_extents_if_live); TRACET("{} {}~{} {}", t, type, laddr, len, paddr); // This only works with segments to check if alive, diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index 1fe1e06ef85..509d4c4a484 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -87,13 +87,6 @@ seastar::future<> BackgroundRecoveryT<T>::start() }, [](std::exception_ptr) { return seastar::make_ready_future<bool>(false); }, pg, epoch_started); - }).handle_exception_type([ref, this](const std::system_error& err) { - LOG_PREFIX(BackgroundRecoveryT<T>::start); - if (err.code() == std::make_error_code(std::errc::interrupted)) { - DEBUGDPPI("recovery interruped: {}", *pg, err.what()); - return seastar::now(); - } - return seastar::make_exception_future<>(err); }); }); }); diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 33d0f4c0f41..bb314bad4f3 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -23,10 +23,18 @@ namespace crimson::osd { void ClientRequest::Orderer::requeue(Ref<PG> pg) { LOG_PREFIX(ClientRequest::Orderer::requeue); - for (auto &req: list) { - DEBUGDPP("requeueing {}", *pg, req); - req.reset_instance_handle(); - std::ignore = req.with_pg_process(pg); + std::list<ClientRequest*> to_requeue; + for (auto &req : list) { + to_requeue.emplace_back(&req); + } + // Client requests might be destroyed in the following + // iteration leading to short lived dangling pointers + // to those requests, but this doesn't hurt as we 
won't + // dereference those dangling pointers. + for (auto req: to_requeue) { + DEBUGDPP("requeueing {}", *pg, *req); + req->reset_instance_handle(); + std::ignore = req->with_pg_process(pg); } } @@ -286,6 +294,7 @@ ClientRequest::recover_missing_snaps( ObjectContextRef head, std::set<snapid_t> &snaps) { + LOG_PREFIX(ClientRequest::process_op); co_await ihref.enter_stage<interruptor>( client_pp(*pg).recover_missing_snaps, *this); for (auto &snap : snaps) { @@ -299,7 +308,12 @@ ClientRequest::recover_missing_snaps( * we skip the oid as there is no corresponding clone to recover. * See https://tracker.ceph.com/issues/63821 */ if (oid) { - co_await do_recover_missing(pg, *oid, m->get_reqid()); + auto unfound = co_await do_recover_missing(pg, *oid, m->get_reqid()); + if (unfound) { + DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head()); + co_await interruptor::make_interruptible( + pg->get_recovery_backend()->add_unfound(m->get_hobj().get_head())); + } } } } @@ -317,7 +331,14 @@ ClientRequest::process_op( "Skipping recover_missings on non primary pg for soid {}", *pg, m->get_hobj()); } else { - co_await do_recover_missing(pg, m->get_hobj().get_head(), m->get_reqid()); + auto unfound = co_await do_recover_missing( + pg, m->get_hobj().get_head(), m->get_reqid()); + if (unfound) { + DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head()); + co_await interruptor::make_interruptible( + pg->get_recovery_backend()->add_unfound(m->get_hobj().get_head())); + } + std::set<snapid_t> snaps = snaps_need_to_recover(); if (!snaps.empty()) { // call with_obc() in order, but wait concurrently for loading. 
diff --git a/src/crimson/osd/osd_operations/client_request_common.cc b/src/crimson/osd/osd_operations/client_request_common.cc index c4439d5bb35..a56d58d2066 100644 --- a/src/crimson/osd/osd_operations/client_request_common.cc +++ b/src/crimson/osd/osd_operations/client_request_common.cc @@ -13,7 +13,7 @@ namespace { namespace crimson::osd { -typename InterruptibleOperation::template interruptible_future<> +typename InterruptibleOperation::template interruptible_future<bool> CommonClientRequest::do_recover_missing( Ref<PG> pg, const hobject_t& soid, @@ -45,22 +45,29 @@ CommonClientRequest::do_recover_missing( if (!needs_recovery_or_backfill) { logger().debug("{} reqid {} nothing to recover {}", __func__, reqid, soid); - return seastar::now(); + return seastar::make_ready_future<bool>(false); } + if (pg->get_peering_state().get_missing_loc().is_unfound(soid)) { + return seastar::make_ready_future<bool>(true); + } logger().debug("{} reqid {} need to wait for recovery, {} version {}", __func__, reqid, soid, ver); if (pg->get_recovery_backend()->is_recovering(soid)) { logger().debug("{} reqid {} object {} version {}, already recovering", __func__, reqid, soid, ver); - return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered(); + return pg->get_recovery_backend()->get_recovering( + soid).wait_for_recovered( + ).then([] { + return seastar::make_ready_future<bool>(false); + }); } else { logger().debug("{} reqid {} object {} version {}, starting recovery", __func__, reqid, soid, ver); auto [op, fut] = pg->get_shard_services().start_operation<UrgentRecovery>( soid, ver, pg, pg->get_shard_services(), pg->get_osdmap_epoch()); - return std::move(fut); + return fut.then([] { return seastar::make_ready_future<bool>(false); }); } } diff --git a/src/crimson/osd/osd_operations/client_request_common.h b/src/crimson/osd/osd_operations/client_request_common.h index 85f118d64c1..951bf653799 100644 --- a/src/crimson/osd/osd_operations/client_request_common.h +++ 
b/src/crimson/osd/osd_operations/client_request_common.h @@ -11,7 +11,7 @@ namespace crimson::osd { struct CommonClientRequest { - static InterruptibleOperation::template interruptible_future<> + static InterruptibleOperation::template interruptible_future<bool> do_recover_missing( Ref<PG> pg, const hobject_t& soid, diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index 22d7f3e492a..2968a6f4385 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -70,7 +70,12 @@ seastar::future<> InternalClientRequest::start() client_pp().recover_missing); }).then_interruptible([this] { return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); - }).then_interruptible([this] { + }).then_interruptible([this](bool unfound) { + if (unfound) { + throw std::system_error( + std::make_error_code(std::errc::operation_canceled), + fmt::format("{} is unfound, drop it!", get_target_oid())); + } return enter_stage<interruptor>( client_pp().get_obc); }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { @@ -128,6 +133,9 @@ seastar::future<> InternalClientRequest::start() }, pg, start_epoch); }).then([this] { track_event<CompletionEvent>(); + }).handle_exception_type([](std::system_error &error) { + logger().debug("error {}, message: {}", error.code(), error.what()); + return seastar::now(); }).finally([this] { logger().debug("{}: exit", *this); handle.exit(); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index c195241cc5d..bb249b8e10e 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -513,6 +513,35 @@ Context *PG::on_clean() return nullptr; } +seastar::future<> PG::clear_temp_objects() +{ + logger().info("{} {}", __func__, pgid); + ghobject_t _next; + ceph::os::Transaction t; + auto max_size = local_conf()->osd_target_transaction_size; + while(true) { + auto [objs, next] = co_await 
shard_services.get_store().list_objects( + coll_ref, _next, ghobject_t::get_max(), max_size); + if (objs.empty()) { + if (!t.empty()) { + co_await shard_services.get_store().do_transaction( + coll_ref, std::move(t)); + } + break; + } + for (auto &obj : objs) { + if (obj.hobj.is_temp()) { + t.remove(coll_ref->get_cid(), obj); + } + } + _next = next; + if (t.get_num_ops() >= max_size) { + co_await shard_services.get_store().do_transaction( + coll_ref, t.claim_and_reset()); + } + } +} + PG::interruptible_future<seastar::stop_iteration> PG::trim_snap( snapid_t to_trim, bool needs_pause) @@ -1698,6 +1727,28 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const { return false; } +bool PG::should_send_op( + pg_shard_t peer, + const hobject_t &hoid) const +{ + if (peer == get_primary()) + return true; + bool should_send = + (hoid.pool != (int64_t)get_info().pgid.pool() || + (has_backfill_state() && hoid <= get_last_backfill_started()) || + hoid <= peering_state.get_peer_info(peer).last_backfill); + if (!should_send) { + ceph_assert(is_backfill_target(peer)); + logger().debug("{} issue_repop shipping empty opt to osd." 
+ "{}, object {} beyond std::max(last_backfill_started, " + "peer_info[peer].last_backfill {})", + peer, hoid, peering_state.get_peer_info(peer).last_backfill); + } + return should_send; + // TODO: should consider async recovery cases in the future which are not supported + // by crimson yet +} + PG::interruptible_future<std::optional<PG::complete_op_t>> PG::already_complete(const osd_reqid_t& reqid) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 252709dea4d..6810803867f 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -521,6 +521,7 @@ public: bool get_need_up_thru() const { return peering_state.get_need_up_thru(); } + bool should_send_op(pg_shard_t peer, const hobject_t &hoid) const; epoch_t get_same_interval_since() const { return get_info().history.same_interval_since; } @@ -603,6 +604,7 @@ public: ObjectContextRef obc, const std::error_code e, ceph_tid_t rep_tid); + seastar::future<> clear_temp_objects(); private: @@ -740,6 +742,15 @@ public: PeeringState& get_peering_state() final { return peering_state; } + bool has_backfill_state() const { + return (bool)(recovery_handler->backfill_state); + } + const BackfillState& get_backfill_state() const { + return *recovery_handler->backfill_state; + } + hobject_t get_last_backfill_started() const { + return get_backfill_state().get_last_backfill_started(); + } bool has_reset_since(epoch_t epoch) const final { return peering_state.pg_has_reset_since(epoch); } diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index eeadacd21c2..09c93ed341e 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -433,6 +433,7 @@ void PGRecovery::on_global_recover ( auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); recovery_waiter.set_recovered(); pg->get_recovery_backend()->remove_recovering(soid); + pg->get_recovery_backend()->found_and_remove(soid); } void PGRecovery::on_failed_recover( diff --git 
a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc index a32a1581adc..0d041e91e5e 100644 --- a/src/crimson/osd/pg_shard_manager.cc +++ b/src/crimson/osd/pg_shard_manager.cc @@ -35,8 +35,10 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store) pgid ).then([pgid, &per_shard_state](auto &&pg) { logger().info("load_pgs: loaded {}", pgid); - per_shard_state.pg_map.pg_loaded(pgid, std::move(pg)); - return seastar::now(); + return pg->clear_temp_objects( + ).then([&per_shard_state, pg, pgid] { + per_shard_state.pg_map.pg_loaded(pgid, std::move(pg)); + }); }); }); }); diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index c55d3150850..1148585ad25 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -43,7 +43,7 @@ void RecoveryBackend::clear_temp_obj(const hobject_t &oid) } void RecoveryBackend::clean_up(ceph::os::Transaction& t, - std::string_view why) + interrupt_cause_t why) { for_each_temp_obj([&](auto &soid) { t.remove(pg.get_collection_ref()->get_cid(), @@ -65,6 +65,36 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t, recovering.clear(); } +void RecoveryBackend::WaitForObjectRecovery::interrupt(interrupt_cause_t why) { + switch(why) { + case interrupt_cause_t::INTERVAL_CHANGE: + if (readable) { + readable->set_exception( + crimson::common::actingset_changed(pg.is_primary())); + readable.reset(); + } + if (recovered) { + recovered->set_exception( + crimson::common::actingset_changed(pg.is_primary())); + recovered.reset(); + } + if (pulled) { + pulled->set_exception( + crimson::common::actingset_changed(pg.is_primary())); + pulled.reset(); + } + for (auto& [pg_shard, pr] : pushes) { + pr.set_exception( + crimson::common::actingset_changed(pg.is_primary())); + } + pushes.clear(); + break; + default: + ceph_abort("impossible"); + break; + } +} + void RecoveryBackend::WaitForObjectRecovery::stop() { if (readable) { 
readable->set_exception( diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index 179cfbabd08..b404b79751e 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -45,10 +45,22 @@ public: backend{backend} {} virtual ~RecoveryBackend() {} std::pair<WaitForObjectRecovery&, bool> add_recovering(const hobject_t& soid) { - auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{}); + auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery(pg)); assert(it->second); return {*(it->second), added}; } + seastar::future<> add_unfound(const hobject_t &soid) { + auto [it, added] = unfound.emplace(soid, seastar::shared_promise()); + return it->second.get_shared_future(); + } + void found_and_remove(const hobject_t &soid) { + auto it = unfound.find(soid); + if (it != unfound.end()) { + auto &found_promise = it->second; + found_promise.set_value(); + unfound.erase(it); + } + } WaitForObjectRecovery& get_recovering(const hobject_t& soid) { assert(is_recovering(soid)); return *(recovering.at(soid)); @@ -82,14 +94,22 @@ public: std::int64_t min, std::int64_t max); + enum interrupt_cause_t : uint8_t { + INTERVAL_CHANGE, + MAX + }; void on_peering_interval_change(ceph::os::Transaction& t) { - clean_up(t, "new peering interval"); + clean_up(t, interrupt_cause_t::INTERVAL_CHANGE); } seastar::future<> stop() { for (auto& [soid, recovery_waiter] : recovering) { recovery_waiter->stop(); } + for (auto& [soid, promise] : unfound) { + promise.set_exception( + crimson::common::system_shutdown_exception()); + } return on_stop(); } protected: @@ -124,11 +144,14 @@ public: public boost::intrusive_ref_counter< WaitForObjectRecovery, boost::thread_unsafe_counter>, public crimson::BlockerT<WaitForObjectRecovery> { + crimson::osd::PG &pg; std::optional<seastar::shared_promise<>> readable, recovered, pulled; std::map<pg_shard_t, seastar::shared_promise<>> pushes; public: static constexpr const char* 
type_name = "WaitForObjectRecovery"; + WaitForObjectRecovery(crimson::osd::PG &pg) : pg(pg) {} + crimson::osd::ObjectContextRef obc; std::optional<pull_info_t> pull_info; std::map<pg_shard_t, push_info_t> pushing; @@ -204,28 +227,7 @@ public: pushes.erase(it); } } - void interrupt(std::string_view why) { - if (readable) { - readable->set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); - readable.reset(); - } - if (recovered) { - recovered->set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); - recovered.reset(); - } - if (pulled) { - pulled->set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); - pulled.reset(); - } - for (auto& [pg_shard, pr] : pushes) { - pr.set_exception(std::system_error( - std::make_error_code(std::errc::interrupted), why.data())); - } - pushes.clear(); - } + void interrupt(interrupt_cause_t why); void stop(); void dump_detail(Formatter* f) const { } @@ -235,6 +237,7 @@ public: using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>; protected: std::map<hobject_t, WaitForObjectRecoveryRef> recovering; + std::map<hobject_t, seastar::shared_promise<>> unfound; hobject_t get_temp_recovery_object( const hobject_t& target, eversion_t version) const; @@ -249,7 +252,7 @@ protected: backend->clear_temp_objs(); } - void clean_up(ceph::os::Transaction& t, std::string_view why); + void clean_up(ceph::os::Transaction& t, interrupt_cause_t why); virtual seastar::future<> on_stop() = 0; private: void handle_backfill_finish( diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index d01fd646803..d227b9c89e9 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -65,7 +65,14 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards, min_epoch, tid, osd_op_p.at_version); - m->set_data(encoded_txn); + if 
(pg.should_send_op(pg_shard, hoid)) { + m->set_data(encoded_txn); + } else { + ceph::os::Transaction t; + bufferlist bl; + encode(t, bl); + m->set_data(bl); + } pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); encode(log_entries, m->logbl); m->pg_trim_to = osd_op_p.pg_trim_to; |