summaryrefslogtreecommitdiffstats
path: root/src/journal
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2015-07-16 20:41:49 +0200
committerJason Dillaman <dillaman@redhat.com>2015-11-06 02:42:42 +0100
commit4d3969d6c15bdfa3dca3aae1ed0e2aa9737c81e9 (patch)
tree71626047a6fd9b008f52ae1e1e84ae0410649517 /src/journal
parenttests: update journal tests based on API changes (diff)
downloadceph-4d3969d6c15bdfa3dca3aae1ed0e2aa9737c81e9.tar.xz
ceph-4d3969d6c15bdfa3dca3aae1ed0e2aa9737c81e9.zip
journal: fix issues discovered via valgrind
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/AsyncOpTracker.cc39
-rw-r--r--src/journal/AsyncOpTracker.h32
-rw-r--r--src/journal/JournalMetadata.cc56
-rw-r--r--src/journal/JournalMetadata.h34
-rw-r--r--src/journal/JournalPlayer.cc41
-rw-r--r--src/journal/JournalPlayer.h20
-rw-r--r--src/journal/JournalTrimmer.cc39
-rw-r--r--src/journal/JournalTrimmer.h13
-rw-r--r--src/journal/Journaler.cc2
-rw-r--r--src/journal/Makefile.am2
-rw-r--r--src/journal/ObjectPlayer.cc83
-rw-r--r--src/journal/ObjectPlayer.h17
-rw-r--r--src/journal/ObjectRecorder.cc20
-rw-r--r--src/journal/ObjectRecorder.h6
-rw-r--r--src/journal/ReplayHandler.h3
15 files changed, 259 insertions, 148 deletions
diff --git a/src/journal/AsyncOpTracker.cc b/src/journal/AsyncOpTracker.cc
new file mode 100644
index 00000000000..8c24088681e
--- /dev/null
+++ b/src/journal/AsyncOpTracker.cc
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/AsyncOpTracker.h"
+#include "journal/Utils.h"
+#include "include/assert.h"
+
+namespace journal {
+
+AsyncOpTracker::AsyncOpTracker()
+ : m_lock(utils::unique_lock_name("AsyncOpTracker::m_lock", this)),
+ m_pending_ops(0) {
+}
+
+AsyncOpTracker::~AsyncOpTracker() {
+ wait_for_ops();
+}
+
+void AsyncOpTracker::start_op() {
+ Mutex::Locker locker(m_lock);
+ ++m_pending_ops;
+}
+
+void AsyncOpTracker::finish_op() {
+ Mutex::Locker locker(m_lock);
+ assert(m_pending_ops > 0);
+ if (--m_pending_ops == 0) {
+ m_cond.Signal();
+ }
+}
+
+void AsyncOpTracker::wait_for_ops() {
+ Mutex::Locker locker(m_lock);
+ while (m_pending_ops > 0) {
+ m_cond.Wait(m_lock);
+ }
+}
+
+} // namespace journal
diff --git a/src/journal/AsyncOpTracker.h b/src/journal/AsyncOpTracker.h
new file mode 100644
index 00000000000..cec332f8471
--- /dev/null
+++ b/src/journal/AsyncOpTracker.h
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_ASYNC_OP_TRACKER_H
+#define CEPH_JOURNAL_ASYNC_OP_TRACKER_H
+
+#include "include/int_types.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+
+namespace journal {
+
+class AsyncOpTracker {
+public:
+ AsyncOpTracker();
+ ~AsyncOpTracker();
+
+ void start_op();
+ void finish_op();
+
+ void wait_for_ops();
+
+private:
+ Mutex m_lock;
+ Cond m_cond;
+ uint32_t m_pending_ops;
+
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_ASYNC_OP_TRACKER_H
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
index 51c4cef8031..56c0db32e3f 100644
--- a/src/journal/JournalMetadata.cc
+++ b/src/journal/JournalMetadata.cc
@@ -20,33 +20,21 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
double commit_interval)
- : m_cct(NULL), m_oid(oid), m_client_id(client_id),
- m_commit_interval(commit_interval), m_order(0), m_splay_width(0),
- m_initialized(false), m_finisher(NULL), m_timer(NULL),
+ : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
+ m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
+ m_splay_width(0), m_initialized(false), m_finisher(NULL), m_timer(NULL),
m_timer_lock("JournalMetadata::m_timer_lock"),
m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
- m_update_notifications(0), m_commit_position_pending(false),
- m_commit_position_ctx(NULL) {
+ m_minimum_set(0), m_active_set(0), m_update_notifications(0),
+ m_commit_position_pending(false), m_commit_position_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
JournalMetadata::~JournalMetadata() {
- if (m_timer != NULL) {
- Mutex::Locker locker(m_timer_lock);
- m_timer->shutdown();
- delete m_timer;
- m_timer = NULL;
+ if (m_initialized) {
+ shutdown();
}
- if (m_finisher != NULL) {
- m_finisher->stop();
- delete m_finisher;
- m_finisher = NULL;
- }
-
- m_ioctx.unwatch2(m_watch_handle);
- librados::Rados rados(m_ioctx);
- rados.watch_flush();
}
void JournalMetadata::init(Context *on_init) {
@@ -56,7 +44,7 @@ void JournalMetadata::init(Context *on_init) {
m_finisher = new Finisher(m_cct);
m_finisher->start();
- m_timer = new SafeTimer(m_cct, m_timer_lock, false);
+ m_timer = new SafeTimer(m_cct, m_timer_lock, true);
m_timer->init();
int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
@@ -72,6 +60,34 @@ void JournalMetadata::init(Context *on_init) {
ctx);
}
+void JournalMetadata::shutdown() {
+ assert(m_initialized);
+ m_initialized = false;
+
+ if (m_timer != NULL) {
+ Mutex::Locker locker(m_timer_lock);
+ m_timer->shutdown();
+ delete m_timer;
+ m_timer = NULL;
+ }
+
+ if (m_finisher != NULL) {
+ m_finisher->stop();
+ delete m_finisher;
+ m_finisher = NULL;
+ }
+
+ if (m_watch_handle != 0) {
+ m_ioctx.unwatch2(m_watch_handle);
+ librados::Rados rados(m_ioctx);
+ rados.watch_flush();
+ m_watch_handle = 0;
+ }
+
+ m_async_op_tracker.wait_for_ops();
+ m_ioctx.aio_flush();
+}
+
int JournalMetadata::register_client(const std::string &description) {
ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h
index ce621c5e157..89ddcf1296b 100644
--- a/src/journal/JournalMetadata.h
+++ b/src/journal/JournalMetadata.h
@@ -7,9 +7,11 @@
#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
+#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "cls/journal/cls_journal_types.h"
+#include "journal/AsyncOpTracker.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <list>
@@ -44,6 +46,7 @@ public:
~JournalMetadata();
void init(Context *on_init);
+ void shutdown();
void add_listener(Listener *listener);
void remove_listener(Listener *listener);
@@ -148,12 +151,16 @@ private:
};
struct C_NotifyUpdate : public Context {
- JournalMetadataPtr journal_metadata;
+ JournalMetadata* journal_metadata;
Context *on_safe;
C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL)
- : journal_metadata(_journal_metadata), on_safe(_on_safe) {}
-
+ : journal_metadata(_journal_metadata), on_safe(_on_safe) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ virtual ~C_NotifyUpdate() {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
virtual void finish(int r) {
if (r == 0) {
journal_metadata->async_notify_update();
@@ -165,20 +172,24 @@ private:
};
struct C_ImmutableMetadata : public Context {
- JournalMetadataPtr journal_metadata;
+ JournalMetadata* journal_metadata;
Context *on_finish;
C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
: journal_metadata(_journal_metadata), on_finish(_on_finish) {
+ Mutex::Locker locker(journal_metadata->m_lock);
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ virtual ~C_ImmutableMetadata() {
+ journal_metadata->m_async_op_tracker.finish_op();
}
-
virtual void finish(int r) {
journal_metadata->handle_immutable_metadata(r, on_finish);
}
};
struct C_Refresh : public Context {
- JournalMetadataPtr journal_metadata;
+ JournalMetadata* journal_metadata;
uint64_t minimum_set;
uint64_t active_set;
RegisteredClients registered_clients;
@@ -186,8 +197,13 @@ private:
C_Refresh(JournalMetadata *_journal_metadata, Context *_on_finish)
: journal_metadata(_journal_metadata), minimum_set(0), active_set(0),
- on_finish(_on_finish) {}
-
+ on_finish(_on_finish) {
+ Mutex::Locker locker(journal_metadata->m_lock);
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ virtual ~C_Refresh() {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
virtual void finish(int r) {
journal_metadata->handle_refresh_complete(this, r);
}
@@ -228,6 +244,8 @@ private:
ObjectSetPosition m_commit_position;
Context *m_commit_position_ctx;
+ AsyncOpTracker m_async_op_tracker;
+
void handle_immutable_metadata(int r, Context *on_init);
void refresh(Context *on_finish);
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
index ef11420d74d..421a38b121e 100644
--- a/src/journal/JournalPlayer.cc
+++ b/src/journal/JournalPlayer.cc
@@ -14,18 +14,36 @@ namespace journal {
namespace {
-struct C_HandleComplete: public Context {
+struct C_HandleComplete : public Context {
ReplayHandler *replay_handler;
C_HandleComplete(ReplayHandler *_replay_handler)
: replay_handler(_replay_handler) {
+ replay_handler->get();
+ }
+ virtual ~C_HandleComplete() {
+ replay_handler->put();
}
-
virtual void finish(int r) {
replay_handler->handle_complete(r);
}
};
+struct C_HandleEntriesAvailable : public Context {
+ ReplayHandler *replay_handler;
+
+ C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
+ : replay_handler(_replay_handler) {
+ replay_handler->get();
+ }
+ virtual ~C_HandleEntriesAvailable() {
+ replay_handler->put();
+ }
+ virtual void finish(int r) {
+ replay_handler->handle_entries_available();
+ }
+};
+
} // anonymous namespace
JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
@@ -37,6 +55,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
m_watch_interval(0), m_commit_object(0) {
+ m_replay_handler->get();
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -55,6 +74,11 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
}
}
+JournalPlayer::~JournalPlayer() {
+ m_async_op_tracker.wait_for_ops();
+ m_replay_handler->put();
+}
+
void JournalPlayer::prefetch() {
m_lock.Lock();
assert(m_state == STATE_INIT);
@@ -244,9 +268,8 @@ int JournalPlayer::process_prefetch() {
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_lock.Unlock();
- m_replay_handler->handle_entries_available();
- m_lock.Lock();
+ m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
} else if (m_watch_enabled) {
object_player->watch(&m_process_state, m_watch_interval);
m_watch_scheduled = true;
@@ -268,9 +291,8 @@ int JournalPlayer::process_playback() {
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_lock.Unlock();
- m_replay_handler->handle_entries_available();
- m_lock.Lock();
+ m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
}
return 0;
}
@@ -355,7 +377,7 @@ int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
: player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
return_value(0) {
- player->get();
+ player->m_async_op_tracker.start_op();
}
void JournalPlayer::C_PrefetchBatch::add_fetch() {
@@ -374,7 +396,6 @@ void JournalPlayer::C_PrefetchBatch::complete(int r) {
if (refs == 0) {
player->process_state(return_value);
- player->put();
delete this;
}
}
diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h
index 43d81f4c9e1..613c9e305b6 100644
--- a/src/journal/JournalPlayer.h
+++ b/src/journal/JournalPlayer.h
@@ -8,7 +8,7 @@
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
-#include "common/RefCountedObj.h"
+#include "journal/AsyncOpTracker.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectPlayer.h"
#include "cls/journal/cls_journal_types.h"
@@ -19,10 +19,8 @@ class SafeTimer;
namespace journal {
class ReplayHandler;
-class JournalPlayer;
-typedef boost::intrusive_ptr<JournalPlayer> JournalPlayerPtr;
-class JournalPlayer : public RefCountedObject {
+class JournalPlayer {
public:
typedef cls::journal::EntryPosition EntryPosition;
typedef cls::journal::EntryPositions EntryPositions;
@@ -31,6 +29,7 @@ public:
JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
ReplayHandler *replay_handler);
+ ~JournalPlayer();
void prefetch();
void prefetch_and_watch(double interval);
@@ -58,6 +57,7 @@ private:
}
virtual void finish(int r) {}
};
+
struct C_PrefetchBatch : public Context {
JournalPlayer *player;
Mutex lock;
@@ -65,22 +65,28 @@ private:
int return_value;
C_PrefetchBatch(JournalPlayer *p);
+ virtual ~C_PrefetchBatch() {
+ player->m_async_op_tracker.finish_op();
+ }
void add_fetch();
virtual void complete(int r);
virtual void finish(int r) {}
};
+
struct C_Fetch : public Context {
JournalPlayer *player;
uint64_t object_num;
Context *on_fetch;
C_Fetch(JournalPlayer *p, uint64_t o, Context *c)
: player(p), object_num(o), on_fetch(c) {
- player->get();
+ player->m_async_op_tracker.start_op();
+ }
+ virtual ~C_Fetch() {
+ player->m_async_op_tracker.finish_op();
}
virtual void finish(int r) {
r = player->handle_fetched(r, object_num);
on_fetch->complete(r);
- player->put();
}
};
@@ -93,6 +99,8 @@ private:
C_ProcessState m_process_state;
+ AsyncOpTracker m_async_op_tracker;
+
mutable Mutex m_lock;
State m_state;
uint8_t m_splay_offset;
diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc
index 2b34873e5a2..cdb517b9b50 100644
--- a/src/journal/JournalTrimmer.cc
+++ b/src/journal/JournalTrimmer.cc
@@ -5,6 +5,7 @@
#include "journal/Utils.h"
#include "common/Cond.h"
#include "common/errno.h"
+#include "common/Finisher.h"
#include <limits>
#define dout_subsys ceph_subsys_journaler
@@ -18,19 +19,18 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
const JournalMetadataPtr &journal_metadata)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"),
- m_pending_ops(0), m_remove_set_pending(false), m_remove_set(0),
- m_remove_set_ctx(NULL) {
+ m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
}
JournalTrimmer::~JournalTrimmer() {
- wait_for_ops();
+ m_async_op_tracker.wait_for_ops();
}
int JournalTrimmer::remove_objects() {
ldout(m_cct, 20) << __func__ << dendl;
- wait_for_ops();
+ m_async_op_tracker.wait_for_ops();
C_SaferCond ctx;
{
@@ -60,33 +60,13 @@ void JournalTrimmer::update_commit_position(
{
Mutex::Locker locker(m_lock);
- start_op();
+ m_async_op_tracker.start_op();
}
Context *ctx = new C_CommitPositionSafe(this, object_set_position);
m_journal_metadata->set_commit_position(object_set_position, ctx);
}
-void JournalTrimmer::start_op() {
- assert(m_lock.is_locked());
- ++m_pending_ops;
-}
-
-void JournalTrimmer::finish_op() {
- assert(m_lock.is_locked());
- assert(m_pending_ops > 0);
- if (--m_pending_ops == 0) {
- m_pending_ops_cond.Signal();
- }
-}
-
-void JournalTrimmer::wait_for_ops() {
- Mutex::Locker locker(m_lock);
- while (m_pending_ops > 0) {
- m_pending_ops_cond.Wait(m_lock);
- }
-}
-
void JournalTrimmer::trim_objects(uint64_t minimum_set) {
assert(m_lock.is_locked());
@@ -108,7 +88,7 @@ void JournalTrimmer::trim_objects(uint64_t minimum_set) {
void JournalTrimmer::remove_set(uint64_t object_set) {
assert(m_lock.is_locked());
- start_op();
+ m_async_op_tracker.start_op();
uint8_t splay_width = m_journal_metadata->get_splay_width();
C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width);
@@ -128,7 +108,6 @@ void JournalTrimmer::remove_set(uint64_t object_set) {
assert(r == 0);
comp->release();
}
- ctx->complete(-ENOENT);
}
void JournalTrimmer::handle_commit_position_safe(
@@ -164,7 +143,7 @@ void JournalTrimmer::handle_commit_position_safe(
trim_objects(object_set_position.object_number / splay_width);
}
}
- finish_op();
+ m_async_op_tracker.finish_op();
}
void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
@@ -192,7 +171,7 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
ldout(m_cct, 20) << "completing remove set context" << dendl;
m_remove_set_ctx->complete(r);
}
- finish_op();
+ m_async_op_tracker.finish_op();
}
JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
@@ -200,7 +179,7 @@ JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
uint8_t _splay_width)
: journal_trimmer(_journal_trimmer), object_set(_object_set),
lock(utils::unique_lock_name("C_RemoveSet::lock", this)),
- refs(_splay_width + 1), return_value(-ENOENT) {
+ refs(_splay_width), return_value(-ENOENT) {
}
void JournalTrimmer::C_RemoveSet::complete(int r) {
diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h
index 1ed6486a830..1ae994da57d 100644
--- a/src/journal/JournalTrimmer.h
+++ b/src/journal/JournalTrimmer.h
@@ -8,6 +8,7 @@
#include "include/rados/librados.hpp"
#include "include/Context.h"
#include "common/Mutex.h"
+#include "journal/AsyncOpTracker.h"
#include "journal/JournalMetadata.h"
#include "cls/journal/cls_journal_types.h"
@@ -34,9 +35,6 @@ private:
: journal_trimmer(_journal_trimmer),
object_set_position(_object_set_position) {}
- virtual void complete(int r) {
- finish(r);
- }
virtual void finish(int r) {
journal_trimmer->handle_commit_position_safe(r, object_set_position);
}
@@ -62,19 +60,14 @@ private:
JournalMetadataPtr m_journal_metadata;
- Mutex m_lock;
+ AsyncOpTracker m_async_op_tracker;
- size_t m_pending_ops;
- Cond m_pending_ops_cond;
+ Mutex m_lock;
bool m_remove_set_pending;
uint64_t m_remove_set;
Context *m_remove_set_ctx;
- void start_op();
- void finish_op();
- void wait_for_ops();
-
void trim_objects(uint64_t minimum_set);
void remove_set(uint64_t object_set);
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
index c61676bc009..b7ca392fc97 100644
--- a/src/journal/Journaler.cc
+++ b/src/journal/Journaler.cc
@@ -147,7 +147,7 @@ bool Journaler::try_pop_front(Payload *payload) {
void Journaler::stop_replay() {
assert(m_player != NULL);
m_player->unwatch();
- m_player->put();
+ delete m_player;
m_player = NULL;
}
diff --git a/src/journal/Makefile.am b/src/journal/Makefile.am
index af85525080d..99b84b14876 100644
--- a/src/journal/Makefile.am
+++ b/src/journal/Makefile.am
@@ -2,6 +2,7 @@ if ENABLE_CLIENT
if WITH_RADOS
libjournal_la_SOURCES = \
+ journal/AsyncOpTracker.cc \
journal/Entry.cc \
journal/Future.cc \
journal/FutureImpl.cc \
@@ -18,6 +19,7 @@ libjournal_la_SOURCES = \
noinst_LTLIBRARIES += libjournal.la
noinst_HEADERS += \
+ journal/AsyncOpTracker.h \
journal/Entry.h \
journal/Future.h \
journal/FutureImpl.h \
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
index 0a5b0568640..ed5798198d1 100644
--- a/src/journal/ObjectPlayer.cc
+++ b/src/journal/ObjectPlayer.cc
@@ -19,10 +19,10 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
: RefCountedObject(NULL, 0), m_object_num(object_num),
m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
- m_watch_interval(0), m_watch_task(NULL), m_watch_fetch(this),
+ m_watch_interval(0), m_watch_task(NULL),
m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
- m_watch_ctx_in_progress(false) {
+ m_watch_in_progress(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
@@ -30,6 +30,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
ObjectPlayer::~ObjectPlayer() {
{
Mutex::Locker locker(m_lock);
+ assert(!m_fetch_in_progress);
assert(m_watch_ctx == NULL);
}
}
@@ -54,29 +55,33 @@ void ObjectPlayer::fetch(Context *on_finish) {
void ObjectPlayer::watch(Context *on_fetch, double interval) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
- {
- Mutex::Locker locker(m_lock);
- assert(m_watch_ctx == NULL);
- m_watch_ctx = on_fetch;
- }
- {
- Mutex::Locker locker(m_timer_lock);
- m_watch_interval = interval;
- }
+
+ Mutex::Locker timer_locker(m_timer_lock);
+ m_watch_interval = interval;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_watch_ctx == NULL);
+ m_watch_ctx = on_fetch;
+
schedule_watch();
}
void ObjectPlayer::unwatch() {
ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
- {
- Mutex::Locker locker(m_lock);
- while (m_watch_ctx_in_progress) {
- m_watch_ctx_cond.Wait(m_lock);
- }
- delete m_watch_ctx;
- m_watch_ctx = NULL;
- }
+ Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+
cancel_watch();
+
+ Context *ctx = m_watch_ctx;
+ m_watch_ctx = NULL;
+
+ m_timer_lock.Unlock();
+ while (m_watch_in_progress) {
+ m_watch_in_progress_cond.Wait(m_lock);
+ }
+ m_timer_lock.Lock();
+ delete ctx;
}
void ObjectPlayer::front(Entry *entry) const {
@@ -167,16 +172,21 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
}
void ObjectPlayer::schedule_watch() {
+ assert(m_timer_lock.is_locked());
+ assert(m_lock.is_locked());
+ if (m_watch_ctx == NULL) {
+ return;
+ }
+
ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
- Mutex::Locker locker(m_timer_lock);
assert(m_watch_task == NULL);
m_watch_task = new C_WatchTask(this);
m_timer.add_event_after(m_watch_interval, m_watch_task);
}
void ObjectPlayer::cancel_watch() {
+ assert(m_timer_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
- Mutex::Locker locker(m_timer_lock);
if (m_watch_task != NULL) {
m_timer.cancel_event(m_watch_task);
m_watch_task = NULL;
@@ -184,28 +194,34 @@ void ObjectPlayer::cancel_watch() {
}
void ObjectPlayer::handle_watch_task() {
+ assert(m_timer_lock.is_locked());
+
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
{
- Mutex::Locker locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ assert(m_watch_ctx != NULL);
+
+ m_watch_in_progress = true;
m_watch_task = NULL;
}
- fetch(&m_watch_fetch);
+ fetch(new C_WatchFetch(this));
}
void ObjectPlayer::handle_watch_fetched(int r) {
ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
<< dendl;
- if (r == -ENOENT) {
- schedule_watch();
- return;
- }
- Context *on_finish;
+ Context *on_finish = NULL;
{
+ Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
- m_watch_ctx_in_progress = true;
- on_finish = m_watch_ctx;
- m_watch_ctx = NULL;
+ assert(m_watch_in_progress);
+ if (r == -ENOENT) {
+ schedule_watch();
+ } else {
+ on_finish = m_watch_ctx;
+ m_watch_ctx = NULL;
+ }
}
if (on_finish != NULL) {
@@ -214,15 +230,14 @@ void ObjectPlayer::handle_watch_fetched(int r) {
{
Mutex::Locker locker(m_lock);
- m_watch_ctx_in_progress = false;
- m_watch_ctx_cond.Signal();
+ m_watch_in_progress = false;
+ m_watch_in_progress_cond.Signal();
}
}
void ObjectPlayer::C_Fetch::finish(int r) {
r = object_player->handle_fetch_complete(r, read_bl);
on_finish->complete(r);
- object_player->put();
}
void ObjectPlayer::C_WatchTask::finish(int r) {
diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h
index ff85575f304..5c00ba1c056 100644
--- a/src/journal/ObjectPlayer.h
+++ b/src/journal/ObjectPlayer.h
@@ -73,30 +73,24 @@ private:
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
struct C_Fetch : public Context {
- ObjectPlayer *object_player;
+ ObjectPlayerPtr object_player;
Context *on_finish;
bufferlist read_bl;
C_Fetch(ObjectPlayer *o, Context *ctx)
: object_player(o), on_finish(ctx) {
- object_player->get();
}
virtual void finish(int r);
};
struct C_WatchTask : public Context {
- ObjectPlayer *object_player;
+ ObjectPlayerPtr object_player;
C_WatchTask(ObjectPlayer *o) : object_player(o) {
- object_player->get();
}
virtual void finish(int r);
};
struct C_WatchFetch : public Context {
- ObjectPlayer *object_player;
+ ObjectPlayerPtr object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
}
- virtual void complete(int r) {
- finish(r);
- object_player->put();
- }
virtual void finish(int r);
};
@@ -113,7 +107,6 @@ private:
double m_watch_interval;
Context *m_watch_task;
- C_WatchFetch m_watch_fetch;
mutable Mutex m_lock;
bool m_fetch_in_progress;
@@ -125,8 +118,8 @@ private:
InvalidRanges m_invalid_ranges;
Context *m_watch_ctx;
- Cond m_watch_ctx_cond;
- bool m_watch_ctx_in_progress;
+ Cond m_watch_in_progress_cond;
+ bool m_watch_in_progress;
int handle_fetch_complete(int r, const bufferlist &bl);
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index af16ccdb00a..cf96b9417ec 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -37,7 +37,7 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
}
ObjectRecorder::~ObjectRecorder() {
- cancel_append_task();
+ assert(m_append_task == NULL);
assert(m_append_buffers.empty());
assert(m_in_flight_appends.empty());
}
@@ -72,12 +72,12 @@ void ObjectRecorder::flush(Context *on_safe) {
future = Future(m_append_buffers.rbegin()->first);
flush_appends(true);
- cancel_append_task();
} else if (!m_in_flight_appends.empty()) {
AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
assert(!append_buffers.empty());
future = Future(append_buffers.rbegin()->first);
}
+ cancel_append_task();
}
if (future.is_valid()) {
@@ -85,7 +85,6 @@ void ObjectRecorder::flush(Context *on_safe) {
} else {
on_safe->complete(0);
}
-
}
void ObjectRecorder::flush(const FutureImplPtr &future) {
@@ -140,16 +139,11 @@ bool ObjectRecorder::close_object() {
}
void ObjectRecorder::handle_append_task() {
- {
- Mutex::Locker locker(m_lock);
- flush_appends(true);
- }
+ assert(m_timer_lock.is_locked());
+ m_append_task = NULL;
- {
- Mutex::Locker locker(m_timer_lock);
- m_append_task = NULL;
- put();
- }
+ Mutex::Locker locker(m_lock);
+ flush_appends(true);
}
void ObjectRecorder::cancel_append_task() {
@@ -157,14 +151,12 @@ void ObjectRecorder::cancel_append_task() {
if (m_append_task != NULL) {
m_timer.cancel_event(m_append_task);
m_append_task = NULL;
- put();
}
}
void ObjectRecorder::schedule_append_task() {
Mutex::Locker locker(m_timer_lock);
if (m_append_task == NULL && m_flush_age > 0) {
- get();
m_append_task = new C_AppendTask(this);
m_timer.add_event_after(m_flush_age, m_append_task);
}
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index cc6425c23b8..566c41fd780 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -79,11 +79,11 @@ private:
};
struct C_AppendTask : public Context {
ObjectRecorder *object_recorder;
- C_AppendTask(ObjectRecorder *o) : object_recorder(o) {}
- virtual void complete(int r) {
+ C_AppendTask(ObjectRecorder *o) : object_recorder(o) {
+ }
+ virtual void finish(int r) {
object_recorder->handle_append_task();
}
- virtual void finish(int r) {}
};
struct C_AppendFlush : public Context {
ObjectRecorder *object_recorder;
diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h
index 208350461fe..e61240d8c1f 100644
--- a/src/journal/ReplayHandler.h
+++ b/src/journal/ReplayHandler.h
@@ -9,6 +9,9 @@ namespace journal {
struct ReplayHandler {
virtual ~ReplayHandler() {}
+ virtual void get() = 0;
+ virtual void put() = 0;
+
virtual void handle_entries_available() = 0;
virtual void handle_complete(int r) = 0;
};