summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/pg.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/pg.cc')
-rw-r--r--src/crimson/osd/pg.cc323
1 files changed, 157 insertions, 166 deletions
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 744a1dbc02b..2746e730f2b 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -132,6 +132,7 @@ PG::PG(
pool,
name),
osdmap,
+ PG_FEATURE_CRIMSON_ALL,
this,
this),
scrubber(*this),
@@ -392,7 +393,13 @@ void PG::on_replica_activate()
void PG::on_activate_complete()
{
- wait_for_active_blocker.unblock();
+ /* Confusingly, on_activate_complete is invoked when the primary and replicas
+ * have recorded the current interval. At that point, the PG may either become
+ * ACTIVE or PEERED, depending on whether the acting set is eligible for client
+ * IO. Only unblock wait_for_active_blocker if we actually became ACTIVE */
+ if (peering_state.is_active()) {
+ wait_for_active_blocker.unblock();
+ }
if (peering_state.needs_recovery()) {
logger().info("{}: requesting recovery",
@@ -861,43 +868,26 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
return os;
}
-void PG::mutate_object(
- ObjectContextRef& obc,
- ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_p)
+void PG::enqueue_push_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers)
{
- if (obc->obs.exists) {
- obc->obs.oi.prior_version = obc->obs.oi.version;
- obc->obs.oi.version = osd_op_p.at_version;
- if (osd_op_p.user_modify)
- obc->obs.oi.user_version = osd_op_p.at_version.version;
- obc->obs.oi.last_reqid = osd_op_p.req_id;
- obc->obs.oi.mtime = osd_op_p.mtime;
- obc->obs.oi.local_mtime = ceph_clock_now();
-
- // object_info_t
- {
- ceph::bufferlist osv;
- obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL);
- // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
- txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
- }
+ assert(recovery_handler);
+ assert(recovery_handler->backfill_state);
+ auto backfill_state = recovery_handler->backfill_state.get();
+ backfill_state->enqueue_standalone_push(obj, v, peers);
+}
- // snapset
- if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
- logger().debug("final snapset {} in {}",
- obc->ssc->snapset, obc->obs.oi.soid);
- ceph::bufferlist bss;
- encode(obc->ssc->snapset, bss);
- txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
- obc->ssc->exists = true;
- } else {
- logger().debug("no snapset (this is a clone)");
- }
- } else {
- // reset cached ObjectState without enforcing eviction
- obc->obs.oi = object_info_t(obc->obs.oi.soid);
- }
+void PG::enqueue_delete_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers)
+{
+ assert(recovery_handler);
+ assert(recovery_handler->backfill_state);
+ auto backfill_state = recovery_handler->backfill_state.get();
+ backfill_state->enqueue_standalone_delete(obj, v, peers);
}
PG::interruptible_future<
@@ -905,6 +895,7 @@ PG::interruptible_future<
PG::interruptible_future<>>>
PG::submit_transaction(
ObjectContextRef&& obc,
+ ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
std::vector<pg_log_entry_t>&& log_entries)
@@ -917,17 +908,23 @@ PG::submit_transaction(
}
epoch_t map_epoch = get_osdmap_epoch();
+ auto at_version = osd_op_p.at_version;
- peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+ peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version);
peering_state.update_trim_to();
ceph_assert(!log_entries.empty());
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
projected_last_update = log_entries.rbegin()->version;
+ for (const auto& entry: log_entries) {
+ projected_log.add(entry);
+ }
+
auto [submitted, all_completed] = co_await backend->submit_transaction(
peering_state.get_acting_recovery_backfill(),
obc->obs.oi.soid,
+ std::move(new_clone),
std::move(txn),
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
@@ -936,8 +933,8 @@ PG::submit_transaction(
co_return std::make_tuple(
std::move(submitted),
all_completed.then_interruptible(
- [this, last_complete=peering_state.get_info().last_complete,
- at_version=osd_op_p.at_version](auto acked) {
+ [this, last_complete=peering_state.get_info().last_complete, at_version]
+ (auto acked) {
for (const auto& peer : acked) {
peering_state.update_peer_last_complete_ondisk(
peer.shard, peer.last_complete_ondisk);
@@ -1014,8 +1011,15 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
const std::error_code e,
ceph_tid_t rep_tid)
{
- logger().debug("{}: {} rep_tid: {} error: {}",
- __func__, *m, rep_tid, e);
+ // as with submit_executer, need to ensure that log numbering and submission
+ // are atomic
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
+ LOG_PREFIX(PG::submit_error_log);
+ DEBUGDPP("{} rep_tid: {} error: {}",
+ *this, *m, rep_tid, e);
const osd_reqid_t &reqid = m->get_reqid();
mempool::osd_pglog::list<pg_log_entry_t> log_entries;
log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
@@ -1034,49 +1038,47 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
ceph::os::Transaction t;
peering_state.merge_new_log_entries(
log_entries, t, peering_state.get_pg_trim_to(),
- peering_state.get_min_last_complete_ondisk());
-
- return seastar::do_with(log_entries, set<pg_shard_t>{},
- [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
- return interruptor::do_for_each(get_acting_recovery_backfill(),
- [this, log_entries, waiting_on, rep_tid]
- (auto& i) mutable {
- pg_shard_t peer(i);
- if (peer == pg_whoami) {
- return seastar::now();
- }
- ceph_assert(peering_state.get_peer_missing().count(peer));
- ceph_assert(peering_state.has_peer_info(peer));
- auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
- log_entries,
- spg_t(peering_state.get_info().pgid.pgid, i.shard),
- pg_whoami.shard,
- get_osdmap_epoch(),
- get_last_peering_reset(),
- rep_tid,
- peering_state.get_pg_trim_to(),
- peering_state.get_min_last_complete_ondisk());
- waiting_on.insert(peer);
- logger().debug("submit_error_log: sending log"
- "missing_request (rep_tid: {} entries: {})"
- " to osd {}", rep_tid, log_entries, peer.osd);
- return shard_services.send_to_osd(peer.osd,
- std::move(log_m),
- get_osdmap_epoch());
- }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
- waiting_on.insert(pg_whoami);
- logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
- log_entry_update_waiting_on.insert(
- std::make_pair(rep_tid,
- log_update_t{std::move(waiting_on)}));
- return shard_services.get_store().do_transaction(
- get_collection_ref(), std::move(t)
- ).then([this] {
- peering_state.update_trim_to();
- return seastar::make_ready_future<eversion_t>(projected_last_update);
- });
- });
- });
+ peering_state.get_pg_committed_to());
+
+
+ set<pg_shard_t> waiting_on;
+ for (const auto &peer: get_acting_recovery_backfill()) {
+ if (peer == pg_whoami) {
+ continue;
+ }
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+ log_entries,
+ spg_t(peering_state.get_info().pgid.pgid, peer.shard),
+ pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_last_peering_reset(),
+ rep_tid,
+ peering_state.get_pg_trim_to(),
+ peering_state.get_pg_committed_to());
+ waiting_on.insert(peer);
+
+ DEBUGDPP("sending log missing_request (rep_tid: {} entries: {}) to osd {}",
+ *this, rep_tid, log_entries, peer.osd);
+ co_await interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ peer.osd,
+ std::move(log_m),
+ get_osdmap_epoch()));
+ }
+ waiting_on.insert(pg_whoami);
+ DEBUGDPP("inserting rep_tid {}", *this, rep_tid);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid,
+ log_update_t{std::move(waiting_on)}));
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t)
+ ));
+
+ peering_state.update_trim_to();
+ co_return projected_last_update;
}
PG::run_executer_fut PG::run_executer(
@@ -1132,25 +1134,25 @@ PG::submit_executer_fut PG::submit_executer(
OpsExecuter &&ox,
const std::vector<OSDOp>& ops) {
LOG_PREFIX(PG::submit_executer);
- // transaction must commit at this point
- return std::move(
+ DEBUGDPP("", *this);
+
+ // we need to build the pg log entries and submit the transaction
+ // atomically to ensure log ordering
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
+
+ auto [submitted, completed] = co_await std::move(
ox
- ).flush_changes_n_do_ops_effects(
+ ).flush_changes_and_submit(
ops,
snap_mapper,
- osdriver,
- [FNAME, this](auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- auto&& log_entries) {
- DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
- mutate_object(obc, txn, osd_op_p);
- return submit_transaction(
- std::move(obc),
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries));
- });
+ osdriver
+ );
+ co_return std::make_tuple(
+ std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+ std::move(completed));
}
PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
@@ -1215,31 +1217,6 @@ void PG::check_blocklisted_obc_watchers(
}
}
-PG::load_obc_iertr::future<>
-PG::with_locked_obc(const hobject_t &hobj,
- const OpInfo &op_info,
- with_obc_func_t &&f)
-{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- const hobject_t oid = get_oid(hobj);
- auto wrapper = [f=std::move(f), this](auto head, auto obc) {
- check_blocklisted_obc_watchers(obc);
- return f(head, obc);
- };
- switch (get_lock_type(op_info)) {
- case RWState::RWREAD:
- return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
- case RWState::RWWRITE:
- return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
- case RWState::RWEXCL:
- return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
- default:
- ceph_abort();
- };
-}
-
void PG::update_stats(const pg_stat_t &stat) {
peering_state.update_stats(
[&stat] (auto& history, auto& stats) {
@@ -1249,13 +1226,10 @@ void PG::update_stats(const pg_stat_t &stat) {
);
}
-PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
{
LOG_PREFIX(PG::handle_rep_op);
DEBUGDPP("{}", *this, *req);
- if (can_discard_replica_op(*req)) {
- co_return;
- }
ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
@@ -1272,12 +1246,13 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
log_operation(std::move(log_entries),
req->pg_trim_to,
req->version,
- req->min_last_complete_ondisk,
+ req->pg_committed_to,
!txn.empty(),
txn,
false);
DEBUGDPP("{} do_transaction", *this, *req);
- co_await interruptor::make_interruptible(
+
+ auto commit_fut = interruptor::make_interruptible(
shard_services.get_store().do_transaction(coll_ref, std::move(txn))
);
@@ -1288,10 +1263,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
req.get(), pg_whoami, 0,
map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(lcod);
- co_await interruptor::make_interruptible(
- shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
- );
- co_return;
+ co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
}
PG::interruptible_future<> PG::update_snap_map(
@@ -1318,28 +1290,25 @@ void PG::log_operation(
std::vector<pg_log_entry_t>&& logv,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
- const eversion_t &min_last_complete_ondisk,
+ const eversion_t &pg_committed_to,
bool transaction_applied,
ObjectStore::Transaction &txn,
bool async) {
- logger().debug("{}", __func__);
+ LOG_PREFIX(PG::log_operation);
+ DEBUGDPP("", *this);
if (is_primary()) {
- ceph_assert(trim_to <= peering_state.get_last_update_ondisk());
+ ceph_assert(trim_to <= peering_state.get_pg_committed_to());
}
- /* TODO: when we add snap mapper and projected log support,
- * we'll likely want to update them here.
- *
- * See src/osd/PrimaryLogPG.h:log_operation for how classic
- * handles these cases.
- */
-#if 0
auto last = logv.rbegin();
if (is_primary() && last != logv.rend()) {
+ DEBUGDPP("on primary, trimming projected log", *this);
projected_log.skip_can_rollback_to_to_head();
- projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+ projected_log.trim(shard_services.get_cct(), last->version,
+ nullptr, nullptr, nullptr);
}
-#endif
+
if (!is_primary()) { // && !is_ec_pg()
+ DEBUGDPP("on replica, clearing obc", *this);
replica_clear_repop_obc(logv);
}
if (!logv.empty()) {
@@ -1348,7 +1317,7 @@ void PG::log_operation(
peering_state.append_log(std::move(logv),
trim_to,
roll_forward_to,
- min_last_complete_ondisk,
+ pg_committed_to,
txn,
!txn.empty(),
false);
@@ -1356,13 +1325,13 @@ void PG::log_operation(
void PG::replica_clear_repop_obc(
const std::vector<pg_log_entry_t> &logv) {
- logger().debug("{} clearing {} entries", __func__, logv.size());
- for (auto &&e: logv) {
- logger().debug(" {} get_object_boundary(from): {} "
- " head version(to): {}",
- e.soid,
- e.soid.get_object_boundary(),
- e.soid.get_head());
+ LOG_PREFIX(PG::replica_clear_repop_obc);
+ DEBUGDPP("clearing obc for {} log entries", logv.size());
+ for (auto &&e: logv) {
+ DEBUGDPP("clearing entry for {} from: {} to: {}",
+ e.soid,
+ e.soid.get_object_boundary(),
+ e.soid.get_head());
/* Have to blast all clones, they share a snapset */
obc_registry.clear_range(
e.soid.get_object_boundary(), e.soid.get_head());
@@ -1387,17 +1356,17 @@ PG::interruptible_future<> PG::do_update_log_missing(
ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
- std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ std::optional<eversion_t> op_trim_to, op_pg_committed_to;
if (m->pg_trim_to != eversion_t())
op_trim_to = m->pg_trim_to;
- if (m->pg_roll_forward_to != eversion_t())
- op_roll_forward_to = m->pg_roll_forward_to;
- logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+ if (m->pg_committed_to != eversion_t())
+ op_pg_committed_to = m->pg_committed_to;
+ logger().debug("op_trim_to = {}, op_pg_committed_to = {}",
op_trim_to.has_value() ? *op_trim_to : eversion_t(),
- op_roll_forward_to.has_value() ? *op_roll_forward_to : eversion_t());
+ op_pg_committed_to.has_value() ? *op_pg_committed_to : eversion_t());
peering_state.append_log_entries_update_missing(
- m->entries, t, op_trim_to, op_roll_forward_to);
+ m->entries, t, op_trim_to, op_pg_committed_to);
return interruptor::make_interruptible(shard_services.get_store().do_transaction(
coll_ref, std::move(t))).then_interruptible(
@@ -1615,14 +1584,21 @@ bool PG::should_send_op(
return true;
bool should_send =
(hoid.pool != (int64_t)get_info().pgid.pool() ||
- (has_backfill_state() && hoid <= get_last_backfill_started()) ||
- hoid <= peering_state.get_peer_info(peer).last_backfill);
+ // An object has been fully pushed to the backfill target if and only if
+ // either of the following conditions is met:
+ // 1. peer_info.last_backfill has passed "hoid"
+ // 2. last_backfill_started has passed "hoid" and "hoid" is not in the peer
+ // missing set
+ hoid <= peering_state.get_peer_info(peer).last_backfill ||
+ (has_backfill_state() && hoid <= get_last_backfill_started() &&
+ !is_missing_on_peer(peer, hoid)));
if (!should_send) {
ceph_assert(is_backfill_target(peer));
logger().debug("{} issue_repop shipping empty opt to osd."
"{}, object {} beyond std::max(last_backfill_started, "
"peer_info[peer].last_backfill {})",
- peer, hoid, peering_state.get_peer_info(peer).last_backfill);
+ __func__, peer, hoid,
+ peering_state.get_peer_info(peer).last_backfill);
}
return should_send;
// TODO: should consider async recovery cases in the future which are not supported
@@ -1637,8 +1613,8 @@ PG::already_complete(const osd_reqid_t& reqid)
int ret;
std::vector<pg_log_op_return_item_t> op_returns;
- if (peering_state.get_pg_log().get_log().get_request(
- reqid, &version, &user_version, &ret, &op_returns)) {
+ if (check_in_progress_op(
+ reqid, &version, &user_version, &ret, &op_returns)) {
complete_op_t dupinfo{
user_version,
version,
@@ -1703,4 +1679,19 @@ void PG::C_PG_FinishRecovery::finish(int r) {
DEBUGDPP("stale recovery finsher", pg);
}
}
+bool PG::check_in_progress_op(
+ const osd_reqid_t& reqid,
+ eversion_t *version,
+ version_t *user_version,
+ int *return_code,
+ std::vector<pg_log_op_return_item_t> *op_returns
+ ) const
+{
+ return (
+ projected_log.get_request(reqid, version, user_version, return_code,
+ op_returns) ||
+ peering_state.get_pg_log().get_log().get_request(
+ reqid, version, user_version, return_code, op_returns));
+}
+
}