author     Mykola Golub <mgolub@suse.com>  2019-06-20 18:02:14 +0200
committer  GitHub <noreply@github.com>     2019-06-20 18:02:14 +0200
commit     961df6356a85311bb18acb193c6a28325a7248da (patch)
tree       733f2b1cce0bae3bff29a93ed9f0c6acd018fb9a /src/journal
parent     Merge pull request #28655 from dengchj/multisite_doc (diff)
parent     librbd: tweaks to improve throughput for journaled IO (diff)
Merge pull request #28539 from dillaman/wip-40072
librbd: improve journal performance to match expected degradation

Reviewed-by: Mykola Golub <mgolub@suse.com>
Diffstat (limited to 'src/journal')
-rw-r--r--  src/journal/JournalRecorder.cc  112
-rw-r--r--  src/journal/JournalRecorder.h    15
-rw-r--r--  src/journal/Journaler.cc         13
-rw-r--r--  src/journal/Journaler.h           5
-rw-r--r--  src/journal/ObjectRecorder.cc   422
-rw-r--r--  src/journal/ObjectRecorder.h     50
6 files changed, 274 insertions, 343 deletions
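
The diff below moves the journal append flush tuning (flush_interval, flush_bytes, flush_age) out of the Journaler/JournalRecorder constructors and start_append() into a new set_append_batch_options() call. A minimal sketch of the resulting caller flow (the helper function name and the threshold values here are illustrative placeholders, not librbd defaults):

#include "journal/Journaler.h"

// Illustrative only: adopting the reworked append API after this change.
void start_journal_appends(journal::Journaler& journaler) {
  // Only the in-flight append limit remains a start_append() argument.
  uint64_t max_in_flight_appends = 0;
  journaler.start_append(max_in_flight_appends);

  // Flush/batch tuning is now applied through the new setter once the
  // recorder exists, and can be adjusted without restarting the appender.
  int flush_interval = 64;         // pending entries before a batch flush
  uint64_t flush_bytes = 1 << 20;  // pending bytes before a batch flush
  double flush_age = 1.0;          // seconds before an age-based flush
  journaler.set_append_batch_options(flush_interval, flush_bytes, flush_age);
}
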
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
index d4b023c3d5e..aa90660a01f 100644
--- a/src/journal/JournalRecorder.cc
+++ b/src/journal/JournalRecorder.cc
@@ -10,7 +10,8 @@
#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
-#define dout_prefix *_dout << "JournalRecorder: " << this << " "
+#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \
+ << ": "
using std::shared_ptr;
@@ -49,12 +50,9 @@ struct C_Flush : public Context {
JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age,
uint64_t max_in_flight_appends)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
- m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+ m_journal_metadata(journal_metadata),
m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
m_current_set(m_journal_metadata->get_active_set()) {
@@ -65,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- m_object_locks.push_back(shared_ptr<Mutex>(
- new Mutex("ObjectRecorder::m_lock::"+
- std::to_string(splay_offset))));
+ shared_ptr<Mutex> object_lock(new Mutex(
+ "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
+ m_object_locks.push_back(object_lock);
+
uint64_t object_number = splay_offset + (m_current_set * splay_width);
+ Mutex::Locker locker(*object_lock);
m_object_ptrs[splay_offset] = create_object_recorder(
- object_number,
- m_object_locks[splay_offset]);
+ object_number, m_object_locks[splay_offset]);
}
m_journal_metadata->add_listener(&m_listener);
@@ -108,8 +107,30 @@ void JournalRecorder::shut_down(Context *on_safe) {
flush(on_safe);
}
+void JournalRecorder::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+ << "flush_bytes=" << flush_bytes << ", "
+ << "flush_age=" << flush_age << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+ auto object_recorder = get_object(splay_offset);
+ object_recorder->set_append_batch_options(flush_interval, flush_bytes,
+ flush_age);
+ }
+}
+
Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
+ ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
m_lock.Lock();
@@ -132,7 +153,9 @@ Future JournalRecorder::append(uint64_t tag_tid,
entry_bl);
ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
- bool object_full = object_ptr->append_unlock({{future, entry_bl}});
+ bool object_full = object_ptr->append({{future, entry_bl}});
+ m_object_locks[splay_offset]->Unlock();
+
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
@@ -143,6 +166,8 @@ Future JournalRecorder::append(uint64_t tag_tid,
}
void JournalRecorder::flush(Context *on_safe) {
+ ldout(m_cct, 20) << dendl;
+
C_Flush *ctx;
{
Mutex::Locker locker(m_lock);
@@ -172,7 +197,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
// entry overflow from open object
if (m_current_set != object_set) {
- ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
+ ldout(m_cct, 20) << "close already in-progress" << dendl;
return;
}
@@ -186,8 +211,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
++m_current_set;
++m_in_flight_advance_sets;
- ldout(m_cct, 20) << __func__ << ": closing active object set "
- << object_set << dendl;
+ ldout(m_cct, 10) << "closing active object set " << object_set << dendl;
if (close_object_set(m_current_set)) {
advance_object_set();
}
@@ -197,8 +221,7 @@ void JournalRecorder::advance_object_set() {
ceph_assert(m_lock.is_locked());
ceph_assert(m_in_flight_object_closes == 0);
- ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
- << dendl;
+ ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
this));
}
@@ -213,8 +236,8 @@ void JournalRecorder::handle_advance_object_set(int r) {
--m_in_flight_advance_sets;
if (r < 0 && r != -ESTALE) {
- lderr(m_cct) << __func__ << ": failed to advance object set: "
- << cpp_strerror(r) << dendl;
+ lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r)
+ << dendl;
}
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
@@ -230,8 +253,7 @@ void JournalRecorder::handle_advance_object_set(int r) {
void JournalRecorder::open_object_set() {
ceph_assert(m_lock.is_locked());
- ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
- << dendl;
+ ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;
uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -244,15 +266,14 @@ void JournalRecorder::open_object_set() {
ceph_assert(object_recorder->is_closed());
// ready to close object and open object in active set
- create_next_object_recorder_unlock(object_recorder);
- } else {
- uint8_t splay_offset = object_number % splay_width;
- m_object_locks[splay_offset]->Unlock();
+ create_next_object_recorder(object_recorder);
}
}
+ unlock_object_recorders();
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
+ ldout(m_cct, 10) << "active_set=" << active_set << dendl;
ceph_assert(m_lock.is_locked());
// object recorders will invoke overflow handler as they complete
@@ -263,14 +284,14 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
if (object_recorder->get_object_number() / splay_width != active_set) {
- ldout(m_cct, 10) << __func__ << ": closing object "
- << object_recorder->get_oid() << dendl;
+ ldout(m_cct, 10) << "closing object " << object_recorder->get_oid()
+ << dendl;
// flush out all queued appends and hold future appends
if (!object_recorder->close()) {
++m_in_flight_object_closes;
} else {
- ldout(m_cct, 20) << __func__ << ": object "
- << object_recorder->get_oid() << " closed" << dendl;
+ ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed"
+ << dendl;
}
}
}
@@ -280,30 +301,32 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
ObjectRecorderPtr JournalRecorder::create_object_recorder(
uint64_t object_number, shared_ptr<Mutex> lock) {
+ ldout(m_cct, 10) << "object_number=" << object_number << dendl;
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, lock, m_journal_metadata->get_work_queue(),
- m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
- &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
- m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+ &m_object_handler, m_journal_metadata->get_order(),
+ m_max_in_flight_appends));
+ object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
+ m_flush_age);
return object_recorder;
}
-void JournalRecorder::create_next_object_recorder_unlock(
+void JournalRecorder::create_next_object_recorder(
ObjectRecorderPtr object_recorder) {
ceph_assert(m_lock.is_locked());
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
+ ldout(m_cct, 10) << "object_number=" << object_number << dendl;
ceph_assert(m_object_locks[splay_offset]->is_locked());
ObjectRecorderPtr new_object_recorder = create_object_recorder(
(m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
- ldout(m_cct, 10) << __func__ << ": "
- << "old oid=" << object_recorder->get_oid() << ", "
+ ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
<< "new oid=" << new_object_recorder->get_oid() << dendl;
AppendBuffers append_buffers;
object_recorder->claim_append_buffers(&append_buffers);
@@ -315,7 +338,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
new_object_recorder->get_object_number());
}
- new_object_recorder->append_unlock(std::move(append_buffers));
+ new_object_recorder->append(std::move(append_buffers));
m_object_ptrs[splay_offset] = new_object_recorder;
}
@@ -325,15 +348,13 @@ void JournalRecorder::handle_update() {
uint64_t active_set = m_journal_metadata->get_active_set();
if (m_current_set < active_set) {
// peer journal client advanced the active set
- ldout(m_cct, 20) << __func__ << ": "
- << "current_set=" << m_current_set << ", "
+ ldout(m_cct, 10) << "current_set=" << m_current_set << ", "
<< "active_set=" << active_set << dendl;
uint64_t current_set = m_current_set;
m_current_set = active_set;
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
- ldout(m_cct, 20) << __func__ << ": closing current object set "
- << current_set << dendl;
+ ldout(m_cct, 10) << "closing current object set " << current_set << dendl;
if (close_object_set(active_set)) {
open_object_set();
}
@@ -342,7 +363,7 @@ void JournalRecorder::handle_update() {
}
void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
- ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+ ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
Mutex::Locker locker(m_lock);
@@ -356,8 +377,8 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
--m_in_flight_object_closes;
// object closed after advance active set committed
- ldout(m_cct, 20) << __func__ << ": object "
- << active_object_recorder->get_oid() << " closed" << dendl;
+ ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
+ << " closed" << dendl;
if (m_in_flight_object_closes == 0) {
if (m_in_flight_advance_sets == 0) {
// peer forced closing of object set
@@ -370,7 +391,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
}
void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
- ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+ ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
Mutex::Locker locker(m_lock);
@@ -380,9 +401,8 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
ceph_assert(active_object_recorder->get_object_number() == object_number);
- ldout(m_cct, 20) << __func__ << ": object "
- << active_object_recorder->get_oid() << " overflowed"
- << dendl;
+ ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
+ << " overflowed" << dendl;
close_and_advance_object_set(object_number / splay_width);
}
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
index bdeaab58a81..382f75acef9 100644
--- a/src/journal/JournalRecorder.h
+++ b/src/journal/JournalRecorder.h
@@ -23,11 +23,14 @@ class JournalRecorder {
public:
JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
const JournalMetadataPtr &journal_metadata,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age, uint64_t max_in_flight_appends);
+ uint64_t max_in_flight_appends);
~JournalRecorder();
void shut_down(Context *on_safe);
+
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
@@ -79,9 +82,9 @@ private:
JournalMetadataPtr m_journal_metadata;
- uint32_t m_flush_interval;
- uint64_t m_flush_bytes;
- double m_flush_age;
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
uint64_t m_max_in_flight_appends;
Listener m_listener;
@@ -109,7 +112,7 @@ private:
ObjectRecorderPtr create_object_recorder(uint64_t object_number,
std::shared_ptr<Mutex> lock);
- void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
+ void create_next_object_recorder(ObjectRecorderPtr object_recorder);
void handle_update();
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
index 34bd30e290f..7d9c1409584 100644
--- a/src/journal/Journaler.cc
+++ b/src/journal/Journaler.cc
@@ -393,15 +393,20 @@ void Journaler::committed(const Future &future) {
m_trimmer->committed(future_impl->get_commit_tid());
}
-void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
- double flush_age, uint64_t max_in_flight_appends) {
+void Journaler::start_append(uint64_t max_in_flight_appends) {
ceph_assert(m_recorder == nullptr);
// TODO verify active object set >= current replay object set
m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
- m_metadata, flush_interval, flush_bytes,
- flush_age, max_in_flight_appends);
+ m_metadata, max_in_flight_appends);
+}
+
+void Journaler::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ceph_assert(m_recorder != nullptr);
+ m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age);
}
void Journaler::stop_append(Context *on_safe) {
diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h
index 3f3987105e1..a063a6d43a0 100644
--- a/src/journal/Journaler.h
+++ b/src/journal/Journaler.h
@@ -110,8 +110,9 @@ public:
void stop_replay(Context *on_finish);
uint64_t get_max_append_size() const;
- void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends);
+ void start_append(uint64_t max_in_flight_appends);
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush_append(Context *on_safe);
void stop_append(Context *on_safe);
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index e4049df6520..127731e95c3 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -10,7 +10,8 @@
#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
-#define dout_prefix *_dout << "ObjectRecorder: " << this << " "
+#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
+ << __func__ << " (" << m_oid << "): "
using namespace cls::journal;
using std::shared_ptr;
@@ -19,72 +20,64 @@ namespace journal {
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, shared_ptr<Mutex> lock,
- ContextWQ *work_queue, SafeTimer &timer,
- Mutex &timer_lock, Handler *handler,
- uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends)
+ ContextWQ *work_queue, Handler *handler,
+ uint8_t order, int32_t max_in_flight_appends)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
- m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
- m_timer_lock(timer_lock), m_handler(handler), m_order(order),
- m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+ m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
+ m_order(order), m_soft_max_size(1 << m_order),
m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
- m_lock(lock), m_append_tid(0), m_pending_bytes(0),
- m_size(0), m_overflowed(false), m_object_closed(false),
- m_in_flight_flushes(false), m_aio_scheduled(false) {
+ m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
+ m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ceph_assert(m_handler != NULL);
+ ldout(m_cct, 20) << dendl;
}
ObjectRecorder::~ObjectRecorder() {
- ceph_assert(m_append_task == NULL);
- ceph_assert(m_append_buffers.empty());
+ ldout(m_cct, 20) << dendl;
+ ceph_assert(m_pending_buffers.empty());
ceph_assert(m_in_flight_tids.empty());
ceph_assert(m_in_flight_appends.empty());
- ceph_assert(!m_aio_scheduled);
}
-bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
+void ObjectRecorder::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+ << "flush_bytes=" << flush_bytes << ", "
+ << "flush_age=" << flush_age << dendl;
+
ceph_assert(m_lock->is_locked());
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+}
- FutureImplPtr last_flushed_future;
- bool schedule_append = false;
+bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
+ ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
- if (m_overflowed) {
- m_append_buffers.insert(m_append_buffers.end(),
- append_buffers.begin(), append_buffers.end());
- m_lock->Unlock();
- return false;
- }
+ ceph_assert(m_lock->is_locked());
- for (AppendBuffers::const_iterator iter = append_buffers.begin();
- iter != append_buffers.end(); ++iter) {
- if (append(*iter, &schedule_append)) {
- last_flushed_future = iter->first;
+ FutureImplPtr last_flushed_future;
+ for (auto& append_buffer : append_buffers) {
+ ldout(m_cct, 20) << *append_buffer.first << ", "
+ << "size=" << append_buffer.second.length() << dendl;
+ bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+ if (flush_requested) {
+ last_flushed_future = append_buffer.first;
}
- }
- if (last_flushed_future) {
- flush(last_flushed_future);
- m_lock->Unlock();
- } else {
- m_lock->Unlock();
- if (schedule_append) {
- schedule_append_task();
- } else {
- cancel_append_task();
- }
+ m_pending_buffers.push_back(append_buffer);
+ m_pending_bytes += append_buffer.second.length();
}
- return (!m_object_closed && !m_overflowed &&
- m_size + m_pending_bytes >= m_soft_max_size);
+
+ return send_appends(!!last_flushed_future, last_flushed_future);
}
void ObjectRecorder::flush(Context *on_safe) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ ldout(m_cct, 20) << dendl;
- cancel_append_task();
Future future;
{
Mutex::Locker locker(*m_lock);
@@ -97,11 +90,7 @@ void ObjectRecorder::flush(Context *on_safe) {
}
// attach the flush to the most recent append
- if (!m_append_buffers.empty()) {
- future = Future(m_append_buffers.rbegin()->first);
-
- flush_appends(true);
- } else if (!m_pending_buffers.empty()) {
+ if (!m_pending_buffers.empty()) {
future = Future(m_pending_buffers.rbegin()->first);
} else if (!m_in_flight_appends.empty()) {
AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
@@ -111,143 +100,68 @@ void ObjectRecorder::flush(Context *on_safe) {
}
if (future.is_valid()) {
- future.flush(on_safe);
+ // cannot be invoked while the same lock context
+ m_op_work_queue->queue(new FunctionContext(
+ [future, on_safe] (int r) mutable {
+ future.flush(on_safe);
+ }));
} else {
on_safe->complete(0);
}
}
void ObjectRecorder::flush(const FutureImplPtr &future) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
- << dendl;
-
- ceph_assert(m_lock->is_locked());
+ ldout(m_cct, 20) << "flushing " << *future << dendl;
+ m_lock->Lock();
if (future->get_flush_handler().get() != &m_flush_handler) {
// if we don't own this future, re-issue the flush so that it hits the
// correct journal object owner
future->flush();
+ m_lock->Unlock();
return;
} else if (future->is_flush_in_progress()) {
+ m_lock->Unlock();
return;
}
- if (m_object_closed || m_overflowed) {
- return;
- }
-
- AppendBuffers::reverse_iterator r_it;
- for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
- ++r_it) {
- if (r_it->first == future) {
- break;
- }
+ bool overflowed = send_appends(true, future);
+ if (overflowed) {
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
}
- ceph_assert(r_it != m_append_buffers.rend());
-
- auto it = (++r_it).base();
- ceph_assert(it != m_append_buffers.end());
- ++it;
-
- AppendBuffers flush_buffers;
- flush_buffers.splice(flush_buffers.end(), m_append_buffers,
- m_append_buffers.begin(), it);
- send_appends(&flush_buffers);
}
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ ldout(m_cct, 20) << dendl;
ceph_assert(m_lock->is_locked());
ceph_assert(m_in_flight_tids.empty());
ceph_assert(m_in_flight_appends.empty());
ceph_assert(m_object_closed || m_overflowed);
- append_buffers->splice(append_buffers->end(), m_append_buffers,
- m_append_buffers.begin(), m_append_buffers.end());
+
+ for (auto& append_buffer : m_pending_buffers) {
+ ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
+ append_buffer.first->detach();
+ }
+ append_buffers->splice(append_buffers->end(), m_pending_buffers,
+ m_pending_buffers.begin(), m_pending_buffers.end());
}
bool ObjectRecorder::close() {
ceph_assert(m_lock->is_locked());
- ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
-
- cancel_append_task();
-
- flush_appends(true);
+ ldout(m_cct, 20) << dendl;
+ send_appends(true, {});
ceph_assert(!m_object_closed);
m_object_closed = true;
- return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
-}
-
-void ObjectRecorder::handle_append_task() {
- ceph_assert(m_timer_lock.is_locked());
- m_append_task = NULL;
-
- Mutex::Locker locker(*m_lock);
- flush_appends(true);
-}
-
-void ObjectRecorder::cancel_append_task() {
- Mutex::Locker locker(m_timer_lock);
- if (m_append_task != NULL) {
- m_timer.cancel_event(m_append_task);
- m_append_task = NULL;
- }
-}
-
-void ObjectRecorder::schedule_append_task() {
- Mutex::Locker locker(m_timer_lock);
- if (m_append_task == nullptr && m_flush_age > 0) {
- m_append_task = m_timer.add_event_after(
- m_flush_age, new FunctionContext([this](int) {
- handle_append_task();
- }));
- }
-}
-
-bool ObjectRecorder::append(const AppendBuffer &append_buffer,
- bool *schedule_append) {
- ceph_assert(m_lock->is_locked());
-
- bool flush_requested = false;
- if (!m_object_closed && !m_overflowed) {
- flush_requested = append_buffer.first->attach(&m_flush_handler);
- }
-
- m_append_buffers.push_back(append_buffer);
- m_pending_bytes += append_buffer.second.length();
-
- if (!flush_appends(false)) {
- *schedule_append = true;
- }
- return flush_requested;
-}
-
-bool ObjectRecorder::flush_appends(bool force) {
- ceph_assert(m_lock->is_locked());
- if (m_object_closed || m_overflowed) {
- return true;
- }
-
- if (m_append_buffers.empty() ||
- (!force &&
- m_size + m_pending_bytes < m_soft_max_size &&
- (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
- (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
- return false;
- }
-
- m_pending_bytes = 0;
- AppendBuffers append_buffers;
- append_buffers.swap(m_append_buffers);
- send_appends(&append_buffers);
- return true;
+ return (m_in_flight_tids.empty() && !m_in_flight_flushes);
}
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
- << ", r=" << r << dendl;
+ ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
AppendBuffers append_buffers;
{
@@ -257,17 +171,14 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
m_in_flight_tids.erase(tid_iter);
InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
- if (r == -EOVERFLOW || m_overflowed) {
- if (iter != m_in_flight_appends.end()) {
- m_overflowed = true;
- } else {
- // must have seen an overflow on a previous append op
- ceph_assert(r == -EOVERFLOW && m_overflowed);
- }
+ ceph_assert(iter != m_in_flight_appends.end());
+
+ if (r == -EOVERFLOW) {
+ ldout(m_cct, 10) << "append overflowed" << dendl;
+ m_overflowed = true;
// notify of overflow once all in-flight ops are complete
- if (m_in_flight_tids.empty() && !m_aio_scheduled) {
- m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+ if (m_in_flight_tids.empty()) {
append_overflowed();
notify_handler_unlock();
} else {
@@ -276,21 +187,23 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
return;
}
- ceph_assert(iter != m_in_flight_appends.end());
append_buffers.swap(iter->second);
ceph_assert(!append_buffers.empty());
+ for (auto& append_buffer : append_buffers) {
+ m_object_bytes += append_buffer.second.length();
+ }
+ ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
+
m_in_flight_appends.erase(iter);
m_in_flight_flushes = true;
m_lock->Unlock();
}
// Flag the associated futures as complete.
- for (AppendBuffers::iterator buf_it = append_buffers.begin();
- buf_it != append_buffers.end(); ++buf_it) {
- ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
- << dendl;
- buf_it->first->safe(r);
+ for (auto& append_buffer : append_buffers) {
+ ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
+ append_buffer.first->safe(r);
}
// wake up any flush requests that raced with a RADOS callback
@@ -298,39 +211,25 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
- if (!m_aio_scheduled) {
- if (m_in_flight_appends.empty() &&
- (m_object_closed || m_aio_sent_size >= m_soft_max_size)) {
- if (m_aio_sent_size >= m_soft_max_size) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid
- << " soft max size reached, notifying overflow"
- << dendl;
- m_overflowed = true;
- }
- // all remaining unsent appends should be redirected to new object
- m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+ if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler_unlock();
+ } else {
+ bool overflowed = send_appends(false, {});
+ if (overflowed) {
notify_handler_unlock();
- } else if (!m_pending_buffers.empty()) {
- m_aio_scheduled = true;
- m_lock->Unlock();
- send_appends_aio();
} else {
m_lock->Unlock();
}
- } else {
- m_lock->Unlock();
}
}
void ObjectRecorder::append_overflowed() {
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
- << dendl;
+ ldout(m_cct, 10) << dendl;
ceph_assert(m_lock->is_locked());
ceph_assert(!m_in_flight_appends.empty());
- cancel_append_task();
-
InFlightAppends in_flight_appends;
in_flight_appends.swap(m_in_flight_appends);
@@ -342,94 +241,109 @@ void ObjectRecorder::append_overflowed() {
}
restart_append_buffers.splice(restart_append_buffers.end(),
- m_append_buffers,
- m_append_buffers.begin(),
- m_append_buffers.end());
- restart_append_buffers.swap(m_append_buffers);
-
- for (AppendBuffers::const_iterator it = m_append_buffers.begin();
- it != m_append_buffers.end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
- << dendl;
- it->first->detach();
- }
+ m_pending_buffers,
+ m_pending_buffers.begin(),
+ m_pending_buffers.end());
+ restart_append_buffers.swap(m_pending_buffers);
}
-void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
+bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
+ ldout(m_cct, 20) << dendl;
+
ceph_assert(m_lock->is_locked());
- ceph_assert(!append_buffers->empty());
-
- for (AppendBuffers::iterator it = append_buffers->begin();
- it != append_buffers->end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
- << dendl;
- it->first->set_flush_in_progress();
- m_size += it->second.length();
+ if (m_object_closed || m_overflowed) {
+ ldout(m_cct, 20) << "already closed or overflowed" << dendl;
+ return false;
}
- m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
- append_buffers->begin(), append_buffers->end());
- if (!m_aio_scheduled) {
- m_op_work_queue->queue(new FunctionContext([this] (int r) {
- send_appends_aio();
- }));
- m_aio_scheduled = true;
+ if (m_pending_buffers.empty()) {
+ ldout(m_cct, 20) << "append buffers empty" << dendl;
+ return false;
}
-}
-void ObjectRecorder::send_appends_aio() {
- librados::AioCompletion *rados_completion;
- {
- Mutex::Locker locker(*m_lock);
- m_aio_scheduled = false;
+ if (!force &&
+ ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
+ (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
+ (m_flush_age > 0 &&
+ m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
+ ldout(m_cct, 20) << "forcing batch flush" << dendl;
+ force = true;
+ }
- if (m_pending_buffers.empty()) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty"
- << dendl;
- return;
+ auto max_in_flight_appends = m_max_in_flight_appends;
+ if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
+ if (!force && max_in_flight_appends == 0) {
+ ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
+ max_in_flight_appends = 1;
}
+ } else if (max_in_flight_appends < 0) {
+ max_in_flight_appends = 0;
+ }
- if (m_max_in_flight_appends != 0 &&
- m_in_flight_tids.size() >= m_max_in_flight_appends) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid
- << " max in flight appends reached" << dendl;
- return;
+ if (!force && max_in_flight_appends != 0 &&
+ static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
+ ldout(m_cct, 10) << "max in flight appends reached" << dendl;
+ return false;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::guard_append(&op, m_soft_max_size);
+
+ size_t append_bytes = 0;
+ AppendBuffers append_buffers;
+ for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
+ auto& future = it->first;
+ auto& bl = it->second;
+ auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
+ if (size == m_soft_max_size) {
+ ldout(m_cct, 10) << "object at capacity " << *future << dendl;
+ m_overflowed = true;
+ } else if (size > m_soft_max_size) {
+ ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
+ m_overflowed = true;
+ break;
}
- if (m_aio_sent_size >= m_soft_max_size) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid
- << " soft max size reached" << dendl;
- return;
+ bool flush_break = (force && flush_future && flush_future == future);
+ ldout(m_cct, 20) << "flushing " << *future << dendl;
+ future->set_flush_in_progress();
+
+ op.append(bl);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ append_bytes += bl.length();
+ append_buffers.push_back(*it);
+ it = m_pending_buffers.erase(it);
+
+ if (flush_break) {
+ ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
+ break;
}
+ }
+
+ if (append_bytes > 0) {
+ m_last_flush_time = ceph_clock_now();
uint64_t append_tid = m_append_tid++;
m_in_flight_tids.insert(append_tid);
+ m_in_flight_appends[append_tid].swap(append_buffers);
+ m_in_flight_bytes += append_bytes;
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
- << append_tid << dendl;
-
- librados::ObjectWriteOperation op;
- client::guard_append(&op, m_soft_max_size);
- auto append_buffers = &m_in_flight_appends[append_tid];
-
- for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
- ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl;
- op.append(it->second);
- op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
- m_aio_sent_size += it->second.length();
- append_buffers->push_back(*it);
- it = m_pending_buffers.erase(it);
- if (m_aio_sent_size >= m_soft_max_size) {
- break;
- }
- }
- rados_completion = librados::Rados::aio_create_completion(
- new C_AppendFlush(this, append_tid), nullptr,
- utils::rados_ctx_callback);
+ ceph_assert(m_pending_bytes >= append_bytes);
+ m_pending_bytes -= append_bytes;
+
+ auto rados_completion = librados::Rados::aio_create_completion(
+ new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
ceph_assert(r == 0);
+ rados_completion->release();
+ ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
+ << "append_bytes=" << append_bytes << ", "
+ << "in_flight_bytes=" << m_in_flight_bytes << ", "
+ << "pending_bytes=" << m_pending_bytes << dendl;
}
- rados_completion->release();
+
+ return m_overflowed;
}
void ObjectRecorder::notify_handler_unlock() {
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index 8d250e5f042..ff00e0a0a1f 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -4,6 +4,7 @@
#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
#define CEPH_JOURNAL_OBJECT_RECORDER_H
+#include "include/utime.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Cond.h"
@@ -39,12 +40,13 @@ public:
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, std::shared_ptr<Mutex> lock,
- ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
- Handler *handler, uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends);
+ ContextWQ *work_queue, Handler *handler, uint8_t order,
+ int32_t max_in_flight_appends);
~ObjectRecorder() override;
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
inline uint64_t get_object_number() const {
return m_object_number;
}
@@ -52,7 +54,7 @@ public:
return m_oid;
}
- bool append_unlock(AppendBuffers &&append_buffers);
+ bool append(AppendBuffers &&append_buffers);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
@@ -70,7 +72,7 @@ public:
inline size_t get_pending_appends() const {
Mutex::Locker locker(*m_lock);
- return m_append_buffers.size();
+ return m_pending_buffers.size();
}
private:
@@ -87,7 +89,6 @@ private:
object_recorder->put();
}
void flush(const FutureImplPtr &future) override {
- Mutex::Locker locker(*(object_recorder->m_lock));
object_recorder->flush(future);
}
};
@@ -111,31 +112,28 @@ private:
ContextWQ *m_op_work_queue;
- SafeTimer &m_timer;
- Mutex &m_timer_lock;
-
Handler *m_handler;
uint8_t m_order;
uint64_t m_soft_max_size;
- uint32_t m_flush_interval;
- uint64_t m_flush_bytes;
- double m_flush_age;
- uint32_t m_max_in_flight_appends;
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
+ int32_t m_max_in_flight_appends;
FlushHandler m_flush_handler;
- Context *m_append_task = nullptr;
-
mutable std::shared_ptr<Mutex> m_lock;
- AppendBuffers m_append_buffers;
+ AppendBuffers m_pending_buffers;
+ uint64_t m_pending_bytes = 0;
+ utime_t m_last_flush_time;
+
uint64_t m_append_tid;
- uint32_t m_pending_bytes;
InFlightTids m_in_flight_tids;
InFlightAppends m_in_flight_appends;
- uint64_t m_size;
+ uint64_t m_object_bytes = 0;
bool m_overflowed;
bool m_object_closed;
@@ -143,21 +141,11 @@ private:
bool m_in_flight_flushes;
Cond m_in_flight_flushes_cond;
+ uint64_t m_in_flight_bytes = 0;
- AppendBuffers m_pending_buffers;
- uint64_t m_aio_sent_size = 0;
- bool m_aio_scheduled;
-
- void handle_append_task();
- void cancel_append_task();
- void schedule_append_task();
-
- bool append(const AppendBuffer &append_buffer, bool *schedule_append);
- bool flush_appends(bool force);
+ bool send_appends(bool force, FutureImplPtr flush_sentinal);
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed();
- void send_appends(AppendBuffers *append_buffers);
- void send_appends_aio();
void notify_handler_unlock();
};
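
On the ObjectRecorder side, the timer-driven flush_appends()/schedule_append_task() machinery is removed; send_appends() now checks the batching thresholds inline and promotes a non-forced send into a batch flush when any threshold is met. A simplified restatement of that threshold check, using plain types and illustrative parameter names rather than the class members (the actual code folds this into send_appends() and additionally caps concurrent in-flight appends):

#include <cstddef>
#include <cstdint>

// Sketch of the flush-trigger decision; not a verbatim copy of the diff.
bool should_flush(bool force,
                  std::size_t pending_count, std::uint64_t pending_bytes,
                  double seconds_since_last_flush,
                  std::uint32_t flush_interval, std::uint64_t flush_bytes,
                  double flush_age) {
  if (force) {
    return true;   // explicit flush, close, or overflow handling
  }
  if (flush_interval > 0 && pending_count >= flush_interval) {
    return true;   // enough queued entries to justify an append
  }
  if (flush_bytes > 0 && pending_bytes >= flush_bytes) {
    return true;   // enough queued bytes
  }
  if (flush_age > 0 && seconds_since_last_flush >= flush_age) {
    return true;   // the batch has been waiting too long
  }
  return false;    // keep batching until a threshold is hit
}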