diff options
author | Ricardo Dias <rdias@suse.com> | 2016-09-07 16:26:34 +0200 |
---|---|---|
committer | Ricardo Dias <rdias@suse.com> | 2016-09-26 15:25:25 +0200 |
commit | 7b740f5b4ac1c66ac3c80782d2d34e846d00fddd (patch) | |
tree | 2d7ffe8facc2fe193f21096dc05771f2c0137bf9 /src/journal | |
parent | journal: increase concurrency of journal recorder (diff) | |
download | ceph-7b740f5b4ac1c66ac3c80782d2d34e846d00fddd.tar.xz ceph-7b740f5b4ac1c66ac3c80782d2d34e846d00fddd.zip |
journal: make librados call async in ObjectRecorder
Signed-off-by: Ricardo Dias <rdias@suse.com>
Diffstat (limited to 'src/journal')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/journal/JournalMetadata.h | 4 |
| -rw-r--r-- | src/journal/JournalRecorder.cc | 29 |
| -rw-r--r-- | src/journal/JournalRecorder.h | 2 |
| -rw-r--r-- | src/journal/ObjectRecorder.cc | 111 |
| -rw-r--r-- | src/journal/ObjectRecorder.h | 15 |
5 files changed, 95 insertions, 66 deletions
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 880130126dd..d28710b3c15 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -98,6 +98,10 @@ public: m_work_queue->queue(on_finish, r); } + inline ContextWQ *get_work_queue() { + return m_work_queue; + } + inline SafeTimer &get_timer() { return *m_timer; } diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 0113bd4a7b9..1b0e5704f71 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -61,10 +61,12 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - m_object_locks.push_back(shared_ptr<Mutex>(new Mutex("ObjectRecorder::m_lock::"+ - std::to_string(splay_offset)))); + m_object_locks.push_back(shared_ptr<Mutex>( + new Mutex("ObjectRecorder::m_lock::"+ + std::to_string(splay_offset)))); uint64_t object_number = splay_offset + (m_current_set * splay_width); - m_object_ptrs[splay_offset] = create_object_recorder(object_number, + m_object_ptrs[splay_offset] = create_object_recorder( + object_number, m_object_locks[splay_offset]); } @@ -206,14 +208,17 @@ void JournalRecorder::open_object_set() { for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); it != m_object_ptrs.end(); ++it) { ObjectRecorderPtr object_recorder = it->second; - if (object_recorder->get_object_number() / splay_width != m_current_set) { + uint64_t object_number = object_recorder->get_object_number(); + if (object_number / splay_width != m_current_set) { assert(object_recorder->is_closed()); // ready to close object and open object in active set - create_next_object_recorder(object_recorder); + create_next_object_recorder_unlock(object_recorder); + } else { + uint8_t splay_offset = object_number % splay_width; + m_object_locks[splay_offset]->Unlock(); } } - unlock_object_recorders(); 
} bool JournalRecorder::close_object_set(uint64_t active_set) { @@ -246,14 +251,14 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( uint64_t object_number, shared_ptr<Mutex> lock) { ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), - object_number, lock, m_journal_metadata->get_timer(), - m_journal_metadata->get_timer_lock(), &m_object_handler, - m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes, - m_flush_age)); + object_number, lock, m_journal_metadata->get_work_queue(), + m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), + &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, + m_flush_bytes, m_flush_age)); return object_recorder; } -void JournalRecorder::create_next_object_recorder( +void JournalRecorder::create_next_object_recorder_unlock( ObjectRecorderPtr object_recorder) { assert(m_lock.is_locked()); @@ -279,7 +284,7 @@ void JournalRecorder::create_next_object_recorder( new_object_recorder->get_object_number()); } - new_object_recorder->append(append_buffers, false); + new_object_recorder->append_unlock(append_buffers); m_object_ptrs[splay_offset] = new_object_recorder; } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index acef0e4b179..7a4af52ee83 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -106,7 +106,7 @@ private: ObjectRecorderPtr create_object_recorder(uint64_t object_number, std::shared_ptr<Mutex> lock); - void create_next_object_recorder(ObjectRecorderPtr object_recorder); + void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder); void handle_update(); diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index c12ecff02c6..cbd3842da39 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -19,17 +19,18 @@ namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, 
const std::string &oid, uint64_t object_number, shared_ptr<Mutex> lock, - SafeTimer &timer, Mutex &timer_lock, - Handler *handler, uint8_t order, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age) + ContextWQ *work_queue, SafeTimer &timer, + Mutex &timer_lock, Handler *handler, + uint8_t order, uint32_t flush_interval, + uint64_t flush_bytes, double flush_age) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), - m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), - m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), - m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), - m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL), - m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0), - m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { + m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer), + m_timer_lock(timer_lock), m_handler(handler), m_order(order), + m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), + m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this), + m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0), + m_size(0), m_overflowed(false), m_object_closed(false), + m_in_flight_flushes(false), m_aio_scheduled(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); assert(m_handler != NULL); @@ -40,51 +41,39 @@ ObjectRecorder::~ObjectRecorder() { assert(m_append_buffers.empty()); assert(m_in_flight_tids.empty()); assert(m_in_flight_appends.empty()); + assert(!m_aio_scheduled); } bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) { - return append(append_buffers, true); -} - -bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) { assert(m_lock->is_locked()); FutureImplPtr last_flushed_future; bool schedule_append = false; - { - if (m_overflowed) { - m_append_buffers.insert(m_append_buffers.end(), - append_buffers.begin(), 
append_buffers.end()); - if (unlock) { - m_lock->Unlock(); - } - return false; - } - - for (AppendBuffers::const_iterator iter = append_buffers.begin(); - iter != append_buffers.end(); ++iter) { - if (append(*iter, &schedule_append)) { - last_flushed_future = iter->first; - } - } + + if (m_overflowed) { + m_append_buffers.insert(m_append_buffers.end(), + append_buffers.begin(), append_buffers.end()); + m_lock->Unlock(); + return false; + } - if (unlock) { - m_lock->Unlock(); + for (AppendBuffers::const_iterator iter = append_buffers.begin(); + iter != append_buffers.end(); ++iter) { + if (append(*iter, &schedule_append)) { + last_flushed_future = iter->first; } } if (last_flushed_future) { - if (unlock) { - m_lock->Lock(); - } flush(last_flushed_future); - if (unlock) { - m_lock->Unlock(); - } - } else if (schedule_append) { - schedule_append_task(); + m_lock->Unlock(); } else { - cancel_append_task(); + m_lock->Unlock(); + if (schedule_append) { + schedule_append_task(); + } else { + cancel_append_task(); + } } return (!m_object_closed && !m_overflowed && m_size + m_pending_bytes >= m_soft_max_size); @@ -180,7 +169,7 @@ bool ObjectRecorder::close() { assert(!m_object_closed); m_object_closed = true; - return m_in_flight_tids.empty(); + return m_in_flight_tids.empty() && !m_aio_scheduled; } void ObjectRecorder::handle_append_task() { @@ -268,7 +257,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { } // notify of overflow once all in-flight ops are complete - if (m_in_flight_tids.empty()) { + if (m_in_flight_tids.empty() && !m_aio_scheduled) { notify_handler(); } return; @@ -279,7 +268,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { assert(!append_buffers.empty()); m_in_flight_appends.erase(iter); - if (m_in_flight_appends.empty() && m_object_closed) { + if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) { // all remaining unsent appends should be redirected to new object notify_handler(); } @@ -331,6 
+320,32 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { assert(m_lock->is_locked()); assert(!append_buffers->empty()); + for (AppendBuffers::iterator it = append_buffers->begin(); + it != append_buffers->end(); ++it) { + ldout(m_cct, 20) << __func__ << ": flushing " << *it->first + << dendl; + it->first->set_flush_in_progress(); + m_size += it->second.length(); + } + + m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers, + append_buffers->begin(), append_buffers->end()); + if (!m_aio_scheduled) { + m_op_work_queue->queue(new FunctionContext([this] (int r) { + send_appends_aio(); + })); + m_aio_scheduled = true; + } +} + +void ObjectRecorder::send_appends_aio() { + Mutex::Locker locker(*m_lock); + + m_aio_scheduled = false; + + AppendBuffers append_buffers; + m_pending_buffers.swap(append_buffers); + uint64_t append_tid = m_append_tid++; ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid=" << append_tid << dendl; @@ -339,17 +354,15 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { librados::ObjectWriteOperation op; client::guard_append(&op, m_soft_max_size); - for (AppendBuffers::iterator it = append_buffers->begin(); - it != append_buffers->end(); ++it) { + for (AppendBuffers::iterator it = append_buffers.begin(); + it != append_buffers.end(); ++it) { ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl; - it->first->set_flush_in_progress(); op.append(it->second); op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - m_size += it->second.length(); } m_in_flight_tids.insert(append_tid); - m_in_flight_appends[append_tid].swap(*append_buffers); + m_in_flight_appends[append_tid].swap(append_buffers); librados::AioCompletion *rados_completion = librados::Rados::aio_create_completion(append_flush, NULL, diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 7d93ca26c6f..f9cf1065786 100644 --- a/src/journal/ObjectRecorder.h +++ 
b/src/journal/ObjectRecorder.h @@ -9,6 +9,7 @@ #include "common/Cond.h" #include "common/Mutex.h" #include "common/RefCountedObj.h" +#include "common/WorkQueue.h" #include "journal/FutureImpl.h" #include <list> #include <map> @@ -38,9 +39,9 @@ public: ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, std::shared_ptr<Mutex> lock, - SafeTimer &timer, Mutex &timer_lock, Handler *handler, - uint8_t order, uint32_t flush_interval, uint64_t flush_bytes, - double flush_age); + ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock, + Handler *handler, uint8_t order, uint32_t flush_interval, + uint64_t flush_bytes, double flush_age); ~ObjectRecorder(); inline uint64_t get_object_number() const { @@ -51,7 +52,6 @@ public: } bool append_unlock(const AppendBuffers &append_buffers); - bool append(const AppendBuffers &append_buffers, bool unlock); void flush(Context *on_safe); void flush(const FutureImplPtr &future); @@ -86,6 +86,7 @@ private: object_recorder->put(); } virtual void flush(const FutureImplPtr &future) { + Mutex::Locker locker(*(object_recorder->m_lock)); object_recorder->flush(future); } }; @@ -115,6 +116,8 @@ private: uint64_t m_object_number; CephContext *m_cct; + ContextWQ *m_op_work_queue; + SafeTimer &m_timer; Mutex &m_timer_lock; @@ -147,6 +150,9 @@ private: bool m_in_flight_flushes; Cond m_in_flight_flushes_cond; + AppendBuffers m_pending_buffers; + bool m_aio_scheduled; + void handle_append_task(); void cancel_append_task(); void schedule_append_task(); @@ -156,6 +162,7 @@ private: void handle_append_flushed(uint64_t tid, int r); void append_overflowed(uint64_t tid); void send_appends(AppendBuffers *append_buffers); + void send_appends_aio(); void notify_handler(); }; |