summaryrefslogtreecommitdiffstats
path: root/src/mds/MDLog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/mds/MDLog.cc')
-rw-r--r--src/mds/MDLog.cc131
1 files changed, 79 insertions, 52 deletions
diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc
index 40d893d6262..0be568433ef 100644
--- a/src/mds/MDLog.cc
+++ b/src/mds/MDLog.cc
@@ -146,13 +146,63 @@ class C_MDL_WriteError : public MDSIOContextBase {
};
+class C_MDL_WriteHead : public MDSIOContextBase {
+public:
+ explicit C_MDL_WriteHead(MDLog* m)
+ : MDSIOContextBase(true)
+ , mdlog(m)
+ {}
+ void print(ostream& out) const override {
+ out << "mdlog_write_head";
+ }
+protected:
+ void finish(int r) override {
+ mdlog->finish_head_waiters();
+ }
+ MDSRank *get_mds() override {return mdlog->mds;}
+
+ MDLog *mdlog;
+};
+
+void MDLog::finish_head_waiters()
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
+ auto&& last_committed = journaler->get_last_committed();
+ auto& expire_pos = last_committed.expire_pos;
+
+ dout(20) << __func__ << " expire_pos=" << std::hex << expire_pos << dendl;
+
+ {
+ auto last = waiting_for_expire.upper_bound(expire_pos);
+ for (auto it = waiting_for_expire.begin(); it != last; it++) {
+ finish_contexts(g_ceph_context, it->second);
+ }
+ waiting_for_expire.erase(waiting_for_expire.begin(), last);
+ }
+}
+
void MDLog::write_head(MDSContext *c)
{
- Context *fin = NULL;
- if (c != NULL) {
- fin = new C_IO_Wrapper(mds, c);
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
+ auto&& last_written = journaler->get_last_written();
+ auto expire_pos = journaler->get_expire_pos();
+ dout(10) << __func__ << " last_written=" << last_written << " current expire_pos=" << std::hex << expire_pos << dendl;
+
+ if (last_written.expire_pos < expire_pos) {
+ if (c != NULL) {
+ dout(25) << __func__ << " queueing waiter " << c << dendl;
+ waiting_for_expire[expire_pos].push_back(c);
+ }
+
+ auto* fin = new C_MDL_WriteHead(this);
+ journaler->write_head(fin);
+ } else {
+ if (c) {
+ c->complete(0);
+ }
}
- journaler->write_head(fin);
}
uint64_t MDLog::get_read_pos() const
@@ -174,6 +224,8 @@ uint64_t MDLog::get_safe_pos() const
void MDLog::create(MDSContext *c)
{
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
dout(5) << "create empty log" << dendl;
C_GatherBuilder gather(g_ceph_context);
@@ -287,6 +339,8 @@ LogSegment* MDLog::_start_new_segment(SegmentBoundary* sb)
logger->set(l_mdl_seg, segments.size());
sb->set_seq(event_seq);
+ dout(20) << __func__ << ": starting new segment " << *ls << dendl;
+
// Adjust to next stray dir
if (!mds->is_stopping()) {
mds->mdcache->advance_stray();
@@ -583,17 +637,6 @@ void MDLog::shutdown()
}
}
-class C_OFT_Committed : public MDSInternalContext {
- MDLog *mdlog;
- uint64_t seq;
-public:
- C_OFT_Committed(MDLog *l, uint64_t s) :
- MDSInternalContext(l->mds), mdlog(l), seq(s) {}
- void finish(int ret) override {
- mdlog->trim_expired_segments();
- }
-};
-
void MDLog::try_to_commit_open_file_table(uint64_t last_seq)
{
ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
@@ -608,8 +651,7 @@ void MDLog::try_to_commit_open_file_table(uint64_t last_seq)
if (mds->mdcache->open_file_table.is_any_dirty() ||
last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) {
submit_mutex.unlock();
- mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
- last_seq, CEPH_MSG_PRIO_HIGH);
+ mds->mdcache->open_file_table.commit(nullptr, last_seq, CEPH_MSG_PRIO_HIGH);
submit_mutex.lock();
}
}
@@ -642,7 +684,7 @@ void MDLog::trim()
max_ev = events_per_segment + 1;
}
- submit_mutex.lock();
+ std::unique_lock locker{submit_mutex};
// trim!
dout(10) << "trim "
@@ -653,7 +695,6 @@ void MDLog::trim()
<< dendl;
if (segments.empty()) {
- submit_mutex.unlock();
return;
}
@@ -723,22 +764,23 @@ void MDLog::trim()
new_expiring_segments++;
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.unlock();
+ locker.unlock();
uint64_t last_seq = ls->seq;
try_expire(ls, op_prio);
log_trim_counter.hit();
trim_end = ceph::coarse_mono_clock::now();
- submit_mutex.lock();
+ locker.lock();
p = segments.lower_bound(last_seq + 1);
}
}
+ ceph_assert(locker.owns_lock());
+
try_to_commit_open_file_table(get_last_segment_seq());
- // discard expired segments and unlock submit_mutex
- _trim_expired_segments();
+ _trim_expired_segments(locker);
}
class C_MaybeExpiredSegment : public MDSInternalContext {
@@ -760,17 +802,18 @@ class C_MaybeExpiredSegment : public MDSInternalContext {
* Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
* segment.
*/
-int MDLog::trim_all()
+int MDLog::trim_to(SegmentBoundary::seq_t seq)
{
- submit_mutex.lock();
+ std::unique_lock locker(submit_mutex);
dout(10) << __func__ << ": "
- << segments.size()
+ << seq
+ << " " << segments.size()
<< "/" << expiring_segments.size()
<< "/" << expired_segments.size() << dendl;
- uint64_t last_seq = 0;
- if (!segments.empty()) {
+ uint64_t last_seq = seq;
+ if (last_seq == 0 || !segments.empty()) {
last_seq = get_last_segment_seq();
try_to_commit_open_file_table(last_seq);
}
@@ -785,7 +828,7 @@ int MDLog::trim_all()
// Caller should have flushed journaler before calling this
if (pending_events.count(ls->seq)) {
dout(5) << __func__ << ": " << *ls << " has pending events" << dendl;
- submit_mutex.unlock();
+ locker.unlock();
return -CEPHFS_EAGAIN;
}
@@ -797,17 +840,17 @@ int MDLog::trim_all()
ceph_assert(expiring_segments.count(ls) == 0);
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.unlock();
+ locker.unlock();
uint64_t next_seq = ls->seq + 1;
try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
- submit_mutex.lock();
+ locker.lock();
p = segments.lower_bound(next_seq);
}
}
- _trim_expired_segments();
+ _trim_expired_segments(locker);
return 0;
}
@@ -848,14 +891,12 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
try_expire(ls, op_prio);
}
-void MDLog::_trim_expired_segments()
+void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx)
{
ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
-
- uint64_t const oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
+ ceph_assert(locker.owns_lock());
// trim expired segments?
- bool trimmed = false;
uint64_t end = 0;
for (auto it = segments.begin(); it != segments.end(); ++it) {
auto& [seq, ls] = *it;
@@ -891,7 +932,6 @@ void MDLog::_trim_expired_segments()
} else {
logger->set(l_mdl_expos, jexpire_pos);
}
- trimmed = true;
}
if (!expired_segments.count(ls)) {
@@ -899,26 +939,13 @@ void MDLog::_trim_expired_segments()
break;
}
- if (!mds_is_shutting_down && ls->seq >= oft_committed_seq) {
- dout(10) << __func__ << " defer expire for open file table committedseq " << oft_committed_seq
- << " <= " << ls->seq << "/" << ls->offset << dendl;
- break;
- }
-
end = seq;
dout(10) << __func__ << ": maybe expiring " << *ls << dendl;
}
- submit_mutex.unlock();
+ locker.unlock();
- if (trimmed)
- journaler->write_head(0);
-}
-
-void MDLog::trim_expired_segments()
-{
- submit_mutex.lock();
- _trim_expired_segments();
+ write_head(ctx);
}
void MDLog::_expired(LogSegment *ls)