diff options
author | Jason Dillaman <dillaman@redhat.com> | 2016-09-28 14:35:36 +0200 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2016-10-05 03:31:54 +0200 |
commit | dc77a629ed353d586b63f0bd8e20f54a7595afba (patch) | |
tree | 53cbe8a71e01fb40bf0e5509a8bd6fdfe58a3cd5 /src/journal | |
parent | Merge pull request #10223 from mikulely/doc-s3-static-website-support (diff) | |
download | ceph-dc77a629ed353d586b63f0bd8e20f54a7595afba.tar.xz ceph-dc77a629ed353d586b63f0bd8e20f54a7595afba.zip |
journal: avoid holding lock while sending journal append
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal')
-rw-r--r-- | src/journal/JournalRecorder.cc | 8 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.cc | 49 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.h | 2 |
3 files changed, 38 insertions, 21 deletions
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 1b0e5704f71..1917165008f 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -105,10 +105,7 @@ Future JournalRecorder::append(uint64_t tag_tid, entry_bl); assert(entry_bl.length() <= m_journal_metadata->get_object_size()); - AppendBuffers append_buffers; - append_buffers.push_back(std::make_pair(future, entry_bl)); - bool object_full = object_ptr->append_unlock(append_buffers); - + bool object_full = object_ptr->append_unlock({{future, entry_bl}}); if (object_full) { ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" << dendl; @@ -284,8 +281,7 @@ void JournalRecorder::create_next_object_recorder_unlock( new_object_recorder->get_object_number()); } - new_object_recorder->append_unlock(append_buffers); - + new_object_recorder->append_unlock(std::move(append_buffers)); m_object_ptrs[splay_offset] = new_object_recorder; } diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index cbd3842da39..1079b0a8db7 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -44,12 +44,12 @@ ObjectRecorder::~ObjectRecorder() { assert(!m_aio_scheduled); } -bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) { +bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) { assert(m_lock->is_locked()); FutureImplPtr last_flushed_future; bool schedule_append = false; - + if (m_overflowed) { m_append_buffers.insert(m_append_buffers.end(), append_buffers.begin(), append_buffers.end()); @@ -339,37 +339,58 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { } void ObjectRecorder::send_appends_aio() { - Mutex::Locker locker(*m_lock); - - m_aio_scheduled = false; + AppendBuffers *append_buffers; + uint64_t append_tid; + { + Mutex::Locker locker(*m_lock); + append_tid = m_append_tid++; + m_in_flight_tids.insert(append_tid); - AppendBuffers append_buffers; - m_pending_buffers.swap(append_buffers); + // safe to hold pointer outside lock until op is submitted + append_buffers = &m_in_flight_appends[append_tid]; + append_buffers->swap(m_pending_buffers); + } - uint64_t append_tid = m_append_tid++; ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid=" << append_tid << dendl; C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid); + C_Gather *gather_ctx = new C_Gather(m_cct, append_flush); librados::ObjectWriteOperation op; client::guard_append(&op, m_soft_max_size); - - for (AppendBuffers::iterator it = append_buffers.begin(); - it != append_buffers.end(); ++it) { + for (AppendBuffers::iterator it = append_buffers->begin(); + it != append_buffers->end(); ++it) { ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl; op.append(it->second); op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); } - m_in_flight_tids.insert(append_tid); - m_in_flight_appends[append_tid].swap(append_buffers); librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(append_flush, NULL, + librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr, utils::rados_ctx_callback); int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); assert(r == 0); rados_completion->release(); + + { + Mutex::Locker locker(*m_lock); + if (m_pending_buffers.empty()) { + m_aio_scheduled = false; + if (m_in_flight_appends.empty() && m_object_closed) { + // all remaining unsent appends should be redirected to new object + notify_handler(); + } + } else { + // additional pending items -- reschedule + m_op_work_queue->queue(new FunctionContext([this] (int r) { + send_appends_aio(); + })); + } + } + + // allow append op to complete + gather_ctx->activate(); } void ObjectRecorder::notify_handler() { diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index f9cf1065786..9b285f81908 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -51,7 +51,7 @@ public: return m_oid; } - bool append_unlock(const AppendBuffers &append_buffers); + bool append_unlock(AppendBuffers &&append_buffers); void flush(Context *on_safe); void flush(const FutureImplPtr &future); |