From 24faead086a50ea1b9614268d4dd5f3ea7bbe445 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 30 Sep 2016 12:59:16 -0400 Subject: journal: clean up object recorder closed/overflow callback Signed-off-by: Jason Dillaman --- src/journal/ObjectRecorder.cc | 46 ++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 20 deletions(-) (limited to 'src/journal/ObjectRecorder.cc') diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index e4fe777e263..58dd14df1d8 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -245,7 +245,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { AppendBuffers append_buffers; { - Mutex::Locker locker(*m_lock); + m_lock->Lock(); auto tid_iter = m_in_flight_tids.find(tid); assert(tid_iter != m_in_flight_tids.end()); m_in_flight_tids.erase(tid_iter); @@ -262,7 +262,9 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { // notify of overflow once all in-flight ops are complete if (m_in_flight_tids.empty() && !m_aio_scheduled) { append_overflowed(); - notify_handler(); + notify_handler_unlock(); + } else { + m_lock->Unlock(); } return; } @@ -272,11 +274,8 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { assert(!append_buffers.empty()); m_in_flight_appends.erase(iter); - if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) { - // all remaining unsent appends should be redirected to new object - notify_handler(); - } m_in_flight_flushes = true; + m_lock->Unlock(); } // Flag the associated futures as complete. @@ -288,9 +287,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { } // wake up any flush requests that raced with a RADOS callback - Mutex::Locker locker(*m_lock); + m_lock->Lock(); m_in_flight_flushes = false; m_in_flight_flushes_cond.Signal(); + + if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) { + // all remaining unsent appends should be redirected to new object + notify_handler_unlock(); + } else { + m_lock->Unlock(); + } } void ObjectRecorder::append_overflowed() { @@ -317,6 +323,13 @@ void ObjectRecorder::append_overflowed() { m_append_buffers.begin(), m_append_buffers.end()); restart_append_buffers.swap(m_append_buffers); + + for (AppendBuffers::const_iterator it = m_append_buffers.begin(); + it != m_append_buffers.end(); ++it) { + ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first + << dendl; + it->first->detach(); + } } void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { @@ -377,18 +390,21 @@ void ObjectRecorder::send_appends_aio() { rados_completion->release(); { - Mutex::Locker locker(*m_lock); + m_lock->Lock(); if (m_pending_buffers.empty()) { m_aio_scheduled = false; if (m_in_flight_appends.empty() && m_object_closed) { // all remaining unsent appends should be redirected to new object - notify_handler(); + notify_handler_unlock(); + } else { + m_lock->Unlock(); } } else { // additional pending items -- reschedule m_op_work_queue->queue(new FunctionContext([this] (int r) { send_appends_aio(); })); + m_lock->Unlock(); } } @@ -396,25 +412,15 @@ void ObjectRecorder::send_appends_aio() { gather_ctx->activate(); } -void ObjectRecorder::notify_handler() { +void ObjectRecorder::notify_handler_unlock() { assert(m_lock->is_locked()); - - for (AppendBuffers::const_iterator it = m_append_buffers.begin(); - it != m_append_buffers.end(); ++it) { - ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first - << dendl; - it->first->detach(); - } - if (m_object_closed) { m_lock->Unlock(); m_handler->closed(this); - m_lock->Lock(); } else { // TODO need to delay completion until after aio_notify completes m_lock->Unlock(); m_handler->overflow(this); - m_lock->Lock(); } } -- cgit v1.2.3