diff options
Diffstat (limited to 'src/mds/MDLog.cc')
-rw-r--r-- | src/mds/MDLog.cc | 88 |
1 files changed, 73 insertions, 15 deletions
diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 0be568433ef..d041e3b2fc8 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -53,11 +53,12 @@ MDLog::MDLog(MDSRank* m) event_large_threshold = g_conf().get_val<uint64_t>("mds_log_event_large_threshold"); events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment"); pause = g_conf().get_val<bool>("mds_log_pause"); - major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio"); max_segments = g_conf().get_val<uint64_t>("mds_log_max_segments"); max_events = g_conf().get_val<int64_t>("mds_log_max_events"); skip_corrupt_events = g_conf().get_val<bool>("mds_log_skip_corrupt_events"); skip_unbounded_events = g_conf().get_val<bool>("mds_log_skip_unbounded_events"); + log_warn_factor = g_conf().get_val<double>("mds_log_warn_factor"); + minor_segments_per_major_segment = g_conf().get_val<uint64_t>("mds_log_minor_segments_per_major_segment"); upkeep_thread = std::thread(&MDLog::log_trim_upkeep, this); } @@ -220,7 +221,46 @@ uint64_t MDLog::get_safe_pos() const return journaler->get_write_safe_pos(); } +// estimate the replay completion time based on mdlog journal pointers +EstimatedReplayTime MDLog::get_estimated_replay_finish_time() { + ceph_assert(mds->is_replay()); + EstimatedReplayTime estimated_time{0, std::chrono::seconds::zero(), std::chrono::seconds::zero()}; + if (!journaler) { + return estimated_time; + } + + auto read_pos = journaler->get_read_pos(); + auto write_pos = journaler->get_write_pos(); + auto trimmed_pos = journaler->get_trimmed_pos(); + + dout(20) << __func__ << ": read_pos=" << read_pos << ", write_pos=" + << write_pos << ", trimmed_pos=" << trimmed_pos << dendl; + + if (read_pos == trimmed_pos || write_pos == trimmed_pos) { + return estimated_time; + } + + auto total_bytes = write_pos - trimmed_pos; + double percent_complete = ((double)(read_pos - trimmed_pos)) / (double)total_bytes; + auto elapsed_time = std::chrono::duration_cast<std::chrono::seconds> + (ceph::coarse_mono_clock::now() - replay_start_time); + auto time = ((1 - percent_complete) / percent_complete) * elapsed_time; + + dout(20) << __func__ << "percent_complete=" << percent_complete + << ", elapsed_time=" << elapsed_time + << ", estimated_time=" << std::chrono::round<std::chrono::seconds>(time) + << dendl; + + estimated_time.percent_complete = percent_complete * 100; + estimated_time.elapsed_time = elapsed_time; + estimated_time.estimated_time = std::chrono::round<std::chrono::seconds>(time); + dout(20) << __func__ << "estimated_time.percent_complete=" << estimated_time.percent_complete + << ", estimated_time.elapsed_time=" << estimated_time.elapsed_time + << ", estimated_time.estimated_time=" << estimated_time.estimated_time + << dendl; + return estimated_time; +} void MDLog::create(MDSContext *c) { @@ -258,7 +298,7 @@ void MDLog::create(MDSContext *c) logger->set(l_mdl_expos, journaler->get_expire_pos()); logger->set(l_mdl_wrpos, journaler->get_write_pos()); - submit_thread.create("md_submit"); + submit_thread.create("mds-log-submit"); } void MDLog::open(MDSContext *c) @@ -267,9 +307,9 @@ void MDLog::open(MDSContext *c) ceph_assert(!recovery_thread.is_started()); recovery_thread.set_completion(c); - recovery_thread.create("md_recov_open"); + recovery_thread.create("mds-log-recvr"); - submit_thread.create("md_submit"); + submit_thread.create("mds-log-submit"); // either append() or replay() will follow. } @@ -311,7 +351,7 @@ void MDLog::reopen(MDSContext *c) recovery_thread.join(); recovery_thread.set_completion(new C_ReopenComplete(this, c)); - recovery_thread.create("md_recov_reopen"); + recovery_thread.create("mds-log-reopen"); } void MDLog::append() @@ -357,14 +397,15 @@ void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase* c) ceph_assert(!mds_is_shutting_down); event_seq++; - events_since_last_major_segment++; if (auto sb = dynamic_cast<SegmentBoundary*>(le); sb) { auto ls = _start_new_segment(sb); if (sb->is_major_segment_boundary()) { major_segments.insert(ls->seq); logger->set(l_mdl_segmjr, major_segments.size()); - events_since_last_major_segment = 0; + minor_segments_since_last_major_segment = 0; + } else { + ++minor_segments_since_last_major_segment; } } @@ -403,7 +444,7 @@ void MDLog::_segment_upkeep() uint64_t period = journaler->get_layout_period(); auto ls = get_current_segment(); // start a new segment? - if (events_since_last_major_segment > events_per_segment*major_segment_event_ratio) { + if (minor_segments_since_last_major_segment > minor_segments_per_major_segment) { dout(10) << __func__ << ": starting new major segment, current " << *ls << dendl; auto sle = mds->mdcache->create_subtree_map(); _submit_entry(sle, NULL); @@ -656,7 +697,13 @@ void MDLog::try_to_commit_open_file_table(uint64_t last_seq) } } +bool MDLog::is_trim_slow() const { + return (segments.size() > (size_t)(max_segments * log_warn_factor)); +} + void MDLog::log_trim_upkeep(void) { + ceph_pthread_setname("mds-log-trim"); + dout(10) << dendl; std::unique_lock mds_lock(mds->mds_lock); @@ -1008,7 +1055,7 @@ void MDLog::replay(MDSContext *c) } already_replayed = true; - replay_thread.create("md_log_replay"); + replay_thread.create("mds-log-replay"); } @@ -1129,6 +1176,7 @@ void MDLog::_recovery_thread(MDSContext *completion) { std::lock_guard l(mds->mds_lock); journaler = front_journal; + replay_start_time = ceph::coarse_mono_clock::now(); } C_SaferCond recover_wait; @@ -1366,11 +1414,17 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa // i am a separate thread void MDLog::_replay_thread() { - dout(10) << "_replay_thread start" << dendl; + dout(10) << __func__ << ": start time: " << replay_start_time << ", now: " + << ceph::coarse_mono_clock::now() << dendl; // loop int r = 0; while (1) { + auto sleep_time = g_conf().get_val<std::chrono::milliseconds>("mds_delay_journal_replay_for_testing"); + if (unlikely(sleep_time > 0ms)) { + dout(10) << __func__ << ": sleeping for " << sleep_time << "ms" << dendl; + std::this_thread::sleep_for(sleep_time); + } // wait for read? journaler->check_isreadable(); if (journaler->get_error()) { @@ -1474,7 +1528,6 @@ void MDLog::_replay_thread() } le->set_start_off(pos); - events_since_last_major_segment++; if (auto sb = dynamic_cast<SegmentBoundary*>(le.get()); sb) { auto seq = sb->get_seq(); if (seq > 0) { @@ -1487,7 +1540,9 @@ void MDLog::_replay_thread() if (sb->is_major_segment_boundary()) { major_segments.insert(event_seq); logger->set(l_mdl_segmjr, major_segments.size()); - events_since_last_major_segment = 0; + minor_segments_since_last_major_segment = 0; + } else { + ++minor_segments_since_last_major_segment; } } else { event_seq++; @@ -1618,9 +1673,6 @@ void MDLog::handle_conf_change(const std::set<std::string>& changed, const MDSMa if (changed.count("mds_log_events_per_segment")) { events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment"); } - if (changed.count("mds_log_major_segment_event_ratio")) { - major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio"); - } if (changed.count("mds_log_max_events")) { max_events = g_conf().get_val<int64_t>("mds_log_max_events"); } @@ -1642,4 +1694,10 @@ void MDLog::handle_conf_change(const std::set<std::string>& changed, const MDSMa if (changed.count("mds_log_trim_decay_rate")){ log_trim_counter = DecayCounter(g_conf().get_val<double>("mds_log_trim_decay_rate")); } + if (changed.count("mds_log_warn_factor")) { + log_warn_factor = g_conf().get_val<double>("mds_log_warn_factor"); + } + if (changed.count("mds_log_minor_segments_per_major_segment")) { + minor_segments_per_major_segment = g_conf().get_val<uint64_t>("mds_log_minor_segments_per_major_segment"); + } } |