summary refs log tree commit diff stats
path: root/src/crimson/osd/pg.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/pg.cc')
-rw-r--r-- src/crimson/osd/pg.cc | 620
1 file changed, 206 insertions(+), 414 deletions(-)
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 97d48c1fa45..2746e730f2b 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -13,6 +13,9 @@
#include <boost/range/numeric.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>
+
+#include <seastar/util/defer.hh>
+
#include "include/utime_fmt.h"
#include "common/hobject.h"
@@ -129,6 +132,7 @@ PG::PG(
pool,
name),
osdmap,
+ PG_FEATURE_CRIMSON_ALL,
this,
this),
scrubber(*this),
@@ -389,7 +393,13 @@ void PG::on_replica_activate()
void PG::on_activate_complete()
{
- wait_for_active_blocker.unblock();
+ /* Confusingly, on_activate_complete is invoked when the primary and replicas
+ * have recorded the current interval. At that point, the PG may either become
+ * ACTIVE or PEERED, depending on whether the acting set is eligible for client
+ * IO. Only unblock wait_for_active_blocker if we actually became ACTIVE */
+ if (peering_state.is_active()) {
+ wait_for_active_blocker.unblock();
+ }
if (peering_state.needs_recovery()) {
logger().info("{}: requesting recovery",
@@ -858,43 +868,26 @@ std::ostream& operator<<(std::ostream& os, const PG& pg)
return os;
}
-void PG::mutate_object(
- ObjectContextRef& obc,
- ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_p)
+void PG::enqueue_push_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers)
{
- if (obc->obs.exists) {
- obc->obs.oi.prior_version = obc->obs.oi.version;
- obc->obs.oi.version = osd_op_p.at_version;
- if (osd_op_p.user_modify)
- obc->obs.oi.user_version = osd_op_p.at_version.version;
- obc->obs.oi.last_reqid = osd_op_p.req_id;
- obc->obs.oi.mtime = osd_op_p.mtime;
- obc->obs.oi.local_mtime = ceph_clock_now();
-
- // object_info_t
- {
- ceph::bufferlist osv;
- obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL);
- // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
- txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
- }
+ assert(recovery_handler);
+ assert(recovery_handler->backfill_state);
+ auto backfill_state = recovery_handler->backfill_state.get();
+ backfill_state->enqueue_standalone_push(obj, v, peers);
+}
- // snapset
- if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
- logger().debug("final snapset {} in {}",
- obc->ssc->snapset, obc->obs.oi.soid);
- ceph::bufferlist bss;
- encode(obc->ssc->snapset, bss);
- txn.setattr(coll_ref->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
- obc->ssc->exists = true;
- } else {
- logger().debug("no snapset (this is a clone)");
- }
- } else {
- // reset cached ObjectState without enforcing eviction
- obc->obs.oi = object_info_t(obc->obs.oi.soid);
- }
+void PG::enqueue_delete_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers)
+{
+ assert(recovery_handler);
+ assert(recovery_handler->backfill_state);
+ auto backfill_state = recovery_handler->backfill_state.get();
+ backfill_state->enqueue_standalone_delete(obj, v, peers);
}
PG::interruptible_future<
@@ -902,6 +895,7 @@ PG::interruptible_future<
PG::interruptible_future<>>>
PG::submit_transaction(
ObjectContextRef&& obc,
+ ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
std::vector<pg_log_entry_t>&& log_entries)
@@ -914,17 +908,23 @@ PG::submit_transaction(
}
epoch_t map_epoch = get_osdmap_epoch();
+ auto at_version = osd_op_p.at_version;
- peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+ peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version);
peering_state.update_trim_to();
ceph_assert(!log_entries.empty());
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
projected_last_update = log_entries.rbegin()->version;
+ for (const auto& entry: log_entries) {
+ projected_log.add(entry);
+ }
+
auto [submitted, all_completed] = co_await backend->submit_transaction(
peering_state.get_acting_recovery_backfill(),
obc->obs.oi.soid,
+ std::move(new_clone),
std::move(txn),
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
@@ -933,8 +933,8 @@ PG::submit_transaction(
co_return std::make_tuple(
std::move(submitted),
all_completed.then_interruptible(
- [this, last_complete=peering_state.get_info().last_complete,
- at_version=osd_op_p.at_version](auto acked) {
+ [this, last_complete=peering_state.get_info().last_complete, at_version]
+ (auto acked) {
for (const auto& peer : acked) {
peering_state.update_peer_last_complete_ondisk(
peer.shard, peer.last_complete_ondisk);
@@ -975,150 +975,6 @@ ObjectContextRef duplicate_obc(const ObjectContextRef &obc) {
return object_context;
}
-template <class Ret, class SuccessFunc, class FailureFunc>
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
-PG::do_osd_ops_execute(
- seastar::lw_shared_ptr<OpsExecuter> ox,
- ObjectContextRef obc,
- const OpInfo &op_info,
- Ref<MOSDOp> m,
- std::vector<OSDOp>& ops,
- SuccessFunc&& success_func,
- FailureFunc&& failure_func)
-{
- assert(ox);
- auto rollbacker = ox->create_rollbacker(
- [object_context=duplicate_obc(obc)] (auto& obc) mutable {
- obc->update_from(*object_context);
- });
- auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
- return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
- logger().debug(
- "do_osd_ops_execute: object {} - handling op {}",
- ox->get_target(),
- ceph_osd_op_name(osd_op.op.op));
- return ox->execute_op(osd_op);
- }).safe_then_interruptible([this, ox, &ops] {
- logger().debug(
- "do_osd_ops_execute: object {} all operations successful",
- ox->get_target());
- // check for full
- if ((ox->delta_stats.num_bytes > 0 ||
- ox->delta_stats.num_objects > 0) &&
- get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
- const auto& m = ox->get_message();
- if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now
- m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
- logger().info(" full, but proceeding due to FULL_FORCE or MDS");
- } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
- // they tried, they failed.
- logger().info(" full, replying to FULL_TRY op");
- if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA))
- return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
- seastar::now(),
- OpsExecuter::osd_op_ierrorator::future<>(
- crimson::ct_error::edquot::make()));
- else
- return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
- seastar::now(),
- OpsExecuter::osd_op_ierrorator::future<>(
- crimson::ct_error::enospc::make()));
- } else {
- // drop request
- logger().info(" full, dropping request (bad client)");
- return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
- seastar::now(),
- OpsExecuter::osd_op_ierrorator::future<>(
- crimson::ct_error::eagain::make()));
- }
- }
- return std::move(*ox).flush_changes_n_do_ops_effects(
- ops,
- snap_mapper,
- osdriver,
- [this] (auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- auto&& log_entries) {
- logger().debug(
- "do_osd_ops_execute: object {} submitting txn",
- obc->get_oid());
- mutate_object(obc, txn, osd_op_p);
- return submit_transaction(
- std::move(obc),
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries));
- });
- }).safe_then_unpack_interruptible(
- [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc]
- (auto submitted_fut, auto _all_completed_fut) mutable {
-
- auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple(
- std::move(success_func),
- crimson::ct_error::object_corrupted::handle(
- [rollbacker, this, obc] (const std::error_code& e) mutable {
- // this is a path for EIO. it's special because we want to fix the obejct
- // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
- // restart the execution.
- rollbacker.rollback_obc_if_modified(e);
- return repair_object(obc->obs.oi.soid,
- obc->obs.oi.version
- ).then_interruptible([] {
- return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
- });
- }), OpsExecuter::osd_op_errorator::all_same_way(
- [rollbacker, failure_func_ptr]
- (const std::error_code& e) mutable {
- // handle non-fatal errors only
- ceph_assert(e.value() == EDQUOT ||
- e.value() == ENOSPC ||
- e.value() == EAGAIN);
- rollbacker.rollback_obc_if_modified(e);
- return (*failure_func_ptr)(e);
- }));
-
- return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
- std::move(submitted_fut),
- std::move(all_completed_fut)
- );
- }, OpsExecuter::osd_op_errorator::all_same_way(
- [this, op_info, m, obc,
- rollbacker, failure_func_ptr]
- (const std::error_code& e) mutable {
- ceph_tid_t rep_tid = shard_services.get_tid();
- rollbacker.rollback_obc_if_modified(e);
- // record error log
- auto maybe_submit_error_log =
- interruptor::make_ready_future<std::optional<eversion_t>>(std::nullopt);
- // call submit_error_log only for non-internal clients
- if constexpr (!std::is_same_v<Ret, void>) {
- if(op_info.may_write()) {
- maybe_submit_error_log =
- submit_error_log(m, op_info, obc, e, rep_tid);
- }
- }
- return maybe_submit_error_log.then_interruptible(
- [this, failure_func_ptr, e, rep_tid] (auto version) {
- auto all_completed =
- [this, failure_func_ptr, e, rep_tid, version] {
- if (version.has_value()) {
- return complete_error_log(rep_tid, version.value()
- ).then_interruptible([failure_func_ptr, e] {
- return (*failure_func_ptr)(e);
- });
- } else {
- return (*failure_func_ptr)(e);
- }
- };
- return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
- std::move(seastar::now()),
- std::move(all_completed())
- );
- });
- }));
-}
-
PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
const eversion_t& version)
{
@@ -1148,15 +1004,22 @@ PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
return result;
}
-PG::interruptible_future<std::optional<eversion_t>> PG::submit_error_log(
+PG::interruptible_future<eversion_t> PG::submit_error_log(
Ref<MOSDOp> m,
const OpInfo &op_info,
ObjectContextRef obc,
const std::error_code e,
ceph_tid_t rep_tid)
{
- logger().debug("{}: {} rep_tid: {} error: {}",
- __func__, *m, rep_tid, e);
+ // as with submit_executer, need to ensure that log numbering and submission
+ // are atomic
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
+ });
+ LOG_PREFIX(PG::submit_error_log);
+ DEBUGDPP("{} rep_tid: {} error: {}",
+ *this, *m, rep_tid, e);
const osd_reqid_t &reqid = m->get_reqid();
mempool::osd_pglog::list<pg_log_entry_t> log_entries;
log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
@@ -1175,181 +1038,121 @@ PG::interruptible_future<std::optional<eversion_t>> PG::submit_error_log(
ceph::os::Transaction t;
peering_state.merge_new_log_entries(
log_entries, t, peering_state.get_pg_trim_to(),
- peering_state.get_min_last_complete_ondisk());
-
- return seastar::do_with(log_entries, set<pg_shard_t>{},
- [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
- return interruptor::do_for_each(get_acting_recovery_backfill(),
- [this, log_entries, waiting_on, rep_tid]
- (auto& i) mutable {
- pg_shard_t peer(i);
- if (peer == pg_whoami) {
- return seastar::now();
- }
- ceph_assert(peering_state.get_peer_missing().count(peer));
- ceph_assert(peering_state.has_peer_info(peer));
- auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
- log_entries,
- spg_t(peering_state.get_info().pgid.pgid, i.shard),
- pg_whoami.shard,
- get_osdmap_epoch(),
- get_last_peering_reset(),
- rep_tid,
- peering_state.get_pg_trim_to(),
- peering_state.get_min_last_complete_ondisk());
- waiting_on.insert(peer);
- logger().debug("submit_error_log: sending log"
- "missing_request (rep_tid: {} entries: {})"
- " to osd {}", rep_tid, log_entries, peer.osd);
- return shard_services.send_to_osd(peer.osd,
- std::move(log_m),
- get_osdmap_epoch());
- }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
- waiting_on.insert(pg_whoami);
- logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
- log_entry_update_waiting_on.insert(
- std::make_pair(rep_tid,
- log_update_t{std::move(waiting_on)}));
- return shard_services.get_store().do_transaction(
- get_collection_ref(), std::move(t)
- ).then([this] {
- peering_state.update_trim_to();
- return seastar::make_ready_future<std::optional<eversion_t>>(projected_last_update);
- });
- });
- });
+ peering_state.get_pg_committed_to());
+
+
+ set<pg_shard_t> waiting_on;
+ for (const auto &peer: get_acting_recovery_backfill()) {
+ if (peer == pg_whoami) {
+ continue;
+ }
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+ log_entries,
+ spg_t(peering_state.get_info().pgid.pgid, peer.shard),
+ pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_last_peering_reset(),
+ rep_tid,
+ peering_state.get_pg_trim_to(),
+ peering_state.get_pg_committed_to());
+ waiting_on.insert(peer);
+
+ DEBUGDPP("sending log missing_request (rep_tid: {} entries: {}) to osd {}",
+ *this, rep_tid, log_entries, peer.osd);
+ co_await interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ peer.osd,
+ std::move(log_m),
+ get_osdmap_epoch()));
+ }
+ waiting_on.insert(pg_whoami);
+ DEBUGDPP("inserting rep_tid {}", *this, rep_tid);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid,
+ log_update_t{std::move(waiting_on)}));
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t)
+ ));
+
+ peering_state.update_trim_to();
+ co_return projected_last_update;
}
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
-PG::do_osd_ops(
- Ref<MOSDOp> m,
- crimson::net::ConnectionXcoreRef conn,
+PG::run_executer_fut PG::run_executer(
+ OpsExecuter &ox,
ObjectContextRef obc,
const OpInfo &op_info,
- const SnapContext& snapc)
+ std::vector<OSDOp>& ops)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- return do_osd_ops_execute<MURef<MOSDOpReply>>(
- seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, obc, op_info, *m, conn, snapc),
- obc,
- op_info,
- m,
- m->ops,
- // success_func
- [this, m, obc, may_write = op_info.may_write(),
- may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
- // TODO: should stop at the first op which returns a negative retval,
- // cmpext uses it for returning the index of first unmatched byte
- int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
- if (may_read && result >= 0) {
- for (auto &osdop : m->ops) {
- if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
- result = osdop.rval.code;
- break;
- }
- }
- } else if (result > 0 && may_write && !rvec) {
- result = 0;
- } else if (result < 0 && (m->ops.empty() ?
- 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
- result = 0;
- }
- auto reply = crimson::make_message<MOSDOpReply>(m.get(),
- result,
- get_osdmap_epoch(),
- 0,
- false);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- logger().debug(
- "do_osd_ops: {} - object {} sending reply",
- *m,
- m->get_hobj());
- if (obc->obs.exists) {
- reply->set_reply_versions(peering_state.get_info().last_update,
- obc->obs.oi.user_version);
- } else {
- reply->set_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- }
- return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
- std::move(reply));
- },
- // failure_func
- [m, this]
- (const std::error_code& e) {
- logger().error("do_osd_ops_execute::failure_func {} got error: {}",
- *m, e);
- return log_reply(m, e);
+ LOG_PREFIX(PG::run_executer);
+ auto rollbacker = ox.create_rollbacker(
+ [stored_obc=duplicate_obc(obc)](auto &obc) mutable {
+ obc->update_from(*stored_obc);
+ });
+ auto rollback_on_error = seastar::defer([&rollbacker] {
+ rollbacker.rollback_obc_if_modified();
});
-}
-PG::do_osd_ops_iertr::future<MURef<MOSDOpReply>>
-PG::log_reply(
- Ref<MOSDOp> m,
- const std::error_code& e)
-{
- auto reply = crimson::make_message<MOSDOpReply>(
- m.get(), -e.value(), get_osdmap_epoch(), 0, false);
- if (m->ops.empty() ? 0 :
- m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
- reply->set_result(0);
- }
- // For all ops except for CMPEXT, the correct error value is encoded
- // in e.value(). For CMPEXT, osdop.rval has the actual error value.
- if (e.value() == ct_error::cmp_fail_error_value) {
- assert(!m->ops.empty());
- for (auto &osdop : m->ops) {
- if (osdop.rval < 0) {
- reply->set_result(osdop.rval);
- break;
+ for (auto &op: ops) {
+ DEBUGDPP("object {} handle op {}", *this, ox.get_target(), op);
+ co_await ox.execute_op(op);
+ }
+ DEBUGDPP("object {} all operations successful", *this, ox.get_target());
+
+ // check for full
+ if ((ox.delta_stats.num_bytes > 0 ||
+ ox.delta_stats.num_objects > 0) &&
+ get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
+ const auto& m = ox.get_message();
+ if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now
+ m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
+ INFODPP("full, but proceeding due to FULL_FORCE, or MDS", *this);
+ } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
+ // they tried, they failed.
+ INFODPP("full, replying to FULL_TRY op", *this);
+ if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) {
+ co_await run_executer_fut(
+ crimson::ct_error::edquot::make());
+ } else {
+ co_await run_executer_fut(
+ crimson::ct_error::enospc::make());
}
+ } else {
+ // drop request
+ INFODPP("full, dropping request (bad client)", *this);
+ co_await run_executer_fut(
+ crimson::ct_error::eagain::make());
}
}
- reply->set_enoent_reply_versions(
- peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
- std::move(reply));
+ rollback_on_error.cancel();
}
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
-PG::do_osd_ops(
- ObjectContextRef obc,
- std::vector<OSDOp>& ops,
- const OpInfo &op_info,
- const do_osd_ops_params_t &&msg_params)
-{
- // This overload is generally used for internal client requests,
- // use an empty SnapContext.
- return seastar::do_with(
- std::move(msg_params),
- [=, this, &ops, &op_info](auto &msg_params) {
- return do_osd_ops_execute<void>(
- seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this},
- obc,
- op_info,
- msg_params,
- msg_params.get_connection(),
- SnapContext{}
- ),
- obc,
- op_info,
- Ref<MOSDOp>(),
- ops,
- // success_func
- [] {
- return do_osd_ops_iertr::now();
- },
- // failure_func
- [] (const std::error_code& e) {
- return do_osd_ops_iertr::now();
- });
+PG::submit_executer_fut PG::submit_executer(
+ OpsExecuter &&ox,
+ const std::vector<OSDOp>& ops) {
+ LOG_PREFIX(PG::submit_executer);
+ DEBUGDPP("", *this);
+
+ // we need to build the pg log entries and submit the transaction
+ // atomically to ensure log ordering
+ co_await interruptor::make_interruptible(submit_lock.lock());
+ auto unlocker = seastar::defer([this] {
+ submit_lock.unlock();
});
+
+ auto [submitted, completed] = co_await std::move(
+ ox
+ ).flush_changes_and_submit(
+ ops,
+ snap_mapper,
+ osdriver
+ );
+ co_return std::make_tuple(
+ std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+ std::move(completed));
}
PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
@@ -1414,31 +1217,6 @@ void PG::check_blocklisted_obc_watchers(
}
}
-PG::load_obc_iertr::future<>
-PG::with_locked_obc(const hobject_t &hobj,
- const OpInfo &op_info,
- with_obc_func_t &&f)
-{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- const hobject_t oid = get_oid(hobj);
- auto wrapper = [f=std::move(f), this](auto head, auto obc) {
- check_blocklisted_obc_watchers(obc);
- return f(head, obc);
- };
- switch (get_lock_type(op_info)) {
- case RWState::RWREAD:
- return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
- case RWState::RWWRITE:
- return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
- case RWState::RWEXCL:
- return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
- default:
- ceph_abort();
- };
-}
-
void PG::update_stats(const pg_stat_t &stat) {
peering_state.update_stats(
[&stat] (auto& history, auto& stats) {
@@ -1448,13 +1226,10 @@ void PG::update_stats(const pg_stat_t &stat) {
);
}
-PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
{
LOG_PREFIX(PG::handle_rep_op);
DEBUGDPP("{}", *this, *req);
- if (can_discard_replica_op(*req)) {
- co_return;
- }
ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
@@ -1471,12 +1246,13 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
log_operation(std::move(log_entries),
req->pg_trim_to,
req->version,
- req->min_last_complete_ondisk,
+ req->pg_committed_to,
!txn.empty(),
txn,
false);
DEBUGDPP("{} do_transaction", *this, *req);
- co_await interruptor::make_interruptible(
+
+ auto commit_fut = interruptor::make_interruptible(
shard_services.get_store().do_transaction(coll_ref, std::move(txn))
);
@@ -1487,10 +1263,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
req.get(), pg_whoami, 0,
map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(lcod);
- co_await interruptor::make_interruptible(
- shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
- );
- co_return;
+ co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
}
PG::interruptible_future<> PG::update_snap_map(
@@ -1517,28 +1290,25 @@ void PG::log_operation(
std::vector<pg_log_entry_t>&& logv,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
- const eversion_t &min_last_complete_ondisk,
+ const eversion_t &pg_committed_to,
bool transaction_applied,
ObjectStore::Transaction &txn,
bool async) {
- logger().debug("{}", __func__);
+ LOG_PREFIX(PG::log_operation);
+ DEBUGDPP("", *this);
if (is_primary()) {
- ceph_assert(trim_to <= peering_state.get_last_update_ondisk());
+ ceph_assert(trim_to <= peering_state.get_pg_committed_to());
}
- /* TODO: when we add snap mapper and projected log support,
- * we'll likely want to update them here.
- *
- * See src/osd/PrimaryLogPG.h:log_operation for how classic
- * handles these cases.
- */
-#if 0
auto last = logv.rbegin();
if (is_primary() && last != logv.rend()) {
+ DEBUGDPP("on primary, trimming projected log", *this);
projected_log.skip_can_rollback_to_to_head();
- projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+ projected_log.trim(shard_services.get_cct(), last->version,
+ nullptr, nullptr, nullptr);
}
-#endif
+
if (!is_primary()) { // && !is_ec_pg()
+ DEBUGDPP("on replica, clearing obc", *this);
replica_clear_repop_obc(logv);
}
if (!logv.empty()) {
@@ -1547,7 +1317,7 @@ void PG::log_operation(
peering_state.append_log(std::move(logv),
trim_to,
roll_forward_to,
- min_last_complete_ondisk,
+ pg_committed_to,
txn,
!txn.empty(),
false);
@@ -1555,13 +1325,13 @@ void PG::log_operation(
void PG::replica_clear_repop_obc(
const std::vector<pg_log_entry_t> &logv) {
- logger().debug("{} clearing {} entries", __func__, logv.size());
- for (auto &&e: logv) {
- logger().debug(" {} get_object_boundary(from): {} "
- " head version(to): {}",
- e.soid,
- e.soid.get_object_boundary(),
- e.soid.get_head());
+ LOG_PREFIX(PG::replica_clear_repop_obc);
+ DEBUGDPP("clearing obc for {} log entries", logv.size());
+ for (auto &&e: logv) {
+ DEBUGDPP("clearing entry for {} from: {} to: {}",
+ e.soid,
+ e.soid.get_object_boundary(),
+ e.soid.get_head());
/* Have to blast all clones, they share a snapset */
obc_registry.clear_range(
e.soid.get_object_boundary(), e.soid.get_head());
@@ -1586,17 +1356,17 @@ PG::interruptible_future<> PG::do_update_log_missing(
ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
- std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ std::optional<eversion_t> op_trim_to, op_pg_committed_to;
if (m->pg_trim_to != eversion_t())
op_trim_to = m->pg_trim_to;
- if (m->pg_roll_forward_to != eversion_t())
- op_roll_forward_to = m->pg_roll_forward_to;
- logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+ if (m->pg_committed_to != eversion_t())
+ op_pg_committed_to = m->pg_committed_to;
+ logger().debug("op_trim_to = {}, op_pg_committed_to = {}",
op_trim_to.has_value() ? *op_trim_to : eversion_t(),
- op_roll_forward_to.has_value() ? *op_roll_forward_to : eversion_t());
+ op_pg_committed_to.has_value() ? *op_pg_committed_to : eversion_t());
peering_state.append_log_entries_update_missing(
- m->entries, t, op_trim_to, op_roll_forward_to);
+ m->entries, t, op_trim_to, op_pg_committed_to);
return interruptor::make_interruptible(shard_services.get_store().do_transaction(
coll_ref, std::move(t))).then_interruptible(
@@ -1814,14 +1584,21 @@ bool PG::should_send_op(
return true;
bool should_send =
(hoid.pool != (int64_t)get_info().pgid.pool() ||
- (has_backfill_state() && hoid <= get_last_backfill_started()) ||
- hoid <= peering_state.get_peer_info(peer).last_backfill);
+ // An object has been fully pushed to the backfill target if and only if
+ // either of the following conditions is met:
+ // 1. peer_info.last_backfill has passed "hoid"
+ // 2. last_backfill_started has passed "hoid" and "hoid" is not in the peer
+ // missing set
+ hoid <= peering_state.get_peer_info(peer).last_backfill ||
+ (has_backfill_state() && hoid <= get_last_backfill_started() &&
+ !is_missing_on_peer(peer, hoid)));
if (!should_send) {
ceph_assert(is_backfill_target(peer));
logger().debug("{} issue_repop shipping empty opt to osd."
"{}, object {} beyond std::max(last_backfill_started, "
"peer_info[peer].last_backfill {})",
- peer, hoid, peering_state.get_peer_info(peer).last_backfill);
+ __func__, peer, hoid,
+ peering_state.get_peer_info(peer).last_backfill);
}
return should_send;
// TODO: should consider async recovery cases in the future which are not supported
@@ -1836,8 +1613,8 @@ PG::already_complete(const osd_reqid_t& reqid)
int ret;
std::vector<pg_log_op_return_item_t> op_returns;
- if (peering_state.get_pg_log().get_log().get_request(
- reqid, &version, &user_version, &ret, &op_returns)) {
+ if (check_in_progress_op(
+ reqid, &version, &user_version, &ret, &op_returns)) {
complete_op_t dupinfo{
user_version,
version,
@@ -1902,4 +1679,19 @@ void PG::C_PG_FinishRecovery::finish(int r) {
DEBUGDPP("stale recovery finsher", pg);
}
}
+bool PG::check_in_progress_op(
+ const osd_reqid_t& reqid,
+ eversion_t *version,
+ version_t *user_version,
+ int *return_code,
+ std::vector<pg_log_op_return_item_t> *op_returns
+ ) const
+{
+ return (
+ projected_log.get_request(reqid, version, user_version, return_code,
+ op_returns) ||
+ peering_state.get_pg_log().get_log().get_request(
+ reqid, version, user_version, return_code, op_returns));
+}
+
}