summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2016-05-24 18:06:26 +0200
committerJason Dillaman <dillaman@redhat.com>2016-05-25 14:19:17 +0200
commit79b41346678b3acdeb547fe07e44d8d0543d37a7 (patch)
tree342a5d2cd4cd7d088e71cbecdbece0428e0d4bce /src
parentMerge pull request #9122 from cbodley/wip-rgw-httpmgr-pipe (diff)
downloadceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.tar.xz
ceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.zip
journal: player shutdown is now handled asynchronously
Fixes: http://tracker.ceph.com/issues/15949 Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src')
-rw-r--r--src/journal/AsyncOpTracker.cc33
-rw-r--r--src/journal/AsyncOpTracker.h6
-rw-r--r--src/journal/JournalPlayer.cc53
-rw-r--r--src/journal/JournalPlayer.h7
-rw-r--r--src/journal/Journaler.cc7
-rw-r--r--src/journal/ObjectPlayer.cc68
-rw-r--r--src/journal/ObjectPlayer.h7
-rw-r--r--src/journal/Utils.h24
-rw-r--r--src/test/journal/test_JournalPlayer.cc134
-rw-r--r--src/test/journal/test_ObjectPlayer.cc11
10 files changed, 282 insertions, 68 deletions
diff --git a/src/journal/AsyncOpTracker.cc b/src/journal/AsyncOpTracker.cc
index 8c24088681e..13a55fa7a47 100644
--- a/src/journal/AsyncOpTracker.cc
+++ b/src/journal/AsyncOpTracker.cc
@@ -22,10 +22,18 @@ void AsyncOpTracker::start_op() {
}
void AsyncOpTracker::finish_op() {
- Mutex::Locker locker(m_lock);
- assert(m_pending_ops > 0);
- if (--m_pending_ops == 0) {
- m_cond.Signal();
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_pending_ops > 0);
+ if (--m_pending_ops == 0) {
+ m_cond.Signal();
+ std::swap(on_finish, m_on_finish);
+ }
+ }
+
+ if (on_finish != nullptr) {
+ on_finish->complete(0);
}
}
@@ -36,4 +44,21 @@ void AsyncOpTracker::wait_for_ops() {
}
}
+void AsyncOpTracker::wait_for_ops(Context *on_finish) {
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_on_finish == nullptr);
+ if (m_pending_ops > 0) {
+ m_on_finish = on_finish;
+ return;
+ }
+ }
+ on_finish->complete(0);
+}
+
+bool AsyncOpTracker::empty() {
+ Mutex::Locker locker(m_lock);
+ return (m_pending_ops == 0);
+}
+
} // namespace journal
diff --git a/src/journal/AsyncOpTracker.h b/src/journal/AsyncOpTracker.h
index cec332f8471..a88cd453fe9 100644
--- a/src/journal/AsyncOpTracker.h
+++ b/src/journal/AsyncOpTracker.h
@@ -8,6 +8,8 @@
#include "common/Cond.h"
#include "common/Mutex.h"
+struct Context;
+
namespace journal {
class AsyncOpTracker {
@@ -19,11 +21,15 @@ public:
void finish_op();
void wait_for_ops();
+ void wait_for_ops(Context *on_finish);
+
+ bool empty();
private:
Mutex m_lock;
Cond m_cond;
uint32_t m_pending_ops;
+ Context *m_on_finish = nullptr;
};
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
index a79b2d484f7..28905a2ed13 100644
--- a/src/journal/JournalPlayer.cc
+++ b/src/journal/JournalPlayer.cc
@@ -79,9 +79,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
}
JournalPlayer::~JournalPlayer() {
- m_async_op_tracker.wait_for_ops();
+ assert(m_async_op_tracker.empty());
{
Mutex::Locker locker(m_lock);
+ assert(m_shut_down);
assert(m_fetch_object_numbers.empty());
assert(!m_watch_scheduled);
}
@@ -140,16 +141,32 @@ void JournalPlayer::prefetch_and_watch(double interval) {
prefetch();
}
-void JournalPlayer::unwatch() {
+void JournalPlayer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
+
+ assert(!m_shut_down);
+ m_shut_down = true;
m_watch_enabled = false;
+
+ on_finish = utils::create_async_context_callback(
+ m_journal_metadata, on_finish);
+
if (m_watch_scheduled) {
- for (auto &players : m_object_players) {
- players.second.begin()->second->unwatch();
+ ObjectPlayerPtr object_player = get_object_player();
+ switch (m_watch_step) {
+ case WATCH_STEP_FETCH_FIRST:
+ object_player = m_object_players.begin()->second.begin()->second;
+ // fallthrough
+ case WATCH_STEP_FETCH_CURRENT:
+ object_player->unwatch();
+ break;
+ case WATCH_STEP_ASSERT_ACTIVE:
+ break;
}
- m_watch_scheduled = false;
}
+
+ m_async_op_tracker.wait_for_ops(on_finish);
}
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
@@ -623,6 +640,10 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
+ if (m_shut_down) {
+ return;
+ }
+
if (r == -ENOENT) {
r = 0;
}
@@ -647,6 +668,8 @@ void JournalPlayer::schedule_watch() {
// by an incomplete tag sequence
ldout(m_cct, 20) << __func__ << ": asserting active tag="
<< *m_active_tag_tid << dendl;
+
+ m_async_op_tracker.start_op();
FunctionContext *ctx = new FunctionContext([this](int r) {
handle_watch_assert_active(r);
});
@@ -654,9 +677,9 @@ void JournalPlayer::schedule_watch() {
return;
}
+ ObjectPlayerPtr object_player;
double watch_interval = m_watch_interval;
- ObjectPlayerPtr object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_CURRENT:
{
@@ -684,21 +707,22 @@ void JournalPlayer::schedule_watch() {
ldout(m_cct, 20) << __func__ << ": scheduling watch on "
<< object_player->get_oid() << dendl;
- C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
+ Context *ctx = utils::create_async_context_callback(
+ m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
object_player->watch(ctx, watch_interval);
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
- if (r == -ECANCELED) {
- // unwatch of object player(s)
- return;
- }
-
Mutex::Locker locker(m_lock);
assert(m_watch_scheduled);
m_watch_scheduled = false;
+ if (m_shut_down || r == -ECANCELED) {
+ // unwatch of object player(s)
+ return;
+ }
+
ObjectPlayerPtr object_player = get_object_player(object_num);
if (r == 0 && object_player->empty()) {
// possibly need to prune this empty object player if we've
@@ -737,7 +761,10 @@ void JournalPlayer::handle_watch_assert_active(int r) {
}
m_watch_step = WATCH_STEP_FETCH_CURRENT;
- schedule_watch();
+ if (!m_shut_down && m_watch_enabled) {
+ schedule_watch();
+ }
+ m_async_op_tracker.finish_op();
}
void JournalPlayer::notify_entries_available() {
diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h
index eb156b3e711..f50258296c9 100644
--- a/src/journal/JournalPlayer.h
+++ b/src/journal/JournalPlayer.h
@@ -36,7 +36,7 @@ public:
void prefetch();
void prefetch_and_watch(double interval);
- void unwatch();
+ void shut_down(Context *on_finish);
bool try_pop_front(Entry *entry, uint64_t *commit_tid);
@@ -79,6 +79,10 @@ private:
uint64_t object_num;
C_Watch(JournalPlayer *player, uint64_t object_num)
: player(player), object_num(object_num) {
+ player->m_async_op_tracker.start_op();
+ }
+ virtual ~C_Watch() {
+ player->m_async_op_tracker.finish_op();
}
virtual void finish(int r) override {
@@ -105,6 +109,7 @@ private:
WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
bool m_watch_prune_active_tag = false;
+ bool m_shut_down = false;
bool m_handler_notified = false;
ObjectNumbers m_fetch_object_numbers;
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
index 2a02f60af3c..d1e80ea8b61 100644
--- a/src/journal/Journaler.cc
+++ b/src/journal/Journaler.cc
@@ -317,7 +317,12 @@ bool Journaler::try_pop_front(ReplayEntry *replay_entry,
void Journaler::stop_replay() {
assert(m_player != NULL);
- m_player->unwatch();
+
+ // TODO
+ C_SaferCond ctx;
+ m_player->shut_down(&ctx);
+ ctx.wait();
+
delete m_player;
m_player = NULL;
}
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
index 2c2f3e33a40..f86e3ef9370 100644
--- a/src/journal/ObjectPlayer.cc
+++ b/src/journal/ObjectPlayer.cc
@@ -21,8 +21,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
m_watch_interval(0), m_watch_task(NULL),
m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
- m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
- m_watch_in_progress(false) {
+ m_fetch_in_progress(false), m_read_off(0) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
@@ -32,8 +31,7 @@ ObjectPlayer::~ObjectPlayer() {
Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
assert(!m_fetch_in_progress);
- assert(!m_watch_in_progress);
- assert(m_watch_ctx == NULL);
+ assert(m_watch_ctx == nullptr);
}
}
@@ -62,13 +60,10 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) {
Mutex::Locker timer_locker(m_timer_lock);
m_watch_interval = interval;
- assert(m_watch_ctx == NULL);
+ assert(m_watch_ctx == nullptr);
m_watch_ctx = on_fetch;
- // watch callback might lead to re-scheduled watch
- if (!m_watch_in_progress) {
- schedule_watch();
- }
+ schedule_watch();
}
void ObjectPlayer::unwatch() {
@@ -76,13 +71,14 @@ void ObjectPlayer::unwatch() {
Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
+ assert(!m_unwatched);
+ m_unwatched = true;
- cancel_watch();
+ if (!cancel_watch()) {
+ return;
+ }
std::swap(watch_ctx, m_watch_ctx);
- while (m_watch_in_progress) {
- m_watch_in_progress_cond.Wait(m_timer_lock);
- }
}
if (watch_ctx != nullptr) {
@@ -190,24 +186,27 @@ void ObjectPlayer::schedule_watch() {
m_timer.add_event_after(m_watch_interval, m_watch_task);
}
-void ObjectPlayer::cancel_watch() {
+bool ObjectPlayer::cancel_watch() {
assert(m_timer_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
- if (m_watch_task != NULL) {
- m_timer.cancel_event(m_watch_task);
- m_watch_task = NULL;
+ if (m_watch_task != nullptr) {
+ bool canceled = m_timer.cancel_event(m_watch_task);
+ assert(canceled);
+
+ m_watch_task = nullptr;
+ return true;
}
+ return false;
}
void ObjectPlayer::handle_watch_task() {
assert(m_timer_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
- assert(m_watch_ctx != NULL);
+ assert(m_watch_ctx != nullptr);
+ assert(m_watch_task != nullptr);
- assert(!m_watch_in_progress);
- m_watch_in_progress = true;
- m_watch_task = NULL;
+ m_watch_task = nullptr;
fetch(new C_WatchFetch(this));
}
@@ -215,38 +214,31 @@ void ObjectPlayer::handle_watch_fetched(int r) {
ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
<< dendl;
- Context *on_finish = nullptr;
+ Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
- assert(m_watch_in_progress);
if (r == -ENOENT) {
r = 0;
} else {
m_refetch_required = true;
}
- std::swap(on_finish, m_watch_ctx);
- }
-
- if (on_finish != nullptr) {
- on_finish->complete(r);
- }
-
- {
- Mutex::Locker locker(m_timer_lock);
- assert(m_watch_in_progress);
+ std::swap(watch_ctx, m_watch_ctx);
- // callback might have attempted to re-schedule the watch -- complete now
- if (m_watch_ctx != nullptr) {
- schedule_watch();
+ if (m_unwatched) {
+ m_unwatched = false;
+ r = -ECANCELED;
}
+ }
- m_watch_in_progress = false;
- m_watch_in_progress_cond.Signal();
+ if (watch_ctx != nullptr) {
+ watch_ctx->complete(r);
}
}
void ObjectPlayer::C_Fetch::finish(int r) {
r = object_player->handle_fetch_complete(r, read_bl);
+ object_player.reset();
+
on_finish->complete(r);
}
diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h
index 73fa7d132b2..d0809cec8fe 100644
--- a/src/journal/ObjectPlayer.h
+++ b/src/journal/ObjectPlayer.h
@@ -117,16 +117,15 @@ private:
EntryKeys m_entry_keys;
InvalidRanges m_invalid_ranges;
- Context *m_watch_ctx;
- Cond m_watch_in_progress_cond;
- bool m_watch_in_progress;
+ Context *m_watch_ctx = nullptr;
+ bool m_unwatched = false;
bool m_refetch_required = true;
int handle_fetch_complete(int r, const bufferlist &bl);
void schedule_watch();
- void cancel_watch();
+ bool cancel_watch();
void handle_watch_task();
void handle_watch_fetched(int r);
};
diff --git a/src/journal/Utils.h b/src/journal/Utils.h
index e29f359acae..b0cee75ae82 100644
--- a/src/journal/Utils.h
+++ b/src/journal/Utils.h
@@ -5,12 +5,30 @@
#define CEPH_JOURNAL_UTILS_H
#include "include/int_types.h"
+#include "include/Context.h"
#include "include/rados/librados.hpp"
#include <string>
namespace journal {
namespace utils {
+namespace detail {
+
+template <typename M>
+struct C_AsyncCallback : public Context {
+ M journal_metadata;
+ Context *on_finish;
+
+ C_AsyncCallback(M journal_metadata, Context *on_finish)
+ : journal_metadata(journal_metadata), on_finish(on_finish) {
+ }
+ virtual void finish(int r) {
+ journal_metadata->queue(on_finish, r);
+ }
+};
+
+} // namespace detail
+
template <typename T, void(T::*MF)(int)>
void rados_state_callback(rados_completion_t c, void *arg) {
T *obj = reinterpret_cast<T*>(arg);
@@ -24,6 +42,12 @@ std::string unique_lock_name(const std::string &name, void *address);
void rados_ctx_callback(rados_completion_t c, void *arg);
+template <typename M>
+Context *create_async_context_callback(M journal_metadata, Context *on_finish) {
+ // use async callback to acquire a clean lock context
+ return new detail::C_AsyncCallback<M>(journal_metadata, on_finish);
+}
+
} // namespace utils
} // namespace journal
diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc
index d106ac4e28d..000f13ba4d8 100644
--- a/src/test/journal/test_JournalPlayer.cc
+++ b/src/test/journal/test_JournalPlayer.cc
@@ -11,6 +11,7 @@
#include "gtest/gtest.h"
#include "test/journal/RadosTestFixture.h"
#include <list>
+#include <boost/scope_exit.hpp>
class TestJournalPlayer : public RadosTestFixture {
public:
@@ -142,6 +143,11 @@ TEST_F(TestJournalPlayer, Prefetch) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -183,6 +189,11 @@ TEST_F(TestJournalPlayer, PrefetchSkip) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -213,6 +224,11 @@ TEST_F(TestJournalPlayer, PrefetchWithoutCommit) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -248,6 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
@@ -282,6 +303,11 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
@@ -311,6 +337,11 @@ TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(1));
ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
@@ -356,6 +387,11 @@ TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(2));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@@ -387,6 +423,11 @@ TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
@@ -421,6 +462,11 @@ TEST_F(TestJournalPlayer, PrefetchStaleEntries) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
@@ -452,6 +498,11 @@ TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
@@ -484,6 +535,11 @@ TEST_F(TestJournalPlayer, PrefetchAndWatch) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
@@ -518,6 +574,11 @@ TEST_F(TestJournalPlayer, PrefetchSkippedObject) {
ASSERT_EQ(0, metadata->set_active_set(2));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -565,6 +626,11 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) {
metadata->set_minimum_set(2);
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 8, 300, 0));
ASSERT_EQ(0, write_entry(oid, 8, 301, 0));
@@ -607,6 +673,11 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
@@ -652,6 +723,11 @@ TEST_F(TestJournalPlayer, LiveReplayMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
@@ -702,6 +778,11 @@ TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(2));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@@ -733,6 +814,11 @@ TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
C_SaferCond ctx1;
cls::journal::Tag tag1;
@@ -787,6 +873,11 @@ TEST_F(TestJournalPlayer, LiveReplayStaleEntries) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
@@ -818,6 +909,11 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(1));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@@ -844,3 +940,41 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
ASSERT_EQ(expected_entries, entries);
}
+TEST_F(TestJournalPlayer, PrefechShutDown) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, {}));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch();
+}
+
+TEST_F(TestJournalPlayer, LiveReplayShutDown) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, {}));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch_and_watch(0.25);
+}
+
diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc
index 6103ee6b67f..67c35a1d12c 100644
--- a/src/test/journal/test_ObjectPlayer.cc
+++ b/src/test/journal/test_ObjectPlayer.cc
@@ -262,14 +262,11 @@ TEST_F(TestObjectPlayer, Unwatch) {
std::string oid = get_temp_oid();
journal::ObjectPlayerPtr object = create_object(oid, 14);
- Mutex mutex("lock");
- Cond cond;
- bool done = false;
- int rval = 0;
- C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
- object->watch(ctx, 600);
+ C_SaferCond watch_ctx;
+ object->watch(&watch_ctx, 600);
usleep(200000);
- ASSERT_FALSE(done);
+
object->unwatch();
+ ASSERT_EQ(-ECANCELED, watch_ctx.wait());
}