Diffstat (limited to 'src/crimson/osd/pg.cc')
-rw-r--r--	src/crimson/osd/pg.cc	207
1 file changed, 89 insertions, 118 deletions
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 1e2988efbbe..2746e730f2b 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -868,46 +868,18 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
   return os;
 }
 
-void PG::mutate_object(
-  ObjectContextRef& obc,
-  ceph::os::Transaction& txn,
-  osd_op_params_t& osd_op_p)
+void PG::enqueue_push_for_backfill(
+  const hobject_t &obj,
+  const eversion_t &v,
+  const std::vector<pg_shard_t> &peers)
 {
-  if (obc->obs.exists) {
-    obc->obs.oi.prior_version = obc->obs.oi.version;
-    obc->obs.oi.version = osd_op_p.at_version;
-    if (osd_op_p.user_modify)
-      obc->obs.oi.user_version = osd_op_p.at_version.version;
-    obc->obs.oi.last_reqid = osd_op_p.req_id;
-    obc->obs.oi.mtime = osd_op_p.mtime;
-    obc->obs.oi.local_mtime = ceph_clock_now();
-
-    // object_info_t
-    {
-      ceph::bufferlist osv;
-      obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL);
-      // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
-      txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
-    }
-
-    // snapset
-    if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
-      logger().debug("final snapset {} in {}",
-                     obc->ssc->snapset, obc->obs.oi.soid);
-      ceph::bufferlist bss;
-      encode(obc->ssc->snapset, bss);
-      txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
-      obc->ssc->exists = true;
-    } else {
-      logger().debug("no snapset (this is a clone)");
-    }
-  } else {
-    // reset cached ObjectState without enforcing eviction
-    obc->obs.oi = object_info_t(obc->obs.oi.soid);
-  }
+  assert(recovery_handler);
+  assert(recovery_handler->backfill_state);
+  auto backfill_state = recovery_handler->backfill_state.get();
+  backfill_state->enqueue_standalone_push(obj, v, peers);
 }
 
-void PG::enqueue_push_for_backfill(
+void PG::enqueue_delete_for_backfill(
   const hobject_t &obj,
   const eversion_t &v,
   const std::vector<pg_shard_t> &peers)
@@ -915,7 +887,7 @@ void PG::enqueue_push_for_backfill(
   assert(recovery_handler);
   assert(recovery_handler->backfill_state);
   auto backfill_state = recovery_handler->backfill_state.get();
-  backfill_state->enqueue_standalone_push(obj, v, peers);
+  backfill_state->enqueue_standalone_delete(obj, v, peers);
 }
 
 PG::interruptible_future<
@@ -1039,8 +1011,15 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   const std::error_code e,
   ceph_tid_t rep_tid)
 {
-  logger().debug("{}: {} rep_tid: {} error: {}",
-                 __func__, *m, rep_tid, e);
+  // as with submit_executer, need to ensure that log numbering and submission
+  // are atomic
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
+  LOG_PREFIX(PG::submit_error_log);
+  DEBUGDPP("{} rep_tid: {} error: {}",
+           *this, *m, rep_tid, e);
   const osd_reqid_t &reqid = m->get_reqid();
   mempool::osd_pglog::list<pg_log_entry_t> log_entries;
   log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
@@ -1061,47 +1040,45 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
     log_entries, t, peering_state.get_pg_trim_to(),
     peering_state.get_pg_committed_to());
-  return seastar::do_with(log_entries, set<pg_shard_t>{},
-    [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
-    return interruptor::do_for_each(get_acting_recovery_backfill(),
-      [this, log_entries, waiting_on, rep_tid]
-      (auto& i) mutable {
-      pg_shard_t peer(i);
-      if (peer == pg_whoami) {
-        return seastar::now();
-      }
-      ceph_assert(peering_state.get_peer_missing().count(peer));
-      ceph_assert(peering_state.has_peer_info(peer));
-      auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
-        log_entries,
-        spg_t(peering_state.get_info().pgid.pgid, i.shard),
-        pg_whoami.shard,
-        get_osdmap_epoch(),
-        get_last_peering_reset(),
-        rep_tid,
-        peering_state.get_pg_trim_to(),
-        peering_state.get_pg_committed_to());
-      waiting_on.insert(peer);
-      logger().debug("submit_error_log: sending log"
-                     "missing_request (rep_tid: {} entries: {})"
-                     " to osd {}", rep_tid, log_entries, peer.osd);
-      return shard_services.send_to_osd(peer.osd,
-                                        std::move(log_m),
-                                        get_osdmap_epoch());
-    }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
-      waiting_on.insert(pg_whoami);
-      logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
-      log_entry_update_waiting_on.insert(
-        std::make_pair(rep_tid,
-          log_update_t{std::move(waiting_on)}));
-      return shard_services.get_store().do_transaction(
-        get_collection_ref(), std::move(t)
-      ).then([this] {
-        peering_state.update_trim_to();
-        return seastar::make_ready_future<eversion_t>(projected_last_update);
-      });
-    });
-  });
+
+  set<pg_shard_t> waiting_on;
+  for (const auto &peer: get_acting_recovery_backfill()) {
+    if (peer == pg_whoami) {
+      continue;
+    }
+    ceph_assert(peering_state.get_peer_missing().count(peer));
+    ceph_assert(peering_state.has_peer_info(peer));
+    auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+      log_entries,
+      spg_t(peering_state.get_info().pgid.pgid, peer.shard),
+      pg_whoami.shard,
+      get_osdmap_epoch(),
+      get_last_peering_reset(),
+      rep_tid,
+      peering_state.get_pg_trim_to(),
+      peering_state.get_pg_committed_to());
+    waiting_on.insert(peer);
+
+    DEBUGDPP("sending log missing_request (rep_tid: {} entries: {}) to osd {}",
+             *this, rep_tid, log_entries, peer.osd);
+    co_await interruptor::make_interruptible(
+      shard_services.send_to_osd(
+        peer.osd,
+        std::move(log_m),
+        get_osdmap_epoch()));
+  }
+  waiting_on.insert(pg_whoami);
+  DEBUGDPP("inserting rep_tid {}", *this, rep_tid);
+  log_entry_update_waiting_on.insert(
+    std::make_pair(rep_tid,
+      log_update_t{std::move(waiting_on)}));
+  co_await interruptor::make_interruptible(
+    shard_services.get_store().do_transaction(
+      get_collection_ref(), std::move(t)
+    ));
+
+  peering_state.update_trim_to();
+  co_return projected_last_update;
 }
 
 PG::run_executer_fut PG::run_executer(
@@ -1157,27 +1134,25 @@ PG::submit_executer_fut PG::submit_executer(
   OpsExecuter &&ox,
   const std::vector<OSDOp>& ops)
 {
   LOG_PREFIX(PG::submit_executer);
-  // transaction must commit at this point
-  return std::move(
+  DEBUGDPP("", *this);
+
+  // we need to build the pg log entries and submit the transaction
+  // atomically to ensure log ordering
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
+
+  auto [submitted, completed] = co_await std::move(
     ox
-  ).flush_changes_n_do_ops_effects(
+  ).flush_changes_and_submit(
     ops,
     snap_mapper,
-    osdriver,
-    [FNAME, this](auto&& txn,
-                  auto&& obc,
-                  auto&& osd_op_p,
-                  auto&& log_entries,
-                  auto&& new_clone) {
-      DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
-      mutate_object(obc, txn, osd_op_p);
-      return submit_transaction(
-        std::move(obc),
-        std::move(new_clone),
-        std::move(txn),
-        std::move(osd_op_p),
-        std::move(log_entries));
-    });
+    osdriver
+  );
+  co_return std::make_tuple(
+    std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+    std::move(completed));
 }
 
 PG::interruptible_future<MURef<MOSDOpReply>>
 PG::do_pg_ops(Ref<MOSDOp> m)
@@ -1251,13 +1226,10 @@ void PG::update_stats(const pg_stat_t &stat) {
   );
 }
 
-PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
 {
   LOG_PREFIX(PG::handle_rep_op);
   DEBUGDPP("{}", *this, *req);
-  if (can_discard_replica_op(*req)) {
-    co_return;
-  }
 
   ceph::os::Transaction txn;
   auto encoded_txn = req->get_data().cbegin();
@@ -1279,7 +1251,8 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
     txn,
     false);
   DEBUGDPP("{} do_transaction", *this, *req);
-  co_await interruptor::make_interruptible(
+
+  auto commit_fut = interruptor::make_interruptible(
     shard_services.get_store().do_transaction(coll_ref, std::move(txn))
   );
 
@@ -1290,10 +1263,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
     req.get(), pg_whoami, 0,
    map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
   reply->set_last_complete_ondisk(lcod);
-  co_await interruptor::make_interruptible(
-    shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
-  );
-  co_return;
+  co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
 }
 
 PG::interruptible_future<> PG::update_snap_map(
@@ -1324,20 +1294,21 @@ void PG::log_operation(
   bool transaction_applied,
   ObjectStore::Transaction &txn,
   bool async) {
-  logger().debug("{}", __func__);
+  LOG_PREFIX(PG::log_operation);
+  DEBUGDPP("", *this);
   if (is_primary()) {
     ceph_assert(trim_to <= peering_state.get_pg_committed_to());
   }
   auto last = logv.rbegin();
   if (is_primary() && last != logv.rend()) {
-    logger().debug("{} on primary, trimming projected log",
-                   __func__);
+    DEBUGDPP("on primary, trimming projected log", *this);
     projected_log.skip_can_rollback_to_to_head();
     projected_log.trim(shard_services.get_cct(), last->version,
                        nullptr, nullptr, nullptr);
   }
   if (!is_primary()) { // && !is_ec_pg()
+    DEBUGDPP("on replica, clearing obc", *this);
     replica_clear_repop_obc(logv);
   }
   if (!logv.empty()) {
@@ -1354,13 +1325,13 @@ void PG::log_operation(
 void PG::replica_clear_repop_obc(
   const std::vector<pg_log_entry_t> &logv)
 {
-  logger().debug("{} clearing {} entries", __func__, logv.size());
-  for (auto &&e: logv) {
-    logger().debug(" {} get_object_boundary(from): {} "
-                   " head version(to): {}",
-                   e.soid,
-                   e.soid.get_object_boundary(),
-                   e.soid.get_head());
+  LOG_PREFIX(PG::replica_clear_repop_obc);
+  DEBUGDPP("clearing obc for {} log entries", logv.size());
+  for (auto &&e: logv) {
+    DEBUGDPP("clearing entry for {} from: {} to: {}",
+             e.soid,
+             e.soid.get_object_boundary(),
+             e.soid.get_head());
     /* Have to blast all clones, they share a snapset */
     obc_registry.clear_range(
      e.soid.get_object_boundary(), e.soid.get_head());
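
The rewritten submit_error_log and submit_executer above both take submit_lock before assigning log versions and release it through a seastar::defer guard once the transaction has been handed off. The following is a minimal, standalone sketch of that lock/defer coroutine pattern using plain Seastar primitives; the function name ordered_submit and its placeholder body are hypothetical and not part of this commit.

#include <seastar/core/coroutine.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/util/defer.hh>

// Hypothetical illustration: serialize version assignment and submission
// behind a shared_mutex. The deferred action releases the lock when the
// coroutine frame is destroyed, even on early return or exception.
seastar::future<> ordered_submit(seastar::shared_mutex &submit_lock) {
  co_await submit_lock.lock();                     // take the lock exclusively
  auto unlocker = seastar::defer([&submit_lock] {  // runs on scope exit
    submit_lock.unlock();
  });
  // ... assign the next log version and queue the transaction here,
  // while no other submission can interleave ...
  co_return;
}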