diff options
Diffstat (limited to 'src/mds/MDLog.cc')
-rw-r--r-- | src/mds/MDLog.cc | 131 |
1 files changed, 79 insertions, 52 deletions
diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 40d893d6262..0be568433ef 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -146,13 +146,63 @@ class C_MDL_WriteError : public MDSIOContextBase { }; +class C_MDL_WriteHead : public MDSIOContextBase { +public: + explicit C_MDL_WriteHead(MDLog* m) + : MDSIOContextBase(true) + , mdlog(m) + {} + void print(ostream& out) const override { + out << "mdlog_write_head"; + } +protected: + void finish(int r) override { + mdlog->finish_head_waiters(); + } + MDSRank *get_mds() override {return mdlog->mds;} + + MDLog *mdlog; +}; + +void MDLog::finish_head_waiters() +{ + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); + + auto&& last_committed = journaler->get_last_committed(); + auto& expire_pos = last_committed.expire_pos; + + dout(20) << __func__ << " expire_pos=" << std::hex << expire_pos << dendl; + + { + auto last = waiting_for_expire.upper_bound(expire_pos); + for (auto it = waiting_for_expire.begin(); it != last; it++) { + finish_contexts(g_ceph_context, it->second); + } + waiting_for_expire.erase(waiting_for_expire.begin(), last); + } +} + void MDLog::write_head(MDSContext *c) { - Context *fin = NULL; - if (c != NULL) { - fin = new C_IO_Wrapper(mds, c); + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); + + auto&& last_written = journaler->get_last_written(); + auto expire_pos = journaler->get_expire_pos(); + dout(10) << __func__ << " last_written=" << last_written << " current expire_pos=" << std::hex << expire_pos << dendl; + + if (last_written.expire_pos < expire_pos) { + if (c != NULL) { + dout(25) << __func__ << " queueing waiter " << c << dendl; + waiting_for_expire[expire_pos].push_back(c); + } + + auto* fin = new C_MDL_WriteHead(this); + journaler->write_head(fin); + } else { + if (c) { + c->complete(0); + } } - journaler->write_head(fin); } uint64_t MDLog::get_read_pos() const @@ -174,6 +224,8 @@ uint64_t MDLog::get_safe_pos() const void MDLog::create(MDSContext *c) { + ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock)); + dout(5) << "create empty log" << dendl; C_GatherBuilder gather(g_ceph_context); @@ -287,6 +339,8 @@ LogSegment* MDLog::_start_new_segment(SegmentBoundary* sb) logger->set(l_mdl_seg, segments.size()); sb->set_seq(event_seq); + dout(20) << __func__ << ": starting new segment " << *ls << dendl; + // Adjust to next stray dir if (!mds->is_stopping()) { mds->mdcache->advance_stray(); @@ -583,17 +637,6 @@ void MDLog::shutdown() } } -class C_OFT_Committed : public MDSInternalContext { - MDLog *mdlog; - uint64_t seq; -public: - C_OFT_Committed(MDLog *l, uint64_t s) : - MDSInternalContext(l->mds), mdlog(l), seq(s) {} - void finish(int ret) override { - mdlog->trim_expired_segments(); - } -}; - void MDLog::try_to_commit_open_file_table(uint64_t last_seq) { ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); @@ -608,8 +651,7 @@ void MDLog::try_to_commit_open_file_table(uint64_t last_seq) if (mds->mdcache->open_file_table.is_any_dirty() || last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) { submit_mutex.unlock(); - mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq), - last_seq, CEPH_MSG_PRIO_HIGH); + mds->mdcache->open_file_table.commit(nullptr, last_seq, CEPH_MSG_PRIO_HIGH); submit_mutex.lock(); } } @@ -642,7 +684,7 @@ void MDLog::trim() max_ev = events_per_segment + 1; } - submit_mutex.lock(); + std::unique_lock locker{submit_mutex}; // trim! dout(10) << "trim " @@ -653,7 +695,6 @@ void MDLog::trim() << dendl; if (segments.empty()) { - submit_mutex.unlock(); return; } @@ -723,22 +764,23 @@ void MDLog::trim() new_expiring_segments++; expiring_segments.insert(ls); expiring_events += ls->num_events; - submit_mutex.unlock(); + locker.unlock(); uint64_t last_seq = ls->seq; try_expire(ls, op_prio); log_trim_counter.hit(); trim_end = ceph::coarse_mono_clock::now(); - submit_mutex.lock(); + locker.lock(); p = segments.lower_bound(last_seq + 1); } } + ceph_assert(locker.owns_lock()); + try_to_commit_open_file_table(get_last_segment_seq()); - // discard expired segments and unlock submit_mutex - _trim_expired_segments(); + _trim_expired_segments(locker); } class C_MaybeExpiredSegment : public MDSInternalContext { @@ -760,17 +802,18 @@ class C_MaybeExpiredSegment : public MDSInternalContext { * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest * segment. */ -int MDLog::trim_all() +int MDLog::trim_to(SegmentBoundary::seq_t seq) { - submit_mutex.lock(); + std::unique_lock locker(submit_mutex); dout(10) << __func__ << ": " - << segments.size() + << seq + << " " << segments.size() << "/" << expiring_segments.size() << "/" << expired_segments.size() << dendl; - uint64_t last_seq = 0; - if (!segments.empty()) { + uint64_t last_seq = seq; + if (last_seq == 0 || !segments.empty()) { last_seq = get_last_segment_seq(); try_to_commit_open_file_table(last_seq); } @@ -785,7 +828,7 @@ int MDLog::trim_all() // Caller should have flushed journaler before calling this if (pending_events.count(ls->seq)) { dout(5) << __func__ << ": " << *ls << " has pending events" << dendl; - submit_mutex.unlock(); + locker.unlock(); return -CEPHFS_EAGAIN; } @@ -797,17 +840,17 @@ int MDLog::trim_all() ceph_assert(expiring_segments.count(ls) == 0); expiring_segments.insert(ls); expiring_events += ls->num_events; - submit_mutex.unlock(); + locker.unlock(); uint64_t next_seq = ls->seq + 1; try_expire(ls, CEPH_MSG_PRIO_DEFAULT); - submit_mutex.lock(); + locker.lock(); p = segments.lower_bound(next_seq); } } - _trim_expired_segments(); + _trim_expired_segments(locker); return 0; } @@ -848,14 +891,12 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio) try_expire(ls, op_prio); } -void MDLog::_trim_expired_segments() +void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx) { ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex)); - - uint64_t const oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq(); + ceph_assert(locker.owns_lock()); // trim expired segments? - bool trimmed = false; uint64_t end = 0; for (auto it = segments.begin(); it != segments.end(); ++it) { auto& [seq, ls] = *it; @@ -891,7 +932,6 @@ void MDLog::_trim_expired_segments() } else { logger->set(l_mdl_expos, jexpire_pos); } - trimmed = true; } if (!expired_segments.count(ls)) { @@ -899,26 +939,13 @@ void MDLog::_trim_expired_segments() break; } - if (!mds_is_shutting_down && ls->seq >= oft_committed_seq) { - dout(10) << __func__ << " defer expire for open file table committedseq " << oft_committed_seq - << " <= " << ls->seq << "/" << ls->offset << dendl; - break; - } - end = seq; dout(10) << __func__ << ": maybe expiring " << *ls << dendl; } - submit_mutex.unlock(); + locker.unlock(); - if (trimmed) - journaler->write_head(0); -} - -void MDLog::trim_expired_segments() -{ - submit_mutex.lock(); - _trim_expired_segments(); + write_head(ctx); } void MDLog::_expired(LogSegment *ls) |