diff options
Diffstat (limited to 'src/crimson/osd/pg.cc')
-rw-r--r-- | src/crimson/osd/pg.cc | 620 |
1 files changed, 206 insertions, 414 deletions
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 97d48c1fa45..2746e730f2b 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -13,6 +13,9 @@ #include <boost/range/numeric.hpp> #include <fmt/format.h> #include <fmt/ostream.h> + +#include <seastar/util/defer.hh> + #include "include/utime_fmt.h" #include "common/hobject.h" @@ -129,6 +132,7 @@ PG::PG( pool, name), osdmap, + PG_FEATURE_CRIMSON_ALL, this, this), scrubber(*this), @@ -389,7 +393,13 @@ void PG::on_replica_activate() void PG::on_activate_complete() { - wait_for_active_blocker.unblock(); + /* Confusingly, on_activate_complete is invoked when the primary and replicas + * have recorded the current interval. At that point, the PG may either become + * ACTIVE or PEERED, depending on whether the acting set is eligible for client + * IO. Only unblock wait_for_active_blocker if we actually became ACTIVE */ + if (peering_state.is_active()) { + wait_for_active_blocker.unblock(); + } if (peering_state.needs_recovery()) { logger().info("{}: requesting recovery", @@ -858,43 +868,26 @@ std::ostream& operator<<(std::ostream& os, const PG& pg) return os; } -void PG::mutate_object( - ObjectContextRef& obc, - ceph::os::Transaction& txn, - osd_op_params_t& osd_op_p) +void PG::enqueue_push_for_backfill( + const hobject_t &obj, + const eversion_t &v, + const std::vector<pg_shard_t> &peers) { - if (obc->obs.exists) { - obc->obs.oi.prior_version = obc->obs.oi.version; - obc->obs.oi.version = osd_op_p.at_version; - if (osd_op_p.user_modify) - obc->obs.oi.user_version = osd_op_p.at_version.version; - obc->obs.oi.last_reqid = osd_op_p.req_id; - obc->obs.oi.mtime = osd_op_p.mtime; - obc->obs.oi.local_mtime = ceph_clock_now(); - - // object_info_t - { - ceph::bufferlist osv; - obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL); - // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); - txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv); - } + assert(recovery_handler); + assert(recovery_handler->backfill_state); + auto backfill_state = recovery_handler->backfill_state.get(); + backfill_state->enqueue_standalone_push(obj, v, peers); +} - // snapset - if (obc->obs.oi.soid.snap == CEPH_NOSNAP) { - logger().debug("final snapset {} in {}", - obc->ssc->snapset, obc->obs.oi.soid); - ceph::bufferlist bss; - encode(obc->ssc->snapset, bss); - txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss); - obc->ssc->exists = true; - } else { - logger().debug("no snapset (this is a clone)"); - } - } else { - // reset cached ObjectState without enforcing eviction - obc->obs.oi = object_info_t(obc->obs.oi.soid); - } +void PG::enqueue_delete_for_backfill( + const hobject_t &obj, + const eversion_t &v, + const std::vector<pg_shard_t> &peers) +{ + assert(recovery_handler); + assert(recovery_handler->backfill_state); + auto backfill_state = recovery_handler->backfill_state.get(); + backfill_state->enqueue_standalone_delete(obj, v, peers); } PG::interruptible_future< @@ -902,6 +895,7 @@ PG::interruptible_future< PG::interruptible_future<>>> PG::submit_transaction( ObjectContextRef&& obc, + ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, std::vector<pg_log_entry_t>&& log_entries) @@ -914,17 +908,23 @@ PG::submit_transaction( } epoch_t map_epoch = get_osdmap_epoch(); + auto at_version = osd_op_p.at_version; - peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version); + peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version); peering_state.update_trim_to(); ceph_assert(!log_entries.empty()); ceph_assert(log_entries.rbegin()->version >= projected_last_update); projected_last_update = log_entries.rbegin()->version; + for (const auto& entry: log_entries) { + projected_log.add(entry); + } + auto [submitted, all_completed] = co_await backend->submit_transaction( peering_state.get_acting_recovery_backfill(), obc->obs.oi.soid, + std::move(new_clone), std::move(txn), std::move(osd_op_p), peering_state.get_last_peering_reset(), @@ -933,8 +933,8 @@ PG::submit_transaction( co_return std::make_tuple( std::move(submitted), all_completed.then_interruptible( - [this, last_complete=peering_state.get_info().last_complete, - at_version=osd_op_p.at_version](auto acked) { + [this, last_complete=peering_state.get_info().last_complete, at_version] + (auto acked) { for (const auto& peer : acked) { peering_state.update_peer_last_complete_ondisk( peer.shard, peer.last_complete_ondisk); @@ -975,150 +975,6 @@ ObjectContextRef duplicate_obc(const ObjectContextRef &obc) { return object_context; } -template <class Ret, class SuccessFunc, class FailureFunc> -PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>> -PG::do_osd_ops_execute( - seastar::lw_shared_ptr<OpsExecuter> ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref<MOSDOp> m, - std::vector<OSDOp>& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func) -{ - assert(ox); - auto rollbacker = ox->create_rollbacker( - [object_context=duplicate_obc(obc)] (auto& obc) mutable { - obc->update_from(*object_context); - }); - auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); - return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) { - logger().debug( - "do_osd_ops_execute: object {} - handling op {}", - ox->get_target(), - ceph_osd_op_name(osd_op.op.op)); - return ox->execute_op(osd_op); - }).safe_then_interruptible([this, ox, &ops] { - logger().debug( - "do_osd_ops_execute: object {} all operations successful", - ox->get_target()); - // check for full - if ((ox->delta_stats.num_bytes > 0 || - ox->delta_stats.num_objects > 0) && - get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { - const auto& m = ox->get_message(); - if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now - m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { - logger().info(" full, but proceeding due to FULL_FORCE or MDS"); - } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { - // they tried, they failed. - logger().info(" full, replying to FULL_TRY op"); - if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) - return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::edquot::make())); - else - return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::enospc::make())); - } else { - // drop request - logger().info(" full, dropping request (bad client)"); - return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::eagain::make())); - } - } - return std::move(*ox).flush_changes_n_do_ops_effects( - ops, - snap_mapper, - osdriver, - [this] (auto&& txn, - auto&& obc, - auto&& osd_op_p, - auto&& log_entries) { - logger().debug( - "do_osd_ops_execute: object {} submitting txn", - obc->get_oid()); - mutate_object(obc, txn, osd_op_p); - return submit_transaction( - std::move(obc), - std::move(txn), - std::move(osd_op_p), - std::move(log_entries)); - }); - }).safe_then_unpack_interruptible( - [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc] - (auto submitted_fut, auto _all_completed_fut) mutable { - - auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple( - std::move(success_func), - crimson::ct_error::object_corrupted::handle( - [rollbacker, this, obc] (const std::error_code& e) mutable { - // this is a path for EIO. it's special because we want to fix the obejct - // and try again. that is, the layer above `PG::do_osd_ops` is supposed to - // restart the execution. - rollbacker.rollback_obc_if_modified(e); - return repair_object(obc->obs.oi.soid, - obc->obs.oi.version - ).then_interruptible([] { - return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()}; - }); - }), OpsExecuter::osd_op_errorator::all_same_way( - [rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - // handle non-fatal errors only - ceph_assert(e.value() == EDQUOT || - e.value() == ENOSPC || - e.value() == EAGAIN); - rollbacker.rollback_obc_if_modified(e); - return (*failure_func_ptr)(e); - })); - - return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>( - std::move(submitted_fut), - std::move(all_completed_fut) - ); - }, OpsExecuter::osd_op_errorator::all_same_way( - [this, op_info, m, obc, - rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - ceph_tid_t rep_tid = shard_services.get_tid(); - rollbacker.rollback_obc_if_modified(e); - // record error log - auto maybe_submit_error_log = - interruptor::make_ready_future<std::optional<eversion_t>>(std::nullopt); - // call submit_error_log only for non-internal clients - if constexpr (!std::is_same_v<Ret, void>) { - if(op_info.may_write()) { - maybe_submit_error_log = - submit_error_log(m, op_info, obc, e, rep_tid); - } - } - return maybe_submit_error_log.then_interruptible( - [this, failure_func_ptr, e, rep_tid] (auto version) { - auto all_completed = - [this, failure_func_ptr, e, rep_tid, version] { - if (version.has_value()) { - return complete_error_log(rep_tid, version.value() - ).then_interruptible([failure_func_ptr, e] { - return (*failure_func_ptr)(e); - }); - } else { - return (*failure_func_ptr)(e); - } - }; - return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>( - std::move(seastar::now()), - std::move(all_completed()) - ); - }); - })); -} - PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version) { @@ -1148,15 +1004,22 @@ PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, return result; } -PG::interruptible_future<std::optional<eversion_t>> PG::submit_error_log( +PG::interruptible_future<eversion_t> PG::submit_error_log( Ref<MOSDOp> m, const OpInfo &op_info, ObjectContextRef obc, const std::error_code e, ceph_tid_t rep_tid) { - logger().debug("{}: {} rep_tid: {} error: {}", - __func__, *m, rep_tid, e); + // as with submit_executer, need to ensure that log numbering and submission + // are atomic + co_await interruptor::make_interruptible(submit_lock.lock()); + auto unlocker = seastar::defer([this] { + submit_lock.unlock(); + }); + LOG_PREFIX(PG::submit_error_log); + DEBUGDPP("{} rep_tid: {} error: {}", + *this, *m, rep_tid, e); const osd_reqid_t &reqid = m->get_reqid(); mempool::osd_pglog::list<pg_log_entry_t> log_entries; log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, @@ -1175,181 +1038,121 @@ PG::interruptible_future<std::optional<eversion_t>> PG::submit_error_log( ceph::os::Transaction t; peering_state.merge_new_log_entries( log_entries, t, peering_state.get_pg_trim_to(), - peering_state.get_min_last_complete_ondisk()); - - return seastar::do_with(log_entries, set<pg_shard_t>{}, - [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable { - return interruptor::do_for_each(get_acting_recovery_backfill(), - [this, log_entries, waiting_on, rep_tid] - (auto& i) mutable { - pg_shard_t peer(i); - if (peer == pg_whoami) { - return seastar::now(); - } - ceph_assert(peering_state.get_peer_missing().count(peer)); - ceph_assert(peering_state.has_peer_info(peer)); - auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>( - log_entries, - spg_t(peering_state.get_info().pgid.pgid, i.shard), - pg_whoami.shard, - get_osdmap_epoch(), - get_last_peering_reset(), - rep_tid, - peering_state.get_pg_trim_to(), - peering_state.get_min_last_complete_ondisk()); - waiting_on.insert(peer); - logger().debug("submit_error_log: sending log" - "missing_request (rep_tid: {} entries: {})" - " to osd {}", rep_tid, log_entries, peer.osd); - return shard_services.send_to_osd(peer.osd, - std::move(log_m), - get_osdmap_epoch()); - }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable { - waiting_on.insert(pg_whoami); - logger().debug("submit_error_log: inserting rep_tid {}", rep_tid); - log_entry_update_waiting_on.insert( - std::make_pair(rep_tid, - log_update_t{std::move(waiting_on)})); - return shard_services.get_store().do_transaction( - get_collection_ref(), std::move(t) - ).then([this] { - peering_state.update_trim_to(); - return seastar::make_ready_future<std::optional<eversion_t>>(projected_last_update); - }); - }); - }); + peering_state.get_pg_committed_to()); + + + set<pg_shard_t> waiting_on; + for (const auto &peer: get_acting_recovery_backfill()) { + if (peer == pg_whoami) { + continue; + } + ceph_assert(peering_state.get_peer_missing().count(peer)); + ceph_assert(peering_state.has_peer_info(peer)); + auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>( + log_entries, + spg_t(peering_state.get_info().pgid.pgid, peer.shard), + pg_whoami.shard, + get_osdmap_epoch(), + get_last_peering_reset(), + rep_tid, + peering_state.get_pg_trim_to(), + peering_state.get_pg_committed_to()); + waiting_on.insert(peer); + + DEBUGDPP("sending log missing_request (rep_tid: {} entries: {}) to osd {}", + *this, rep_tid, log_entries, peer.osd); + co_await interruptor::make_interruptible( + shard_services.send_to_osd( + peer.osd, + std::move(log_m), + get_osdmap_epoch())); + } + waiting_on.insert(pg_whoami); + DEBUGDPP("inserting rep_tid {}", *this, rep_tid); + log_entry_update_waiting_on.insert( + std::make_pair(rep_tid, + log_update_t{std::move(waiting_on)})); + co_await interruptor::make_interruptible( + shard_services.get_store().do_transaction( + get_collection_ref(), std::move(t) + )); + + peering_state.update_trim_to(); + co_return projected_last_update; } -PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>> -PG::do_osd_ops( - Ref<MOSDOp> m, - crimson::net::ConnectionXcoreRef conn, +PG::run_executer_fut PG::run_executer( + OpsExecuter &ox, ObjectContextRef obc, const OpInfo &op_info, - const SnapContext& snapc) + std::vector<OSDOp>& ops) { - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - return do_osd_ops_execute<MURef<MOSDOpReply>>( - seastar::make_lw_shared<OpsExecuter>( - Ref<PG>{this}, obc, op_info, *m, conn, snapc), - obc, - op_info, - m, - m->ops, - // success_func - [this, m, obc, may_write = op_info.may_write(), - may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] { - // TODO: should stop at the first op which returns a negative retval, - // cmpext uses it for returning the index of first unmatched byte - int result = m->ops.empty() ? 0 : m->ops.back().rval.code; - if (may_read && result >= 0) { - for (auto &osdop : m->ops) { - if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = osdop.rval.code; - break; - } - } - } else if (result > 0 && may_write && !rvec) { - result = 0; - } else if (result < 0 && (m->ops.empty() ? - 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = 0; - } - auto reply = crimson::make_message<MOSDOpReply>(m.get(), - result, - get_osdmap_epoch(), - 0, - false); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - logger().debug( - "do_osd_ops: {} - object {} sending reply", - *m, - m->get_hobj()); - if (obc->obs.exists) { - reply->set_reply_versions(peering_state.get_info().last_update, - obc->obs.oi.user_version); - } else { - reply->set_reply_versions(peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - } - return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>( - std::move(reply)); - }, - // failure_func - [m, this] - (const std::error_code& e) { - logger().error("do_osd_ops_execute::failure_func {} got error: {}", - *m, e); - return log_reply(m, e); + LOG_PREFIX(PG::run_executer); + auto rollbacker = ox.create_rollbacker( + [stored_obc=duplicate_obc(obc)](auto &obc) mutable { + obc->update_from(*stored_obc); + }); + auto rollback_on_error = seastar::defer([&rollbacker] { + rollbacker.rollback_obc_if_modified(); }); -} -PG::do_osd_ops_iertr::future<MURef<MOSDOpReply>> -PG::log_reply( - Ref<MOSDOp> m, - const std::error_code& e) -{ - auto reply = crimson::make_message<MOSDOpReply>( - m.get(), -e.value(), get_osdmap_epoch(), 0, false); - if (m->ops.empty() ? 0 : - m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { - reply->set_result(0); - } - // For all ops except for CMPEXT, the correct error value is encoded - // in e.value(). For CMPEXT, osdop.rval has the actual error value. - if (e.value() == ct_error::cmp_fail_error_value) { - assert(!m->ops.empty()); - for (auto &osdop : m->ops) { - if (osdop.rval < 0) { - reply->set_result(osdop.rval); - break; + for (auto &op: ops) { + DEBUGDPP("object {} handle op {}", *this, ox.get_target(), op); + co_await ox.execute_op(op); + } + DEBUGDPP("object {} all operations successful", *this, ox.get_target()); + + // check for full + if ((ox.delta_stats.num_bytes > 0 || + ox.delta_stats.num_objects > 0) && + get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { + const auto& m = ox.get_message(); + if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now + m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { + INFODPP("full, but proceeding due to FULL_FORCE, or MDS", *this); + } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { + // they tried, they failed. + INFODPP("full, replying to FULL_TRY op", *this); + if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) { + co_await run_executer_fut( + crimson::ct_error::edquot::make()); + } else { + co_await run_executer_fut( + crimson::ct_error::enospc::make()); } + } else { + // drop request + INFODPP("full, dropping request (bad client)", *this); + co_await run_executer_fut( + crimson::ct_error::eagain::make()); } } - reply->set_enoent_reply_versions( - peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>( - std::move(reply)); + rollback_on_error.cancel(); } -PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>> -PG::do_osd_ops( - ObjectContextRef obc, - std::vector<OSDOp>& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &&msg_params) -{ - // This overload is generally used for internal client requests, - // use an empty SnapContext. - return seastar::do_with( - std::move(msg_params), - [=, this, &ops, &op_info](auto &msg_params) { - return do_osd_ops_execute<void>( - seastar::make_lw_shared<OpsExecuter>( - Ref<PG>{this}, - obc, - op_info, - msg_params, - msg_params.get_connection(), - SnapContext{} - ), - obc, - op_info, - Ref<MOSDOp>(), - ops, - // success_func - [] { - return do_osd_ops_iertr::now(); - }, - // failure_func - [] (const std::error_code& e) { - return do_osd_ops_iertr::now(); - }); +PG::submit_executer_fut PG::submit_executer( + OpsExecuter &&ox, + const std::vector<OSDOp>& ops) { + LOG_PREFIX(PG::submit_executer); + DEBUGDPP("", *this); + + // we need to build the pg log entries and submit the transaction + // atomically to ensure log ordering + co_await interruptor::make_interruptible(submit_lock.lock()); + auto unlocker = seastar::defer([this] { + submit_lock.unlock(); }); + + auto [submitted, completed] = co_await std::move( + ox + ).flush_changes_and_submit( + ops, + snap_mapper, + osdriver + ); + co_return std::make_tuple( + std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}), + std::move(completed)); } PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m) @@ -1414,31 +1217,6 @@ void PG::check_blocklisted_obc_watchers( } } -PG::load_obc_iertr::future<> -PG::with_locked_obc(const hobject_t &hobj, - const OpInfo &op_info, - with_obc_func_t &&f) -{ - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - const hobject_t oid = get_oid(hobj); - auto wrapper = [f=std::move(f), this](auto head, auto obc) { - check_blocklisted_obc_watchers(obc); - return f(head, obc); - }; - switch (get_lock_type(op_info)) { - case RWState::RWREAD: - return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper)); - case RWState::RWWRITE: - return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper)); - case RWState::RWEXCL: - return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper)); - default: - ceph_abort(); - }; -} - void PG::update_stats(const pg_stat_t &stat) { peering_state.update_stats( [&stat] (auto& history, auto& stats) { @@ -1448,13 +1226,10 @@ void PG::update_stats(const pg_stat_t &stat) { ); } -PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req) +PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req) { LOG_PREFIX(PG::handle_rep_op); DEBUGDPP("{}", *this, *req); - if (can_discard_replica_op(*req)) { - co_return; - } ceph::os::Transaction txn; auto encoded_txn = req->get_data().cbegin(); @@ -1471,12 +1246,13 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req) log_operation(std::move(log_entries), req->pg_trim_to, req->version, - req->min_last_complete_ondisk, + req->pg_committed_to, !txn.empty(), txn, false); DEBUGDPP("{} do_transaction", *this, *req); - co_await interruptor::make_interruptible( + + auto commit_fut = interruptor::make_interruptible( shard_services.get_store().do_transaction(coll_ref, std::move(txn)) ); @@ -1487,10 +1263,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req) req.get(), pg_whoami, 0, map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK); reply->set_last_complete_ondisk(lcod); - co_await interruptor::make_interruptible( - shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch) - ); - co_return; + co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply)); } PG::interruptible_future<> PG::update_snap_map( @@ -1517,28 +1290,25 @@ void PG::log_operation( std::vector<pg_log_entry_t>&& logv, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, bool transaction_applied, ObjectStore::Transaction &txn, bool async) { - logger().debug("{}", __func__); + LOG_PREFIX(PG::log_operation); + DEBUGDPP("", *this); if (is_primary()) { - ceph_assert(trim_to <= peering_state.get_last_update_ondisk()); + ceph_assert(trim_to <= peering_state.get_pg_committed_to()); } - /* TODO: when we add snap mapper and projected log support, - * we'll likely want to update them here. - * - * See src/osd/PrimaryLogPG.h:log_operation for how classic - * handles these cases. - */ -#if 0 auto last = logv.rbegin(); if (is_primary() && last != logv.rend()) { + DEBUGDPP("on primary, trimming projected log", *this); projected_log.skip_can_rollback_to_to_head(); - projected_log.trim(cct, last->version, nullptr, nullptr, nullptr); + projected_log.trim(shard_services.get_cct(), last->version, + nullptr, nullptr, nullptr); } -#endif + if (!is_primary()) { // && !is_ec_pg() + DEBUGDPP("on replica, clearing obc", *this); replica_clear_repop_obc(logv); } if (!logv.empty()) { @@ -1547,7 +1317,7 @@ void PG::log_operation( peering_state.append_log(std::move(logv), trim_to, roll_forward_to, - min_last_complete_ondisk, + pg_committed_to, txn, !txn.empty(), false); @@ -1555,13 +1325,13 @@ void PG::log_operation( void PG::replica_clear_repop_obc( const std::vector<pg_log_entry_t> &logv) { - logger().debug("{} clearing {} entries", __func__, logv.size()); - for (auto &&e: logv) { - logger().debug(" {} get_object_boundary(from): {} " - " head version(to): {}", - e.soid, - e.soid.get_object_boundary(), - e.soid.get_head()); + LOG_PREFIX(PG::replica_clear_repop_obc); + DEBUGDPP("clearing obc for {} log entries", logv.size()); + for (auto &&e: logv) { + DEBUGDPP("clearing entry for {} from: {} to: {}", + e.soid, + e.soid.get_object_boundary(), + e.soid.get_head()); /* Have to blast all clones, they share a snapset */ obc_registry.clear_range( e.soid.get_object_boundary(), e.soid.get_head()); @@ -1586,17 +1356,17 @@ PG::interruptible_future<> PG::do_update_log_missing( ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING); ObjectStore::Transaction t; - std::optional<eversion_t> op_trim_to, op_roll_forward_to; + std::optional<eversion_t> op_trim_to, op_pg_committed_to; if (m->pg_trim_to != eversion_t()) op_trim_to = m->pg_trim_to; - if (m->pg_roll_forward_to != eversion_t()) - op_roll_forward_to = m->pg_roll_forward_to; - logger().debug("op_trim_to = {}, op_roll_forward_to = {}", + if (m->pg_committed_to != eversion_t()) + op_pg_committed_to = m->pg_committed_to; + logger().debug("op_trim_to = {}, op_pg_committed_to = {}", op_trim_to.has_value() ? *op_trim_to : eversion_t(), - op_roll_forward_to.has_value() ? *op_roll_forward_to : eversion_t()); + op_pg_committed_to.has_value() ? *op_pg_committed_to : eversion_t()); peering_state.append_log_entries_update_missing( - m->entries, t, op_trim_to, op_roll_forward_to); + m->entries, t, op_trim_to, op_pg_committed_to); return interruptor::make_interruptible(shard_services.get_store().do_transaction( coll_ref, std::move(t))).then_interruptible( @@ -1814,14 +1584,21 @@ bool PG::should_send_op( return true; bool should_send = (hoid.pool != (int64_t)get_info().pgid.pool() || - (has_backfill_state() && hoid <= get_last_backfill_started()) || - hoid <= peering_state.get_peer_info(peer).last_backfill); + // An object has been fully pushed to the backfill target if and only if + // either of the following conditions is met: + // 1. peer_info.last_backfill has passed "hoid" + // 2. last_backfill_started has passed "hoid" and "hoid" is not in the peer + // missing set + hoid <= peering_state.get_peer_info(peer).last_backfill || + (has_backfill_state() && hoid <= get_last_backfill_started() && + !is_missing_on_peer(peer, hoid))); if (!should_send) { ceph_assert(is_backfill_target(peer)); logger().debug("{} issue_repop shipping empty opt to osd." "{}, object {} beyond std::max(last_backfill_started, " "peer_info[peer].last_backfill {})", - peer, hoid, peering_state.get_peer_info(peer).last_backfill); + __func__, peer, hoid, + peering_state.get_peer_info(peer).last_backfill); } return should_send; // TODO: should consider async recovery cases in the future which are not supported @@ -1836,8 +1613,8 @@ PG::already_complete(const osd_reqid_t& reqid) int ret; std::vector<pg_log_op_return_item_t> op_returns; - if (peering_state.get_pg_log().get_log().get_request( - reqid, &version, &user_version, &ret, &op_returns)) { + if (check_in_progress_op( + reqid, &version, &user_version, &ret, &op_returns)) { complete_op_t dupinfo{ user_version, version, @@ -1902,4 +1679,19 @@ void PG::C_PG_FinishRecovery::finish(int r) { DEBUGDPP("stale recovery finsher", pg); } } +bool PG::check_in_progress_op( + const osd_reqid_t& reqid, + eversion_t *version, + version_t *user_version, + int *return_code, + std::vector<pg_log_op_return_item_t> *op_returns + ) const +{ + return ( + projected_log.get_request(reqid, version, user_version, return_code, + op_returns) || + peering_state.get_pg_log().get_log().get_request( + reqid, version, user_version, return_code, op_returns)); +} + } |