summary refs log tree commit diff stats
path: root/src/crimson
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson')
-rw-r--r-- src/crimson/os/alienstore/CMakeLists.txt 1
-rw-r--r-- src/crimson/os/seastore/async_cleaner.h 4
-rw-r--r-- src/crimson/os/seastore/btree/fixed_kv_btree.h 3
-rw-r--r-- src/crimson/os/seastore/cache.h 48
-rw-r--r-- src/crimson/os/seastore/cached_extent.h 18
-rw-r--r-- src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc 2
-rw-r--r-- src/crimson/os/seastore/transaction.h 27
-rw-r--r-- src/crimson/os/seastore/transaction_manager.cc 2
-rw-r--r-- src/crimson/osd/osd_operations/background_recovery.cc 7
-rw-r--r-- src/crimson/osd/osd_operations/client_request.cc 33
-rw-r--r-- src/crimson/osd/osd_operations/client_request_common.cc 15
-rw-r--r-- src/crimson/osd/osd_operations/client_request_common.h 2
-rw-r--r-- src/crimson/osd/osd_operations/internal_client_request.cc 10
-rw-r--r-- src/crimson/osd/pg.cc 51
-rw-r--r-- src/crimson/osd/pg.h 11
-rw-r--r-- src/crimson/osd/pg_recovery.cc 1
-rw-r--r-- src/crimson/osd/pg_shard_manager.cc 6
-rw-r--r-- src/crimson/osd/recovery_backend.cc 32
-rw-r--r-- src/crimson/osd/recovery_backend.h 53
-rw-r--r-- src/crimson/osd/replicated_backend.cc 9
20 files changed, 269 insertions, 66 deletions
diff --git a/src/crimson/os/alienstore/CMakeLists.txt b/src/crimson/os/alienstore/CMakeLists.txt
index e446d4099f7..0fa69984c53 100644
--- a/src/crimson/os/alienstore/CMakeLists.txt
+++ b/src/crimson/os/alienstore/CMakeLists.txt
@@ -50,6 +50,7 @@ set(alien_store_srcs
${PROJECT_SOURCE_DIR}/src/os/bluestore/Allocator.cc
${PROJECT_SOURCE_DIR}/src/os/bluestore/AvlAllocator.cc
${PROJECT_SOURCE_DIR}/src/os/bluestore/BtreeAllocator.cc
+ ${PROJECT_SOURCE_DIR}/src/os/bluestore/Btree2Allocator.cc
${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapFreelistManager.cc
${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueFS.cc
${PROJECT_SOURCE_DIR}/src/os/bluestore/bluefs_types.cc
diff --git a/src/crimson/os/seastore/async_cleaner.h b/src/crimson/os/seastore/async_cleaner.h
index 485497b9a9c..424247c5bdc 100644
--- a/src/crimson/os/seastore/async_cleaner.h
+++ b/src/crimson/os/seastore/async_cleaner.h
@@ -350,12 +350,12 @@ public:
sea_time_point modify_time) = 0;
/**
- * get_extent_if_live
+ * get_extents_if_live
*
* Returns extent at specified location if still referenced by
* lba_manager and not removed by t.
*
- * See TransactionManager::get_extent_if_live and
+ * See TransactionManager::get_extents_if_live and
* LBAManager::get_physical_extent_if_live.
*/
using get_extents_if_live_iertr = base_iertr;
diff --git a/src/crimson/os/seastore/btree/fixed_kv_btree.h b/src/crimson/os/seastore/btree/fixed_kv_btree.h
index 097072a7434..0671b55a4fb 100644
--- a/src/crimson/os/seastore/btree/fixed_kv_btree.h
+++ b/src/crimson/os/seastore/btree/fixed_kv_btree.h
@@ -1383,6 +1383,9 @@ private:
"looking up root on {}",
c.trans,
*root_block);
+
+ // checking the lba root node must be atomic with creating
+ // and linking the absent root node
auto [found, fut] = get_root_node(c);
auto on_found_internal =
diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h
index 0e4e14f5574..4441df86d4e 100644
--- a/src/crimson/os/seastore/cache.h
+++ b/src/crimson/os/seastore/cache.h
@@ -437,6 +437,11 @@ public:
* Mostly the same as Cache::get_extent(), with the only difference
* that get_absent_extent won't search the transaction's context for
* the specific CachedExtent
+ *
+ * The extent in query is supposed to be absent in Cache.
+ *
+ * The caller is responsible for calling get_extent_viewable_by_trans()
+ * *atomically* before calling this method.
*/
template <typename T>
get_extent_iertr::future<TCachedExtentRef<T>> get_absent_extent(
@@ -471,18 +476,46 @@ public:
CachedExtentRef extent)
{
assert(extent->is_valid());
- auto p_extent = extent->get_transactional_view(t);
- if (!p_extent->is_pending_in_trans(t.get_trans_id())) {
- t.add_to_read_set(p_extent);
- if (!p_extent->is_mutation_pending()) {
- touch_extent(*p_extent);
+ CachedExtent* p_extent;
+ if (extent->is_stable()) {
+ p_extent = extent->get_transactional_view(t);
+ if (p_extent != extent.get()) {
+ assert(!extent->is_stable_writting());
+ assert(p_extent->is_pending_in_trans(t.get_trans_id()));
+ assert(!p_extent->is_stable_writting());
+ if (p_extent->is_mutable()) {
+ assert(p_extent->is_fully_loaded());
+ assert(!p_extent->is_pending_io());
+ return get_extent_ertr::make_ready_future<CachedExtentRef>(
+ CachedExtentRef(p_extent));
+ } else {
+ assert(p_extent->is_exist_clean());
+ }
+ } else {
+ // stable from trans-view
+ assert(!p_extent->is_pending_in_trans(t.get_trans_id()));
+ if (t.maybe_add_to_read_set(p_extent)) {
+ touch_extent(*p_extent);
+ }
+ }
+ } else {
+ assert(!extent->is_stable_writting());
+ assert(extent->is_pending_in_trans(t.get_trans_id()));
+ if (extent->is_mutable()) {
+ assert(extent->is_fully_loaded());
+ assert(!extent->is_pending_io());
+ return get_extent_ertr::make_ready_future<CachedExtentRef>(extent);
+ } else {
+ assert(extent->is_exist_clean());
+ p_extent = extent.get();
}
}
+
+ assert(p_extent->is_stable() || p_extent->is_exist_clean());
// user should not see RETIRED_PLACEHOLDER extents
ceph_assert(p_extent->get_type() != extent_types_t::RETIRED_PLACEHOLDER);
if (!p_extent->is_fully_loaded()) {
assert(!p_extent->is_mutable());
- touch_extent(*p_extent);
LOG_PREFIX(Cache::get_extent_viewable_by_trans);
SUBDEBUG(seastore_cache,
"{} {}~{} is present without been fully loaded, reading ... -- {}",
@@ -784,6 +817,9 @@ public:
* and read in the extent at location offset~length.
*
* The extent in query is supposed to be absent in Cache.
+ *
+ * The caller is responsible for calling get_extent_viewable_by_trans()
+ * *atomically* before calling this method.
*/
template <typename Func>
get_extent_by_type_ret get_absent_extent_by_type(
diff --git a/src/crimson/os/seastore/cached_extent.h b/src/crimson/os/seastore/cached_extent.h
index 1d6ffc740cb..e5e32b6713b 100644
--- a/src/crimson/os/seastore/cached_extent.h
+++ b/src/crimson/os/seastore/cached_extent.h
@@ -343,6 +343,8 @@ public:
: "nullopt";
out << "CachedExtent(addr=" << this
<< ", type=" << get_type()
+ << ", trans=" << pending_for_transaction
+ << ", pending_io=" << is_pending_io()
<< ", version=" << version
<< ", dirty_from_or_retired_at=" << dirty_from_or_retired_at
<< ", modify_time=" << sea_time_point_printer_t{modify_time}
@@ -440,14 +442,18 @@ public:
state == extent_state_t::DIRTY;
}
+  /// Returns true if the extent is mutation-pending and has pending io.
+  bool is_stable_writting() const {
+    if (!is_mutation_pending()) {
+      return false;
+    }
+    // MUTATION_PENDING extents that are under io are already stable and
+    // visible, see prepare_record().
+    //
+    // XXX: It might be good to mark this case as DIRTY from the definition,
+    // which probably can make things simpler.
+    return is_pending_io();
+  }
+
/// Returns true if extent is stable and shared among transactions
bool is_stable() const {
- return is_stable_written() ||
- // MUTATION_PENDING and under-io extents are to-be-stable extents,
- // for the sake of caveats that checks the correctness of extents
- // states, we consider them stable.
- (is_mutation_pending() &&
- is_pending_io());
+ return is_stable_written() || is_stable_writting();
}
bool is_data_stable() const {
diff --git a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc
index f3e37ff9bef..80bfd7a2e20 100644
--- a/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc
+++ b/src/crimson/os/seastore/lba_manager/btree/lba_btree_node.cc
@@ -83,7 +83,7 @@ BtreeLBAMappingRef LBALeafNode::get_mapping(
this,
iter.get_offset(),
val,
- lba_node_meta_t{laddr, val.len, 0});
+ lba_node_meta_t{laddr, laddr + val.len, 0});
}
}
diff --git a/src/crimson/os/seastore/transaction.h b/src/crimson/os/seastore/transaction.h
index d7cdb30098a..2fa468fb48e 100644
--- a/src/crimson/os/seastore/transaction.h
+++ b/src/crimson/os/seastore/transaction.h
@@ -137,14 +137,37 @@ public:
}
}
+  /**
+   * maybe_add_to_read_set
+   *
+   * Registers ref in this transaction's read set unless the transaction is
+   * weak or ref is already linked to this transaction.
+   *
+   * Returns true if ref was added, false if it was already added or the
+   * transaction is weak.
+   */
+  bool maybe_add_to_read_set(CachedExtentRef ref) {
+    if (is_weak()) {
+      return false;
+    }
+
+    assert(ref->is_valid());
+
+    // Find the position for this transaction in the extent's own list of
+    // transactions; the same position is reused below as the insertion hint.
+    auto pos = ref->transactions.lower_bound(
+      this, read_set_item_t<Transaction>::trans_cmp_t());
+    const bool already_linked =
+      (pos != ref->transactions.end()) && (pos->t == this);
+    if (already_linked) {
+      return false;
+    }
+
+    auto [item, inserted] = read_set.emplace(this, ref);
+    ceph_assert(inserted);
+    // read_set elements are exposed as const; the extent-side list links
+    // the same item, hence the const_cast.
+    ref->transactions.insert_before(
+      pos, const_cast<read_set_item_t<Transaction>&>(*item));
+    return true;
+  }
+
void add_to_read_set(CachedExtentRef ref) {
- if (is_weak()) return;
+ if (is_weak()) {
+ return;
+ }
assert(ref->is_valid());
auto it = ref->transactions.lower_bound(
this, read_set_item_t<Transaction>::trans_cmp_t());
- if (it != ref->transactions.end() && it->t == this) return;
+ assert(it == ref->transactions.end() || it->t != this);
auto [iter, inserted] = read_set.emplace(this, ref);
ceph_assert(inserted);
diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc
index b9768909735..1b309a03683 100644
--- a/src/crimson/os/seastore/transaction_manager.cc
+++ b/src/crimson/os/seastore/transaction_manager.cc
@@ -680,7 +680,7 @@ TransactionManager::get_extents_if_live(
laddr_t laddr,
extent_len_t len)
{
- LOG_PREFIX(TransactionManager::get_extent_if_live);
+ LOG_PREFIX(TransactionManager::get_extents_if_live);
TRACET("{} {}~{} {}", t, type, laddr, len, paddr);
// This only works with segments to check if alive,
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc
index 1fe1e06ef85..509d4c4a484 100644
--- a/src/crimson/osd/osd_operations/background_recovery.cc
+++ b/src/crimson/osd/osd_operations/background_recovery.cc
@@ -87,13 +87,6 @@ seastar::future<> BackgroundRecoveryT<T>::start()
}, [](std::exception_ptr) {
return seastar::make_ready_future<bool>(false);
}, pg, epoch_started);
- }).handle_exception_type([ref, this](const std::system_error& err) {
- LOG_PREFIX(BackgroundRecoveryT<T>::start);
- if (err.code() == std::make_error_code(std::errc::interrupted)) {
- DEBUGDPPI("recovery interruped: {}", *pg, err.what());
- return seastar::now();
- }
- return seastar::make_exception_future<>(err);
});
});
});
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
index 33d0f4c0f41..bb314bad4f3 100644
--- a/src/crimson/osd/osd_operations/client_request.cc
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -23,10 +23,18 @@ namespace crimson::osd {
void ClientRequest::Orderer::requeue(Ref<PG> pg)
{
LOG_PREFIX(ClientRequest::Orderer::requeue);
- for (auto &req: list) {
- DEBUGDPP("requeueing {}", *pg, req);
- req.reset_instance_handle();
- std::ignore = req.with_pg_process(pg);
+ std::list<ClientRequest*> to_requeue;
+ for (auto &req : list) {
+ to_requeue.emplace_back(&req);
+ }
+ // Client requests might be destroyed in the following
+ // iteration leading to short lived dangling pointers
+ // to those requests, but this doesn't hurt as we won't
+ // dereference those dangling pointers.
+ for (auto req: to_requeue) {
+ DEBUGDPP("requeueing {}", *pg, *req);
+ req->reset_instance_handle();
+ std::ignore = req->with_pg_process(pg);
}
}
@@ -286,6 +294,7 @@ ClientRequest::recover_missing_snaps(
ObjectContextRef head,
std::set<snapid_t> &snaps)
{
+ LOG_PREFIX(ClientRequest::process_op);
co_await ihref.enter_stage<interruptor>(
client_pp(*pg).recover_missing_snaps, *this);
for (auto &snap : snaps) {
@@ -299,7 +308,12 @@ ClientRequest::recover_missing_snaps(
* we skip the oid as there is no corresponding clone to recover.
* See https://tracker.ceph.com/issues/63821 */
if (oid) {
- co_await do_recover_missing(pg, *oid, m->get_reqid());
+ auto unfound = co_await do_recover_missing(pg, *oid, m->get_reqid());
+ if (unfound) {
+ DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head());
+ co_await interruptor::make_interruptible(
+ pg->get_recovery_backend()->add_unfound(m->get_hobj().get_head()));
+ }
}
}
}
@@ -317,7 +331,14 @@ ClientRequest::process_op(
"Skipping recover_missings on non primary pg for soid {}",
*pg, m->get_hobj());
} else {
- co_await do_recover_missing(pg, m->get_hobj().get_head(), m->get_reqid());
+ auto unfound = co_await do_recover_missing(
+ pg, m->get_hobj().get_head(), m->get_reqid());
+ if (unfound) {
+ DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head());
+ co_await interruptor::make_interruptible(
+ pg->get_recovery_backend()->add_unfound(m->get_hobj().get_head()));
+ }
+
std::set<snapid_t> snaps = snaps_need_to_recover();
if (!snaps.empty()) {
// call with_obc() in order, but wait concurrently for loading.
diff --git a/src/crimson/osd/osd_operations/client_request_common.cc b/src/crimson/osd/osd_operations/client_request_common.cc
index c4439d5bb35..a56d58d2066 100644
--- a/src/crimson/osd/osd_operations/client_request_common.cc
+++ b/src/crimson/osd/osd_operations/client_request_common.cc
@@ -13,7 +13,7 @@ namespace {
namespace crimson::osd {
-typename InterruptibleOperation::template interruptible_future<>
+typename InterruptibleOperation::template interruptible_future<bool>
CommonClientRequest::do_recover_missing(
Ref<PG> pg,
const hobject_t& soid,
@@ -45,22 +45,29 @@ CommonClientRequest::do_recover_missing(
if (!needs_recovery_or_backfill) {
logger().debug("{} reqid {} nothing to recover {}",
__func__, reqid, soid);
- return seastar::now();
+ return seastar::make_ready_future<bool>(false);
}
+ if (pg->get_peering_state().get_missing_loc().is_unfound(soid)) {
+ return seastar::make_ready_future<bool>(true);
+ }
logger().debug("{} reqid {} need to wait for recovery, {} version {}",
__func__, reqid, soid, ver);
if (pg->get_recovery_backend()->is_recovering(soid)) {
logger().debug("{} reqid {} object {} version {}, already recovering",
__func__, reqid, soid, ver);
- return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered();
+ return pg->get_recovery_backend()->get_recovering(
+ soid).wait_for_recovered(
+ ).then([] {
+ return seastar::make_ready_future<bool>(false);
+ });
} else {
logger().debug("{} reqid {} object {} version {}, starting recovery",
__func__, reqid, soid, ver);
auto [op, fut] =
pg->get_shard_services().start_operation<UrgentRecovery>(
soid, ver, pg, pg->get_shard_services(), pg->get_osdmap_epoch());
- return std::move(fut);
+ return fut.then([] { return seastar::make_ready_future<bool>(false); });
}
}
diff --git a/src/crimson/osd/osd_operations/client_request_common.h b/src/crimson/osd/osd_operations/client_request_common.h
index 85f118d64c1..951bf653799 100644
--- a/src/crimson/osd/osd_operations/client_request_common.h
+++ b/src/crimson/osd/osd_operations/client_request_common.h
@@ -11,7 +11,7 @@ namespace crimson::osd {
struct CommonClientRequest {
- static InterruptibleOperation::template interruptible_future<>
+ static InterruptibleOperation::template interruptible_future<bool>
do_recover_missing(
Ref<PG> pg,
const hobject_t& soid,
diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc
index 22d7f3e492a..2968a6f4385 100644
--- a/src/crimson/osd/osd_operations/internal_client_request.cc
+++ b/src/crimson/osd/osd_operations/internal_client_request.cc
@@ -70,7 +70,12 @@ seastar::future<> InternalClientRequest::start()
client_pp().recover_missing);
}).then_interruptible([this] {
return do_recover_missing(pg, get_target_oid(), osd_reqid_t());
- }).then_interruptible([this] {
+ }).then_interruptible([this](bool unfound) {
+ if (unfound) {
+ throw std::system_error(
+ std::make_error_code(std::errc::operation_canceled),
+ fmt::format("{} is unfound, drop it!", get_target_oid()));
+ }
return enter_stage<interruptor>(
client_pp().get_obc);
}).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
@@ -128,6 +133,9 @@ seastar::future<> InternalClientRequest::start()
}, pg, start_epoch);
}).then([this] {
track_event<CompletionEvent>();
+ }).handle_exception_type([](std::system_error &error) {
+ logger().debug("error {}, message: {}", error.code(), error.what());
+ return seastar::now();
}).finally([this] {
logger().debug("{}: exit", *this);
handle.exit();
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index c195241cc5d..bb249b8e10e 100644
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -513,6 +513,35 @@ Context *PG::on_clean()
return nullptr;
}
+/**
+ * clear_temp_objects
+ *
+ * Lists the objects of this PG's collection in batches and queues a removal
+ * for every object whose hobj reports is_temp(). Queued removals are
+ * submitted whenever the transaction holds at least
+ * osd_target_transaction_size ops, and once more after the listing is
+ * exhausted if anything is still queued.
+ */
+seastar::future<> PG::clear_temp_objects()
+{
+  logger().info("{} {}", __func__, pgid);
+  ghobject_t cursor;
+  ceph::os::Transaction pending;
+  auto batch_limit = local_conf()->osd_target_transaction_size;
+  for (;;) {
+    auto [objs, next] = co_await shard_services.get_store().list_objects(
+      coll_ref, cursor, ghobject_t::get_max(), batch_limit);
+    if (objs.empty()) {
+      // nothing left to list
+      break;
+    }
+    for (auto &candidate : objs) {
+      if (!candidate.hobj.is_temp()) {
+        continue;
+      }
+      pending.remove(coll_ref->get_cid(), candidate);
+    }
+    cursor = next;
+    if (pending.get_num_ops() >= batch_limit) {
+      // flush a full batch and keep reusing the transaction object
+      co_await shard_services.get_store().do_transaction(
+        coll_ref, pending.claim_and_reset());
+    }
+  }
+  if (!pending.empty()) {
+    // flush whatever is left from the last partial batch
+    co_await shard_services.get_store().do_transaction(
+      coll_ref, std::move(pending));
+  }
+}
+
PG::interruptible_future<seastar::stop_iteration> PG::trim_snap(
snapid_t to_trim,
bool needs_pause)
@@ -1698,6 +1727,28 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
return false;
}
+/**
+ * should_send_op
+ *
+ * Returns true if the op on hoid should be sent in full to peer, i.e. if
+ * any of the following holds:
+ * - peer is the primary
+ * - hoid's pool differs from this PG's pool
+ * - a backfill state exists and hoid <= get_last_backfill_started()
+ * - hoid <= the peer's last_backfill
+ *
+ * Otherwise peer is asserted to be a backfill target and false is returned.
+ *
+ * TODO: should consider async recovery cases in the future which are not
+ * supported by crimson yet
+ */
+bool PG::should_send_op(
+  pg_shard_t peer,
+  const hobject_t &hoid) const
+{
+  if (peer == get_primary()) {
+    return true;
+  }
+  const bool should_send =
+    (hoid.pool != static_cast<int64_t>(get_info().pgid.pool()) ||
+     (has_backfill_state() && hoid <= get_last_backfill_started()) ||
+     hoid <= peering_state.get_peer_info(peer).last_backfill);
+  if (!should_send) {
+    ceph_assert(is_backfill_target(peer));
+    // The format string carries four placeholders; the leading one is the
+    // function name, so four arguments must be supplied.
+    logger().debug("{} issue_repop shipping empty opt to osd."
+                   "{}, object {} beyond std::max(last_backfill_started, "
+                   "peer_info[peer].last_backfill {})",
+                   __func__, peer, hoid,
+                   peering_state.get_peer_info(peer).last_backfill);
+  }
+  return should_send;
+}
+
PG::interruptible_future<std::optional<PG::complete_op_t>>
PG::already_complete(const osd_reqid_t& reqid)
{
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 252709dea4d..6810803867f 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -521,6 +521,7 @@ public:
bool get_need_up_thru() const {
return peering_state.get_need_up_thru();
}
+ bool should_send_op(pg_shard_t peer, const hobject_t &hoid) const;
epoch_t get_same_interval_since() const {
return get_info().history.same_interval_since;
}
@@ -603,6 +604,7 @@ public:
ObjectContextRef obc,
const std::error_code e,
ceph_tid_t rep_tid);
+ seastar::future<> clear_temp_objects();
private:
@@ -740,6 +742,15 @@ public:
PeeringState& get_peering_state() final {
return peering_state;
}
+  /// True when the recovery handler's backfill_state converts to true.
+  bool has_backfill_state() const {
+    return static_cast<bool>(recovery_handler->backfill_state);
+  }
+  /// Dereferences the recovery handler's backfill_state.
+  /// NOTE(review): presumably only valid while has_backfill_state() is
+  /// true -- confirm against callers.
+  const BackfillState& get_backfill_state() const {
+    const auto &state_holder = recovery_handler->backfill_state;
+    return *state_holder;
+  }
+  /// Forwards to BackfillState::get_last_backfill_started().
+  hobject_t get_last_backfill_started() const {
+    const BackfillState &state = get_backfill_state();
+    return state.get_last_backfill_started();
+  }
bool has_reset_since(epoch_t epoch) const final {
return peering_state.pg_has_reset_since(epoch);
}
diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc
index eeadacd21c2..09c93ed341e 100644
--- a/src/crimson/osd/pg_recovery.cc
+++ b/src/crimson/osd/pg_recovery.cc
@@ -433,6 +433,7 @@ void PGRecovery::on_global_recover (
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
recovery_waiter.set_recovered();
pg->get_recovery_backend()->remove_recovering(soid);
+ pg->get_recovery_backend()->found_and_remove(soid);
}
void PGRecovery::on_failed_recover(
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index a32a1581adc..0d041e91e5e 100644
--- a/src/crimson/osd/pg_shard_manager.cc
+++ b/src/crimson/osd/pg_shard_manager.cc
@@ -35,8 +35,10 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
pgid
).then([pgid, &per_shard_state](auto &&pg) {
logger().info("load_pgs: loaded {}", pgid);
- per_shard_state.pg_map.pg_loaded(pgid, std::move(pg));
- return seastar::now();
+ return pg->clear_temp_objects(
+ ).then([&per_shard_state, pg, pgid] {
+ per_shard_state.pg_map.pg_loaded(pgid, std::move(pg));
+ });
});
});
});
diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc
index c55d3150850..1148585ad25 100644
--- a/src/crimson/osd/recovery_backend.cc
+++ b/src/crimson/osd/recovery_backend.cc
@@ -43,7 +43,7 @@ void RecoveryBackend::clear_temp_obj(const hobject_t &oid)
}
void RecoveryBackend::clean_up(ceph::os::Transaction& t,
- std::string_view why)
+ interrupt_cause_t why)
{
for_each_temp_obj([&](auto &soid) {
t.remove(pg.get_collection_ref()->get_cid(),
@@ -65,6 +65,36 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t,
recovering.clear();
}
+/**
+ * interrupt
+ *
+ * Fails every outstanding promise of this waiter (readable, recovered,
+ * pulled and all per-shard pushes) with actingset_changed, built from
+ * pg.is_primary(). INTERVAL_CHANGE is the only cause handled; any other
+ * value aborts.
+ */
+void RecoveryBackend::WaitForObjectRecovery::interrupt(interrupt_cause_t why) {
+  if (why != interrupt_cause_t::INTERVAL_CHANGE) {
+    ceph_abort("impossible");
+    return;
+  }
+
+  // Fail an engaged optional promise and disengage it; no-op otherwise.
+  auto fail_and_reset =
+    [this](std::optional<seastar::shared_promise<>> &promise) {
+      if (!promise) {
+        return;
+      }
+      promise->set_exception(
+        crimson::common::actingset_changed(pg.is_primary()));
+      promise.reset();
+    };
+
+  fail_and_reset(readable);
+  fail_and_reset(recovered);
+  fail_and_reset(pulled);
+
+  for (auto &push : pushes) {
+    push.second.set_exception(
+      crimson::common::actingset_changed(pg.is_primary()));
+  }
+  pushes.clear();
+}
+
void RecoveryBackend::WaitForObjectRecovery::stop() {
if (readable) {
readable->set_exception(
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
index 179cfbabd08..b404b79751e 100644
--- a/src/crimson/osd/recovery_backend.h
+++ b/src/crimson/osd/recovery_backend.h
@@ -45,10 +45,22 @@ public:
backend{backend} {}
virtual ~RecoveryBackend() {}
std::pair<WaitForObjectRecovery&, bool> add_recovering(const hobject_t& soid) {
- auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
+ auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery(pg));
assert(it->second);
return {*(it->second), added};
}
+  /**
+   * add_unfound
+   *
+   * Returns a shared future tied to the unfound entry for soid, creating
+   * the entry if it does not exist yet. The future is resolved by
+   * found_and_remove() and failed with system_shutdown_exception by stop().
+   */
+  seastar::future<> add_unfound(const hobject_t &soid) {
+    // try_emplace only constructs the promise when soid is not present,
+    // so repeated waiters on the same object share one promise.
+    auto it = unfound.try_emplace(soid).first;
+    return it->second.get_shared_future();
+  }
+  /// Resolves the unfound promise registered for soid, if any, and drops
+  /// its entry; does nothing when soid has no entry.
+  void found_and_remove(const hobject_t &soid) {
+    if (auto entry = unfound.find(soid); entry != unfound.end()) {
+      entry->second.set_value();
+      unfound.erase(entry);
+    }
+  }
WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
assert(is_recovering(soid));
return *(recovering.at(soid));
@@ -82,14 +94,22 @@ public:
std::int64_t min,
std::int64_t max);
+ enum interrupt_cause_t : uint8_t { // reason passed to clean_up() and WaitForObjectRecovery::interrupt()
+ INTERVAL_CHANGE, // passed by on_peering_interval_change()
+ MAX // NOTE(review): presumably an upper-bound sentinel rather than a real cause -- confirm
+ };
void on_peering_interval_change(ceph::os::Transaction& t) {
- clean_up(t, "new peering interval");
+ clean_up(t, interrupt_cause_t::INTERVAL_CHANGE);
}
seastar::future<> stop() {
for (auto& [soid, recovery_waiter] : recovering) {
recovery_waiter->stop();
}
+ for (auto& [soid, promise] : unfound) {
+ promise.set_exception(
+ crimson::common::system_shutdown_exception());
+ }
return on_stop();
}
protected:
@@ -124,11 +144,14 @@ public:
public boost::intrusive_ref_counter<
WaitForObjectRecovery, boost::thread_unsafe_counter>,
public crimson::BlockerT<WaitForObjectRecovery> {
+ crimson::osd::PG &pg;
std::optional<seastar::shared_promise<>> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
public:
static constexpr const char* type_name = "WaitForObjectRecovery";
+    /// Binds this waiter to its PG; explicit so that a PG reference is never
+    /// implicitly converted into a WaitForObjectRecovery.
+    explicit WaitForObjectRecovery(crimson::osd::PG &pg) : pg(pg) {}
+
crimson::osd::ObjectContextRef obc;
std::optional<pull_info_t> pull_info;
std::map<pg_shard_t, push_info_t> pushing;
@@ -204,28 +227,7 @@ public:
pushes.erase(it);
}
}
- void interrupt(std::string_view why) {
- if (readable) {
- readable->set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- readable.reset();
- }
- if (recovered) {
- recovered->set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- recovered.reset();
- }
- if (pulled) {
- pulled->set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- pulled.reset();
- }
- for (auto& [pg_shard, pr] : pushes) {
- pr.set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- }
- pushes.clear();
- }
+ void interrupt(interrupt_cause_t why);
void stop();
void dump_detail(Formatter* f) const {
}
@@ -235,6 +237,7 @@ public:
using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>;
protected:
std::map<hobject_t, WaitForObjectRecoveryRef> recovering;
+ std::map<hobject_t, seastar::shared_promise<>> unfound;
hobject_t get_temp_recovery_object(
const hobject_t& target,
eversion_t version) const;
@@ -249,7 +252,7 @@ protected:
backend->clear_temp_objs();
}
- void clean_up(ceph::os::Transaction& t, std::string_view why);
+ void clean_up(ceph::os::Transaction& t, interrupt_cause_t why);
virtual seastar::future<> on_stop() = 0;
private:
void handle_backfill_finish(
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index d01fd646803..d227b9c89e9 100644
--- a/src/crimson/osd/replicated_backend.cc
+++ b/src/crimson/osd/replicated_backend.cc
@@ -65,7 +65,14 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
min_epoch,
tid,
osd_op_p.at_version);
- m->set_data(encoded_txn);
+ if (pg.should_send_op(pg_shard, hoid)) {
+ m->set_data(encoded_txn);
+ } else {
+ ceph::os::Transaction t;
+ bufferlist bl;
+ encode(t, bl);
+ m->set_data(bl);
+ }
pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
encode(log_entries, m->logbl);
m->pg_trim_to = osd_op_p.pg_trim_to;