summaryrefslogtreecommitdiffstats
path: root/src/crimson
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson')
-rw-r--r--src/crimson/osd/osd_operations/snaptrim_event.cc38
-rw-r--r--src/crimson/osd/pg.cc21
-rw-r--r--src/crimson/osd/pg.h1
3 files changed, 44 insertions, 16 deletions
diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc
index 7f78457167d..459c98bb9c0 100644
--- a/src/crimson/osd/osd_operations/snaptrim_event.cc
+++ b/src/crimson/osd/osd_operations/snaptrim_event.cc
@@ -414,21 +414,31 @@ SnapTrimObjSubEvent::start()
logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid());
co_await enter_stage<interruptor>(client_pp().process);
+ auto all_completed = interruptor::now();
+ {
+ // as with PG::submit_executer, we need to build the pg log entries
+ // and submit the transaction atomically
+ co_await interruptor::make_interruptible(pg->submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ pg->submit_lock.unlock();
+ });
- logger().debug("{}: processing obc={}", *this, obc_manager.get_obc()->get_oid());
-
- auto txn = co_await remove_or_update(
- obc_manager.get_obc(), obc_manager.get_head_obc());
-
- auto [submitted, all_completed] = co_await pg->submit_transaction(
- ObjectContextRef(obc_manager.get_obc()),
- nullptr,
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries)
- );
-
- co_await std::move(submitted);
+ logger().debug("{}: calling remove_or_update obc={}",
+ *this, obc_manager.get_obc()->get_oid());
+
+ auto txn = co_await remove_or_update(
+ obc_manager.get_obc(), obc_manager.get_head_obc());
+
+ auto submitted = interruptor::now();
+ std::tie(submitted, all_completed) = co_await pg->submit_transaction(
+ ObjectContextRef(obc_manager.get_obc()),
+ nullptr,
+ std::move(txn),
+ std::move(osd_op_p),
+ std::move(log_entries)
+ );
+ co_await std::move(submitted);
+ }
co_await enter_stage<interruptor>(client_pp().wait_repop);
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index a276b0e4366..c677961fe0f 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -1039,6 +1039,12 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
const std::error_code e,
ceph_tid_t rep_tid)
{
+ // as with submit_executer, need to ensure that log numbering and submission
+ // are atomic
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
LOG_PREFIX(PG::submit_error_log);
DEBUGDPP("{} rep_tid: {} error: {}",
*this, *m, rep_tid, e);
@@ -1156,8 +1162,15 @@ PG::submit_executer_fut PG::submit_executer(
OpsExecuter &&ox,
const std::vector<OSDOp>& ops) {
LOG_PREFIX(PG::submit_executer);
- // transaction must commit at this point
- return std::move(
+
+ // we need to build the pg log entries and submit the transaction
+ // atomically to ensure log ordering
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
+
+ auto [submitted, completed] = co_await std::move(
ox
).flush_changes_n_do_ops_effects(
ops,
@@ -1177,6 +1190,10 @@ PG::submit_executer_fut PG::submit_executer(
std::move(osd_op_p),
std::move(log_entries));
});
+
+ co_return std::make_tuple(
+ std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+ std::move(completed));
}
PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 15aeec0e4f3..2adaf69f26b 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -663,6 +663,7 @@ private:
const OpInfo &op_info,
std::vector<OSDOp>& ops);
+ seastar::shared_mutex submit_lock;
using submit_executer_ret = std::tuple<
interruptible_future<>,
interruptible_future<>>;