diff options
author | Jason Dillaman <dillaman@redhat.com> | 2016-05-15 00:13:38 +0200 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2016-05-18 17:02:29 +0200 |
commit | bba91437dbe3b7a9b6da8a61ccc4c597858c8efc (patch) | |
tree | 7f80b26ad89f2cec79a1963d4281e0cc4b85ab4a | |
parent | journal: do not flag append as full if already known to be full (diff) | |
download | ceph-bba91437dbe3b7a9b6da8a61ccc4c597858c8efc.tar.xz ceph-bba91437dbe3b7a9b6da8a61ccc4c597858c8efc.zip |
journal: new ObjectRecorder closed callback
The callback will be invoked if there were in-flight appends
when the close was requested.
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
-rw-r--r-- | src/journal/JournalRecorder.cc | 10 | ||||
-rw-r--r-- | src/journal/JournalRecorder.h | 14 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.cc | 39 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.h | 22 | ||||
-rw-r--r-- | src/test/journal/test_ObjectRecorder.cc | 61 |
5 files changed, 105 insertions, 41 deletions
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index f78f0c82c49..02836b87c86 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -49,7 +49,7 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this), - m_overflow_handler(this), m_lock("JournalerRecorder::m_lock"), + m_object_handler(this), m_lock("JournalerRecorder::m_lock"), m_current_set(m_journal_metadata->get_active_set()) { Mutex::Locker locker(m_lock); @@ -151,7 +151,7 @@ void JournalRecorder::close_object_set(uint64_t object_set) { ObjectRecorderPtr object_recorder = it->second; if (object_recorder != NULL && object_recorder->get_object_number() / splay_width == m_current_set) { - if (object_recorder->close_object()) { + if (object_recorder->close()) { // no in-flight ops, immediately create new recorder create_next_object_recorder(object_recorder); } @@ -164,7 +164,7 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), object_number, m_journal_metadata->get_timer(), - m_journal_metadata->get_timer_lock(), &m_overflow_handler, + m_journal_metadata->get_timer_lock(), &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes, m_flush_age)); return object_recorder; @@ -197,6 +197,10 @@ void JournalRecorder::handle_update() { } } +void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { + // TODO +} + void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index be92298a483..dbd289883d4 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -47,12 +47,16 @@ private: } }; - struct OverflowHandler : public ObjectRecorder::OverflowHandler { + struct ObjectHandler : public ObjectRecorder::Handler { JournalRecorder *journal_recorder; - OverflowHandler(JournalRecorder *_journal_recorder) - : journal_recorder(_journal_recorder) {} + ObjectHandler(JournalRecorder *_journal_recorder) + : journal_recorder(_journal_recorder) { + } + virtual void closed(ObjectRecorder *object_recorder) { + journal_recorder->handle_closed(object_recorder); + } virtual void overflow(ObjectRecorder *object_recorder) { journal_recorder->handle_overflow(object_recorder); } @@ -69,7 +73,7 @@ private: double m_flush_age; Listener m_listener; - OverflowHandler m_overflow_handler; + ObjectHandler m_object_handler; Mutex m_lock; @@ -83,6 +87,8 @@ private: void create_next_object_recorder(ObjectRecorderPtr object_recorder); void handle_update(); + + void handle_closed(ObjectRecorder *object_recorder); void handle_overflow(ObjectRecorder *object_recorder); }; diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 78b51a2b4d1..5972d899b13 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -19,21 +19,20 @@ namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, SafeTimer &timer, Mutex &timer_lock, - OverflowHandler *overflow_handler, uint8_t order, + Handler *handler, uint8_t order, uint32_t flush_interval, uint64_t flush_bytes, double flush_age) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), - m_overflow_handler(overflow_handler), m_order(order), - m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), - m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this), - m_append_task(NULL), + m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), + m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), + m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL), m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)), m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); - assert(m_overflow_handler != NULL); + assert(m_handler != NULL); } ObjectRecorder::~ObjectRecorder() { @@ -151,15 +150,17 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { m_append_buffers.begin(), m_append_buffers.end()); } -bool ObjectRecorder::close_object() { +bool ObjectRecorder::close() { ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; cancel_append_task(); Mutex::Locker locker(m_lock); - m_object_closed = true; flush_appends(true); - return m_in_flight_appends.empty(); + + assert(!m_object_closed); + m_object_closed = true; + return m_in_flight_tids.empty(); } void ObjectRecorder::handle_append_task() { @@ -248,7 +249,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { // notify of overflow once all in-flight ops are complete if (m_in_flight_tids.empty()) { - notify_overflow(); + notify_handler(); } return; } @@ -260,7 +261,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { m_in_flight_appends.erase(iter); if (m_in_flight_appends.empty() && m_object_closed) { // all remaining unsent appends should be redirected to new object - notify_overflow(); + notify_handler(); } m_in_flight_flushes = true; } @@ -338,7 +339,7 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { rados_completion->release(); } -void ObjectRecorder::notify_overflow() { +void ObjectRecorder::notify_handler() { assert(m_lock.is_locked()); for (AppendBuffers::const_iterator it = m_append_buffers.begin(); @@ -348,10 +349,16 @@ void ObjectRecorder::notify_overflow() { it->first->detach(); } - // TODO need to delay completion until after aio_notify completes - m_lock.Unlock(); - m_overflow_handler->overflow(this); - m_lock.Lock(); + if (m_object_closed) { + m_lock.Unlock(); + m_handler->closed(this); + m_lock.Lock(); + } else { + // TODO need to delay completion until after aio_notify completes + m_lock.Unlock(); + m_handler->overflow(this); + m_lock.Lock(); + } } } // namespace journal diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 378ab64df8c..53f8cc9ad0d 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -29,16 +29,17 @@ typedef std::list<AppendBuffer> AppendBuffers; class ObjectRecorder : public RefCountedObject, boost::noncopyable { public: - struct OverflowHandler { - virtual ~OverflowHandler() {} + struct Handler { + virtual ~Handler() { + } + virtual void closed(ObjectRecorder *object_recorder) = 0; virtual void overflow(ObjectRecorder *object_recorder) = 0; }; ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, SafeTimer &timer, Mutex &timer_lock, - OverflowHandler *overflow_handler, uint8_t order, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age); + Handler *handler, uint8_t order, uint32_t flush_interval, + uint64_t flush_bytes, double flush_age); ~ObjectRecorder(); inline uint64_t get_object_number() const { @@ -53,7 +54,12 @@ public: void flush(const FutureImplPtr &future); void claim_append_buffers(AppendBuffers *append_buffers); - bool close_object(); + + bool is_closed() const { + Mutex::Locker locker(m_lock); + return (m_object_closed && m_in_flight_appends.empty()); + } + bool close(); inline CephContext *cct() const { return m_cct; @@ -110,7 +116,7 @@ private: SafeTimer &m_timer; Mutex &m_timer_lock; - OverflowHandler *m_overflow_handler; + Handler *m_handler; uint8_t m_order; uint64_t m_soft_max_size; @@ -149,7 +155,7 @@ private: void append_overflowed(uint64_t tid); void send_appends(AppendBuffers *append_buffers); - void notify_overflow(); + void notify_handler(); }; } // namespace journal diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index f26e5261688..de82d062095 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -19,13 +19,20 @@ public: { } - struct OverflowHandler : public journal::ObjectRecorder::OverflowHandler { + struct Handler : public journal::ObjectRecorder::Handler { Mutex lock; Cond cond; - uint32_t overflows; + bool is_closed = false; + uint32_t overflows = 0; - OverflowHandler() : lock("lock"), overflows(0) {} + Handler() : lock("lock") { + } + virtual void closed(journal::ObjectRecorder *object_recorder) { + Mutex::Locker locker(lock); + is_closed = true; + cond.Signal(); + } virtual void overflow(journal::ObjectRecorder *object_recorder) { Mutex::Locker locker(lock); journal::AppendBuffers append_buffers; @@ -43,7 +50,7 @@ public: uint32_t m_flush_interval; uint64_t m_flush_bytes; double m_flush_age; - OverflowHandler m_overflow_handler; + Handler m_handler; void TearDown() { for (ObjectRecorders::iterator it = m_object_recorders.begin(); @@ -81,7 +88,7 @@ public: journal::ObjectRecorderPtr create_object(const std::string &oid, uint8_t order) { journal::ObjectRecorderPtr object(new journal::ObjectRecorder( - m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_overflow_handler, order, + m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_handler, order, m_flush_interval, m_flush_bytes, m_flush_age)); m_object_recorders.push_back(object); return object; @@ -301,6 +308,40 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { ASSERT_EQ(0, cond.wait()); } +TEST_F(TestObjectRecorder, Close) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_flush_interval(2); + journal::ObjectRecorderPtr object = create_object(oid, 24); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + ASSERT_FALSE(object->append(append_buffers)); + ASSERT_EQ(1U, object->get_pending_appends()); + + ASSERT_FALSE(object->close()); + + { + Mutex::Locker locker(m_handler.lock); + while (!m_handler.is_closed) { + if (m_handler.cond.WaitInterval( + reinterpret_cast<CephContext*>(m_ioctx.cct()), + m_handler.lock, utime_t(10, 0)) != 0) { + break; + } + } + } + + ASSERT_TRUE(m_handler.is_closed); + ASSERT_EQ(0U, object->get_pending_appends()); +} + TEST_F(TestObjectRecorder, Overflow) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); @@ -334,15 +375,15 @@ TEST_F(TestObjectRecorder, Overflow) { bool overflowed = false; { - Mutex::Locker locker(m_overflow_handler.lock); - while (m_overflow_handler.overflows == 0) { - if (m_overflow_handler.cond.WaitInterval( + Mutex::Locker locker(m_handler.lock); + while (m_handler.overflows == 0) { + if (m_handler.cond.WaitInterval( reinterpret_cast<CephContext*>(m_ioctx.cct()), - m_overflow_handler.lock, utime_t(10, 0)) != 0) { + m_handler.lock, utime_t(10, 0)) != 0) { break; } } - if (m_overflow_handler.overflows != 0) { + if (m_handler.overflows != 0) { overflowed = true; } } |