diff options
author | Patrick Donnelly <pdonnell@redhat.com> | 2019-02-06 23:48:57 +0100 |
---|---|---|
committer | Kefu Chai <tchaikov@gmail.com> | 2019-09-16 13:53:58 +0200 |
commit | 517bdca529db390e7ebc4e596570f80522060485 (patch) | |
tree | 3d7c2376b36f61ea8e85e97f6de515ccf6d6d4a1 | |
parent | common: add make_ref factory for RefCountedObject (diff) | |
download | ceph-517bdca529db390e7ebc4e596570f80522060485.tar.xz ceph-517bdca529db390e7ebc4e596570f80522060485.zip |
common/RefCountedObj: cleanup con/des
Also, don't allow children to set nref (to 0). This is the more significant
change as it required fixing various code to not do this:
<reftype> ptr = new RefCountedObjectFoo(..., 0);
as a way to create a starting reference with nref==1. This is a pretty
bad code smell so I've converted all the code doing this to use the new
factory method which produces the reference safely:
auto ptr = ceph::make_ref<RefCountedObjectFoo>(...);
libradosstriper was particularly egregious in its abuse of setting the starting
nref. :(
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
71 files changed, 861 insertions, 979 deletions
diff --git a/src/common/RefCountedObj.h b/src/common/RefCountedObj.h index 024d2f4d2c9..5c0473c9598 100644 --- a/src/common/RefCountedObj.h +++ b/src/common/RefCountedObj.h @@ -20,7 +20,27 @@ #include <atomic> -struct RefCountedObject { +/* This class provides mechanisms to make a sub-class work with + * boost::intrusive_ptr (aka ceph::ref_t). + * + * Generally, you'll want to inherit from RefCountedObjectSafe and not from + * RefCountedObject directly. This is because the ::get and ::put methods are + * public and can be used to create/delete references outside of the + * ceph::ref_t pointers with the potential to leak memory. + * + * It is also suggested that you make constructors and destructors private in + * your final class. This prevents instantiation of the object with assignment + * to a raw pointer. Consequently, you'll want to use ceph::make_ref<> to + * create a ceph::ref_t<> holding your object: + * + * auto ptr = ceph::make_ref<Foo>(...); + * + * Use FRIEND_MAKE_REF(ClassName) to allow ceph::make_ref to call the private + * constructors. + * + */ + +class RefCountedObject { public: void set_cct(class CephContext *c) { cct = c; @@ -46,7 +66,7 @@ protected: RefCountedObject& operator=(const RefCountedObject& o) = delete; RefCountedObject(RefCountedObject&&) = delete; RefCountedObject& operator=(RefCountedObject&&) = delete; - RefCountedObject(class CephContext* c = nullptr, int n = 1) : cct(c), nref(n) {} + RefCountedObject(class CephContext* c) : cct(c) {} virtual ~RefCountedObject(); @@ -62,6 +82,17 @@ private: class CephContext *cct{nullptr}; }; +class RefCountedObjectSafe : public RefCountedObject { +public: + RefCountedObject *get() = delete; + const RefCountedObject *get() const = delete; + void put() const = delete; +protected: +template<typename... Args> + RefCountedObjectSafe(Args&&... args) : RefCountedObject(std::forward<Args>(args)...) {} + virtual ~RefCountedObjectSafe() override {} +}; + #ifndef WITH_SEASTAR /** diff --git a/src/journal/Future.cc b/src/journal/Future.cc index 89f7fd326e9..0e794d165e8 100644 --- a/src/journal/Future.cc +++ b/src/journal/Future.cc @@ -7,6 +7,14 @@ namespace journal { +Future::Future() = default; +Future::Future(const Future& o) = default; +Future& Future::operator=(const Future& o) = default; +Future::Future(Future&& o) = default; +Future& Future::operator=(Future&& o) = default; +Future::Future(ceph::ref_t<FutureImpl> future_impl) : m_future_impl(std::move(future_impl)) {} +Future::~Future() = default; + void Future::flush(Context *on_safe) { m_future_impl->flush(on_safe); } @@ -24,16 +32,8 @@ int Future::get_return_value() const { return m_future_impl->get_return_value(); } -void intrusive_ptr_add_ref(FutureImpl *p) { - p->get(); -} - -void intrusive_ptr_release(FutureImpl *p) { - p->put(); -} - std::ostream &operator<<(std::ostream &os, const Future &future) { - return os << *future.m_future_impl.get(); + return os << *future.m_future_impl; } } // namespace journal diff --git a/src/journal/Future.h b/src/journal/Future.h index fef0015651c..ba835b3538d 100644 --- a/src/journal/Future.h +++ b/src/journal/Future.h @@ -4,11 +4,12 @@ #ifndef CEPH_JOURNAL_FUTURE_H #define CEPH_JOURNAL_FUTURE_H -#include "include/int_types.h" -#include <string> #include <iosfwd> -#include <boost/intrusive_ptr.hpp> +#include <string> + #include "include/ceph_assert.h" +#include "include/int_types.h" +#include "common/ref.h" class Context; @@ -18,13 +19,16 @@ class FutureImpl; class Future { public: - typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr; - - Future() {} - Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {} - - inline bool is_valid() const { - return m_future_impl.get() != nullptr; + Future(); + Future(const Future&); + Future& operator=(const Future&); + Future(Future&&); + Future& operator=(Future&&); + Future(ceph::ref_t<FutureImpl> future_impl); + ~Future(); + + bool is_valid() const { + return bool(m_future_impl); } void flush(Context *on_safe); @@ -37,22 +41,17 @@ private: friend class Journaler; friend std::ostream& operator<<(std::ostream&, const Future&); - inline FutureImplPtr get_future_impl() const { + const auto& get_future_impl() const { return m_future_impl; } - FutureImplPtr m_future_impl; + ceph::ref_t<FutureImpl> m_future_impl; }; -void intrusive_ptr_add_ref(FutureImpl *p); -void intrusive_ptr_release(FutureImpl *p); - std::ostream &operator<<(std::ostream &os, const Future &future); } // namespace journal -using journal::intrusive_ptr_add_ref; -using journal::intrusive_ptr_release; using journal::operator<<; #endif // CEPH_JOURNAL_FUTURE_H diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc index 474c025c608..4e804f8dc65 100644 --- a/src/journal/FutureImpl.cc +++ b/src/journal/FutureImpl.cc @@ -8,14 +8,14 @@ namespace journal { FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid) - : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid), + : m_tag_tid(tag_tid), + m_entry_tid(entry_tid), m_commit_tid(commit_tid), - m_safe(false), - m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE), - m_consistent_ack(this) { + m_consistent_ack(this) +{ } -void FutureImpl::init(const FutureImplPtr &prev_future) { +void FutureImpl::init(const ceph::ref_t<FutureImpl> &prev_future) { // chain ourself to the prior future (if any) to that we known when the // journal is consistent if (prev_future) { @@ -30,7 +30,7 @@ void FutureImpl::flush(Context *on_safe) { bool complete; FlushHandlers flush_handlers; - FutureImplPtr prev_future; + ceph::ref_t<FutureImpl> prev_future; { std::lock_guard locker{m_lock}; complete = (m_safe && m_consistent); @@ -60,20 +60,21 @@ void FutureImpl::flush(Context *on_safe) { } } -FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) { +ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers) { std::lock_guard locker{m_lock}; return prepare_flush(flush_handlers, m_lock); } -FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers, +ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock) { ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_flush_state == FLUSH_STATE_NONE) { m_flush_state = FLUSH_STATE_REQUESTED; - if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) { - flush_handlers->insert({m_flush_handler, this}); + auto h = m_flush_handler; + if (h) { + flush_handlers->try_emplace(std::move(h), this); } } return m_prev_future; @@ -103,10 +104,10 @@ int FutureImpl::get_return_value() const { return m_return_value; } -bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) { +bool FutureImpl::attach(FlushHandler::ref flush_handler) { std::lock_guard locker{m_lock}; ceph_assert(!m_flush_handler); - m_flush_handler = flush_handler; + m_flush_handler = std::move(flush_handler); return m_flush_state != FLUSH_STATE_NONE; } @@ -163,12 +164,4 @@ std::ostream &operator<<(std::ostream &os, const FutureImpl &future) { return os; } -void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) { - p->get(); -} - -void intrusive_ptr_release(FutureImpl::FlushHandler *p) { - p->put(); -} - } // namespace journal diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h index b81fba200bc..241a09709ff 100644 --- a/src/journal/FutureImpl.h +++ b/src/journal/FutureImpl.h @@ -11,29 +11,21 @@ #include <list> #include <map> #include <boost/noncopyable.hpp> -#include <boost/intrusive_ptr.hpp> #include "include/ceph_assert.h" class Context; namespace journal { -class FutureImpl; -typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr; - class FutureImpl : public RefCountedObject, boost::noncopyable { public: struct FlushHandler { - virtual ~FlushHandler() {} - virtual void flush(const FutureImplPtr &future) = 0; - virtual void get() = 0; - virtual void put() = 0; + using ref = std::shared_ptr<FlushHandler>; + virtual void flush(const ceph::ref_t<FutureImpl> &future) = 0; + virtual ~FlushHandler() = default; }; - typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr; - - FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid); - void init(const FutureImplPtr &prev_future); + void init(const ceph::ref_t<FutureImpl> &prev_future); inline uint64_t get_tag_tid() const { return m_tag_tid; @@ -56,19 +48,17 @@ public: return (m_flush_state == FLUSH_STATE_IN_PROGRESS); } inline void set_flush_in_progress() { + auto h = std::move(m_flush_handler); + ceph_assert(h); std::lock_guard locker{m_lock}; - ceph_assert(m_flush_handler); - m_flush_handler.reset(); m_flush_state = FLUSH_STATE_IN_PROGRESS; } - bool attach(const FlushHandlerPtr &flush_handler); + bool attach(FlushHandler::ref flush_handler); inline void detach() { - std::lock_guard locker{m_lock}; m_flush_handler.reset(); } - inline FlushHandlerPtr get_flush_handler() const { - std::lock_guard locker{m_lock}; + inline FlushHandler::ref get_flush_handler() const { return m_flush_handler; } @@ -77,7 +67,7 @@ public: private: friend std::ostream &operator<<(std::ostream &, const FutureImpl &); - typedef std::map<FlushHandlerPtr, FutureImplPtr> FlushHandlers; + typedef std::map<FlushHandler::ref, ceph::ref_t<FutureImpl>> FlushHandlers; typedef std::list<Context *> Contexts; enum FlushState { @@ -87,8 +77,8 @@ private: }; struct C_ConsistentAck : public Context { - FutureImplPtr future; - C_ConsistentAck(FutureImpl *_future) : future(_future) {} + ceph::ref_t<FutureImpl> future; + C_ConsistentAck(ceph::ref_t<FutureImpl> _future) : future(std::move(_future)) {} void complete(int r) override { future->consistent(r); future.reset(); @@ -96,32 +86,33 @@ private: void finish(int r) override {} }; + FRIEND_MAKE_REF(FutureImpl); + FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid); + ~FutureImpl() override = default; + uint64_t m_tag_tid; uint64_t m_entry_tid; uint64_t m_commit_tid; mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false); - FutureImplPtr m_prev_future; - bool m_safe; - bool m_consistent; - int m_return_value; + ceph::ref_t<FutureImpl> m_prev_future; + bool m_safe = false; + bool m_consistent = false; + int m_return_value = 0; - FlushHandlerPtr m_flush_handler; - FlushState m_flush_state; + FlushHandler::ref m_flush_handler; + FlushState m_flush_state = FLUSH_STATE_NONE; C_ConsistentAck m_consistent_ack; Contexts m_contexts; - FutureImplPtr prepare_flush(FlushHandlers *flush_handlers); - FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock); + ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers); + ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock); void consistent(int r); void finish_unlock(); }; -void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p); -void intrusive_ptr_release(FutureImpl::FlushHandler *p); - std::ostream &operator<<(std::ostream &os, const FutureImpl &future); } // namespace journal diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index bf9c21be174..9801fc0a0b8 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -405,14 +405,11 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, const std::string &oid, const std::string &client_id, const Settings &settings) - : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid), - m_client_id(client_id), m_settings(settings), m_order(0), - m_splay_width(0), m_pool_id(-1), m_initialized(false), + : m_oid(oid), + m_client_id(client_id), m_settings(settings), m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), - m_commit_tid(0), m_watch_ctx(this), - m_watch_handle(0), m_minimum_set(0), m_active_set(0), - m_update_notifications(0), m_commit_position_ctx(NULL), - m_commit_position_task_ctx(NULL) { + m_watch_ctx(this) +{ m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); } diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 13d9fd44ff1..1b6911daecb 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -15,7 +15,6 @@ #include "cls/journal/cls_journal_types.h" #include "journal/JournalMetadataListener.h" #include "journal/Settings.h" -#include <boost/intrusive_ptr.hpp> #include <boost/noncopyable.hpp> #include <boost/optional.hpp> #include <functional> @@ -28,9 +27,6 @@ class SafeTimer; namespace journal { -class JournalMetadata; -typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr; - class JournalMetadata : public RefCountedObject, boost::noncopyable { public: typedef std::function<Context*()> CreateContext; @@ -43,11 +39,6 @@ public: typedef std::set<Client> RegisteredClients; typedef std::list<Tag> Tags; - JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, - librados::IoCtx &ioctx, const std::string &oid, - const std::string &client_id, const Settings &settings); - ~JournalMetadata() override; - void init(Context *on_init); void shut_down(Context *on_finish); @@ -156,6 +147,12 @@ public: void wait_for_ops(); private: + FRIEND_MAKE_REF(JournalMetadata); + JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, + librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id, const Settings &settings); + ~JournalMetadata() override; + typedef std::map<uint64_t, uint64_t> AllocatedEntryTids; typedef std::list<JournalMetadataListener*> Listeners; typedef std::list<Context*> Contexts; @@ -299,15 +296,15 @@ private: }; librados::IoCtx m_ioctx; - CephContext *m_cct; + CephContext *m_cct = nullptr; std::string m_oid; std::string m_client_id; Settings m_settings; - uint8_t m_order; - uint8_t m_splay_width; - int64_t m_pool_id; - bool m_initialized; + uint8_t m_order = 0; + uint8_t m_splay_width = 0; + int64_t m_pool_id = -1; + bool m_initialized = false; ContextWQ *m_work_queue; SafeTimer *m_timer; @@ -315,22 +312,22 @@ private: mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock"); - uint64_t m_commit_tid; + uint64_t m_commit_tid = 0; CommitTids m_pending_commit_tids; Listeners m_listeners; C_WatchCtx m_watch_ctx; - uint64_t m_watch_handle; + uint64_t m_watch_handle = 0; - uint64_t m_minimum_set; - uint64_t m_active_set; + uint64_t m_minimum_set = 0; + uint64_t m_active_set = 0; RegisteredClients m_registered_clients; Client m_client; AllocatedEntryTids m_allocated_entry_tids; - size_t m_update_notifications; + size_t m_update_notifications = 0; ceph::condition_variable m_update_cond; size_t m_ignore_watch_notifies = 0; @@ -339,8 +336,8 @@ private: uint64_t m_commit_position_tid = 0; ObjectSetPosition m_commit_position; - Context *m_commit_position_ctx; - Context *m_commit_position_task_ctx; + Context *m_commit_position_ctx = nullptr; + Context *m_commit_position_task_ctx = nullptr; size_t m_flush_commits_in_progress = 0; Contexts m_flush_commit_position_ctxs; diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 811508bf03c..4a091247613 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -20,30 +20,20 @@ namespace { static const uint64_t MIN_FETCH_BYTES = 32768; struct C_HandleComplete : public Context { - ReplayHandler *replay_handler; + ReplayHandler* replay_handler; - explicit C_HandleComplete(ReplayHandler *_replay_handler) - : replay_handler(_replay_handler) { - replay_handler->get(); - } - ~C_HandleComplete() override { - replay_handler->put(); - } + explicit C_HandleComplete(ReplayHandler* r) : replay_handler(std::move(r)) {} + ~C_HandleComplete() override {} void finish(int r) override { replay_handler->handle_complete(r); } }; struct C_HandleEntriesAvailable : public Context { - ReplayHandler *replay_handler; + ReplayHandler* replay_handler; - explicit C_HandleEntriesAvailable(ReplayHandler *_replay_handler) - : replay_handler(_replay_handler) { - replay_handler->get(); - } - ~C_HandleEntriesAvailable() override { - replay_handler->put(); - } + explicit C_HandleEntriesAvailable(ReplayHandler* r) : replay_handler(std::move(r)) {} + ~C_HandleEntriesAvailable() override {} void finish(int r) override { replay_handler->handle_entries_available(); } @@ -52,17 +42,16 @@ struct C_HandleEntriesAvailable : public Context { } // anonymous namespace JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, - const std::string &object_oid_prefix, - const JournalMetadataPtr& journal_metadata, - ReplayHandler *replay_handler, + std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, + ReplayHandler* replay_handler, CacheManagerHandler *cache_manager_handler) - : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), - m_journal_metadata(journal_metadata), m_replay_handler(replay_handler), + : m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(std::move(journal_metadata)), + m_replay_handler(std::move(replay_handler)), m_cache_manager_handler(cache_manager_handler), - m_cache_rebalance_handler(this), - m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false), - m_watch_scheduled(false), m_watch_interval(0) { - m_replay_handler->get(); + m_cache_rebalance_handler(this) +{ m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); @@ -109,7 +98,6 @@ JournalPlayer::~JournalPlayer() { ceph_assert(m_fetch_object_numbers.empty()); ceph_assert(!m_watch_scheduled); } - m_replay_handler->put(); if (m_cache_manager_handler != nullptr) { m_cache_manager_handler->unregister_cache(m_cache_name); @@ -186,7 +174,7 @@ void JournalPlayer::shut_down(Context *on_finish) { m_journal_metadata, on_finish); if (m_watch_scheduled) { - ObjectPlayerPtr object_player = get_object_player(); + auto object_player = get_object_player(); switch (m_watch_step) { case WATCH_STEP_FETCH_FIRST: object_player = m_object_players.begin()->second; @@ -220,7 +208,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { return false; } - ObjectPlayerPtr object_player = get_object_player(); + auto object_player = get_object_player(); ceph_assert(object_player && !object_player->empty()); object_player->front(entry); @@ -294,7 +282,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { bool prefetch_complete = false; ceph_assert(m_object_players.count(splay_offset) == 1); - ObjectPlayerPtr object_player = m_object_players[splay_offset]; + auto object_player = m_object_players[splay_offset]; // prefetch in-order since a newer splay object could prefetch first if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) { @@ -415,7 +403,7 @@ bool JournalPlayer::verify_playback_ready() { return false; } - ObjectPlayerPtr object_player = get_object_player(); + auto object_player = get_object_player(); ceph_assert(object_player); uint64_t object_num = object_player->get_object_number(); @@ -520,8 +508,8 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) { } bool pruned = false; - for (auto &player_pair : m_object_players) { - ObjectPlayerPtr object_player(player_pair.second); + for (const auto &player_pair : m_object_players) { + auto& object_player = player_pair.second; ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid() << dendl; while (!object_player->empty()) { @@ -541,14 +529,14 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) { if (pruned) { ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate" << dendl; - for (auto &player_pair : m_object_players) { - ObjectPlayerPtr object_player(player_pair.second); + for (const auto &player_pair : m_object_players) { + auto& object_player = player_pair.second; object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE); } } // trim empty player to prefetch the next available object - for (auto &player_pair : m_object_players) { + for (const auto &player_pair : m_object_players) { remove_empty_object_player(player_pair.second); } } @@ -567,7 +555,7 @@ void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) { prune_tag(active_tag_tid); } -ObjectPlayerPtr JournalPlayer::get_object_player() const { +ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player() const { ceph_assert(ceph_mutex_is_locked(m_lock)); SplayedObjectPlayers::const_iterator it = m_object_players.find( @@ -576,7 +564,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const { return it->second; } -ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const { +ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player(uint64_t object_number) const { ceph_assert(ceph_mutex_is_locked(m_lock)); uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -584,7 +572,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const { auto splay_it = m_object_players.find(splay_offset); ceph_assert(splay_it != m_object_players.end()); - ObjectPlayerPtr object_player = splay_it->second; + auto object_player = splay_it->second; ceph_assert(object_player->get_object_number() == object_number); return object_player; } @@ -598,7 +586,7 @@ void JournalPlayer::advance_splay_object() { << static_cast<uint32_t>(m_splay_offset) << dendl; } -bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { +bool JournalPlayer::remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &player) { ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_watch_scheduled); @@ -615,7 +603,7 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { ldout(m_cct, 20) << __func__ << ": new active set detected, all players " << "require refetch" << dendl; m_active_set = active_set; - for (auto &pair : m_object_players) { + for (const auto& pair : m_object_players) { pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE); } return false; @@ -635,17 +623,17 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { void JournalPlayer::fetch(uint64_t object_num) { ceph_assert(ceph_mutex_is_locked(m_lock)); - ObjectPlayerPtr object_player(new ObjectPlayer( + auto object_player = ceph::make_ref<ObjectPlayer>( m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(), - m_max_fetch_bytes)); + m_max_fetch_bytes); auto splay_width = m_journal_metadata->get_splay_width(); m_object_players[object_num % splay_width] = object_player; fetch(object_player); } -void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) { +void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) { ceph_assert(ceph_mutex_is_locked(m_lock)); uint64_t object_num = object_player->get_object_number(); @@ -673,7 +661,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { } if (r == 0) { - ObjectPlayerPtr object_player = get_object_player(object_num); + auto object_player = get_object_player(object_num); remove_empty_object_player(object_player); } process_state(object_num, r); @@ -690,7 +678,7 @@ void JournalPlayer::refetch(bool immediate) { return; } - ObjectPlayerPtr object_player = get_object_player(); + auto object_player = get_object_player(); if (object_player->refetch_required()) { object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); fetch(object_player); @@ -723,7 +711,7 @@ void JournalPlayer::schedule_watch(bool immediate) { return; } - ObjectPlayerPtr object_player; + ceph::ref_t<ObjectPlayer> object_player; double watch_interval = m_watch_interval; switch (m_watch_step) { @@ -772,7 +760,7 @@ void JournalPlayer::handle_watch(uint64_t object_num, int r) { return; } - ObjectPlayerPtr object_player = get_object_player(object_num); + auto object_player = get_object_player(object_num); if (r == 0 && object_player->empty()) { // possibly need to prune this empty object player if we've // already fetched it after the active set was advanced with no @@ -824,8 +812,7 @@ void JournalPlayer::notify_entries_available() { m_handler_notified = true; ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->queue(new C_HandleEntriesAvailable( - m_replay_handler), 0); + m_journal_metadata->queue(new C_HandleEntriesAvailable(m_replay_handler), 0); } void JournalPlayer::notify_complete(int r) { @@ -833,8 +820,7 @@ void JournalPlayer::notify_complete(int r) { m_handler_notified = true; ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; - m_journal_metadata->queue(new C_HandleComplete( - m_replay_handler), r); + m_journal_metadata->queue(new C_HandleComplete(m_replay_handler), r); } void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) { diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 21f215410f7..f2ab14d7b43 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -30,9 +30,10 @@ public: typedef cls::journal::ObjectPositions ObjectPositions; typedef cls::journal::ObjectSetPosition ObjectSetPosition; - JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, - const JournalMetadataPtr& journal_metadata, - ReplayHandler *replay_handler, CacheManagerHandler *cache_manager_handler); + JournalPlayer(librados::IoCtx &ioctx, std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, + ReplayHandler* replay_handler, + CacheManagerHandler *cache_manager_handler); ~JournalPlayer(); void prefetch(); @@ -43,7 +44,7 @@ public: private: typedef std::set<uint8_t> PrefetchSplayOffsets; - typedef std::map<uint8_t, ObjectPlayerPtr> SplayedObjectPlayers; + typedef std::map<uint8_t, ceph::ref_t<ObjectPlayer>> SplayedObjectPlayers; typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions; typedef std::set<uint64_t> ObjectNumbers; @@ -103,10 +104,10 @@ private: }; librados::IoCtx m_ioctx; - CephContext *m_cct; + CephContext *m_cct = nullptr; std::string m_object_oid_prefix; - JournalMetadataPtr m_journal_metadata; - ReplayHandler *m_replay_handler; + ceph::ref_t<JournalMetadata> m_journal_metadata; + ReplayHandler* m_replay_handler; CacheManagerHandler *m_cache_manager_handler; std::string m_cache_name; @@ -116,12 +117,12 @@ private: AsyncOpTracker m_async_op_tracker; mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock"); - State m_state; - uint8_t m_splay_offset; + State m_state = STATE_INIT; + uint8_t m_splay_offset = 0; - bool m_watch_enabled; - bool m_watch_scheduled; - double m_watch_interval; + bool m_watch_enabled = false; + bool m_watch_scheduled = false; + double m_watch_interval = 0; WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT; bool m_watch_prune_active_tag = false; @@ -148,16 +149,16 @@ private: void prune_tag(uint64_t tag_tid); void prune_active_tag(const boost::optional<uint64_t>& tag_tid); - ObjectPlayerPtr get_object_player() const; - ObjectPlayerPtr get_object_player(uint64_t object_number) const; - bool remove_empty_object_player(const ObjectPlayerPtr &object_player); + ceph::ref_t<ObjectPlayer> get_object_player() const; + ceph::ref_t<ObjectPlayer> get_object_player(uint64_t object_number) const; + bool remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &object_player); void process_state(uint64_t object_number, int r); int process_prefetch(uint64_t object_number); int process_playback(uint64_t object_number); void fetch(uint64_t object_num); - void fetch(const ObjectPlayerPtr &object_player); + void fetch(const ceph::ref_t<ObjectPlayer> &object_player); void handle_fetched(uint64_t object_num, int r); void refetch(bool immediate); diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 9629d9f735f..402c903fe81 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -20,15 +20,16 @@ namespace journal { namespace { struct C_Flush : public Context { - JournalMetadataPtr journal_metadata; + ceph::ref_t<JournalMetadata> journal_metadata; Context *on_finish; - std::atomic<int64_t> pending_flushes = { 0 }; - int ret_val; + std::atomic<int64_t> pending_flushes{0}; + int ret_val = 0; - C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish, + C_Flush(ceph::ref_t<JournalMetadata> _journal_metadata, Context *_on_finish, size_t _pending_flushes) - : journal_metadata(_journal_metadata), on_finish(_on_finish), - pending_flushes(_pending_flushes), ret_val(0) { + : journal_metadata(std::move(_journal_metadata)), + on_finish(_on_finish), + pending_flushes(_pending_flushes) { } void complete(int r) override { @@ -48,22 +49,21 @@ struct C_Flush : public Context { } // anonymous namespace JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, - const std::string &object_oid_prefix, - const JournalMetadataPtr& journal_metadata, + std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, uint64_t max_in_flight_appends) - : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), - m_journal_metadata(journal_metadata), - m_max_in_flight_appends(max_in_flight_appends), m_listener(this), + : m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(std::move(journal_metadata)), + m_max_in_flight_appends(max_in_flight_appends), + m_listener(this), m_object_handler(this), - m_lock(ceph::make_mutex("JournalerRecorder::m_lock")), m_current_set(m_journal_metadata->get_active_set()), m_object_locks{ceph::make_lock_container<ceph::mutex>( - journal_metadata->get_splay_width(), [](const size_t splay_offset) { + m_journal_metadata->get_splay_width(), [](const size_t splay_offset) { return ceph::make_mutex("ObjectRecorder::m_lock::" + std::to_string(splay_offset)); })} { - std::lock_guard locker{m_lock}; m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); @@ -141,10 +141,10 @@ Future JournalRecorder::append(uint64_t tag_tid, uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = entry_tid % splay_width; - ObjectRecorderPtr object_ptr = get_object(splay_offset); + auto object_ptr = get_object(splay_offset); uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( object_ptr->get_object_number(), tag_tid, entry_tid); - FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid)); + auto future = ceph::make_ref<FutureImpl>(tag_tid, entry_tid, commit_tid); future->init(m_prev_future); m_prev_future = future; @@ -176,9 +176,8 @@ void JournalRecorder::flush(Context *on_safe) { std::lock_guard locker{m_lock}; ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); - for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); - it != m_object_ptrs.end(); ++it) { - it->second->flush(ctx); + for (const auto& p : m_object_ptrs) { + p.second->flush(ctx); } } @@ -187,11 +186,11 @@ void JournalRecorder::flush(Context *on_safe) { ctx->complete(0); } -ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { +ceph::ref_t<ObjectRecorder> JournalRecorder::get_object(uint8_t splay_offset) { ceph_assert(ceph_mutex_is_locked(m_lock)); - ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset]; - ceph_assert(object_recoder != NULL); + const auto& object_recoder = m_object_ptrs.at(splay_offset); + ceph_assert(object_recoder); return object_recoder; } @@ -261,9 +260,8 @@ void JournalRecorder::open_object_set() { uint8_t splay_width = m_journal_metadata->get_splay_width(); lock_object_recorders(); - for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); - it != m_object_ptrs.end(); ++it) { - ObjectRecorderPtr object_recorder = it->second; + for (const auto& p : m_object_ptrs) { + const auto& object_recorder = p.second; uint64_t object_number = object_recorder->get_object_number(); if (object_number / splay_width != m_current_set) { ceph_assert(object_recorder->is_closed()); @@ -283,9 +281,8 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { // closing the object to ensure correct order of future appends uint8_t splay_width = m_journal_metadata->get_splay_width(); lock_object_recorders(); - for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); - it != m_object_ptrs.end(); ++it) { - ObjectRecorderPtr object_recorder = it->second; + for (const auto& p : m_object_ptrs) { + const auto& object_recorder = p.second; if (object_recorder->get_object_number() / splay_width != active_set) { ldout(m_cct, 10) << "closing object " << object_recorder->get_oid() << dendl; @@ -302,21 +299,21 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { return (m_in_flight_object_closes == 0); } -ObjectRecorderPtr JournalRecorder::create_object_recorder( +ceph::ref_t<ObjectRecorder> JournalRecorder::create_object_recorder( uint64_t object_number, ceph::mutex* lock) { ldout(m_cct, 10) << "object_number=" << object_number << dendl; - ObjectRecorderPtr object_recorder(new ObjectRecorder( + auto object_recorder = ceph::make_ref<ObjectRecorder>( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), object_number, lock, m_journal_metadata->get_work_queue(), &m_object_handler, m_journal_metadata->get_order(), - m_max_in_flight_appends)); + m_max_in_flight_appends); object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes, m_flush_age); return object_recorder; } void JournalRecorder::create_next_object_recorder( - ObjectRecorderPtr object_recorder) { + ceph::ref_t<ObjectRecorder> object_recorder) { ceph_assert(ceph_mutex_is_locked(m_lock)); uint64_t object_number = object_recorder->get_object_number(); @@ -326,7 +323,7 @@ void JournalRecorder::create_next_object_recorder( ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset])); - ObjectRecorderPtr new_object_recorder = create_object_recorder( + auto new_object_recorder = create_object_recorder( (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]); ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", " @@ -342,7 +339,7 @@ void JournalRecorder::create_next_object_recorder( } new_object_recorder->append(std::move(append_buffers)); - m_object_ptrs[splay_offset] = new_object_recorder; + m_object_ptrs[splay_offset] = std::move(new_object_recorder); } void JournalRecorder::handle_update() { @@ -373,7 +370,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { uint64_t object_number = object_recorder->get_object_number(); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; - ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; + auto& active_object_recorder = m_object_ptrs.at(splay_offset); ceph_assert(active_object_recorder->get_object_number() == object_number); ceph_assert(m_in_flight_object_closes > 0); @@ -401,7 +398,7 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { uint64_t object_number = object_recorder->get_object_number(); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; - ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; + auto& active_object_recorder = m_object_ptrs.at(splay_offset); ceph_assert(active_object_recorder->get_object_number() == object_number); ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index 7395283325e..3bd036fb5f7 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -22,8 +22,8 @@ namespace journal { class JournalRecorder { public: - JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, - const JournalMetadataPtr &journal_metadata, + JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, uint64_t max_in_flight_appends); ~JournalRecorder(); @@ -35,10 +35,10 @@ public: Future append(uint64_t tag_tid, const bufferlist &bl); void flush(Context *on_safe); - ObjectRecorderPtr get_object(uint8_t splay_offset); + ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset); private: - typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs; + typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs; struct Listener : public JournalMetadataListener { JournalRecorder *journal_recorder; @@ -78,10 +78,10 @@ private: }; librados::IoCtx m_ioctx; - CephContext *m_cct; + CephContext *m_cct = nullptr; std::string m_object_oid_prefix; - JournalMetadataPtr m_journal_metadata; + ceph::ref_t<JournalMetadata> m_journal_metadata; uint32_t m_flush_interval = 0; uint64_t m_flush_bytes = 0; @@ -91,7 +91,7 @@ private: Listener m_listener; ObjectHandler m_object_handler; - ceph::mutex m_lock; + ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock"); uint32_t m_in_flight_advance_sets = 0; uint32_t m_in_flight_object_closes = 0; @@ -99,7 +99,7 @@ private: ObjectRecorderPtrs m_object_ptrs; ceph::containers::tiny_vector<ceph::mutex> m_object_locks; - FutureImplPtr m_prev_future; + ceph::ref_t<FutureImpl> m_prev_future; Context *m_on_object_set_advanced = nullptr; @@ -111,9 +111,9 @@ private: void close_and_advance_object_set(uint64_t object_set); - ObjectRecorderPtr create_object_recorder(uint64_t object_number, + ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number, ceph::mutex* lock); - void create_next_object_recorder(ObjectRecorderPtr object_recorder); + void create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder); void handle_update(); diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index d091243b388..3f05d40e6b5 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -31,7 +31,7 @@ struct JournalTrimmer::C_RemoveSet : public Context { JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, - const JournalMetadataPtr &journal_metadata) + const ceph::ref_t<JournalMetadata>& journal_metadata) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), m_journal_metadata(journal_metadata), m_metadata_listener(this), m_remove_set_pending(false), diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h index 719be88e77b..9c74961c925 100644 --- a/src/journal/JournalTrimmer.h +++ b/src/journal/JournalTrimmer.h @@ -21,7 +21,7 @@ public: typedef cls::journal::ObjectSetPosition ObjectSetPosition; JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, - const JournalMetadataPtr &journal_metadata); + const ceph::ref_t<JournalMetadata> &journal_metadata); ~JournalTrimmer(); void shut_down(Context *on_finish); @@ -64,7 +64,7 @@ private: CephContext *m_cct; std::string m_object_oid_prefix; - JournalMetadataPtr m_journal_metadata; + ceph::ref_t<JournalMetadata> m_journal_metadata; MetadataListener m_metadata_listener; AsyncOpTracker m_async_op_tracker; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 6190674ade7..b5aa8cf23fb 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -100,10 +100,9 @@ void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, m_header_oid = header_oid(journal_id); m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id); - m_metadata = new JournalMetadata(work_queue, timer, timer_lock, + m_metadata = ceph::make_ref<JournalMetadata>(work_queue, timer, timer_lock, m_header_ioctx, m_header_oid, m_client_id, settings); - m_metadata->get(); } Journaler::~Journaler() { @@ -114,8 +113,7 @@ Journaler::~Journaler() { // since we wouldn't expect shut_down to be invoked m_metadata->wait_for_ops(); } - m_metadata->put(); - m_metadata = nullptr; + m_metadata.reset(); } ceph_assert(m_trimmer == nullptr); ceph_assert(m_player == nullptr); @@ -175,13 +173,10 @@ void Journaler::shut_down(Context *on_finish) { ceph_assert(m_player == nullptr); ceph_assert(m_recorder == nullptr); - JournalMetadata *metadata = nullptr; - ceph_assert(m_metadata != nullptr); - std::swap(metadata, m_metadata); - ceph_assert(metadata != nullptr); + auto metadata = std::move(m_metadata); + ceph_assert(metadata); on_finish = new LambdaContext([metadata, on_finish](int r) { - metadata->put(); on_finish->complete(0); }); @@ -336,12 +331,12 @@ void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish); } -void Journaler::start_replay(ReplayHandler *replay_handler) { +void Journaler::start_replay(ReplayHandler* replay_handler) { create_player(replay_handler); m_player->prefetch(); } -void Journaler::start_live_replay(ReplayHandler *replay_handler, +void Journaler::start_live_replay(ReplayHandler* replay_handler, double interval) { create_player(replay_handler); m_player->prefetch_and_watch(interval); @@ -371,17 +366,14 @@ void Journaler::stop_replay() { } void Journaler::stop_replay(Context *on_finish) { - JournalPlayer *player = nullptr; - ceph_assert(m_player != nullptr); - std::swap(player, m_player); - ceph_assert(player != nullptr); + auto player = std::move(m_player); + auto* playerp = player.get(); - auto f = [player, on_finish](int r) { - delete player; + auto f = [player=std::move(player), on_finish](int r) { on_finish->complete(r); }; on_finish = new LambdaContext(std::move(f)); - player->shut_down(on_finish); + playerp->shut_down(on_finish); } void Journaler::committed(const ReplayEntry &replay_entry) { @@ -389,7 +381,7 @@ void Journaler::committed(const ReplayEntry &replay_entry) { } void Journaler::committed(const Future &future) { - FutureImplPtr future_impl = future.get_future_impl(); + auto& future_impl = future.get_future_impl(); m_trimmer->committed(future_impl->get_commit_tid()); } @@ -398,7 +390,7 @@ void Journaler::start_append(uint64_t max_in_flight_appends) { // TODO verify active object set >= current replay object set - m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, + m_recorder = std::make_unique<JournalRecorder>(m_data_ioctx, m_object_oid_prefix, m_metadata, max_in_flight_appends); } @@ -410,16 +402,14 @@ void Journaler::set_append_batch_options(int flush_interval, } void Journaler::stop_append(Context *on_safe) { - JournalRecorder *recorder = nullptr; - ceph_assert(m_recorder != nullptr); - std::swap(recorder, m_recorder); - ceph_assert(recorder != nullptr); + auto recorder = std::move(m_recorder); + ceph_assert(recorder); - on_safe = new LambdaContext([recorder, on_safe](int r) { - delete recorder; + auto* recorderp = recorder.get(); + on_safe = new LambdaContext([recorder=std::move(recorder), on_safe](int r) { on_safe->complete(r); }); - recorder->shut_down(on_safe); + recorderp->shut_down(on_safe); } uint64_t Journaler::get_max_append_size() const { @@ -440,9 +430,9 @@ void Journaler::flush_append(Context *on_safe) { m_recorder->flush(on_safe); } -void Journaler::create_player(ReplayHandler *replay_handler) { +void Journaler::create_player(ReplayHandler* replay_handler) { ceph_assert(m_player == nullptr); - m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata, + m_player = std::make_unique<JournalPlayer>(m_data_ioctx, m_object_oid_prefix, m_metadata, replay_handler, m_cache_manager_handler); } diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 17397d7ec2a..fe44401848a 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -24,9 +24,6 @@ namespace journal { struct CacheManagerHandler; -class JournalMetadata; -class JournalPlayer; -class JournalRecorder; class JournalTrimmer; class ReplayEntry; class ReplayHandler; @@ -103,8 +100,8 @@ public: void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags, Context *on_finish); - void start_replay(ReplayHandler *replay_handler); - void start_live_replay(ReplayHandler *replay_handler, double interval); + void start_replay(ReplayHandler* replay_handler); + void start_live_replay(ReplayHandler* replay_handler, double interval); bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr); void stop_replay(); void stop_replay(Context *on_finish); @@ -149,9 +146,9 @@ private: std::string m_object_oid_prefix; bool m_initialized = false; - JournalMetadata *m_metadata = nullptr; - JournalPlayer *m_player = nullptr; - JournalRecorder *m_recorder = nullptr; + ceph::ref_t<class JournalMetadata> m_metadata; + std::unique_ptr<class JournalPlayer> m_player; + std::unique_ptr<class JournalRecorder> m_recorder; JournalTrimmer *m_trimmer = nullptr; void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, @@ -159,7 +156,7 @@ private: const Settings &settings); int init_complete(); - void create_player(ReplayHandler *replay_handler); + void create_player(ReplayHandler* replay_handler); friend std::ostream &operator<<(std::ostream &os, const Journaler &journaler); diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 17bb8574a56..939b294b618 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -43,17 +43,16 @@ bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter, } // anonymous namespace ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, - const std::string &object_oid_prefix, + const std::string& object_oid_prefix, uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock, uint8_t order, uint64_t max_fetch_bytes) - : RefCountedObject(NULL, 0), m_object_num(object_num), + : m_object_num(object_num), m_oid(utils::get_object_name(object_oid_prefix, m_object_num)), - m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order), + m_timer(timer), m_timer_lock(timer_lock), m_order(order), m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order), - m_watch_interval(0), m_watch_task(NULL), - m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))), - m_fetch_in_progress(false) { + m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))) +{ m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); } diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h index 568d31b0635..41641dd150a 100644 --- a/src/journal/ObjectPlayer.h +++ b/src/journal/ObjectPlayer.h @@ -12,7 +12,6 @@ #include "journal/Entry.h" #include <list> #include <string> -#include <boost/intrusive_ptr.hpp> #include <boost/noncopyable.hpp> #include <boost/unordered_map.hpp> #include "include/ceph_assert.h" @@ -21,9 +20,6 @@ class SafeTimer; namespace journal { -class ObjectPlayer; -typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr; - class ObjectPlayer : public RefCountedObject { public: typedef std::list<Entry> Entries; @@ -35,11 +31,6 @@ public: REFETCH_STATE_IMMEDIATE }; - ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, - uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock, - uint8_t order, uint64_t max_fetch_bytes); - ~ObjectPlayer() override; - inline const std::string &get_oid() const { return m_oid; } @@ -83,11 +74,17 @@ public: } private: + FRIEND_MAKE_REF(ObjectPlayer); + ObjectPlayer(librados::IoCtx &ioctx, const std::string& object_oid_prefix, + uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock, + uint8_t order, uint64_t max_fetch_bytes); + ~ObjectPlayer() override; + typedef std::pair<uint64_t, uint64_t> EntryKey; typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys; struct C_Fetch : public Context { - ObjectPlayerPtr object_player; + ceph::ref_t<ObjectPlayer> object_player; Context *on_finish; bufferlist read_bl; C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) { @@ -95,7 +92,7 @@ private: void finish(int r) override; }; struct C_WatchFetch : public Context { - ObjectPlayerPtr object_player; + ceph::ref_t<ObjectPlayer> object_player; C_WatchFetch(ObjectPlayer *o) : object_player(o) { } void finish(int r) override; @@ -104,7 +101,7 @@ private: librados::IoCtx m_ioctx; uint64_t m_object_num; std::string m_oid; - CephContext *m_cct; + CephContext *m_cct = nullptr; SafeTimer &m_timer; ceph::mutex &m_timer_lock; @@ -112,11 +109,11 @@ private: uint8_t m_order; uint64_t m_max_fetch_bytes; - double m_watch_interval; - Context *m_watch_task; + double m_watch_interval = 0; + Context *m_watch_task = nullptr; mutable ceph::mutex m_lock; - bool m_fetch_in_progress; + bool m_fetch_in_progress = false; bufferlist m_read_bl; uint32_t m_read_off = 0; uint32_t m_read_bl_off = 0; diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 9f1f37eed1c..1823d7a8884 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -19,16 +19,16 @@ using std::shared_ptr; namespace journal { -ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, +ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid, uint64_t object_number, ceph::mutex* lock, ContextWQ *work_queue, Handler *handler, uint8_t order, int32_t max_in_flight_appends) - : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), - m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler), + : m_oid(oid), m_object_number(object_number), + m_op_work_queue(work_queue), m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), - m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), - m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0), - m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { + m_max_in_flight_appends(max_in_flight_appends), + m_lock(lock), m_last_flush_time(ceph_clock_now()) +{ m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); ceph_assert(m_handler != NULL); @@ -70,11 +70,12 @@ bool ObjectRecorder::append(AppendBuffers &&append_buffers) { ceph_assert(ceph_mutex_is_locked(*m_lock)); - FutureImplPtr last_flushed_future; + ceph::ref_t<FutureImpl> last_flushed_future; + auto flush_handler = get_flush_handler(); for (auto& append_buffer : append_buffers) { ldout(m_cct, 20) << *append_buffer.first << ", " << "size=" << append_buffer.second.length() << dendl; - bool flush_requested = append_buffer.first->attach(&m_flush_handler); + bool flush_requested = append_buffer.first->attach(flush_handler); if (flush_requested) { last_flushed_future = append_buffer.first; } @@ -121,19 +122,23 @@ void ObjectRecorder::flush(Context *on_safe) { } } -void ObjectRecorder::flush(const FutureImplPtr &future) { +void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) { ldout(m_cct, 20) << "flushing " << *future << dendl; m_lock->lock(); - if (future->get_flush_handler().get() != &m_flush_handler) { - // if we don't own this future, re-issue the flush so that it hits the - // correct journal object owner - future->flush(); - m_lock->unlock(); - return; - } else if (future->is_flush_in_progress()) { - m_lock->unlock(); - return; + { + auto flush_handler = future->get_flush_handler(); + auto my_handler = get_flush_handler(); + if (flush_handler != my_handler) { + // if we don't own this future, re-issue the flush so that it hits the + // correct journal object owner + future->flush(); + m_lock->unlock(); + return; + } else if (future->is_flush_in_progress()) { + m_lock->unlock(); + return; + } } bool overflowed = send_appends(true, future); @@ -258,7 +263,7 @@ void ObjectRecorder::append_overflowed() { restart_append_buffers.swap(m_pending_buffers); } -bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { +bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_future) { ldout(m_cct, 20) << dendl; ceph_assert(ceph_mutex_is_locked(*m_lock)); diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 30dde76c92b..8b4e0a20dc3 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -14,7 +14,6 @@ #include <list> #include <map> #include <set> -#include <boost/intrusive_ptr.hpp> #include <boost/noncopyable.hpp> #include "include/ceph_assert.h" @@ -23,9 +22,8 @@ class SafeTimer; namespace journal { class ObjectRecorder; -typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr; -typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer; +typedef std::pair<ceph::ref_t<FutureImpl>, bufferlist> AppendBuffer; typedef std::list<AppendBuffer> AppendBuffers; class ObjectRecorder : public RefCountedObject, boost::noncopyable { @@ -37,12 +35,6 @@ public: virtual void overflow(ObjectRecorder *object_recorder) = 0; }; - ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, - uint64_t object_number, ceph::mutex* lock, - ContextWQ *work_queue, Handler *handler, uint8_t order, - int32_t max_in_flight_appends); - ~ObjectRecorder() override; - void set_append_batch_options(int flush_interval, uint64_t flush_bytes, double flush_age); @@ -55,7 +47,7 @@ public: bool append(AppendBuffers &&append_buffers); void flush(Context *on_safe); - void flush(const FutureImplPtr &future); + void flush(const ceph::ref_t<FutureImpl> &future); void claim_append_buffers(AppendBuffers *append_buffers); @@ -75,39 +67,38 @@ public: } private: + FRIEND_MAKE_REF(ObjectRecorder); + ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid, + uint64_t object_number, ceph::mutex* lock, + ContextWQ *work_queue, Handler *handler, uint8_t order, + int32_t max_in_flight_appends); + ~ObjectRecorder() override; + typedef std::set<uint64_t> InFlightTids; typedef std::map<uint64_t, AppendBuffers> InFlightAppends; struct FlushHandler : public FutureImpl::FlushHandler { - ObjectRecorder *object_recorder; - FlushHandler(ObjectRecorder *o) : object_recorder(o) {} - void get() override { - object_recorder->get(); - } - void put() override { - object_recorder->put(); - } - void flush(const FutureImplPtr &future) override { + ceph::ref_t<ObjectRecorder> object_recorder; + virtual void flush(const ceph::ref_t<FutureImpl> &future) override { object_recorder->flush(future); } + FlushHandler(ceph::ref_t<ObjectRecorder> o) : object_recorder(std::move(o)) {} }; struct C_AppendFlush : public Context { - ObjectRecorder *object_recorder; + ceph::ref_t<ObjectRecorder> object_recorder; uint64_t tid; - C_AppendFlush(ObjectRecorder *o, uint64_t _tid) - : object_recorder(o), tid(_tid) { - object_recorder->get(); + C_AppendFlush(ceph::ref_t<ObjectRecorder> o, uint64_t _tid) + : object_recorder(std::move(o)), tid(_tid) { } void finish(int r) override { object_recorder->handle_append_flushed(tid, r); - object_recorder->put(); } }; librados::IoCtx m_ioctx; std::string m_oid; uint64_t m_object_number; - CephContext *m_cct; + CephContext *m_cct = nullptr; ContextWQ *m_op_work_queue; @@ -123,28 +114,37 @@ private: bool m_compat_mode; - FlushHandler m_flush_handler; + /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */ + std::weak_ptr<FlushHandler> m_flush_handler; + auto get_flush_handler() { + auto h = m_flush_handler.lock(); + if (!h) { + h = std::make_shared<FlushHandler>(this); + m_flush_handler = h; + } + return h; + } mutable ceph::mutex* m_lock; AppendBuffers m_pending_buffers; uint64_t m_pending_bytes = 0; utime_t m_last_flush_time; - uint64_t m_append_tid; + uint64_t m_append_tid = 0; InFlightTids m_in_flight_tids; InFlightAppends m_in_flight_appends; uint64_t m_object_bytes = 0; - bool m_overflowed; - bool m_object_closed; + bool m_overflowed = false; + bool m_object_closed = false; bufferlist m_prefetch_bl; - bool m_in_flight_flushes; + bool m_in_flight_flushes = false; ceph::condition_variable m_in_flight_flushes_cond; uint64_t m_in_flight_bytes = 0; - bool send_appends(bool force, FutureImplPtr flush_sentinal); + bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal); void handle_append_flushed(uint64_t tid, int r); void append_overflowed(); diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h index e61240d8c1f..d0967c68d7e 100644 --- a/src/journal/ReplayHandler.h +++ b/src/journal/ReplayHandler.h @@ -6,14 +6,10 @@ namespace journal { -struct ReplayHandler { - virtual ~ReplayHandler() {} - - virtual void get() = 0; - virtual void put() = 0; - +struct ReplayHandler { virtual void handle_entries_available() = 0; virtual void handle_complete(int r) = 0; + virtual ~ReplayHandler() {} }; } // namespace journal diff --git a/src/libradosstriper/RadosStriperImpl.cc b/src/libradosstriper/RadosStriperImpl.cc index d2af7875848..57d88f59aed 100644 --- a/src/libradosstriper/RadosStriperImpl.cc +++ b/src/libradosstriper/RadosStriperImpl.cc @@ -27,6 +27,7 @@ #include "include/ceph_fs.h" #include "common/dout.h" #include "common/strtol.h" +#include "common/RefCountedObj.h" #include "osdc/Striper.h" #include "librados/AioCompletionImpl.h" #include <cls/lock/cls_lock_client.h> @@ -124,14 +125,6 @@ namespace { * function in asynchronous operations */ struct CompletionData : RefCountedObject { - /// constructor - CompletionData(libradosstriper::RadosStriperImpl * striper, - const std::string& soid, - const std::string& lockCookie, - librados::AioCompletionImpl *userCompletion = 0, - int n = 1); - /// destructor - ~CompletionData() override; /// complete method void complete(int r); /// striper to be used to handle the write completion @@ -142,15 +135,21 @@ struct CompletionData : RefCountedObject { std::string m_lockCookie; /// completion handler librados::IoCtxImpl::C_aio_Complete *m_ack; +protected: + CompletionData(libradosstriper::RadosStriperImpl * striper, + const std::string& soid, + const std::string& lockCookie, + librados::AioCompletionImpl *userCompletion = 0); + ~CompletionData() override; + }; CompletionData::CompletionData (libradosstriper::RadosStriperImpl* striper, const std::string& soid, const std::string& lockCookie, - librados::AioCompletionImpl *userCompletion, - int n) : - RefCountedObject(striper->cct(), n), + librados::AioCompletionImpl *userCompletion) : + RefCountedObject(striper->cct()), m_striper(striper), m_soid(soid), m_lockCookie(lockCookie), m_ack(0) { m_striper->get(); if (userCompletion) { @@ -183,21 +182,21 @@ struct ReadCompletionData : CompletionData { int m_readRc; /// completion object for the unlocking of the striped object at the end of the read librados::AioCompletion *m_unlockCompletion; - /// constructor + /// complete method for when reading is over + void complete_read(int r); + /// complete method for when object is unlocked + void complete_unlock(int r); + +private: + FRIEND_MAKE_REF(ReadCompletionData); ReadCompletionData(libradosstriper::RadosStriperImpl * striper, const std::string& soid, const std::string& lockCookie, librados::AioCompletionImpl *userCompletion, bufferlist* bl, std::vector<ObjectExtent>* extents, - std::vector<bufferlist>* resultbl, - int n); - /// destructor + std::vector<bufferlist>* resultbl); ~ReadCompletionData() override; - /// complete method for when reading is over - void complete_read(int r); - /// complete method for when object is unlocked - void complete_unlock(int r); }; ReadCompletionData::ReadCompletionData @@ -207,9 +206,8 @@ ReadCompletionData::ReadCompletionData librados::AioCompletionImpl *userCompletion, bufferlist* bl, std::vector<ObjectExtent>* extents, - std::vector<bufferlist>* resultbl, - int n) : - CompletionData(striper, soid, lockCookie, userCompletion, n), + std::vector<bufferlist>* resultbl) : + CompletionData(striper, soid, lockCookie, userCompletion), m_bl(bl), m_extents(extents), m_resultbl(resultbl), m_readRc(0), m_unlockCompletion(0) {} @@ -251,30 +249,30 @@ struct WriteCompletionData : CompletionData { librados::AioCompletion *m_unlockCompletion; /// return code of write completion, to be remembered until unlocking happened int m_writeRc; - /// constructor - WriteCompletionData(libradosstriper::RadosStriperImpl * striper, - const std::string& soid, - const std::string& lockCookie, - librados::AioCompletionImpl *userCompletion, - int n); - /// destructor - ~WriteCompletionData() override; /// complete method for when writing is over void complete_write(int r); /// complete method for when object is unlocked void complete_unlock(int r); /// safe method void safe(int r); +private: + FRIEND_MAKE_REF(WriteCompletionData); + /// constructor + WriteCompletionData(libradosstriper::RadosStriperImpl * striper, + const std::string& soid, + const std::string& lockCookie, + librados::AioCompletionImpl *userCompletion); + /// destructor + ~WriteCompletionData() override; }; WriteCompletionData::WriteCompletionData (libradosstriper::RadosStriperImpl* striper, const std::string& soid, const std::string& lockCookie, - librados::AioCompletionImpl *userCompletion, - int n) : - CompletionData(striper, soid, lockCookie, userCompletion, n), m_safe(0), - m_unlockCompletion(0), m_writeRc(0) { + librados::AioCompletionImpl *userCompletion) : + CompletionData(striper, soid, lockCookie, userCompletion), + m_safe(0), m_unlockCompletion(0), m_writeRc(0) { if (userCompletion) { m_safe = new librados::IoCtxImpl::C_aio_Complete(userCompletion); } @@ -303,6 +301,9 @@ void WriteCompletionData::safe(int r) { struct RemoveCompletionData : CompletionData { /// removal flags int flags; + +private: + FRIEND_MAKE_REF(RemoveCompletionData); /** * constructor * note that the constructed object will take ownership of the lock @@ -320,6 +321,15 @@ struct RemoveCompletionData : CompletionData { * function in asynchronous truncate operations */ struct TruncateCompletionData : RefCountedObject { + /// striper to be used + libradosstriper::RadosStriperImpl *m_striper; + /// striped object concerned by the truncate operation + std::string m_soid; + /// the final size of the truncated object + uint64_t m_size; + +private: + FRIEND_MAKE_REF(TruncateCompletionData); /// constructor TruncateCompletionData(libradosstriper::RadosStriperImpl* striper, const std::string& soid, @@ -332,12 +342,6 @@ struct TruncateCompletionData : RefCountedObject { ~TruncateCompletionData() override { m_striper->put(); } - /// striper to be used - libradosstriper::RadosStriperImpl *m_striper; - /// striped object concerned by the truncate operation - std::string m_soid; - /// the final size of the truncated object - uint64_t m_size; }; /** @@ -345,20 +349,22 @@ struct TruncateCompletionData : RefCountedObject { * function in asynchronous read operations of a Rados File */ struct RadosReadCompletionData : RefCountedObject { - /// constructor - RadosReadCompletionData(MultiAioCompletionImplPtr multiAioCompl, - uint64_t expectedBytes, - bufferlist *bl, - CephContext *context, - int n = 1) : - RefCountedObject(context, n), - m_multiAioCompl(multiAioCompl), m_expectedBytes(expectedBytes), m_bl(bl) {} /// the multi asynch io completion object to be used MultiAioCompletionImplPtr m_multiAioCompl; /// the expected number of bytes uint64_t m_expectedBytes; /// the bufferlist object where data have been written bufferlist *m_bl; + +private: + FRIEND_MAKE_REF(RadosReadCompletionData); + /// constructor + RadosReadCompletionData(MultiAioCompletionImplPtr multiAioCompl, + uint64_t expectedBytes, + bufferlist *bl, + CephContext *context) : + RefCountedObject(context), + m_multiAioCompl(multiAioCompl), m_expectedBytes(expectedBytes), m_bl(bl) {} }; /** @@ -368,16 +374,6 @@ struct RadosReadCompletionData : RefCountedObject { * versions (time_t or struct timespec) */ struct BasicStatCompletionData : CompletionData { - /// constructor - BasicStatCompletionData(libradosstriper::RadosStriperImpl* striper, - const std::string& soid, - librados::AioCompletionImpl *userCompletion, - libradosstriper::MultiAioCompletionImpl *multiCompletion, - uint64_t *psize, - int n = 1) : - CompletionData(striper, soid, "", userCompletion, n), - m_multiCompletion(multiCompletion), m_psize(psize), - m_statRC(0), m_getxattrRC(0) {}; // MultiAioCompletionImpl used to handle the double aysnc // call in the back (stat + getxattr) libradosstriper::MultiAioCompletionImpl *m_multiCompletion; @@ -393,6 +389,18 @@ struct BasicStatCompletionData : CompletionData { int m_statRC; /// return code of the getxattr int m_getxattrRC; + +protected: + /// constructor + BasicStatCompletionData(libradosstriper::RadosStriperImpl* striper, + const std::string& soid, + librados::AioCompletionImpl *userCompletion, + libradosstriper::MultiAioCompletionImpl *multiCompletion, + uint64_t *psize) : + CompletionData(striper, soid, "", userCompletion), + m_multiCompletion(multiCompletion), m_psize(psize), + m_statRC(0), m_getxattrRC(0) {}; + }; /** @@ -404,18 +412,19 @@ struct BasicStatCompletionData : CompletionData { */ template<class TimeType> struct StatCompletionData : BasicStatCompletionData { + // where to store the file time + TimeType *m_pmtime; +private: + FRIEND_MAKE_REF(StatCompletionData); /// constructor - StatCompletionData(libradosstriper::RadosStriperImpl* striper, + StatCompletionData<TimeType>(libradosstriper::RadosStriperImpl* striper, const std::string& soid, librados::AioCompletionImpl *userCompletion, libradosstriper::MultiAioCompletionImpl *multiCompletion, uint64_t *psize, - TimeType *pmtime, - int n = 1) : - BasicStatCompletionData(striper, soid, userCompletion, multiCompletion, psize, n), + TimeType *pmtime) : + BasicStatCompletionData(striper, soid, userCompletion, multiCompletion, psize), m_pmtime(pmtime) {}; - // where to store the file time - TimeType *m_pmtime; }; /** @@ -423,13 +432,15 @@ struct StatCompletionData : BasicStatCompletionData { * function in asynchronous remove operations of a Rados File */ struct RadosRemoveCompletionData : RefCountedObject { + /// the multi asynch io completion object to be used + MultiAioCompletionImplPtr m_multiAioCompl; +private: + FRIEND_MAKE_REF(RadosRemoveCompletionData); /// constructor RadosRemoveCompletionData(MultiAioCompletionImplPtr multiAioCompl, CephContext *context) : - RefCountedObject(context, 2), + RefCountedObject(context), m_multiAioCompl(multiAioCompl) {}; - /// the multi asynch io completion object to be used - MultiAioCompletionImplPtr m_multiAioCompl; }; @@ -614,16 +625,15 @@ int libradosstriper::RadosStriperImpl::aio_write_full(const std::string& soid, static void rados_read_aio_unlock_complete(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<ReadCompletionData*>(arg); + auto cdata = ceph::ref_t<ReadCompletionData>(static_cast<ReadCompletionData*>(arg), false); libradosstriper::MultiAioCompletionImpl *comp = reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c); cdata->complete_unlock(comp->rval); - cdata->put(); } static void striper_read_aio_req_complete(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<ReadCompletionData*>(arg); + auto cdata = static_cast<ReadCompletionData*>(arg); // launch the async unlocking of the object cdata->m_striper->aio_unlockObject(cdata->m_soid, cdata->m_lockCookie, cdata->m_unlockCompletion); // complete the read part in parallel @@ -634,19 +644,18 @@ static void striper_read_aio_req_complete(rados_striper_multi_completion_t c, vo static void rados_req_read_safe(rados_completion_t c, void *arg) { - auto data = reinterpret_cast<RadosReadCompletionData*>(arg); + auto data = ceph::ref_t<RadosReadCompletionData>(static_cast<RadosReadCompletionData*>(arg), false); int rc = rados_aio_get_return_value(c); // ENOENT means that we are dealing with a sparse file. This is fine, // data (0s) will be created on the fly by the rados_req_read_complete method if (rc == -ENOENT) rc = 0; auto multiAioComp = data->m_multiAioCompl; multiAioComp->safe_request(rc); - data->put(); } static void rados_req_read_complete(rados_completion_t c, void *arg) { - auto data = reinterpret_cast<RadosReadCompletionData*>(arg); + auto data = static_cast<RadosReadCompletionData*>(arg); int rc = rados_aio_get_return_value(c); // We need to handle the case of sparse files here if (rc == -ENOENT) { @@ -672,7 +681,6 @@ static void rados_req_read_complete(rados_completion_t c, void *arg) } auto multiAioComp = data->m_multiAioCompl; multiAioComp->complete_request(rc); - data->put(); } int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid, @@ -710,18 +718,17 @@ int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid, // create a completion object and transfer ownership of extents and resultbl vector<bufferlist> *resultbl = new vector<bufferlist>(extents->size()); - ReadCompletionData *cdata = new ReadCompletionData(this, soid, lockCookie, c, - bl, extents, resultbl, 1); + auto cdata = ceph::make_ref<ReadCompletionData>(this, soid, lockCookie, c, bl, extents, resultbl); c->is_read = true; c->io = m_ioCtxImpl; // create a completion for the unlocking of the striped object at the end of the read librados::AioCompletion *unlock_completion = - librados::Rados::aio_create_completion(cdata, rados_read_aio_unlock_complete, 0); + librados::Rados::aio_create_completion(cdata->get() /* create ref! */, rados_read_aio_unlock_complete, 0); cdata->m_unlockCompletion = unlock_completion; // create the multiCompletion object handling the reads MultiAioCompletionImplPtr nc{new libradosstriper::MultiAioCompletionImpl, false}; - nc->set_complete_callback(cdata, striper_read_aio_req_complete); + nc->set_complete_callback(cdata.get(), striper_read_aio_req_complete); // go through the extents int r = 0, i = 0; for (vector<ObjectExtent>::iterator p = extents->begin(); p != extents->end(); ++p) { @@ -738,9 +745,9 @@ int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid, nc->add_request(); // we need 2 references on data as both rados_req_read_safe and rados_req_read_complete // will release one - RadosReadCompletionData *data = new RadosReadCompletionData(nc, p->length, oid_bl, cct(), 2); + auto data = ceph::make_ref<RadosReadCompletionData>(nc, p->length, oid_bl, cct()); librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(data, rados_req_read_complete, rados_req_read_safe); + librados::Rados::aio_create_completion(data.detach(), rados_req_read_complete, rados_req_read_safe); r = m_ioCtx.aio_read(p->oid.name, rados_completion, oid_bl, p->length, p->offset); rados_completion->release(); if (r < 0) @@ -794,18 +801,17 @@ int libradosstriper::RadosStriperImpl::stat(const std::string& soid, uint64_t *p } static void striper_stat_aio_stat_complete(rados_completion_t c, void *arg) { - auto data = reinterpret_cast<BasicStatCompletionData*>(arg); + auto data = ceph::ref_t<BasicStatCompletionData>(static_cast<BasicStatCompletionData*>(arg), false); int rc = rados_aio_get_return_value(c); if (rc == -ENOENT) { // remember this has failed data->m_statRC = rc; } data->m_multiCompletion->complete_request(rc); - data->put(); } static void striper_stat_aio_getxattr_complete(rados_completion_t c, void *arg) { - auto data = reinterpret_cast<BasicStatCompletionData*>(arg); + auto data = ceph::ref_t<BasicStatCompletionData>(static_cast<BasicStatCompletionData*>(arg), false); int rc = rados_aio_get_return_value(c); // We need to handle the case of sparse files here if (rc < 0) { @@ -823,12 +829,11 @@ static void striper_stat_aio_getxattr_complete(rados_completion_t c, void *arg) rc = 0; } data->m_multiCompletion->complete_request(rc); - data->put(); } static void striper_stat_aio_req_complete(rados_striper_multi_completion_t c, void *arg) { - auto data = reinterpret_cast<BasicStatCompletionData*>(arg); + auto data = ceph::ref_t<BasicStatCompletionData>(static_cast<BasicStatCompletionData*>(arg), false); if (data->m_statRC) { data->complete(data->m_statRC); } else { @@ -838,7 +843,6 @@ static void striper_stat_aio_req_complete(rados_striper_multi_completion_t c, data->complete(0); } } - data->put(); } template<class TimeType> @@ -855,13 +859,11 @@ int libradosstriper::RadosStriperImpl::aio_generic_stat new libradosstriper::MultiAioCompletionImpl, false}; // Data object used for passing context to asynchronous calls std::string firstObjOid = getObjectId(soid, 0); - StatCompletionData<TimeType> *cdata = - new StatCompletionData<TimeType>(this, firstObjOid, c, - multi_completion.get(), psize, pmtime, 4); - multi_completion->set_complete_callback(cdata, striper_stat_aio_req_complete); + auto cdata = ceph::make_ref<StatCompletionData<TimeType>>(this, firstObjOid, c, multi_completion.get(), psize, pmtime); + multi_completion->set_complete_callback(cdata->get() /* create ref! */, striper_stat_aio_req_complete); // use a regular AioCompletion for the stat async call librados::AioCompletion *stat_completion = - librados::Rados::aio_create_completion(cdata, striper_stat_aio_stat_complete, 0); + librados::Rados::aio_create_completion(cdata->get() /* create ref! */, striper_stat_aio_stat_complete, 0); multi_completion->add_safe_request(); object_t obj(firstObjOid); int rc = (m_ioCtxImpl->*statFunction)(obj, stat_completion->pc, @@ -869,12 +871,12 @@ int libradosstriper::RadosStriperImpl::aio_generic_stat stat_completion->release(); if (rc < 0) { // nothing is really started so cancel everything - delete cdata; + delete cdata.detach(); return rc; } // use a regular AioCompletion for the getxattr async call librados::AioCompletion *getxattr_completion = - librados::Rados::aio_create_completion(cdata, striper_stat_aio_getxattr_complete, 0); + librados::Rados::aio_create_completion(cdata->get() /* create ref! */, striper_stat_aio_getxattr_complete, 0); multi_completion->add_safe_request(); // in parallel, get the pmsize from the first object asynchronously rc = m_ioCtxImpl->aio_getxattr(obj, getxattr_completion->pc, @@ -888,7 +890,6 @@ int libradosstriper::RadosStriperImpl::aio_generic_stat multi_completion->complete_request(rc); return rc; } - cdata->put(); return 0; } @@ -925,31 +926,29 @@ int libradosstriper::RadosStriperImpl::aio_stat2(const std::string& soid, static void rados_req_remove_complete(rados_completion_t c, void *arg) { - auto cdata = reinterpret_cast<RadosRemoveCompletionData*>(arg); + auto cdata = static_cast<RadosRemoveCompletionData*>(arg); int rc = rados_aio_get_return_value(c); // in case the object did not exist, it means we had a sparse file, all is fine if (rc == -ENOENT) { rc = 0; } cdata->m_multiAioCompl->complete_request(rc); - cdata->put(); } static void rados_req_remove_safe(rados_completion_t c, void *arg) { - auto cdata = reinterpret_cast<RadosRemoveCompletionData*>(arg); + auto cdata = ceph::ref_t<RadosRemoveCompletionData>(static_cast<RadosRemoveCompletionData*>(arg), false); int rc = rados_aio_get_return_value(c); // in case the object did not exist, it means we had a sparse file, all is fine if (rc == -ENOENT) { rc = 0; } cdata->m_multiAioCompl->safe_request(rc); - cdata->put(); } static void striper_remove_aio_req_complete(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<RemoveCompletionData*>(arg); + auto cdata = ceph::ref_t<RemoveCompletionData>(static_cast<RemoveCompletionData*>(arg), false); libradosstriper::MultiAioCompletionImpl *comp = reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c); ldout(cdata->m_striper->cct(), 10) @@ -968,7 +967,6 @@ static void striper_remove_aio_req_complete(rados_striper_multi_completion_t c, << dendl; } cdata->complete(rc); - cdata->put(); } int libradosstriper::RadosStriperImpl::remove(const std::string& soid, int flags) @@ -996,10 +994,10 @@ int libradosstriper::RadosStriperImpl::aio_remove(const std::string& soid, int rc = m_ioCtx.lock_exclusive(getObjectId(soid, 0), RADOS_LOCK_NAME, lockCookie, "", 0, 0); if (rc) return rc; // create CompletionData for the async remove call - RemoveCompletionData *cdata = new RemoveCompletionData(this, soid, lockCookie, c, flags); + auto cdata = ceph::make_ref<RemoveCompletionData>(this, soid, lockCookie, c, flags); MultiAioCompletionImplPtr multi_completion{ new libradosstriper::MultiAioCompletionImpl, false}; - multi_completion->set_complete_callback(cdata, striper_remove_aio_req_complete); + multi_completion->set_complete_callback(cdata->get() /* create ref! */, striper_remove_aio_req_complete); // call asynchronous internal version of remove ldout(cct(), 10) << "RadosStriperImpl : Aio_remove starting for " @@ -1054,10 +1052,9 @@ int libradosstriper::RadosStriperImpl::internal_aio_remove( int rcr = 0; for (int i = nb_objects-1; i >= 1; i--) { multi_completion->add_request(); - RadosRemoveCompletionData *data = - new RadosRemoveCompletionData(multi_completion, cct()); + auto data = ceph::make_ref<RadosRemoveCompletionData>(multi_completion, cct()); librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(data, + librados::Rados::aio_create_completion(data->get() /* create ref! */, rados_req_remove_complete, rados_req_remove_safe); if (flags == 0) { @@ -1142,32 +1139,29 @@ void libradosstriper::RadosStriperImpl::aio_unlockObject(const std::string& soid static void rados_write_aio_unlock_complete(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<WriteCompletionData*>(arg); + auto cdata = ceph::ref_t<WriteCompletionData>(static_cast<WriteCompletionData*>(arg), false); libradosstriper::MultiAioCompletionImpl *comp = reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c); cdata->complete_unlock(comp->rval); - cdata->put(); } static void striper_write_aio_req_complete(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<WriteCompletionData*>(arg); + auto cdata = ceph::ref_t<WriteCompletionData>(static_cast<WriteCompletionData*>(arg), false); // launch the async unlocking of the object cdata->m_striper->aio_unlockObject(cdata->m_soid, cdata->m_lockCookie, cdata->m_unlockCompletion); // complete the write part in parallel libradosstriper::MultiAioCompletionImpl *comp = reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c); cdata->complete_write(comp->rval); - cdata->put(); } static void striper_write_aio_req_safe(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<WriteCompletionData*>(arg); + auto cdata = ceph::ref_t<WriteCompletionData>(static_cast<WriteCompletionData*>(arg), false); libradosstriper::MultiAioCompletionImpl *comp = reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c); cdata->safe(comp->rval); - cdata->put(); } int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& soid, @@ -1179,17 +1173,16 @@ int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& s // create a completion object to be passed to the callbacks of the multicompletion // we need 3 references as striper_write_aio_req_complete will release two and // striper_write_aio_req_safe will release one - WriteCompletionData *cdata = new WriteCompletionData(this, soid, lockCookie, 0, 3); - cdata->get(); // local ref + auto cdata = ceph::make_ref<WriteCompletionData>(this, soid, lockCookie, nullptr); // create a completion object for the unlocking of the striped object at the end of the write librados::AioCompletion *unlock_completion = - librados::Rados::aio_create_completion(cdata, rados_write_aio_unlock_complete, 0); + librados::Rados::aio_create_completion(cdata->get() /* create ref! */, rados_write_aio_unlock_complete, 0); cdata->m_unlockCompletion = unlock_completion; // create the multicompletion that will handle the write completion MultiAioCompletionImplPtr c{new libradosstriper::MultiAioCompletionImpl, false}; - c->set_complete_callback(cdata, striper_write_aio_req_complete); - c->set_safe_callback(cdata, striper_write_aio_req_safe); + c->set_complete_callback(cdata->get() /* create ref! */, striper_write_aio_req_complete); + c->set_safe_callback(cdata->get() /* create ref! */, striper_write_aio_req_safe); // call the asynchronous API int rc = internal_aio_write(soid, c, bl, len, off, layout); if (!rc) { @@ -1201,7 +1194,6 @@ int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& s // return result rc = c->get_return_value(); } - cdata->put(); return rc; } @@ -1215,22 +1207,20 @@ int libradosstriper::RadosStriperImpl::aio_write_in_open_object(const std::strin // create a completion object to be passed to the callbacks of the multicompletion // we need 3 references as striper_write_aio_req_complete will release two and // striper_write_aio_req_safe will release one - WriteCompletionData *cdata = new WriteCompletionData(this, soid, lockCookie, c, 3); - cdata->get(); // local ref + auto cdata = ceph::make_ref<WriteCompletionData>(this, soid, lockCookie, c); m_ioCtxImpl->get(); c->io = m_ioCtxImpl; // create a completion object for the unlocking of the striped object at the end of the write librados::AioCompletion *unlock_completion = - librados::Rados::aio_create_completion(cdata, rados_write_aio_unlock_complete, 0); + librados::Rados::aio_create_completion(cdata->get() /* create ref! */, rados_write_aio_unlock_complete, 0); cdata->m_unlockCompletion = unlock_completion; // create the multicompletion that will handle the write completion libradosstriper::MultiAioCompletionImplPtr nc{ new libradosstriper::MultiAioCompletionImpl, false}; - nc->set_complete_callback(cdata, striper_write_aio_req_complete); - nc->set_safe_callback(cdata, striper_write_aio_req_safe); + nc->set_complete_callback(cdata->get() /* create ref! */, striper_write_aio_req_complete); + nc->set_safe_callback(cdata->get() /* create ref! */, striper_write_aio_req_safe); // internal asynchronous API int rc = internal_aio_write(soid, nc, bl, len, off, layout); - cdata->put(); return rc; } @@ -1503,7 +1493,7 @@ int libradosstriper::RadosStriperImpl::createAndOpenStripedObject(const std::str static void striper_truncate_aio_req_complete(rados_striper_multi_completion_t c, void *arg) { - auto cdata = reinterpret_cast<TruncateCompletionData*>(arg); + auto cdata = ceph::ref_t<TruncateCompletionData>(static_cast<TruncateCompletionData*>(arg), false); libradosstriper::MultiAioCompletionImpl *comp = reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c); if (0 == comp->rval) { @@ -1514,7 +1504,6 @@ static void striper_truncate_aio_req_complete(rados_striper_multi_completion_t c bl.append(oss.str()); cdata->m_striper->setxattr(cdata->m_soid, XATTR_SIZE, bl); } - cdata->put(); } int libradosstriper::RadosStriperImpl::truncate(const std::string& soid, @@ -1522,10 +1511,10 @@ int libradosstriper::RadosStriperImpl::truncate(const std::string& soid, uint64_t size, ceph_file_layout &layout) { - TruncateCompletionData *cdata = new TruncateCompletionData(this, soid, size); + auto cdata = ceph::make_ref<TruncateCompletionData>(this, soid, size); libradosstriper::MultiAioCompletionImplPtr multi_completion{ new libradosstriper::MultiAioCompletionImpl, false}; - multi_completion->set_complete_callback(cdata, striper_truncate_aio_req_complete); + multi_completion->set_complete_callback(cdata->get() /* create ref! */, striper_truncate_aio_req_complete); // call asynchrous version of truncate int rc = aio_truncate(soid, multi_completion, original_size, size, layout); // wait for completion of the truncation @@ -1573,10 +1562,9 @@ int libradosstriper::RadosStriperImpl::aio_truncate if (exists) { // remove asynchronously multi_completion->add_request(); - RadosRemoveCompletionData *data = - new RadosRemoveCompletionData(multi_completion, cct()); + auto data = ceph::make_ref<RadosRemoveCompletionData>(multi_completion, cct()); librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(data, + librados::Rados::aio_create_completion(data->get() /* create ref! */, rados_req_remove_complete, rados_req_remove_safe); int rc = m_ioCtx.aio_remove(getObjectId(soid, objectno), rados_completion); @@ -1608,10 +1596,9 @@ int libradosstriper::RadosStriperImpl::aio_truncate } else { // removes are asynchronous in order to speed up truncations of big files multi_completion->add_request(); - RadosRemoveCompletionData *data = - new RadosRemoveCompletionData(multi_completion, cct()); + auto data = ceph::make_ref<RadosRemoveCompletionData>(multi_completion, cct()); librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(data, + librados::Rados::aio_create_completion(data->get() /* create ref! */, rados_req_remove_complete, rados_req_remove_safe); rc = m_ioCtx.aio_remove(getObjectId(soid, objectno), rados_completion); diff --git a/src/librbd/DeepCopyRequest.cc b/src/librbd/DeepCopyRequest.cc index c96fa93bed6..fee8b0274f1 100644 --- a/src/librbd/DeepCopyRequest.cc +++ b/src/librbd/DeepCopyRequest.cc @@ -33,7 +33,7 @@ DeepCopyRequest<I>::DeepCopyRequest(I *src_image_ctx, I *dst_image_ctx, ContextWQ *work_queue, SnapSeqs *snap_seqs, ProgressContext *prog_ctx, Context *on_finish) - : RefCountedObject(dst_image_ctx->cct, 1), m_src_image_ctx(src_image_ctx), + : RefCountedObject(dst_image_ctx->cct), m_src_image_ctx(src_image_ctx), m_dst_image_ctx(dst_image_ctx), m_snap_id_start(snap_id_start), m_snap_id_end(snap_id_end), m_flatten(flatten), m_object_number(object_number), m_work_queue(work_queue), diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index a467178a922..bdd18f99d21 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -254,13 +254,6 @@ private: ReplayHandler(Journal *_journal) : journal(_journal) { } - void get() override { - // TODO - } - void put() override { - // TODO - } - void handle_entries_available() override { journal->handle_replay_ready(); } diff --git a/src/librbd/deep_copy/ImageCopyRequest.cc b/src/librbd/deep_copy/ImageCopyRequest.cc index f1e95c1bb4a..2fd2ed4a40a 100644 --- a/src/librbd/deep_copy/ImageCopyRequest.cc +++ b/src/librbd/deep_copy/ImageCopyRequest.cc @@ -30,7 +30,7 @@ ImageCopyRequest<I>::ImageCopyRequest(I *src_image_ctx, I *dst_image_ctx, const SnapSeqs &snap_seqs, ProgressContext *prog_ctx, Context *on_finish) - : RefCountedObject(dst_image_ctx->cct, 1), m_src_image_ctx(src_image_ctx), + : RefCountedObject(dst_image_ctx->cct), m_src_image_ctx(src_image_ctx), m_dst_image_ctx(dst_image_ctx), m_snap_id_start(snap_id_start), m_snap_id_end(snap_id_end), m_flatten(flatten), m_object_number(object_number), m_snap_seqs(snap_seqs), diff --git a/src/librbd/deep_copy/SnapshotCopyRequest.cc b/src/librbd/deep_copy/SnapshotCopyRequest.cc index 0eaaa477860..25d24b06246 100644 --- a/src/librbd/deep_copy/SnapshotCopyRequest.cc +++ b/src/librbd/deep_copy/SnapshotCopyRequest.cc @@ -49,7 +49,7 @@ SnapshotCopyRequest<I>::SnapshotCopyRequest(I *src_image_ctx, bool flatten, ContextWQ *work_queue, SnapSeqs *snap_seqs, Context *on_finish) - : RefCountedObject(dst_image_ctx->cct, 1), m_src_image_ctx(src_image_ctx), + : RefCountedObject(dst_image_ctx->cct), m_src_image_ctx(src_image_ctx), m_dst_image_ctx(dst_image_ctx), m_snap_id_end(snap_id_end), m_flatten(flatten), m_work_queue(work_queue), m_snap_seqs_result(snap_seqs), m_snap_seqs(*snap_seqs), m_on_finish(on_finish), m_cct(dst_image_ctx->cct), diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index ace4366dfdc..ba6c7f8c4a8 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -168,7 +168,7 @@ entity_addrvec_t DaemonServer::get_myaddrs() const int DaemonServer::ms_handle_authentication(Connection *con) { - MgrSession *s = new MgrSession(cct); + auto s = ceph::make_ref<MgrSession>(cct); con->set_priv(s); s->inst.addr = con->get_peer_addr(); s->entity_name = con->peer_name; diff --git a/src/mgr/DaemonState.h b/src/mgr/DaemonState.h index ff8bf8f2c41..8c5a8021eaf 100644 --- a/src/mgr/DaemonState.h +++ b/src/mgr/DaemonState.h @@ -211,10 +211,6 @@ struct DeviceState : public RefCountedObject pair<utime_t,utime_t> life_expectancy; ///< when device failure is expected utime_t life_expectancy_stamp; ///< when life expectency was recorded - DeviceState(const std::string& n) - : RefCountedObject(nullptr, 0), - devid(n) {} - void set_metadata(map<string,string>&& m); void set_life_expectancy(utime_t from, utime_t to, utime_t now); @@ -229,9 +225,11 @@ struct DeviceState : public RefCountedObject void dump(Formatter *f) const; void print(ostream& out) const; -}; -typedef boost::intrusive_ptr<DeviceState> DeviceStateRef; +private: + FRIEND_MAKE_REF(DeviceState); + DeviceState(const std::string& n) : devid(n) {} +}; /** * Fuse the collection of per-daemon metadata from Ceph into @@ -248,19 +246,19 @@ private: DaemonStateCollection all; std::set<DaemonKey> updating; - std::map<std::string,DeviceStateRef> devices; + std::map<std::string,ceph::ref_t<DeviceState>> devices; void _erase(const DaemonKey& dmk); - DeviceStateRef _get_or_create_device(const std::string& dev) { - auto p = devices.find(dev); - if (p != devices.end()) { - return p->second; + ceph::ref_t<DeviceState> _get_or_create_device(const std::string& dev) { + auto em = devices.try_emplace(dev, nullptr); + auto& d = em.first->second; + if (em.second) { + d = ceph::make_ref<DeviceState>(dev); } - devices[dev] = new DeviceState(dev); - return devices[dev]; + return d; } - void _erase_device(DeviceStateRef d) { + void _erase_device(const ceph::ref_t<DeviceState>& d) { devices.erase(d->devid); } diff --git a/src/mgr/MgrSession.h b/src/mgr/MgrSession.h index c61a80b63e7..aacbeb8209f 100644 --- a/src/mgr/MgrSession.h +++ b/src/mgr/MgrSession.h @@ -25,15 +25,17 @@ struct MgrSession : public RefCountedObject { std::set<std::string> declared_types; - explicit MgrSession(CephContext *cct) : RefCountedObject(cct, 0) {} - ~MgrSession() override {} - const entity_addr_t& get_peer_addr() { return inst.addr; } + +private: + FRIEND_MAKE_REF(MgrSession); + explicit MgrSession(CephContext *cct) : RefCountedObject(cct) {} + ~MgrSession() override = default; }; -typedef boost::intrusive_ptr<MgrSession> MgrSessionRef; +using MgrSessionRef = ceph::ref_t<MgrSession>; #endif diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 8d37cbd9161..1ed7d3f6f42 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -3979,10 +3979,6 @@ void Monitor::forward_request_leader(MonOpRequestRef op) // fake connection attached to forwarded messages struct AnonConnection : public Connection { entity_addr_t socket_addr; - explicit AnonConnection(CephContext *cct, - const entity_addr_t& sa) - : Connection(cct, NULL), - socket_addr(sa) {} int send_message(Message *m) override { ceph_assert(!"send_message on anonymous connection"); @@ -4000,6 +3996,12 @@ struct AnonConnection : public Connection { entity_addr_t get_peer_socket_addr() const override { return socket_addr; } + +private: + FRIEND_MAKE_REF(AnonConnection); + explicit AnonConnection(CephContext *cct, const entity_addr_t& sa) + : Connection(cct, nullptr), + socket_addr(sa) {} }; //extract the original message and put it into the regular dispatch function @@ -4022,7 +4024,7 @@ void Monitor::handle_forward(MonOpRequestRef op) PaxosServiceMessage *req = m->claim_message(); ceph_assert(req != NULL); - ConnectionRef c(new AnonConnection(cct, m->client_socket_addr)); + auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr); MonSession *s = new MonSession(static_cast<Connection*>(c.get())); s->_ident(req->get_source(), req->get_source_addrs()); diff --git a/src/msg/Connection.h b/src/msg/Connection.h index 840a3fba67f..95f89fd19b8 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -18,12 +18,11 @@ #include <stdlib.h> #include <ostream> -#include <boost/intrusive_ptr.hpp> - #include "auth/Auth.h" #include "common/RefCountedObj.h" #include "common/config.h" #include "common/debug.h" +#include "common/ref.h" #include "common/ceph_mutex.h" #include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert... #include "include/buffer.h" @@ -41,21 +40,21 @@ class Messenger; class Interceptor; #endif -struct Connection : public RefCountedObject { +struct Connection : public RefCountedObjectSafe { mutable ceph::mutex lock = ceph::make_mutex("Connection::lock"); Messenger *msgr; RefCountedPtr priv; - int peer_type; + int peer_type = -1; int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567 safe_item_history<entity_addrvec_t> peer_addrs; utime_t last_keepalive, last_keepalive_ack; private: - uint64_t features; + uint64_t features = 0; public: - bool is_loopback; - bool failed; // true if we are a lossy connection that has failed. + bool is_loopback = false; + bool failed = false; // true if we are a lossy connection that has failed. - int rx_buffers_version; + int rx_buffers_version = 0; std::map<ceph_tid_t,std::pair<ceph::buffer::list, int>> rx_buffers; // authentication state @@ -69,26 +68,9 @@ public: Interceptor *interceptor; #endif - friend class boost::intrusive_ptr<Connection>; friend class PipeConnection; public: - Connection(CephContext *cct, Messenger *m) - // we are managed exclusively by ConnectionRef; make it so you can - // ConnectionRef foo = new Connection; - : RefCountedObject(cct, 0), - msgr(m), - peer_type(-1), - features(0), - is_loopback(false), - failed(false), - rx_buffers_version(0) { - } - - ~Connection() override { - //generic_dout(0) << "~Connection " << this << dendl; - } - void set_priv(const RefCountedPtr& o) { std::lock_guard l{lock}; priv = o; @@ -254,9 +236,18 @@ public: last_keepalive_ack = t; } bool is_blackhole() const; -}; -typedef boost::intrusive_ptr<Connection> ConnectionRef; +protected: + Connection(CephContext *cct, Messenger *m) + : RefCountedObjectSafe(cct), + msgr(m) + {} + + ~Connection() override { + //generic_dout(0) << "~Connection " << this << dendl; + } +}; +using ConnectionRef = ceph::ref_t<Connection>; #endif /* CEPH_CONNECTION_H */ diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b86035eca26..3e3fae80030 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -113,7 +113,8 @@ class C_tick_wakeup : public EventCallback { AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w, bool m2, bool local) - : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), + : Connection(cct, m), + delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), state(STATE_NONE), port(-1), dispatch_queue(q), recv_buf(NULL), diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3ce26e6d52a..6b2f0e98b0c 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -51,7 +51,6 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); * sequence, try to reconnect peer endpoint. */ class AsyncConnection : public Connection { - ssize_t read(unsigned len, char *buffer, std::function<void(char *, ssize_t)> callback); ssize_t read_until(unsigned needed, char *p); @@ -106,10 +105,12 @@ class AsyncConnection : public Connection { void flush(); } *delay_state; - public: +private: + FRIEND_MAKE_REF(AsyncConnection); AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w, bool is_msgr2, bool local); ~AsyncConnection() override; +public: void maybe_start_delay_thread(); ostream& _conn_prefix(std::ostream *_dout); @@ -235,6 +236,6 @@ class AsyncConnection : public Connection { friend class ProtocolV2; }; /* AsyncConnection */ -typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef; +using AsyncConnectionRef = ceph::ref_t<AsyncConnection>; #endif diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index fc1107279e6..3556c4960f5 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -292,7 +292,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, stack = single->stack.get(); stack->start(); local_worker = stack->get_worker(); - local_connection = new AsyncConnection(cct, this, &dispatch_queue, + local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, local_worker, true, true); init_local_connection(); reap_handler = new C_handle_reap(this); @@ -556,7 +556,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, const entity_addr_t &peer_addr) { std::lock_guard l{lock}; - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, + auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w, listen_addr.is_msgr2(), false); conn->accept(std::move(cli_socket), listen_addr, peer_addr); accepting_conns.insert(conn); @@ -585,7 +585,7 @@ AsyncConnectionRef AsyncMessenger::create_connect( // create connection Worker *w = stack->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, + auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w, target.is_msgr2(), false); conn->connect(addrs, type, target); ceph_assert(!conns.count(addrs)); diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index b60993daab0..d3d13d04ba3 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -124,10 +124,6 @@ public: struct CollectionImpl : public RefCountedObject { const coll_t cid; - CollectionImpl(const coll_t& c) - : RefCountedObject(NULL, 0), - cid(c) {} - /// wait for any queued transactions to apply // block until any previous transactions are visible. specifically, // collection_list and collection_empty need to reflect prior operations. @@ -149,8 +145,12 @@ public: const coll_t &get_cid() { return cid; } + protected: + CollectionImpl() = delete; + CollectionImpl(CephContext* cct, const coll_t& c) : RefCountedObject(cct), cid(c) {} + ~CollectionImpl() = default; }; - typedef boost::intrusive_ptr<CollectionImpl> CollectionHandle; + using CollectionHandle = ceph::ref_t<CollectionImpl>; /********************************* diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index 2841e3daa24..9430b654b9b 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -463,7 +463,7 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout) dout(1) << __func__ << " uuid " << super.uuid << dendl; // init log - FileRef log_file = new File; + FileRef log_file = ceph::make_ref<File>(); log_file->fnode.ino = 1; log_file->fnode.prefer_bdev = BDEV_WAL; int r = _allocate( @@ -1060,7 +1060,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) if (!noop) { map<string,DirRef>::iterator q = dir_map.find(dirname); ceph_assert(q == dir_map.end()); - dir_map[dirname] = new Dir; + dir_map[dirname] = ceph::make_ref<Dir>(); } } break; @@ -1465,7 +1465,7 @@ BlueFS::FileRef BlueFS::_get_file(uint64_t ino) { auto p = file_map.find(ino); if (p == file_map.end()) { - FileRef f = new File; + FileRef f = ceph::make_ref<File>(); file_map[ino] = f; dout(30) << __func__ << " ino " << ino << " = " << f << " (new)" << dendl; @@ -1959,7 +1959,7 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l) // create a new log [writer] so that we know compaction is in progress // (see _should_compact_log) - new_log = new File; + new_log = ceph::make_ref<File>(); new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode // 0. wait for any racing flushes to complete. (We do not want to block @@ -2833,7 +2833,7 @@ int BlueFS::open_for_write( << " does not exist" << dendl; return -ENOENT; } - file = new File; + file = ceph::make_ref<File>(); file->fnode.ino = ++ino_last; file_map[ino_last] = file; dir->file_map[filename] = file; @@ -3012,7 +3012,7 @@ int BlueFS::mkdir(const string& dirname) dout(20) << __func__ << " dir " << dirname << " exists" << dendl; return -EEXIST; } - dir_map[dirname] = new Dir; + dir_map[dirname] = ceph::make_ref<Dir>(); log_t.op_dir_create(dirname); return 0; } @@ -3085,12 +3085,12 @@ int BlueFS::lock_file(const string& dirname, const string& filename, } DirRef dir = p->second; map<string,FileRef>::iterator q = dir->file_map.find(filename); - File *file; + FileRef file; if (q == dir->file_map.end()) { dout(20) << __func__ << " dir " << dirname << " (" << dir << ") file " << filename << " not found, creating" << dendl; - file = new File; + file = ceph::make_ref<File>(); file->fnode.ino = ++ino_last; file->fnode.mtime = ceph_clock_now(); file_map[ino_last] = file; @@ -3099,7 +3099,7 @@ int BlueFS::lock_file(const string& dirname, const string& filename, log_t.op_file_update(file->fnode); log_t.op_dir_link(dirname, filename, file->fnode.ino); } else { - file = q->second.get(); + file = q->second; if (file->locked) { dout(10) << __func__ << " already locked" << dendl; return -ENOLCK; diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index 07e3334902e..14dc7bdf906 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -7,11 +7,13 @@ #include <mutex> #include "bluefs_types.h" -#include "common/RefCountedObj.h" #include "BlockDevice.h" +#include "common/RefCountedObj.h" +#include "common/ceph_context.h" +#include "global/global_context.h" + #include "boost/intrusive/list.hpp" -#include <boost/intrusive_ptr.hpp> class PerfCounters; @@ -101,8 +103,10 @@ public: std::atomic_int num_readers, num_writers; std::atomic_int num_reading; + private: + FRIEND_MAKE_REF(File); File() - : RefCountedObject(NULL, 0), + : refs(0), dirty_seq(0), locked(false), @@ -117,15 +121,8 @@ public: ceph_assert(num_reading.load() == 0); ceph_assert(!locked); } - - friend void intrusive_ptr_add_ref(File *f) { - f->get(); - } - friend void intrusive_ptr_release(File *f) { - f->put(); - } }; - typedef boost::intrusive_ptr<File> FileRef; + using FileRef = ceph::ref_t<File>; typedef boost::intrusive::list< File, @@ -139,22 +136,17 @@ public: mempool::bluefs::map<string,FileRef> file_map; - Dir() : RefCountedObject(NULL, 0) {} - - friend void intrusive_ptr_add_ref(Dir *d) { - d->get(); - } - friend void intrusive_ptr_release(Dir *d) { - d->put(); - } + private: + FRIEND_MAKE_REF(Dir); + Dir() = default; }; - typedef boost::intrusive_ptr<Dir> DirRef; + using DirRef = ceph::ref_t<Dir>; struct FileWriter { MEMPOOL_CLASS_HELPERS(); FileRef file; - uint64_t pos; ///< start offset for buffer + uint64_t pos = 0; ///< start offset for buffer bufferlist buffer; ///< new data to write (at end of file) bufferlist tail_block; ///< existing partial block at end of file, if any bufferlist::page_aligned_appender buffer_appender; //< for const char* only @@ -167,7 +159,6 @@ public: FileWriter(FileRef f) : file(f), - pos(0), buffer_appender(buffer.get_page_aligned_appender( g_conf()->bluefs_alloc_size / CEPH_PAGE_SIZE)) { ++file->num_writers; diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index e2c39e33f8a..872b29eb205 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -3502,7 +3502,7 @@ void BlueStore::DeferredBatch::_audit(CephContext *cct) #define dout_prefix *_dout << "bluestore(" << store->path << ").collection(" << cid << " " << this << ") " BlueStore::Collection::Collection(BlueStore *store_, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t cid) - : CollectionImpl(cid), + : CollectionImpl(store_->cct, cid), store(store_), cache(bc), exists(true), @@ -5982,12 +5982,11 @@ int BlueStore::_open_collections() it->next()) { coll_t cid; if (cid.parse(it->key())) { - CollectionRef c( - new Collection( + auto c = ceph::make_ref<Collection>( this, onode_cache_shards[cid.hash_to_shard(onode_cache_shards.size())], buffer_cache_shards[cid.hash_to_shard(buffer_cache_shards.size())], - cid)); + cid); bufferlist bl = it->value(); auto p = bl.cbegin(); try { @@ -9197,13 +9196,13 @@ ObjectStore::CollectionHandle BlueStore::create_new_collection( const coll_t& cid) { std::unique_lock l{coll_lock}; - Collection *c = new Collection( + auto c = ceph::make_ref<Collection>( this, onode_cache_shards[cid.hash_to_shard(onode_cache_shards.size())], buffer_cache_shards[cid.hash_to_shard(buffer_cache_shards.size())], cid); new_coll_map[cid] = c; - _osr_attach(c); + _osr_attach(c.get()); return c; } @@ -11345,7 +11344,7 @@ void BlueStore::_osr_attach(Collection *c) std::lock_guard l(zombie_osr_lock); auto p = zombie_osr_set.find(c->cid); if (p == zombie_osr_set.end()) { - c->osr = new OpSequencer(this, c->cid); + c->osr = ceph::make_ref<OpSequencer>(this, c->cid); ldout(cct, 10) << __func__ << " " << c->cid << " fresh osr " << c->osr << dendl; } else { diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 4b98c83bfe6..48ddc1299c4 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -1266,7 +1266,7 @@ public: }; class OpSequencer; - typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef; + using OpSequencerRef = ceph::ref_t<OpSequencer>; struct Collection : public CollectionImpl { BlueStore *store; @@ -1659,14 +1659,6 @@ public: std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away) - OpSequencer(BlueStore *store, const coll_t& c) - : RefCountedObject(store->cct, 0), - store(store), cid(c) { - } - ~OpSequencer() { - ceph_assert(q.empty()); - } - void queue_new(TransContext *txc) { std::lock_guard l(qlock); txc->seq = ++last_seq; @@ -1747,6 +1739,15 @@ public: txc->oncommits.push_back(c); return false; } + private: + FRIEND_MAKE_REF(OpSequencer); + OpSequencer(BlueStore *store, const coll_t& c) + : RefCountedObject(store->cct), + store(store), cid(c) { + } + ~OpSequencer() { + ceph_assert(q.empty()); + } }; typedef boost::intrusive::list< diff --git a/src/os/filestore/FileStore.cc b/src/os/filestore/FileStore.cc index 30889581a1a..42d4b313069 100644 --- a/src/os/filestore/FileStore.cc +++ b/src/os/filestore/FileStore.cc @@ -2016,7 +2016,7 @@ void FileStore::init_temp_collections() for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) { if (p->is_temp()) continue; - coll_map[*p] = new OpSequencer(cct, ++next_osr_id, *p); + coll_map[*p] = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, *p); if (p->is_meta()) continue; coll_t temp = p->get_temp(); @@ -2130,7 +2130,7 @@ ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c) std::lock_guard l{coll_lock}; auto p = coll_map.find(c); if (p == coll_map.end()) { - auto *r = new OpSequencer(cct, ++next_osr_id, c); + auto r = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, c); coll_map[c] = r; return r; } else { diff --git a/src/os/filestore/FileStore.h b/src/os/filestore/FileStore.h index ffd39a86003..e0a21e619a0 100644 --- a/src/os/filestore/FileStore.h +++ b/src/os/filestore/FileStore.h @@ -359,8 +359,10 @@ private: } } + private: + FRIEND_MAKE_REF(OpSequencer); OpSequencer(CephContext* cct, int i, coll_t cid) - : CollectionImpl(cid), + : CollectionImpl(cct, cid), cct(cct), osr_name_str(stringify(cid)), id(i), diff --git a/src/os/kstore/KStore.cc b/src/os/kstore/KStore.cc index db28d355a8a..ce18d967701 100644 --- a/src/os/kstore/KStore.cc +++ b/src/os/kstore/KStore.cc @@ -565,7 +565,7 @@ int KStore::OnodeHashLRU::trim(int max) #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") " KStore::Collection::Collection(KStore *ns, coll_t cid) - : CollectionImpl(cid), + : CollectionImpl(ns->cct, cid), store(ns), osr(new OpSequencer()), onode_map(store->cct) @@ -895,7 +895,7 @@ int KStore::_open_collections(int *errors) it->next()) { coll_t cid; if (cid.parse(it->key())) { - CollectionRef c(new Collection(this, cid)); + auto c = ceph::make_ref<Collection>(this, cid); bufferlist bl = it->value(); auto p = bl.cbegin(); try { @@ -1107,7 +1107,7 @@ ObjectStore::CollectionHandle KStore::open_collection(const coll_t& cid) ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid) { - auto *c = new Collection(this, cid); + auto c = ceph::make_ref<Collection>(this, cid); std::unique_lock l{coll_lock}; new_coll_map[cid] = c; return c; diff --git a/src/os/kstore/KStore.h b/src/os/kstore/KStore.h index c79a586962c..06d2d61f200 100644 --- a/src/os/kstore/KStore.h +++ b/src/os/kstore/KStore.h @@ -161,9 +161,11 @@ public: void flush() override; bool flush_commit(Context *c) override; + private: + FRIEND_MAKE_REF(Collection); Collection(KStore *ns, coll_t c); }; - typedef boost::intrusive_ptr<Collection> CollectionRef; + using CollectionRef = ceph::ref_t<Collection>; class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl { CollectionRef c; diff --git a/src/os/memstore/MemStore.cc b/src/os/memstore/MemStore.cc index a4d1e981495..05d16edb6cc 100644 --- a/src/os/memstore/MemStore.cc +++ b/src/os/memstore/MemStore.cc @@ -157,7 +157,7 @@ int MemStore::_load() int r = cbl.read_file(fn.c_str(), &err); if (r < 0) return r; - CollectionRef c(new Collection(cct, *q)); + auto c = ceph::make_ref<Collection>(cct, *q); auto p = cbl.cbegin(); c->decode(p); coll_map[*q] = c; @@ -258,7 +258,7 @@ MemStore::CollectionRef MemStore::get_collection(const coll_t& cid) ObjectStore::CollectionHandle MemStore::create_new_collection(const coll_t& cid) { std::lock_guard l{coll_lock}; - Collection *c = new Collection(cct, cid); + auto c = ceph::make_ref<Collection>(cct, cid); new_coll_map[cid] = c; return c; } @@ -1573,8 +1573,6 @@ struct MemStore::PageSetObject : public Object { static thread_local PageSet::page_vector tls_pages; #endif - explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {} - size_t get_size() const override { return data_len; } int read(uint64_t offset, uint64_t len, bufferlist &bl) override; @@ -1597,6 +1595,10 @@ struct MemStore::PageSetObject : public Object { decode_base(p); DECODE_FINISH(p); } + +private: + FRIEND_MAKE_REF(PageSetObject); + explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {} }; #if defined(__GLIBCXX__) @@ -1800,6 +1802,6 @@ int MemStore::PageSetObject::truncate(uint64_t size) MemStore::ObjectRef MemStore::Collection::create_object() const { if (use_page_set) - return new PageSetObject(cct->_conf->memstore_page_size); + return ceph::make_ref<PageSetObject>(cct->_conf->memstore_page_size); return new BufferlistObject(); } diff --git a/src/os/memstore/MemStore.h b/src/os/memstore/MemStore.h index ec7cf53b501..55f55ee4bfa 100644 --- a/src/os/memstore/MemStore.h +++ b/src/os/memstore/MemStore.h @@ -36,11 +36,8 @@ public: bufferlist omap_header; map<string,bufferlist> omap; - typedef boost::intrusive_ptr<Object> Ref; - friend void intrusive_ptr_add_ref(Object *o) { o->get(); } - friend void intrusive_ptr_release(Object *o) { o->put(); } + using Ref = ceph::ref_t<Object>; - Object() : RefCountedObject(nullptr, 0) {} // interface for object data virtual size_t get_size() const = 0; virtual int read(uint64_t offset, uint64_t len, bufferlist &bl) = 0; @@ -90,8 +87,10 @@ public: } f->close_section(); } + protected: + Object() = default; }; - typedef Object::Ref ObjectRef; + using ObjectRef = Object::Ref; struct PageSetObject; struct Collection : public CollectionImpl { @@ -110,8 +109,6 @@ public: ceph::make_mutex("MemStore::Collection::sequencer_mutex")}; typedef boost::intrusive_ptr<Collection> Ref; - friend void intrusive_ptr_add_ref(Collection *c) { c->get(); } - friend void intrusive_ptr_release(Collection *c) { c->put(); } ObjectRef create_object() const; @@ -184,8 +181,10 @@ public: return true; } + private: + FRIEND_MAKE_REF(Collection); explicit Collection(CephContext *cct, coll_t c) - : CollectionImpl(c), + : CollectionImpl(cct, c), cct(cct), use_page_set(cct->_conf->memstore_page_set) {} }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 2d8760e0093..96aed0b706e 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -450,7 +450,7 @@ HeartbeatStampsRef OSDService::get_hb_stamps(unsigned peer) hb_stamps.resize(peer + 1); } if (!hb_stamps[peer]) { - hb_stamps[peer].reset(new HeartbeatStamps(peer)); + hb_stamps[peer] = ceph::make_ref<HeartbeatStamps>(peer); } return hb_stamps[peer]; } @@ -4539,20 +4539,18 @@ void OSD::_add_heartbeat_peer(int p) auto stamps = service.get_hb_stamps(p); - Session *sb = new Session(cct, cons.first.get()); + auto sb = ceph::make_ref<Session>(cct, cons.first.get()); sb->peer = p; sb->stamps = stamps; - RefCountedPtr sbref{sb, false}; hi->hb_interval_start = ceph_clock_now(); hi->con_back = cons.first.get(); - hi->con_back->set_priv(sbref); + hi->con_back->set_priv(sb); - Session *sf = new Session(cct, cons.second.get()); + auto sf = ceph::make_ref<Session>(cct, cons.second.get()); sf->peer = p; sf->stamps = stamps; - RefCountedPtr sfref{sf, false}; hi->con_front = cons.second.get(); - hi->con_front->set_priv(sfref); + hi->con_front->set_priv(sf); dout(10) << "_add_heartbeat_peer: new peer osd." << p << " " << hi->con_back->get_peer_addr() @@ -5691,11 +5689,9 @@ void OSD::ms_handle_fast_connect(Connection *con) { if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { - auto priv = con->get_priv(); - auto s = static_cast<Session*>(priv.get()); - if (!s) { - s = new Session{cct, con}; - con->set_priv(RefCountedPtr{s, false}); + if (auto s = ceph::ref_cast<Session>(con->get_priv()); !s) { + s = ceph::make_ref<Session>(cct, con); + con->set_priv(s); dout(10) << " new session (outgoing) " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl; // we don't connect to clients @@ -5709,11 +5705,9 @@ void OSD::ms_handle_fast_accept(Connection *con) { if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { - auto priv = con->get_priv(); - auto s = static_cast<Session*>(priv.get()); - if (!s) { - s = new Session{cct, con}; - con->set_priv(RefCountedPtr{s, false}); + if (auto s = ceph::ref_cast<Session>(con->get_priv()); !s) { + s = ceph::make_ref<Session>(cct, con); + con->set_priv(s); dout(10) << "new session (incoming)" << s << " con=" << con << " addr=" << con->get_peer_addr() << " must have raced with connect" << dendl; @@ -5725,9 +5719,8 @@ void OSD::ms_handle_fast_accept(Connection *con) bool OSD::ms_handle_reset(Connection *con) { - auto s = con->get_priv(); - auto session = static_cast<Session*>(s.get()); - dout(2) << "ms_handle_reset con " << con << " session " << session << dendl; + auto session = ceph::ref_cast<Session>(con->get_priv()); + dout(2) << "ms_handle_reset con " << con << " session " << session.get() << dendl; if (!session) return false; session->wstate.reset(con); @@ -5736,7 +5729,7 @@ bool OSD::ms_handle_reset(Connection *con) // note that we break session->con *before* the session_handle_reset // cleanup below. this avoids a race between us and // PG::add_backoff, Session::check_backoff, etc. - session_handle_reset(SessionRef{session}); + session_handle_reset(session); return true; } @@ -5745,9 +5738,8 @@ bool OSD::ms_handle_refused(Connection *con) if (!cct->_conf->osd_fast_fail_on_connection_refused) return false; - auto priv = con->get_priv(); - auto session = static_cast<Session*>(priv.get()); - dout(2) << "ms_handle_refused con " << con << " session " << session << dendl; + auto session = ceph::ref_cast<Session>(con->get_priv()); + dout(2) << "ms_handle_refused con " << con << " session " << session.get() << dendl; if (!session) return false; int type = con->get_peer_type(); @@ -6320,8 +6312,7 @@ void OSD::handle_command(MMonCommand *m) void OSD::handle_command(MCommand *m) { ConnectionRef con = m->get_connection(); - auto priv = con->get_priv(); - auto session = static_cast<Session *>(priv.get()); + auto session = ceph::ref_cast<Session>(con->get_priv()); if (!session) { con->send_message(new MCommandReply(m, -EPERM)); m->put(); @@ -6329,7 +6320,6 @@ void OSD::handle_command(MCommand *m) } OSDCap& caps = session->caps; - priv.reset(); if (!caps.allow_all() || m->get_source().is_mon()) { con->send_message(new MCommandReply(m, -EPERM)); @@ -7154,8 +7144,7 @@ void OSDService::maybe_share_map( { // NOTE: we assume caller hold something that keeps the Connection itself // pinned (e.g., an OpRequest's MessageRef). - auto priv = con->get_priv(); - auto session = static_cast<Session*>(priv.get()); + auto session = ceph::ref_cast<Session>(con->get_priv()); if (!session) { return; } @@ -7191,7 +7180,7 @@ void OSDService::maybe_share_map( session->sent_epoch_lock.unlock(); } -void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap) +void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap) { ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock)); @@ -7327,11 +7316,10 @@ void OSD::ms_fast_dispatch(Message *m) int OSD::ms_handle_authentication(Connection *con) { int ret = 0; - auto priv = con->get_priv(); - Session *s = static_cast<Session*>(priv.get()); + auto s = ceph::ref_cast<Session>(con->get_priv()); if (!s) { - s = new Session(cct, con); - con->set_priv(RefCountedPtr{s, false}); + s = ceph::make_ref<Session>(cct, con); + con->set_priv(s); s->entity_name = con->get_peer_entity_name(); dout(10) << __func__ << " new session " << s << " con " << s->con << " entity " << s->entity_name @@ -8017,9 +8005,8 @@ void OSD::handle_osd_map(MOSDMap *m) return; } - auto priv = m->get_connection()->get_priv(); - if (auto session = static_cast<Session *>(priv.get()); - session && !(session->entity_name.is_mon() || + auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv()); + if (session && !(session->entity_name.is_mon() || session->entity_name.is_osd())) { //not enough perms! dout(10) << "got osd map from Session " << session @@ -9055,8 +9042,7 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map, << dendl; ConnectionRef con = m->get_connection(); con->mark_down(); - auto priv = con->get_priv(); - if (auto s = static_cast<Session*>(priv.get()); s) { + if (auto s = ceph::ref_cast<Session>(con->get_priv()); s) { if (!is_fast_dispatch) s->session_dispatch_lock.lock(); clear_session_waiting_on_map(s); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 0468c66c416..3c8d9678a23 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1334,36 +1334,35 @@ private: // -- sessions -- private: - void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap); + void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap); ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock"); - set<SessionRef> session_waiting_for_map; + set<ceph::ref_t<Session>> session_waiting_for_map; /// Caller assumes refs for included Sessions - void get_sessions_waiting_for_map(set<SessionRef> *out) { + void get_sessions_waiting_for_map(set<ceph::ref_t<Session>> *out) { std::lock_guard l(session_waiting_lock); out->swap(session_waiting_for_map); } - void register_session_waiting_on_map(SessionRef session) { + void register_session_waiting_on_map(const ceph::ref_t<Session>& session) { std::lock_guard l(session_waiting_lock); session_waiting_for_map.insert(session); } - void clear_session_waiting_on_map(SessionRef session) { + void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) { std::lock_guard l(session_waiting_lock); session_waiting_for_map.erase(session); } void dispatch_sessions_waiting_on_map() { - set<SessionRef> sessions_to_check; + set<ceph::ref_t<Session>> sessions_to_check; get_sessions_waiting_for_map(&sessions_to_check); for (auto i = sessions_to_check.begin(); i != sessions_to_check.end(); sessions_to_check.erase(i++)) { std::lock_guard l{(*i)->session_dispatch_lock}; - SessionRef session = *i; - dispatch_session_waiting(session, osdmap); + dispatch_session_waiting(*i, osdmap); } } - void session_handle_reset(SessionRef session) { + void session_handle_reset(const ceph::ref_t<Session>& session) { std::lock_guard l(session->session_dispatch_lock); clear_session_waiting_on_map(session); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 3d02bab0580..ae28ad704d1 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -605,24 +605,22 @@ void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx, snap_mapper.update_bits(split_bits); } -void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end) +void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end) { - ConnectionRef con = s->con; + auto con = s->con; if (!con) // OSD::ms_handle_reset clears s->con without a lock return; - BackoffRef b(s->have_backoff(info.pgid, begin)); + auto b = s->have_backoff(info.pgid, begin); if (b) { derr << __func__ << " already have backoff for " << s << " begin " << begin << " " << *b << dendl; ceph_abort(); } std::lock_guard l(backoff_lock); - { - b = new Backoff(info.pgid, this, s, ++s->backoff_seq, begin, end); - backoffs[begin].insert(b); - s->add_backoff(b); - dout(10) << __func__ << " session " << s << " added " << *b << dendl; - } + b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end); + backoffs[begin].insert(b); + s->add_backoff(b); + dout(10) << __func__ << " session " << s << " added " << *b << dendl; con->send_message( new MOSDBackoff( info.pgid, @@ -636,7 +634,7 @@ void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end) void PG::release_backoffs(const hobject_t& begin, const hobject_t& end) { dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl; - vector<BackoffRef> bv; + vector<ceph::ref_t<Backoff>> bv; { std::lock_guard l(backoff_lock); auto p = backoffs.lower_bound(begin); @@ -698,7 +696,7 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end) void PG::clear_backoffs() { dout(10) << __func__ << " " << dendl; - map<hobject_t,set<BackoffRef>> ls; + map<hobject_t,set<ceph::ref_t<Backoff>>> ls; { std::lock_guard l(backoff_lock); ls.swap(backoffs); @@ -722,7 +720,7 @@ void PG::clear_backoffs() } // called by Session::clear_backoffs() -void PG::rm_backoff(BackoffRef b) +void PG::rm_backoff(const ceph::ref_t<Backoff>& b) { dout(10) << __func__ << " " << *b << dendl; std::lock_guard l(backoff_lock); diff --git a/src/osd/PG.h b/src/osd/PG.h index 585c93e51b0..1eb149d5545 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -356,7 +356,7 @@ public: __u8 &); static bool _has_removal_flag(ObjectStore *store, spg_t pgid); - void rm_backoff(BackoffRef b); + void rm_backoff(const ceph::ref_t<Backoff>& b); void update_snap_mapper_bits(uint32_t bits) { snap_mapper.update_bits(bits); @@ -1063,16 +1063,16 @@ protected: // -- backoff -- ceph::mutex backoff_lock = // orders inside Backoff::lock ceph::make_mutex("PG::backoff_lock"); - map<hobject_t,set<BackoffRef>> backoffs; + map<hobject_t,set<ceph::ref_t<Backoff>>> backoffs; - void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end); + void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end); void release_backoffs(const hobject_t& begin, const hobject_t& end); void release_backoffs(const hobject_t& o) { release_backoffs(o, o); } void clear_backoffs(); - void add_pg_backoff(SessionRef s) { + void add_pg_backoff(const ceph::ref_t<Session>& s) { hobject_t begin = info.pgid.pgid.get_hobj_start(); hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); add_backoff(s, begin, end); diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index cbd6f500b91..fac4109c122 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -109,10 +109,6 @@ struct HeartbeatStamps : public RefCountedObject { /// highest up_from we've seen from this rank epoch_t up_from = 0; - HeartbeatStamps(int o) - : RefCountedObject(NULL, 0), - osd(o) {} - void print(ostream& out) const { std::lock_guard l(lock); out << "hbstamp(osd." << osd << " up_from " << up_from @@ -163,8 +159,13 @@ struct HeartbeatStamps : public RefCountedObject { peer_clock_delta_ub = delta_ub; } +private: + FRIEND_MAKE_REF(HeartbeatStamps); + HeartbeatStamps(int o) + : RefCountedObject(NULL), + osd(o) {} }; -typedef boost::intrusive_ptr<HeartbeatStamps> HeartbeatStampsRef; +using HeartbeatStampsRef = ceph::ref_t<HeartbeatStamps>; inline ostream& operator<<(ostream& out, const HeartbeatStamps& hb) { diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index fa58df8231e..eb06312d0a7 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1439,7 +1439,7 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo void PrimaryLogPG::handle_backoff(OpRequestRef& op) { auto m = op->get_req<MOSDBackoff>(); - SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())}; + auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv()); if (!session) return; // drop it. hobject_t begin = info.pgid.pgid.get_hobj_start(); @@ -1490,7 +1490,7 @@ void PrimaryLogPG::do_request( const Message *m = op->get_req(); int msg_type = m->get_type(); if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) { - SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())}; + auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv()); if (!session) return; // drop it. @@ -1674,7 +1674,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op) bool can_backoff = m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF); - SessionRef session; + ceph::ref_t<Session> session; if (can_backoff) { session = static_cast<Session*>(m->get_connection()->get_priv().get()); if (!session.get()) { diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index f7d923f9748..187fb6a1556 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -281,10 +281,10 @@ void ReplicatedBackend::objects_read_async( class C_OSD_OnOpCommit : public Context { ReplicatedBackend *pg; - ReplicatedBackend::InProgressOpRef op; + ceph::ref_t<ReplicatedBackend::InProgressOp> op; public: - C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) - : pg(pg), op(op) {} + C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op) + : pg(pg), op(std::move(op)) {} void finish(int) override { pg->op_commit(op); } @@ -479,7 +479,7 @@ void ReplicatedBackend::submit_transaction( auto insert_res = in_progress_ops.insert( make_pair( tid, - new InProgressOp( + ceph::make_ref<InProgressOp>( tid, on_all_commit, orig_op, at_version) ) @@ -529,8 +529,7 @@ void ReplicatedBackend::submit_transaction( } } -void ReplicatedBackend::op_commit( - InProgressOpRef& op) +void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op) { if (op->on_commit == nullptr) { // aborted diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index f70a0501179..4e272e492ee 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -339,18 +339,17 @@ private: Context *on_commit; OpRequestRef op; eversion_t v; - InProgressOp( - ceph_tid_t tid, Context *on_commit, - OpRequestRef op, eversion_t v) - : RefCountedObject(nullptr, 0), - tid(tid), on_commit(on_commit), - op(op), v(v) {} bool done() const { return waiting_for_commit.empty(); } + private: + FRIEND_MAKE_REF(InProgressOp); + InProgressOp(ceph_tid_t tid, Context *on_commit, OpRequestRef op, eversion_t v) + : + tid(tid), on_commit(on_commit), + op(op), v(v) {} }; - typedef boost::intrusive_ptr<InProgressOp> InProgressOpRef; - map<ceph_tid_t, InProgressOpRef> in_progress_ops; + map<ceph_tid_t, ceph::ref_t<InProgressOp>> in_progress_ops; public: friend class C_OSD_OnOpCommit; @@ -403,7 +402,7 @@ private: std::optional<pg_hit_set_history_t> &hset_history, InProgressOp *op, ObjectStore::Transaction &op_t); - void op_commit(InProgressOpRef& op); + void op_commit(const ceph::ref_t<InProgressOp>& op); void do_repop_reply(OpRequestRef op); void do_repop(OpRequestRef op); diff --git a/src/osd/Session.cc b/src/osd/Session.cc index 44b5817a374..c3699593e5a 100644 --- a/src/osd/Session.cc +++ b/src/osd/Session.cc @@ -11,7 +11,7 @@ void Session::clear_backoffs() { - map<spg_t,map<hobject_t,set<BackoffRef>>> ls; + map<spg_t,map<hobject_t,set<ceph::ref_t<Backoff>>>> ls; { std::lock_guard l(backoff_lock); ls.swap(backoffs); @@ -85,7 +85,7 @@ void Session::ack_backoff( bool Session::check_backoff( CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m) { - BackoffRef b(have_backoff(pgid, oid)); + auto b = have_backoff(pgid, oid); if (b) { dout(10) << __func__ << " session " << this << " has backoff " << *b << " for " << *m << dendl; diff --git a/src/osd/Session.h b/src/osd/Session.h index 432d3ee8744..27000a95a56 100644 --- a/src/osd/Session.h +++ b/src/osd/Session.h @@ -26,10 +26,6 @@ //#define PG_DEBUG_REFS -struct Session; -typedef boost::intrusive_ptr<Session> SessionRef; -struct Backoff; -typedef boost::intrusive_ptr<Backoff> BackoffRef; class PG; #ifdef PG_DEBUG_REFS #include "common/tracked_int_ptr.hpp" @@ -103,20 +99,9 @@ struct Backoff : public RefCountedObject { // - both null (teardown), or // - only session is set (and state == DELETING) PGRef pg; ///< owning pg - SessionRef session; ///< owning session + ceph::ref_t<class Session> session; ///< owning session hobject_t begin, end; ///< [) range to block, unless ==, then single obj - Backoff(spg_t pgid, PGRef pg, SessionRef s, - uint64_t i, - const hobject_t& b, const hobject_t& e) - : RefCountedObject(g_ceph_context, 0), - pgid(pgid), - id(i), - pg(pg), - session(s), - begin(b), - end(e) {} - friend ostream& operator<<(ostream& out, const Backoff& b) { return out << "Backoff(" << &b << " " << b.pgid << " " << b.id << " " << b.get_state_name() @@ -124,6 +109,19 @@ struct Backoff : public RefCountedObject { << " session " << b.session << " pg " << b.pg << ")"; } + +private: + FRIEND_MAKE_REF(Backoff); + Backoff(spg_t pgid, PGRef pg, ceph::ref_t<Session> s, + uint64_t i, + const hobject_t& b, const hobject_t& e) + : RefCountedObject(g_ceph_context), + pgid(pgid), + id(i), + pg(pg), + session(std::move(s)), + begin(b), + end(e) {} }; @@ -140,12 +138,12 @@ struct Session : public RefCountedObject { boost::intrusive::list<OpRequest> waiting_on_map; ceph::spinlock sent_epoch_lock; - epoch_t last_sent_epoch; + epoch_t last_sent_epoch = 0; /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock"); std::atomic<int> backoff_count= {0}; ///< simple count of backoffs - map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs; + map<spg_t,map<hobject_t,set<ceph::ref_t<Backoff>>>> backoffs; std::atomic<uint64_t> backoff_seq = {0}; @@ -153,14 +151,6 @@ struct Session : public RefCountedObject { int peer = -1; HeartbeatStampsRef stamps; - explicit Session(CephContext *cct, Connection *con_) : - RefCountedObject(cct), - con(con_), - socket_addr(con_->get_peer_socket_addr()), - wstate(cct), - last_sent_epoch(0) - {} - entity_addr_t& get_peer_socket_addr() { return socket_addr; } @@ -172,7 +162,7 @@ struct Session : public RefCountedObject { const hobject_t& start, const hobject_t& end); - BackoffRef have_backoff(spg_t pgid, const hobject_t& oid) { + ceph::ref_t<Backoff> have_backoff(spg_t pgid, const hobject_t& oid) { if (!backoff_count.load()) { return nullptr; } @@ -203,15 +193,15 @@ struct Session : public RefCountedObject { bool check_backoff( CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m); - void add_backoff(BackoffRef b) { + void add_backoff(ceph::ref_t<Backoff> b) { std::lock_guard l(backoff_lock); ceph_assert(!backoff_count == backoffs.empty()); - backoffs[b->pgid][b->begin].insert(b); + backoffs[b->pgid][b->begin].insert(std::move(b)); ++backoff_count; } // called by PG::release_*_backoffs and PG::clear_backoffs() - void rm_backoff(BackoffRef b) { + void rm_backoff(const ceph::ref_t<Backoff>& b) { std::lock_guard l(backoff_lock); ceph_assert(ceph_mutex_is_locked_by_me(b->lock)); ceph_assert(b->session == this); @@ -236,6 +226,15 @@ struct Session : public RefCountedObject { ceph_assert(!backoff_count == backoffs.empty()); } void clear_backoffs(); + +private: + FRIEND_MAKE_REF(Session); + explicit Session(CephContext *cct, Connection *con_) : + RefCountedObject(cct), + con(con_), + socket_addr(con_->get_peer_socket_addr()), + wstate(cct) + {} }; #endif diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc index 076f5fc39a6..c554a379402 100644 --- a/src/test/direct_messenger/DirectMessenger.cc +++ b/src/test/direct_messenger/DirectMessenger.cc @@ -28,13 +28,15 @@ class DirectConnection : public Connection { /// clear this pointer) before dropping its own reference std::atomic<Connection*> reply_connection{nullptr}; - public: + private: + FRIEND_MAKE_REF(DirectConnection); DirectConnection(CephContext *cct, DirectMessenger *m, DispatchStrategy *dispatchers) : Connection(cct, m), dispatchers(dispatchers) {} + public: /// sets the Connection that will receive replies to outgoing messages void set_direct_reply_connection(ConnectionRef conn); @@ -100,8 +102,7 @@ static ConnectionRef create_loopback(DirectMessenger *m, entity_name_t name, DispatchStrategy *dispatchers) { - auto loopback = boost::intrusive_ptr<DirectConnection>( - new DirectConnection(m->cct, m, dispatchers)); + auto loopback = ceph::make_ref<DirectConnection>(m->cct, m, dispatchers); // loopback replies go to itself loopback->set_direct_reply_connection(loopback); loopback->set_peer_type(name.type()); @@ -131,8 +132,7 @@ int DirectMessenger::set_direct_peer(DirectMessenger *peer) peer_inst = peer->get_myinst(); // allocate a Connection that dispatches to the peer messenger - auto direct_connection = boost::intrusive_ptr<DirectConnection>( - new DirectConnection(cct, peer, peer->dispatchers.get())); + auto direct_connection = ceph::make_ref<DirectConnection>(cct, peer, peer->dispatchers.get()); direct_connection->set_peer_addr(peer_inst.addr); direct_connection->set_peer_type(peer_inst.name.type()); diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc index 57325380186..64d05eef703 100644 --- a/src/test/journal/RadosTestFixture.cc +++ b/src/test/journal/RadosTestFixture.cc @@ -68,15 +68,15 @@ int RadosTestFixture::create(const std::string &oid, uint8_t order, return cls::journal::client::create(m_ioctx, oid, order, splay_width, -1); } -journal::JournalMetadataPtr RadosTestFixture::create_metadata( +ceph::ref_t<journal::JournalMetadata> RadosTestFixture::create_metadata( const std::string &oid, const std::string &client_id, double commit_interval, int max_concurrent_object_sets) { journal::Settings settings; settings.commit_interval = commit_interval; settings.max_concurrent_object_sets = max_concurrent_object_sets; - journal::JournalMetadataPtr metadata(new journal::JournalMetadata( - m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings)); + auto metadata = ceph::make_ref<journal::JournalMetadata>( + m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings); m_metadatas.push_back(metadata); return metadata; } @@ -109,13 +109,13 @@ bufferlist RadosTestFixture::create_payload(const std::string &payload) { return bl; } -int RadosTestFixture::init_metadata(journal::JournalMetadataPtr metadata) { +int RadosTestFixture::init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata) { C_SaferCond cond; metadata->init(&cond); return cond.wait(); } -bool RadosTestFixture::wait_for_update(journal::JournalMetadataPtr metadata) { +bool RadosTestFixture::wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata) { std::unique_lock locker{m_listener.mutex}; while (m_listener.updates[metadata.get()] == 0) { if (m_listener.cond.wait_for(locker, 10s) == std::cv_status::timeout) { diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h index 9600c839bd2..8ec66293102 100644 --- a/src/test/journal/RadosTestFixture.h +++ b/src/test/journal/RadosTestFixture.h @@ -23,7 +23,7 @@ public: int create(const std::string &oid, uint8_t order = 14, uint8_t splay_width = 2); - journal::JournalMetadataPtr create_metadata(const std::string &oid, + ceph::ref_t<journal::JournalMetadata> create_metadata(const std::string &oid, const std::string &client_id = "client", double commit_internal = 0.1, int max_concurrent_object_sets = 0); @@ -52,9 +52,9 @@ public: } }; - int init_metadata(journal::JournalMetadataPtr metadata); + int init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata); - bool wait_for_update(journal::JournalMetadataPtr metadata); + bool wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata); static std::string _pool_name; static librados::Rados _rados; @@ -70,5 +70,5 @@ public: Listener m_listener; - std::list<journal::JournalMetadataPtr> m_metadatas; + std::list<ceph::ref_t<journal::JournalMetadata>> m_metadatas; }; diff --git a/src/test/journal/test_FutureImpl.cc b/src/test/journal/test_FutureImpl.cc index e4e127d6ee9..1ff346dcf3b 100644 --- a/src/test/journal/test_FutureImpl.cc +++ b/src/test/journal/test_FutureImpl.cc @@ -9,46 +9,39 @@ class TestFutureImpl : public RadosTestFixture { public: struct FlushHandler : public journal::FutureImpl::FlushHandler { - uint64_t refs; - uint64_t flushes; - FlushHandler() : refs(0), flushes(0) {} - void get() override { - ++refs; - } - void put() override { - ceph_assert(refs > 0); - --refs; - } - void flush(const journal::FutureImplPtr &future) override { + uint64_t flushes = 0; + void flush(const ceph::ref_t<journal::FutureImpl>& future) override { ++flushes; } + FlushHandler() = default; }; - journal::FutureImplPtr create_future(uint64_t tag_tid, uint64_t entry_tid, - uint64_t commit_tid, - const journal::FutureImplPtr &prev = - journal::FutureImplPtr()) { - journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, - entry_tid, - commit_tid)); + TestFutureImpl() { + m_flush_handler = std::make_shared<FlushHandler>(); + } + + auto create_future(uint64_t tag_tid, uint64_t entry_tid, + uint64_t commit_tid, + ceph::ref_t<journal::FutureImpl> prev = nullptr) { + auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, commit_tid); future->init(prev); return future; } - void flush(const journal::FutureImplPtr &future) { + void flush(const ceph::ref_t<journal::FutureImpl>& future) { } - FlushHandler m_flush_handler; + std::shared_ptr<FlushHandler> m_flush_handler; }; TEST_F(TestFutureImpl, Getters) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); + auto future = create_future(234, 123, 456); ASSERT_EQ(234U, future->get_tag_tid()); ASSERT_EQ(123U, future->get_entry_tid()); ASSERT_EQ(456U, future->get_commit_tid()); @@ -58,68 +51,68 @@ TEST_F(TestFutureImpl, Attach) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); - ASSERT_FALSE(future->attach(&m_flush_handler)); - ASSERT_EQ(1U, m_flush_handler.refs); + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); + ASSERT_EQ(2U, m_flush_handler.use_count()); } TEST_F(TestFutureImpl, AttachWithPendingFlush) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); + auto future = create_future(234, 123, 456); future->flush(NULL); - ASSERT_TRUE(future->attach(&m_flush_handler)); - ASSERT_EQ(1U, m_flush_handler.refs); + ASSERT_TRUE(future->attach(m_flush_handler)); + ASSERT_EQ(2U, m_flush_handler.use_count()); } TEST_F(TestFutureImpl, Detach) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); - ASSERT_FALSE(future->attach(&m_flush_handler)); + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); future->detach(); - ASSERT_EQ(0U, m_flush_handler.refs); + ASSERT_EQ(1U, m_flush_handler.use_count()); } TEST_F(TestFutureImpl, DetachImplicit) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); - ASSERT_FALSE(future->attach(&m_flush_handler)); + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); future.reset(); - ASSERT_EQ(0U, m_flush_handler.refs); + ASSERT_EQ(1U, m_flush_handler.use_count()); } TEST_F(TestFutureImpl, Flush) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); - ASSERT_FALSE(future->attach(&m_flush_handler)); + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); C_SaferCond cond; future->flush(&cond); - ASSERT_EQ(1U, m_flush_handler.flushes); + ASSERT_EQ(1U, m_flush_handler->flushes); future->safe(-EIO); ASSERT_EQ(-EIO, cond.wait()); } @@ -128,14 +121,14 @@ TEST_F(TestFutureImpl, FlushWithoutContext) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); - ASSERT_FALSE(future->attach(&m_flush_handler)); + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); future->flush(NULL); - ASSERT_EQ(1U, m_flush_handler.flushes); + ASSERT_EQ(1U, m_flush_handler->flushes); future->safe(-EIO); ASSERT_TRUE(future->is_complete()); ASSERT_EQ(-EIO, future->get_return_value()); @@ -145,25 +138,23 @@ TEST_F(TestFutureImpl, FlushChain) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future1 = create_future(234, 123, 456); - journal::FutureImplPtr future2 = create_future(234, 124, 457, - future1); - journal::FutureImplPtr future3 = create_future(235, 1, 458, - future2); + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); + auto future3 = create_future(235, 1, 458, future2); - FlushHandler flush_handler; - ASSERT_FALSE(future1->attach(&m_flush_handler)); - ASSERT_FALSE(future2->attach(&flush_handler)); - ASSERT_FALSE(future3->attach(&m_flush_handler)); + auto flush_handler = std::make_shared<FlushHandler>(); + ASSERT_FALSE(future1->attach(m_flush_handler)); + ASSERT_FALSE(future2->attach(flush_handler)); + ASSERT_FALSE(future3->attach(m_flush_handler)); C_SaferCond cond; future3->flush(&cond); - ASSERT_EQ(1U, m_flush_handler.flushes); - ASSERT_EQ(1U, flush_handler.flushes); + ASSERT_EQ(1U, m_flush_handler->flushes); + ASSERT_EQ(1U, flush_handler->flushes); future3->safe(0); ASSERT_FALSE(future3->is_complete()); @@ -182,20 +173,19 @@ TEST_F(TestFutureImpl, FlushInProgress) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future1 = create_future(234, 123, 456); - journal::FutureImplPtr future2 = create_future(234, 124, 457, - future1); - ASSERT_FALSE(future1->attach(&m_flush_handler)); - ASSERT_FALSE(future2->attach(&m_flush_handler)); + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); + ASSERT_FALSE(future1->attach(m_flush_handler)); + ASSERT_FALSE(future2->attach(m_flush_handler)); future1->set_flush_in_progress(); ASSERT_TRUE(future1->is_flush_in_progress()); future1->flush(NULL); - ASSERT_EQ(0U, m_flush_handler.flushes); + ASSERT_EQ(0U, m_flush_handler->flushes); future1->safe(0); } @@ -204,10 +194,10 @@ TEST_F(TestFutureImpl, FlushAlreadyComplete) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 123, 456); + auto future = create_future(234, 123, 456); future->safe(-EIO); C_SaferCond cond; @@ -219,10 +209,10 @@ TEST_F(TestFutureImpl, Wait) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 1, 456); + auto future = create_future(234, 1, 456); C_SaferCond cond; future->wait(&cond); @@ -234,10 +224,10 @@ TEST_F(TestFutureImpl, WaitAlreadyComplete) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future = create_future(234, 1, 456); + auto future = create_future(234, 1, 456); future->safe(-EEXIST); C_SaferCond cond; @@ -249,12 +239,11 @@ TEST_F(TestFutureImpl, SafePreservesError) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future1 = create_future(234, 123, 456); - journal::FutureImplPtr future2 = create_future(234, 124, 457, - future1); + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); future1->safe(-EIO); future2->safe(-EEXIST); @@ -266,12 +255,11 @@ TEST_F(TestFutureImpl, ConsistentPreservesError) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::FutureImplPtr future1 = create_future(234, 123, 456); - journal::FutureImplPtr future2 = create_future(234, 124, 457, - future1); + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); future2->safe(-EEXIST); future1->safe(-EIO); diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc index 7cecb6fb1ce..4108d4da3ec 100644 --- a/src/test/journal/test_JournalMetadata.cc +++ b/src/test/journal/test_JournalMetadata.cc @@ -18,25 +18,25 @@ public: RadosTestFixture::TearDown(); } - journal::JournalMetadataPtr create_metadata(const std::string &oid, - const std::string &client_id, - double commit_interval = 0.1, - int max_concurrent_object_sets = 0) { - journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata( + auto create_metadata(const std::string &oid, + const std::string &client_id, + double commit_interval = 0.1, + int max_concurrent_object_sets = 0) { + auto metadata = RadosTestFixture::create_metadata( oid, client_id, commit_interval, max_concurrent_object_sets); m_metadata_list.push_back(metadata); metadata->add_listener(&m_listener); return metadata; } - typedef std::list<journal::JournalMetadataPtr> MetadataList; + typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList; MetadataList m_metadata_list; }; TEST_F(TestJournalMetadata, JournalDNE) { std::string oid = get_temp_oid(); - journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1"); + auto metadata1 = create_metadata(oid, "client1"); ASSERT_EQ(-ENOENT, init_metadata(metadata1)); } @@ -46,10 +46,10 @@ TEST_F(TestJournalMetadata, ClientDNE) { ASSERT_EQ(0, create(oid, 14, 2)); ASSERT_EQ(0, client_register(oid, "client1", "")); - journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1"); + auto metadata1 = create_metadata(oid, "client1"); ASSERT_EQ(0, init_metadata(metadata1)); - journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client2"); + auto metadata2 = create_metadata(oid, "client2"); ASSERT_EQ(-ENOENT, init_metadata(metadata2)); } @@ -59,10 +59,10 @@ TEST_F(TestJournalMetadata, Committed) { ASSERT_EQ(0, create(oid, 14, 2)); ASSERT_EQ(0, client_register(oid, "client1", "")); - journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1", 600); + auto metadata1 = create_metadata(oid, "client1", 600); ASSERT_EQ(0, init_metadata(metadata1)); - journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1"); + auto metadata2 = create_metadata(oid, "client1"); ASSERT_EQ(0, init_metadata(metadata2)); ASSERT_TRUE(wait_for_update(metadata2)); @@ -104,7 +104,7 @@ TEST_F(TestJournalMetadata, UpdateActiveObject) { ASSERT_EQ(0, create(oid, 14, 2)); ASSERT_EQ(0, client_register(oid, "client1", "")); - journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1"); + auto metadata1 = create_metadata(oid, "client1"); ASSERT_EQ(0, init_metadata(metadata1)); ASSERT_TRUE(wait_for_update(metadata1)); @@ -124,7 +124,7 @@ TEST_F(TestJournalMetadata, DisconnectLaggyClient) { ASSERT_EQ(0, client_register(oid, "client2", "laggy")); int max_concurrent_object_sets = 100; - journal::JournalMetadataPtr metadata = + auto metadata = create_metadata(oid, "client1", 0.1, max_concurrent_object_sets); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_TRUE(wait_for_update(metadata)); @@ -186,7 +186,7 @@ TEST_F(TestJournalMetadata, AssertActiveTag) { ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid, "client1", "")); - journal::JournalMetadataPtr metadata = create_metadata(oid, "client1"); + auto metadata = create_metadata(oid, "client1"); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_TRUE(wait_for_update(metadata)); diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index 94490c2357a..1601705dacf 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -32,9 +32,6 @@ public: : entries_available(false), complete(false), complete_result(0) {} - void get() override {} - void put() override {} - void handle_entries_available() override { std::lock_guard locker{lock}; entries_available = true; @@ -57,7 +54,7 @@ public: RadosTestFixture::TearDown(); } - journal::JournalMetadataPtr create_metadata(const std::string &oid) { + auto create_metadata(const std::string &oid) { return RadosTestFixture::create_metadata(oid, "client", 0.1, max_fetch_bytes); } @@ -75,7 +72,7 @@ public: } journal::JournalPlayer *create_player(const std::string &oid, - const journal::JournalMetadataPtr &metadata) { + const ceph::ref_t<journal::JournalMetadata>& metadata) { journal::JournalPlayer *player(new journal::JournalPlayer( m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr)); m_players.push_back(player); @@ -156,7 +153,7 @@ TYPED_TEST(TestJournalPlayer, Prefetch) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -202,7 +199,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchSkip) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -237,7 +234,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -277,7 +274,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -316,7 +313,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -350,7 +347,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -400,7 +397,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -436,7 +433,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -475,7 +472,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -511,7 +508,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -548,7 +545,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -586,7 +583,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); ASSERT_EQ(0, metadata->set_active_set(2)); @@ -637,7 +634,7 @@ TYPED_TEST(TestJournalPlayer, ImbalancedJournal) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); ASSERT_EQ(0, metadata->set_active_set(2)); metadata->set_minimum_set(2); @@ -686,7 +683,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -736,7 +733,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -791,7 +788,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -827,7 +824,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -886,7 +883,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -922,7 +919,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, commit_position)); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -964,7 +961,7 @@ TYPED_TEST(TestJournalPlayer, PrefechShutDown) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, {})); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); @@ -983,7 +980,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) { ASSERT_EQ(0, this->client_register(oid)); ASSERT_EQ(0, this->client_commit(oid, {})); - journal::JournalMetadataPtr metadata = this->create_metadata(oid); + auto metadata = this->create_metadata(oid); ASSERT_EQ(0, this->init_metadata(metadata)); journal::JournalPlayer *player = this->create_player(oid, metadata); diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc index 41518f8e9be..466ee274128 100644 --- a/src/test/journal/test_JournalRecorder.cc +++ b/src/test/journal/test_JournalRecorder.cc @@ -14,7 +14,7 @@ public: using JournalRecorderPtr = std::unique_ptr<journal::JournalRecorder, std::function<void(journal::JournalRecorder*)>>; JournalRecorderPtr create_recorder( - const std::string &oid, const journal::JournalMetadataPtr &metadata) { + const std::string &oid, const ceph::ref_t<journal::JournalMetadata>& metadata) { JournalRecorderPtr recorder{ new journal::JournalRecorder(m_ioctx, oid + ".", metadata, 0), [](journal::JournalRecorder* recorder) { @@ -34,7 +34,7 @@ TEST_F(TestJournalRecorder, Append) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); JournalRecorderPtr recorder = create_recorder(oid, metadata); @@ -51,7 +51,7 @@ TEST_F(TestJournalRecorder, AppendKnownOverflow) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_EQ(0U, metadata->get_active_set()); @@ -73,7 +73,7 @@ TEST_F(TestJournalRecorder, AppendDelayedOverflow) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_EQ(0U, metadata->get_active_set()); @@ -98,7 +98,7 @@ TEST_F(TestJournalRecorder, FutureFlush) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); JournalRecorderPtr recorder = create_recorder(oid, metadata); @@ -118,7 +118,7 @@ TEST_F(TestJournalRecorder, Flush) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); JournalRecorderPtr recorder = create_recorder(oid, metadata); @@ -142,7 +142,7 @@ TEST_F(TestJournalRecorder, OverflowCommitObjectNumber) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_EQ(0U, metadata->get_active_set()); diff --git a/src/test/journal/test_JournalTrimmer.cc b/src/test/journal/test_JournalTrimmer.cc index 6d6917e4300..aaf10979fdf 100644 --- a/src/test/journal/test_JournalTrimmer.cc +++ b/src/test/journal/test_JournalTrimmer.cc @@ -28,7 +28,7 @@ public: RadosTestFixture::TearDown(); } - int append_payload(journal::JournalMetadataPtr metadata, + int append_payload(const ceph::ref_t<journal::JournalMetadata>& metadata, const std::string &oid, uint64_t object_num, const std::string &payload, uint64_t *commit_tid) { int r = append(oid + "." + stringify(object_num), create_payload(payload)); @@ -39,16 +39,15 @@ public: return r; } - journal::JournalMetadataPtr create_metadata(const std::string &oid) { - journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata( - oid); + auto create_metadata(const std::string &oid) { + auto metadata = RadosTestFixture::create_metadata(oid); m_metadata_list.push_back(metadata); metadata->add_listener(&m_listener); return metadata; } journal::JournalTrimmer *create_trimmer(const std::string &oid, - const journal::JournalMetadataPtr &metadata) { + const ceph::ref_t<journal::JournalMetadata>& metadata) { journal::JournalTrimmer *trimmer(new journal::JournalTrimmer( m_ioctx, oid + ".", metadata)); m_trimmers.push_back(trimmer); @@ -61,7 +60,7 @@ public: return m_ioctx.operate(oid, &op); } - typedef std::list<journal::JournalMetadataPtr> MetadataList; + typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList; MetadataList m_metadata_list; std::list<journal::JournalTrimmer*> m_trimmers; }; @@ -71,7 +70,7 @@ TEST_F(TestJournalTrimmer, Committed) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_TRUE(wait_for_update(metadata)); @@ -114,7 +113,7 @@ TEST_F(TestJournalTrimmer, CommittedWithOtherClient) { ASSERT_EQ(0, client_register(oid)); ASSERT_EQ(0, client_register(oid, "client2", "slow client")); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_TRUE(wait_for_update(metadata)); @@ -149,7 +148,7 @@ TEST_F(TestJournalTrimmer, RemoveObjects) { ASSERT_EQ(0, create(oid, 12, 2)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_TRUE(wait_for_update(metadata)); @@ -181,7 +180,7 @@ TEST_F(TestJournalTrimmer, RemoveObjectsWithOtherClient) { ASSERT_EQ(0, client_register(oid)); ASSERT_EQ(0, client_register(oid, "client2", "other client")); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ASSERT_TRUE(wait_for_update(metadata)); diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc index 8e3f3323bba..5ac3d8b1211 100644 --- a/src/test/journal/test_ObjectPlayer.cc +++ b/src/test/journal/test_ObjectPlayer.cc @@ -10,19 +10,16 @@ #include "test/journal/RadosTestFixture.h" template <typename T> -class TestObjectPlayer : public RadosTestFixture { +class TestObjectPlayer : public RadosTestFixture, public T { public: - static const uint32_t max_fetch_bytes = T::max_fetch_bytes; - - journal::ObjectPlayerPtr create_object(const std::string &oid, - uint8_t order) { - journal::ObjectPlayerPtr object(new journal::ObjectPlayer( + auto create_object(const std::string &oid, uint8_t order) { + auto object = ceph::make_ref<journal::ObjectPlayer>( m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order, - max_fetch_bytes)); + T::max_fetch_bytes); return object; } - int fetch(journal::ObjectPlayerPtr object_player) { + int fetch(const ceph::ref_t<journal::ObjectPlayer>& object_player) { while (true) { C_SaferCond ctx; object_player->set_refetch_state( @@ -36,7 +33,7 @@ public: return 0; } - int watch_and_wait_for_entries(journal::ObjectPlayerPtr object_player, + int watch_and_wait_for_entries(const ceph::ref_t<journal::ObjectPlayer>& object_player, journal::ObjectPlayer::Entries *entries, size_t count) { for (size_t i = 0; i < 50; ++i) { @@ -63,7 +60,7 @@ public: template <uint32_t _max_fetch_bytes> struct TestObjectPlayerParams { - static const uint32_t max_fetch_bytes = _max_fetch_bytes; + static inline const uint32_t max_fetch_bytes = _max_fetch_bytes; }; typedef ::testing::Types<TestObjectPlayerParams<0>, @@ -81,7 +78,7 @@ TYPED_TEST(TestObjectPlayer, Fetch) { encode(entry2, bl); ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); ASSERT_LE(0, this->fetch(object)); journal::ObjectPlayer::Entries entries; @@ -104,7 +101,7 @@ TYPED_TEST(TestObjectPlayer, FetchLarge) { encode(entry2, bl); ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 12); + auto object = this->create_object(oid, 12); ASSERT_LE(0, this->fetch(object)); journal::ObjectPlayer::Entries entries; @@ -126,7 +123,7 @@ TYPED_TEST(TestObjectPlayer, FetchDeDup) { encode(entry2, bl); ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); ASSERT_LE(0, this->fetch(object)); journal::ObjectPlayer::Entries entries; @@ -143,7 +140,7 @@ TYPED_TEST(TestObjectPlayer, FetchEmpty) { bufferlist bl; ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); ASSERT_EQ(0, this->fetch(object)); ASSERT_TRUE(object->empty()); @@ -161,7 +158,7 @@ TYPED_TEST(TestObjectPlayer, FetchCorrupt) { encode(entry2, bl); ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); ASSERT_EQ(-EBADMSG, this->fetch(object)); journal::ObjectPlayer::Entries entries; @@ -182,7 +179,7 @@ TYPED_TEST(TestObjectPlayer, FetchAppend) { encode(entry1, bl); ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); ASSERT_LE(0, this->fetch(object)); journal::ObjectPlayer::Entries entries; @@ -215,7 +212,7 @@ TYPED_TEST(TestObjectPlayer, PopEntry) { encode(entry2, bl); ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); ASSERT_LE(0, this->fetch(object)); journal::ObjectPlayer::Entries entries; @@ -234,7 +231,7 @@ TYPED_TEST(TestObjectPlayer, PopEntry) { TYPED_TEST(TestObjectPlayer, Watch) { std::string oid = this->get_temp_oid(); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); C_SaferCond cond1; object->watch(&cond1, 0.1); @@ -272,7 +269,7 @@ TYPED_TEST(TestObjectPlayer, Watch) { TYPED_TEST(TestObjectPlayer, Unwatch) { std::string oid = this->get_temp_oid(); - journal::ObjectPlayerPtr object = this->create_object(oid, 14); + auto object = this->create_object(oid, 14); C_SaferCond watch_ctx; object->watch(&watch_ctx, 600); diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index 87efca571a6..72e4fd9c194 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -76,12 +76,10 @@ public: } } } - journal::ObjectRecorderPtr create_object(const std::string& oid, - uint8_t order, - ceph::mutex* lock) { - journal::ObjectRecorderPtr object(new journal::ObjectRecorder( + auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) { + auto object = ceph::make_ref<journal::ObjectRecorder>( m_ioctx, oid, 0, lock, m_work_queue, &m_handler, - order, m_max_in_flight_appends)); + order, m_max_in_flight_appends); { std::lock_guard locker{*lock}; object->set_append_batch_options(m_flush_interval, @@ -115,16 +113,15 @@ public: double m_flush_age = 600; uint64_t m_max_in_flight_appends = 0; using ObjectRecorders = - std::list<std::pair<journal::ObjectRecorderPtr, ceph::mutex*>>; + std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>; ObjectRecorders m_object_recorders; Handler m_handler; }; journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid, const std::string &payload) { - journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid, - 456)); - future->init(journal::FutureImplPtr()); + auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456); + future->init(ceph::ref_t<journal::FutureImpl>()); bufferlist bl; bl.append(payload); @@ -136,12 +133,12 @@ TEST_F(TestObjectRecorder, Append) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); @@ -170,12 +167,12 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); @@ -203,12 +200,12 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); @@ -236,12 +233,12 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); @@ -268,12 +265,12 @@ TEST_F(TestObjectRecorder, AppendFilledObject) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 12, &lock); + auto object = flusher.create_object(oid, 12, &lock); std::string payload(2048, '1'); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, @@ -301,12 +298,12 @@ TEST_F(TestObjectRecorder, Flush) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); @@ -331,12 +328,12 @@ TEST_F(TestObjectRecorder, FlushFuture) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer = create_append_buffer(234, 123, "payload"); @@ -359,12 +356,12 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer = create_append_buffer(234, 123, "payload"); @@ -388,12 +385,12 @@ TEST_F(TestObjectRecorder, Close) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1); - journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); + auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); @@ -418,14 +415,14 @@ TEST_F(TestObjectRecorder, Overflow) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid)); ASSERT_EQ(0, client_register(oid)); - journal::JournalMetadataPtr metadata = create_metadata(oid); + auto metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1"); ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2"); ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); - journal::ObjectRecorderPtr object1 = flusher.create_object(oid, 12, &lock1); + auto object1 = flusher.create_object(oid, 12, &lock1); std::string payload(1 << 11, '1'); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, @@ -445,7 +442,7 @@ TEST_F(TestObjectRecorder, Overflow) { ASSERT_TRUE(flusher.wait_for_overflow()); - journal::ObjectRecorderPtr object2 = flusher.create_object(oid, 12, &lock2); + auto object2 = flusher.create_object(oid, 12, &lock2); journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123, payload); diff --git a/src/test/librbd/fsx.cc b/src/test/librbd/fsx.cc index 4318b1ce462..599e2596e1e 100644 --- a/src/test/librbd/fsx.cc +++ b/src/test/librbd/fsx.cc @@ -277,11 +277,6 @@ struct ReplayHandler : public journal::ReplayHandler { on_finish(on_finish) { } - void get() override { - } - void put() override { - } - void handle_entries_available() override { while (true) { journal::ReplayEntry replay_entry; diff --git a/src/test/librbd/journal/test_Entries.cc b/src/test/librbd/journal/test_Entries.cc index a79d2c0cbc8..353886626d7 100644 --- a/src/test/librbd/journal/test_Entries.cc +++ b/src/test/librbd/journal/test_Entries.cc @@ -32,11 +32,6 @@ public: : entries_available(false), complete(false) { } - void get() override { - } - void put() override { - } - void handle_entries_available() override { std::lock_guard locker{lock}; entries_available = true; diff --git a/src/test/mds/TestSessionFilter.cc b/src/test/mds/TestSessionFilter.cc index 150663da0e7..9fa33f4517f 100644 --- a/src/test/mds/TestSessionFilter.cc +++ b/src/test/mds/TestSessionFilter.cc @@ -74,15 +74,13 @@ TEST(MDSSessionFilter, IdEquality) SessionFilter filter; std::stringstream ss; filter.parse({"id=123"}, &ss); - Session *a = new Session(nullptr);; - Session *b = new Session(nullptr);; + auto a = ceph::make_ref<Session>(nullptr);; + auto b = ceph::make_ref<Session>(nullptr);; a->info.inst.name.parse("client.123"); b->info.inst.name.parse("client.456"); ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;})); ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;})); - a->put(); - b->put(); } TEST(MDSSessionFilter, StateEquality) @@ -90,15 +88,13 @@ TEST(MDSSessionFilter, StateEquality) SessionFilter filter; std::stringstream ss; filter.parse({"state=closing"}, &ss); - Session *a = new Session(nullptr); + auto a = ceph::make_ref<Session>(nullptr); a->set_state(Session::STATE_CLOSING); - Session *b = new Session(nullptr); + auto b = ceph::make_ref<Session>(nullptr); b->set_state(Session::STATE_OPENING); ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;})); ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;})); - a->put(); - b->put(); } TEST(MDSSessionFilter, AuthEquality) @@ -106,15 +102,13 @@ TEST(MDSSessionFilter, AuthEquality) SessionFilter filter; std::stringstream ss; filter.parse({"auth_name=rhubarb"}, &ss); - Session *a = new Session(nullptr); + auto a = ceph::make_ref<Session>(nullptr); a->info.auth_name.set_id("rhubarb"); - Session *b = new Session(nullptr); + auto b = ceph::make_ref<Session>(nullptr); b->info.auth_name.set_id("custard"); ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;})); ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;})); - a->put(); - b->put(); } TEST(MDSSessionFilter, MetadataEquality) @@ -124,17 +118,15 @@ TEST(MDSSessionFilter, MetadataEquality) int r = filter.parse({"client_metadata.root=/rhubarb"}, &ss); ASSERT_EQ(r, 0); client_metadata_t meta; - Session *a = new Session(nullptr); + auto a = ceph::make_ref<Session>(nullptr); meta.kv_map = {{"root", "/rhubarb"}}; a->set_client_metadata(meta); - Session *b = new Session(nullptr); + auto b = ceph::make_ref<Session>(nullptr); meta.kv_map = {{"root", "/custard"}}; b->set_client_metadata(meta); ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;})); ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;})); - a->put(); - b->put(); } TEST(MDSSessionFilter, ReconnectingEquality) @@ -143,9 +135,8 @@ TEST(MDSSessionFilter, ReconnectingEquality) std::stringstream ss; int r = filter.parse({"reconnecting=true"}, &ss); ASSERT_EQ(r, 0); - Session *a = new Session(nullptr); + auto a = ceph::make_ref<Session>(nullptr); ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return true;})); ASSERT_FALSE(filter.match(*a, [](client_t c) -> bool {return false;})); - a->put(); } diff --git a/src/test/objectstore/test_bluestore_types.cc b/src/test/objectstore/test_bluestore_types.cc index 25320222619..f0339fe943e 100644 --- a/src/test/objectstore/test_bluestore_types.cc +++ b/src/test/objectstore/test_bluestore_types.cc @@ -341,7 +341,7 @@ TEST(Blob, put_ref) BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::Collection coll(&store, oc, bc, coll_t()); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Blob b; b.shared_blob = new BlueStore::SharedBlob(nullptr); b.shared_blob->get(); // hack to avoid dtor from running @@ -349,19 +349,19 @@ TEST(Blob, put_ref) b.dirty_blob().allocated_test( bluestore_pextent_t(bluestore_pextent_t::INVALID_OFFSET, 0x8000)); b.dirty_blob().allocated_test(bluestore_pextent_t(0x4071f000, 0x5000)); - b.get_ref(&coll, 0, 0x1200); - b.get_ref(&coll, 0xae00, 0x4200); + b.get_ref(coll.get(), 0, 0x1200); + b.get_ref(coll.get(), 0xae00, 0x4200); ASSERT_EQ(0x5400u, b.get_referenced_bytes()); cout << b << std::endl; PExtentVector r; - ASSERT_FALSE(b.put_ref(&coll, 0, 0x1200, &r)); + ASSERT_FALSE(b.put_ref(coll.get(), 0, 0x1200, &r)); ASSERT_EQ(0x4200u, b.get_referenced_bytes()); cout << " r " << r << std::endl; cout << b << std::endl; r.clear(); - ASSERT_TRUE(b.put_ref(&coll, 0xae00, 0x4200, &r)); + ASSERT_TRUE(b.put_ref(coll.get(), 0xae00, 0x4200, &r)); ASSERT_EQ(0u, b.get_referenced_bytes()); cout << " r " << r << std::endl; cout << b << std::endl; @@ -373,7 +373,7 @@ TEST(Blob, put_ref) g_ceph_context, "lru", NULL); BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); { BlueStore::Blob B; @@ -822,7 +822,7 @@ TEST(Blob, put_ref) BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Blob B; B.shared_blob = new BlueStore::SharedBlob(nullptr); B.shared_blob->get(); // hack to avoid dtor from running @@ -911,7 +911,7 @@ TEST(Blob, split) g_ceph_context, "lru", NULL); BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); { BlueStore::Blob L, R; L.shared_blob = new BlueStore::SharedBlob(coll.get()); @@ -969,7 +969,7 @@ TEST(Blob, legacy_decode) g_ceph_context, "lru", NULL); BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); bufferlist bl, bl2; { BlueStore::Blob B; @@ -1050,7 +1050,7 @@ TEST(ExtentMap, seek_lextent) BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Onode onode(coll.get(), ghobject_t(), ""); BlueStore::ExtentMap em(&onode); BlueStore::BlobRef br(new BlueStore::Blob); @@ -1102,7 +1102,7 @@ TEST(ExtentMap, has_any_lextents) g_ceph_context, "lru", NULL); BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Onode onode(coll.get(), ghobject_t(), ""); BlueStore::ExtentMap em(&onode); BlueStore::BlobRef b(new BlueStore::Blob); @@ -1153,7 +1153,7 @@ TEST(ExtentMap, compress_extent_map) BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create( g_ceph_context, "lru", NULL); -BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Onode onode(coll.get(), ghobject_t(), ""); BlueStore::ExtentMap em(&onode); BlueStore::BlobRef b1(new BlueStore::Blob); @@ -1210,7 +1210,7 @@ TEST(GarbageCollector, BasicTest) g_ceph_context, "lru", NULL); BlueStore store(g_ceph_context, "", 4096); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Onode onode(coll.get(), ghobject_t(), ""); BlueStore::ExtentMap em(&onode); @@ -1295,7 +1295,7 @@ TEST(GarbageCollector, BasicTest) */ { BlueStore store(g_ceph_context, "", 0x10000); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Onode onode(coll.get(), ghobject_t(), ""); BlueStore::ExtentMap em(&onode); @@ -1411,7 +1411,7 @@ TEST(GarbageCollector, BasicTest) */ { BlueStore store(g_ceph_context, "", 0x10000); - BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t())); + auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t()); BlueStore::Onode onode(coll.get(), ghobject_t(), ""); BlueStore::ExtentMap em(&onode); diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc index db506207c4b..5d283420a5e 100644 --- a/src/tools/rbd/action/Journal.cc +++ b/src/tools/rbd/action/Journal.cc @@ -461,9 +461,6 @@ protected: JournalPlayer *journal; explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {} - void get() override {} - void put() override {} - void handle_entries_available() override { journal->handle_replay_ready(); } diff --git a/src/tools/rbd_mirror/BaseRequest.h b/src/tools/rbd_mirror/BaseRequest.h index 5053eb830d9..f1ebc0cf7e5 100644 --- a/src/tools/rbd_mirror/BaseRequest.h +++ b/src/tools/rbd_mirror/BaseRequest.h @@ -13,7 +13,7 @@ namespace mirror { class BaseRequest : public RefCountedObject { public: BaseRequest(const std::string& name, CephContext *cct, Context *on_finish) - : RefCountedObject(cct, 1), m_name(name), m_cct(cct), + : RefCountedObject(cct), m_name(name), m_cct(cct), m_on_finish(on_finish) { } diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 1111b3b8438..cc419f58da3 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -62,8 +62,6 @@ template <typename I> struct ReplayHandler : public ::journal::ReplayHandler { ImageReplayer<I> *replayer; ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {} - void get() override {} - void put() override {} void handle_entries_available() override { replayer->handle_replay_ready(); |