diff options
author | Samuel Just <sjust@redhat.com> | 2024-10-25 01:39:18 +0200 |
---|---|---|
committer | Samuel Just <sjust@redhat.com> | 2024-12-11 03:02:01 +0100 |
commit | 54a42bef571b3d83a81e02595dd40edcf5054245 (patch) | |
tree | 90bea0ae3c13211b3161e42e99ecd86b98f7b504 /src/crimson | |
parent | crimson/.../pg: update debugging in PG::submit_error_log (diff) | |
download | ceph-54a42bef571b3d83a81e02595dd40edcf5054245.tar.xz ceph-54a42bef571b3d83a81e02595dd40edcf5054245.zip |
crimson: manage log submission atomicity independently of pipeline stages
It's important to construct and submit log entries atomically because
the submission order needs to match the entry numbering. Previously,
this occurred under the pg-wide exclusive process stage. Shortly, each
obc will have its own process stage, so we need to ensure that atomicity
seperately from the pipeline stages. Introduce a simple mutex.
Signed-off-by: Samuel Just <sjust@redhat.com>
Diffstat (limited to 'src/crimson')
-rw-r--r-- | src/crimson/osd/osd_operations/snaptrim_event.cc | 38 | ||||
-rw-r--r-- | src/crimson/osd/pg.cc | 21 | ||||
-rw-r--r-- | src/crimson/osd/pg.h | 1 |
3 files changed, 44 insertions, 16 deletions
diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index 7f78457167d..459c98bb9c0 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -414,21 +414,31 @@ SnapTrimObjSubEvent::start() logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid()); co_await enter_stage<interruptor>(client_pp().process); + auto all_completed = interruptor::now(); + { + // as with PG::submit_executer, we need to build the pg log entries + // and submit the transaction atomically + co_await interruptor::make_interruptible(pg->submit_lock.lock()); + auto unlocker = seastar::defer([this] { + pg->submit_lock.unlock(); + }); - logger().debug("{}: processing obc={}", *this, obc_manager.get_obc()->get_oid()); - - auto txn = co_await remove_or_update( - obc_manager.get_obc(), obc_manager.get_head_obc()); - - auto [submitted, all_completed] = co_await pg->submit_transaction( - ObjectContextRef(obc_manager.get_obc()), - nullptr, - std::move(txn), - std::move(osd_op_p), - std::move(log_entries) - ); - - co_await std::move(submitted); + logger().debug("{}: calling remove_or_update obc={}", + *this, obc_manager.get_obc()->get_oid()); + + auto txn = co_await remove_or_update( + obc_manager.get_obc(), obc_manager.get_head_obc()); + + auto submitted = interruptor::now(); + std::tie(submitted, all_completed) = co_await pg->submit_transaction( + ObjectContextRef(obc_manager.get_obc()), + nullptr, + std::move(txn), + std::move(osd_op_p), + std::move(log_entries) + ); + co_await std::move(submitted); + } co_await enter_stage<interruptor>(client_pp().wait_repop); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index a276b0e4366..c677961fe0f 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1039,6 +1039,12 @@ PG::interruptible_future<eversion_t> PG::submit_error_log( const std::error_code e, ceph_tid_t rep_tid) { + // as with submit_executer, need to ensure that log numbering and submission + // are atomic + co_await interruptor::make_interruptible(submit_lock.lock()); + auto unlocker = seastar::defer([this] { + submit_lock.unlock(); + }); LOG_PREFIX(PG::submit_error_log); DEBUGDPP("{} rep_tid: {} error: {}", *this, *m, rep_tid, e); @@ -1156,8 +1162,15 @@ PG::submit_executer_fut PG::submit_executer( OpsExecuter &&ox, const std::vector<OSDOp>& ops) { LOG_PREFIX(PG::submit_executer); - // transaction must commit at this point - return std::move( + + // we need to build the pg log entries and submit the transaction + // atomically to ensure log ordering + co_await interruptor::make_interruptible(submit_lock.lock()); + auto unlocker = seastar::defer([this] { + submit_lock.unlock(); + }); + + auto [submitted, completed] = co_await std::move( ox ).flush_changes_n_do_ops_effects( ops, @@ -1177,6 +1190,10 @@ PG::submit_executer_fut PG::submit_executer( std::move(osd_op_p), std::move(log_entries)); }); + + co_return std::make_tuple( + std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}), + std::move(completed)); } PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m) diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 15aeec0e4f3..2adaf69f26b 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -663,6 +663,7 @@ private: const OpInfo &op_info, std::vector<OSDOp>& ops); + seastar::shared_mutex submit_lock; using submit_executer_ret = std::tuple< interruptible_future<>, interruptible_future<>>; |