summaryrefslogtreecommitdiffstats
path: root/src/journal/ObjectRecorder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal/ObjectRecorder.cc')
-rw-r--r--src/journal/ObjectRecorder.cc46
1 files changed, 26 insertions, 20 deletions
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index e4fe777e263..58dd14df1d8 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -245,7 +245,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
AppendBuffers append_buffers;
{
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
auto tid_iter = m_in_flight_tids.find(tid);
assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
@@ -262,7 +262,9 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty() && !m_aio_scheduled) {
append_overflowed();
- notify_handler();
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
}
return;
}
@@ -272,11 +274,8 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
assert(!append_buffers.empty());
m_in_flight_appends.erase(iter);
- if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
- // all remaining unsent appends should be redirected to new object
- notify_handler();
- }
m_in_flight_flushes = true;
+ m_lock->Unlock();
}
// Flag the associated futures as complete.
@@ -288,9 +287,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
}
// wake up any flush requests that raced with a RADOS callback
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
+
+ if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
+ }
}
void ObjectRecorder::append_overflowed() {
@@ -317,6 +323,13 @@ void ObjectRecorder::append_overflowed() {
m_append_buffers.begin(),
m_append_buffers.end());
restart_append_buffers.swap(m_append_buffers);
+
+ for (AppendBuffers::const_iterator it = m_append_buffers.begin();
+ it != m_append_buffers.end(); ++it) {
+ ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
+ << dendl;
+ it->first->detach();
+ }
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
@@ -377,18 +390,21 @@ void ObjectRecorder::send_appends_aio() {
rados_completion->release();
{
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
if (m_pending_buffers.empty()) {
m_aio_scheduled = false;
if (m_in_flight_appends.empty() && m_object_closed) {
// all remaining unsent appends should be redirected to new object
- notify_handler();
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
}
} else {
// additional pending items -- reschedule
m_op_work_queue->queue(new FunctionContext([this] (int r) {
send_appends_aio();
}));
+ m_lock->Unlock();
}
}
@@ -396,25 +412,15 @@ void ObjectRecorder::send_appends_aio() {
gather_ctx->activate();
}
-void ObjectRecorder::notify_handler() {
+void ObjectRecorder::notify_handler_unlock() {
assert(m_lock->is_locked());
-
- for (AppendBuffers::const_iterator it = m_append_buffers.begin();
- it != m_append_buffers.end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
- << dendl;
- it->first->detach();
- }
-
if (m_object_closed) {
m_lock->Unlock();
m_handler->closed(this);
- m_lock->Lock();
} else {
// TODO need to delay completion until after aio_notify completes
m_lock->Unlock();
m_handler->overflow(this);
- m_lock->Lock();
}
}