summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/replicated_backend.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/replicated_backend.cc')
-rw-r--r--src/crimson/osd/replicated_backend.cc134
1 files changed, 96 insertions, 38 deletions
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index cbb8c883e07..6c8abecffaf 100644
--- a/src/crimson/osd/replicated_backend.cc
+++ b/src/crimson/osd/replicated_backend.cc
@@ -36,19 +36,59 @@ ReplicatedBackend::_read(const hobject_t& hoid,
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
+MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
+ const pg_shard_t &pg_shard,
+ const hobject_t &hoid,
+ const bufferlist &encoded_txn,
+ const osd_op_params_t &osd_op_p,
+ epoch_t min_epoch,
+ epoch_t map_epoch,
+ const std::vector<pg_log_entry_t> &log_entries,
+ bool send_op,
+ ceph_tid_t tid)
+{
+ ceph_assert(pg_shard != whoami);
+ auto m = crimson::make_message<MOSDRepOp>(
+ osd_op_p.req_id,
+ whoami,
+ spg_t{pgid, pg_shard.shard},
+ hoid,
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ map_epoch,
+ min_epoch,
+ tid,
+ osd_op_p.at_version);
+ if (send_op) {
+ m->set_data(encoded_txn);
+ } else {
+ ceph::os::Transaction t;
+ bufferlist bl;
+ encode(t, bl);
+ m->set_data(bl);
+ }
+ encode(log_entries, m->logbl);
+ m->pg_trim_to = osd_op_p.pg_trim_to;
+ m->pg_committed_to = osd_op_p.pg_committed_to;
+ m->pg_stats = pg.get_info().stats;
+ return m;
+}
+
ReplicatedBackend::rep_op_fut_t
-ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
- const hobject_t& hoid,
- ceph::os::Transaction&& t,
- osd_op_params_t&& opp,
- epoch_t min_epoch, epoch_t map_epoch,
- std::vector<pg_log_entry_t>&& logv)
+ReplicatedBackend::submit_transaction(
+ const std::set<pg_shard_t> &pg_shards,
+ const hobject_t& hoid,
+ crimson::osd::ObjectContextRef &&new_clone,
+ ceph::os::Transaction&& t,
+ osd_op_params_t&& opp,
+ epoch_t min_epoch, epoch_t map_epoch,
+ std::vector<pg_log_entry_t>&& logv)
{
LOG_PREFIX(ReplicatedBackend::submit_transaction);
DEBUGDPP("object {}", dpp, hoid);
auto log_entries = std::move(logv);
auto txn = std::move(t);
auto osd_op_p = std::move(opp);
+ auto _new_clone = std::move(new_clone);
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
@@ -56,50 +96,57 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
bufferlist encoded_txn;
encode(txn, encoded_txn);
+ bool is_delete = false;
for (auto &le : log_entries) {
le.mark_unrollbackable();
+ if (le.is_delete()) {
+ is_delete = true;
+ }
}
+ co_await pg.update_snap_map(log_entries, txn);
+
+ std::vector<pg_shard_t> to_push_clone;
+ std::vector<pg_shard_t> to_push_delete;
auto sends = std::make_unique<std::vector<seastar::future<>>>();
- for (auto pg_shard : pg_shards) {
- if (pg_shard != whoami) {
- auto m = crimson::make_message<MOSDRepOp>(
- osd_op_p.req_id,
- whoami,
- spg_t{pgid, pg_shard.shard},
- hoid,
- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
- map_epoch,
- min_epoch,
- tid,
- osd_op_p.at_version);
- if (pg.should_send_op(pg_shard, hoid)) {
- m->set_data(encoded_txn);
- } else {
- ceph::os::Transaction t;
- bufferlist bl;
- encode(t, bl);
- m->set_data(bl);
+ for (auto &pg_shard : pg_shards) {
+ if (pg_shard == whoami) {
+ continue;
+ }
+ MURef<MOSDRepOp> m;
+ if (pg.should_send_op(pg_shard, hoid)) {
+ m = new_repop_msg(
+ pg_shard, hoid, encoded_txn, osd_op_p,
+ min_epoch, map_epoch, log_entries, true, tid);
+ } else {
+ m = new_repop_msg(
+ pg_shard, hoid, encoded_txn, osd_op_p,
+ min_epoch, map_epoch, log_entries, false, tid);
+ if (pg.is_missing_on_peer(pg_shard, hoid)) {
+ if (_new_clone) {
+ // The head is in the push queue but hasn't been pushed yet.
+ // We need to ensure that the newly created clone will be
+ // pushed as well, otherwise we might skip it.
+ // See: https://tracker.ceph.com/issues/68808
+ to_push_clone.push_back(pg_shard);
+ }
+ if (is_delete) {
+ to_push_delete.push_back(pg_shard);
+ }
}
- pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
- encode(log_entries, m->logbl);
- m->pg_trim_to = osd_op_p.pg_trim_to;
- m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
- m->pg_stats = pg.get_info().stats;
- // TODO: set more stuff. e.g., pg_states
- sends->emplace_back(
- shard_services.send_to_osd(
- pg_shard.osd, std::move(m), map_epoch));
}
+ pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+ // TODO: set more stuff. e.g., pg_states
+ sends->emplace_back(
+ shard_services.send_to_osd(
+ pg_shard.osd, std::move(m), map_epoch));
}
- co_await pg.update_snap_map(log_entries, txn);
-
pg.log_operation(
std::move(log_entries),
osd_op_p.pg_trim_to,
osd_op_p.at_version,
- osd_op_p.min_last_complete_ondisk,
+ osd_op_p.pg_committed_to,
true,
txn,
false);
@@ -120,9 +167,20 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
return seastar::now();
}
return peers->all_committed.get_shared_future();
- }).then_interruptible([pending_txn, this] {
+ }).then_interruptible([pending_txn, this, _new_clone, &hoid,
+ to_push_delete=std::move(to_push_delete),
+ to_push_clone=std::move(to_push_clone)] {
auto acked_peers = std::move(pending_txn->second.acked_peers);
pending_trans.erase(pending_txn);
+ if (_new_clone && !to_push_clone.empty()) {
+ pg.enqueue_push_for_backfill(
+ _new_clone->obs.oi.soid,
+ _new_clone->obs.oi.version,
+ to_push_clone);
+ }
+ if (!to_push_delete.empty()) {
+ pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete);
+ }
return seastar::make_ready_future<
crimson::osd::acked_peers_t>(std::move(acked_peers));
});