diff options
Diffstat (limited to 'src/crimson/osd/replicated_backend.cc')
-rw-r--r-- | src/crimson/osd/replicated_backend.cc | 132 |
1 files changed, 95 insertions, 37 deletions
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 12ee38b4370..6c8abecffaf 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -36,19 +36,59 @@ ReplicatedBackend::_read(const hobject_t& hoid, return store->read(coll, ghobject_t{hoid}, off, len, flags); } +MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg( + const pg_shard_t &pg_shard, + const hobject_t &hoid, + const bufferlist &encoded_txn, + const osd_op_params_t &osd_op_p, + epoch_t min_epoch, + epoch_t map_epoch, + const std::vector<pg_log_entry_t> &log_entries, + bool send_op, + ceph_tid_t tid) +{ + ceph_assert(pg_shard != whoami); + auto m = crimson::make_message<MOSDRepOp>( + osd_op_p.req_id, + whoami, + spg_t{pgid, pg_shard.shard}, + hoid, + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, + map_epoch, + min_epoch, + tid, + osd_op_p.at_version); + if (send_op) { + m->set_data(encoded_txn); + } else { + ceph::os::Transaction t; + bufferlist bl; + encode(t, bl); + m->set_data(bl); + } + encode(log_entries, m->logbl); + m->pg_trim_to = osd_op_p.pg_trim_to; + m->pg_committed_to = osd_op_p.pg_committed_to; + m->pg_stats = pg.get_info().stats; + return m; +} + ReplicatedBackend::rep_op_fut_t -ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards, - const hobject_t& hoid, - ceph::os::Transaction&& t, - osd_op_params_t&& opp, - epoch_t min_epoch, epoch_t map_epoch, - std::vector<pg_log_entry_t>&& logv) +ReplicatedBackend::submit_transaction( + const std::set<pg_shard_t> &pg_shards, + const hobject_t& hoid, + crimson::osd::ObjectContextRef &&new_clone, + ceph::os::Transaction&& t, + osd_op_params_t&& opp, + epoch_t min_epoch, epoch_t map_epoch, + std::vector<pg_log_entry_t>&& logv) { LOG_PREFIX(ReplicatedBackend::submit_transaction); DEBUGDPP("object {}", dpp, hoid); auto log_entries = std::move(logv); auto txn = std::move(t); auto osd_op_p = std::move(opp); + auto _new_clone = std::move(new_clone); const ceph_tid_t tid = shard_services.get_tid(); auto pending_txn = @@ -56,45 +96,52 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards, bufferlist encoded_txn; encode(txn, encoded_txn); + bool is_delete = false; for (auto &le : log_entries) { le.mark_unrollbackable(); + if (le.is_delete()) { + is_delete = true; + } } + co_await pg.update_snap_map(log_entries, txn); + + std::vector<pg_shard_t> to_push_clone; + std::vector<pg_shard_t> to_push_delete; auto sends = std::make_unique<std::vector<seastar::future<>>>(); - for (auto pg_shard : pg_shards) { - if (pg_shard != whoami) { - auto m = crimson::make_message<MOSDRepOp>( - osd_op_p.req_id, - whoami, - spg_t{pgid, pg_shard.shard}, - hoid, - CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, - map_epoch, - min_epoch, - tid, - osd_op_p.at_version); - if (pg.should_send_op(pg_shard, hoid)) { - m->set_data(encoded_txn); - } else { - ceph::os::Transaction t; - bufferlist bl; - encode(t, bl); - m->set_data(bl); + for (auto &pg_shard : pg_shards) { + if (pg_shard == whoami) { + continue; + } + MURef<MOSDRepOp> m; + if (pg.should_send_op(pg_shard, hoid)) { + m = new_repop_msg( + pg_shard, hoid, encoded_txn, osd_op_p, + min_epoch, map_epoch, log_entries, true, tid); + } else { + m = new_repop_msg( + pg_shard, hoid, encoded_txn, osd_op_p, + min_epoch, map_epoch, log_entries, false, tid); + if (pg.is_missing_on_peer(pg_shard, hoid)) { + if (_new_clone) { + // The head is in the push queue but hasn't been pushed yet. + // We need to ensure that the newly created clone will be + // pushed as well, otherwise we might skip it. + // See: https://tracker.ceph.com/issues/68808 + to_push_clone.push_back(pg_shard); + } + if (is_delete) { + to_push_delete.push_back(pg_shard); + } } - pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); - encode(log_entries, m->logbl); - m->pg_trim_to = osd_op_p.pg_trim_to; - m->pg_committed_to = osd_op_p.pg_committed_to; - m->pg_stats = pg.get_info().stats; - // TODO: set more stuff. e.g., pg_states - sends->emplace_back( - shard_services.send_to_osd( - pg_shard.osd, std::move(m), map_epoch)); } + pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); + // TODO: set more stuff. e.g., pg_states + sends->emplace_back( + shard_services.send_to_osd( + pg_shard.osd, std::move(m), map_epoch)); } - co_await pg.update_snap_map(log_entries, txn); - pg.log_operation( std::move(log_entries), osd_op_p.pg_trim_to, @@ -120,9 +167,20 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards, return seastar::now(); } return peers->all_committed.get_shared_future(); - }).then_interruptible([pending_txn, this] { + }).then_interruptible([pending_txn, this, _new_clone, &hoid, + to_push_delete=std::move(to_push_delete), + to_push_clone=std::move(to_push_clone)] { auto acked_peers = std::move(pending_txn->second.acked_peers); pending_trans.erase(pending_txn); + if (_new_clone && !to_push_clone.empty()) { + pg.enqueue_push_for_backfill( + _new_clone->obs.oi.soid, + _new_clone->obs.oi.version, + to_push_clone); + } + if (!to_push_delete.empty()) { + pg.enqueue_delete_for_backfill(hoid, {}, to_push_delete); + } return seastar::make_ready_future< crimson::osd::acked_peers_t>(std::move(acked_peers)); }); |