Diffstat (limited to 'src/crimson/osd/pg.cc')
-rw-r--r-- | src/crimson/osd/pg.cc | 323 |
1 files changed, 157 insertions, 166 deletions
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 744a1dbc02b..2746e730f2b 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -132,6 +132,7 @@ PG::PG(
         pool,
         name),
       osdmap,
+      PG_FEATURE_CRIMSON_ALL,
       this,
       this),
     scrubber(*this),
@@ -392,7 +393,13 @@ void PG::on_replica_activate()
 
 void PG::on_activate_complete()
 {
-  wait_for_active_blocker.unblock();
+  /* Confusingly, on_activate_complete is invoked when the primary and replicas
+   * have recorded the current interval. At that point, the PG may either become
+   * ACTIVE or PEERED, depending on whether the acting set is eligible for client
+   * IO. Only unblock wait_for_active_blocker if we actually became ACTIVE */
+  if (peering_state.is_active()) {
+    wait_for_active_blocker.unblock();
+  }
 
   if (peering_state.needs_recovery()) {
     logger().info("{}: requesting recovery",
@@ -861,43 +868,26 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
   return os;
 }
 
-void PG::mutate_object(
-  ObjectContextRef& obc,
-  ceph::os::Transaction& txn,
-  osd_op_params_t& osd_op_p)
+void PG::enqueue_push_for_backfill(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
 {
-  if (obc->obs.exists) {
-    obc->obs.oi.prior_version = obc->obs.oi.version;
-    obc->obs.oi.version = osd_op_p.at_version;
-    if (osd_op_p.user_modify)
-      obc->obs.oi.user_version = osd_op_p.at_version.version;
-    obc->obs.oi.last_reqid = osd_op_p.req_id;
-    obc->obs.oi.mtime = osd_op_p.mtime;
-    obc->obs.oi.local_mtime = ceph_clock_now();
-
-    // object_info_t
-    {
-      ceph::bufferlist osv;
-      obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL);
-      // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
-      txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
-    }
+  assert(recovery_handler);
+  assert(recovery_handler->backfill_state);
+  auto backfill_state = recovery_handler->backfill_state.get();
+  backfill_state->enqueue_standalone_push(obj, v, peers);
+}
 
-    // snapset
-    if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
-      logger().debug("final snapset {} in {}",
-                     obc->ssc->snapset, obc->obs.oi.soid);
-      ceph::bufferlist bss;
-      encode(obc->ssc->snapset, bss);
-      txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
-      obc->ssc->exists = true;
-    } else {
-      logger().debug("no snapset (this is a clone)");
-    }
-  } else {
-    // reset cached ObjectState without enforcing eviction
-    obc->obs.oi = object_info_t(obc->obs.oi.soid);
-  }
+void PG::enqueue_delete_for_backfill(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
+{
+  assert(recovery_handler);
+  assert(recovery_handler->backfill_state);
+  auto backfill_state = recovery_handler->backfill_state.get();
+  backfill_state->enqueue_standalone_delete(obj, v, peers);
 }
 
 PG::interruptible_future<
@@ -905,6 +895,7 @@ PG::interruptible_future<
              PG::interruptible_future<>>>
 PG::submit_transaction(
   ObjectContextRef&& obc,
+  ObjectContextRef&& new_clone,
   ceph::os::Transaction&& txn,
   osd_op_params_t&& osd_op_p,
   std::vector<pg_log_entry_t>&& log_entries)
@@ -917,17 +908,23 @@ PG::submit_transaction(
   }
 
   epoch_t map_epoch = get_osdmap_epoch();
+  auto at_version = osd_op_p.at_version;
 
-  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version);
   peering_state.update_trim_to();
 
   ceph_assert(!log_entries.empty());
   ceph_assert(log_entries.rbegin()->version >= projected_last_update);
   projected_last_update = log_entries.rbegin()->version;
 
+  for (const auto& entry: log_entries) {
+    projected_log.add(entry);
+  }
+
   auto [submitted, all_completed] = co_await backend->submit_transaction(
       peering_state.get_acting_recovery_backfill(),
       obc->obs.oi.soid,
+      std::move(new_clone),
       std::move(txn),
       std::move(osd_op_p),
       peering_state.get_last_peering_reset(),
@@ -936,8 +933,8 @@ PG::submit_transaction(
   co_return std::make_tuple(
     std::move(submitted),
     all_completed.then_interruptible(
-      [this, last_complete=peering_state.get_info().last_complete,
-       at_version=osd_op_p.at_version](auto acked) {
+      [this, last_complete=peering_state.get_info().last_complete, at_version]
+      (auto acked) {
       for (const auto& peer : acked) {
         peering_state.update_peer_last_complete_ondisk(
           peer.shard, peer.last_complete_ondisk);
@@ -1014,8 +1011,15 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   const std::error_code e,
   ceph_tid_t rep_tid)
 {
-  logger().debug("{}: {} rep_tid: {} error: {}",
-                 __func__, *m, rep_tid, e);
+  // as with submit_executer, need to ensure that log numbering and submission
+  // are atomic
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
+  LOG_PREFIX(PG::submit_error_log);
+  DEBUGDPP("{} rep_tid: {} error: {}",
+           *this, *m, rep_tid, e);
   const osd_reqid_t &reqid = m->get_reqid();
   mempool::osd_pglog::list<pg_log_entry_t> log_entries;
   log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
@@ -1034,49 +1038,47 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   ceph::os::Transaction t;
   peering_state.merge_new_log_entries(
     log_entries, t, peering_state.get_pg_trim_to(),
-    peering_state.get_min_last_complete_ondisk());
-
-  return seastar::do_with(log_entries, set<pg_shard_t>{},
-    [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
-    return interruptor::do_for_each(get_acting_recovery_backfill(),
-      [this, log_entries, waiting_on, rep_tid]
-      (auto& i) mutable {
-      pg_shard_t peer(i);
-      if (peer == pg_whoami) {
-        return seastar::now();
-      }
-      ceph_assert(peering_state.get_peer_missing().count(peer));
-      ceph_assert(peering_state.has_peer_info(peer));
-      auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
-        log_entries,
-        spg_t(peering_state.get_info().pgid.pgid, i.shard),
-        pg_whoami.shard,
-        get_osdmap_epoch(),
-        get_last_peering_reset(),
-        rep_tid,
-        peering_state.get_pg_trim_to(),
-        peering_state.get_min_last_complete_ondisk());
-      waiting_on.insert(peer);
-      logger().debug("submit_error_log: sending log"
-                     "missing_request (rep_tid: {} entries: {})"
-                     " to osd {}", rep_tid, log_entries, peer.osd);
-      return shard_services.send_to_osd(peer.osd,
-                                        std::move(log_m),
-                                        get_osdmap_epoch());
-    }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
-      waiting_on.insert(pg_whoami);
-      logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
-      log_entry_update_waiting_on.insert(
-        std::make_pair(rep_tid,
-          log_update_t{std::move(waiting_on)}));
-      return shard_services.get_store().do_transaction(
-        get_collection_ref(), std::move(t)
-      ).then([this] {
-        peering_state.update_trim_to();
-        return seastar::make_ready_future<eversion_t>(projected_last_update);
-      });
-    });
-  });
+    peering_state.get_pg_committed_to());
+
+
+  set<pg_shard_t> waiting_on;
+  for (const auto &peer: get_acting_recovery_backfill()) {
+    if (peer == pg_whoami) {
+      continue;
+    }
+    ceph_assert(peering_state.get_peer_missing().count(peer));
+    ceph_assert(peering_state.has_peer_info(peer));
+    auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+      log_entries,
+      spg_t(peering_state.get_info().pgid.pgid, peer.shard),
+      pg_whoami.shard,
+      get_osdmap_epoch(),
+      get_last_peering_reset(),
+      rep_tid,
+      peering_state.get_pg_trim_to(),
+      peering_state.get_pg_committed_to());
+    waiting_on.insert(peer);
+
+    DEBUGDPP("sending log missing_request (rep_tid: {} entries: {}) to osd {}",
+             *this, rep_tid, log_entries, peer.osd);
+    co_await interruptor::make_interruptible(
+      shard_services.send_to_osd(
+        peer.osd,
+        std::move(log_m),
+        get_osdmap_epoch()));
+  }
+  waiting_on.insert(pg_whoami);
+  DEBUGDPP("inserting rep_tid {}", *this, rep_tid);
+  log_entry_update_waiting_on.insert(
+    std::make_pair(rep_tid,
+      log_update_t{std::move(waiting_on)}));
+  co_await interruptor::make_interruptible(
+    shard_services.get_store().do_transaction(
+      get_collection_ref(), std::move(t)
+    ));
+
+  peering_state.update_trim_to();
+  co_return projected_last_update;
 }
 
 PG::run_executer_fut PG::run_executer(
@@ -1132,25 +1134,25 @@ PG::submit_executer_fut PG::submit_executer(
   OpsExecuter &&ox,
   const std::vector<OSDOp>& ops) {
   LOG_PREFIX(PG::submit_executer);
-  // transaction must commit at this point
-  return std::move(
+  DEBUGDPP("", *this);
+
+  // we need to build the pg log entries and submit the transaction
+  // atomically to ensure log ordering
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
+
+  auto [submitted, completed] = co_await std::move(
     ox
-  ).flush_changes_n_do_ops_effects(
+  ).flush_changes_and_submit(
     ops,
     snap_mapper,
-    osdriver,
-    [FNAME, this](auto&& txn,
-                  auto&& obc,
-                  auto&& osd_op_p,
-                  auto&& log_entries) {
-      DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
-      mutate_object(obc, txn, osd_op_p);
-      return submit_transaction(
-        std::move(obc),
-        std::move(txn),
-        std::move(osd_op_p),
-        std::move(log_entries));
-    });
+    osdriver
+  );
+  co_return std::make_tuple(
+    std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+    std::move(completed));
 }
 
 PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
@@ -1215,31 +1217,6 @@ void PG::check_blocklisted_obc_watchers(
   }
 }
 
-PG::load_obc_iertr::future<>
-PG::with_locked_obc(const hobject_t &hobj,
-                    const OpInfo &op_info,
-                    with_obc_func_t &&f)
-{
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-  const hobject_t oid = get_oid(hobj);
-  auto wrapper = [f=std::move(f), this](auto head, auto obc) {
-    check_blocklisted_obc_watchers(obc);
-    return f(head, obc);
-  };
-  switch (get_lock_type(op_info)) {
-  case RWState::RWREAD:
-    return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
-  case RWState::RWWRITE:
-    return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
-  case RWState::RWEXCL:
-    return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
-  default:
-    ceph_abort();
-  };
-}
-
 void PG::update_stats(const pg_stat_t &stat) {
   peering_state.update_stats(
     [&stat] (auto& history, auto& stats) {
@@ -1249,13 +1226,10 @@ void PG::update_stats(const pg_stat_t &stat) {
   );
 }
 
-PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
 {
   LOG_PREFIX(PG::handle_rep_op);
   DEBUGDPP("{}", *this, *req);
-  if (can_discard_replica_op(*req)) {
-    co_return;
-  }
 
   ceph::os::Transaction txn;
   auto encoded_txn = req->get_data().cbegin();
@@ -1272,12 +1246,13 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   log_operation(std::move(log_entries),
                 req->pg_trim_to,
                 req->version,
-                req->min_last_complete_ondisk,
+                req->pg_committed_to,
                 !txn.empty(),
                 txn,
                 false);
 
   DEBUGDPP("{} do_transaction", *this, *req);
-  co_await interruptor::make_interruptible(
+
+  auto commit_fut = interruptor::make_interruptible(
     shard_services.get_store().do_transaction(coll_ref, std::move(txn))
   );
@@ -1288,10 +1263,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
     req.get(), pg_whoami, 0,
     map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
   reply->set_last_complete_ondisk(lcod);
-  co_await interruptor::make_interruptible(
-    shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
-  );
-  co_return;
+  co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
 }
 
 PG::interruptible_future<> PG::update_snap_map(
@@ -1318,28 +1290,25 @@ void PG::log_operation(
   std::vector<pg_log_entry_t>&& logv,
   const eversion_t &trim_to,
   const eversion_t &roll_forward_to,
-  const eversion_t &min_last_complete_ondisk,
+  const eversion_t &pg_committed_to,
   bool transaction_applied,
   ObjectStore::Transaction &txn,
   bool async) {
-  logger().debug("{}", __func__);
+  LOG_PREFIX(PG::log_operation);
+  DEBUGDPP("", *this);
   if (is_primary()) {
-    ceph_assert(trim_to <= peering_state.get_last_update_ondisk());
+    ceph_assert(trim_to <= peering_state.get_pg_committed_to());
  }
-  /* TODO: when we add snap mapper and projected log support,
-   * we'll likely want to update them here.
-   *
-   * See src/osd/PrimaryLogPG.h:log_operation for how classic
-   * handles these cases.
-   */
-#if 0
   auto last = logv.rbegin();
   if (is_primary() && last != logv.rend()) {
+    DEBUGDPP("on primary, trimming projected log", *this);
     projected_log.skip_can_rollback_to_to_head();
-    projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+    projected_log.trim(shard_services.get_cct(), last->version,
+                       nullptr, nullptr, nullptr);
   }
-#endif
+
   if (!is_primary()) { // && !is_ec_pg()
+    DEBUGDPP("on replica, clearing obc", *this);
     replica_clear_repop_obc(logv);
   }
   if (!logv.empty()) {
@@ -1348,7 +1317,7 @@ void PG::log_operation(
     peering_state.append_log(std::move(logv),
                              trim_to,
                              roll_forward_to,
-                             min_last_complete_ondisk,
+                             pg_committed_to,
                              txn,
                              !txn.empty(),
                              false);
@@ -1356,13 +1325,13 @@ void PG::log_operation(
 void PG::replica_clear_repop_obc(
   const std::vector<pg_log_entry_t> &logv)
 {
-  logger().debug("{} clearing {} entries", __func__, logv.size());
-  for (auto &&e: logv) {
-    logger().debug(" {} get_object_boundary(from): {} "
-                   " head version(to): {}",
-                   e.soid,
-                   e.soid.get_object_boundary(),
-                   e.soid.get_head());
+  LOG_PREFIX(PG::replica_clear_repop_obc);
+  DEBUGDPP("clearing obc for {} log entries", logv.size());
+  for (auto &&e: logv) {
+    DEBUGDPP("clearing entry for {} from: {} to: {}",
+             e.soid,
+             e.soid.get_object_boundary(),
+             e.soid.get_head());
     /* Have to blast all clones, they share a snapset */
     obc_registry.clear_range(
       e.soid.get_object_boundary(), e.soid.get_head());
@@ -1387,17 +1356,17 @@ PG::interruptible_future<> PG::do_update_log_missing(
   ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
 
   ObjectStore::Transaction t;
-  std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  std::optional<eversion_t> op_trim_to, op_pg_committed_to;
   if (m->pg_trim_to != eversion_t())
     op_trim_to = m->pg_trim_to;
-  if (m->pg_roll_forward_to != eversion_t())
-    op_roll_forward_to = m->pg_roll_forward_to;
-  logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+  if (m->pg_committed_to != eversion_t())
+    op_pg_committed_to = m->pg_committed_to;
+  logger().debug("op_trim_to = {}, op_pg_committed_to = {}",
    op_trim_to.has_value() ? *op_trim_to : eversion_t(),
-   op_roll_forward_to.has_value() ? *op_roll_forward_to : eversion_t());
+   op_pg_committed_to.has_value() ? *op_pg_committed_to : eversion_t());
 
   peering_state.append_log_entries_update_missing(
-    m->entries, t, op_trim_to, op_roll_forward_to);
+    m->entries, t, op_trim_to, op_pg_committed_to);
 
   return interruptor::make_interruptible(shard_services.get_store().do_transaction(
     coll_ref, std::move(t))).then_interruptible(
@@ -1615,14 +1584,21 @@ bool PG::should_send_op(
     return true;
   bool should_send =
     (hoid.pool != (int64_t)get_info().pgid.pool() ||
-     (has_backfill_state() && hoid <= get_last_backfill_started()) ||
-     hoid <= peering_state.get_peer_info(peer).last_backfill);
+     // An object has been fully pushed to the backfill target if and only if
+     // either of the following conditions is met:
+     // 1. peer_info.last_backfill has passed "hoid"
+     // 2. last_backfill_started has passed "hoid" and "hoid" is not in the peer
+     //    missing set
+     hoid <= peering_state.get_peer_info(peer).last_backfill ||
+     (has_backfill_state() && hoid <= get_last_backfill_started() &&
+      !is_missing_on_peer(peer, hoid)));
   if (!should_send) {
    ceph_assert(is_backfill_target(peer));
    logger().debug("{} issue_repop shipping empty opt to osd."
                   "{}, object {} beyond std::max(last_backfill_started, "
                   "peer_info[peer].last_backfill {})",
-                  peer, hoid, peering_state.get_peer_info(peer).last_backfill);
+                  __func__, peer, hoid,
+                  peering_state.get_peer_info(peer).last_backfill);
   }
   return should_send;
   // TODO: should consider async recovery cases in the future which are not supported
@@ -1637,8 +1613,8 @@ PG::already_complete(const osd_reqid_t& reqid)
   int ret;
   std::vector<pg_log_op_return_item_t> op_returns;
 
-  if (peering_state.get_pg_log().get_log().get_request(
-        reqid, &version, &user_version, &ret, &op_returns)) {
+  if (check_in_progress_op(
+        reqid, &version, &user_version, &ret, &op_returns)) {
     complete_op_t dupinfo{
       user_version,
      version,
@@ -1703,4 +1679,19 @@ void PG::C_PG_FinishRecovery::finish(int r) {
     DEBUGDPP("stale recovery finsher", pg);
   }
 }
+bool PG::check_in_progress_op(
+  const osd_reqid_t& reqid,
+  eversion_t *version,
+  version_t *user_version,
+  int *return_code,
+  std::vector<pg_log_op_return_item_t> *op_returns
+  ) const
+{
+  return (
+    projected_log.get_request(reqid, version, user_version, return_code,
+                              op_returns) ||
+    peering_state.get_pg_log().get_log().get_request(
+      reqid, version, user_version, return_code, op_returns));
+}
+
 }
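
Note (illustrative, not part of the commit): the submit_executer and submit_error_log changes hold submit_lock across version assignment, the projected-log update, and the hand-off to the backend, and only release it once the local submission step resolves, so that log entries are created and submitted in version order. The toy program below sketches that invariant with plain std::mutex/std::thread; crimson itself is a single-reactor Seastar coroutine design where interleaving comes from co_await suspension points rather than threads, and ToyPG and all names here are invented for illustration.

// Illustrative sketch only (plain C++, not Seastar/crimson): why version
// assignment and log append must happen atomically.  If two writers could
// interleave between "allocate version" and "append to log", the log could
// end up out of version order; holding one lock across both steps keeps it
// ordered, which is the role submit_lock plays around pre_submit_op,
// projected_log.add and the backend submission in the diff above.
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

struct ToyPG {
  std::mutex submit_lock;            // stands in for PG::submit_lock
  uint64_t next_version = 0;         // stands in for at_version allocation
  std::vector<uint64_t> projected_log;

  void submit(int ops) {
    for (int i = 0; i < ops; ++i) {
      std::lock_guard g{submit_lock};   // number and append atomically
      uint64_t v = ++next_version;      // "pre_submit_op"
      projected_log.push_back(v);       // "projected_log.add(entry)"
      // backend submission would be started here, still under the lock,
      // so replicas also observe entries in version order
    }
  }
};

int main() {
  ToyPG pg;
  std::thread a([&] { pg.submit(1000); });
  std::thread b([&] { pg.submit(1000); });
  a.join();
  b.join();
  // the projected log ends up strictly increasing, i.e. in version order
  for (size_t i = 1; i < pg.projected_log.size(); ++i) {
    assert(pg.projected_log[i - 1] < pg.projected_log[i]);
  }
  return 0;
}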
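Note (illustrative, not part of the commit): already_complete() now goes through the new check_in_progress_op(), which consults the in-memory projected log before the durable PG log, so a resent request whose transaction has been submitted but not yet committed is still recognized as a duplicate. A minimal stand-in for that lookup order follows; ToyLogs, OpResult and ReqId are invented names, not crimson types.

// Illustrative sketch only: duplicate-request lookup that checks the
// in-flight (projected) log first and the durable pg log second, mirroring
// the order used by check_in_progress_op() in the diff above.
#include <cstdint>
#include <map>
#include <optional>
#include <string>

struct OpResult { uint64_t version; int ret; };
using ReqId = std::string;

struct ToyLogs {
  std::map<ReqId, OpResult> projected;  // submitted, not yet committed
  std::map<ReqId, OpResult> pg_log;     // durable log

  std::optional<OpResult> find_dup(const ReqId& reqid) const {
    if (auto it = projected.find(reqid); it != projected.end()) {
      return it->second;                // dup of an in-flight op
    }
    if (auto it = pg_log.find(reqid); it != pg_log.end()) {
      return it->second;                // dup of a committed op
    }
    return std::nullopt;                // not a dup: execute the op
  }
};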