diff options
author | Jason Dillaman <dillaman@redhat.com> | 2016-05-24 18:06:26 +0200 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2016-05-25 14:19:17 +0200 |
commit | 79b41346678b3acdeb547fe07e44d8d0543d37a7 (patch) | |
tree | 342a5d2cd4cd7d088e71cbecdbece0428e0d4bce /src | |
parent | Merge pull request #9122 from cbodley/wip-rgw-httpmgr-pipe (diff) | |
download | ceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.tar.xz ceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.zip |
journal: player shutdown is now handled asynchronously
Fixes: http://tracker.ceph.com/issues/15949
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/journal/AsyncOpTracker.cc | 33 | ||||
-rw-r--r-- | src/journal/AsyncOpTracker.h | 6 | ||||
-rw-r--r-- | src/journal/JournalPlayer.cc | 53 | ||||
-rw-r--r-- | src/journal/JournalPlayer.h | 7 | ||||
-rw-r--r-- | src/journal/Journaler.cc | 7 | ||||
-rw-r--r-- | src/journal/ObjectPlayer.cc | 68 | ||||
-rw-r--r-- | src/journal/ObjectPlayer.h | 7 | ||||
-rw-r--r-- | src/journal/Utils.h | 24 | ||||
-rw-r--r-- | src/test/journal/test_JournalPlayer.cc | 134 | ||||
-rw-r--r-- | src/test/journal/test_ObjectPlayer.cc | 11 |
10 files changed, 282 insertions, 68 deletions
diff --git a/src/journal/AsyncOpTracker.cc b/src/journal/AsyncOpTracker.cc index 8c24088681e..13a55fa7a47 100644 --- a/src/journal/AsyncOpTracker.cc +++ b/src/journal/AsyncOpTracker.cc @@ -22,10 +22,18 @@ void AsyncOpTracker::start_op() { } void AsyncOpTracker::finish_op() { - Mutex::Locker locker(m_lock); - assert(m_pending_ops > 0); - if (--m_pending_ops == 0) { - m_cond.Signal(); + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + assert(m_pending_ops > 0); + if (--m_pending_ops == 0) { + m_cond.Signal(); + std::swap(on_finish, m_on_finish); + } + } + + if (on_finish != nullptr) { + on_finish->complete(0); } } @@ -36,4 +44,21 @@ void AsyncOpTracker::wait_for_ops() { } } +void AsyncOpTracker::wait_for_ops(Context *on_finish) { + { + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + if (m_pending_ops > 0) { + m_on_finish = on_finish; + return; + } + } + on_finish->complete(0); +} + +bool AsyncOpTracker::empty() { + Mutex::Locker locker(m_lock); + return (m_pending_ops == 0); +} + } // namespace journal diff --git a/src/journal/AsyncOpTracker.h b/src/journal/AsyncOpTracker.h index cec332f8471..a88cd453fe9 100644 --- a/src/journal/AsyncOpTracker.h +++ b/src/journal/AsyncOpTracker.h @@ -8,6 +8,8 @@ #include "common/Cond.h" #include "common/Mutex.h" +struct Context; + namespace journal { class AsyncOpTracker { @@ -19,11 +21,15 @@ public: void finish_op(); void wait_for_ops(); + void wait_for_ops(Context *on_finish); + + bool empty(); private: Mutex m_lock; Cond m_cond; uint32_t m_pending_ops; + Context *m_on_finish = nullptr; }; diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index a79b2d484f7..28905a2ed13 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -79,9 +79,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, } JournalPlayer::~JournalPlayer() { - m_async_op_tracker.wait_for_ops(); + assert(m_async_op_tracker.empty()); { Mutex::Locker locker(m_lock); + assert(m_shut_down); assert(m_fetch_object_numbers.empty()); assert(!m_watch_scheduled); } @@ -140,16 +141,32 @@ void JournalPlayer::prefetch_and_watch(double interval) { prefetch(); } -void JournalPlayer::unwatch() { +void JournalPlayer::shut_down(Context *on_finish) { ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); + + assert(!m_shut_down); + m_shut_down = true; m_watch_enabled = false; + + on_finish = utils::create_async_context_callback( + m_journal_metadata, on_finish); + if (m_watch_scheduled) { - for (auto &players : m_object_players) { - players.second.begin()->second->unwatch(); + ObjectPlayerPtr object_player = get_object_player(); + switch (m_watch_step) { + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second.begin()->second; + // fallthrough + case WATCH_STEP_FETCH_CURRENT: + object_player->unwatch(); + break; + case WATCH_STEP_ASSERT_ACTIVE: + break; } - m_watch_scheduled = false; } + + m_async_op_tracker.wait_for_ops(on_finish); } bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { @@ -623,6 +640,10 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { assert(m_fetch_object_numbers.count(object_num) == 1); m_fetch_object_numbers.erase(object_num); + if (m_shut_down) { + return; + } + if (r == -ENOENT) { r = 0; } @@ -647,6 +668,8 @@ void JournalPlayer::schedule_watch() { // by an incomplete tag sequence ldout(m_cct, 20) << __func__ << ": asserting active tag=" << *m_active_tag_tid << dendl; + + m_async_op_tracker.start_op(); FunctionContext *ctx = new FunctionContext([this](int r) { handle_watch_assert_active(r); }); @@ -654,9 +677,9 @@ void JournalPlayer::schedule_watch() { return; } + ObjectPlayerPtr object_player; double watch_interval = m_watch_interval; - ObjectPlayerPtr object_player = get_object_player(); switch (m_watch_step) { case WATCH_STEP_FETCH_CURRENT: { @@ -684,21 +707,22 @@ void JournalPlayer::schedule_watch() { ldout(m_cct, 20) << __func__ << ": scheduling watch on " << object_player->get_oid() << dendl; - C_Watch *ctx = new C_Watch(this, object_player->get_object_number()); + Context *ctx = utils::create_async_context_callback( + m_journal_metadata, new C_Watch(this, object_player->get_object_number())); object_player->watch(ctx, watch_interval); } void JournalPlayer::handle_watch(uint64_t object_num, int r) { ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; - if (r == -ECANCELED) { - // unwatch of object player(s) - return; - } - Mutex::Locker locker(m_lock); assert(m_watch_scheduled); m_watch_scheduled = false; + if (m_shut_down || r == -ECANCELED) { + // unwatch of object player(s) + return; + } + ObjectPlayerPtr object_player = get_object_player(object_num); if (r == 0 && object_player->empty()) { // possibly need to prune this empty object player if we've @@ -737,7 +761,10 @@ void JournalPlayer::handle_watch_assert_active(int r) { } m_watch_step = WATCH_STEP_FETCH_CURRENT; - schedule_watch(); + if (!m_shut_down && m_watch_enabled) { + schedule_watch(); + } + m_async_op_tracker.finish_op(); } void JournalPlayer::notify_entries_available() { diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index eb156b3e711..f50258296c9 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -36,7 +36,7 @@ public: void prefetch(); void prefetch_and_watch(double interval); - void unwatch(); + void shut_down(Context *on_finish); bool try_pop_front(Entry *entry, uint64_t *commit_tid); @@ -79,6 +79,10 @@ private: uint64_t object_num; C_Watch(JournalPlayer *player, uint64_t object_num) : player(player), object_num(object_num) { + player->m_async_op_tracker.start_op(); + } + virtual ~C_Watch() { + player->m_async_op_tracker.finish_op(); } virtual void finish(int r) override { @@ -105,6 +109,7 @@ private: WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT; bool m_watch_prune_active_tag = false; + bool m_shut_down = false; bool m_handler_notified = false; ObjectNumbers m_fetch_object_numbers; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 2a02f60af3c..d1e80ea8b61 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -317,7 +317,12 @@ bool Journaler::try_pop_front(ReplayEntry *replay_entry, void Journaler::stop_replay() { assert(m_player != NULL); - m_player->unwatch(); + + // TODO + C_SaferCond ctx; + m_player->shut_down(&ctx); + ctx.wait(); + delete m_player; m_player = NULL; } diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 2c2f3e33a40..f86e3ef9370 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -21,8 +21,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order), m_watch_interval(0), m_watch_task(NULL), m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)), - m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL), - m_watch_in_progress(false) { + m_fetch_in_progress(false), m_read_off(0) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); } @@ -32,8 +31,7 @@ ObjectPlayer::~ObjectPlayer() { Mutex::Locker timer_locker(m_timer_lock); Mutex::Locker locker(m_lock); assert(!m_fetch_in_progress); - assert(!m_watch_in_progress); - assert(m_watch_ctx == NULL); + assert(m_watch_ctx == nullptr); } } @@ -62,13 +60,10 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) { Mutex::Locker timer_locker(m_timer_lock); m_watch_interval = interval; - assert(m_watch_ctx == NULL); + assert(m_watch_ctx == nullptr); m_watch_ctx = on_fetch; - // watch callback might lead to re-scheduled watch - if (!m_watch_in_progress) { - schedule_watch(); - } + schedule_watch(); } void ObjectPlayer::unwatch() { @@ -76,13 +71,14 @@ void ObjectPlayer::unwatch() { Context *watch_ctx = nullptr; { Mutex::Locker timer_locker(m_timer_lock); + assert(!m_unwatched); + m_unwatched = true; - cancel_watch(); + if (!cancel_watch()) { + return; + } std::swap(watch_ctx, m_watch_ctx); - while (m_watch_in_progress) { - m_watch_in_progress_cond.Wait(m_timer_lock); - } } if (watch_ctx != nullptr) { @@ -190,24 +186,27 @@ void ObjectPlayer::schedule_watch() { m_timer.add_event_after(m_watch_interval, m_watch_task); } -void ObjectPlayer::cancel_watch() { +bool ObjectPlayer::cancel_watch() { assert(m_timer_lock.is_locked()); ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl; - if (m_watch_task != NULL) { - m_timer.cancel_event(m_watch_task); - m_watch_task = NULL; + if (m_watch_task != nullptr) { + bool canceled = m_timer.cancel_event(m_watch_task); + assert(canceled); + + m_watch_task = nullptr; + return true; } + return false; } void ObjectPlayer::handle_watch_task() { assert(m_timer_lock.is_locked()); ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl; - assert(m_watch_ctx != NULL); + assert(m_watch_ctx != nullptr); + assert(m_watch_task != nullptr); - assert(!m_watch_in_progress); - m_watch_in_progress = true; - m_watch_task = NULL; + m_watch_task = nullptr; fetch(new C_WatchFetch(this)); } @@ -215,38 +214,31 @@ void ObjectPlayer::handle_watch_fetched(int r) { ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r << dendl; - Context *on_finish = nullptr; + Context *watch_ctx = nullptr; { Mutex::Locker timer_locker(m_timer_lock); - assert(m_watch_in_progress); if (r == -ENOENT) { r = 0; } else { m_refetch_required = true; } - std::swap(on_finish, m_watch_ctx); - } - - if (on_finish != nullptr) { - on_finish->complete(r); - } - - { - Mutex::Locker locker(m_timer_lock); - assert(m_watch_in_progress); + std::swap(watch_ctx, m_watch_ctx); - // callback might have attempted to re-schedule the watch -- complete now - if (m_watch_ctx != nullptr) { - schedule_watch(); + if (m_unwatched) { + m_unwatched = false; + r = -ECANCELED; } + } - m_watch_in_progress = false; - m_watch_in_progress_cond.Signal(); + if (watch_ctx != nullptr) { + watch_ctx->complete(r); } } void ObjectPlayer::C_Fetch::finish(int r) { r = object_player->handle_fetch_complete(r, read_bl); + object_player.reset(); + on_finish->complete(r); } diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h index 73fa7d132b2..d0809cec8fe 100644 --- a/src/journal/ObjectPlayer.h +++ b/src/journal/ObjectPlayer.h @@ -117,16 +117,15 @@ private: EntryKeys m_entry_keys; InvalidRanges m_invalid_ranges; - Context *m_watch_ctx; - Cond m_watch_in_progress_cond; - bool m_watch_in_progress; + Context *m_watch_ctx = nullptr; + bool m_unwatched = false; bool m_refetch_required = true; int handle_fetch_complete(int r, const bufferlist &bl); void schedule_watch(); - void cancel_watch(); + bool cancel_watch(); void handle_watch_task(); void handle_watch_fetched(int r); }; diff --git a/src/journal/Utils.h b/src/journal/Utils.h index e29f359acae..b0cee75ae82 100644 --- a/src/journal/Utils.h +++ b/src/journal/Utils.h @@ -5,12 +5,30 @@ #define CEPH_JOURNAL_UTILS_H #include "include/int_types.h" +#include "include/Context.h" #include "include/rados/librados.hpp" #include <string> namespace journal { namespace utils { +namespace detail { + +template <typename M> +struct C_AsyncCallback : public Context { + M journal_metadata; + Context *on_finish; + + C_AsyncCallback(M journal_metadata, Context *on_finish) + : journal_metadata(journal_metadata), on_finish(on_finish) { + } + virtual void finish(int r) { + journal_metadata->queue(on_finish, r); + } +}; + +} // namespace detail + template <typename T, void(T::*MF)(int)> void rados_state_callback(rados_completion_t c, void *arg) { T *obj = reinterpret_cast<T*>(arg); @@ -24,6 +42,12 @@ std::string unique_lock_name(const std::string &name, void *address); void rados_ctx_callback(rados_completion_t c, void *arg); +template <typename M> +Context *create_async_context_callback(M journal_metadata, Context *on_finish) { + // use async callback to acquire a clean lock context + return new detail::C_AsyncCallback<M>(journal_metadata, on_finish); +} + } // namespace utils } // namespace journal diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index d106ac4e28d..000f13ba4d8 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -11,6 +11,7 @@ #include "gtest/gtest.h" #include "test/journal/RadosTestFixture.h" #include <list> +#include <boost/scope_exit.hpp> class TestJournalPlayer : public RadosTestFixture { public: @@ -142,6 +143,11 @@ TEST_F(TestJournalPlayer, Prefetch) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); ASSERT_EQ(0, write_entry(oid, 1, 234, 123)); @@ -183,6 +189,11 @@ TEST_F(TestJournalPlayer, PrefetchSkip) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); ASSERT_EQ(0, write_entry(oid, 1, 234, 123)); @@ -213,6 +224,11 @@ TEST_F(TestJournalPlayer, PrefetchWithoutCommit) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); ASSERT_EQ(0, write_entry(oid, 1, 234, 123)); @@ -248,6 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); ASSERT_EQ(0, write_entry(oid, 1, 234, 121)); @@ -282,6 +303,11 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); ASSERT_EQ(0, write_entry(oid, 1, 234, 121)); @@ -311,6 +337,11 @@ TEST_F(TestJournalPlayer, PrefetchMissingSequence) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, metadata->set_active_set(1)); ASSERT_EQ(0, write_entry(oid, 0, 2, 852)); @@ -356,6 +387,11 @@ TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, metadata->set_active_set(2)); ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); @@ -387,6 +423,11 @@ TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); @@ -421,6 +462,11 @@ TEST_F(TestJournalPlayer, PrefetchStaleEntries) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); ASSERT_EQ(0, write_entry(oid, 1, 0, 3)); @@ -452,6 +498,11 @@ TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); ASSERT_EQ(0, write_entry(oid, 1, 235, 121)); @@ -484,6 +535,11 @@ TEST_F(TestJournalPlayer, PrefetchAndWatch) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); @@ -518,6 +574,11 @@ TEST_F(TestJournalPlayer, PrefetchSkippedObject) { ASSERT_EQ(0, metadata->set_active_set(2)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); ASSERT_EQ(0, write_entry(oid, 1, 234, 123)); @@ -565,6 +626,11 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) { metadata->set_minimum_set(2); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 8, 300, 0)); ASSERT_EQ(0, write_entry(oid, 8, 301, 0)); @@ -607,6 +673,11 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); @@ -652,6 +723,11 @@ TEST_F(TestJournalPlayer, LiveReplayMissingSequence) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 0, 2, 852)); ASSERT_EQ(0, write_entry(oid, 0, 2, 856)); @@ -702,6 +778,11 @@ TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, metadata->set_active_set(2)); ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); @@ -733,6 +814,11 @@ TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; C_SaferCond ctx1; cls::journal::Tag tag1; @@ -787,6 +873,11 @@ TEST_F(TestJournalPlayer, LiveReplayStaleEntries) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); ASSERT_EQ(0, write_entry(oid, 1, 0, 3)); @@ -818,6 +909,11 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) { ASSERT_EQ(0, init_metadata(metadata)); journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; ASSERT_EQ(0, metadata->set_active_set(1)); ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); @@ -844,3 +940,41 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) { ASSERT_EQ(expected_entries, entries); } +TEST_F(TestJournalPlayer, PrefechShutDown) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, {})); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + player->prefetch(); +} + +TEST_F(TestJournalPlayer, LiveReplayShutDown) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, {})); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + player->prefetch_and_watch(0.25); +} + diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc index 6103ee6b67f..67c35a1d12c 100644 --- a/src/test/journal/test_ObjectPlayer.cc +++ b/src/test/journal/test_ObjectPlayer.cc @@ -262,14 +262,11 @@ TEST_F(TestObjectPlayer, Unwatch) { std::string oid = get_temp_oid(); journal::ObjectPlayerPtr object = create_object(oid, 14); - Mutex mutex("lock"); - Cond cond; - bool done = false; - int rval = 0; - C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval); - object->watch(ctx, 600); + C_SaferCond watch_ctx; + object->watch(&watch_ctx, 600); usleep(200000); - ASSERT_FALSE(done); + object->unwatch(); + ASSERT_EQ(-ECANCELED, watch_ctx.wait()); } |