diff options
author | Jason Dillaman <dillaman@redhat.com> | 2016-03-08 16:03:14 +0100 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2016-03-08 16:08:13 +0100 |
commit | b37f135c6495ae6f57797ff92e81acefc8bc2dc4 (patch) | |
tree | 405b53ecef8244cb22e2d240ebfad9870eece67e /src/journal/JournalPlayer.cc | |
parent | tests: updated test cases for librbd journal tag allocation (diff) | |
download | ceph-b37f135c6495ae6f57797ff92e81acefc8bc2dc4.tar.xz ceph-b37f135c6495ae6f57797ff92e81acefc8bc2dc4.zip |
journal: clean up playback notification handling
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal/JournalPlayer.cc')
-rw-r--r-- | src/journal/JournalPlayer.cc | 51 |
1 files changed, 31 insertions, 20 deletions
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 664b4b00102..e001c018c6e 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -150,6 +150,8 @@ void JournalPlayer::unwatch() { bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); + + m_handler_notified = false; if (m_state != STATE_PLAYBACK) { return false; } @@ -160,9 +162,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { if (!verify_playback_ready()) { if (!m_watch_enabled) { - ldout(m_cct, 10) << __func__ << ": replay complete" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), 0); + notify_complete(0); } else if (!m_watch_scheduled) { schedule_watch(); } @@ -180,8 +180,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl; m_state = STATE_ERROR; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), -ENOMSG); + notify_complete(-ENOMSG); return false; } else if (m_journal_metadata->get_last_allocated_entry_tid( entry->get_tag_tid(), &last_entry_tid) && @@ -189,8 +188,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { lderr(m_cct) << "missing prior journal entry: " << *entry << dendl; m_state = STATE_ERROR; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), -ENOMSG); + notify_complete(-ENOMSG); return false; } @@ -233,8 +231,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) { if (r < 0) { m_state = STATE_ERROR; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), r); + notify_complete(r); } } @@ -329,16 +326,13 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { if (!is_object_set_ready()) { ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; } else if (verify_playback_ready()) { - ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( - m_replay_handler), 0); + notify_entries_available(); } else if (m_watch_enabled) { schedule_watch(); } else { ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), 0); + notify_complete(0); } return 0; } @@ -353,17 +347,13 @@ int JournalPlayer::process_playback(uint64_t object_number) { ObjectPlayerPtr object_player = get_object_player(); if (verify_playback_ready()) { - ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( - m_replay_handler), 0); + notify_entries_available(); } else if (!m_watch_enabled && is_object_set_ready()) { uint8_t splay_width = m_journal_metadata->get_splay_width(); uint64_t active_set = m_journal_metadata->get_active_set(); uint64_t object_set = object_player->get_object_number() / splay_width; if (object_set == active_set) { - ldout(m_cct, 10) << __func__ << ": replay complete" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), 0); + notify_complete(0); } } return 0; @@ -558,4 +548,25 @@ void JournalPlayer::handle_watch(int r) { } } +void JournalPlayer::notify_entries_available() { + assert(m_lock.is_locked()); + if (m_handler_notified) { + return; + } + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( + m_replay_handler), 0); +} + +void JournalPlayer::notify_complete(int r) { + assert(m_lock.is_locked()); + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; + m_journal_metadata->get_finisher().queue(new C_HandleComplete( + m_replay_handler), r); +} + } // namespace journal |