diff options
Diffstat (limited to 'src/crimson/osd/ops_executer.cc')
-rw-r--r-- | src/crimson/osd/ops_executer.cc | 262 |
1 files changed, 173 insertions, 89 deletions
diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 4e735c3b4cb..cbc35c21a04 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -15,12 +15,15 @@ #include <seastar/core/thread.hh> +#include "crimson/common/log.h" #include "crimson/osd/exceptions.h" #include "crimson/osd/pg.h" #include "crimson/osd/watch.h" #include "osd/ClassHandler.h" #include "osd/SnapMapper.h" +SET_SUBSYS(osd); + namespace { seastar::logger& logger() { return crimson::get_logger(ceph_subsys_osd); @@ -464,10 +467,7 @@ auto OpsExecuter::do_const_op(Func&& f) { template <class Func> auto OpsExecuter::do_write_op(Func&& f, OpsExecuter::modified_by m) { ++num_write; - if (!osd_op_params) { - osd_op_params.emplace(); - fill_op_params(m); - } + check_init_op_params(m); return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn); } OpsExecuter::call_errorator::future<> OpsExecuter::do_assert_ver( @@ -822,25 +822,100 @@ OpsExecuter::do_execute_op(OSDOp& osd_op) } } -void OpsExecuter::fill_op_params(OpsExecuter::modified_by m) +OpsExecuter::rep_op_fut_t +OpsExecuter::flush_changes_and_submit( + const std::vector<OSDOp>& ops, + SnapMapper& snap_mapper, + OSDriver& osdriver) { - osd_op_params->req_id = msg->get_reqid(); - osd_op_params->mtime = msg->get_mtime(); - osd_op_params->at_version = pg->get_next_version(); - osd_op_params->pg_trim_to = pg->get_pg_trim_to(); - osd_op_params->pg_committed_to = pg->get_pg_committed_to(); - osd_op_params->last_complete = pg->get_info().last_complete; - osd_op_params->user_modify = (m == modified_by::user); + const bool want_mutate = !txn.empty(); + // osd_op_params are instantiated by every wr-like operation. + assert(osd_op_params || !want_mutate); + assert(obc); + + auto submitted = interruptor::now(); + auto all_completed = interruptor::now(); + + if (cloning_ctx) { + ceph_assert(want_mutate); + } + + apply_stats(); + if (want_mutate) { + osd_op_params->at_version = pg->get_next_version(); + osd_op_params->pg_trim_to = pg->get_pg_trim_to(); + osd_op_params->pg_committed_to = pg->get_pg_committed_to(); + osd_op_params->last_complete = pg->get_info().last_complete; + + std::vector<pg_log_entry_t> log_entries; + + if (cloning_ctx) { + log_entries.emplace_back(complete_cloning_ctx()); + } + + log_entries.emplace_back(prepare_head_update(ops, txn)); + + if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) { + ceph_assert(log_rit->version == osd_op_params->at_version); + } + + /* + * This works around the gcc bug causing the generated code to incorrectly + * execute unconditionally before the predicate. + * + * https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101244 + */ + auto clone_obc = cloning_ctx + ? std::move(cloning_ctx->clone_obc) + : nullptr; + auto [_submitted, _all_completed] = co_await pg->submit_transaction( + std::move(obc), + std::move(clone_obc), + std::move(txn), + std::move(*osd_op_params), + std::move(log_entries) + ); + + submitted = std::move(_submitted); + all_completed = std::move(_all_completed); + } + + if (op_effects.size()) [[unlikely]] { + // need extra ref pg due to apply_stats() which can be executed after + // informing snap mapper + all_completed = + std::move(all_completed).then_interruptible([this, pg=this->pg] { + // let's do the cleaning of `op_effects` in destructor + return interruptor::do_for_each(op_effects, + [pg=std::move(pg)](auto& op_effect) { + return op_effect->execute(pg); + }); + }); + } + + co_return std::make_tuple( + std::move(submitted), + std::move(all_completed)); } -std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction( - const std::vector<OSDOp>& ops) +pg_log_entry_t OpsExecuter::prepare_head_update( + const std::vector<OSDOp>& ops, + ceph::os::Transaction &txn) { - // let's ensure we don't need to inform SnapMapper about this particular - // entry. + LOG_PREFIX(OpsExecuter::prepare_head_update); assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP); - std::vector<pg_log_entry_t> log_entries; - log_entries.emplace_back( + + update_clone_overlap(); + if (cloning_ctx) { + obc->ssc->snapset = std::move(cloning_ctx->new_snapset); + } + if (snapc.seq > obc->ssc->snapset.seq) { + // update snapset with latest snap context + obc->ssc->snapset.seq = snapc.seq; + obc->ssc->snapset.snaps.clear(); + } + + pg_log_entry_t ret{ obc->obs.exists ? pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE, obc->obs.oi.soid, @@ -849,15 +924,38 @@ std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction( osd_op_params->user_modify ? osd_op_params->at_version.version : 0, osd_op_params->req_id, osd_op_params->mtime, - op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0); + op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0}; + if (op_info.allows_returnvec()) { // also the per-op values are recorded in the pg log - log_entries.back().set_op_returns(ops); - logger().debug("{} op_returns: {}", - __func__, log_entries.back().op_returns); + ret.set_op_returns(ops); + DEBUGDPP("op returns: {}", *pg, ret.op_returns); + } + ret.clean_regions = std::move(osd_op_params->clean_regions); + + + if (obc->obs.exists) { + obc->obs.oi.prior_version = obc->obs.oi.version; + obc->obs.oi.version = osd_op_params->at_version; + if (osd_op_params->user_modify) + obc->obs.oi.user_version = osd_op_params->at_version.version; + obc->obs.oi.last_reqid = osd_op_params->req_id; + obc->obs.oi.mtime = osd_op_params->mtime; + obc->obs.oi.local_mtime = ceph_clock_now(); + + obc->ssc->exists = true; + pg->get_backend().set_metadata( + obc->obs.oi.soid, + obc->obs.oi, + obc->obs.oi.soid.is_head() ? &(obc->ssc->snapset) : nullptr, + txn); + } else { + // reset cached ObjectState without enforcing eviction + obc->obs.oi = object_info_t(obc->obs.oi.soid); } - log_entries.back().clean_regions = std::move(osd_op_params->clean_regions); - return log_entries; + + DEBUGDPP("entry: {}", *pg, ret); + return ret; } // Defined here because there is a circular dependency between OpsExecuter and PG @@ -871,25 +969,26 @@ version_t OpsExecuter::get_last_user_version() const return pg->get_last_user_version(); } -std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone( +void OpsExecuter::prepare_cloning_ctx( const SnapContext& snapc, const ObjectState& initial_obs, const SnapSet& initial_snapset, PGBackend& backend, ceph::os::Transaction& txn) { + LOG_PREFIX(OpsExecuter::prepare_cloning_ctx); const hobject_t& soid = initial_obs.oi.soid; logger().debug("{} {} snapset={} snapc={}", __func__, soid, initial_snapset, snapc); - auto cloning_ctx = std::make_unique<CloningContext>(); + cloning_ctx = std::make_unique<CloningContext>(); cloning_ctx->new_snapset = initial_snapset; // clone object, the snap field is set to the seq of the SnapContext // at its creation. - hobject_t coid = soid; - coid.snap = snapc.seq; + cloning_ctx->coid = soid; + cloning_ctx->coid.snap = snapc.seq; // existing snaps are stored in descending order in snapc, // cloned_snaps vector will hold all the snaps stored until snapset.seq @@ -900,48 +999,63 @@ std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone( return std::vector<snapid_t>{std::begin(snapc.snaps), last}; }(); - auto clone_obc = prepare_clone(coid, osd_op_params->at_version); - osd_op_params->at_version.version++; + // make clone here, but populate in metadata in complete_cloning_ctx + backend.clone_for_write(soid, cloning_ctx->coid, txn); - // make clone - backend.clone(clone_obc->obs.oi, initial_obs, clone_obc->obs, txn); + cloning_ctx->clone_obc = prepare_clone(cloning_ctx->coid, initial_obs); delta_stats.num_objects++; - if (clone_obc->obs.oi.is_omap()) { + if (cloning_ctx->clone_obc->obs.oi.is_omap()) { delta_stats.num_objects_omap++; } delta_stats.num_object_clones++; // newsnapset is obc's ssc - cloning_ctx->new_snapset.clones.push_back(coid.snap); - cloning_ctx->new_snapset.clone_size[coid.snap] = initial_obs.oi.size; - cloning_ctx->new_snapset.clone_snaps[coid.snap] = cloned_snaps; + cloning_ctx->new_snapset.clones.push_back(cloning_ctx->coid.snap); + cloning_ctx->new_snapset.clone_size[cloning_ctx->coid.snap] = initial_obs.oi.size; + cloning_ctx->new_snapset.clone_snaps[cloning_ctx->coid.snap] = cloned_snaps; // clone_overlap should contain an entry for each clone // (an empty interval_set if there is no overlap) - auto &overlap = cloning_ctx->new_snapset.clone_overlap[coid.snap]; + auto &overlap = cloning_ctx->new_snapset.clone_overlap[cloning_ctx->coid.snap]; if (initial_obs.oi.size) { overlap.insert(0, initial_obs.oi.size); } // log clone - logger().debug("cloning v {} to {} v {} snaps={} snapset={}", - initial_obs.oi.version, coid, - osd_op_params->at_version, cloned_snaps, cloning_ctx->new_snapset); + DEBUGDPP("cloning v {} to {} v {} snaps={} snapset={}", *pg, + initial_obs.oi.version, cloning_ctx->coid, + osd_op_params->at_version, cloned_snaps, cloning_ctx->new_snapset); +} - cloning_ctx->log_entry = { +pg_log_entry_t OpsExecuter::complete_cloning_ctx() +{ + ceph_assert(cloning_ctx); + const auto &coid = cloning_ctx->coid; + cloning_ctx->clone_obc->obs.oi.version = osd_op_params->at_version; + + osd_op_params->at_version.version++; + + pg->get_backend().set_metadata( + cloning_ctx->coid, + cloning_ctx->clone_obc->obs.oi, + nullptr /* snapset */, + txn); + + pg_log_entry_t ret{ pg_log_entry_t::CLONE, coid, - clone_obc->obs.oi.version, - clone_obc->obs.oi.prior_version, - clone_obc->obs.oi.user_version, + cloning_ctx->clone_obc->obs.oi.version, + cloning_ctx->clone_obc->obs.oi.prior_version, + cloning_ctx->clone_obc->obs.oi.user_version, osd_reqid_t(), - clone_obc->obs.oi.mtime, // will be replaced in `apply_to()` + cloning_ctx->clone_obc->obs.oi.mtime, // will be replaced in `apply_to()` 0 }; - encode(cloned_snaps, cloning_ctx->log_entry.snaps); - cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size); - - return cloning_ctx; + ceph_assert(cloning_ctx->new_snapset.clone_snaps.count(coid.snap)); + encode(cloning_ctx->new_snapset.clone_snaps[coid.snap], ret.snaps); + ret.clean_regions.mark_data_region_dirty(0, cloning_ctx->clone_obc->obs.oi.size); + ret.mtime = cloning_ctx->clone_obc->obs.oi.mtime; + return ret; } void OpsExecuter::update_clone_overlap() { @@ -964,47 +1078,16 @@ void OpsExecuter::update_clone_overlap() { delta_stats.num_bytes += osd_op_params->modified_ranges.size(); } -void OpsExecuter::CloningContext::apply_to( - std::vector<pg_log_entry_t>& log_entries, - ObjectContext& processed_obc) && -{ - log_entry.mtime = processed_obc.obs.oi.mtime; - log_entries.insert(log_entries.begin(), std::move(log_entry)); - processed_obc.ssc->snapset = std::move(new_snapset); -} - -std::vector<pg_log_entry_t> -OpsExecuter::flush_clone_metadata( - std::vector<pg_log_entry_t>&& log_entries, - SnapMapper& snap_mapper, - OSDriver& osdriver, - ceph::os::Transaction& txn) -{ - assert(!txn.empty()); - update_clone_overlap(); - if (cloning_ctx) { - std::move(*cloning_ctx).apply_to(log_entries, *obc); - } - if (snapc.seq > obc->ssc->snapset.seq) { - // update snapset with latest snap context - obc->ssc->snapset.seq = snapc.seq; - obc->ssc->snapset.snaps.clear(); - } - logger().debug("{} done, initial snapset={}, new snapset={}", - __func__, obc->obs.oi.soid, obc->ssc->snapset); - return std::move(log_entries); -} - ObjectContextRef OpsExecuter::prepare_clone( const hobject_t& coid, - eversion_t version) + const ObjectState& initial_obs) { ceph_assert(pg->is_primary()); ObjectState clone_obs{coid}; clone_obs.exists = true; - clone_obs.oi.version = version; - clone_obs.oi.prior_version = obc->obs.oi.version; - clone_obs.oi.copy_user_bits(obc->obs.oi); + // clone_obs.oi.version will be populated in complete_cloning_ctx + clone_obs.oi.prior_version = initial_obs.oi.version; + clone_obs.oi.copy_user_bits(initial_obs.oi); clone_obs.oi.clear_flag(object_info_t::FLAG_WHITEOUT); auto [clone_obc, existed] = pg->obc_registry.get_cached_obc(std::move(coid)); @@ -1035,11 +1118,12 @@ OpsExecuter::OpsExecuter(Ref<PG> pg, { if (op_info.may_write() && should_clone(*obc, snapc)) { do_write_op([this](auto& backend, auto& os, auto& txn) { - cloning_ctx = execute_clone(std::as_const(snapc), - std::as_const(obc->obs), - std::as_const(obc->ssc->snapset), - backend, - txn); + prepare_cloning_ctx( + std::as_const(snapc), + std::as_const(obc->obs), + std::as_const(obc->ssc->snapset), + backend, + txn); }); } } |