summaryrefslogtreecommitdiffstats
path: root/src/journal/JournalPlayer.cc
diff options
context:
space:
mode:
authorPatrick Donnelly <pdonnell@redhat.com>2019-02-06 23:48:57 +0100
committerKefu Chai <tchaikov@gmail.com>2019-09-16 13:53:58 +0200
commit517bdca529db390e7ebc4e596570f80522060485 (patch)
tree3d7c2376b36f61ea8e85e97f6de515ccf6d6d4a1 /src/journal/JournalPlayer.cc
parentcommon: add make_ref factory for RefCountedObject (diff)
downloadceph-517bdca529db390e7ebc4e596570f80522060485.tar.xz
ceph-517bdca529db390e7ebc4e596570f80522060485.zip
common/RefCountedObj: cleanup con/des
Also, don't allow children to set nref (to 0). This is the more significant change as it required fixing various code to not do this: <reftype> ptr = new RefCountedObjectFoo(..., 0); as a way to create a starting reference with nref==1. This is a pretty bad code smell so I've converted all the code doing this to use the new factory method which produces the reference safely: auto ptr = ceph::make_ref<RefCountedObjectFoo>(...); libradosstriper was particularly egregious in its abuse of setting the starting nref. :( Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
Diffstat (limited to 'src/journal/JournalPlayer.cc')
-rw-r--r--src/journal/JournalPlayer.cc88
1 files changed, 37 insertions, 51 deletions
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
index 811508bf03c..4a091247613 100644
--- a/src/journal/JournalPlayer.cc
+++ b/src/journal/JournalPlayer.cc
@@ -20,30 +20,20 @@ namespace {
static const uint64_t MIN_FETCH_BYTES = 32768;
struct C_HandleComplete : public Context {
- ReplayHandler *replay_handler;
+ ReplayHandler* replay_handler;
- explicit C_HandleComplete(ReplayHandler *_replay_handler)
- : replay_handler(_replay_handler) {
- replay_handler->get();
- }
- ~C_HandleComplete() override {
- replay_handler->put();
- }
+ explicit C_HandleComplete(ReplayHandler* r) : replay_handler(std::move(r)) {}
+ ~C_HandleComplete() override {}
void finish(int r) override {
replay_handler->handle_complete(r);
}
};
struct C_HandleEntriesAvailable : public Context {
- ReplayHandler *replay_handler;
+ ReplayHandler* replay_handler;
- explicit C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
- : replay_handler(_replay_handler) {
- replay_handler->get();
- }
- ~C_HandleEntriesAvailable() override {
- replay_handler->put();
- }
+ explicit C_HandleEntriesAvailable(ReplayHandler* r) : replay_handler(std::move(r)) {}
+ ~C_HandleEntriesAvailable() override {}
void finish(int r) override {
replay_handler->handle_entries_available();
}
@@ -52,17 +42,16 @@ struct C_HandleEntriesAvailable : public Context {
} // anonymous namespace
JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
- const std::string &object_oid_prefix,
- const JournalMetadataPtr& journal_metadata,
- ReplayHandler *replay_handler,
+ std::string_view object_oid_prefix,
+ ceph::ref_t<JournalMetadata> journal_metadata,
+ ReplayHandler* replay_handler,
CacheManagerHandler *cache_manager_handler)
- : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
- m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
+ : m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(std::move(journal_metadata)),
+ m_replay_handler(std::move(replay_handler)),
m_cache_manager_handler(cache_manager_handler),
- m_cache_rebalance_handler(this),
- m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false),
- m_watch_scheduled(false), m_watch_interval(0) {
- m_replay_handler->get();
+ m_cache_rebalance_handler(this)
+{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -109,7 +98,6 @@ JournalPlayer::~JournalPlayer() {
ceph_assert(m_fetch_object_numbers.empty());
ceph_assert(!m_watch_scheduled);
}
- m_replay_handler->put();
if (m_cache_manager_handler != nullptr) {
m_cache_manager_handler->unregister_cache(m_cache_name);
@@ -186,7 +174,7 @@ void JournalPlayer::shut_down(Context *on_finish) {
m_journal_metadata, on_finish);
if (m_watch_scheduled) {
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_FIRST:
object_player = m_object_players.begin()->second;
@@ -220,7 +208,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
return false;
}
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
ceph_assert(object_player && !object_player->empty());
object_player->front(entry);
@@ -294,7 +282,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
bool prefetch_complete = false;
ceph_assert(m_object_players.count(splay_offset) == 1);
- ObjectPlayerPtr object_player = m_object_players[splay_offset];
+ auto object_player = m_object_players[splay_offset];
// prefetch in-order since a newer splay object could prefetch first
if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) {
@@ -415,7 +403,7 @@ bool JournalPlayer::verify_playback_ready() {
return false;
}
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
ceph_assert(object_player);
uint64_t object_num = object_player->get_object_number();
@@ -520,8 +508,8 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) {
}
bool pruned = false;
- for (auto &player_pair : m_object_players) {
- ObjectPlayerPtr object_player(player_pair.second);
+ for (const auto &player_pair : m_object_players) {
+ auto& object_player = player_pair.second;
ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
<< dendl;
while (!object_player->empty()) {
@@ -541,14 +529,14 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) {
if (pruned) {
ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate"
<< dendl;
- for (auto &player_pair : m_object_players) {
- ObjectPlayerPtr object_player(player_pair.second);
+ for (const auto &player_pair : m_object_players) {
+ auto& object_player = player_pair.second;
object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
}
}
// trim empty player to prefetch the next available object
- for (auto &player_pair : m_object_players) {
+ for (const auto &player_pair : m_object_players) {
remove_empty_object_player(player_pair.second);
}
}
@@ -567,7 +555,7 @@ void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
prune_tag(active_tag_tid);
}
-ObjectPlayerPtr JournalPlayer::get_object_player() const {
+ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player() const {
ceph_assert(ceph_mutex_is_locked(m_lock));
SplayedObjectPlayers::const_iterator it = m_object_players.find(
@@ -576,7 +564,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const {
return it->second;
}
-ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
+ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player(uint64_t object_number) const {
ceph_assert(ceph_mutex_is_locked(m_lock));
uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -584,7 +572,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
auto splay_it = m_object_players.find(splay_offset);
ceph_assert(splay_it != m_object_players.end());
- ObjectPlayerPtr object_player = splay_it->second;
+ auto object_player = splay_it->second;
ceph_assert(object_player->get_object_number() == object_number);
return object_player;
}
@@ -598,7 +586,7 @@ void JournalPlayer::advance_splay_object() {
<< static_cast<uint32_t>(m_splay_offset) << dendl;
}
-bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
+bool JournalPlayer::remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &player) {
ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(!m_watch_scheduled);
@@ -615,7 +603,7 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
ldout(m_cct, 20) << __func__ << ": new active set detected, all players "
<< "require refetch" << dendl;
m_active_set = active_set;
- for (auto &pair : m_object_players) {
+ for (const auto& pair : m_object_players) {
pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
}
return false;
@@ -635,17 +623,17 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
void JournalPlayer::fetch(uint64_t object_num) {
ceph_assert(ceph_mutex_is_locked(m_lock));
- ObjectPlayerPtr object_player(new ObjectPlayer(
+ auto object_player = ceph::make_ref<ObjectPlayer>(
m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
- m_max_fetch_bytes));
+ m_max_fetch_bytes);
auto splay_width = m_journal_metadata->get_splay_width();
m_object_players[object_num % splay_width] = object_player;
fetch(object_player);
}
-void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
+void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) {
ceph_assert(ceph_mutex_is_locked(m_lock));
uint64_t object_num = object_player->get_object_number();
@@ -673,7 +661,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
}
if (r == 0) {
- ObjectPlayerPtr object_player = get_object_player(object_num);
+ auto object_player = get_object_player(object_num);
remove_empty_object_player(object_player);
}
process_state(object_num, r);
@@ -690,7 +678,7 @@ void JournalPlayer::refetch(bool immediate) {
return;
}
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
if (object_player->refetch_required()) {
object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
fetch(object_player);
@@ -723,7 +711,7 @@ void JournalPlayer::schedule_watch(bool immediate) {
return;
}
- ObjectPlayerPtr object_player;
+ ceph::ref_t<ObjectPlayer> object_player;
double watch_interval = m_watch_interval;
switch (m_watch_step) {
@@ -772,7 +760,7 @@ void JournalPlayer::handle_watch(uint64_t object_num, int r) {
return;
}
- ObjectPlayerPtr object_player = get_object_player(object_num);
+ auto object_player = get_object_player(object_num);
if (r == 0 && object_player->empty()) {
// possibly need to prune this empty object player if we've
// already fetched it after the active set was advanced with no
@@ -824,8 +812,7 @@ void JournalPlayer::notify_entries_available() {
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->queue(new C_HandleEntriesAvailable(
- m_replay_handler), 0);
+ m_journal_metadata->queue(new C_HandleEntriesAvailable(m_replay_handler), 0);
}
void JournalPlayer::notify_complete(int r) {
@@ -833,8 +820,7 @@ void JournalPlayer::notify_complete(int r) {
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
- m_journal_metadata->queue(new C_HandleComplete(
- m_replay_handler), r);
+ m_journal_metadata->queue(new C_HandleComplete(m_replay_handler), r);
}
void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) {