summaryrefslogtreecommitdiffstats
path: root/src/journal
diff options
context:
space:
mode:
authorRicardo Dias <rdias@suse.com>2016-09-07 16:26:34 +0200
committerRicardo Dias <rdias@suse.com>2016-09-26 15:25:25 +0200
commit7b740f5b4ac1c66ac3c80782d2d34e846d00fddd (patch)
tree2d7ffe8facc2fe193f21096dc05771f2c0137bf9 /src/journal
parentjournal: increase concurrency of journal recorder (diff)
downloadceph-7b740f5b4ac1c66ac3c80782d2d34e846d00fddd.tar.xz
ceph-7b740f5b4ac1c66ac3c80782d2d34e846d00fddd.zip
journal: make librados call async in ObjectRecorder
Signed-off-by: Ricardo Dias <rdias@suse.com>
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/JournalMetadata.h4
-rw-r--r--src/journal/JournalRecorder.cc29
-rw-r--r--src/journal/JournalRecorder.h2
-rw-r--r--src/journal/ObjectRecorder.cc111
-rw-r--r--src/journal/ObjectRecorder.h15
5 files changed, 95 insertions, 66 deletions
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h
index 880130126dd..d28710b3c15 100644
--- a/src/journal/JournalMetadata.h
+++ b/src/journal/JournalMetadata.h
@@ -98,6 +98,10 @@ public:
m_work_queue->queue(on_finish, r);
}
+ inline ContextWQ *get_work_queue() {
+ return m_work_queue;
+ }
+
inline SafeTimer &get_timer() {
return *m_timer;
}
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
index 0113bd4a7b9..1b0e5704f71 100644
--- a/src/journal/JournalRecorder.cc
+++ b/src/journal/JournalRecorder.cc
@@ -61,10 +61,12 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- m_object_locks.push_back(shared_ptr<Mutex>(new Mutex("ObjectRecorder::m_lock::"+
- std::to_string(splay_offset))));
+ m_object_locks.push_back(shared_ptr<Mutex>(
+ new Mutex("ObjectRecorder::m_lock::"+
+ std::to_string(splay_offset))));
uint64_t object_number = splay_offset + (m_current_set * splay_width);
- m_object_ptrs[splay_offset] = create_object_recorder(object_number,
+ m_object_ptrs[splay_offset] = create_object_recorder(
+ object_number,
m_object_locks[splay_offset]);
}
@@ -206,14 +208,17 @@ void JournalRecorder::open_object_set() {
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
- if (object_recorder->get_object_number() / splay_width != m_current_set) {
+ uint64_t object_number = object_recorder->get_object_number();
+ if (object_number / splay_width != m_current_set) {
assert(object_recorder->is_closed());
// ready to close object and open object in active set
- create_next_object_recorder(object_recorder);
+ create_next_object_recorder_unlock(object_recorder);
+ } else {
+ uint8_t splay_offset = object_number % splay_width;
+ m_object_locks[splay_offset]->Unlock();
}
}
- unlock_object_recorders();
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
@@ -246,14 +251,14 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
uint64_t object_number, shared_ptr<Mutex> lock) {
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
- object_number, lock, m_journal_metadata->get_timer(),
- m_journal_metadata->get_timer_lock(), &m_object_handler,
- m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
- m_flush_age));
+ object_number, lock, m_journal_metadata->get_work_queue(),
+ m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
+ &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
+ m_flush_bytes, m_flush_age));
return object_recorder;
}
-void JournalRecorder::create_next_object_recorder(
+void JournalRecorder::create_next_object_recorder_unlock(
ObjectRecorderPtr object_recorder) {
assert(m_lock.is_locked());
@@ -279,7 +284,7 @@ void JournalRecorder::create_next_object_recorder(
new_object_recorder->get_object_number());
}
- new_object_recorder->append(append_buffers, false);
+ new_object_recorder->append_unlock(append_buffers);
m_object_ptrs[splay_offset] = new_object_recorder;
}
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
index acef0e4b179..7a4af52ee83 100644
--- a/src/journal/JournalRecorder.h
+++ b/src/journal/JournalRecorder.h
@@ -106,7 +106,7 @@ private:
ObjectRecorderPtr create_object_recorder(uint64_t object_number,
std::shared_ptr<Mutex> lock);
- void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+ void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
void handle_update();
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index c12ecff02c6..cbd3842da39 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -19,17 +19,18 @@ namespace journal {
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, shared_ptr<Mutex> lock,
- SafeTimer &timer, Mutex &timer_lock,
- Handler *handler, uint8_t order,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age)
+ ContextWQ *work_queue, SafeTimer &timer,
+ Mutex &timer_lock, Handler *handler,
+ uint8_t order, uint32_t flush_interval,
+ uint64_t flush_bytes, double flush_age)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
- m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
- m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
- m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
- m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
- m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0),
- m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
+ m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
+ m_timer_lock(timer_lock), m_handler(handler), m_order(order),
+ m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
+ m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+ m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
+ m_size(0), m_overflowed(false), m_object_closed(false),
+ m_in_flight_flushes(false), m_aio_scheduled(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
assert(m_handler != NULL);
@@ -40,51 +41,39 @@ ObjectRecorder::~ObjectRecorder() {
assert(m_append_buffers.empty());
assert(m_in_flight_tids.empty());
assert(m_in_flight_appends.empty());
+ assert(!m_aio_scheduled);
}
bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
- return append(append_buffers, true);
-}
-
-bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) {
assert(m_lock->is_locked());
FutureImplPtr last_flushed_future;
bool schedule_append = false;
- {
- if (m_overflowed) {
- m_append_buffers.insert(m_append_buffers.end(),
- append_buffers.begin(), append_buffers.end());
- if (unlock) {
- m_lock->Unlock();
- }
- return false;
- }
-
- for (AppendBuffers::const_iterator iter = append_buffers.begin();
- iter != append_buffers.end(); ++iter) {
- if (append(*iter, &schedule_append)) {
- last_flushed_future = iter->first;
- }
- }
+
+ if (m_overflowed) {
+ m_append_buffers.insert(m_append_buffers.end(),
+ append_buffers.begin(), append_buffers.end());
+ m_lock->Unlock();
+ return false;
+ }
- if (unlock) {
- m_lock->Unlock();
+ for (AppendBuffers::const_iterator iter = append_buffers.begin();
+ iter != append_buffers.end(); ++iter) {
+ if (append(*iter, &schedule_append)) {
+ last_flushed_future = iter->first;
}
}
if (last_flushed_future) {
- if (unlock) {
- m_lock->Lock();
- }
flush(last_flushed_future);
- if (unlock) {
- m_lock->Unlock();
- }
- } else if (schedule_append) {
- schedule_append_task();
+ m_lock->Unlock();
} else {
- cancel_append_task();
+ m_lock->Unlock();
+ if (schedule_append) {
+ schedule_append_task();
+ } else {
+ cancel_append_task();
+ }
}
return (!m_object_closed && !m_overflowed &&
m_size + m_pending_bytes >= m_soft_max_size);
@@ -180,7 +169,7 @@ bool ObjectRecorder::close() {
assert(!m_object_closed);
m_object_closed = true;
- return m_in_flight_tids.empty();
+ return m_in_flight_tids.empty() && !m_aio_scheduled;
}
void ObjectRecorder::handle_append_task() {
@@ -268,7 +257,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
}
// notify of overflow once all in-flight ops are complete
- if (m_in_flight_tids.empty()) {
+ if (m_in_flight_tids.empty() && !m_aio_scheduled) {
notify_handler();
}
return;
@@ -279,7 +268,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
assert(!append_buffers.empty());
m_in_flight_appends.erase(iter);
- if (m_in_flight_appends.empty() && m_object_closed) {
+ if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
// all remaining unsent appends should be redirected to new object
notify_handler();
}
@@ -331,6 +320,32 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
assert(m_lock->is_locked());
assert(!append_buffers->empty());
+ for (AppendBuffers::iterator it = append_buffers->begin();
+ it != append_buffers->end(); ++it) {
+ ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
+ << dendl;
+ it->first->set_flush_in_progress();
+ m_size += it->second.length();
+ }
+
+ m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
+ append_buffers->begin(), append_buffers->end());
+ if (!m_aio_scheduled) {
+ m_op_work_queue->queue(new FunctionContext([this] (int r) {
+ send_appends_aio();
+ }));
+ m_aio_scheduled = true;
+ }
+}
+
+void ObjectRecorder::send_appends_aio() {
+ Mutex::Locker locker(*m_lock);
+
+ m_aio_scheduled = false;
+
+ AppendBuffers append_buffers;
+ m_pending_buffers.swap(append_buffers);
+
uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
@@ -339,17 +354,15 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);
- for (AppendBuffers::iterator it = append_buffers->begin();
- it != append_buffers->end(); ++it) {
+ for (AppendBuffers::iterator it = append_buffers.begin();
+ it != append_buffers.end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
- it->first->set_flush_in_progress();
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
- m_size += it->second.length();
}
m_in_flight_tids.insert(append_tid);
- m_in_flight_appends[append_tid].swap(*append_buffers);
+ m_in_flight_appends[append_tid].swap(append_buffers);
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(append_flush, NULL,
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index 7d93ca26c6f..f9cf1065786 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -9,6 +9,7 @@
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
#include "journal/FutureImpl.h"
#include <list>
#include <map>
@@ -38,9 +39,9 @@ public:
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, std::shared_ptr<Mutex> lock,
- SafeTimer &timer, Mutex &timer_lock, Handler *handler,
- uint8_t order, uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age);
+ ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
+ Handler *handler, uint8_t order, uint32_t flush_interval,
+ uint64_t flush_bytes, double flush_age);
~ObjectRecorder();
inline uint64_t get_object_number() const {
@@ -51,7 +52,6 @@ public:
}
bool append_unlock(const AppendBuffers &append_buffers);
- bool append(const AppendBuffers &append_buffers, bool unlock);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
@@ -86,6 +86,7 @@ private:
object_recorder->put();
}
virtual void flush(const FutureImplPtr &future) {
+ Mutex::Locker locker(*(object_recorder->m_lock));
object_recorder->flush(future);
}
};
@@ -115,6 +116,8 @@ private:
uint64_t m_object_number;
CephContext *m_cct;
+ ContextWQ *m_op_work_queue;
+
SafeTimer &m_timer;
Mutex &m_timer_lock;
@@ -147,6 +150,9 @@ private:
bool m_in_flight_flushes;
Cond m_in_flight_flushes_cond;
+ AppendBuffers m_pending_buffers;
+ bool m_aio_scheduled;
+
void handle_append_task();
void cancel_append_task();
void schedule_append_task();
@@ -156,6 +162,7 @@ private:
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed(uint64_t tid);
void send_appends(AppendBuffers *append_buffers);
+ void send_appends_aio();
void notify_handler();
};