diff options
author | Jason Dillaman <dillaman@redhat.com> | 2019-06-13 02:06:11 +0200 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2019-06-19 16:38:51 +0200 |
commit | c0322a13c83f590067a120212620ebba15fc8661 (patch) | |
tree | b46c229ba62112e1278fb2b8097ea54311432a48 | |
parent | journal: fix broken append batching implementation (diff) | |
download | ceph-c0322a13c83f590067a120212620ebba15fc8661.tar.xz ceph-c0322a13c83f590067a120212620ebba15fc8661.zip |
journal: support dynamically updating recorder flush options
Default to disabling writeback-style append flushes unless overridden
by a call to 'set_append_batch_options'.
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
-rw-r--r-- | src/journal/JournalRecorder.cc | 43 | ||||
-rw-r--r-- | src/journal/JournalRecorder.h | 13 | ||||
-rw-r--r-- | src/journal/Journaler.cc | 13 | ||||
-rw-r--r-- | src/journal/Journaler.h | 5 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.cc | 25 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.h | 12 | ||||
-rw-r--r-- | src/librbd/Journal.cc | 7 | ||||
-rw-r--r-- | src/librbd/journal/DemoteRequest.cc | 2 | ||||
-rw-r--r-- | src/librbd/journal/PromoteRequest.cc | 2 | ||||
-rw-r--r-- | src/test/journal/mock/MockJournaler.h | 16 | ||||
-rw-r--r-- | src/test/journal/test_JournalRecorder.cc | 5 | ||||
-rw-r--r-- | src/test/journal/test_ObjectRecorder.cc | 9 | ||||
-rw-r--r-- | src/test/librbd/fsx.cc | 2 | ||||
-rw-r--r-- | src/test/librbd/journal/test_mock_PromoteRequest.cc | 2 | ||||
-rw-r--r-- | src/test/librbd/test_mock_Journal.cc | 16 | ||||
-rw-r--r-- | src/tools/rbd/action/Journal.cc | 2 |
16 files changed, 121 insertions, 53 deletions
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index bf795e0e6dd..aa90660a01f 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -50,12 +50,9 @@ struct C_Flush : public Context { JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr& journal_metadata, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), - m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), + m_journal_metadata(journal_metadata), m_max_in_flight_appends(max_in_flight_appends), m_listener(this), m_object_handler(this), m_lock("JournalerRecorder::m_lock"), m_current_set(m_journal_metadata->get_active_set()) { @@ -66,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - m_object_locks.push_back(shared_ptr<Mutex>( - new Mutex("ObjectRecorder::m_lock::"+ - std::to_string(splay_offset)))); + shared_ptr<Mutex> object_lock(new Mutex( + "ObjectRecorder::m_lock::" + std::to_string(splay_offset))); + m_object_locks.push_back(object_lock); + uint64_t object_number = splay_offset + (m_current_set * splay_width); + Mutex::Locker locker(*object_lock); m_object_ptrs[splay_offset] = create_object_recorder( - object_number, - m_object_locks[splay_offset]); + object_number, m_object_locks[splay_offset]); } m_journal_metadata->add_listener(&m_listener); @@ -109,6 +107,27 @@ void JournalRecorder::shut_down(Context *on_safe) { flush(on_safe); } +void JournalRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + Mutex::Locker locker(m_lock); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + Mutex::Locker object_locker(*m_object_locks[splay_offset]); + auto object_recorder = get_object(splay_offset); + object_recorder->set_append_batch_options(flush_interval, flush_bytes, + flush_age); + } +} + Future JournalRecorder::append(uint64_t tag_tid, const bufferlist &payload_bl) { ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl; @@ -286,8 +305,10 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), object_number, lock, m_journal_metadata->get_work_queue(), - &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, - m_flush_bytes, m_flush_age, m_max_in_flight_appends)); + &m_object_handler, m_journal_metadata->get_order(), + m_max_in_flight_appends)); + object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); return object_recorder; } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index c27520a7589..382f75acef9 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -23,11 +23,14 @@ class JournalRecorder { public: JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr &journal_metadata, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends); + uint64_t max_in_flight_appends); ~JournalRecorder(); void shut_down(Context *on_safe); + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + Future append(uint64_t tag_tid, const bufferlist &bl); void flush(Context *on_safe); @@ -79,9 +82,9 @@ private: JournalMetadataPtr m_journal_metadata; - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; uint64_t m_max_in_flight_appends; Listener m_listener; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 34bd30e290f..7d9c1409584 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -393,15 +393,20 @@ void Journaler::committed(const Future &future) { m_trimmer->committed(future_impl->get_commit_tid()); } -void Journaler::start_append(int flush_interval, uint64_t flush_bytes, - double flush_age, uint64_t max_in_flight_appends) { +void Journaler::start_append(uint64_t max_in_flight_appends) { ceph_assert(m_recorder == nullptr); // TODO verify active object set >= current replay object set m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, - m_metadata, flush_interval, flush_bytes, - flush_age, max_in_flight_appends); + m_metadata, max_in_flight_appends); +} + +void Journaler::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ceph_assert(m_recorder != nullptr); + m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age); } void Journaler::stop_append(Context *on_safe) { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 3f3987105e1..a063a6d43a0 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -110,8 +110,9 @@ public: void stop_replay(Context *on_finish); uint64_t get_max_append_size() const; - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends); + void start_append(uint64_t max_in_flight_appends); + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); Future append(uint64_t tag_tid, const bufferlist &bl); void flush_append(Context *on_safe); void stop_append(Context *on_safe); diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 162dfe90c08..127731e95c3 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -21,17 +21,13 @@ namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, shared_ptr<Mutex> lock, ContextWQ *work_queue, Handler *handler, - uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age, - int32_t max_in_flight_appends) + uint8_t order, int32_t max_in_flight_appends) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), - m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), - m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends), - m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()), - m_append_tid(0), m_overflowed(false), m_object_closed(false), - m_in_flight_flushes(false) { + m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), + m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0), + m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); ceph_assert(m_handler != NULL); @@ -45,6 +41,19 @@ ObjectRecorder::~ObjectRecorder() { ceph_assert(m_in_flight_appends.empty()); } +void ObjectRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + ceph_assert(m_lock->is_locked()); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; +} + bool ObjectRecorder::append(AppendBuffers &&append_buffers) { ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl; diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index d7cf6e668a9..ff00e0a0a1f 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -41,10 +41,12 @@ public: ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, std::shared_ptr<Mutex> lock, ContextWQ *work_queue, Handler *handler, uint8_t order, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, int32_t max_in_flight_appends); + int32_t max_in_flight_appends); ~ObjectRecorder() override; + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + inline uint64_t get_object_number() const { return m_object_number; } @@ -115,9 +117,9 @@ private: uint8_t m_order; uint64_t m_soft_max_size; - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; int32_t m_max_in_flight_appends; FlushHandler m_flush_handler; diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 5a648282fe9..11b2f23fa0f 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -1169,11 +1169,14 @@ void Journal<I>::complete_event(typename Events::iterator it, int r) { template <typename I> void Journal<I>::start_append() { ceph_assert(m_lock.is_locked()); + m_journaler->start_append( + m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends")); + m_journaler->set_append_batch_options( m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"), m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"), - m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"), - m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends")); + m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age")); + transition_state(STATE_READY, 0); } diff --git a/src/librbd/journal/DemoteRequest.cc b/src/librbd/journal/DemoteRequest.cc index 5b34942684e..7656caac809 100644 --- a/src/librbd/journal/DemoteRequest.cc +++ b/src/librbd/journal/DemoteRequest.cc @@ -135,7 +135,7 @@ void DemoteRequest<I>::append_event() { bufferlist event_entry_bl; encode(event_entry, event_entry_bl); - m_journaler->start_append(0, 0, 0, 0); + m_journaler->start_append(0); m_future = m_journaler->append(m_tag_tid, event_entry_bl); auto ctx = create_context_callback< diff --git a/src/librbd/journal/PromoteRequest.cc b/src/librbd/journal/PromoteRequest.cc index 30d9f3bb159..22dc83a32d8 100644 --- a/src/librbd/journal/PromoteRequest.cc +++ b/src/librbd/journal/PromoteRequest.cc @@ -119,7 +119,7 @@ void PromoteRequest<I>::append_event() { bufferlist event_entry_bl; encode(event_entry, event_entry_bl); - m_journaler->start_append(0, 0, 0, 0); + m_journaler->start_append(0); m_future = m_journaler->append(m_tag_tid, event_entry_bl); auto ctx = create_context_callback< diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index 787d197dd93..ab424cd6b09 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -120,7 +120,8 @@ struct MockJournaler { MOCK_METHOD0(stop_replay, void()); MOCK_METHOD1(stop_replay, void(Context *on_finish)); - MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t)); + MOCK_METHOD1(start_append, void(uint64_t)); + MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double)); MOCK_CONST_METHOD0(get_max_append_size, uint64_t()); MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id, const bufferlist &bl)); @@ -259,11 +260,14 @@ struct MockJournalerProxy { MockJournaler::get_instance().stop_replay(on_finish); } - void start_append(int flush_interval, uint64_t flush_bytes, double flush_age, - uint64_t max_in_flight_appends) { - MockJournaler::get_instance().start_append(flush_interval, flush_bytes, - flush_age, - max_in_flight_appends); + void start_append(uint64_t max_in_flight_appends) { + MockJournaler::get_instance().start_append(max_in_flight_appends); + } + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age) { + MockJournaler::get_instance().set_append_batch_options( + flush_interval, flush_bytes, flush_age); } uint64_t get_max_append_size() const { diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc index fb7c06772ec..7197526a1ce 100644 --- a/src/test/journal/test_JournalRecorder.cc +++ b/src/test/journal/test_JournalRecorder.cc @@ -22,8 +22,9 @@ public: journal::JournalRecorder *create_recorder( const std::string &oid, const journal::JournalMetadataPtr &metadata) { journal::JournalRecorder *recorder(new journal::JournalRecorder( - m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(), - 0, 0)); + m_ioctx, oid + ".", metadata, 0)); + recorder->set_append_batch_options(0, std::numeric_limits<uint32_t>::max(), + 0); m_recorders.push_back(recorder); return recorder; } diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index 7c3e7e99d16..3cc8e893cfe 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -94,8 +94,13 @@ public: journal::ObjectRecorderPtr create_object(const std::string &oid, uint8_t order, shared_ptr<Mutex> lock) { journal::ObjectRecorderPtr object(new journal::ObjectRecorder( - m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval, - m_flush_bytes, m_flush_age, m_max_in_flight_appends)); + m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, + m_max_in_flight_appends)); + { + Mutex::Locker locker(*lock); + object->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); + } m_object_recorders.push_back(object); m_object_recorder_locks.insert(std::make_pair(oid, lock)); m_handler.object_lock = lock; diff --git a/src/test/librbd/fsx.cc b/src/test/librbd/fsx.cc index 1e96b502798..25766b5002d 100644 --- a/src/test/librbd/fsx.cc +++ b/src/test/librbd/fsx.cc @@ -435,7 +435,7 @@ int replay_journal(rados_ioctx_t ioctx, const char *image_name, return r; } - replay_journaler.start_append(0, 0, 0, 0); + replay_journaler.start_append(0); C_SaferCond replay_ctx; ReplayHandler replay_handler(&journaler, &replay_journaler, diff --git a/src/test/librbd/journal/test_mock_PromoteRequest.cc b/src/test/librbd/journal/test_mock_PromoteRequest.cc index 0e61a88890d..68a627a79a8 100644 --- a/src/test/librbd/journal/test_mock_PromoteRequest.cc +++ b/src/test/librbd/journal/test_mock_PromoteRequest.cc @@ -120,7 +120,7 @@ public: } void expect_start_append(::journal::MockJournaler &mock_journaler) { - EXPECT_CALL(mock_journaler, start_append(_, _, _, _)); + EXPECT_CALL(mock_journaler, start_append(_)); } void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) { diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 727d0db8381..00367e2214e 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -396,7 +396,11 @@ public: } void expect_start_append(::journal::MockJournaler &mock_journaler) { - EXPECT_CALL(mock_journaler, start_append(_, _, _, _)); + EXPECT_CALL(mock_journaler, start_append(_)); + } + + void expect_set_append_batch_options(::journal::MockJournaler &mock_journaler) { + EXPECT_CALL(mock_journaler, set_append_batch_options(_, _, _)); } void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) { @@ -518,6 +522,7 @@ public: expect_committed(mock_journaler, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); } @@ -585,6 +590,7 @@ TEST_F(TestMockJournal, StateTransitions) { expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); @@ -662,6 +668,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); @@ -719,6 +726,7 @@ TEST_F(TestMockJournal, FlushReplayError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, 0); @@ -773,6 +781,7 @@ TEST_F(TestMockJournal, CorruptEntry) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, -EINVAL); @@ -811,6 +820,7 @@ TEST_F(TestMockJournal, StopError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); ASSERT_EQ(0, when_open(mock_journal)); expect_stop_append(mock_journaler, -EINVAL); @@ -876,6 +886,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); C_SaferCond ctx; mock_journal.open(&ctx); @@ -958,6 +969,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0); expect_flush_commit_position(mock_journaler); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); C_SaferCond ctx; mock_journal.open(&ctx); @@ -1272,6 +1284,7 @@ TEST_F(TestMockJournal, ExternalReplay) { InSequence seq; expect_stop_append(mock_journaler, 0); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; @@ -1303,6 +1316,7 @@ TEST_F(TestMockJournal, ExternalReplayFailure) { InSequence seq; expect_stop_append(mock_journaler, -EINVAL); expect_start_append(mock_journaler); + expect_set_append_batch_options(mock_journaler); expect_shut_down_journaler(mock_journaler); C_SaferCond start_ctx; diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc index 76f76b1c5eb..db506207c4b 100644 --- a/src/tools/rbd/action/Journal.cc +++ b/src/tools/rbd/action/Journal.cc @@ -832,7 +832,7 @@ public: if (r < 0) { return r; } - m_journaler.start_append(0, 0, 0, 0); + m_journaler.start_append(0); int r1 = 0; bufferlist bl; |