diff options
author | Samuel Just <sjust@redhat.com> | 2019-04-13 02:12:45 +0200 |
---|---|---|
committer | sjust@redhat.com <sjust@redhat.com> | 2019-05-01 20:22:27 +0200 |
commit | 4eef7f405082d7d23f63404e8a952cc0d237df3d (patch) | |
tree | f16170210f75cb6910569b7a9f8de1742053af3f | |
parent | osd/: refactor to avoid mutable peer_missing refs in PG (diff) | |
download | ceph-4eef7f405082d7d23f63404e8a952cc0d237df3d.tar.xz ceph-4eef7f405082d7d23f63404e8a952cc0d237df3d.zip |
osd/: Move log version pointer updates to PeeringState
Signed-off-by: sjust@redhat.com <sjust@redhat.com>
-rw-r--r-- | src/osd/PG.cc | 17 | ||||
-rw-r--r-- | src/osd/PG.h | 14 | ||||
-rw-r--r-- | src/osd/PeeringState.cc | 132 | ||||
-rw-r--r-- | src/osd/PeeringState.h | 37 | ||||
-rw-r--r-- | src/osd/PrimaryLogPG.cc | 158 | ||||
-rw-r--r-- | src/osd/PrimaryLogPG.h | 6 |
6 files changed, 193 insertions, 171 deletions
diff --git a/src/osd/PG.cc b/src/osd/PG.cc index a2f616e663d..06fd8340463 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -188,13 +188,7 @@ PG::PG(OSDService *o, OSDMapRef curmap, pg_whoami(recovery_state.pg_whoami), info(recovery_state.info), pg_log(recovery_state.pg_log), - last_update_ondisk(recovery_state.last_update_ondisk), - last_complete_ondisk(recovery_state.last_complete_ondisk), - last_update_applied(recovery_state.last_update_applied), peer_info(recovery_state.peer_info), - peer_last_complete_ondisk(recovery_state.peer_last_complete_ondisk), - min_last_complete_ondisk(recovery_state.min_last_complete_ondisk), - pg_trim_to(recovery_state.pg_trim_to), missing_loc(recovery_state.missing_loc), pg_id(p), coll(p), @@ -332,8 +326,6 @@ void PG::update_object_snap_mapping( /******* PG ***********/ void PG::clear_primary_state() { - last_update_ondisk = eversion_t(); - projected_log = PGLog::IndexedLog(); snap_trimq.clear(); @@ -2814,7 +2806,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) break; case PG::Scrubber::WAIT_LAST_UPDATE: - if (last_update_applied < scrubber.subset_last_update) { + if (recovery_state.get_last_update_applied() < + scrubber.subset_last_update) { // will be requeued by op_applied dout(15) << "wait for EC read/modify/writes to queue" << dendl; done = true; @@ -2842,7 +2835,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) break; case PG::Scrubber::BUILD_MAP: - ceph_assert(last_update_applied >= scrubber.subset_last_update); + ceph_assert(recovery_state.get_last_update_applied() >= + scrubber.subset_last_update); // build my own scrub map if (scrub_preempted) { @@ -2898,7 +2892,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) break; case PG::Scrubber::COMPARE_MAPS: - ceph_assert(last_update_applied >= scrubber.subset_last_update); + ceph_assert(recovery_state.get_last_update_applied() >= + scrubber.subset_last_update); ceph_assert(scrubber.waiting_on_whom.empty()); scrub_compare_maps(); diff --git a/src/osd/PG.h b/src/osd/PG.h index f38ba46ad04..1469862fa9f 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -185,13 +185,7 @@ protected: pg_shard_t pg_whoami; pg_info_t &info; PGLog &pg_log; - eversion_t &last_update_ondisk; - eversion_t &last_complete_ondisk; - eversion_t &last_update_applied; map<pg_shard_t, pg_info_t> &peer_info; - map<pg_shard_t,eversion_t> &peer_last_complete_ondisk; - eversion_t &min_last_complete_ondisk; - eversion_t &pg_trim_to; MissingLoc &missing_loc; public: @@ -998,10 +992,6 @@ protected: bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const; - virtual void calc_trim_to() = 0; - - virtual void calc_trim_to_aggressive() = 0; - struct PGLogEntryHandler : public PGLog::LogEntryHandler { PG *pg; ObjectStore::Transaction *t; @@ -1377,10 +1367,6 @@ protected: bool delete_needs_sleep = false; protected: - bool hard_limit_pglog() const { - return (get_osdmap()->test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT)); - } - bool state_test(uint64_t m) const { return recovery_state.state_test(m); } void state_set(uint64_t m) { recovery_state.state_set(m); } void state_clear(uint64_t m) { recovery_state.state_clear(m); } diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 4f2fac332f5..0802f5dc372 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -11,6 +11,7 @@ #include "messages/MRecoveryReserve.h" #include "messages/MOSDScrubReserve.h" #include "messages/MOSDPGInfo.h" +#include "messages/MOSDPGTrim.h" #define dout_context cct #define dout_subsys ceph_subsys_osd @@ -797,6 +798,7 @@ void PeeringState::clear_primary_state() clear_recovery_state(); + last_update_ondisk = eversion_t(); pl->clear_primary_state(); } @@ -3743,6 +3745,136 @@ void PeeringState::pre_submit_op( } } +void PeeringState::recovery_committed_to(eversion_t version) +{ + psdout(10) << __func__ << " version " << version + << " now ondisk" << dendl; + last_complete_ondisk = version; + + if (last_complete_ondisk == info.last_update) { + if (!is_primary()) { + // Either we are a replica or backfill target. + // we are fully up to date. tell the primary! + pl->send_cluster_message( + get_primary().osd, + new MOSDPGTrim( + get_osdmap_epoch(), + spg_t(info.pgid.pgid, primary.shard), + last_complete_ondisk), + get_osdmap_epoch()); + } else { + calc_min_last_complete_ondisk(); + } + } +} + +void PeeringState::complete_write(eversion_t v, eversion_t lc) +{ + last_update_ondisk = v; + last_complete_ondisk = lc; + calc_min_last_complete_ondisk(); +} + +void PeeringState::calc_trim_to() +{ + size_t target = cct->_conf->osd_min_pg_log_entries; + if (is_degraded() || + state_test(PG_STATE_RECOVERING | + PG_STATE_RECOVERY_WAIT | + PG_STATE_BACKFILLING | + PG_STATE_BACKFILL_WAIT | + PG_STATE_BACKFILL_TOOFULL)) { + target = cct->_conf->osd_max_pg_log_entries; + } + + eversion_t limit = std::min( + min_last_complete_ondisk, + pg_log.get_can_rollback_to()); + if (limit != eversion_t() && + limit != pg_trim_to && + pg_log.get_log().approx_size() > target) { + size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target, + cct->_conf->osd_pg_log_trim_max); + if (num_to_trim < cct->_conf->osd_pg_log_trim_min && + cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) { + return; + } + list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin(); + eversion_t new_trim_to; + for (size_t i = 0; i < num_to_trim; ++i) { + new_trim_to = it->version; + ++it; + if (new_trim_to > limit) { + new_trim_to = limit; + psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl; + break; + } + } + psdout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl; + pg_trim_to = new_trim_to; + assert(pg_trim_to <= pg_log.get_head()); + assert(pg_trim_to <= min_last_complete_ondisk); + } +} + +void PeeringState::calc_trim_to_aggressive() +{ + size_t target = cct->_conf->osd_min_pg_log_entries; + if (is_degraded() || + state_test(PG_STATE_RECOVERING | + PG_STATE_RECOVERY_WAIT | + PG_STATE_BACKFILLING | + PG_STATE_BACKFILL_WAIT | + PG_STATE_BACKFILL_TOOFULL)) { + target = cct->_conf->osd_max_pg_log_entries; + } + // limit pg log trimming up to the can_rollback_to value + eversion_t limit = std::min( + pg_log.get_head(), + pg_log.get_can_rollback_to()); + psdout(10) << __func__ << " limit = " << limit << dendl; + + if (limit != eversion_t() && + limit != pg_trim_to && + pg_log.get_log().approx_size() > target) { + psdout(10) << __func__ << " approx pg log length = " + << pg_log.get_log().approx_size() << dendl; + uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target, + cct->_conf->osd_pg_log_trim_max); + psdout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl; + if (num_to_trim < cct->_conf->osd_pg_log_trim_min && + cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) { + return; + } + auto it = pg_log.get_log().log.begin(); // oldest log entry + auto rit = pg_log.get_log().log.rbegin(); + eversion_t by_n_to_keep; // start from tail + eversion_t by_n_to_trim = eversion_t::max(); // start from head + for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) { + i++; + if (i > target && by_n_to_keep == eversion_t()) { + by_n_to_keep = rit->version; + } + if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) { + by_n_to_trim = it->version; + } + if (by_n_to_keep != eversion_t() && + by_n_to_trim != eversion_t::max()) { + break; + } + } + + if (by_n_to_keep == eversion_t()) { + return; + } + + pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit}); + psdout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl; + ceph_assert(pg_trim_to <= pg_log.get_head()); + } +} + + /*------------ Peering State Machine----------------*/ #undef dout_prefix #define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \ diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 8af0d6ed90a..1211c62d313 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -1422,6 +1422,9 @@ public: void add_log_entry(const pg_log_entry_t& e, bool applied); + void calc_trim_to(); + void calc_trim_to_aggressive(); + public: PeeringState( CephContext *cct, @@ -1525,6 +1528,14 @@ public: bool transaction_applied, bool async); + void update_trim_to() { + bool hard_limit = (get_osdmap()->test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT)); + if (hard_limit) + calc_trim_to_aggressive(); + else + calc_trim_to(); + } + void pre_submit_op( const hobject_t &hoid, const vector<pg_log_entry_t>& logv, @@ -1555,6 +1566,24 @@ public: const hobject_t &oid, eversion_t version); + void update_peer_last_complete_ondisk( + pg_shard_t fromosd, + eversion_t lcod) { + peer_last_complete_ondisk[fromosd] = lcod; + } + + void update_last_complete_ondisk( + eversion_t lcod) { + last_complete_ondisk = lcod; + } + + void recovery_committed_to(eversion_t version); + + void complete_write(eversion_t v, eversion_t lc); + void local_write_applied(eversion_t v) { + last_update_applied = v; + } + void dump_history(Formatter *f) const { state_history.dump(f); } @@ -1815,6 +1844,14 @@ public: return min_last_complete_ondisk; } + eversion_t get_pg_trim_to() const { + return pg_trim_to; + } + + eversion_t get_last_update_applied() const { + return last_update_applied; + } + bool debug_has_dirty_state() const { return dirty_info || dirty_big_info; } diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 84e99eb9a40..2dec89ee12b 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1533,105 +1533,6 @@ int PrimaryLogPG::do_scrub_ls(MOSDOp *m, OSDOp *osd_op) return r; } -void PrimaryLogPG::calc_trim_to() -{ - size_t target = cct->_conf->osd_min_pg_log_entries; - if (is_degraded() || - state_test(PG_STATE_RECOVERING | - PG_STATE_RECOVERY_WAIT | - PG_STATE_BACKFILLING | - PG_STATE_BACKFILL_WAIT | - PG_STATE_BACKFILL_TOOFULL)) { - target = cct->_conf->osd_max_pg_log_entries; - } - - eversion_t limit = std::min( - min_last_complete_ondisk, - pg_log.get_can_rollback_to()); - if (limit != eversion_t() && - limit != pg_trim_to && - pg_log.get_log().approx_size() > target) { - size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target, - cct->_conf->osd_pg_log_trim_max); - if (num_to_trim < cct->_conf->osd_pg_log_trim_min && - cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) { - return; - } - list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin(); - eversion_t new_trim_to; - for (size_t i = 0; i < num_to_trim; ++i) { - new_trim_to = it->version; - ++it; - if (new_trim_to > limit) { - new_trim_to = limit; - dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl; - break; - } - } - dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl; - pg_trim_to = new_trim_to; - assert(pg_trim_to <= pg_log.get_head()); - assert(pg_trim_to <= min_last_complete_ondisk); - } -} - -void PrimaryLogPG::calc_trim_to_aggressive() -{ - size_t target = cct->_conf->osd_min_pg_log_entries; - if (is_degraded() || - state_test(PG_STATE_RECOVERING | - PG_STATE_RECOVERY_WAIT | - PG_STATE_BACKFILLING | - PG_STATE_BACKFILL_WAIT | - PG_STATE_BACKFILL_TOOFULL)) { - target = cct->_conf->osd_max_pg_log_entries; - } - // limit pg log trimming up to the can_rollback_to value - eversion_t limit = std::min( - pg_log.get_head(), - pg_log.get_can_rollback_to()); - dout(10) << __func__ << " limit = " << limit << dendl; - - if (limit != eversion_t() && - limit != pg_trim_to && - pg_log.get_log().approx_size() > target) { - dout(10) << __func__ << " approx pg log length = " - << pg_log.get_log().approx_size() << dendl; - uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target, - cct->_conf->osd_pg_log_trim_max); - dout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl; - if (num_to_trim < cct->_conf->osd_pg_log_trim_min && - cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) { - return; - } - auto it = pg_log.get_log().log.begin(); // oldest log entry - auto rit = pg_log.get_log().log.rbegin(); - eversion_t by_n_to_keep; // start from tail - eversion_t by_n_to_trim = eversion_t::max(); // start from head - for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) { - i++; - if (i > target && by_n_to_keep == eversion_t()) { - by_n_to_keep = rit->version; - } - if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) { - by_n_to_trim = it->version; - } - if (by_n_to_keep != eversion_t() && - by_n_to_trim != eversion_t::max()) { - break; - } - } - - if (by_n_to_keep == eversion_t()) { - return; - } - - pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit}); - dout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl; - ceph_assert(pg_trim_to <= pg_log.get_head()); - } -} - PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap, const PGPool &_pool, const map<string,string>& ec_profile, spg_t p) : @@ -3944,10 +3845,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx) ceph_assert(op->may_write() || op->may_cache()); // trim log? - if (hard_limit_pglog()) - calc_trim_to_aggressive(); - else - calc_trim_to(); + recovery_state.update_trim_to(); // verify that we are doing this in order? if (cct->_conf->osd_debug_op_order && m->get_source().is_client() && @@ -10344,8 +10242,7 @@ void PrimaryLogPG::repop_all_committed(RepGather *repop) repop->all_committed = true; if (!repop->rep_aborted) { if (repop->v != eversion_t()) { - last_update_ondisk = repop->v; - last_complete_ondisk = repop->pg_local_last_complete; + recovery_state.complete_write(repop->v, repop->pg_local_last_complete); } eval_repop(repop); } @@ -10356,10 +10253,11 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version) dout(10) << "op_applied version " << applied_version << dendl; ceph_assert(applied_version != eversion_t()); ceph_assert(applied_version <= info.last_update); - last_update_applied = applied_version; + recovery_state.local_write_applied(applied_version); if (is_primary()) { if (scrubber.active) { - if (last_update_applied >= scrubber.subset_last_update) { + if (recovery_state.get_last_update_applied() >= + scrubber.subset_last_update) { requeue_scrub(ops_blocked_by_scrub()); } } else { @@ -10403,7 +10301,6 @@ void PrimaryLogPG::eval_repop(RepGather *repop) } publish_stats_to_osd(); - recovery_state.calc_min_last_complete_ondisk(); dout(10) << " removing " << *repop << dendl; ceph_assert(!repop_queue.empty()); @@ -10460,8 +10357,8 @@ void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx) ctx->delta_stats, ctx->at_version, std::move(ctx->op_t), - pg_trim_to, - min_last_complete_ondisk, + recovery_state.get_pg_trim_to(), + recovery_state.get_min_last_complete_ondisk(), ctx->log, ctx->updated_hset_history, on_all_commit, @@ -10553,10 +10450,7 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx) dout(20) << __func__ << " " << repop << dendl; issue_repop(repop, ctx.get()); eval_repop(repop); - if (hard_limit_pglog()) - calc_trim_to_aggressive(); - else - calc_trim_to(); + recovery_state.update_trim_to(); repop->put(); } @@ -10595,8 +10489,8 @@ void PrimaryLogPG::submit_log_entries( ObjectStore::Transaction t; eversion_t old_last_update = info.last_update; recovery_state.merge_new_log_entries( - entries, t, pg_trim_to, min_last_complete_ondisk); - + entries, t, recovery_state.get_pg_trim_to(), + recovery_state.get_min_last_complete_ondisk()); set<pg_shard_t> waiting_on; for (set<pg_shard_t>::const_iterator i = get_acting_recovery_backfill().begin(); @@ -10615,8 +10509,8 @@ void PrimaryLogPG::submit_log_entries( get_osdmap_epoch(), get_last_peering_reset(), repop->rep_tid, - pg_trim_to, - min_last_complete_ondisk); + recovery_state.get_pg_trim_to(), + recovery_state.get_min_last_complete_ondisk()); osd->send_message_osd_cluster( peer.osd, m, get_osdmap_epoch()); waiting_on.insert(peer); @@ -10671,10 +10565,7 @@ void PrimaryLogPG::submit_log_entries( op_applied(info.last_update); }); - if (hard_limit_pglog()) - calc_trim_to_aggressive(); - else - calc_trim_to(); + recovery_state.update_trim_to(); } void PrimaryLogPG::cancel_log_updates() @@ -11484,27 +11375,10 @@ void PrimaryLogPG::_committed_pushed_object( { lock(); if (!pg_has_reset_since(epoch)) { - dout(10) << __func__ << " last_complete " << last_complete << " now ondisk" << dendl; - last_complete_ondisk = last_complete; - - if (last_complete_ondisk == info.last_update) { - if (!is_primary()) { - // Either we are a replica or backfill target. - // we are fully up to date. tell the primary! - osd->send_message_osd_cluster( - get_primary().osd, - new MOSDPGTrim( - get_osdmap_epoch(), - spg_t(info.pgid.pgid, get_primary().shard), - last_complete_ondisk), - get_osdmap_epoch()); - } else { - recovery_state.calc_min_last_complete_ondisk(); - } - } - + recovery_state.recovery_committed_to(last_complete); } else { - dout(10) << __func__ << " pg has changed, not touching last_complete_ondisk" << dendl; + dout(10) << __func__ + << " pg has changed, not touching last_complete_ondisk" << dendl; } unlock(); diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 7953989902e..884db99fe71 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -482,12 +482,12 @@ public: void update_peer_last_complete_ondisk( pg_shard_t fromosd, eversion_t lcod) override { - peer_last_complete_ondisk[fromosd] = lcod; + recovery_state.update_peer_last_complete_ondisk(fromosd, lcod); } void update_last_complete_ondisk( eversion_t lcod) override { - last_complete_ondisk = lcod; + recovery_state.update_last_complete_ondisk(lcod); } void update_stats( @@ -1382,8 +1382,6 @@ protected: unsigned split_bits) override; void apply_and_flush_repops(bool requeue); - void calc_trim_to() override; - void calc_trim_to_aggressive() override; int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr); int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr); |