diff options
Diffstat (limited to 'src/crimson/osd/pg.h')
-rw-r--r-- | src/crimson/osd/pg.h | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 604f49005ff..06038c0aa00 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -10,6 +10,7 @@ #include <seastar/core/shared_future.hh> #include "common/dout.h" +#include "common/ostream_temp.h" #include "include/interval_set.h" #include "crimson/net/Fwd.h" #include "messages/MOSDRepOpReply.h" @@ -45,6 +46,7 @@ class MQuery; class OSDMap; class PGBackend; +class ReplicatedBackend; class PGPeeringEvent; class osd_op_params_t; @@ -76,7 +78,8 @@ class PG : public boost::intrusive_ref_counter< using ec_profile_t = std::map<std::string,std::string>; using cached_map_t = OSDMapService::cached_map_t; - ClientRequest::PGPipeline request_pg_pipeline; + CommonPGPipeline request_pg_pipeline; + PGRepopPipeline repop_pipeline; PGPeeringPipeline peering_request_pg_pipeline; ClientRequest::Orderer client_request_orderer; @@ -129,8 +132,8 @@ public: return peering_state.get_pg_trim_to(); } - eversion_t get_min_last_complete_ondisk() const { - return peering_state.get_min_last_complete_ondisk(); + eversion_t get_pg_committed_to() const { + return peering_state.get_pg_committed_to(); } const pg_info_t& get_info() const final { @@ -376,6 +379,7 @@ public: void check_blocklisted_watchers() final; void clear_primary_state() final { recovery_finisher = nullptr; + projected_log = PGLog::IndexedLog(); } void queue_check_readable(epoch_t last_peering_reset, @@ -517,6 +521,9 @@ public: // Utility + bool is_active() const { + return peering_state.is_active(); + } bool is_active_clean() const { return peering_state.is_active() && peering_state.is_clean(); } @@ -589,12 +596,13 @@ public: using with_obc_func_t = std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>; - load_obc_iertr::future<> with_locked_obc( - const hobject_t &hobj, - const OpInfo &op_info, - with_obc_func_t&& f); - - interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m); + using handle_rep_op_ret = std::tuple< + interruptible_future<>, // resolves upon commit + MURef<MOSDRepOpReply> // reply message + >; + // outer future resolves upon submission + using handle_rep_op_fut = interruptible_future<handle_rep_op_ret>; + handle_rep_op_fut handle_rep_op(Ref<MOSDRepOp> m); void update_stats(const pg_stat_t &stat); interruptible_future<> update_snap_map( const std::vector<pg_log_entry_t> &log_entries, @@ -603,7 +611,7 @@ public: std::vector<pg_log_entry_t>&& logv, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_commited_to, bool transaction_applied, ObjectStore::Transaction &txn, bool async = false); @@ -663,6 +671,7 @@ private: const OpInfo &op_info, std::vector<OSDOp>& ops); + seastar::shared_mutex submit_lock; using submit_executer_ret = std::tuple< interruptible_future<>, interruptible_future<>>; @@ -675,13 +684,18 @@ private: struct do_osd_ops_params_t; interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m); + +public: interruptible_future< std::tuple<interruptible_future<>, interruptible_future<>>> submit_transaction( ObjectContextRef&& obc, + ObjectContextRef&& new_clone, ceph::os::Transaction&& txn, osd_op_params_t&& oop, std::vector<pg_log_entry_t>&& log_entries); + +private: interruptible_future<> repair_object( const hobject_t& oid, eversion_t& v); @@ -826,8 +840,15 @@ public: const eversion_t version; const int err; }; + PGLog::IndexedLog projected_log; interruptible_future<std::optional<complete_op_t>> already_complete(const osd_reqid_t& reqid); + bool check_in_progress_op( + const osd_reqid_t& reqid, + eversion_t *version, + version_t *user_version, + int *return_code, + std::vector<pg_log_op_return_item_t> *op_returns) const; int get_recovery_op_priority() const { int64_t pri = 0; get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); @@ -879,15 +900,20 @@ private: friend class SnapTrimObjSubEvent; private: - void mutate_object( - ObjectContextRef& obc, - ceph::os::Transaction& txn, - osd_op_params_t& osd_op_p); + void enqueue_push_for_backfill( + const hobject_t &obj, + const eversion_t &v, + const std::vector<pg_shard_t> &peers); + void enqueue_delete_for_backfill( + const hobject_t &obj, + const eversion_t &v, + const std::vector<pg_shard_t> &peers); + bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const; bool can_discard_op(const MOSDOp& m) const; void context_registry_on_change(); bool is_missing_object(const hobject_t& soid) const { - return peering_state.get_pg_log().get_missing().get_items().count(soid); + return get_local_missing().is_missing(soid); } bool is_unreadable_object(const hobject_t &oid, eversion_t* v = 0) const final { @@ -895,6 +921,11 @@ private: !peering_state.get_missing_loc().readable_with_acting( oid, get_actingset(), v); } + bool is_missing_on_peer( + const pg_shard_t &peer, + const hobject_t &soid) const { + return peering_state.get_peer_missing(peer).is_missing(soid); + } bool is_degraded_or_backfilling_object(const hobject_t& soid) const; const std::set<pg_shard_t> &get_actingset() const { return peering_state.get_actingset(); @@ -902,6 +933,7 @@ private: private: friend class IOInterruptCondition; + friend class ::ReplicatedBackend; struct log_update_t { std::set<pg_shard_t> waiting_on; seastar::shared_promise<> all_committed; |