summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/pg.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/pg.h')
-rw-r--r--src/crimson/osd/pg.h62
1 files changed, 47 insertions, 15 deletions
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 604f49005ff..06038c0aa00 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -10,6 +10,7 @@
#include <seastar/core/shared_future.hh>
#include "common/dout.h"
+#include "common/ostream_temp.h"
#include "include/interval_set.h"
#include "crimson/net/Fwd.h"
#include "messages/MOSDRepOpReply.h"
@@ -45,6 +46,7 @@
class MQuery;
class OSDMap;
class PGBackend;
+class ReplicatedBackend;
class PGPeeringEvent;
class osd_op_params_t;
@@ -76,7 +78,8 @@ class PG : public boost::intrusive_ref_counter<
using ec_profile_t = std::map<std::string,std::string>;
using cached_map_t = OSDMapService::cached_map_t;
- ClientRequest::PGPipeline request_pg_pipeline;
+ CommonPGPipeline request_pg_pipeline;
+ PGRepopPipeline repop_pipeline;
PGPeeringPipeline peering_request_pg_pipeline;
ClientRequest::Orderer client_request_orderer;
@@ -129,8 +132,8 @@ public:
return peering_state.get_pg_trim_to();
}
- eversion_t get_min_last_complete_ondisk() const {
- return peering_state.get_min_last_complete_ondisk();
+ eversion_t get_pg_committed_to() const {
+ return peering_state.get_pg_committed_to();
}
const pg_info_t& get_info() const final {
@@ -376,6 +379,7 @@ public:
void check_blocklisted_watchers() final;
void clear_primary_state() final {
recovery_finisher = nullptr;
+ projected_log = PGLog::IndexedLog();
}
void queue_check_readable(epoch_t last_peering_reset,
@@ -517,6 +521,9 @@ public:
// Utility
+ bool is_active() const {
+ return peering_state.is_active();
+ }
bool is_active_clean() const {
return peering_state.is_active() && peering_state.is_clean();
}
@@ -589,12 +596,13 @@ public:
using with_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;
- load_obc_iertr::future<> with_locked_obc(
- const hobject_t &hobj,
- const OpInfo &op_info,
- with_obc_func_t&& f);
-
- interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
+ using handle_rep_op_ret = std::tuple<
+ interruptible_future<>, // resolves upon commit
+ MURef<MOSDRepOpReply> // reply message
+ >;
+ // outer future resolves upon submission
+ using handle_rep_op_fut = interruptible_future<handle_rep_op_ret>;
+ handle_rep_op_fut handle_rep_op(Ref<MOSDRepOp> m);
void update_stats(const pg_stat_t &stat);
interruptible_future<> update_snap_map(
const std::vector<pg_log_entry_t> &log_entries,
@@ -603,7 +611,7 @@ public:
std::vector<pg_log_entry_t>&& logv,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
- const eversion_t &min_last_complete_ondisk,
+ const eversion_t &pg_commited_to,
bool transaction_applied,
ObjectStore::Transaction &txn,
bool async = false);
@@ -663,6 +671,7 @@ private:
const OpInfo &op_info,
std::vector<OSDOp>& ops);
+ seastar::shared_mutex submit_lock;
using submit_executer_ret = std::tuple<
interruptible_future<>,
interruptible_future<>>;
@@ -675,13 +684,18 @@ private:
struct do_osd_ops_params_t;
interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
+
+public:
interruptible_future<
std::tuple<interruptible_future<>, interruptible_future<>>>
submit_transaction(
ObjectContextRef&& obc,
+ ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& oop,
std::vector<pg_log_entry_t>&& log_entries);
+
+private:
interruptible_future<> repair_object(
const hobject_t& oid,
eversion_t& v);
@@ -826,8 +840,15 @@ public:
const eversion_t version;
const int err;
};
+ PGLog::IndexedLog projected_log;
interruptible_future<std::optional<complete_op_t>>
already_complete(const osd_reqid_t& reqid);
+ bool check_in_progress_op(
+ const osd_reqid_t& reqid,
+ eversion_t *version,
+ version_t *user_version,
+ int *return_code,
+ std::vector<pg_log_op_return_item_t> *op_returns) const;
int get_recovery_op_priority() const {
int64_t pri = 0;
get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
@@ -879,15 +900,20 @@ private:
friend class SnapTrimObjSubEvent;
private:
- void mutate_object(
- ObjectContextRef& obc,
- ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_p);
+ void enqueue_push_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers);
+ void enqueue_delete_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers);
+
bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const;
bool can_discard_op(const MOSDOp& m) const;
void context_registry_on_change();
bool is_missing_object(const hobject_t& soid) const {
- return peering_state.get_pg_log().get_missing().get_items().count(soid);
+ return get_local_missing().is_missing(soid);
}
bool is_unreadable_object(const hobject_t &oid,
eversion_t* v = 0) const final {
@@ -895,6 +921,11 @@ private:
!peering_state.get_missing_loc().readable_with_acting(
oid, get_actingset(), v);
}
+ bool is_missing_on_peer(
+ const pg_shard_t &peer,
+ const hobject_t &soid) const {
+ return peering_state.get_peer_missing(peer).is_missing(soid);
+ }
bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
const std::set<pg_shard_t> &get_actingset() const {
return peering_state.get_actingset();
@@ -902,6 +933,7 @@ private:
private:
friend class IOInterruptCondition;
+ friend class ::ReplicatedBackend;
struct log_update_t {
std::set<pg_shard_t> waiting_on;
seastar::shared_promise<> all_committed;