summaryrefslogtreecommitdiffstats
path: root/src/mds/MDLog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds/MDLog.cc')
-rw-r--r--src/mds/MDLog.cc88
1 files changed, 73 insertions, 15 deletions
diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc
index 0be568433ef..d041e3b2fc8 100644
--- a/src/mds/MDLog.cc
+++ b/src/mds/MDLog.cc
@@ -53,11 +53,12 @@ MDLog::MDLog(MDSRank* m)
event_large_threshold = g_conf().get_val<uint64_t>("mds_log_event_large_threshold");
events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment");
pause = g_conf().get_val<bool>("mds_log_pause");
- major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio");
max_segments = g_conf().get_val<uint64_t>("mds_log_max_segments");
max_events = g_conf().get_val<int64_t>("mds_log_max_events");
skip_corrupt_events = g_conf().get_val<bool>("mds_log_skip_corrupt_events");
skip_unbounded_events = g_conf().get_val<bool>("mds_log_skip_unbounded_events");
+ log_warn_factor = g_conf().get_val<double>("mds_log_warn_factor");
+ minor_segments_per_major_segment = g_conf().get_val<uint64_t>("mds_log_minor_segments_per_major_segment");
upkeep_thread = std::thread(&MDLog::log_trim_upkeep, this);
}
@@ -220,7 +221,46 @@ uint64_t MDLog::get_safe_pos() const
return journaler->get_write_safe_pos();
}
+// estimate the replay completion time based on mdlog journal pointers
+EstimatedReplayTime MDLog::get_estimated_replay_finish_time() {
+ ceph_assert(mds->is_replay());
+ EstimatedReplayTime estimated_time{0, std::chrono::seconds::zero(), std::chrono::seconds::zero()};
+ if (!journaler) {
+ return estimated_time;
+ }
+
+ auto read_pos = journaler->get_read_pos();
+ auto write_pos = journaler->get_write_pos();
+ auto trimmed_pos = journaler->get_trimmed_pos();
+
+ dout(20) << __func__ << ": read_pos=" << read_pos << ", write_pos="
+ << write_pos << ", trimmed_pos=" << trimmed_pos << dendl;
+
+ if (read_pos == trimmed_pos || write_pos == trimmed_pos) {
+ return estimated_time;
+ }
+
+ auto total_bytes = write_pos - trimmed_pos;
+ double percent_complete = ((double)(read_pos - trimmed_pos)) / (double)total_bytes;
+ auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds>
+ (ceph::coarse_mono_clock::now() - replay_start_time);
+ auto time = ((1 - percent_complete) / percent_complete) * elapsed_time;
+
+ dout(20) << __func__ << "percent_complete=" << percent_complete
+ << ", elapsed_time=" << elapsed_time
+ << ", estimated_time=" << std::chrono::round<std::chrono::seconds>(time)
+ << dendl;
+
+ estimated_time.percent_complete = percent_complete * 100;
+ estimated_time.elapsed_time = elapsed_time;
+ estimated_time.estimated_time = std::chrono::round<std::chrono::seconds>(time);
+ dout(20) << __func__ << "estimated_time.percent_complete=" << estimated_time.percent_complete
+ << ", estimated_time.elapsed_time=" << estimated_time.elapsed_time
+ << ", estimated_time.estimated_time=" << estimated_time.estimated_time
+ << dendl;
+ return estimated_time;
+}
void MDLog::create(MDSContext *c)
{
@@ -258,7 +298,7 @@ void MDLog::create(MDSContext *c)
logger->set(l_mdl_expos, journaler->get_expire_pos());
logger->set(l_mdl_wrpos, journaler->get_write_pos());
- submit_thread.create("md_submit");
+ submit_thread.create("mds-log-submit");
}
void MDLog::open(MDSContext *c)
@@ -267,9 +307,9 @@ void MDLog::open(MDSContext *c)
ceph_assert(!recovery_thread.is_started());
recovery_thread.set_completion(c);
- recovery_thread.create("md_recov_open");
+ recovery_thread.create("mds-log-recvr");
- submit_thread.create("md_submit");
+ submit_thread.create("mds-log-submit");
// either append() or replay() will follow.
}
@@ -311,7 +351,7 @@ void MDLog::reopen(MDSContext *c)
recovery_thread.join();
recovery_thread.set_completion(new C_ReopenComplete(this, c));
- recovery_thread.create("md_recov_reopen");
+ recovery_thread.create("mds-log-reopen");
}
void MDLog::append()
@@ -357,14 +397,15 @@ void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase* c)
ceph_assert(!mds_is_shutting_down);
event_seq++;
- events_since_last_major_segment++;
if (auto sb = dynamic_cast<SegmentBoundary*>(le); sb) {
auto ls = _start_new_segment(sb);
if (sb->is_major_segment_boundary()) {
major_segments.insert(ls->seq);
logger->set(l_mdl_segmjr, major_segments.size());
- events_since_last_major_segment = 0;
+ minor_segments_since_last_major_segment = 0;
+ } else {
+ ++minor_segments_since_last_major_segment;
}
}
@@ -403,7 +444,7 @@ void MDLog::_segment_upkeep()
uint64_t period = journaler->get_layout_period();
auto ls = get_current_segment();
// start a new segment?
- if (events_since_last_major_segment > events_per_segment*major_segment_event_ratio) {
+ if (minor_segments_since_last_major_segment > minor_segments_per_major_segment) {
dout(10) << __func__ << ": starting new major segment, current " << *ls << dendl;
auto sle = mds->mdcache->create_subtree_map();
_submit_entry(sle, NULL);
@@ -656,7 +697,13 @@ void MDLog::try_to_commit_open_file_table(uint64_t last_seq)
}
}
+bool MDLog::is_trim_slow() const {
+ return (segments.size() > (size_t)(max_segments * log_warn_factor));
+}
+
void MDLog::log_trim_upkeep(void) {
+ ceph_pthread_setname("mds-log-trim");
+
dout(10) << dendl;
std::unique_lock mds_lock(mds->mds_lock);
@@ -1008,7 +1055,7 @@ void MDLog::replay(MDSContext *c)
}
already_replayed = true;
- replay_thread.create("md_log_replay");
+ replay_thread.create("mds-log-replay");
}
@@ -1129,6 +1176,7 @@ void MDLog::_recovery_thread(MDSContext *completion)
{
std::lock_guard l(mds->mds_lock);
journaler = front_journal;
+ replay_start_time = ceph::coarse_mono_clock::now();
}
C_SaferCond recover_wait;
@@ -1366,11 +1414,17 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
// i am a separate thread
void MDLog::_replay_thread()
{
- dout(10) << "_replay_thread start" << dendl;
+ dout(10) << __func__ << ": start time: " << replay_start_time << ", now: "
+ << ceph::coarse_mono_clock::now() << dendl;
// loop
int r = 0;
while (1) {
+ auto sleep_time = g_conf().get_val<std::chrono::milliseconds>("mds_delay_journal_replay_for_testing");
+ if (unlikely(sleep_time > 0ms)) {
+ dout(10) << __func__ << ": sleeping for " << sleep_time << "ms" << dendl;
+ std::this_thread::sleep_for(sleep_time);
+ }
// wait for read?
journaler->check_isreadable();
if (journaler->get_error()) {
@@ -1474,7 +1528,6 @@ void MDLog::_replay_thread()
}
le->set_start_off(pos);
- events_since_last_major_segment++;
if (auto sb = dynamic_cast<SegmentBoundary*>(le.get()); sb) {
auto seq = sb->get_seq();
if (seq > 0) {
@@ -1487,7 +1540,9 @@ void MDLog::_replay_thread()
if (sb->is_major_segment_boundary()) {
major_segments.insert(event_seq);
logger->set(l_mdl_segmjr, major_segments.size());
- events_since_last_major_segment = 0;
+ minor_segments_since_last_major_segment = 0;
+ } else {
+ ++minor_segments_since_last_major_segment;
}
} else {
event_seq++;
@@ -1618,9 +1673,6 @@ void MDLog::handle_conf_change(const std::set<std::string>& changed, const MDSMa
if (changed.count("mds_log_events_per_segment")) {
events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment");
}
- if (changed.count("mds_log_major_segment_event_ratio")) {
- major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio");
- }
if (changed.count("mds_log_max_events")) {
max_events = g_conf().get_val<int64_t>("mds_log_max_events");
}
@@ -1642,4 +1694,10 @@ void MDLog::handle_conf_change(const std::set<std::string>& changed, const MDSMa
if (changed.count("mds_log_trim_decay_rate")){
log_trim_counter = DecayCounter(g_conf().get_val<double>("mds_log_trim_decay_rate"));
}
+ if (changed.count("mds_log_warn_factor")) {
+ log_warn_factor = g_conf().get_val<double>("mds_log_warn_factor");
+ }
+ if (changed.count("mds_log_minor_segments_per_major_segment")) {
+ minor_segments_per_major_segment = g_conf().get_val<uint64_t>("mds_log_minor_segments_per_major_segment");
+ }
}