summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSamuel Just <sjust@redhat.com>2019-04-13 02:12:45 +0200
committersjust@redhat.com <sjust@redhat.com>2019-05-01 20:22:27 +0200
commit4eef7f405082d7d23f63404e8a952cc0d237df3d (patch)
treef16170210f75cb6910569b7a9f8de1742053af3f
parentosd/: refactor to avoid mutable peer_missing refs in PG (diff)
downloadceph-4eef7f405082d7d23f63404e8a952cc0d237df3d.tar.xz
ceph-4eef7f405082d7d23f63404e8a952cc0d237df3d.zip
osd/: Move log version pointer updates to PeeringState
Signed-off-by: sjust@redhat.com <sjust@redhat.com>
-rw-r--r--src/osd/PG.cc17
-rw-r--r--src/osd/PG.h14
-rw-r--r--src/osd/PeeringState.cc132
-rw-r--r--src/osd/PeeringState.h37
-rw-r--r--src/osd/PrimaryLogPG.cc158
-rw-r--r--src/osd/PrimaryLogPG.h6
6 files changed, 193 insertions, 171 deletions
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index a2f616e663d..06fd8340463 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -188,13 +188,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
pg_whoami(recovery_state.pg_whoami),
info(recovery_state.info),
pg_log(recovery_state.pg_log),
- last_update_ondisk(recovery_state.last_update_ondisk),
- last_complete_ondisk(recovery_state.last_complete_ondisk),
- last_update_applied(recovery_state.last_update_applied),
peer_info(recovery_state.peer_info),
- peer_last_complete_ondisk(recovery_state.peer_last_complete_ondisk),
- min_last_complete_ondisk(recovery_state.min_last_complete_ondisk),
- pg_trim_to(recovery_state.pg_trim_to),
missing_loc(recovery_state.missing_loc),
pg_id(p),
coll(p),
@@ -332,8 +326,6 @@ void PG::update_object_snap_mapping(
/******* PG ***********/
void PG::clear_primary_state()
{
- last_update_ondisk = eversion_t();
-
projected_log = PGLog::IndexedLog();
snap_trimq.clear();
@@ -2814,7 +2806,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
break;
case PG::Scrubber::WAIT_LAST_UPDATE:
- if (last_update_applied < scrubber.subset_last_update) {
+ if (recovery_state.get_last_update_applied() <
+ scrubber.subset_last_update) {
// will be requeued by op_applied
dout(15) << "wait for EC read/modify/writes to queue" << dendl;
done = true;
@@ -2842,7 +2835,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
break;
case PG::Scrubber::BUILD_MAP:
- ceph_assert(last_update_applied >= scrubber.subset_last_update);
+ ceph_assert(recovery_state.get_last_update_applied() >=
+ scrubber.subset_last_update);
// build my own scrub map
if (scrub_preempted) {
@@ -2898,7 +2892,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
break;
case PG::Scrubber::COMPARE_MAPS:
- ceph_assert(last_update_applied >= scrubber.subset_last_update);
+ ceph_assert(recovery_state.get_last_update_applied() >=
+ scrubber.subset_last_update);
ceph_assert(scrubber.waiting_on_whom.empty());
scrub_compare_maps();
diff --git a/src/osd/PG.h b/src/osd/PG.h
index f38ba46ad04..1469862fa9f 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -185,13 +185,7 @@ protected:
pg_shard_t pg_whoami;
pg_info_t &info;
PGLog &pg_log;
- eversion_t &last_update_ondisk;
- eversion_t &last_complete_ondisk;
- eversion_t &last_update_applied;
map<pg_shard_t, pg_info_t> &peer_info;
- map<pg_shard_t,eversion_t> &peer_last_complete_ondisk;
- eversion_t &min_last_complete_ondisk;
- eversion_t &pg_trim_to;
MissingLoc &missing_loc;
public:
@@ -998,10 +992,6 @@ protected:
bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
- virtual void calc_trim_to() = 0;
-
- virtual void calc_trim_to_aggressive() = 0;
-
struct PGLogEntryHandler : public PGLog::LogEntryHandler {
PG *pg;
ObjectStore::Transaction *t;
@@ -1377,10 +1367,6 @@ protected:
bool delete_needs_sleep = false;
protected:
- bool hard_limit_pglog() const {
- return (get_osdmap()->test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT));
- }
-
bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
void state_set(uint64_t m) { recovery_state.state_set(m); }
void state_clear(uint64_t m) { recovery_state.state_clear(m); }
diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc
index 4f2fac332f5..0802f5dc372 100644
--- a/src/osd/PeeringState.cc
+++ b/src/osd/PeeringState.cc
@@ -11,6 +11,7 @@
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDScrubReserve.h"
#include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGTrim.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
@@ -797,6 +798,7 @@ void PeeringState::clear_primary_state()
clear_recovery_state();
+ last_update_ondisk = eversion_t();
pl->clear_primary_state();
}
@@ -3743,6 +3745,136 @@ void PeeringState::pre_submit_op(
}
}
+void PeeringState::recovery_committed_to(eversion_t version)
+{
+ psdout(10) << __func__ << " version " << version
+ << " now ondisk" << dendl;
+ last_complete_ondisk = version;
+
+ if (last_complete_ondisk == info.last_update) {
+ if (!is_primary()) {
+ // Either we are a replica or backfill target.
+ // we are fully up to date. tell the primary!
+ pl->send_cluster_message(
+ get_primary().osd,
+ new MOSDPGTrim(
+ get_osdmap_epoch(),
+ spg_t(info.pgid.pgid, primary.shard),
+ last_complete_ondisk),
+ get_osdmap_epoch());
+ } else {
+ calc_min_last_complete_ondisk();
+ }
+ }
+}
+
+void PeeringState::complete_write(eversion_t v, eversion_t lc)
+{
+ last_update_ondisk = v;
+ last_complete_ondisk = lc;
+ calc_min_last_complete_ondisk();
+}
+
+void PeeringState::calc_trim_to()
+{
+ size_t target = cct->_conf->osd_min_pg_log_entries;
+ if (is_degraded() ||
+ state_test(PG_STATE_RECOVERING |
+ PG_STATE_RECOVERY_WAIT |
+ PG_STATE_BACKFILLING |
+ PG_STATE_BACKFILL_WAIT |
+ PG_STATE_BACKFILL_TOOFULL)) {
+ target = cct->_conf->osd_max_pg_log_entries;
+ }
+
+ eversion_t limit = std::min(
+ min_last_complete_ondisk,
+ pg_log.get_can_rollback_to());
+ if (limit != eversion_t() &&
+ limit != pg_trim_to &&
+ pg_log.get_log().approx_size() > target) {
+ size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
+ cct->_conf->osd_pg_log_trim_max);
+ if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+ cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
+ return;
+ }
+ list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
+ eversion_t new_trim_to;
+ for (size_t i = 0; i < num_to_trim; ++i) {
+ new_trim_to = it->version;
+ ++it;
+ if (new_trim_to > limit) {
+ new_trim_to = limit;
+ psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
+ break;
+ }
+ }
+ psdout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
+ pg_trim_to = new_trim_to;
+ assert(pg_trim_to <= pg_log.get_head());
+ assert(pg_trim_to <= min_last_complete_ondisk);
+ }
+}
+
+void PeeringState::calc_trim_to_aggressive()
+{
+ size_t target = cct->_conf->osd_min_pg_log_entries;
+ if (is_degraded() ||
+ state_test(PG_STATE_RECOVERING |
+ PG_STATE_RECOVERY_WAIT |
+ PG_STATE_BACKFILLING |
+ PG_STATE_BACKFILL_WAIT |
+ PG_STATE_BACKFILL_TOOFULL)) {
+ target = cct->_conf->osd_max_pg_log_entries;
+ }
+ // limit pg log trimming up to the can_rollback_to value
+ eversion_t limit = std::min(
+ pg_log.get_head(),
+ pg_log.get_can_rollback_to());
+ psdout(10) << __func__ << " limit = " << limit << dendl;
+
+ if (limit != eversion_t() &&
+ limit != pg_trim_to &&
+ pg_log.get_log().approx_size() > target) {
+ psdout(10) << __func__ << " approx pg log length = "
+ << pg_log.get_log().approx_size() << dendl;
+ uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target,
+ cct->_conf->osd_pg_log_trim_max);
+ psdout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl;
+ if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+ cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
+ return;
+ }
+ auto it = pg_log.get_log().log.begin(); // oldest log entry
+ auto rit = pg_log.get_log().log.rbegin();
+ eversion_t by_n_to_keep; // start from tail
+ eversion_t by_n_to_trim = eversion_t::max(); // start from head
+ for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
+ i++;
+ if (i > target && by_n_to_keep == eversion_t()) {
+ by_n_to_keep = rit->version;
+ }
+ if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
+ by_n_to_trim = it->version;
+ }
+ if (by_n_to_keep != eversion_t() &&
+ by_n_to_trim != eversion_t::max()) {
+ break;
+ }
+ }
+
+ if (by_n_to_keep == eversion_t()) {
+ return;
+ }
+
+ pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
+ psdout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
+ ceph_assert(pg_trim_to <= pg_log.get_head());
+ }
+}
+
+
/*------------ Peering State Machine----------------*/
#undef dout_prefix
#define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h
index 8af0d6ed90a..1211c62d313 100644
--- a/src/osd/PeeringState.h
+++ b/src/osd/PeeringState.h
@@ -1422,6 +1422,9 @@ public:
void add_log_entry(const pg_log_entry_t& e, bool applied);
+ void calc_trim_to();
+ void calc_trim_to_aggressive();
+
public:
PeeringState(
CephContext *cct,
@@ -1525,6 +1528,14 @@ public:
bool transaction_applied,
bool async);
+ void update_trim_to() {
+ bool hard_limit = (get_osdmap()->test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT));
+ if (hard_limit)
+ calc_trim_to_aggressive();
+ else
+ calc_trim_to();
+ }
+
void pre_submit_op(
const hobject_t &hoid,
const vector<pg_log_entry_t>& logv,
@@ -1555,6 +1566,24 @@ public:
const hobject_t &oid,
eversion_t version);
+ void update_peer_last_complete_ondisk(
+ pg_shard_t fromosd,
+ eversion_t lcod) {
+ peer_last_complete_ondisk[fromosd] = lcod;
+ }
+
+ void update_last_complete_ondisk(
+ eversion_t lcod) {
+ last_complete_ondisk = lcod;
+ }
+
+ void recovery_committed_to(eversion_t version);
+
+ void complete_write(eversion_t v, eversion_t lc);
+ void local_write_applied(eversion_t v) {
+ last_update_applied = v;
+ }
+
void dump_history(Formatter *f) const {
state_history.dump(f);
}
@@ -1815,6 +1844,14 @@ public:
return min_last_complete_ondisk;
}
+ eversion_t get_pg_trim_to() const {
+ return pg_trim_to;
+ }
+
+ eversion_t get_last_update_applied() const {
+ return last_update_applied;
+ }
+
bool debug_has_dirty_state() const {
return dirty_info || dirty_big_info;
}
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index 84e99eb9a40..2dec89ee12b 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -1533,105 +1533,6 @@ int PrimaryLogPG::do_scrub_ls(MOSDOp *m, OSDOp *osd_op)
return r;
}
-void PrimaryLogPG::calc_trim_to()
-{
- size_t target = cct->_conf->osd_min_pg_log_entries;
- if (is_degraded() ||
- state_test(PG_STATE_RECOVERING |
- PG_STATE_RECOVERY_WAIT |
- PG_STATE_BACKFILLING |
- PG_STATE_BACKFILL_WAIT |
- PG_STATE_BACKFILL_TOOFULL)) {
- target = cct->_conf->osd_max_pg_log_entries;
- }
-
- eversion_t limit = std::min(
- min_last_complete_ondisk,
- pg_log.get_can_rollback_to());
- if (limit != eversion_t() &&
- limit != pg_trim_to &&
- pg_log.get_log().approx_size() > target) {
- size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
- cct->_conf->osd_pg_log_trim_max);
- if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
- cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
- return;
- }
- list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
- eversion_t new_trim_to;
- for (size_t i = 0; i < num_to_trim; ++i) {
- new_trim_to = it->version;
- ++it;
- if (new_trim_to > limit) {
- new_trim_to = limit;
- dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
- break;
- }
- }
- dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
- pg_trim_to = new_trim_to;
- assert(pg_trim_to <= pg_log.get_head());
- assert(pg_trim_to <= min_last_complete_ondisk);
- }
-}
-
-void PrimaryLogPG::calc_trim_to_aggressive()
-{
- size_t target = cct->_conf->osd_min_pg_log_entries;
- if (is_degraded() ||
- state_test(PG_STATE_RECOVERING |
- PG_STATE_RECOVERY_WAIT |
- PG_STATE_BACKFILLING |
- PG_STATE_BACKFILL_WAIT |
- PG_STATE_BACKFILL_TOOFULL)) {
- target = cct->_conf->osd_max_pg_log_entries;
- }
- // limit pg log trimming up to the can_rollback_to value
- eversion_t limit = std::min(
- pg_log.get_head(),
- pg_log.get_can_rollback_to());
- dout(10) << __func__ << " limit = " << limit << dendl;
-
- if (limit != eversion_t() &&
- limit != pg_trim_to &&
- pg_log.get_log().approx_size() > target) {
- dout(10) << __func__ << " approx pg log length = "
- << pg_log.get_log().approx_size() << dendl;
- uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target,
- cct->_conf->osd_pg_log_trim_max);
- dout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl;
- if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
- cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
- return;
- }
- auto it = pg_log.get_log().log.begin(); // oldest log entry
- auto rit = pg_log.get_log().log.rbegin();
- eversion_t by_n_to_keep; // start from tail
- eversion_t by_n_to_trim = eversion_t::max(); // start from head
- for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
- i++;
- if (i > target && by_n_to_keep == eversion_t()) {
- by_n_to_keep = rit->version;
- }
- if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
- by_n_to_trim = it->version;
- }
- if (by_n_to_keep != eversion_t() &&
- by_n_to_trim != eversion_t::max()) {
- break;
- }
- }
-
- if (by_n_to_keep == eversion_t()) {
- return;
- }
-
- pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
- dout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
- ceph_assert(pg_trim_to <= pg_log.get_head());
- }
-}
-
PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool,
const map<string,string>& ec_profile, spg_t p) :
@@ -3944,10 +3845,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
ceph_assert(op->may_write() || op->may_cache());
// trim log?
- if (hard_limit_pglog())
- calc_trim_to_aggressive();
- else
- calc_trim_to();
+ recovery_state.update_trim_to();
// verify that we are doing this in order?
if (cct->_conf->osd_debug_op_order && m->get_source().is_client() &&
@@ -10344,8 +10242,7 @@ void PrimaryLogPG::repop_all_committed(RepGather *repop)
repop->all_committed = true;
if (!repop->rep_aborted) {
if (repop->v != eversion_t()) {
- last_update_ondisk = repop->v;
- last_complete_ondisk = repop->pg_local_last_complete;
+ recovery_state.complete_write(repop->v, repop->pg_local_last_complete);
}
eval_repop(repop);
}
@@ -10356,10 +10253,11 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
dout(10) << "op_applied version " << applied_version << dendl;
ceph_assert(applied_version != eversion_t());
ceph_assert(applied_version <= info.last_update);
- last_update_applied = applied_version;
+ recovery_state.local_write_applied(applied_version);
if (is_primary()) {
if (scrubber.active) {
- if (last_update_applied >= scrubber.subset_last_update) {
+ if (recovery_state.get_last_update_applied() >=
+ scrubber.subset_last_update) {
requeue_scrub(ops_blocked_by_scrub());
}
} else {
@@ -10403,7 +10301,6 @@ void PrimaryLogPG::eval_repop(RepGather *repop)
}
publish_stats_to_osd();
- recovery_state.calc_min_last_complete_ondisk();
dout(10) << " removing " << *repop << dendl;
ceph_assert(!repop_queue.empty());
@@ -10460,8 +10357,8 @@ void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
ctx->delta_stats,
ctx->at_version,
std::move(ctx->op_t),
- pg_trim_to,
- min_last_complete_ondisk,
+ recovery_state.get_pg_trim_to(),
+ recovery_state.get_min_last_complete_ondisk(),
ctx->log,
ctx->updated_hset_history,
on_all_commit,
@@ -10553,10 +10450,7 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
dout(20) << __func__ << " " << repop << dendl;
issue_repop(repop, ctx.get());
eval_repop(repop);
- if (hard_limit_pglog())
- calc_trim_to_aggressive();
- else
- calc_trim_to();
+ recovery_state.update_trim_to();
repop->put();
}
@@ -10595,8 +10489,8 @@ void PrimaryLogPG::submit_log_entries(
ObjectStore::Transaction t;
eversion_t old_last_update = info.last_update;
recovery_state.merge_new_log_entries(
- entries, t, pg_trim_to, min_last_complete_ondisk);
-
+ entries, t, recovery_state.get_pg_trim_to(),
+ recovery_state.get_min_last_complete_ondisk());
set<pg_shard_t> waiting_on;
for (set<pg_shard_t>::const_iterator i = get_acting_recovery_backfill().begin();
@@ -10615,8 +10509,8 @@ void PrimaryLogPG::submit_log_entries(
get_osdmap_epoch(),
get_last_peering_reset(),
repop->rep_tid,
- pg_trim_to,
- min_last_complete_ondisk);
+ recovery_state.get_pg_trim_to(),
+ recovery_state.get_min_last_complete_ondisk());
osd->send_message_osd_cluster(
peer.osd, m, get_osdmap_epoch());
waiting_on.insert(peer);
@@ -10671,10 +10565,7 @@ void PrimaryLogPG::submit_log_entries(
op_applied(info.last_update);
});
- if (hard_limit_pglog())
- calc_trim_to_aggressive();
- else
- calc_trim_to();
+ recovery_state.update_trim_to();
}
void PrimaryLogPG::cancel_log_updates()
@@ -11484,27 +11375,10 @@ void PrimaryLogPG::_committed_pushed_object(
{
lock();
if (!pg_has_reset_since(epoch)) {
- dout(10) << __func__ << " last_complete " << last_complete << " now ondisk" << dendl;
- last_complete_ondisk = last_complete;
-
- if (last_complete_ondisk == info.last_update) {
- if (!is_primary()) {
- // Either we are a replica or backfill target.
- // we are fully up to date. tell the primary!
- osd->send_message_osd_cluster(
- get_primary().osd,
- new MOSDPGTrim(
- get_osdmap_epoch(),
- spg_t(info.pgid.pgid, get_primary().shard),
- last_complete_ondisk),
- get_osdmap_epoch());
- } else {
- recovery_state.calc_min_last_complete_ondisk();
- }
- }
-
+ recovery_state.recovery_committed_to(last_complete);
} else {
- dout(10) << __func__ << " pg has changed, not touching last_complete_ondisk" << dendl;
+ dout(10) << __func__
+ << " pg has changed, not touching last_complete_ondisk" << dendl;
}
unlock();
diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h
index 7953989902e..884db99fe71 100644
--- a/src/osd/PrimaryLogPG.h
+++ b/src/osd/PrimaryLogPG.h
@@ -482,12 +482,12 @@ public:
void update_peer_last_complete_ondisk(
pg_shard_t fromosd,
eversion_t lcod) override {
- peer_last_complete_ondisk[fromosd] = lcod;
+ recovery_state.update_peer_last_complete_ondisk(fromosd, lcod);
}
void update_last_complete_ondisk(
eversion_t lcod) override {
- last_complete_ondisk = lcod;
+ recovery_state.update_last_complete_ondisk(lcod);
}
void update_stats(
@@ -1382,8 +1382,6 @@ protected:
unsigned split_bits) override;
void apply_and_flush_repops(bool requeue);
- void calc_trim_to() override;
- void calc_trim_to_aggressive() override;
int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);