author    | Mykola Golub <mgolub@suse.com> | 2019-06-20 18:02:14 +0200
committer | GitHub <noreply@github.com>    | 2019-06-20 18:02:14 +0200
commit    | 961df6356a85311bb18acb193c6a28325a7248da (patch)
tree      | 733f2b1cce0bae3bff29a93ed9f0c6acd018fb9a /src/journal
parent    | Merge pull request #28655 from dengchj/multisite_doc (diff)
parent    | librbd: tweaks to improve throughput for journaled IO (diff)
Merge pull request #28539 from dillaman/wip-40072
librbd: improve journal performance to match expected degradation
Reviewed-by: Mykola Golub <mgolub@suse.com>
Diffstat (limited to 'src/journal')
-rw-r--r-- | src/journal/JournalRecorder.cc | 112
-rw-r--r-- | src/journal/JournalRecorder.h  |  15
-rw-r--r-- | src/journal/Journaler.cc       |  13
-rw-r--r-- | src/journal/Journaler.h        |   5
-rw-r--r-- | src/journal/ObjectRecorder.cc  | 422
-rw-r--r-- | src/journal/ObjectRecorder.h   |  50
6 files changed, 274 insertions, 343 deletions
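
The headline API change in this merge is a split of the append setup: `Journaler::start_append()` no longer takes the batching parameters (`flush_interval`, `flush_bytes`, `flush_age`) and only accepts `max_in_flight_appends`; the batching knobs are applied afterwards through the new `Journaler::set_append_batch_options()`, which fans out to `JournalRecorder::set_append_batch_options()` and every per-splay-offset `ObjectRecorder`, so they can also be re-tuned after appending has started. The caller-side sketch below illustrates the migration; the `journaler` instance, the wrapper function, and the option values are assumptions for illustration, only the two method signatures come from this diff.

```cpp
// Hypothetical caller-side sketch of the API change in this merge.
// Only the start_append()/set_append_batch_options() signatures are taken
// from the diff; the surrounding function and values are illustrative.

#include "journal/Journaler.h"

void start_journal_append(journal::Journaler& journaler,
                          uint64_t max_in_flight_appends,
                          int flush_interval, uint64_t flush_bytes,
                          double flush_age) {
  // Before this merge, everything was passed at once:
  //   journaler.start_append(flush_interval, flush_bytes, flush_age,
  //                          max_in_flight_appends);

  // After this merge, the recorder is created first ...
  journaler.start_append(max_in_flight_appends);

  // ... and the batch options are pushed to it (and down to each
  // ObjectRecorder) separately; they may be updated again later, e.g. to
  // switch between latency- and throughput-oriented settings.
  journaler.set_append_batch_options(flush_interval, flush_bytes, flush_age);
}
```

Note that `set_append_batch_options()` asserts that the recorder already exists, so it must be called after `start_append()`, as above.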
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index d4b023c3d5e..aa90660a01f 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -10,7 +10,8 @@ #define dout_subsys ceph_subsys_journaler #undef dout_prefix -#define dout_prefix *_dout << "JournalRecorder: " << this << " " +#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \ + << ": " using std::shared_ptr; @@ -49,12 +50,9 @@ struct C_Flush : public Context { JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr& journal_metadata, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), - m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), + m_journal_metadata(journal_metadata), m_max_in_flight_appends(max_in_flight_appends), m_listener(this), m_object_handler(this), m_lock("JournalerRecorder::m_lock"), m_current_set(m_journal_metadata->get_active_set()) { @@ -65,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - m_object_locks.push_back(shared_ptr<Mutex>( - new Mutex("ObjectRecorder::m_lock::"+ - std::to_string(splay_offset)))); + shared_ptr<Mutex> object_lock(new Mutex( + "ObjectRecorder::m_lock::" + std::to_string(splay_offset))); + m_object_locks.push_back(object_lock); + uint64_t object_number = splay_offset + (m_current_set * splay_width); + Mutex::Locker locker(*object_lock); m_object_ptrs[splay_offset] = create_object_recorder( - object_number, - m_object_locks[splay_offset]); + object_number, m_object_locks[splay_offset]); } m_journal_metadata->add_listener(&m_listener); @@ -108,8 +107,30 @@ void JournalRecorder::shut_down(Context *on_safe) { flush(on_safe); } +void JournalRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + Mutex::Locker locker(m_lock); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + Mutex::Locker object_locker(*m_object_locks[splay_offset]); + auto object_recorder = get_object(splay_offset); + object_recorder->set_append_batch_options(flush_interval, flush_bytes, + flush_age); + } +} + Future JournalRecorder::append(uint64_t tag_tid, const bufferlist &payload_bl) { + ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl; m_lock.Lock(); @@ -132,7 +153,9 @@ Future JournalRecorder::append(uint64_t tag_tid, entry_bl); ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size()); - bool object_full = object_ptr->append_unlock({{future, entry_bl}}); + bool object_full = object_ptr->append({{future, entry_bl}}); + m_object_locks[splay_offset]->Unlock(); + if (object_full) { ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" << dendl; @@ -143,6 +166,8 @@ Future JournalRecorder::append(uint64_t tag_tid, } void JournalRecorder::flush(Context *on_safe) { + ldout(m_cct, 20) << dendl; + C_Flush *ctx; { Mutex::Locker locker(m_lock); @@ 
-172,7 +197,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { // entry overflow from open object if (m_current_set != object_set) { - ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl; + ldout(m_cct, 20) << "close already in-progress" << dendl; return; } @@ -186,8 +211,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { ++m_current_set; ++m_in_flight_advance_sets; - ldout(m_cct, 20) << __func__ << ": closing active object set " - << object_set << dendl; + ldout(m_cct, 10) << "closing active object set " << object_set << dendl; if (close_object_set(m_current_set)) { advance_object_set(); } @@ -197,8 +221,7 @@ void JournalRecorder::advance_object_set() { ceph_assert(m_lock.is_locked()); ceph_assert(m_in_flight_object_closes == 0); - ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set - << dendl; + ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl; m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( this)); } @@ -213,8 +236,8 @@ void JournalRecorder::handle_advance_object_set(int r) { --m_in_flight_advance_sets; if (r < 0 && r != -ESTALE) { - lderr(m_cct) << __func__ << ": failed to advance object set: " - << cpp_strerror(r) << dendl; + lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r) + << dendl; } if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { @@ -230,8 +253,7 @@ void JournalRecorder::handle_advance_object_set(int r) { void JournalRecorder::open_object_set() { ceph_assert(m_lock.is_locked()); - ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set - << dendl; + ldout(m_cct, 10) << "opening object set " << m_current_set << dendl; uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -244,15 +266,14 @@ void JournalRecorder::open_object_set() { ceph_assert(object_recorder->is_closed()); // ready to close object and open object in active set - create_next_object_recorder_unlock(object_recorder); - } else { - uint8_t splay_offset = object_number % splay_width; - m_object_locks[splay_offset]->Unlock(); + create_next_object_recorder(object_recorder); } } + unlock_object_recorders(); } bool JournalRecorder::close_object_set(uint64_t active_set) { + ldout(m_cct, 10) << "active_set=" << active_set << dendl; ceph_assert(m_lock.is_locked()); // object recorders will invoke overflow handler as they complete @@ -263,14 +284,14 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { it != m_object_ptrs.end(); ++it) { ObjectRecorderPtr object_recorder = it->second; if (object_recorder->get_object_number() / splay_width != active_set) { - ldout(m_cct, 10) << __func__ << ": closing object " - << object_recorder->get_oid() << dendl; + ldout(m_cct, 10) << "closing object " << object_recorder->get_oid() + << dendl; // flush out all queued appends and hold future appends if (!object_recorder->close()) { ++m_in_flight_object_closes; } else { - ldout(m_cct, 20) << __func__ << ": object " - << object_recorder->get_oid() << " closed" << dendl; + ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed" + << dendl; } } } @@ -280,30 +301,32 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { ObjectRecorderPtr JournalRecorder::create_object_recorder( uint64_t object_number, shared_ptr<Mutex> lock) { + ldout(m_cct, 10) << "object_number=" << object_number << dendl; ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, 
object_number), object_number, lock, m_journal_metadata->get_work_queue(), - m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), - &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, - m_flush_bytes, m_flush_age, m_max_in_flight_appends)); + &m_object_handler, m_journal_metadata->get_order(), + m_max_in_flight_appends)); + object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); return object_recorder; } -void JournalRecorder::create_next_object_recorder_unlock( +void JournalRecorder::create_next_object_recorder( ObjectRecorderPtr object_recorder) { ceph_assert(m_lock.is_locked()); uint64_t object_number = object_recorder->get_object_number(); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; + ldout(m_cct, 10) << "object_number=" << object_number << dendl; ceph_assert(m_object_locks[splay_offset]->is_locked()); ObjectRecorderPtr new_object_recorder = create_object_recorder( (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]); - ldout(m_cct, 10) << __func__ << ": " - << "old oid=" << object_recorder->get_oid() << ", " + ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", " << "new oid=" << new_object_recorder->get_oid() << dendl; AppendBuffers append_buffers; object_recorder->claim_append_buffers(&append_buffers); @@ -315,7 +338,7 @@ void JournalRecorder::create_next_object_recorder_unlock( new_object_recorder->get_object_number()); } - new_object_recorder->append_unlock(std::move(append_buffers)); + new_object_recorder->append(std::move(append_buffers)); m_object_ptrs[splay_offset] = new_object_recorder; } @@ -325,15 +348,13 @@ void JournalRecorder::handle_update() { uint64_t active_set = m_journal_metadata->get_active_set(); if (m_current_set < active_set) { // peer journal client advanced the active set - ldout(m_cct, 20) << __func__ << ": " - << "current_set=" << m_current_set << ", " + ldout(m_cct, 10) << "current_set=" << m_current_set << ", " << "active_set=" << active_set << dendl; uint64_t current_set = m_current_set; m_current_set = active_set; if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { - ldout(m_cct, 20) << __func__ << ": closing current object set " - << current_set << dendl; + ldout(m_cct, 10) << "closing current object set " << current_set << dendl; if (close_object_set(active_set)) { open_object_set(); } @@ -342,7 +363,7 @@ void JournalRecorder::handle_update() { } void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { - ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; + ldout(m_cct, 10) << object_recorder->get_oid() << dendl; Mutex::Locker locker(m_lock); @@ -356,8 +377,8 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { --m_in_flight_object_closes; // object closed after advance active set committed - ldout(m_cct, 20) << __func__ << ": object " - << active_object_recorder->get_oid() << " closed" << dendl; + ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() + << " closed" << dendl; if (m_in_flight_object_closes == 0) { if (m_in_flight_advance_sets == 0) { // peer forced closing of object set @@ -370,7 +391,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { } void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { - ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; + ldout(m_cct, 10) << object_recorder->get_oid() << dendl; 
Mutex::Locker locker(m_lock); @@ -380,9 +401,8 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; ceph_assert(active_object_recorder->get_object_number() == object_number); - ldout(m_cct, 20) << __func__ << ": object " - << active_object_recorder->get_oid() << " overflowed" - << dendl; + ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() + << " overflowed" << dendl; close_and_advance_object_set(object_number / splay_width); } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index bdeaab58a81..382f75acef9 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -23,11 +23,14 @@ class JournalRecorder { public: JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr &journal_metadata, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends); + uint64_t max_in_flight_appends); ~JournalRecorder(); void shut_down(Context *on_safe); + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + Future append(uint64_t tag_tid, const bufferlist &bl); void flush(Context *on_safe); @@ -79,9 +82,9 @@ private: JournalMetadataPtr m_journal_metadata; - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; uint64_t m_max_in_flight_appends; Listener m_listener; @@ -109,7 +112,7 @@ private: ObjectRecorderPtr create_object_recorder(uint64_t object_number, std::shared_ptr<Mutex> lock); - void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder); + void create_next_object_recorder(ObjectRecorderPtr object_recorder); void handle_update(); diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 34bd30e290f..7d9c1409584 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -393,15 +393,20 @@ void Journaler::committed(const Future &future) { m_trimmer->committed(future_impl->get_commit_tid()); } -void Journaler::start_append(int flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends) { +void Journaler::start_append(uint64_t max_in_flight_appends) { ceph_assert(m_recorder == nullptr); // TODO verify active object set >= current replay object set m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, - m_metadata, flush_interval, flush_bytes, - flush_age, max_in_flight_appends); + m_metadata, max_in_flight_appends); +} + +void Journaler::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ceph_assert(m_recorder != nullptr); + m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age); } void Journaler::stop_append(Context *on_safe) { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 3f3987105e1..a063a6d43a0 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -110,8 +110,9 @@ public: void stop_replay(Context *on_finish); uint64_t get_max_append_size() const; - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends); + void start_append(uint64_t max_in_flight_appends); + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); Future append(uint64_t tag_tid, const bufferlist &bl); void flush_append(Context *on_safe); void stop_append(Context 
*on_safe); diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index e4049df6520..127731e95c3 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -10,7 +10,8 @@ #define dout_subsys ceph_subsys_journaler #undef dout_prefix -#define dout_prefix *_dout << "ObjectRecorder: " << this << " " +#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \ + << __func__ << " (" << m_oid << "): " using namespace cls::journal; using std::shared_ptr; @@ -19,72 +20,64 @@ namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, shared_ptr<Mutex> lock, - ContextWQ *work_queue, SafeTimer &timer, - Mutex &timer_lock, Handler *handler, - uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends) + ContextWQ *work_queue, Handler *handler, + uint8_t order, int32_t max_in_flight_appends) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), - m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer), - m_timer_lock(timer_lock), m_handler(handler), m_order(order), - m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), + m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler), + m_order(order), m_soft_max_size(1 << m_order), m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), - m_lock(lock), m_append_tid(0), m_pending_bytes(0), - m_size(0), m_overflowed(false), m_object_closed(false), - m_in_flight_flushes(false), m_aio_scheduled(false) { + m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0), + m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); ceph_assert(m_handler != NULL); + ldout(m_cct, 20) << dendl; } ObjectRecorder::~ObjectRecorder() { - ceph_assert(m_append_task == NULL); - ceph_assert(m_append_buffers.empty()); + ldout(m_cct, 20) << dendl; + ceph_assert(m_pending_buffers.empty()); ceph_assert(m_in_flight_tids.empty()); ceph_assert(m_in_flight_appends.empty()); - ceph_assert(!m_aio_scheduled); } -bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) { +void ObjectRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + ceph_assert(m_lock->is_locked()); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; +} - FutureImplPtr last_flushed_future; - bool schedule_append = false; +bool ObjectRecorder::append(AppendBuffers &&append_buffers) { + ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl; - if (m_overflowed) { - m_append_buffers.insert(m_append_buffers.end(), - append_buffers.begin(), append_buffers.end()); - m_lock->Unlock(); - return false; - } + ceph_assert(m_lock->is_locked()); - for (AppendBuffers::const_iterator iter = append_buffers.begin(); - iter != append_buffers.end(); ++iter) { - if (append(*iter, &schedule_append)) { - last_flushed_future = iter->first; + FutureImplPtr last_flushed_future; + for (auto& append_buffer : append_buffers) { + ldout(m_cct, 20) << *append_buffer.first << ", " + << "size=" << append_buffer.second.length() << dendl; + bool flush_requested = append_buffer.first->attach(&m_flush_handler); + if (flush_requested) { + 
last_flushed_future = append_buffer.first; } - } - if (last_flushed_future) { - flush(last_flushed_future); - m_lock->Unlock(); - } else { - m_lock->Unlock(); - if (schedule_append) { - schedule_append_task(); - } else { - cancel_append_task(); - } + m_pending_buffers.push_back(append_buffer); + m_pending_bytes += append_buffer.second.length(); } - return (!m_object_closed && !m_overflowed && - m_size + m_pending_bytes >= m_soft_max_size); + + return send_appends(!!last_flushed_future, last_flushed_future); } void ObjectRecorder::flush(Context *on_safe) { - ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + ldout(m_cct, 20) << dendl; - cancel_append_task(); Future future; { Mutex::Locker locker(*m_lock); @@ -97,11 +90,7 @@ void ObjectRecorder::flush(Context *on_safe) { } // attach the flush to the most recent append - if (!m_append_buffers.empty()) { - future = Future(m_append_buffers.rbegin()->first); - - flush_appends(true); - } else if (!m_pending_buffers.empty()) { + if (!m_pending_buffers.empty()) { future = Future(m_pending_buffers.rbegin()->first); } else if (!m_in_flight_appends.empty()) { AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second; @@ -111,143 +100,68 @@ void ObjectRecorder::flush(Context *on_safe) { } if (future.is_valid()) { - future.flush(on_safe); + // cannot be invoked while the same lock context + m_op_work_queue->queue(new FunctionContext( + [future, on_safe] (int r) mutable { + future.flush(on_safe); + })); } else { on_safe->complete(0); } } void ObjectRecorder::flush(const FutureImplPtr &future) { - ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future - << dendl; - - ceph_assert(m_lock->is_locked()); + ldout(m_cct, 20) << "flushing " << *future << dendl; + m_lock->Lock(); if (future->get_flush_handler().get() != &m_flush_handler) { // if we don't own this future, re-issue the flush so that it hits the // correct journal object owner future->flush(); + m_lock->Unlock(); return; } else if (future->is_flush_in_progress()) { + m_lock->Unlock(); return; } - if (m_object_closed || m_overflowed) { - return; - } - - AppendBuffers::reverse_iterator r_it; - for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend(); - ++r_it) { - if (r_it->first == future) { - break; - } + bool overflowed = send_appends(true, future); + if (overflowed) { + notify_handler_unlock(); + } else { + m_lock->Unlock(); } - ceph_assert(r_it != m_append_buffers.rend()); - - auto it = (++r_it).base(); - ceph_assert(it != m_append_buffers.end()); - ++it; - - AppendBuffers flush_buffers; - flush_buffers.splice(flush_buffers.end(), m_append_buffers, - m_append_buffers.begin(), it); - send_appends(&flush_buffers); } void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { - ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + ldout(m_cct, 20) << dendl; ceph_assert(m_lock->is_locked()); ceph_assert(m_in_flight_tids.empty()); ceph_assert(m_in_flight_appends.empty()); ceph_assert(m_object_closed || m_overflowed); - append_buffers->splice(append_buffers->end(), m_append_buffers, - m_append_buffers.begin(), m_append_buffers.end()); + + for (auto& append_buffer : m_pending_buffers) { + ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl; + append_buffer.first->detach(); + } + append_buffers->splice(append_buffers->end(), m_pending_buffers, + m_pending_buffers.begin(), m_pending_buffers.end()); } bool ObjectRecorder::close() { ceph_assert(m_lock->is_locked()); - ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; - - 
cancel_append_task(); - - flush_appends(true); + ldout(m_cct, 20) << dendl; + send_appends(true, {}); ceph_assert(!m_object_closed); m_object_closed = true; - return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled); -} - -void ObjectRecorder::handle_append_task() { - ceph_assert(m_timer_lock.is_locked()); - m_append_task = NULL; - - Mutex::Locker locker(*m_lock); - flush_appends(true); -} - -void ObjectRecorder::cancel_append_task() { - Mutex::Locker locker(m_timer_lock); - if (m_append_task != NULL) { - m_timer.cancel_event(m_append_task); - m_append_task = NULL; - } -} - -void ObjectRecorder::schedule_append_task() { - Mutex::Locker locker(m_timer_lock); - if (m_append_task == nullptr && m_flush_age > 0) { - m_append_task = m_timer.add_event_after( - m_flush_age, new FunctionContext([this](int) { - handle_append_task(); - })); - } -} - -bool ObjectRecorder::append(const AppendBuffer &append_buffer, - bool *schedule_append) { - ceph_assert(m_lock->is_locked()); - - bool flush_requested = false; - if (!m_object_closed && !m_overflowed) { - flush_requested = append_buffer.first->attach(&m_flush_handler); - } - - m_append_buffers.push_back(append_buffer); - m_pending_bytes += append_buffer.second.length(); - - if (!flush_appends(false)) { - *schedule_append = true; - } - return flush_requested; -} - -bool ObjectRecorder::flush_appends(bool force) { - ceph_assert(m_lock->is_locked()); - if (m_object_closed || m_overflowed) { - return true; - } - - if (m_append_buffers.empty() || - (!force && - m_size + m_pending_bytes < m_soft_max_size && - (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) && - (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) { - return false; - } - - m_pending_bytes = 0; - AppendBuffers append_buffers; - append_buffers.swap(m_append_buffers); - send_appends(&append_buffers); - return true; + return (m_in_flight_tids.empty() && !m_in_flight_flushes); } void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { - ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid - << ", r=" << r << dendl; + ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl; AppendBuffers append_buffers; { @@ -257,17 +171,14 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { m_in_flight_tids.erase(tid_iter); InFlightAppends::iterator iter = m_in_flight_appends.find(tid); - if (r == -EOVERFLOW || m_overflowed) { - if (iter != m_in_flight_appends.end()) { - m_overflowed = true; - } else { - // must have seen an overflow on a previous append op - ceph_assert(r == -EOVERFLOW && m_overflowed); - } + ceph_assert(iter != m_in_flight_appends.end()); + + if (r == -EOVERFLOW) { + ldout(m_cct, 10) << "append overflowed" << dendl; + m_overflowed = true; // notify of overflow once all in-flight ops are complete - if (m_in_flight_tids.empty() && !m_aio_scheduled) { - m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers); + if (m_in_flight_tids.empty()) { append_overflowed(); notify_handler_unlock(); } else { @@ -276,21 +187,23 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { return; } - ceph_assert(iter != m_in_flight_appends.end()); append_buffers.swap(iter->second); ceph_assert(!append_buffers.empty()); + for (auto& append_buffer : append_buffers) { + m_object_bytes += append_buffer.second.length(); + } + ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl; + m_in_flight_appends.erase(iter); m_in_flight_flushes = true; m_lock->Unlock(); } // Flag the associated futures as complete. 
- for (AppendBuffers::iterator buf_it = append_buffers.begin(); - buf_it != append_buffers.end(); ++buf_it) { - ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe" - << dendl; - buf_it->first->safe(r); + for (auto& append_buffer : append_buffers) { + ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl; + append_buffer.first->safe(r); } // wake up any flush requests that raced with a RADOS callback @@ -298,39 +211,25 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { m_in_flight_flushes = false; m_in_flight_flushes_cond.Signal(); - if (!m_aio_scheduled) { - if (m_in_flight_appends.empty() && - (m_object_closed || m_aio_sent_size >= m_soft_max_size)) { - if (m_aio_sent_size >= m_soft_max_size) { - ldout(m_cct, 20) << __func__ << ": " << m_oid - << " soft max size reached, notifying overflow" - << dendl; - m_overflowed = true; - } - // all remaining unsent appends should be redirected to new object - m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers); + if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) { + // all remaining unsent appends should be redirected to new object + notify_handler_unlock(); + } else { + bool overflowed = send_appends(false, {}); + if (overflowed) { notify_handler_unlock(); - } else if (!m_pending_buffers.empty()) { - m_aio_scheduled = true; - m_lock->Unlock(); - send_appends_aio(); } else { m_lock->Unlock(); } - } else { - m_lock->Unlock(); } } void ObjectRecorder::append_overflowed() { - ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed" - << dendl; + ldout(m_cct, 10) << dendl; ceph_assert(m_lock->is_locked()); ceph_assert(!m_in_flight_appends.empty()); - cancel_append_task(); - InFlightAppends in_flight_appends; in_flight_appends.swap(m_in_flight_appends); @@ -342,94 +241,109 @@ void ObjectRecorder::append_overflowed() { } restart_append_buffers.splice(restart_append_buffers.end(), - m_append_buffers, - m_append_buffers.begin(), - m_append_buffers.end()); - restart_append_buffers.swap(m_append_buffers); - - for (AppendBuffers::const_iterator it = m_append_buffers.begin(); - it != m_append_buffers.end(); ++it) { - ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first - << dendl; - it->first->detach(); - } + m_pending_buffers, + m_pending_buffers.begin(), + m_pending_buffers.end()); + restart_append_buffers.swap(m_pending_buffers); } -void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { +bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { + ldout(m_cct, 20) << dendl; + ceph_assert(m_lock->is_locked()); - ceph_assert(!append_buffers->empty()); - - for (AppendBuffers::iterator it = append_buffers->begin(); - it != append_buffers->end(); ++it) { - ldout(m_cct, 20) << __func__ << ": flushing " << *it->first - << dendl; - it->first->set_flush_in_progress(); - m_size += it->second.length(); + if (m_object_closed || m_overflowed) { + ldout(m_cct, 20) << "already closed or overflowed" << dendl; + return false; } - m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers, - append_buffers->begin(), append_buffers->end()); - if (!m_aio_scheduled) { - m_op_work_queue->queue(new FunctionContext([this] (int r) { - send_appends_aio(); - })); - m_aio_scheduled = true; + if (m_pending_buffers.empty()) { + ldout(m_cct, 20) << "append buffers empty" << dendl; + return false; } -} -void ObjectRecorder::send_appends_aio() { - librados::AioCompletion *rados_completion; - { - Mutex::Locker locker(*m_lock); - m_aio_scheduled = 
false; + if (!force && + ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) || + (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) || + (m_flush_age > 0 && + m_last_flush_time + m_flush_age >= ceph_clock_now()))) { + ldout(m_cct, 20) << "forcing batch flush" << dendl; + force = true; + } - if (m_pending_buffers.empty()) { - ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty" - << dendl; - return; + auto max_in_flight_appends = m_max_in_flight_appends; + if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) { + if (!force && max_in_flight_appends == 0) { + ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl; + max_in_flight_appends = 1; } + } else if (max_in_flight_appends < 0) { + max_in_flight_appends = 0; + } - if (m_max_in_flight_appends != 0 && - m_in_flight_tids.size() >= m_max_in_flight_appends) { - ldout(m_cct, 20) << __func__ << ": " << m_oid - << " max in flight appends reached" << dendl; - return; + if (!force && max_in_flight_appends != 0 && + static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) { + ldout(m_cct, 10) << "max in flight appends reached" << dendl; + return false; + } + + librados::ObjectWriteOperation op; + client::guard_append(&op, m_soft_max_size); + + size_t append_bytes = 0; + AppendBuffers append_buffers; + for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) { + auto& future = it->first; + auto& bl = it->second; + auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length(); + if (size == m_soft_max_size) { + ldout(m_cct, 10) << "object at capacity " << *future << dendl; + m_overflowed = true; + } else if (size > m_soft_max_size) { + ldout(m_cct, 10) << "object beyond capacity " << *future << dendl; + m_overflowed = true; + break; } - if (m_aio_sent_size >= m_soft_max_size) { - ldout(m_cct, 20) << __func__ << ": " << m_oid - << " soft max size reached" << dendl; - return; + bool flush_break = (force && flush_future && flush_future == future); + ldout(m_cct, 20) << "flushing " << *future << dendl; + future->set_flush_in_progress(); + + op.append(bl); + op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + + append_bytes += bl.length(); + append_buffers.push_back(*it); + it = m_pending_buffers.erase(it); + + if (flush_break) { + ldout(m_cct, 20) << "stopping at requested flush future" << dendl; + break; } + } + + if (append_bytes > 0) { + m_last_flush_time = ceph_clock_now(); uint64_t append_tid = m_append_tid++; m_in_flight_tids.insert(append_tid); + m_in_flight_appends[append_tid].swap(append_buffers); + m_in_flight_bytes += append_bytes; - ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid=" - << append_tid << dendl; - - librados::ObjectWriteOperation op; - client::guard_append(&op, m_soft_max_size); - auto append_buffers = &m_in_flight_appends[append_tid]; - - for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) { - ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl; - op.append(it->second); - op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - m_aio_sent_size += it->second.length(); - append_buffers->push_back(*it); - it = m_pending_buffers.erase(it); - if (m_aio_sent_size >= m_soft_max_size) { - break; - } - } - rados_completion = librados::Rados::aio_create_completion( - new C_AppendFlush(this, append_tid), nullptr, - utils::rados_ctx_callback); + ceph_assert(m_pending_bytes >= append_bytes); + m_pending_bytes -= append_bytes; + + auto rados_completion = 
librados::Rados::aio_create_completion( + new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback); int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); ceph_assert(r == 0); + rados_completion->release(); + ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", " + << "append_bytes=" << append_bytes << ", " + << "in_flight_bytes=" << m_in_flight_bytes << ", " + << "pending_bytes=" << m_pending_bytes << dendl; } - rados_completion->release(); + + return m_overflowed; } void ObjectRecorder::notify_handler_unlock() { diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 8d250e5f042..ff00e0a0a1f 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -4,6 +4,7 @@ #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H #define CEPH_JOURNAL_OBJECT_RECORDER_H +#include "include/utime.h" #include "include/Context.h" #include "include/rados/librados.hpp" #include "common/Cond.h" @@ -39,12 +40,13 @@ public: ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, std::shared_ptr<Mutex> lock, - ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock, - Handler *handler, uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends); + ContextWQ *work_queue, Handler *handler, uint8_t order, + int32_t max_in_flight_appends); ~ObjectRecorder() override; + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + inline uint64_t get_object_number() const { return m_object_number; } @@ -52,7 +54,7 @@ public: return m_oid; } - bool append_unlock(AppendBuffers &&append_buffers); + bool append(AppendBuffers &&append_buffers); void flush(Context *on_safe); void flush(const FutureImplPtr &future); @@ -70,7 +72,7 @@ public: inline size_t get_pending_appends() const { Mutex::Locker locker(*m_lock); - return m_append_buffers.size(); + return m_pending_buffers.size(); } private: @@ -87,7 +89,6 @@ private: object_recorder->put(); } void flush(const FutureImplPtr &future) override { - Mutex::Locker locker(*(object_recorder->m_lock)); object_recorder->flush(future); } }; @@ -111,31 +112,28 @@ private: ContextWQ *m_op_work_queue; - SafeTimer &m_timer; - Mutex &m_timer_lock; - Handler *m_handler; uint8_t m_order; uint64_t m_soft_max_size; - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; - uint32_t m_max_in_flight_appends; + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; + int32_t m_max_in_flight_appends; FlushHandler m_flush_handler; - Context *m_append_task = nullptr; - mutable std::shared_ptr<Mutex> m_lock; - AppendBuffers m_append_buffers; + AppendBuffers m_pending_buffers; + uint64_t m_pending_bytes = 0; + utime_t m_last_flush_time; + uint64_t m_append_tid; - uint32_t m_pending_bytes; InFlightTids m_in_flight_tids; InFlightAppends m_in_flight_appends; - uint64_t m_size; + uint64_t m_object_bytes = 0; bool m_overflowed; bool m_object_closed; @@ -143,21 +141,11 @@ private: bool m_in_flight_flushes; Cond m_in_flight_flushes_cond; + uint64_t m_in_flight_bytes = 0; - AppendBuffers m_pending_buffers; - uint64_t m_aio_sent_size = 0; - bool m_aio_scheduled; - - void handle_append_task(); - void cancel_append_task(); - void schedule_append_task(); - - bool append(const AppendBuffer &append_buffer, bool *schedule_append); - bool flush_appends(bool force); + bool send_appends(bool force, FutureImplPtr flush_sentinal); void handle_append_flushed(uint64_t tid, int r); void 
append_overflowed(); - void send_appends(AppendBuffers *append_buffers); - void send_appends_aio(); void notify_handler_unlock(); }; |
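
The other substantive change is in `ObjectRecorder`: the timer-driven `schedule_append_task()`/`flush_appends()` path is removed, and the reworked `send_appends()` decides inline, on each append or completion, whether to issue a RADOS AIO append, using the pending-append count, pending bytes, time since the last flush, and an in-flight-append cap. The predicate below is a self-contained sketch of that decision; the struct, field names, and clock handling are assumptions chosen to mirror the members visible in the diff, and it paraphrases the intent rather than reproducing the upstream function.

```cpp
// Simplified, standalone model of the append-batching decision introduced in
// ObjectRecorder::send_appends().  Names and types are illustrative only.

#include <chrono>
#include <cstddef>
#include <cstdint>

struct BatchState {
  // batching knobs (0 disables the corresponding threshold)
  uint32_t flush_interval = 0;        // pending append count threshold
  uint64_t flush_bytes = 0;           // pending byte threshold
  double flush_age = 0;               // seconds since the last flush
  int32_t max_in_flight_appends = 0;  // 0 means "no cap"

  // current recorder state
  size_t pending_appends = 0;
  uint64_t pending_bytes = 0;
  size_t in_flight_appends = 0;
  std::chrono::steady_clock::time_point last_flush_time;
};

// Returns true if an AIO append should be issued now.
bool should_send_appends(const BatchState& s, bool force,
                         std::chrono::steady_clock::time_point now) {
  if (s.pending_appends == 0) {
    return false;  // nothing queued to send
  }

  // any satisfied threshold promotes this call to a forced batch flush
  if ((s.flush_interval > 0 && s.pending_appends >= s.flush_interval) ||
      (s.flush_bytes > 0 && s.pending_bytes >= s.flush_bytes) ||
      (s.flush_age > 0 &&
       std::chrono::duration<double>(now - s.last_flush_time).count() >=
           s.flush_age)) {
    force = true;
  }

  // when batching is enabled but no explicit in-flight cap was configured,
  // keep at most one unforced append in flight so small IOs can coalesce
  int32_t max_in_flight = s.max_in_flight_appends;
  if (s.flush_interval > 0 || s.flush_bytes > 0 || s.flush_age > 0) {
    if (!force && max_in_flight == 0) {
      max_in_flight = 1;
    }
  } else if (max_in_flight < 0) {
    max_in_flight = 0;  // treat a negative cap as "no cap" when not batching
  }

  if (!force && max_in_flight != 0 &&
      static_cast<int32_t>(s.in_flight_appends) >= max_in_flight) {
    return false;  // defer until an in-flight append completes
  }
  return true;
}
```

In the diff itself this decision is paired with the per-append capacity check against `m_soft_max_size` (tracking `m_object_bytes`, `m_in_flight_bytes`, and the bytes of the batch being built), which is what flags the object as overflowed and triggers the advance to the next object set.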