summaryrefslogtreecommitdiffstats
path: root/src/journal/JournalPlayer.cc
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2015-07-16 20:41:49 +0200
committerJason Dillaman <dillaman@redhat.com>2015-11-06 02:42:42 +0100
commit4d3969d6c15bdfa3dca3aae1ed0e2aa9737c81e9 (patch)
tree71626047a6fd9b008f52ae1e1e84ae0410649517 /src/journal/JournalPlayer.cc
parenttests: update journal tests based on API changes (diff)
downloadceph-4d3969d6c15bdfa3dca3aae1ed0e2aa9737c81e9.tar.xz
ceph-4d3969d6c15bdfa3dca3aae1ed0e2aa9737c81e9.zip
journal: fix issues discovered via valgrind
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal/JournalPlayer.cc')
-rw-r--r--src/journal/JournalPlayer.cc41
1 files changed, 31 insertions, 10 deletions
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
index ef11420d74d..421a38b121e 100644
--- a/src/journal/JournalPlayer.cc
+++ b/src/journal/JournalPlayer.cc
@@ -14,18 +14,36 @@ namespace journal {
namespace {
-struct C_HandleComplete: public Context {
+struct C_HandleComplete : public Context {
ReplayHandler *replay_handler;
C_HandleComplete(ReplayHandler *_replay_handler)
: replay_handler(_replay_handler) {
+ replay_handler->get();
+ }
+ virtual ~C_HandleComplete() {
+ replay_handler->put();
}
-
virtual void finish(int r) {
replay_handler->handle_complete(r);
}
};
+struct C_HandleEntriesAvailable : public Context {
+ ReplayHandler *replay_handler;
+
+ C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
+ : replay_handler(_replay_handler) {
+ replay_handler->get();
+ }
+ virtual ~C_HandleEntriesAvailable() {
+ replay_handler->put();
+ }
+ virtual void finish(int r) {
+ replay_handler->handle_entries_available();
+ }
+};
+
} // anonymous namespace
JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
@@ -37,6 +55,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
m_watch_interval(0), m_commit_object(0) {
+ m_replay_handler->get();
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -55,6 +74,11 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
}
}
+JournalPlayer::~JournalPlayer() {
+ m_async_op_tracker.wait_for_ops();
+ m_replay_handler->put();
+}
+
void JournalPlayer::prefetch() {
m_lock.Lock();
assert(m_state == STATE_INIT);
@@ -244,9 +268,8 @@ int JournalPlayer::process_prefetch() {
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_lock.Unlock();
- m_replay_handler->handle_entries_available();
- m_lock.Lock();
+ m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
} else if (m_watch_enabled) {
object_player->watch(&m_process_state, m_watch_interval);
m_watch_scheduled = true;
@@ -268,9 +291,8 @@ int JournalPlayer::process_playback() {
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_lock.Unlock();
- m_replay_handler->handle_entries_available();
- m_lock.Lock();
+ m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
}
return 0;
}
@@ -355,7 +377,7 @@ int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
: player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
return_value(0) {
- player->get();
+ player->m_async_op_tracker.start_op();
}
void JournalPlayer::C_PrefetchBatch::add_fetch() {
@@ -374,7 +396,6 @@ void JournalPlayer::C_PrefetchBatch::complete(int r) {
if (refs == 0) {
player->process_state(return_value);
- player->put();
delete this;
}
}