summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/ops_executer.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/ops_executer.h')
-rw-r--r--src/crimson/osd/ops_executer.h151
1 files changed, 43 insertions, 108 deletions
diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h
index 0dea7d0515e..f5554bd6919 100644
--- a/src/crimson/osd/ops_executer.h
+++ b/src/crimson/osd/ops_executer.h
@@ -40,7 +40,7 @@ namespace crimson::osd {
class PG;
// OpsExecuter -- a class for executing ops targeting a certain object.
-class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
+class OpsExecuter {
friend class SnapTrimObjSubEvent;
using call_errorator = crimson::errorator<
@@ -170,16 +170,12 @@ public:
object_stat_sum_t delta_stats;
private:
- // an operation can be divided into two stages: main and effect-exposing
- // one. The former is performed immediately on call to `do_osd_op()` while
- // the later on `submit_changes()` – after successfully processing main
- // stages of all involved operations. When any stage fails, none of all
- // scheduled effect-exposing stages will be executed.
- // when operation requires this division, some variant of `with_effect()`
- // should be used.
+ // with_effect can be used to schedule operations to be performed
+ // at commit time. effects will be discarded if the operation does
+ // not commit.
struct effect_t {
// an effect can affect PG, i.e. create a watch timeout
- virtual osd_op_errorator::future<> execute(Ref<PG> pg) = 0;
+ virtual seastar::future<> execute(Ref<PG> pg) = 0;
virtual ~effect_t() = default;
};
@@ -199,25 +195,26 @@ private:
SnapContext snapc; // writer snap context
struct CloningContext {
+ /// id of new clone, populated in prepare_cloning_ctx
+ hobject_t coid;
+ /// new snapset, populated in prepare_cloning_ctx
SnapSet new_snapset;
- pg_log_entry_t log_entry;
-
- void apply_to(
- std::vector<pg_log_entry_t>& log_entries,
- ObjectContext& processed_obc) &&;
+ /// populated in complete_cloning_ctx
+ ObjectContextRef clone_obc;
};
std::unique_ptr<CloningContext> cloning_ctx;
-
/**
- * execute_clone
+ * prepare_cloning_ctx
*
* If snapc contains a snap which occurred logically after the last write
- * seen by this object (see OpsExecutor::should_clone()), we first need
- * make a clone of the object at its current state. execute_clone primes
- * txn with that clone operation and returns an
- * OpsExecutor::CloningContext which will allow us to fill in the corresponding
- * metadata and log_entries once the operations have been processed.
+ * seen by this object (see OpsExecuter::should_clone()), we first need
+ * make a clone of the object at its current state. prepare_cloning_ctx
+ * primes txn with that clone operation and populates cloning_ctx with
+ * an obc for the clone and a new snapset reflecting the clone.
+ *
+ * complete_cloning_ctx later uses the information from cloning_ctx to
+ * generate a log entry and object_info versions for the clone.
*
* Note that this strategy differs from classic, which instead performs this
* work at the end and reorders the transaction. See
@@ -230,13 +227,15 @@ private:
* @param backend [in,out] interface for generating mutations
* @param txn [out] transaction for the operation
*/
- std::unique_ptr<CloningContext> execute_clone(
+ void prepare_cloning_ctx(
const SnapContext& snapc,
const ObjectState& initial_obs,
const SnapSet& initial_snapset,
PGBackend& backend,
ceph::os::Transaction& txn);
+ /// complete clone, populate clone_obc, return log entry
+ pg_log_entry_t complete_cloning_ctx();
/**
* should_clone
@@ -267,12 +266,6 @@ private:
*/
void update_clone_overlap();
- interruptible_future<std::vector<pg_log_entry_t>> flush_clone_metadata(
- std::vector<pg_log_entry_t>&& log_entries,
- SnapMapper& snap_mapper,
- OSDriver& osdriver,
- ceph::os::Transaction& txn);
-
private:
// this gizmo could be wrapped in std::optional for the sake of lazy
// initialization. we don't need it for ops that doesn't have effect
@@ -400,18 +393,25 @@ public:
execute_op(OSDOp& osd_op);
using rep_op_fut_tuple =
- std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
+ std::tuple<interruptible_future<>, interruptible_future<>>;
using rep_op_fut_t =
interruptible_future<rep_op_fut_tuple>;
- template <typename MutFunc>
- rep_op_fut_t flush_changes_n_do_ops_effects(
+ rep_op_fut_t flush_changes_and_submit(
const std::vector<OSDOp>& ops,
SnapMapper& snap_mapper,
- OSDriver& osdriver,
- MutFunc mut_func) &&;
- std::vector<pg_log_entry_t> prepare_transaction(
- const std::vector<OSDOp>& ops);
- void fill_op_params(modified_by m);
+ OSDriver& osdriver);
+ pg_log_entry_t prepare_head_update(
+ const std::vector<OSDOp>& ops,
+ ceph::os::Transaction &txn);
+
+ void check_init_op_params(OpsExecuter::modified_by m) {
+ if (!osd_op_params) {
+ osd_op_params.emplace();
+ osd_op_params->req_id = msg->get_reqid();
+ osd_op_params->mtime = msg->get_mtime();
+ osd_op_params->user_modify = (m == modified_by::user);
+ }
+ }
ObjectContextRef get_obc() const {
return obc;
@@ -446,7 +446,7 @@ public:
ObjectContextRef prepare_clone(
const hobject_t& coid,
- eversion_t version);
+ const ObjectState& initial_obs);
void apply_stats();
};
@@ -475,7 +475,7 @@ auto OpsExecuter::with_effect_on_obc(
effect_func(std::move(effect_func)),
obc(std::move(obc)) {
}
- osd_op_errorator::future<> execute(Ref<PG> pg) final {
+ seastar::future<> execute(Ref<PG> pg) final {
return std::move(effect_func)(std::move(ctx),
std::move(obc),
std::move(pg));
@@ -488,85 +488,21 @@ auto OpsExecuter::with_effect_on_obc(
return std::forward<MainFunc>(main_func)(ctx_ref);
}
-template <typename MutFunc>
-OpsExecuter::rep_op_fut_t
-OpsExecuter::flush_changes_n_do_ops_effects(
- const std::vector<OSDOp>& ops,
- SnapMapper& snap_mapper,
- OSDriver& osdriver,
- MutFunc mut_func) &&
-{
- const bool want_mutate = !txn.empty();
- // osd_op_params are instantiated by every wr-like operation.
- assert(osd_op_params || !want_mutate);
- assert(obc);
-
- auto submitted = interruptor::now();
- auto all_completed =
- interruptor::make_interruptible(osd_op_errorator::now());
-
- if (cloning_ctx) {
- ceph_assert(want_mutate);
- }
-
- if (want_mutate) {
- auto log_entries = co_await flush_clone_metadata(
- prepare_transaction(ops),
- snap_mapper,
- osdriver,
- txn);
-
- if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) {
- ceph_assert(log_rit->version == osd_op_params->at_version);
- }
-
- auto [_submitted, _all_completed] = co_await mut_func(
- std::move(txn),
- std::move(obc),
- std::move(*osd_op_params),
- std::move(log_entries));
-
- submitted = std::move(_submitted);
- all_completed = std::move(_all_completed);
- }
-
- apply_stats();
-
- if (op_effects.size()) [[unlikely]] {
- // need extra ref pg due to apply_stats() which can be executed after
- // informing snap mapper
- all_completed =
- std::move(all_completed).safe_then_interruptible([this, pg=this->pg] {
- // let's do the cleaning of `op_effects` in destructor
- return interruptor::do_for_each(op_effects,
- [pg=std::move(pg)](auto& op_effect) {
- return op_effect->execute(pg);
- });
- });
- }
-
- co_return std::make_tuple(
- std::move(submitted),
- std::move(all_completed));
-}
-
template <class Func>
struct OpsExecuter::RollbackHelper {
- void rollback_obc_if_modified(const std::error_code& e);
- seastar::lw_shared_ptr<OpsExecuter> ox;
+ void rollback_obc_if_modified();
+ OpsExecuter *ox;
Func func;
};
template <class Func>
inline OpsExecuter::RollbackHelper<Func>
OpsExecuter::create_rollbacker(Func&& func) {
- return {shared_from_this(), std::forward<Func>(func)};
+ return {this, std::forward<Func>(func)};
}
-
template <class Func>
-void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
- const std::error_code& e)
+void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified()
{
// Oops, an operation had failed. do_osd_ops() altogether with
// OpsExecuter already dropped the ObjectStore::Transaction if
@@ -584,10 +520,9 @@ void OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
assert(ox);
const auto need_rollback = ox->has_seen_write();
crimson::get_logger(ceph_subsys_osd).debug(
- "{}: object {} got error {}, need_rollback={}",
+ "{}: object {} got error, need_rollback={}",
__func__,
ox->obc->get_oid(),
- e,
need_rollback);
if (need_rollback) {
func(ox->obc);