summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2016-05-15 00:13:38 +0200
committerJason Dillaman <dillaman@redhat.com>2016-05-18 17:02:29 +0200
commitbba91437dbe3b7a9b6da8a61ccc4c597858c8efc (patch)
tree7f80b26ad89f2cec79a1963d4281e0cc4b85ab4a
parentjournal: do not flag append as full if already known to be full (diff)
downloadceph-bba91437dbe3b7a9b6da8a61ccc4c597858c8efc.tar.xz
ceph-bba91437dbe3b7a9b6da8a61ccc4c597858c8efc.zip
journal: new ObjectRecorder closed callback
The callback will be invoked if there were in-flight appends when the close was requested. Signed-off-by: Jason Dillaman <dillaman@redhat.com>
-rw-r--r--src/journal/JournalRecorder.cc10
-rw-r--r--src/journal/JournalRecorder.h14
-rw-r--r--src/journal/ObjectRecorder.cc39
-rw-r--r--src/journal/ObjectRecorder.h22
-rw-r--r--src/test/journal/test_ObjectRecorder.cc61
5 files changed, 105 insertions, 41 deletions
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
index f78f0c82c49..02836b87c86 100644
--- a/src/journal/JournalRecorder.cc
+++ b/src/journal/JournalRecorder.cc
@@ -49,7 +49,7 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
- m_overflow_handler(this), m_lock("JournalerRecorder::m_lock"),
+ m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
m_current_set(m_journal_metadata->get_active_set()) {
Mutex::Locker locker(m_lock);
@@ -151,7 +151,7 @@ void JournalRecorder::close_object_set(uint64_t object_set) {
ObjectRecorderPtr object_recorder = it->second;
if (object_recorder != NULL &&
object_recorder->get_object_number() / splay_width == m_current_set) {
- if (object_recorder->close_object()) {
+ if (object_recorder->close()) {
// no in-flight ops, immediately create new recorder
create_next_object_recorder(object_recorder);
}
@@ -164,7 +164,7 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, m_journal_metadata->get_timer(),
- m_journal_metadata->get_timer_lock(), &m_overflow_handler,
+ m_journal_metadata->get_timer_lock(), &m_object_handler,
m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
m_flush_age));
return object_recorder;
@@ -197,6 +197,10 @@ void JournalRecorder::handle_update() {
}
}
+void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
+ // TODO
+}
+
void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
index be92298a483..dbd289883d4 100644
--- a/src/journal/JournalRecorder.h
+++ b/src/journal/JournalRecorder.h
@@ -47,12 +47,16 @@ private:
}
};
- struct OverflowHandler : public ObjectRecorder::OverflowHandler {
+ struct ObjectHandler : public ObjectRecorder::Handler {
JournalRecorder *journal_recorder;
- OverflowHandler(JournalRecorder *_journal_recorder)
- : journal_recorder(_journal_recorder) {}
+ ObjectHandler(JournalRecorder *_journal_recorder)
+ : journal_recorder(_journal_recorder) {
+ }
+ virtual void closed(ObjectRecorder *object_recorder) {
+ journal_recorder->handle_closed(object_recorder);
+ }
virtual void overflow(ObjectRecorder *object_recorder) {
journal_recorder->handle_overflow(object_recorder);
}
@@ -69,7 +73,7 @@ private:
double m_flush_age;
Listener m_listener;
- OverflowHandler m_overflow_handler;
+ ObjectHandler m_object_handler;
Mutex m_lock;
@@ -83,6 +87,8 @@ private:
void create_next_object_recorder(ObjectRecorderPtr object_recorder);
void handle_update();
+
+ void handle_closed(ObjectRecorder *object_recorder);
void handle_overflow(ObjectRecorder *object_recorder);
};
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index 78b51a2b4d1..5972d899b13 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -19,21 +19,20 @@ namespace journal {
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number,
SafeTimer &timer, Mutex &timer_lock,
- OverflowHandler *overflow_handler, uint8_t order,
+ Handler *handler, uint8_t order,
uint32_t flush_interval, uint64_t flush_bytes,
double flush_age)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
- m_overflow_handler(overflow_handler), m_order(order),
- m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
- m_append_task(NULL),
+ m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
+ m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
+ m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
m_object_closed(false), m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
- assert(m_overflow_handler != NULL);
+ assert(m_handler != NULL);
}
ObjectRecorder::~ObjectRecorder() {
@@ -151,15 +150,17 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
m_append_buffers.begin(), m_append_buffers.end());
}
-bool ObjectRecorder::close_object() {
+bool ObjectRecorder::close() {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
cancel_append_task();
Mutex::Locker locker(m_lock);
- m_object_closed = true;
flush_appends(true);
- return m_in_flight_appends.empty();
+
+ assert(!m_object_closed);
+ m_object_closed = true;
+ return m_in_flight_tids.empty();
}
void ObjectRecorder::handle_append_task() {
@@ -248,7 +249,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty()) {
- notify_overflow();
+ notify_handler();
}
return;
}
@@ -260,7 +261,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
m_in_flight_appends.erase(iter);
if (m_in_flight_appends.empty() && m_object_closed) {
// all remaining unsent appends should be redirected to new object
- notify_overflow();
+ notify_handler();
}
m_in_flight_flushes = true;
}
@@ -338,7 +339,7 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
rados_completion->release();
}
-void ObjectRecorder::notify_overflow() {
+void ObjectRecorder::notify_handler() {
assert(m_lock.is_locked());
for (AppendBuffers::const_iterator it = m_append_buffers.begin();
@@ -348,10 +349,16 @@ void ObjectRecorder::notify_overflow() {
it->first->detach();
}
- // TODO need to delay completion until after aio_notify completes
- m_lock.Unlock();
- m_overflow_handler->overflow(this);
- m_lock.Lock();
+ if (m_object_closed) {
+ m_lock.Unlock();
+ m_handler->closed(this);
+ m_lock.Lock();
+ } else {
+ // TODO need to delay completion until after aio_notify completes
+ m_lock.Unlock();
+ m_handler->overflow(this);
+ m_lock.Lock();
+ }
}
} // namespace journal
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index 378ab64df8c..53f8cc9ad0d 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -29,16 +29,17 @@ typedef std::list<AppendBuffer> AppendBuffers;
class ObjectRecorder : public RefCountedObject, boost::noncopyable {
public:
- struct OverflowHandler {
- virtual ~OverflowHandler() {}
+ struct Handler {
+ virtual ~Handler() {
+ }
+ virtual void closed(ObjectRecorder *object_recorder) = 0;
virtual void overflow(ObjectRecorder *object_recorder) = 0;
};
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, SafeTimer &timer, Mutex &timer_lock,
- OverflowHandler *overflow_handler, uint8_t order,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age);
+ Handler *handler, uint8_t order, uint32_t flush_interval,
+ uint64_t flush_bytes, double flush_age);
~ObjectRecorder();
inline uint64_t get_object_number() const {
@@ -53,7 +54,12 @@ public:
void flush(const FutureImplPtr &future);
void claim_append_buffers(AppendBuffers *append_buffers);
- bool close_object();
+
+ bool is_closed() const {
+ Mutex::Locker locker(m_lock);
+ return (m_object_closed && m_in_flight_appends.empty());
+ }
+ bool close();
inline CephContext *cct() const {
return m_cct;
@@ -110,7 +116,7 @@ private:
SafeTimer &m_timer;
Mutex &m_timer_lock;
- OverflowHandler *m_overflow_handler;
+ Handler *m_handler;
uint8_t m_order;
uint64_t m_soft_max_size;
@@ -149,7 +155,7 @@ private:
void append_overflowed(uint64_t tid);
void send_appends(AppendBuffers *append_buffers);
- void notify_overflow();
+ void notify_handler();
};
} // namespace journal
diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc
index f26e5261688..de82d062095 100644
--- a/src/test/journal/test_ObjectRecorder.cc
+++ b/src/test/journal/test_ObjectRecorder.cc
@@ -19,13 +19,20 @@ public:
{
}
- struct OverflowHandler : public journal::ObjectRecorder::OverflowHandler {
+ struct Handler : public journal::ObjectRecorder::Handler {
Mutex lock;
Cond cond;
- uint32_t overflows;
+ bool is_closed = false;
+ uint32_t overflows = 0;
- OverflowHandler() : lock("lock"), overflows(0) {}
+ Handler() : lock("lock") {
+ }
+ virtual void closed(journal::ObjectRecorder *object_recorder) {
+ Mutex::Locker locker(lock);
+ is_closed = true;
+ cond.Signal();
+ }
virtual void overflow(journal::ObjectRecorder *object_recorder) {
Mutex::Locker locker(lock);
journal::AppendBuffers append_buffers;
@@ -43,7 +50,7 @@ public:
uint32_t m_flush_interval;
uint64_t m_flush_bytes;
double m_flush_age;
- OverflowHandler m_overflow_handler;
+ Handler m_handler;
void TearDown() {
for (ObjectRecorders::iterator it = m_object_recorders.begin();
@@ -81,7 +88,7 @@ public:
journal::ObjectRecorderPtr create_object(const std::string &oid,
uint8_t order) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_overflow_handler, order,
+ m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_handler, order,
m_flush_interval, m_flush_bytes, m_flush_age));
m_object_recorders.push_back(object);
return object;
@@ -301,6 +308,40 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
ASSERT_EQ(0, cond.wait());
}
+TEST_F(TestObjectRecorder, Close) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_flush_interval(2);
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ ASSERT_FALSE(object->close());
+
+ {
+ Mutex::Locker locker(m_handler.lock);
+ while (!m_handler.is_closed) {
+ if (m_handler.cond.WaitInterval(
+ reinterpret_cast<CephContext*>(m_ioctx.cct()),
+ m_handler.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ }
+
+ ASSERT_TRUE(m_handler.is_closed);
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
TEST_F(TestObjectRecorder, Overflow) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
@@ -334,15 +375,15 @@ TEST_F(TestObjectRecorder, Overflow) {
bool overflowed = false;
{
- Mutex::Locker locker(m_overflow_handler.lock);
- while (m_overflow_handler.overflows == 0) {
- if (m_overflow_handler.cond.WaitInterval(
+ Mutex::Locker locker(m_handler.lock);
+ while (m_handler.overflows == 0) {
+ if (m_handler.cond.WaitInterval(
reinterpret_cast<CephContext*>(m_ioctx.cct()),
- m_overflow_handler.lock, utime_t(10, 0)) != 0) {
+ m_handler.lock, utime_t(10, 0)) != 0) {
break;
}
}
- if (m_overflow_handler.overflows != 0) {
+ if (m_handler.overflows != 0) {
overflowed = true;
}
}