summaryrefslogtreecommitdiffstats
path: root/src/journal
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2016-09-28 14:35:36 +0200
committerJason Dillaman <dillaman@redhat.com>2016-10-05 03:31:54 +0200
commitdc77a629ed353d586b63f0bd8e20f54a7595afba (patch)
tree53cbe8a71e01fb40bf0e5509a8bd6fdfe58a3cd5 /src/journal
parentMerge pull request #10223 from mikulely/doc-s3-static-website-support (diff)
downloadceph-dc77a629ed353d586b63f0bd8e20f54a7595afba.tar.xz
ceph-dc77a629ed353d586b63f0bd8e20f54a7595afba.zip
journal: avoid holding lock while sending journal append
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/JournalRecorder.cc8
-rw-r--r--src/journal/ObjectRecorder.cc49
-rw-r--r--src/journal/ObjectRecorder.h2
3 files changed, 38 insertions, 21 deletions
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
index 1b0e5704f71..1917165008f 100644
--- a/src/journal/JournalRecorder.cc
+++ b/src/journal/JournalRecorder.cc
@@ -105,10 +105,7 @@ Future JournalRecorder::append(uint64_t tag_tid,
entry_bl);
assert(entry_bl.length() <= m_journal_metadata->get_object_size());
- AppendBuffers append_buffers;
- append_buffers.push_back(std::make_pair(future, entry_bl));
- bool object_full = object_ptr->append_unlock(append_buffers);
-
+ bool object_full = object_ptr->append_unlock({{future, entry_bl}});
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
@@ -284,8 +281,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
new_object_recorder->get_object_number());
}
- new_object_recorder->append_unlock(append_buffers);
-
+ new_object_recorder->append_unlock(std::move(append_buffers));
m_object_ptrs[splay_offset] = new_object_recorder;
}
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index cbd3842da39..1079b0a8db7 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -44,12 +44,12 @@ ObjectRecorder::~ObjectRecorder() {
assert(!m_aio_scheduled);
}
-bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
+bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
assert(m_lock->is_locked());
FutureImplPtr last_flushed_future;
bool schedule_append = false;
-
+
if (m_overflowed) {
m_append_buffers.insert(m_append_buffers.end(),
append_buffers.begin(), append_buffers.end());
@@ -339,37 +339,58 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
}
void ObjectRecorder::send_appends_aio() {
- Mutex::Locker locker(*m_lock);
-
- m_aio_scheduled = false;
+ AppendBuffers *append_buffers;
+ uint64_t append_tid;
+ {
+ Mutex::Locker locker(*m_lock);
+ append_tid = m_append_tid++;
+ m_in_flight_tids.insert(append_tid);
- AppendBuffers append_buffers;
- m_pending_buffers.swap(append_buffers);
+ // safe to hold pointer outside lock until op is submitted
+ append_buffers = &m_in_flight_appends[append_tid];
+ append_buffers->swap(m_pending_buffers);
+ }
- uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
+ C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);
-
- for (AppendBuffers::iterator it = append_buffers.begin();
- it != append_buffers.end(); ++it) {
+ for (AppendBuffers::iterator it = append_buffers->begin();
+ it != append_buffers->end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
}
- m_in_flight_tids.insert(append_tid);
- m_in_flight_appends[append_tid].swap(append_buffers);
librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(append_flush, NULL,
+ librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
assert(r == 0);
rados_completion->release();
+
+ {
+ Mutex::Locker locker(*m_lock);
+ if (m_pending_buffers.empty()) {
+ m_aio_scheduled = false;
+ if (m_in_flight_appends.empty() && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler();
+ }
+ } else {
+ // additional pending items -- reschedule
+ m_op_work_queue->queue(new FunctionContext([this] (int r) {
+ send_appends_aio();
+ }));
+ }
+ }
+
+ // allow append op to complete
+ gather_ctx->activate();
}
void ObjectRecorder::notify_handler() {
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index f9cf1065786..9b285f81908 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -51,7 +51,7 @@ public:
return m_oid;
}
- bool append_unlock(const AppendBuffers &append_buffers);
+ bool append_unlock(AppendBuffers &&append_buffers);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);