summaryrefslogtreecommitdiffstats
path: root/src/journal/ObjectRecorder.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal/ObjectRecorder.cc')
-rw-r--r--src/journal/ObjectRecorder.cc39
1 files changed, 23 insertions, 16 deletions
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index 78b51a2b4d1..5972d899b13 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -19,21 +19,20 @@ namespace journal {
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number,
SafeTimer &timer, Mutex &timer_lock,
- OverflowHandler *overflow_handler, uint8_t order,
+ Handler *handler, uint8_t order,
uint32_t flush_interval, uint64_t flush_bytes,
double flush_age)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
- m_overflow_handler(overflow_handler), m_order(order),
- m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
- m_append_task(NULL),
+ m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
+ m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
+ m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
m_object_closed(false), m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
- assert(m_overflow_handler != NULL);
+ assert(m_handler != NULL);
}
ObjectRecorder::~ObjectRecorder() {
@@ -151,15 +150,17 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
m_append_buffers.begin(), m_append_buffers.end());
}
-bool ObjectRecorder::close_object() {
+bool ObjectRecorder::close() {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
cancel_append_task();
Mutex::Locker locker(m_lock);
- m_object_closed = true;
flush_appends(true);
- return m_in_flight_appends.empty();
+
+ assert(!m_object_closed);
+ m_object_closed = true;
+ return m_in_flight_tids.empty();
}
void ObjectRecorder::handle_append_task() {
@@ -248,7 +249,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty()) {
- notify_overflow();
+ notify_handler();
}
return;
}
@@ -260,7 +261,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
m_in_flight_appends.erase(iter);
if (m_in_flight_appends.empty() && m_object_closed) {
// all remaining unsent appends should be redirected to new object
- notify_overflow();
+ notify_handler();
}
m_in_flight_flushes = true;
}
@@ -338,7 +339,7 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
rados_completion->release();
}
-void ObjectRecorder::notify_overflow() {
+void ObjectRecorder::notify_handler() {
assert(m_lock.is_locked());
for (AppendBuffers::const_iterator it = m_append_buffers.begin();
@@ -348,10 +349,16 @@ void ObjectRecorder::notify_overflow() {
it->first->detach();
}
- // TODO need to delay completion until after aio_notify completes
- m_lock.Unlock();
- m_overflow_handler->overflow(this);
- m_lock.Lock();
+ if (m_object_closed) {
+ m_lock.Unlock();
+ m_handler->closed(this);
+ m_lock.Lock();
+ } else {
+ // TODO need to delay completion until after aio_notify completes
+ m_lock.Unlock();
+ m_handler->overflow(this);
+ m_lock.Lock();
+ }
}
} // namespace journal