summaryrefslogtreecommitdiffstats
path: root/src/journal/JournalPlayer.cc
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2016-05-24 18:06:26 +0200
committerJason Dillaman <dillaman@redhat.com>2016-05-25 14:19:17 +0200
commit79b41346678b3acdeb547fe07e44d8d0543d37a7 (patch)
tree342a5d2cd4cd7d088e71cbecdbece0428e0d4bce /src/journal/JournalPlayer.cc
parentMerge pull request #9122 from cbodley/wip-rgw-httpmgr-pipe (diff)
downloadceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.tar.xz
ceph-79b41346678b3acdeb547fe07e44d8d0543d37a7.zip
journal: player shutdown is now handled asynchronously
Fixes: http://tracker.ceph.com/issues/15949 Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/journal/JournalPlayer.cc')
-rw-r--r--src/journal/JournalPlayer.cc53
1 files changed, 40 insertions, 13 deletions
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
index a79b2d484f7..28905a2ed13 100644
--- a/src/journal/JournalPlayer.cc
+++ b/src/journal/JournalPlayer.cc
@@ -79,9 +79,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
}
JournalPlayer::~JournalPlayer() {
- m_async_op_tracker.wait_for_ops();
+ assert(m_async_op_tracker.empty());
{
Mutex::Locker locker(m_lock);
+ assert(m_shut_down);
assert(m_fetch_object_numbers.empty());
assert(!m_watch_scheduled);
}
@@ -140,16 +141,32 @@ void JournalPlayer::prefetch_and_watch(double interval) {
prefetch();
}
-void JournalPlayer::unwatch() {
+void JournalPlayer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
+
+ assert(!m_shut_down);
+ m_shut_down = true;
m_watch_enabled = false;
+
+ on_finish = utils::create_async_context_callback(
+ m_journal_metadata, on_finish);
+
if (m_watch_scheduled) {
- for (auto &players : m_object_players) {
- players.second.begin()->second->unwatch();
+ ObjectPlayerPtr object_player = get_object_player();
+ switch (m_watch_step) {
+ case WATCH_STEP_FETCH_FIRST:
+ object_player = m_object_players.begin()->second.begin()->second;
+ // fallthrough
+ case WATCH_STEP_FETCH_CURRENT:
+ object_player->unwatch();
+ break;
+ case WATCH_STEP_ASSERT_ACTIVE:
+ break;
}
- m_watch_scheduled = false;
}
+
+ m_async_op_tracker.wait_for_ops(on_finish);
}
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
@@ -623,6 +640,10 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
+ if (m_shut_down) {
+ return;
+ }
+
if (r == -ENOENT) {
r = 0;
}
@@ -647,6 +668,8 @@ void JournalPlayer::schedule_watch() {
// by an incomplete tag sequence
ldout(m_cct, 20) << __func__ << ": asserting active tag="
<< *m_active_tag_tid << dendl;
+
+ m_async_op_tracker.start_op();
FunctionContext *ctx = new FunctionContext([this](int r) {
handle_watch_assert_active(r);
});
@@ -654,9 +677,9 @@ void JournalPlayer::schedule_watch() {
return;
}
+ ObjectPlayerPtr object_player;
double watch_interval = m_watch_interval;
- ObjectPlayerPtr object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_CURRENT:
{
@@ -684,21 +707,22 @@ void JournalPlayer::schedule_watch() {
ldout(m_cct, 20) << __func__ << ": scheduling watch on "
<< object_player->get_oid() << dendl;
- C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
+ Context *ctx = utils::create_async_context_callback(
+ m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
object_player->watch(ctx, watch_interval);
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
- if (r == -ECANCELED) {
- // unwatch of object player(s)
- return;
- }
-
Mutex::Locker locker(m_lock);
assert(m_watch_scheduled);
m_watch_scheduled = false;
+ if (m_shut_down || r == -ECANCELED) {
+ // unwatch of object player(s)
+ return;
+ }
+
ObjectPlayerPtr object_player = get_object_player(object_num);
if (r == 0 && object_player->empty()) {
// possibly need to prune this empty object player if we've
@@ -737,7 +761,10 @@ void JournalPlayer::handle_watch_assert_active(int r) {
}
m_watch_step = WATCH_STEP_FETCH_CURRENT;
- schedule_watch();
+ if (!m_shut_down && m_watch_enabled) {
+ schedule_watch();
+ }
+ m_async_op_tracker.finish_op();
}
void JournalPlayer::notify_entries_available() {