diff options
author | Jason Dillaman <dillaman@redhat.com> | 2016-05-24 18:06:26 +0200 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2016-05-25 14:19:17 +0200 |
commit | 79b41346678b3acdeb547fe07e44d8d0543d37a7 (patch) | |
tree | 342a5d2cd4cd7d088e71cbecdbece0428e0d4bce /src/journal/JournalPlayer.cc | |
parent | Merge pull request #9122 from cbodley/wip-rgw-httpmgr-pipe (diff) | |
download | ceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.tar.xz ceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.zip |
journal: player shutdown is now handled asynchronously
Fixes: http://tracker.ceph.com/issues/15949
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal/JournalPlayer.cc')
-rw-r--r-- | src/journal/JournalPlayer.cc | 53 |
1 files changed, 40 insertions, 13 deletions
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index a79b2d484f7..28905a2ed13 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -79,9 +79,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, } JournalPlayer::~JournalPlayer() { - m_async_op_tracker.wait_for_ops(); + assert(m_async_op_tracker.empty()); { Mutex::Locker locker(m_lock); + assert(m_shut_down); assert(m_fetch_object_numbers.empty()); assert(!m_watch_scheduled); } @@ -140,16 +141,32 @@ void JournalPlayer::prefetch_and_watch(double interval) { prefetch(); } -void JournalPlayer::unwatch() { +void JournalPlayer::shut_down(Context *on_finish) { ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); + + assert(!m_shut_down); + m_shut_down = true; m_watch_enabled = false; + + on_finish = utils::create_async_context_callback( + m_journal_metadata, on_finish); + if (m_watch_scheduled) { - for (auto &players : m_object_players) { - players.second.begin()->second->unwatch(); + ObjectPlayerPtr object_player = get_object_player(); + switch (m_watch_step) { + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second.begin()->second; + // fallthrough + case WATCH_STEP_FETCH_CURRENT: + object_player->unwatch(); + break; + case WATCH_STEP_ASSERT_ACTIVE: + break; } - m_watch_scheduled = false; } + + m_async_op_tracker.wait_for_ops(on_finish); } bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { @@ -623,6 +640,10 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { assert(m_fetch_object_numbers.count(object_num) == 1); m_fetch_object_numbers.erase(object_num); + if (m_shut_down) { + return; + } + if (r == -ENOENT) { r = 0; } @@ -647,6 +668,8 @@ void JournalPlayer::schedule_watch() { // by an incomplete tag sequence ldout(m_cct, 20) << __func__ << ": asserting active tag=" << *m_active_tag_tid << dendl; + + m_async_op_tracker.start_op(); FunctionContext *ctx = new FunctionContext([this](int r) { handle_watch_assert_active(r); }); @@ -654,9 +677,9 @@ void JournalPlayer::schedule_watch() { return; } + ObjectPlayerPtr object_player; double watch_interval = m_watch_interval; - ObjectPlayerPtr object_player = get_object_player(); switch (m_watch_step) { case WATCH_STEP_FETCH_CURRENT: { @@ -684,21 +707,22 @@ void JournalPlayer::schedule_watch() { ldout(m_cct, 20) << __func__ << ": scheduling watch on " << object_player->get_oid() << dendl; - C_Watch *ctx = new C_Watch(this, object_player->get_object_number()); + Context *ctx = utils::create_async_context_callback( + m_journal_metadata, new C_Watch(this, object_player->get_object_number())); object_player->watch(ctx, watch_interval); } void JournalPlayer::handle_watch(uint64_t object_num, int r) { ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; - if (r == -ECANCELED) { - // unwatch of object player(s) - return; - } - Mutex::Locker locker(m_lock); assert(m_watch_scheduled); m_watch_scheduled = false; + if (m_shut_down || r == -ECANCELED) { + // unwatch of object player(s) + return; + } + ObjectPlayerPtr object_player = get_object_player(object_num); if (r == 0 && object_player->empty()) { // possibly need to prune this empty object player if we've @@ -737,7 +761,10 @@ void JournalPlayer::handle_watch_assert_active(int r) { } m_watch_step = WATCH_STEP_FETCH_CURRENT; - schedule_watch(); + if (!m_shut_down && m_watch_enabled) { + schedule_watch(); + } + m_async_op_tracker.finish_op(); } void JournalPlayer::notify_entries_available() { |