summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrick Donnelly <pdonnell@redhat.com>2019-02-06 23:48:57 +0100
committerKefu Chai <tchaikov@gmail.com>2019-09-16 13:53:58 +0200
commit517bdca529db390e7ebc4e596570f80522060485 (patch)
tree3d7c2376b36f61ea8e85e97f6de515ccf6d6d4a1
parentcommon: add make_ref factory for RefCountedObject (diff)
downloadceph-517bdca529db390e7ebc4e596570f80522060485.tar.xz
ceph-517bdca529db390e7ebc4e596570f80522060485.zip
common/RefCountedObj: cleanup con/des
Also, don't allow children to set nref (to 0). This is the more significant change as it required fixing various code to not do this: <reftype> ptr = new RefCountedObjectFoo(..., 0); as a way to create a starting reference with nref==1. This is a pretty bad code smell so I've converted all the code doing this to use the new factory method which produces the reference safely: auto ptr = ceph::make_ref<RefCountedObjectFoo>(...); libradosstriper was particularly egregious in its abuse of setting the starting nref. :( Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
-rw-r--r--src/common/RefCountedObj.h35
-rw-r--r--src/journal/Future.cc18
-rw-r--r--src/journal/Future.h33
-rw-r--r--src/journal/FutureImpl.cc33
-rw-r--r--src/journal/FutureImpl.h55
-rw-r--r--src/journal/JournalMetadata.cc11
-rw-r--r--src/journal/JournalMetadata.h39
-rw-r--r--src/journal/JournalPlayer.cc88
-rw-r--r--src/journal/JournalPlayer.h33
-rw-r--r--src/journal/JournalRecorder.cc69
-rw-r--r--src/journal/JournalRecorder.h20
-rw-r--r--src/journal/JournalTrimmer.cc2
-rw-r--r--src/journal/JournalTrimmer.h4
-rw-r--r--src/journal/Journaler.cc48
-rw-r--r--src/journal/Journaler.h15
-rw-r--r--src/journal/ObjectPlayer.cc11
-rw-r--r--src/journal/ObjectPlayer.h27
-rw-r--r--src/journal/ObjectRecorder.cc43
-rw-r--r--src/journal/ObjectRecorder.h62
-rw-r--r--src/journal/ReplayHandler.h8
-rw-r--r--src/libradosstriper/RadosStriperImpl.cc253
-rw-r--r--src/librbd/DeepCopyRequest.cc2
-rw-r--r--src/librbd/Journal.h7
-rw-r--r--src/librbd/deep_copy/ImageCopyRequest.cc2
-rw-r--r--src/librbd/deep_copy/SnapshotCopyRequest.cc2
-rw-r--r--src/mgr/DaemonServer.cc2
-rw-r--r--src/mgr/DaemonState.h26
-rw-r--r--src/mgr/MgrSession.h10
-rw-r--r--src/mon/Monitor.cc12
-rw-r--r--src/msg/Connection.h45
-rw-r--r--src/msg/async/AsyncConnection.cc3
-rw-r--r--src/msg/async/AsyncConnection.h7
-rw-r--r--src/msg/async/AsyncMessenger.cc6
-rw-r--r--src/os/ObjectStore.h10
-rw-r--r--src/os/bluestore/BlueFS.cc18
-rw-r--r--src/os/bluestore/BlueFS.h35
-rw-r--r--src/os/bluestore/BlueStore.cc13
-rw-r--r--src/os/bluestore/BlueStore.h19
-rw-r--r--src/os/filestore/FileStore.cc4
-rw-r--r--src/os/filestore/FileStore.h4
-rw-r--r--src/os/kstore/KStore.cc6
-rw-r--r--src/os/kstore/KStore.h4
-rw-r--r--src/os/memstore/MemStore.cc12
-rw-r--r--src/os/memstore/MemStore.h15
-rw-r--r--src/osd/OSD.cc64
-rw-r--r--src/osd/OSD.h17
-rw-r--r--src/osd/PG.cc22
-rw-r--r--src/osd/PG.h8
-rw-r--r--src/osd/PeeringState.h11
-rw-r--r--src/osd/PrimaryLogPG.cc6
-rw-r--r--src/osd/ReplicatedBackend.cc11
-rw-r--r--src/osd/ReplicatedBackend.h17
-rw-r--r--src/osd/Session.cc4
-rw-r--r--src/osd/Session.h59
-rw-r--r--src/test/direct_messenger/DirectMessenger.cc10
-rw-r--r--src/test/journal/RadosTestFixture.cc10
-rw-r--r--src/test/journal/RadosTestFixture.h8
-rw-r--r--src/test/journal/test_FutureImpl.cc146
-rw-r--r--src/test/journal/test_JournalMetadata.cc28
-rw-r--r--src/test/journal/test_JournalPlayer.cc49
-rw-r--r--src/test/journal/test_JournalRecorder.cc14
-rw-r--r--src/test/journal/test_JournalTrimmer.cc19
-rw-r--r--src/test/journal/test_ObjectPlayer.cc35
-rw-r--r--src/test/journal/test_ObjectRecorder.cc57
-rw-r--r--src/test/librbd/fsx.cc5
-rw-r--r--src/test/librbd/journal/test_Entries.cc5
-rw-r--r--src/test/mds/TestSessionFilter.cc27
-rw-r--r--src/test/objectstore/test_bluestore_types.cc30
-rw-r--r--src/tools/rbd/action/Journal.cc3
-rw-r--r--src/tools/rbd_mirror/BaseRequest.h2
-rw-r--r--src/tools/rbd_mirror/ImageReplayer.cc2
71 files changed, 861 insertions, 979 deletions
diff --git a/src/common/RefCountedObj.h b/src/common/RefCountedObj.h
index 024d2f4d2c9..5c0473c9598 100644
--- a/src/common/RefCountedObj.h
+++ b/src/common/RefCountedObj.h
@@ -20,7 +20,27 @@
#include <atomic>
-struct RefCountedObject {
+/* This class provides mechanisms to make a sub-class work with
+ * boost::intrusive_ptr (aka ceph::ref_t).
+ *
+ * Generally, you'll want to inherit from RefCountedObjectSafe and not from
+ * RefCountedObject directly. This is because the ::get and ::put methods are
+ * public and can be used to create/delete references outside of the
+ * ceph::ref_t pointers with the potential to leak memory.
+ *
+ * It is also suggested that you make constructors and destructors private in
+ * your final class. This prevents instantiation of the object with assignment
+ * to a raw pointer. Consequently, you'll want to use ceph::make_ref<> to
+ * create a ceph::ref_t<> holding your object:
+ *
+ * auto ptr = ceph::make_ref<Foo>(...);
+ *
+ * Use FRIEND_MAKE_REF(ClassName) to allow ceph::make_ref to call the private
+ * constructors.
+ *
+ */
+
+class RefCountedObject {
public:
void set_cct(class CephContext *c) {
cct = c;
@@ -46,7 +66,7 @@ protected:
RefCountedObject& operator=(const RefCountedObject& o) = delete;
RefCountedObject(RefCountedObject&&) = delete;
RefCountedObject& operator=(RefCountedObject&&) = delete;
- RefCountedObject(class CephContext* c = nullptr, int n = 1) : cct(c), nref(n) {}
+ RefCountedObject(class CephContext* c) : cct(c) {}
virtual ~RefCountedObject();
@@ -62,6 +82,17 @@ private:
class CephContext *cct{nullptr};
};
+class RefCountedObjectSafe : public RefCountedObject {
+public:
+ RefCountedObject *get() = delete;
+ const RefCountedObject *get() const = delete;
+ void put() const = delete;
+protected:
+template<typename... Args>
+ RefCountedObjectSafe(Args&&... args) : RefCountedObject(std::forward<Args>(args)...) {}
+ virtual ~RefCountedObjectSafe() override {}
+};
+
#ifndef WITH_SEASTAR
/**
diff --git a/src/journal/Future.cc b/src/journal/Future.cc
index 89f7fd326e9..0e794d165e8 100644
--- a/src/journal/Future.cc
+++ b/src/journal/Future.cc
@@ -7,6 +7,14 @@
namespace journal {
+Future::Future() = default;
+Future::Future(const Future& o) = default;
+Future& Future::operator=(const Future& o) = default;
+Future::Future(Future&& o) = default;
+Future& Future::operator=(Future&& o) = default;
+Future::Future(ceph::ref_t<FutureImpl> future_impl) : m_future_impl(std::move(future_impl)) {}
+Future::~Future() = default;
+
void Future::flush(Context *on_safe) {
m_future_impl->flush(on_safe);
}
@@ -24,16 +32,8 @@ int Future::get_return_value() const {
return m_future_impl->get_return_value();
}
-void intrusive_ptr_add_ref(FutureImpl *p) {
- p->get();
-}
-
-void intrusive_ptr_release(FutureImpl *p) {
- p->put();
-}
-
std::ostream &operator<<(std::ostream &os, const Future &future) {
- return os << *future.m_future_impl.get();
+ return os << *future.m_future_impl;
}
} // namespace journal
diff --git a/src/journal/Future.h b/src/journal/Future.h
index fef0015651c..ba835b3538d 100644
--- a/src/journal/Future.h
+++ b/src/journal/Future.h
@@ -4,11 +4,12 @@
#ifndef CEPH_JOURNAL_FUTURE_H
#define CEPH_JOURNAL_FUTURE_H
-#include "include/int_types.h"
-#include <string>
#include <iosfwd>
-#include <boost/intrusive_ptr.hpp>
+#include <string>
+
#include "include/ceph_assert.h"
+#include "include/int_types.h"
+#include "common/ref.h"
class Context;
@@ -18,13 +19,16 @@ class FutureImpl;
class Future {
public:
- typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
-
- Future() {}
- Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {}
-
- inline bool is_valid() const {
- return m_future_impl.get() != nullptr;
+ Future();
+ Future(const Future&);
+ Future& operator=(const Future&);
+ Future(Future&&);
+ Future& operator=(Future&&);
+ Future(ceph::ref_t<FutureImpl> future_impl);
+ ~Future();
+
+ bool is_valid() const {
+ return bool(m_future_impl);
}
void flush(Context *on_safe);
@@ -37,22 +41,17 @@ private:
friend class Journaler;
friend std::ostream& operator<<(std::ostream&, const Future&);
- inline FutureImplPtr get_future_impl() const {
+ const auto& get_future_impl() const {
return m_future_impl;
}
- FutureImplPtr m_future_impl;
+ ceph::ref_t<FutureImpl> m_future_impl;
};
-void intrusive_ptr_add_ref(FutureImpl *p);
-void intrusive_ptr_release(FutureImpl *p);
-
std::ostream &operator<<(std::ostream &os, const Future &future);
} // namespace journal
-using journal::intrusive_ptr_add_ref;
-using journal::intrusive_ptr_release;
using journal::operator<<;
#endif // CEPH_JOURNAL_FUTURE_H
diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc
index 474c025c608..4e804f8dc65 100644
--- a/src/journal/FutureImpl.cc
+++ b/src/journal/FutureImpl.cc
@@ -8,14 +8,14 @@ namespace journal {
FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid)
- : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
+ : m_tag_tid(tag_tid),
+ m_entry_tid(entry_tid),
m_commit_tid(commit_tid),
- m_safe(false),
- m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
- m_consistent_ack(this) {
+ m_consistent_ack(this)
+{
}
-void FutureImpl::init(const FutureImplPtr &prev_future) {
+void FutureImpl::init(const ceph::ref_t<FutureImpl> &prev_future) {
// chain ourself to the prior future (if any) to that we known when the
// journal is consistent
if (prev_future) {
@@ -30,7 +30,7 @@ void FutureImpl::flush(Context *on_safe) {
bool complete;
FlushHandlers flush_handlers;
- FutureImplPtr prev_future;
+ ceph::ref_t<FutureImpl> prev_future;
{
std::lock_guard locker{m_lock};
complete = (m_safe && m_consistent);
@@ -60,20 +60,21 @@ void FutureImpl::flush(Context *on_safe) {
}
}
-FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
+ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
std::lock_guard locker{m_lock};
return prepare_flush(flush_handlers, m_lock);
}
-FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
+ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
ceph::mutex &lock) {
ceph_assert(ceph_mutex_is_locked(m_lock));
if (m_flush_state == FLUSH_STATE_NONE) {
m_flush_state = FLUSH_STATE_REQUESTED;
- if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
- flush_handlers->insert({m_flush_handler, this});
+ auto h = m_flush_handler;
+ if (h) {
+ flush_handlers->try_emplace(std::move(h), this);
}
}
return m_prev_future;
@@ -103,10 +104,10 @@ int FutureImpl::get_return_value() const {
return m_return_value;
}
-bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
+bool FutureImpl::attach(FlushHandler::ref flush_handler) {
std::lock_guard locker{m_lock};
ceph_assert(!m_flush_handler);
- m_flush_handler = flush_handler;
+ m_flush_handler = std::move(flush_handler);
return m_flush_state != FLUSH_STATE_NONE;
}
@@ -163,12 +164,4 @@ std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
return os;
}
-void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
- p->get();
-}
-
-void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
- p->put();
-}
-
} // namespace journal
diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h
index b81fba200bc..241a09709ff 100644
--- a/src/journal/FutureImpl.h
+++ b/src/journal/FutureImpl.h
@@ -11,29 +11,21 @@
#include <list>
#include <map>
#include <boost/noncopyable.hpp>
-#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"
class Context;
namespace journal {
-class FutureImpl;
-typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
-
class FutureImpl : public RefCountedObject, boost::noncopyable {
public:
struct FlushHandler {
- virtual ~FlushHandler() {}
- virtual void flush(const FutureImplPtr &future) = 0;
- virtual void get() = 0;
- virtual void put() = 0;
+ using ref = std::shared_ptr<FlushHandler>;
+ virtual void flush(const ceph::ref_t<FutureImpl> &future) = 0;
+ virtual ~FlushHandler() = default;
};
- typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
-
- FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);
- void init(const FutureImplPtr &prev_future);
+ void init(const ceph::ref_t<FutureImpl> &prev_future);
inline uint64_t get_tag_tid() const {
return m_tag_tid;
@@ -56,19 +48,17 @@ public:
return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
}
inline void set_flush_in_progress() {
+ auto h = std::move(m_flush_handler);
+ ceph_assert(h);
std::lock_guard locker{m_lock};
- ceph_assert(m_flush_handler);
- m_flush_handler.reset();
m_flush_state = FLUSH_STATE_IN_PROGRESS;
}
- bool attach(const FlushHandlerPtr &flush_handler);
+ bool attach(FlushHandler::ref flush_handler);
inline void detach() {
- std::lock_guard locker{m_lock};
m_flush_handler.reset();
}
- inline FlushHandlerPtr get_flush_handler() const {
- std::lock_guard locker{m_lock};
+ inline FlushHandler::ref get_flush_handler() const {
return m_flush_handler;
}
@@ -77,7 +67,7 @@ public:
private:
friend std::ostream &operator<<(std::ostream &, const FutureImpl &);
- typedef std::map<FlushHandlerPtr, FutureImplPtr> FlushHandlers;
+ typedef std::map<FlushHandler::ref, ceph::ref_t<FutureImpl>> FlushHandlers;
typedef std::list<Context *> Contexts;
enum FlushState {
@@ -87,8 +77,8 @@ private:
};
struct C_ConsistentAck : public Context {
- FutureImplPtr future;
- C_ConsistentAck(FutureImpl *_future) : future(_future) {}
+ ceph::ref_t<FutureImpl> future;
+ C_ConsistentAck(ceph::ref_t<FutureImpl> _future) : future(std::move(_future)) {}
void complete(int r) override {
future->consistent(r);
future.reset();
@@ -96,32 +86,33 @@ private:
void finish(int r) override {}
};
+ FRIEND_MAKE_REF(FutureImpl);
+ FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);
+ ~FutureImpl() override = default;
+
uint64_t m_tag_tid;
uint64_t m_entry_tid;
uint64_t m_commit_tid;
mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false);
- FutureImplPtr m_prev_future;
- bool m_safe;
- bool m_consistent;
- int m_return_value;
+ ceph::ref_t<FutureImpl> m_prev_future;
+ bool m_safe = false;
+ bool m_consistent = false;
+ int m_return_value = 0;
- FlushHandlerPtr m_flush_handler;
- FlushState m_flush_state;
+ FlushHandler::ref m_flush_handler;
+ FlushState m_flush_state = FLUSH_STATE_NONE;
C_ConsistentAck m_consistent_ack;
Contexts m_contexts;
- FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);
- FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);
+ ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers);
+ ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);
void consistent(int r);
void finish_unlock();
};
-void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
-void intrusive_ptr_release(FutureImpl::FlushHandler *p);
-
std::ostream &operator<<(std::ostream &os, const FutureImpl &future);
} // namespace journal
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
index bf9c21be174..9801fc0a0b8 100644
--- a/src/journal/JournalMetadata.cc
+++ b/src/journal/JournalMetadata.cc
@@ -405,14 +405,11 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
const std::string &oid,
const std::string &client_id,
const Settings &settings)
- : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
- m_client_id(client_id), m_settings(settings), m_order(0),
- m_splay_width(0), m_pool_id(-1), m_initialized(false),
+ : m_oid(oid),
+ m_client_id(client_id), m_settings(settings),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
- m_commit_tid(0), m_watch_ctx(this),
- m_watch_handle(0), m_minimum_set(0), m_active_set(0),
- m_update_notifications(0), m_commit_position_ctx(NULL),
- m_commit_position_task_ctx(NULL) {
+ m_watch_ctx(this)
+{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h
index 13d9fd44ff1..1b6911daecb 100644
--- a/src/journal/JournalMetadata.h
+++ b/src/journal/JournalMetadata.h
@@ -15,7 +15,6 @@
#include "cls/journal/cls_journal_types.h"
#include "journal/JournalMetadataListener.h"
#include "journal/Settings.h"
-#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
#include <functional>
@@ -28,9 +27,6 @@ class SafeTimer;
namespace journal {
-class JournalMetadata;
-typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
-
class JournalMetadata : public RefCountedObject, boost::noncopyable {
public:
typedef std::function<Context*()> CreateContext;
@@ -43,11 +39,6 @@ public:
typedef std::set<Client> RegisteredClients;
typedef std::list<Tag> Tags;
- JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
- librados::IoCtx &ioctx, const std::string &oid,
- const std::string &client_id, const Settings &settings);
- ~JournalMetadata() override;
-
void init(Context *on_init);
void shut_down(Context *on_finish);
@@ -156,6 +147,12 @@ public:
void wait_for_ops();
private:
+ FRIEND_MAKE_REF(JournalMetadata);
+ JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
+ librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, const Settings &settings);
+ ~JournalMetadata() override;
+
typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
typedef std::list<JournalMetadataListener*> Listeners;
typedef std::list<Context*> Contexts;
@@ -299,15 +296,15 @@ private:
};
librados::IoCtx m_ioctx;
- CephContext *m_cct;
+ CephContext *m_cct = nullptr;
std::string m_oid;
std::string m_client_id;
Settings m_settings;
- uint8_t m_order;
- uint8_t m_splay_width;
- int64_t m_pool_id;
- bool m_initialized;
+ uint8_t m_order = 0;
+ uint8_t m_splay_width = 0;
+ int64_t m_pool_id = -1;
+ bool m_initialized = false;
ContextWQ *m_work_queue;
SafeTimer *m_timer;
@@ -315,22 +312,22 @@ private:
mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock");
- uint64_t m_commit_tid;
+ uint64_t m_commit_tid = 0;
CommitTids m_pending_commit_tids;
Listeners m_listeners;
C_WatchCtx m_watch_ctx;
- uint64_t m_watch_handle;
+ uint64_t m_watch_handle = 0;
- uint64_t m_minimum_set;
- uint64_t m_active_set;
+ uint64_t m_minimum_set = 0;
+ uint64_t m_active_set = 0;
RegisteredClients m_registered_clients;
Client m_client;
AllocatedEntryTids m_allocated_entry_tids;
- size_t m_update_notifications;
+ size_t m_update_notifications = 0;
ceph::condition_variable m_update_cond;
size_t m_ignore_watch_notifies = 0;
@@ -339,8 +336,8 @@ private:
uint64_t m_commit_position_tid = 0;
ObjectSetPosition m_commit_position;
- Context *m_commit_position_ctx;
- Context *m_commit_position_task_ctx;
+ Context *m_commit_position_ctx = nullptr;
+ Context *m_commit_position_task_ctx = nullptr;
size_t m_flush_commits_in_progress = 0;
Contexts m_flush_commit_position_ctxs;
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
index 811508bf03c..4a091247613 100644
--- a/src/journal/JournalPlayer.cc
+++ b/src/journal/JournalPlayer.cc
@@ -20,30 +20,20 @@ namespace {
static const uint64_t MIN_FETCH_BYTES = 32768;
struct C_HandleComplete : public Context {
- ReplayHandler *replay_handler;
+ ReplayHandler* replay_handler;
- explicit C_HandleComplete(ReplayHandler *_replay_handler)
- : replay_handler(_replay_handler) {
- replay_handler->get();
- }
- ~C_HandleComplete() override {
- replay_handler->put();
- }
+ explicit C_HandleComplete(ReplayHandler* r) : replay_handler(std::move(r)) {}
+ ~C_HandleComplete() override {}
void finish(int r) override {
replay_handler->handle_complete(r);
}
};
struct C_HandleEntriesAvailable : public Context {
- ReplayHandler *replay_handler;
+ ReplayHandler* replay_handler;
- explicit C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
- : replay_handler(_replay_handler) {
- replay_handler->get();
- }
- ~C_HandleEntriesAvailable() override {
- replay_handler->put();
- }
+ explicit C_HandleEntriesAvailable(ReplayHandler* r) : replay_handler(std::move(r)) {}
+ ~C_HandleEntriesAvailable() override {}
void finish(int r) override {
replay_handler->handle_entries_available();
}
@@ -52,17 +42,16 @@ struct C_HandleEntriesAvailable : public Context {
} // anonymous namespace
JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
- const std::string &object_oid_prefix,
- const JournalMetadataPtr& journal_metadata,
- ReplayHandler *replay_handler,
+ std::string_view object_oid_prefix,
+ ceph::ref_t<JournalMetadata> journal_metadata,
+ ReplayHandler* replay_handler,
CacheManagerHandler *cache_manager_handler)
- : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
- m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
+ : m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(std::move(journal_metadata)),
+ m_replay_handler(std::move(replay_handler)),
m_cache_manager_handler(cache_manager_handler),
- m_cache_rebalance_handler(this),
- m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false),
- m_watch_scheduled(false), m_watch_interval(0) {
- m_replay_handler->get();
+ m_cache_rebalance_handler(this)
+{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
@@ -109,7 +98,6 @@ JournalPlayer::~JournalPlayer() {
ceph_assert(m_fetch_object_numbers.empty());
ceph_assert(!m_watch_scheduled);
}
- m_replay_handler->put();
if (m_cache_manager_handler != nullptr) {
m_cache_manager_handler->unregister_cache(m_cache_name);
@@ -186,7 +174,7 @@ void JournalPlayer::shut_down(Context *on_finish) {
m_journal_metadata, on_finish);
if (m_watch_scheduled) {
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_FIRST:
object_player = m_object_players.begin()->second;
@@ -220,7 +208,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
return false;
}
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
ceph_assert(object_player && !object_player->empty());
object_player->front(entry);
@@ -294,7 +282,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
bool prefetch_complete = false;
ceph_assert(m_object_players.count(splay_offset) == 1);
- ObjectPlayerPtr object_player = m_object_players[splay_offset];
+ auto object_player = m_object_players[splay_offset];
// prefetch in-order since a newer splay object could prefetch first
if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) {
@@ -415,7 +403,7 @@ bool JournalPlayer::verify_playback_ready() {
return false;
}
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
ceph_assert(object_player);
uint64_t object_num = object_player->get_object_number();
@@ -520,8 +508,8 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) {
}
bool pruned = false;
- for (auto &player_pair : m_object_players) {
- ObjectPlayerPtr object_player(player_pair.second);
+ for (const auto &player_pair : m_object_players) {
+ auto& object_player = player_pair.second;
ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
<< dendl;
while (!object_player->empty()) {
@@ -541,14 +529,14 @@ void JournalPlayer::prune_tag(uint64_t tag_tid) {
if (pruned) {
ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate"
<< dendl;
- for (auto &player_pair : m_object_players) {
- ObjectPlayerPtr object_player(player_pair.second);
+ for (const auto &player_pair : m_object_players) {
+ auto& object_player = player_pair.second;
object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
}
}
// trim empty player to prefetch the next available object
- for (auto &player_pair : m_object_players) {
+ for (const auto &player_pair : m_object_players) {
remove_empty_object_player(player_pair.second);
}
}
@@ -567,7 +555,7 @@ void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
prune_tag(active_tag_tid);
}
-ObjectPlayerPtr JournalPlayer::get_object_player() const {
+ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player() const {
ceph_assert(ceph_mutex_is_locked(m_lock));
SplayedObjectPlayers::const_iterator it = m_object_players.find(
@@ -576,7 +564,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player() const {
return it->second;
}
-ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
+ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player(uint64_t object_number) const {
ceph_assert(ceph_mutex_is_locked(m_lock));
uint8_t splay_width = m_journal_metadata->get_splay_width();
@@ -584,7 +572,7 @@ ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
auto splay_it = m_object_players.find(splay_offset);
ceph_assert(splay_it != m_object_players.end());
- ObjectPlayerPtr object_player = splay_it->second;
+ auto object_player = splay_it->second;
ceph_assert(object_player->get_object_number() == object_number);
return object_player;
}
@@ -598,7 +586,7 @@ void JournalPlayer::advance_splay_object() {
<< static_cast<uint32_t>(m_splay_offset) << dendl;
}
-bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
+bool JournalPlayer::remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &player) {
ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(!m_watch_scheduled);
@@ -615,7 +603,7 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
ldout(m_cct, 20) << __func__ << ": new active set detected, all players "
<< "require refetch" << dendl;
m_active_set = active_set;
- for (auto &pair : m_object_players) {
+ for (const auto& pair : m_object_players) {
pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
}
return false;
@@ -635,17 +623,17 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
void JournalPlayer::fetch(uint64_t object_num) {
ceph_assert(ceph_mutex_is_locked(m_lock));
- ObjectPlayerPtr object_player(new ObjectPlayer(
+ auto object_player = ceph::make_ref<ObjectPlayer>(
m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
- m_max_fetch_bytes));
+ m_max_fetch_bytes);
auto splay_width = m_journal_metadata->get_splay_width();
m_object_players[object_num % splay_width] = object_player;
fetch(object_player);
}
-void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
+void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) {
ceph_assert(ceph_mutex_is_locked(m_lock));
uint64_t object_num = object_player->get_object_number();
@@ -673,7 +661,7 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
}
if (r == 0) {
- ObjectPlayerPtr object_player = get_object_player(object_num);
+ auto object_player = get_object_player(object_num);
remove_empty_object_player(object_player);
}
process_state(object_num, r);
@@ -690,7 +678,7 @@ void JournalPlayer::refetch(bool immediate) {
return;
}
- ObjectPlayerPtr object_player = get_object_player();
+ auto object_player = get_object_player();
if (object_player->refetch_required()) {
object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
fetch(object_player);
@@ -723,7 +711,7 @@ void JournalPlayer::schedule_watch(bool immediate) {
return;
}
- ObjectPlayerPtr object_player;
+ ceph::ref_t<ObjectPlayer> object_player;
double watch_interval = m_watch_interval;
switch (m_watch_step) {
@@ -772,7 +760,7 @@ void JournalPlayer::handle_watch(uint64_t object_num, int r) {
return;
}
- ObjectPlayerPtr object_player = get_object_player(object_num);
+ auto object_player = get_object_player(object_num);
if (r == 0 && object_player->empty()) {
// possibly need to prune this empty object player if we've
// already fetched it after the active set was advanced with no
@@ -824,8 +812,7 @@ void JournalPlayer::notify_entries_available() {
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->queue(new C_HandleEntriesAvailable(
- m_replay_handler), 0);
+ m_journal_metadata->queue(new C_HandleEntriesAvailable(m_replay_handler), 0);
}
void JournalPlayer::notify_complete(int r) {
@@ -833,8 +820,7 @@ void JournalPlayer::notify_complete(int r) {
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
- m_journal_metadata->queue(new C_HandleComplete(
- m_replay_handler), r);
+ m_journal_metadata->queue(new C_HandleComplete(m_replay_handler), r);
}
void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) {
diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h
index 21f215410f7..f2ab14d7b43 100644
--- a/src/journal/JournalPlayer.h
+++ b/src/journal/JournalPlayer.h
@@ -30,9 +30,10 @@ public:
typedef cls::journal::ObjectPositions ObjectPositions;
typedef cls::journal::ObjectSetPosition ObjectSetPosition;
- JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
- const JournalMetadataPtr& journal_metadata,
- ReplayHandler *replay_handler, CacheManagerHandler *cache_manager_handler);
+ JournalPlayer(librados::IoCtx &ioctx, std::string_view object_oid_prefix,
+ ceph::ref_t<JournalMetadata> journal_metadata,
+ ReplayHandler* replay_handler,
+ CacheManagerHandler *cache_manager_handler);
~JournalPlayer();
void prefetch();
@@ -43,7 +44,7 @@ public:
private:
typedef std::set<uint8_t> PrefetchSplayOffsets;
- typedef std::map<uint8_t, ObjectPlayerPtr> SplayedObjectPlayers;
+ typedef std::map<uint8_t, ceph::ref_t<ObjectPlayer>> SplayedObjectPlayers;
typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions;
typedef std::set<uint64_t> ObjectNumbers;
@@ -103,10 +104,10 @@ private:
};
librados::IoCtx m_ioctx;
- CephContext *m_cct;
+ CephContext *m_cct = nullptr;
std::string m_object_oid_prefix;
- JournalMetadataPtr m_journal_metadata;
- ReplayHandler *m_replay_handler;
+ ceph::ref_t<JournalMetadata> m_journal_metadata;
+ ReplayHandler* m_replay_handler;
CacheManagerHandler *m_cache_manager_handler;
std::string m_cache_name;
@@ -116,12 +117,12 @@ private:
AsyncOpTracker m_async_op_tracker;
mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock");
- State m_state;
- uint8_t m_splay_offset;
+ State m_state = STATE_INIT;
+ uint8_t m_splay_offset = 0;
- bool m_watch_enabled;
- bool m_watch_scheduled;
- double m_watch_interval;
+ bool m_watch_enabled = false;
+ bool m_watch_scheduled = false;
+ double m_watch_interval = 0;
WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
bool m_watch_prune_active_tag = false;
@@ -148,16 +149,16 @@ private:
void prune_tag(uint64_t tag_tid);
void prune_active_tag(const boost::optional<uint64_t>& tag_tid);
- ObjectPlayerPtr get_object_player() const;
- ObjectPlayerPtr get_object_player(uint64_t object_number) const;
- bool remove_empty_object_player(const ObjectPlayerPtr &object_player);
+ ceph::ref_t<ObjectPlayer> get_object_player() const;
+ ceph::ref_t<ObjectPlayer> get_object_player(uint64_t object_number) const;
+ bool remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &object_player);
void process_state(uint64_t object_number, int r);
int process_prefetch(uint64_t object_number);
int process_playback(uint64_t object_number);
void fetch(uint64_t object_num);
- void fetch(const ObjectPlayerPtr &object_player);
+ void fetch(const ceph::ref_t<ObjectPlayer> &object_player);
void handle_fetched(uint64_t object_num, int r);
void refetch(bool immediate);
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
index 9629d9f735f..402c903fe81 100644
--- a/src/journal/JournalRecorder.cc
+++ b/src/journal/JournalRecorder.cc
@@ -20,15 +20,16 @@ namespace journal {
namespace {
struct C_Flush : public Context {
- JournalMetadataPtr journal_metadata;
+ ceph::ref_t<JournalMetadata> journal_metadata;
Context *on_finish;
- std::atomic<int64_t> pending_flushes = { 0 };
- int ret_val;
+ std::atomic<int64_t> pending_flushes{0};
+ int ret_val = 0;
- C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
+ C_Flush(ceph::ref_t<JournalMetadata> _journal_metadata, Context *_on_finish,
size_t _pending_flushes)
- : journal_metadata(_journal_metadata), on_finish(_on_finish),
- pending_flushes(_pending_flushes), ret_val(0) {
+ : journal_metadata(std::move(_journal_metadata)),
+ on_finish(_on_finish),
+ pending_flushes(_pending_flushes) {
}
void complete(int r) override {
@@ -48,22 +49,21 @@ struct C_Flush : public Context {
} // anonymous namespace
JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
- const std::string &object_oid_prefix,
- const JournalMetadataPtr& journal_metadata,
+ std::string_view object_oid_prefix,
+ ceph::ref_t<JournalMetadata> journal_metadata,
uint64_t max_in_flight_appends)
- : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
- m_journal_metadata(journal_metadata),
- m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
+ : m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(std::move(journal_metadata)),
+ m_max_in_flight_appends(max_in_flight_appends),
+ m_listener(this),
m_object_handler(this),
- m_lock(ceph::make_mutex("JournalerRecorder::m_lock")),
m_current_set(m_journal_metadata->get_active_set()),
m_object_locks{ceph::make_lock_container<ceph::mutex>(
- journal_metadata->get_splay_width(), [](const size_t splay_offset) {
+ m_journal_metadata->get_splay_width(), [](const size_t splay_offset) {
return ceph::make_mutex("ObjectRecorder::m_lock::" +
std::to_string(splay_offset));
})}
{
-
std::lock_guard locker{m_lock};
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
@@ -141,10 +141,10 @@ Future JournalRecorder::append(uint64_t tag_tid,
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = entry_tid % splay_width;
- ObjectRecorderPtr object_ptr = get_object(splay_offset);
+ auto object_ptr = get_object(splay_offset);
uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
object_ptr->get_object_number(), tag_tid, entry_tid);
- FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid));
+ auto future = ceph::make_ref<FutureImpl>(tag_tid, entry_tid, commit_tid);
future->init(m_prev_future);
m_prev_future = future;
@@ -176,9 +176,8 @@ void JournalRecorder::flush(Context *on_safe) {
std::lock_guard locker{m_lock};
ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
- for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
- it != m_object_ptrs.end(); ++it) {
- it->second->flush(ctx);
+ for (const auto& p : m_object_ptrs) {
+ p.second->flush(ctx);
}
}
@@ -187,11 +186,11 @@ void JournalRecorder::flush(Context *on_safe) {
ctx->complete(0);
}
-ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
+ceph::ref_t<ObjectRecorder> JournalRecorder::get_object(uint8_t splay_offset) {
ceph_assert(ceph_mutex_is_locked(m_lock));
- ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
- ceph_assert(object_recoder != NULL);
+ const auto& object_recoder = m_object_ptrs.at(splay_offset);
+ ceph_assert(object_recoder);
return object_recoder;
}
@@ -261,9 +260,8 @@ void JournalRecorder::open_object_set() {
uint8_t splay_width = m_journal_metadata->get_splay_width();
lock_object_recorders();
- for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
- it != m_object_ptrs.end(); ++it) {
- ObjectRecorderPtr object_recorder = it->second;
+ for (const auto& p : m_object_ptrs) {
+ const auto& object_recorder = p.second;
uint64_t object_number = object_recorder->get_object_number();
if (object_number / splay_width != m_current_set) {
ceph_assert(object_recorder->is_closed());
@@ -283,9 +281,8 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
// closing the object to ensure correct order of future appends
uint8_t splay_width = m_journal_metadata->get_splay_width();
lock_object_recorders();
- for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
- it != m_object_ptrs.end(); ++it) {
- ObjectRecorderPtr object_recorder = it->second;
+ for (const auto& p : m_object_ptrs) {
+ const auto& object_recorder = p.second;
if (object_recorder->get_object_number() / splay_width != active_set) {
ldout(m_cct, 10) << "closing object " << object_recorder->get_oid()
<< dendl;
@@ -302,21 +299,21 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
return (m_in_flight_object_closes == 0);
}
-ObjectRecorderPtr JournalRecorder::create_object_recorder(
+ceph::ref_t<ObjectRecorder> JournalRecorder::create_object_recorder(
uint64_t object_number, ceph::mutex* lock) {
ldout(m_cct, 10) << "object_number=" << object_number << dendl;
- ObjectRecorderPtr object_recorder(new ObjectRecorder(
+ auto object_recorder = ceph::make_ref<ObjectRecorder>(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, lock, m_journal_metadata->get_work_queue(),
&m_object_handler, m_journal_metadata->get_order(),
- m_max_in_flight_appends));
+ m_max_in_flight_appends);
object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
m_flush_age);
return object_recorder;
}
void JournalRecorder::create_next_object_recorder(
- ObjectRecorderPtr object_recorder) {
+ ceph::ref_t<ObjectRecorder> object_recorder) {
ceph_assert(ceph_mutex_is_locked(m_lock));
uint64_t object_number = object_recorder->get_object_number();
@@ -326,7 +323,7 @@ void JournalRecorder::create_next_object_recorder(
ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset]));
- ObjectRecorderPtr new_object_recorder = create_object_recorder(
+ auto new_object_recorder = create_object_recorder(
(m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]);
ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
@@ -342,7 +339,7 @@ void JournalRecorder::create_next_object_recorder(
}
new_object_recorder->append(std::move(append_buffers));
- m_object_ptrs[splay_offset] = new_object_recorder;
+ m_object_ptrs[splay_offset] = std::move(new_object_recorder);
}
void JournalRecorder::handle_update() {
@@ -373,7 +370,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
- ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+ auto& active_object_recorder = m_object_ptrs.at(splay_offset);
ceph_assert(active_object_recorder->get_object_number() == object_number);
ceph_assert(m_in_flight_object_closes > 0);
@@ -401,7 +398,7 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
- ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+ auto& active_object_recorder = m_object_ptrs.at(splay_offset);
ceph_assert(active_object_recorder->get_object_number() == object_number);
ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
index 7395283325e..3bd036fb5f7 100644
--- a/src/journal/JournalRecorder.h
+++ b/src/journal/JournalRecorder.h
@@ -22,8 +22,8 @@ namespace journal {
class JournalRecorder {
public:
- JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
- const JournalMetadataPtr &journal_metadata,
+ JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix,
+ ceph::ref_t<JournalMetadata> journal_metadata,
uint64_t max_in_flight_appends);
~JournalRecorder();
@@ -35,10 +35,10 @@ public:
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
- ObjectRecorderPtr get_object(uint8_t splay_offset);
+ ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset);
private:
- typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
+ typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs;
struct Listener : public JournalMetadataListener {
JournalRecorder *journal_recorder;
@@ -78,10 +78,10 @@ private:
};
librados::IoCtx m_ioctx;
- CephContext *m_cct;
+ CephContext *m_cct = nullptr;
std::string m_object_oid_prefix;
- JournalMetadataPtr m_journal_metadata;
+ ceph::ref_t<JournalMetadata> m_journal_metadata;
uint32_t m_flush_interval = 0;
uint64_t m_flush_bytes = 0;
@@ -91,7 +91,7 @@ private:
Listener m_listener;
ObjectHandler m_object_handler;
- ceph::mutex m_lock;
+ ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock");
uint32_t m_in_flight_advance_sets = 0;
uint32_t m_in_flight_object_closes = 0;
@@ -99,7 +99,7 @@ private:
ObjectRecorderPtrs m_object_ptrs;
ceph::containers::tiny_vector<ceph::mutex> m_object_locks;
- FutureImplPtr m_prev_future;
+ ceph::ref_t<FutureImpl> m_prev_future;
Context *m_on_object_set_advanced = nullptr;
@@ -111,9 +111,9 @@ private:
void close_and_advance_object_set(uint64_t object_set);
- ObjectRecorderPtr create_object_recorder(uint64_t object_number,
+ ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number,
ceph::mutex* lock);
- void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+ void create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder);
void handle_update();
diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc
index d091243b388..3f05d40e6b5 100644
--- a/src/journal/JournalTrimmer.cc
+++ b/src/journal/JournalTrimmer.cc
@@ -31,7 +31,7 @@ struct JournalTrimmer::C_RemoveSet : public Context {
JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
- const JournalMetadataPtr &journal_metadata)
+ const ceph::ref_t<JournalMetadata>& journal_metadata)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_metadata_listener(this),
m_remove_set_pending(false),
diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h
index 719be88e77b..9c74961c925 100644
--- a/src/journal/JournalTrimmer.h
+++ b/src/journal/JournalTrimmer.h
@@ -21,7 +21,7 @@ public:
typedef cls::journal::ObjectSetPosition ObjectSetPosition;
JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
- const JournalMetadataPtr &journal_metadata);
+ const ceph::ref_t<JournalMetadata> &journal_metadata);
~JournalTrimmer();
void shut_down(Context *on_finish);
@@ -64,7 +64,7 @@ private:
CephContext *m_cct;
std::string m_object_oid_prefix;
- JournalMetadataPtr m_journal_metadata;
+ ceph::ref_t<JournalMetadata> m_journal_metadata;
MetadataListener m_metadata_listener;
AsyncOpTracker m_async_op_tracker;
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
index 6190674ade7..b5aa8cf23fb 100644
--- a/src/journal/Journaler.cc
+++ b/src/journal/Journaler.cc
@@ -100,10 +100,9 @@ void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
m_header_oid = header_oid(journal_id);
m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id);
- m_metadata = new JournalMetadata(work_queue, timer, timer_lock,
+ m_metadata = ceph::make_ref<JournalMetadata>(work_queue, timer, timer_lock,
m_header_ioctx, m_header_oid, m_client_id,
settings);
- m_metadata->get();
}
Journaler::~Journaler() {
@@ -114,8 +113,7 @@ Journaler::~Journaler() {
// since we wouldn't expect shut_down to be invoked
m_metadata->wait_for_ops();
}
- m_metadata->put();
- m_metadata = nullptr;
+ m_metadata.reset();
}
ceph_assert(m_trimmer == nullptr);
ceph_assert(m_player == nullptr);
@@ -175,13 +173,10 @@ void Journaler::shut_down(Context *on_finish) {
ceph_assert(m_player == nullptr);
ceph_assert(m_recorder == nullptr);
- JournalMetadata *metadata = nullptr;
- ceph_assert(m_metadata != nullptr);
- std::swap(metadata, m_metadata);
- ceph_assert(metadata != nullptr);
+ auto metadata = std::move(m_metadata);
+ ceph_assert(metadata);
on_finish = new LambdaContext([metadata, on_finish](int r) {
- metadata->put();
on_finish->complete(0);
});
@@ -336,12 +331,12 @@ void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class,
m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish);
}
-void Journaler::start_replay(ReplayHandler *replay_handler) {
+void Journaler::start_replay(ReplayHandler* replay_handler) {
create_player(replay_handler);
m_player->prefetch();
}
-void Journaler::start_live_replay(ReplayHandler *replay_handler,
+void Journaler::start_live_replay(ReplayHandler* replay_handler,
double interval) {
create_player(replay_handler);
m_player->prefetch_and_watch(interval);
@@ -371,17 +366,14 @@ void Journaler::stop_replay() {
}
void Journaler::stop_replay(Context *on_finish) {
- JournalPlayer *player = nullptr;
- ceph_assert(m_player != nullptr);
- std::swap(player, m_player);
- ceph_assert(player != nullptr);
+ auto player = std::move(m_player);
+ auto* playerp = player.get();
- auto f = [player, on_finish](int r) {
- delete player;
+ auto f = [player=std::move(player), on_finish](int r) {
on_finish->complete(r);
};
on_finish = new LambdaContext(std::move(f));
- player->shut_down(on_finish);
+ playerp->shut_down(on_finish);
}
void Journaler::committed(const ReplayEntry &replay_entry) {
@@ -389,7 +381,7 @@ void Journaler::committed(const ReplayEntry &replay_entry) {
}
void Journaler::committed(const Future &future) {
- FutureImplPtr future_impl = future.get_future_impl();
+ auto& future_impl = future.get_future_impl();
m_trimmer->committed(future_impl->get_commit_tid());
}
@@ -398,7 +390,7 @@ void Journaler::start_append(uint64_t max_in_flight_appends) {
// TODO verify active object set >= current replay object set
- m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
+ m_recorder = std::make_unique<JournalRecorder>(m_data_ioctx, m_object_oid_prefix,
m_metadata, max_in_flight_appends);
}
@@ -410,16 +402,14 @@ void Journaler::set_append_batch_options(int flush_interval,
}
void Journaler::stop_append(Context *on_safe) {
- JournalRecorder *recorder = nullptr;
- ceph_assert(m_recorder != nullptr);
- std::swap(recorder, m_recorder);
- ceph_assert(recorder != nullptr);
+ auto recorder = std::move(m_recorder);
+ ceph_assert(recorder);
- on_safe = new LambdaContext([recorder, on_safe](int r) {
- delete recorder;
+ auto* recorderp = recorder.get();
+ on_safe = new LambdaContext([recorder=std::move(recorder), on_safe](int r) {
on_safe->complete(r);
});
- recorder->shut_down(on_safe);
+ recorderp->shut_down(on_safe);
}
uint64_t Journaler::get_max_append_size() const {
@@ -440,9 +430,9 @@ void Journaler::flush_append(Context *on_safe) {
m_recorder->flush(on_safe);
}
-void Journaler::create_player(ReplayHandler *replay_handler) {
+void Journaler::create_player(ReplayHandler* replay_handler) {
ceph_assert(m_player == nullptr);
- m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
+ m_player = std::make_unique<JournalPlayer>(m_data_ioctx, m_object_oid_prefix, m_metadata,
replay_handler, m_cache_manager_handler);
}
diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h
index 17397d7ec2a..fe44401848a 100644
--- a/src/journal/Journaler.h
+++ b/src/journal/Journaler.h
@@ -24,9 +24,6 @@ namespace journal {
struct CacheManagerHandler;
-class JournalMetadata;
-class JournalPlayer;
-class JournalRecorder;
class JournalTrimmer;
class ReplayEntry;
class ReplayHandler;
@@ -103,8 +100,8 @@ public:
void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags,
Context *on_finish);
- void start_replay(ReplayHandler *replay_handler);
- void start_live_replay(ReplayHandler *replay_handler, double interval);
+ void start_replay(ReplayHandler* replay_handler);
+ void start_live_replay(ReplayHandler* replay_handler, double interval);
bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr);
void stop_replay();
void stop_replay(Context *on_finish);
@@ -149,9 +146,9 @@ private:
std::string m_object_oid_prefix;
bool m_initialized = false;
- JournalMetadata *m_metadata = nullptr;
- JournalPlayer *m_player = nullptr;
- JournalRecorder *m_recorder = nullptr;
+ ceph::ref_t<class JournalMetadata> m_metadata;
+ std::unique_ptr<class JournalPlayer> m_player;
+ std::unique_ptr<class JournalRecorder> m_recorder;
JournalTrimmer *m_trimmer = nullptr;
void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
@@ -159,7 +156,7 @@ private:
const Settings &settings);
int init_complete();
- void create_player(ReplayHandler *replay_handler);
+ void create_player(ReplayHandler* replay_handler);
friend std::ostream &operator<<(std::ostream &os,
const Journaler &journaler);
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
index 17bb8574a56..939b294b618 100644
--- a/src/journal/ObjectPlayer.cc
+++ b/src/journal/ObjectPlayer.cc
@@ -43,17 +43,16 @@ bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
} // anonymous namespace
ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
- const std::string &object_oid_prefix,
+ const std::string& object_oid_prefix,
uint64_t object_num, SafeTimer &timer,
ceph::mutex &timer_lock, uint8_t order,
uint64_t max_fetch_bytes)
- : RefCountedObject(NULL, 0), m_object_num(object_num),
+ : m_object_num(object_num),
m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
- m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+ m_timer(timer), m_timer_lock(timer_lock), m_order(order),
m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
- m_watch_interval(0), m_watch_task(NULL),
- m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))),
- m_fetch_in_progress(false) {
+ m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
+{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h
index 568d31b0635..41641dd150a 100644
--- a/src/journal/ObjectPlayer.h
+++ b/src/journal/ObjectPlayer.h
@@ -12,7 +12,6 @@
#include "journal/Entry.h"
#include <list>
#include <string>
-#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/unordered_map.hpp>
#include "include/ceph_assert.h"
@@ -21,9 +20,6 @@ class SafeTimer;
namespace journal {
-class ObjectPlayer;
-typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr;
-
class ObjectPlayer : public RefCountedObject {
public:
typedef std::list<Entry> Entries;
@@ -35,11 +31,6 @@ public:
REFETCH_STATE_IMMEDIATE
};
- ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
- uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock,
- uint8_t order, uint64_t max_fetch_bytes);
- ~ObjectPlayer() override;
-
inline const std::string &get_oid() const {
return m_oid;
}
@@ -83,11 +74,17 @@ public:
}
private:
+ FRIEND_MAKE_REF(ObjectPlayer);
+ ObjectPlayer(librados::IoCtx &ioctx, const std::string& object_oid_prefix,
+ uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock,
+ uint8_t order, uint64_t max_fetch_bytes);
+ ~ObjectPlayer() override;
+
typedef std::pair<uint64_t, uint64_t> EntryKey;
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
struct C_Fetch : public Context {
- ObjectPlayerPtr object_player;
+ ceph::ref_t<ObjectPlayer> object_player;
Context *on_finish;
bufferlist read_bl;
C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) {
@@ -95,7 +92,7 @@ private:
void finish(int r) override;
};
struct C_WatchFetch : public Context {
- ObjectPlayerPtr object_player;
+ ceph::ref_t<ObjectPlayer> object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
}
void finish(int r) override;
@@ -104,7 +101,7 @@ private:
librados::IoCtx m_ioctx;
uint64_t m_object_num;
std::string m_oid;
- CephContext *m_cct;
+ CephContext *m_cct = nullptr;
SafeTimer &m_timer;
ceph::mutex &m_timer_lock;
@@ -112,11 +109,11 @@ private:
uint8_t m_order;
uint64_t m_max_fetch_bytes;
- double m_watch_interval;
- Context *m_watch_task;
+ double m_watch_interval = 0;
+ Context *m_watch_task = nullptr;
mutable ceph::mutex m_lock;
- bool m_fetch_in_progress;
+ bool m_fetch_in_progress = false;
bufferlist m_read_bl;
uint32_t m_read_off = 0;
uint32_t m_read_bl_off = 0;
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index 9f1f37eed1c..1823d7a8884 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -19,16 +19,16 @@ using std::shared_ptr;
namespace journal {
-ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
uint64_t object_number, ceph::mutex* lock,
ContextWQ *work_queue, Handler *handler,
uint8_t order, int32_t max_in_flight_appends)
- : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
- m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
+ : m_oid(oid), m_object_number(object_number),
+ m_op_work_queue(work_queue), m_handler(handler),
m_order(order), m_soft_max_size(1 << m_order),
- m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
- m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
- m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
+ m_max_in_flight_appends(max_in_flight_appends),
+ m_lock(lock), m_last_flush_time(ceph_clock_now())
+{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ceph_assert(m_handler != NULL);
@@ -70,11 +70,12 @@ bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
ceph_assert(ceph_mutex_is_locked(*m_lock));
- FutureImplPtr last_flushed_future;
+ ceph::ref_t<FutureImpl> last_flushed_future;
+ auto flush_handler = get_flush_handler();
for (auto& append_buffer : append_buffers) {
ldout(m_cct, 20) << *append_buffer.first << ", "
<< "size=" << append_buffer.second.length() << dendl;
- bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+ bool flush_requested = append_buffer.first->attach(flush_handler);
if (flush_requested) {
last_flushed_future = append_buffer.first;
}
@@ -121,19 +122,23 @@ void ObjectRecorder::flush(Context *on_safe) {
}
}
-void ObjectRecorder::flush(const FutureImplPtr &future) {
+void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) {
ldout(m_cct, 20) << "flushing " << *future << dendl;
m_lock->lock();
- if (future->get_flush_handler().get() != &m_flush_handler) {
- // if we don't own this future, re-issue the flush so that it hits the
- // correct journal object owner
- future->flush();
- m_lock->unlock();
- return;
- } else if (future->is_flush_in_progress()) {
- m_lock->unlock();
- return;
+ {
+ auto flush_handler = future->get_flush_handler();
+ auto my_handler = get_flush_handler();
+ if (flush_handler != my_handler) {
+ // if we don't own this future, re-issue the flush so that it hits the
+ // correct journal object owner
+ future->flush();
+ m_lock->unlock();
+ return;
+ } else if (future->is_flush_in_progress()) {
+ m_lock->unlock();
+ return;
+ }
}
bool overflowed = send_appends(true, future);
@@ -258,7 +263,7 @@ void ObjectRecorder::append_overflowed() {
restart_append_buffers.swap(m_pending_buffers);
}
-bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
+bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_future) {
ldout(m_cct, 20) << dendl;
ceph_assert(ceph_mutex_is_locked(*m_lock));
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index 30dde76c92b..8b4e0a20dc3 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -14,7 +14,6 @@
#include <list>
#include <map>
#include <set>
-#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include "include/ceph_assert.h"
@@ -23,9 +22,8 @@ class SafeTimer;
namespace journal {
class ObjectRecorder;
-typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
-typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
+typedef std::pair<ceph::ref_t<FutureImpl>, bufferlist> AppendBuffer;
typedef std::list<AppendBuffer> AppendBuffers;
class ObjectRecorder : public RefCountedObject, boost::noncopyable {
@@ -37,12 +35,6 @@ public:
virtual void overflow(ObjectRecorder *object_recorder) = 0;
};
- ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
- uint64_t object_number, ceph::mutex* lock,
- ContextWQ *work_queue, Handler *handler, uint8_t order,
- int32_t max_in_flight_appends);
- ~ObjectRecorder() override;
-
void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
double flush_age);
@@ -55,7 +47,7 @@ public:
bool append(AppendBuffers &&append_buffers);
void flush(Context *on_safe);
- void flush(const FutureImplPtr &future);
+ void flush(const ceph::ref_t<FutureImpl> &future);
void claim_append_buffers(AppendBuffers *append_buffers);
@@ -75,39 +67,38 @@ public:
}
private:
+ FRIEND_MAKE_REF(ObjectRecorder);
+ ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
+ uint64_t object_number, ceph::mutex* lock,
+ ContextWQ *work_queue, Handler *handler, uint8_t order,
+ int32_t max_in_flight_appends);
+ ~ObjectRecorder() override;
+
typedef std::set<uint64_t> InFlightTids;
typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
struct FlushHandler : public FutureImpl::FlushHandler {
- ObjectRecorder *object_recorder;
- FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
- void get() override {
- object_recorder->get();
- }
- void put() override {
- object_recorder->put();
- }
- void flush(const FutureImplPtr &future) override {
+ ceph::ref_t<ObjectRecorder> object_recorder;
+ virtual void flush(const ceph::ref_t<FutureImpl> &future) override {
object_recorder->flush(future);
}
+ FlushHandler(ceph::ref_t<ObjectRecorder> o) : object_recorder(std::move(o)) {}
};
struct C_AppendFlush : public Context {
- ObjectRecorder *object_recorder;
+ ceph::ref_t<ObjectRecorder> object_recorder;
uint64_t tid;
- C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
- : object_recorder(o), tid(_tid) {
- object_recorder->get();
+ C_AppendFlush(ceph::ref_t<ObjectRecorder> o, uint64_t _tid)
+ : object_recorder(std::move(o)), tid(_tid) {
}
void finish(int r) override {
object_recorder->handle_append_flushed(tid, r);
- object_recorder->put();
}
};
librados::IoCtx m_ioctx;
std::string m_oid;
uint64_t m_object_number;
- CephContext *m_cct;
+ CephContext *m_cct = nullptr;
ContextWQ *m_op_work_queue;
@@ -123,28 +114,37 @@ private:
bool m_compat_mode;
- FlushHandler m_flush_handler;
+ /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */
+ std::weak_ptr<FlushHandler> m_flush_handler;
+ auto get_flush_handler() {
+ auto h = m_flush_handler.lock();
+ if (!h) {
+ h = std::make_shared<FlushHandler>(this);
+ m_flush_handler = h;
+ }
+ return h;
+ }
mutable ceph::mutex* m_lock;
AppendBuffers m_pending_buffers;
uint64_t m_pending_bytes = 0;
utime_t m_last_flush_time;
- uint64_t m_append_tid;
+ uint64_t m_append_tid = 0;
InFlightTids m_in_flight_tids;
InFlightAppends m_in_flight_appends;
uint64_t m_object_bytes = 0;
- bool m_overflowed;
- bool m_object_closed;
+ bool m_overflowed = false;
+ bool m_object_closed = false;
bufferlist m_prefetch_bl;
- bool m_in_flight_flushes;
+ bool m_in_flight_flushes = false;
ceph::condition_variable m_in_flight_flushes_cond;
uint64_t m_in_flight_bytes = 0;
- bool send_appends(bool force, FutureImplPtr flush_sentinal);
+ bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal);
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed();
diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h
index e61240d8c1f..d0967c68d7e 100644
--- a/src/journal/ReplayHandler.h
+++ b/src/journal/ReplayHandler.h
@@ -6,14 +6,10 @@
namespace journal {
-struct ReplayHandler {
- virtual ~ReplayHandler() {}
-
- virtual void get() = 0;
- virtual void put() = 0;
-
+struct ReplayHandler {
virtual void handle_entries_available() = 0;
virtual void handle_complete(int r) = 0;
+ virtual ~ReplayHandler() {}
};
} // namespace journal
diff --git a/src/libradosstriper/RadosStriperImpl.cc b/src/libradosstriper/RadosStriperImpl.cc
index d2af7875848..57d88f59aed 100644
--- a/src/libradosstriper/RadosStriperImpl.cc
+++ b/src/libradosstriper/RadosStriperImpl.cc
@@ -27,6 +27,7 @@
#include "include/ceph_fs.h"
#include "common/dout.h"
#include "common/strtol.h"
+#include "common/RefCountedObj.h"
#include "osdc/Striper.h"
#include "librados/AioCompletionImpl.h"
#include <cls/lock/cls_lock_client.h>
@@ -124,14 +125,6 @@ namespace {
* function in asynchronous operations
*/
struct CompletionData : RefCountedObject {
- /// constructor
- CompletionData(libradosstriper::RadosStriperImpl * striper,
- const std::string& soid,
- const std::string& lockCookie,
- librados::AioCompletionImpl *userCompletion = 0,
- int n = 1);
- /// destructor
- ~CompletionData() override;
/// complete method
void complete(int r);
/// striper to be used to handle the write completion
@@ -142,15 +135,21 @@ struct CompletionData : RefCountedObject {
std::string m_lockCookie;
/// completion handler
librados::IoCtxImpl::C_aio_Complete *m_ack;
+protected:
+ CompletionData(libradosstriper::RadosStriperImpl * striper,
+ const std::string& soid,
+ const std::string& lockCookie,
+ librados::AioCompletionImpl *userCompletion = 0);
+ ~CompletionData() override;
+
};
CompletionData::CompletionData
(libradosstriper::RadosStriperImpl* striper,
const std::string& soid,
const std::string& lockCookie,
- librados::AioCompletionImpl *userCompletion,
- int n) :
- RefCountedObject(striper->cct(), n),
+ librados::AioCompletionImpl *userCompletion) :
+ RefCountedObject(striper->cct()),
m_striper(striper), m_soid(soid), m_lockCookie(lockCookie), m_ack(0) {
m_striper->get();
if (userCompletion) {
@@ -183,21 +182,21 @@ struct ReadCompletionData : CompletionData {
int m_readRc;
/// completion object for the unlocking of the striped object at the end of the read
librados::AioCompletion *m_unlockCompletion;
- /// constructor
+ /// complete method for when reading is over
+ void complete_read(int r);
+ /// complete method for when object is unlocked
+ void complete_unlock(int r);
+
+private:
+ FRIEND_MAKE_REF(ReadCompletionData);
ReadCompletionData(libradosstriper::RadosStriperImpl * striper,
const std::string& soid,
const std::string& lockCookie,
librados::AioCompletionImpl *userCompletion,
bufferlist* bl,
std::vector<ObjectExtent>* extents,
- std::vector<bufferlist>* resultbl,
- int n);
- /// destructor
+ std::vector<bufferlist>* resultbl);
~ReadCompletionData() override;
- /// complete method for when reading is over
- void complete_read(int r);
- /// complete method for when object is unlocked
- void complete_unlock(int r);
};
ReadCompletionData::ReadCompletionData
@@ -207,9 +206,8 @@ ReadCompletionData::ReadCompletionData
librados::AioCompletionImpl *userCompletion,
bufferlist* bl,
std::vector<ObjectExtent>* extents,
- std::vector<bufferlist>* resultbl,
- int n) :
- CompletionData(striper, soid, lockCookie, userCompletion, n),
+ std::vector<bufferlist>* resultbl) :
+ CompletionData(striper, soid, lockCookie, userCompletion),
m_bl(bl), m_extents(extents), m_resultbl(resultbl), m_readRc(0),
m_unlockCompletion(0) {}
@@ -251,30 +249,30 @@ struct WriteCompletionData : CompletionData {
librados::AioCompletion *m_unlockCompletion;
/// return code of write completion, to be remembered until unlocking happened
int m_writeRc;
- /// constructor
- WriteCompletionData(libradosstriper::RadosStriperImpl * striper,
- const std::string& soid,
- const std::string& lockCookie,
- librados::AioCompletionImpl *userCompletion,
- int n);
- /// destructor
- ~WriteCompletionData() override;
/// complete method for when writing is over
void complete_write(int r);
/// complete method for when object is unlocked
void complete_unlock(int r);
/// safe method
void safe(int r);
+private:
+ FRIEND_MAKE_REF(WriteCompletionData);
+ /// constructor
+ WriteCompletionData(libradosstriper::RadosStriperImpl * striper,
+ const std::string& soid,
+ const std::string& lockCookie,
+ librados::AioCompletionImpl *userCompletion);
+ /// destructor
+ ~WriteCompletionData() override;
};
WriteCompletionData::WriteCompletionData
(libradosstriper::RadosStriperImpl* striper,
const std::string& soid,
const std::string& lockCookie,
- librados::AioCompletionImpl *userCompletion,
- int n) :
- CompletionData(striper, soid, lockCookie, userCompletion, n), m_safe(0),
- m_unlockCompletion(0), m_writeRc(0) {
+ librados::AioCompletionImpl *userCompletion) :
+ CompletionData(striper, soid, lockCookie, userCompletion),
+ m_safe(0), m_unlockCompletion(0), m_writeRc(0) {
if (userCompletion) {
m_safe = new librados::IoCtxImpl::C_aio_Complete(userCompletion);
}
@@ -303,6 +301,9 @@ void WriteCompletionData::safe(int r) {
struct RemoveCompletionData : CompletionData {
/// removal flags
int flags;
+
+private:
+ FRIEND_MAKE_REF(RemoveCompletionData);
/**
* constructor
* note that the constructed object will take ownership of the lock
@@ -320,6 +321,15 @@ struct RemoveCompletionData : CompletionData {
* function in asynchronous truncate operations
*/
struct TruncateCompletionData : RefCountedObject {
+ /// striper to be used
+ libradosstriper::RadosStriperImpl *m_striper;
+ /// striped object concerned by the truncate operation
+ std::string m_soid;
+ /// the final size of the truncated object
+ uint64_t m_size;
+
+private:
+ FRIEND_MAKE_REF(TruncateCompletionData);
/// constructor
TruncateCompletionData(libradosstriper::RadosStriperImpl* striper,
const std::string& soid,
@@ -332,12 +342,6 @@ struct TruncateCompletionData : RefCountedObject {
~TruncateCompletionData() override {
m_striper->put();
}
- /// striper to be used
- libradosstriper::RadosStriperImpl *m_striper;
- /// striped object concerned by the truncate operation
- std::string m_soid;
- /// the final size of the truncated object
- uint64_t m_size;
};
/**
@@ -345,20 +349,22 @@ struct TruncateCompletionData : RefCountedObject {
* function in asynchronous read operations of a Rados File
*/
struct RadosReadCompletionData : RefCountedObject {
- /// constructor
- RadosReadCompletionData(MultiAioCompletionImplPtr multiAioCompl,
- uint64_t expectedBytes,
- bufferlist *bl,
- CephContext *context,
- int n = 1) :
- RefCountedObject(context, n),
- m_multiAioCompl(multiAioCompl), m_expectedBytes(expectedBytes), m_bl(bl) {}
/// the multi asynch io completion object to be used
MultiAioCompletionImplPtr m_multiAioCompl;
/// the expected number of bytes
uint64_t m_expectedBytes;
/// the bufferlist object where data have been written
bufferlist *m_bl;
+
+private:
+ FRIEND_MAKE_REF(RadosReadCompletionData);
+ /// constructor
+ RadosReadCompletionData(MultiAioCompletionImplPtr multiAioCompl,
+ uint64_t expectedBytes,
+ bufferlist *bl,
+ CephContext *context) :
+ RefCountedObject(context),
+ m_multiAioCompl(multiAioCompl), m_expectedBytes(expectedBytes), m_bl(bl) {}
};
/**
@@ -368,16 +374,6 @@ struct RadosReadCompletionData : RefCountedObject {
* versions (time_t or struct timespec)
*/
struct BasicStatCompletionData : CompletionData {
- /// constructor
- BasicStatCompletionData(libradosstriper::RadosStriperImpl* striper,
- const std::string& soid,
- librados::AioCompletionImpl *userCompletion,
- libradosstriper::MultiAioCompletionImpl *multiCompletion,
- uint64_t *psize,
- int n = 1) :
- CompletionData(striper, soid, "", userCompletion, n),
- m_multiCompletion(multiCompletion), m_psize(psize),
- m_statRC(0), m_getxattrRC(0) {};
// MultiAioCompletionImpl used to handle the double aysnc
// call in the back (stat + getxattr)
libradosstriper::MultiAioCompletionImpl *m_multiCompletion;
@@ -393,6 +389,18 @@ struct BasicStatCompletionData : CompletionData {
int m_statRC;
/// return code of the getxattr
int m_getxattrRC;
+
+protected:
+ /// constructor
+ BasicStatCompletionData(libradosstriper::RadosStriperImpl* striper,
+ const std::string& soid,
+ librados::AioCompletionImpl *userCompletion,
+ libradosstriper::MultiAioCompletionImpl *multiCompletion,
+ uint64_t *psize) :
+ CompletionData(striper, soid, "", userCompletion),
+ m_multiCompletion(multiCompletion), m_psize(psize),
+ m_statRC(0), m_getxattrRC(0) {};
+
};
/**
@@ -404,18 +412,19 @@ struct BasicStatCompletionData : CompletionData {
*/
template<class TimeType>
struct StatCompletionData : BasicStatCompletionData {
+ // where to store the file time
+ TimeType *m_pmtime;
+private:
+ FRIEND_MAKE_REF(StatCompletionData);
/// constructor
- StatCompletionData(libradosstriper::RadosStriperImpl* striper,
+ StatCompletionData<TimeType>(libradosstriper::RadosStriperImpl* striper,
const std::string& soid,
librados::AioCompletionImpl *userCompletion,
libradosstriper::MultiAioCompletionImpl *multiCompletion,
uint64_t *psize,
- TimeType *pmtime,
- int n = 1) :
- BasicStatCompletionData(striper, soid, userCompletion, multiCompletion, psize, n),
+ TimeType *pmtime) :
+ BasicStatCompletionData(striper, soid, userCompletion, multiCompletion, psize),
m_pmtime(pmtime) {};
- // where to store the file time
- TimeType *m_pmtime;
};
/**
@@ -423,13 +432,15 @@ struct StatCompletionData : BasicStatCompletionData {
* function in asynchronous remove operations of a Rados File
*/
struct RadosRemoveCompletionData : RefCountedObject {
+ /// the multi asynch io completion object to be used
+ MultiAioCompletionImplPtr m_multiAioCompl;
+private:
+ FRIEND_MAKE_REF(RadosRemoveCompletionData);
/// constructor
RadosRemoveCompletionData(MultiAioCompletionImplPtr multiAioCompl,
CephContext *context) :
- RefCountedObject(context, 2),
+ RefCountedObject(context),
m_multiAioCompl(multiAioCompl) {};
- /// the multi asynch io completion object to be used
- MultiAioCompletionImplPtr m_multiAioCompl;
};
@@ -614,16 +625,15 @@ int libradosstriper::RadosStriperImpl::aio_write_full(const std::string& soid,
static void rados_read_aio_unlock_complete(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<ReadCompletionData*>(arg);
+ auto cdata = ceph::ref_t<ReadCompletionData>(static_cast<ReadCompletionData*>(arg), false);
libradosstriper::MultiAioCompletionImpl *comp =
reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
cdata->complete_unlock(comp->rval);
- cdata->put();
}
static void striper_read_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<ReadCompletionData*>(arg);
+ auto cdata = static_cast<ReadCompletionData*>(arg);
// launch the async unlocking of the object
cdata->m_striper->aio_unlockObject(cdata->m_soid, cdata->m_lockCookie, cdata->m_unlockCompletion);
// complete the read part in parallel
@@ -634,19 +644,18 @@ static void striper_read_aio_req_complete(rados_striper_multi_completion_t c, vo
static void rados_req_read_safe(rados_completion_t c, void *arg)
{
- auto data = reinterpret_cast<RadosReadCompletionData*>(arg);
+ auto data = ceph::ref_t<RadosReadCompletionData>(static_cast<RadosReadCompletionData*>(arg), false);
int rc = rados_aio_get_return_value(c);
// ENOENT means that we are dealing with a sparse file. This is fine,
// data (0s) will be created on the fly by the rados_req_read_complete method
if (rc == -ENOENT) rc = 0;
auto multiAioComp = data->m_multiAioCompl;
multiAioComp->safe_request(rc);
- data->put();
}
static void rados_req_read_complete(rados_completion_t c, void *arg)
{
- auto data = reinterpret_cast<RadosReadCompletionData*>(arg);
+ auto data = static_cast<RadosReadCompletionData*>(arg);
int rc = rados_aio_get_return_value(c);
// We need to handle the case of sparse files here
if (rc == -ENOENT) {
@@ -672,7 +681,6 @@ static void rados_req_read_complete(rados_completion_t c, void *arg)
}
auto multiAioComp = data->m_multiAioCompl;
multiAioComp->complete_request(rc);
- data->put();
}
int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid,
@@ -710,18 +718,17 @@ int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid,
// create a completion object and transfer ownership of extents and resultbl
vector<bufferlist> *resultbl = new vector<bufferlist>(extents->size());
- ReadCompletionData *cdata = new ReadCompletionData(this, soid, lockCookie, c,
- bl, extents, resultbl, 1);
+ auto cdata = ceph::make_ref<ReadCompletionData>(this, soid, lockCookie, c, bl, extents, resultbl);
c->is_read = true;
c->io = m_ioCtxImpl;
// create a completion for the unlocking of the striped object at the end of the read
librados::AioCompletion *unlock_completion =
- librados::Rados::aio_create_completion(cdata, rados_read_aio_unlock_complete, 0);
+ librados::Rados::aio_create_completion(cdata->get() /* create ref! */, rados_read_aio_unlock_complete, 0);
cdata->m_unlockCompletion = unlock_completion;
// create the multiCompletion object handling the reads
MultiAioCompletionImplPtr nc{new libradosstriper::MultiAioCompletionImpl,
false};
- nc->set_complete_callback(cdata, striper_read_aio_req_complete);
+ nc->set_complete_callback(cdata.get(), striper_read_aio_req_complete);
// go through the extents
int r = 0, i = 0;
for (vector<ObjectExtent>::iterator p = extents->begin(); p != extents->end(); ++p) {
@@ -738,9 +745,9 @@ int libradosstriper::RadosStriperImpl::aio_read(const std::string& soid,
nc->add_request();
// we need 2 references on data as both rados_req_read_safe and rados_req_read_complete
// will release one
- RadosReadCompletionData *data = new RadosReadCompletionData(nc, p->length, oid_bl, cct(), 2);
+ auto data = ceph::make_ref<RadosReadCompletionData>(nc, p->length, oid_bl, cct());
librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(data, rados_req_read_complete, rados_req_read_safe);
+ librados::Rados::aio_create_completion(data.detach(), rados_req_read_complete, rados_req_read_safe);
r = m_ioCtx.aio_read(p->oid.name, rados_completion, oid_bl, p->length, p->offset);
rados_completion->release();
if (r < 0)
@@ -794,18 +801,17 @@ int libradosstriper::RadosStriperImpl::stat(const std::string& soid, uint64_t *p
}
static void striper_stat_aio_stat_complete(rados_completion_t c, void *arg) {
- auto data = reinterpret_cast<BasicStatCompletionData*>(arg);
+ auto data = ceph::ref_t<BasicStatCompletionData>(static_cast<BasicStatCompletionData*>(arg), false);
int rc = rados_aio_get_return_value(c);
if (rc == -ENOENT) {
// remember this has failed
data->m_statRC = rc;
}
data->m_multiCompletion->complete_request(rc);
- data->put();
}
static void striper_stat_aio_getxattr_complete(rados_completion_t c, void *arg) {
- auto data = reinterpret_cast<BasicStatCompletionData*>(arg);
+ auto data = ceph::ref_t<BasicStatCompletionData>(static_cast<BasicStatCompletionData*>(arg), false);
int rc = rados_aio_get_return_value(c);
// We need to handle the case of sparse files here
if (rc < 0) {
@@ -823,12 +829,11 @@ static void striper_stat_aio_getxattr_complete(rados_completion_t c, void *arg)
rc = 0;
}
data->m_multiCompletion->complete_request(rc);
- data->put();
}
static void striper_stat_aio_req_complete(rados_striper_multi_completion_t c,
void *arg) {
- auto data = reinterpret_cast<BasicStatCompletionData*>(arg);
+ auto data = ceph::ref_t<BasicStatCompletionData>(static_cast<BasicStatCompletionData*>(arg), false);
if (data->m_statRC) {
data->complete(data->m_statRC);
} else {
@@ -838,7 +843,6 @@ static void striper_stat_aio_req_complete(rados_striper_multi_completion_t c,
data->complete(0);
}
}
- data->put();
}
template<class TimeType>
@@ -855,13 +859,11 @@ int libradosstriper::RadosStriperImpl::aio_generic_stat
new libradosstriper::MultiAioCompletionImpl, false};
// Data object used for passing context to asynchronous calls
std::string firstObjOid = getObjectId(soid, 0);
- StatCompletionData<TimeType> *cdata =
- new StatCompletionData<TimeType>(this, firstObjOid, c,
- multi_completion.get(), psize, pmtime, 4);
- multi_completion->set_complete_callback(cdata, striper_stat_aio_req_complete);
+ auto cdata = ceph::make_ref<StatCompletionData<TimeType>>(this, firstObjOid, c, multi_completion.get(), psize, pmtime);
+ multi_completion->set_complete_callback(cdata->get() /* create ref! */, striper_stat_aio_req_complete);
// use a regular AioCompletion for the stat async call
librados::AioCompletion *stat_completion =
- librados::Rados::aio_create_completion(cdata, striper_stat_aio_stat_complete, 0);
+ librados::Rados::aio_create_completion(cdata->get() /* create ref! */, striper_stat_aio_stat_complete, 0);
multi_completion->add_safe_request();
object_t obj(firstObjOid);
int rc = (m_ioCtxImpl->*statFunction)(obj, stat_completion->pc,
@@ -869,12 +871,12 @@ int libradosstriper::RadosStriperImpl::aio_generic_stat
stat_completion->release();
if (rc < 0) {
// nothing is really started so cancel everything
- delete cdata;
+ delete cdata.detach();
return rc;
}
// use a regular AioCompletion for the getxattr async call
librados::AioCompletion *getxattr_completion =
- librados::Rados::aio_create_completion(cdata, striper_stat_aio_getxattr_complete, 0);
+ librados::Rados::aio_create_completion(cdata->get() /* create ref! */, striper_stat_aio_getxattr_complete, 0);
multi_completion->add_safe_request();
// in parallel, get the pmsize from the first object asynchronously
rc = m_ioCtxImpl->aio_getxattr(obj, getxattr_completion->pc,
@@ -888,7 +890,6 @@ int libradosstriper::RadosStriperImpl::aio_generic_stat
multi_completion->complete_request(rc);
return rc;
}
- cdata->put();
return 0;
}
@@ -925,31 +926,29 @@ int libradosstriper::RadosStriperImpl::aio_stat2(const std::string& soid,
static void rados_req_remove_complete(rados_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<RadosRemoveCompletionData*>(arg);
+ auto cdata = static_cast<RadosRemoveCompletionData*>(arg);
int rc = rados_aio_get_return_value(c);
// in case the object did not exist, it means we had a sparse file, all is fine
if (rc == -ENOENT) {
rc = 0;
}
cdata->m_multiAioCompl->complete_request(rc);
- cdata->put();
}
static void rados_req_remove_safe(rados_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<RadosRemoveCompletionData*>(arg);
+ auto cdata = ceph::ref_t<RadosRemoveCompletionData>(static_cast<RadosRemoveCompletionData*>(arg), false);
int rc = rados_aio_get_return_value(c);
// in case the object did not exist, it means we had a sparse file, all is fine
if (rc == -ENOENT) {
rc = 0;
}
cdata->m_multiAioCompl->safe_request(rc);
- cdata->put();
}
static void striper_remove_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<RemoveCompletionData*>(arg);
+ auto cdata = ceph::ref_t<RemoveCompletionData>(static_cast<RemoveCompletionData*>(arg), false);
libradosstriper::MultiAioCompletionImpl *comp =
reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
ldout(cdata->m_striper->cct(), 10)
@@ -968,7 +967,6 @@ static void striper_remove_aio_req_complete(rados_striper_multi_completion_t c,
<< dendl;
}
cdata->complete(rc);
- cdata->put();
}
int libradosstriper::RadosStriperImpl::remove(const std::string& soid, int flags)
@@ -996,10 +994,10 @@ int libradosstriper::RadosStriperImpl::aio_remove(const std::string& soid,
int rc = m_ioCtx.lock_exclusive(getObjectId(soid, 0), RADOS_LOCK_NAME, lockCookie, "", 0, 0);
if (rc) return rc;
// create CompletionData for the async remove call
- RemoveCompletionData *cdata = new RemoveCompletionData(this, soid, lockCookie, c, flags);
+ auto cdata = ceph::make_ref<RemoveCompletionData>(this, soid, lockCookie, c, flags);
MultiAioCompletionImplPtr multi_completion{
new libradosstriper::MultiAioCompletionImpl, false};
- multi_completion->set_complete_callback(cdata, striper_remove_aio_req_complete);
+ multi_completion->set_complete_callback(cdata->get() /* create ref! */, striper_remove_aio_req_complete);
// call asynchronous internal version of remove
ldout(cct(), 10)
<< "RadosStriperImpl : Aio_remove starting for "
@@ -1054,10 +1052,9 @@ int libradosstriper::RadosStriperImpl::internal_aio_remove(
int rcr = 0;
for (int i = nb_objects-1; i >= 1; i--) {
multi_completion->add_request();
- RadosRemoveCompletionData *data =
- new RadosRemoveCompletionData(multi_completion, cct());
+ auto data = ceph::make_ref<RadosRemoveCompletionData>(multi_completion, cct());
librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(data,
+ librados::Rados::aio_create_completion(data->get() /* create ref! */,
rados_req_remove_complete,
rados_req_remove_safe);
if (flags == 0) {
@@ -1142,32 +1139,29 @@ void libradosstriper::RadosStriperImpl::aio_unlockObject(const std::string& soid
static void rados_write_aio_unlock_complete(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<WriteCompletionData*>(arg);
+ auto cdata = ceph::ref_t<WriteCompletionData>(static_cast<WriteCompletionData*>(arg), false);
libradosstriper::MultiAioCompletionImpl *comp =
reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
cdata->complete_unlock(comp->rval);
- cdata->put();
}
static void striper_write_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<WriteCompletionData*>(arg);
+ auto cdata = ceph::ref_t<WriteCompletionData>(static_cast<WriteCompletionData*>(arg), false);
// launch the async unlocking of the object
cdata->m_striper->aio_unlockObject(cdata->m_soid, cdata->m_lockCookie, cdata->m_unlockCompletion);
// complete the write part in parallel
libradosstriper::MultiAioCompletionImpl *comp =
reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
cdata->complete_write(comp->rval);
- cdata->put();
}
static void striper_write_aio_req_safe(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<WriteCompletionData*>(arg);
+ auto cdata = ceph::ref_t<WriteCompletionData>(static_cast<WriteCompletionData*>(arg), false);
libradosstriper::MultiAioCompletionImpl *comp =
reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
cdata->safe(comp->rval);
- cdata->put();
}
int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& soid,
@@ -1179,17 +1173,16 @@ int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& s
// create a completion object to be passed to the callbacks of the multicompletion
// we need 3 references as striper_write_aio_req_complete will release two and
// striper_write_aio_req_safe will release one
- WriteCompletionData *cdata = new WriteCompletionData(this, soid, lockCookie, 0, 3);
- cdata->get(); // local ref
+ auto cdata = ceph::make_ref<WriteCompletionData>(this, soid, lockCookie, nullptr);
// create a completion object for the unlocking of the striped object at the end of the write
librados::AioCompletion *unlock_completion =
- librados::Rados::aio_create_completion(cdata, rados_write_aio_unlock_complete, 0);
+ librados::Rados::aio_create_completion(cdata->get() /* create ref! */, rados_write_aio_unlock_complete, 0);
cdata->m_unlockCompletion = unlock_completion;
// create the multicompletion that will handle the write completion
MultiAioCompletionImplPtr c{new libradosstriper::MultiAioCompletionImpl,
false};
- c->set_complete_callback(cdata, striper_write_aio_req_complete);
- c->set_safe_callback(cdata, striper_write_aio_req_safe);
+ c->set_complete_callback(cdata->get() /* create ref! */, striper_write_aio_req_complete);
+ c->set_safe_callback(cdata->get() /* create ref! */, striper_write_aio_req_safe);
// call the asynchronous API
int rc = internal_aio_write(soid, c, bl, len, off, layout);
if (!rc) {
@@ -1201,7 +1194,6 @@ int libradosstriper::RadosStriperImpl::write_in_open_object(const std::string& s
// return result
rc = c->get_return_value();
}
- cdata->put();
return rc;
}
@@ -1215,22 +1207,20 @@ int libradosstriper::RadosStriperImpl::aio_write_in_open_object(const std::strin
// create a completion object to be passed to the callbacks of the multicompletion
// we need 3 references as striper_write_aio_req_complete will release two and
// striper_write_aio_req_safe will release one
- WriteCompletionData *cdata = new WriteCompletionData(this, soid, lockCookie, c, 3);
- cdata->get(); // local ref
+ auto cdata = ceph::make_ref<WriteCompletionData>(this, soid, lockCookie, c);
m_ioCtxImpl->get();
c->io = m_ioCtxImpl;
// create a completion object for the unlocking of the striped object at the end of the write
librados::AioCompletion *unlock_completion =
- librados::Rados::aio_create_completion(cdata, rados_write_aio_unlock_complete, 0);
+ librados::Rados::aio_create_completion(cdata->get() /* create ref! */, rados_write_aio_unlock_complete, 0);
cdata->m_unlockCompletion = unlock_completion;
// create the multicompletion that will handle the write completion
libradosstriper::MultiAioCompletionImplPtr nc{
new libradosstriper::MultiAioCompletionImpl, false};
- nc->set_complete_callback(cdata, striper_write_aio_req_complete);
- nc->set_safe_callback(cdata, striper_write_aio_req_safe);
+ nc->set_complete_callback(cdata->get() /* create ref! */, striper_write_aio_req_complete);
+ nc->set_safe_callback(cdata->get() /* create ref! */, striper_write_aio_req_safe);
// internal asynchronous API
int rc = internal_aio_write(soid, nc, bl, len, off, layout);
- cdata->put();
return rc;
}
@@ -1503,7 +1493,7 @@ int libradosstriper::RadosStriperImpl::createAndOpenStripedObject(const std::str
static void striper_truncate_aio_req_complete(rados_striper_multi_completion_t c, void *arg)
{
- auto cdata = reinterpret_cast<TruncateCompletionData*>(arg);
+ auto cdata = ceph::ref_t<TruncateCompletionData>(static_cast<TruncateCompletionData*>(arg), false);
libradosstriper::MultiAioCompletionImpl *comp =
reinterpret_cast<libradosstriper::MultiAioCompletionImpl*>(c);
if (0 == comp->rval) {
@@ -1514,7 +1504,6 @@ static void striper_truncate_aio_req_complete(rados_striper_multi_completion_t c
bl.append(oss.str());
cdata->m_striper->setxattr(cdata->m_soid, XATTR_SIZE, bl);
}
- cdata->put();
}
int libradosstriper::RadosStriperImpl::truncate(const std::string& soid,
@@ -1522,10 +1511,10 @@ int libradosstriper::RadosStriperImpl::truncate(const std::string& soid,
uint64_t size,
ceph_file_layout &layout)
{
- TruncateCompletionData *cdata = new TruncateCompletionData(this, soid, size);
+ auto cdata = ceph::make_ref<TruncateCompletionData>(this, soid, size);
libradosstriper::MultiAioCompletionImplPtr multi_completion{
new libradosstriper::MultiAioCompletionImpl, false};
- multi_completion->set_complete_callback(cdata, striper_truncate_aio_req_complete);
+ multi_completion->set_complete_callback(cdata->get() /* create ref! */, striper_truncate_aio_req_complete);
// call asynchrous version of truncate
int rc = aio_truncate(soid, multi_completion, original_size, size, layout);
// wait for completion of the truncation
@@ -1573,10 +1562,9 @@ int libradosstriper::RadosStriperImpl::aio_truncate
if (exists) {
// remove asynchronously
multi_completion->add_request();
- RadosRemoveCompletionData *data =
- new RadosRemoveCompletionData(multi_completion, cct());
+ auto data = ceph::make_ref<RadosRemoveCompletionData>(multi_completion, cct());
librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(data,
+ librados::Rados::aio_create_completion(data->get() /* create ref! */,
rados_req_remove_complete,
rados_req_remove_safe);
int rc = m_ioCtx.aio_remove(getObjectId(soid, objectno), rados_completion);
@@ -1608,10 +1596,9 @@ int libradosstriper::RadosStriperImpl::aio_truncate
} else {
// removes are asynchronous in order to speed up truncations of big files
multi_completion->add_request();
- RadosRemoveCompletionData *data =
- new RadosRemoveCompletionData(multi_completion, cct());
+ auto data = ceph::make_ref<RadosRemoveCompletionData>(multi_completion, cct());
librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(data,
+ librados::Rados::aio_create_completion(data->get() /* create ref! */,
rados_req_remove_complete,
rados_req_remove_safe);
rc = m_ioCtx.aio_remove(getObjectId(soid, objectno), rados_completion);
diff --git a/src/librbd/DeepCopyRequest.cc b/src/librbd/DeepCopyRequest.cc
index c96fa93bed6..fee8b0274f1 100644
--- a/src/librbd/DeepCopyRequest.cc
+++ b/src/librbd/DeepCopyRequest.cc
@@ -33,7 +33,7 @@ DeepCopyRequest<I>::DeepCopyRequest(I *src_image_ctx, I *dst_image_ctx,
ContextWQ *work_queue, SnapSeqs *snap_seqs,
ProgressContext *prog_ctx,
Context *on_finish)
- : RefCountedObject(dst_image_ctx->cct, 1), m_src_image_ctx(src_image_ctx),
+ : RefCountedObject(dst_image_ctx->cct), m_src_image_ctx(src_image_ctx),
m_dst_image_ctx(dst_image_ctx), m_snap_id_start(snap_id_start),
m_snap_id_end(snap_id_end), m_flatten(flatten),
m_object_number(object_number), m_work_queue(work_queue),
diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h
index a467178a922..bdd18f99d21 100644
--- a/src/librbd/Journal.h
+++ b/src/librbd/Journal.h
@@ -254,13 +254,6 @@ private:
ReplayHandler(Journal *_journal) : journal(_journal) {
}
- void get() override {
- // TODO
- }
- void put() override {
- // TODO
- }
-
void handle_entries_available() override {
journal->handle_replay_ready();
}
diff --git a/src/librbd/deep_copy/ImageCopyRequest.cc b/src/librbd/deep_copy/ImageCopyRequest.cc
index f1e95c1bb4a..2fd2ed4a40a 100644
--- a/src/librbd/deep_copy/ImageCopyRequest.cc
+++ b/src/librbd/deep_copy/ImageCopyRequest.cc
@@ -30,7 +30,7 @@ ImageCopyRequest<I>::ImageCopyRequest(I *src_image_ctx, I *dst_image_ctx,
const SnapSeqs &snap_seqs,
ProgressContext *prog_ctx,
Context *on_finish)
- : RefCountedObject(dst_image_ctx->cct, 1), m_src_image_ctx(src_image_ctx),
+ : RefCountedObject(dst_image_ctx->cct), m_src_image_ctx(src_image_ctx),
m_dst_image_ctx(dst_image_ctx), m_snap_id_start(snap_id_start),
m_snap_id_end(snap_id_end), m_flatten(flatten),
m_object_number(object_number), m_snap_seqs(snap_seqs),
diff --git a/src/librbd/deep_copy/SnapshotCopyRequest.cc b/src/librbd/deep_copy/SnapshotCopyRequest.cc
index 0eaaa477860..25d24b06246 100644
--- a/src/librbd/deep_copy/SnapshotCopyRequest.cc
+++ b/src/librbd/deep_copy/SnapshotCopyRequest.cc
@@ -49,7 +49,7 @@ SnapshotCopyRequest<I>::SnapshotCopyRequest(I *src_image_ctx,
bool flatten, ContextWQ *work_queue,
SnapSeqs *snap_seqs,
Context *on_finish)
- : RefCountedObject(dst_image_ctx->cct, 1), m_src_image_ctx(src_image_ctx),
+ : RefCountedObject(dst_image_ctx->cct), m_src_image_ctx(src_image_ctx),
m_dst_image_ctx(dst_image_ctx), m_snap_id_end(snap_id_end),
m_flatten(flatten), m_work_queue(work_queue), m_snap_seqs_result(snap_seqs),
m_snap_seqs(*snap_seqs), m_on_finish(on_finish), m_cct(dst_image_ctx->cct),
diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc
index ace4366dfdc..ba6c7f8c4a8 100644
--- a/src/mgr/DaemonServer.cc
+++ b/src/mgr/DaemonServer.cc
@@ -168,7 +168,7 @@ entity_addrvec_t DaemonServer::get_myaddrs() const
int DaemonServer::ms_handle_authentication(Connection *con)
{
- MgrSession *s = new MgrSession(cct);
+ auto s = ceph::make_ref<MgrSession>(cct);
con->set_priv(s);
s->inst.addr = con->get_peer_addr();
s->entity_name = con->peer_name;
diff --git a/src/mgr/DaemonState.h b/src/mgr/DaemonState.h
index ff8bf8f2c41..8c5a8021eaf 100644
--- a/src/mgr/DaemonState.h
+++ b/src/mgr/DaemonState.h
@@ -211,10 +211,6 @@ struct DeviceState : public RefCountedObject
pair<utime_t,utime_t> life_expectancy; ///< when device failure is expected
utime_t life_expectancy_stamp; ///< when life expectency was recorded
- DeviceState(const std::string& n)
- : RefCountedObject(nullptr, 0),
- devid(n) {}
-
void set_metadata(map<string,string>&& m);
void set_life_expectancy(utime_t from, utime_t to, utime_t now);
@@ -229,9 +225,11 @@ struct DeviceState : public RefCountedObject
void dump(Formatter *f) const;
void print(ostream& out) const;
-};
-typedef boost::intrusive_ptr<DeviceState> DeviceStateRef;
+private:
+ FRIEND_MAKE_REF(DeviceState);
+ DeviceState(const std::string& n) : devid(n) {}
+};
/**
* Fuse the collection of per-daemon metadata from Ceph into
@@ -248,19 +246,19 @@ private:
DaemonStateCollection all;
std::set<DaemonKey> updating;
- std::map<std::string,DeviceStateRef> devices;
+ std::map<std::string,ceph::ref_t<DeviceState>> devices;
void _erase(const DaemonKey& dmk);
- DeviceStateRef _get_or_create_device(const std::string& dev) {
- auto p = devices.find(dev);
- if (p != devices.end()) {
- return p->second;
+ ceph::ref_t<DeviceState> _get_or_create_device(const std::string& dev) {
+ auto em = devices.try_emplace(dev, nullptr);
+ auto& d = em.first->second;
+ if (em.second) {
+ d = ceph::make_ref<DeviceState>(dev);
}
- devices[dev] = new DeviceState(dev);
- return devices[dev];
+ return d;
}
- void _erase_device(DeviceStateRef d) {
+ void _erase_device(const ceph::ref_t<DeviceState>& d) {
devices.erase(d->devid);
}
diff --git a/src/mgr/MgrSession.h b/src/mgr/MgrSession.h
index c61a80b63e7..aacbeb8209f 100644
--- a/src/mgr/MgrSession.h
+++ b/src/mgr/MgrSession.h
@@ -25,15 +25,17 @@ struct MgrSession : public RefCountedObject {
std::set<std::string> declared_types;
- explicit MgrSession(CephContext *cct) : RefCountedObject(cct, 0) {}
- ~MgrSession() override {}
-
const entity_addr_t& get_peer_addr() {
return inst.addr;
}
+
+private:
+ FRIEND_MAKE_REF(MgrSession);
+ explicit MgrSession(CephContext *cct) : RefCountedObject(cct) {}
+ ~MgrSession() override = default;
};
-typedef boost::intrusive_ptr<MgrSession> MgrSessionRef;
+using MgrSessionRef = ceph::ref_t<MgrSession>;
#endif
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 8d37cbd9161..1ed7d3f6f42 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -3979,10 +3979,6 @@ void Monitor::forward_request_leader(MonOpRequestRef op)
// fake connection attached to forwarded messages
struct AnonConnection : public Connection {
entity_addr_t socket_addr;
- explicit AnonConnection(CephContext *cct,
- const entity_addr_t& sa)
- : Connection(cct, NULL),
- socket_addr(sa) {}
int send_message(Message *m) override {
ceph_assert(!"send_message on anonymous connection");
@@ -4000,6 +3996,12 @@ struct AnonConnection : public Connection {
entity_addr_t get_peer_socket_addr() const override {
return socket_addr;
}
+
+private:
+ FRIEND_MAKE_REF(AnonConnection);
+ explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
+ : Connection(cct, nullptr),
+ socket_addr(sa) {}
};
//extract the original message and put it into the regular dispatch function
@@ -4022,7 +4024,7 @@ void Monitor::handle_forward(MonOpRequestRef op)
PaxosServiceMessage *req = m->claim_message();
ceph_assert(req != NULL);
- ConnectionRef c(new AnonConnection(cct, m->client_socket_addr));
+ auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
s->_ident(req->get_source(),
req->get_source_addrs());
diff --git a/src/msg/Connection.h b/src/msg/Connection.h
index 840a3fba67f..95f89fd19b8 100644
--- a/src/msg/Connection.h
+++ b/src/msg/Connection.h
@@ -18,12 +18,11 @@
#include <stdlib.h>
#include <ostream>
-#include <boost/intrusive_ptr.hpp>
-
#include "auth/Auth.h"
#include "common/RefCountedObj.h"
#include "common/config.h"
#include "common/debug.h"
+#include "common/ref.h"
#include "common/ceph_mutex.h"
#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
#include "include/buffer.h"
@@ -41,21 +40,21 @@ class Messenger;
class Interceptor;
#endif
-struct Connection : public RefCountedObject {
+struct Connection : public RefCountedObjectSafe {
mutable ceph::mutex lock = ceph::make_mutex("Connection::lock");
Messenger *msgr;
RefCountedPtr priv;
- int peer_type;
+ int peer_type = -1;
int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567
safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
private:
- uint64_t features;
+ uint64_t features = 0;
public:
- bool is_loopback;
- bool failed; // true if we are a lossy connection that has failed.
+ bool is_loopback = false;
+ bool failed = false; // true if we are a lossy connection that has failed.
- int rx_buffers_version;
+ int rx_buffers_version = 0;
std::map<ceph_tid_t,std::pair<ceph::buffer::list, int>> rx_buffers;
// authentication state
@@ -69,26 +68,9 @@ public:
Interceptor *interceptor;
#endif
- friend class boost::intrusive_ptr<Connection>;
friend class PipeConnection;
public:
- Connection(CephContext *cct, Messenger *m)
- // we are managed exclusively by ConnectionRef; make it so you can
- // ConnectionRef foo = new Connection;
- : RefCountedObject(cct, 0),
- msgr(m),
- peer_type(-1),
- features(0),
- is_loopback(false),
- failed(false),
- rx_buffers_version(0) {
- }
-
- ~Connection() override {
- //generic_dout(0) << "~Connection " << this << dendl;
- }
-
void set_priv(const RefCountedPtr& o) {
std::lock_guard l{lock};
priv = o;
@@ -254,9 +236,18 @@ public:
last_keepalive_ack = t;
}
bool is_blackhole() const;
-};
-typedef boost::intrusive_ptr<Connection> ConnectionRef;
+protected:
+ Connection(CephContext *cct, Messenger *m)
+ : RefCountedObjectSafe(cct),
+ msgr(m)
+ {}
+
+ ~Connection() override {
+ //generic_dout(0) << "~Connection " << this << dendl;
+ }
+};
+using ConnectionRef = ceph::ref_t<Connection>;
#endif /* CEPH_CONNECTION_H */
diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc
index b86035eca26..3e3fae80030 100644
--- a/src/msg/async/AsyncConnection.cc
+++ b/src/msg/async/AsyncConnection.cc
@@ -113,7 +113,8 @@ class C_tick_wakeup : public EventCallback {
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
Worker *w, bool m2, bool local)
- : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
+ : Connection(cct, m),
+ delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()),
state(STATE_NONE), port(-1),
dispatch_queue(q), recv_buf(NULL),
diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h
index 3ce26e6d52a..6b2f0e98b0c 100644
--- a/src/msg/async/AsyncConnection.h
+++ b/src/msg/async/AsyncConnection.h
@@ -51,7 +51,6 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
* sequence, try to reconnect peer endpoint.
*/
class AsyncConnection : public Connection {
-
ssize_t read(unsigned len, char *buffer,
std::function<void(char *, ssize_t)> callback);
ssize_t read_until(unsigned needed, char *p);
@@ -106,10 +105,12 @@ class AsyncConnection : public Connection {
void flush();
} *delay_state;
- public:
+private:
+ FRIEND_MAKE_REF(AsyncConnection);
AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
Worker *w, bool is_msgr2, bool local);
~AsyncConnection() override;
+public:
void maybe_start_delay_thread();
ostream& _conn_prefix(std::ostream *_dout);
@@ -235,6 +236,6 @@ class AsyncConnection : public Connection {
friend class ProtocolV2;
}; /* AsyncConnection */
-typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
+using AsyncConnectionRef = ceph::ref_t<AsyncConnection>;
#endif
diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc
index fc1107279e6..3556c4960f5 100644
--- a/src/msg/async/AsyncMessenger.cc
+++ b/src/msg/async/AsyncMessenger.cc
@@ -292,7 +292,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
stack = single->stack.get();
stack->start();
local_worker = stack->get_worker();
- local_connection = new AsyncConnection(cct, this, &dispatch_queue,
+ local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
local_worker, true, true);
init_local_connection();
reap_handler = new C_handle_reap(this);
@@ -556,7 +556,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
const entity_addr_t &peer_addr)
{
std::lock_guard l{lock};
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+ auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
listen_addr.is_msgr2(), false);
conn->accept(std::move(cli_socket), listen_addr, peer_addr);
accepting_conns.insert(conn);
@@ -585,7 +585,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(
// create connection
Worker *w = stack->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+ auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
target.is_msgr2(), false);
conn->connect(addrs, type, target);
ceph_assert(!conns.count(addrs));
diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h
index b60993daab0..d3d13d04ba3 100644
--- a/src/os/ObjectStore.h
+++ b/src/os/ObjectStore.h
@@ -124,10 +124,6 @@ public:
struct CollectionImpl : public RefCountedObject {
const coll_t cid;
- CollectionImpl(const coll_t& c)
- : RefCountedObject(NULL, 0),
- cid(c) {}
-
/// wait for any queued transactions to apply
// block until any previous transactions are visible. specifically,
// collection_list and collection_empty need to reflect prior operations.
@@ -149,8 +145,12 @@ public:
const coll_t &get_cid() {
return cid;
}
+ protected:
+ CollectionImpl() = delete;
+ CollectionImpl(CephContext* cct, const coll_t& c) : RefCountedObject(cct), cid(c) {}
+ ~CollectionImpl() = default;
};
- typedef boost::intrusive_ptr<CollectionImpl> CollectionHandle;
+ using CollectionHandle = ceph::ref_t<CollectionImpl>;
/*********************************
diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc
index 2841e3daa24..9430b654b9b 100644
--- a/src/os/bluestore/BlueFS.cc
+++ b/src/os/bluestore/BlueFS.cc
@@ -463,7 +463,7 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
dout(1) << __func__ << " uuid " << super.uuid << dendl;
// init log
- FileRef log_file = new File;
+ FileRef log_file = ceph::make_ref<File>();
log_file->fnode.ino = 1;
log_file->fnode.prefer_bdev = BDEV_WAL;
int r = _allocate(
@@ -1060,7 +1060,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
if (!noop) {
map<string,DirRef>::iterator q = dir_map.find(dirname);
ceph_assert(q == dir_map.end());
- dir_map[dirname] = new Dir;
+ dir_map[dirname] = ceph::make_ref<Dir>();
}
}
break;
@@ -1465,7 +1465,7 @@ BlueFS::FileRef BlueFS::_get_file(uint64_t ino)
{
auto p = file_map.find(ino);
if (p == file_map.end()) {
- FileRef f = new File;
+ FileRef f = ceph::make_ref<File>();
file_map[ino] = f;
dout(30) << __func__ << " ino " << ino << " = " << f
<< " (new)" << dendl;
@@ -1959,7 +1959,7 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
// create a new log [writer] so that we know compaction is in progress
// (see _should_compact_log)
- new_log = new File;
+ new_log = ceph::make_ref<File>();
new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode
// 0. wait for any racing flushes to complete. (We do not want to block
@@ -2833,7 +2833,7 @@ int BlueFS::open_for_write(
<< " does not exist" << dendl;
return -ENOENT;
}
- file = new File;
+ file = ceph::make_ref<File>();
file->fnode.ino = ++ino_last;
file_map[ino_last] = file;
dir->file_map[filename] = file;
@@ -3012,7 +3012,7 @@ int BlueFS::mkdir(const string& dirname)
dout(20) << __func__ << " dir " << dirname << " exists" << dendl;
return -EEXIST;
}
- dir_map[dirname] = new Dir;
+ dir_map[dirname] = ceph::make_ref<Dir>();
log_t.op_dir_create(dirname);
return 0;
}
@@ -3085,12 +3085,12 @@ int BlueFS::lock_file(const string& dirname, const string& filename,
}
DirRef dir = p->second;
map<string,FileRef>::iterator q = dir->file_map.find(filename);
- File *file;
+ FileRef file;
if (q == dir->file_map.end()) {
dout(20) << __func__ << " dir " << dirname << " (" << dir
<< ") file " << filename
<< " not found, creating" << dendl;
- file = new File;
+ file = ceph::make_ref<File>();
file->fnode.ino = ++ino_last;
file->fnode.mtime = ceph_clock_now();
file_map[ino_last] = file;
@@ -3099,7 +3099,7 @@ int BlueFS::lock_file(const string& dirname, const string& filename,
log_t.op_file_update(file->fnode);
log_t.op_dir_link(dirname, filename, file->fnode.ino);
} else {
- file = q->second.get();
+ file = q->second;
if (file->locked) {
dout(10) << __func__ << " already locked" << dendl;
return -ENOLCK;
diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h
index 07e3334902e..14dc7bdf906 100644
--- a/src/os/bluestore/BlueFS.h
+++ b/src/os/bluestore/BlueFS.h
@@ -7,11 +7,13 @@
#include <mutex>
#include "bluefs_types.h"
-#include "common/RefCountedObj.h"
#include "BlockDevice.h"
+#include "common/RefCountedObj.h"
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+
#include "boost/intrusive/list.hpp"
-#include <boost/intrusive_ptr.hpp>
class PerfCounters;
@@ -101,8 +103,10 @@ public:
std::atomic_int num_readers, num_writers;
std::atomic_int num_reading;
+ private:
+ FRIEND_MAKE_REF(File);
File()
- : RefCountedObject(NULL, 0),
+ :
refs(0),
dirty_seq(0),
locked(false),
@@ -117,15 +121,8 @@ public:
ceph_assert(num_reading.load() == 0);
ceph_assert(!locked);
}
-
- friend void intrusive_ptr_add_ref(File *f) {
- f->get();
- }
- friend void intrusive_ptr_release(File *f) {
- f->put();
- }
};
- typedef boost::intrusive_ptr<File> FileRef;
+ using FileRef = ceph::ref_t<File>;
typedef boost::intrusive::list<
File,
@@ -139,22 +136,17 @@ public:
mempool::bluefs::map<string,FileRef> file_map;
- Dir() : RefCountedObject(NULL, 0) {}
-
- friend void intrusive_ptr_add_ref(Dir *d) {
- d->get();
- }
- friend void intrusive_ptr_release(Dir *d) {
- d->put();
- }
+ private:
+ FRIEND_MAKE_REF(Dir);
+ Dir() = default;
};
- typedef boost::intrusive_ptr<Dir> DirRef;
+ using DirRef = ceph::ref_t<Dir>;
struct FileWriter {
MEMPOOL_CLASS_HELPERS();
FileRef file;
- uint64_t pos; ///< start offset for buffer
+ uint64_t pos = 0; ///< start offset for buffer
bufferlist buffer; ///< new data to write (at end of file)
bufferlist tail_block; ///< existing partial block at end of file, if any
bufferlist::page_aligned_appender buffer_appender; //< for const char* only
@@ -167,7 +159,6 @@ public:
FileWriter(FileRef f)
: file(f),
- pos(0),
buffer_appender(buffer.get_page_aligned_appender(
g_conf()->bluefs_alloc_size / CEPH_PAGE_SIZE)) {
++file->num_writers;
diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index e2c39e33f8a..872b29eb205 100644
--- a/src/os/bluestore/BlueStore.cc
+++ b/src/os/bluestore/BlueStore.cc
@@ -3502,7 +3502,7 @@ void BlueStore::DeferredBatch::_audit(CephContext *cct)
#define dout_prefix *_dout << "bluestore(" << store->path << ").collection(" << cid << " " << this << ") "
BlueStore::Collection::Collection(BlueStore *store_, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t cid)
- : CollectionImpl(cid),
+ : CollectionImpl(store_->cct, cid),
store(store_),
cache(bc),
exists(true),
@@ -5982,12 +5982,11 @@ int BlueStore::_open_collections()
it->next()) {
coll_t cid;
if (cid.parse(it->key())) {
- CollectionRef c(
- new Collection(
+ auto c = ceph::make_ref<Collection>(
this,
onode_cache_shards[cid.hash_to_shard(onode_cache_shards.size())],
buffer_cache_shards[cid.hash_to_shard(buffer_cache_shards.size())],
- cid));
+ cid);
bufferlist bl = it->value();
auto p = bl.cbegin();
try {
@@ -9197,13 +9196,13 @@ ObjectStore::CollectionHandle BlueStore::create_new_collection(
const coll_t& cid)
{
std::unique_lock l{coll_lock};
- Collection *c = new Collection(
+ auto c = ceph::make_ref<Collection>(
this,
onode_cache_shards[cid.hash_to_shard(onode_cache_shards.size())],
buffer_cache_shards[cid.hash_to_shard(buffer_cache_shards.size())],
cid);
new_coll_map[cid] = c;
- _osr_attach(c);
+ _osr_attach(c.get());
return c;
}
@@ -11345,7 +11344,7 @@ void BlueStore::_osr_attach(Collection *c)
std::lock_guard l(zombie_osr_lock);
auto p = zombie_osr_set.find(c->cid);
if (p == zombie_osr_set.end()) {
- c->osr = new OpSequencer(this, c->cid);
+ c->osr = ceph::make_ref<OpSequencer>(this, c->cid);
ldout(cct, 10) << __func__ << " " << c->cid
<< " fresh osr " << c->osr << dendl;
} else {
diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h
index 4b98c83bfe6..48ddc1299c4 100644
--- a/src/os/bluestore/BlueStore.h
+++ b/src/os/bluestore/BlueStore.h
@@ -1266,7 +1266,7 @@ public:
};
class OpSequencer;
- typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
+ using OpSequencerRef = ceph::ref_t<OpSequencer>;
struct Collection : public CollectionImpl {
BlueStore *store;
@@ -1659,14 +1659,6 @@ public:
std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
- OpSequencer(BlueStore *store, const coll_t& c)
- : RefCountedObject(store->cct, 0),
- store(store), cid(c) {
- }
- ~OpSequencer() {
- ceph_assert(q.empty());
- }
-
void queue_new(TransContext *txc) {
std::lock_guard l(qlock);
txc->seq = ++last_seq;
@@ -1747,6 +1739,15 @@ public:
txc->oncommits.push_back(c);
return false;
}
+ private:
+ FRIEND_MAKE_REF(OpSequencer);
+ OpSequencer(BlueStore *store, const coll_t& c)
+ : RefCountedObject(store->cct),
+ store(store), cid(c) {
+ }
+ ~OpSequencer() {
+ ceph_assert(q.empty());
+ }
};
typedef boost::intrusive::list<
diff --git a/src/os/filestore/FileStore.cc b/src/os/filestore/FileStore.cc
index 30889581a1a..42d4b313069 100644
--- a/src/os/filestore/FileStore.cc
+++ b/src/os/filestore/FileStore.cc
@@ -2016,7 +2016,7 @@ void FileStore::init_temp_collections()
for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
if (p->is_temp())
continue;
- coll_map[*p] = new OpSequencer(cct, ++next_osr_id, *p);
+ coll_map[*p] = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, *p);
if (p->is_meta())
continue;
coll_t temp = p->get_temp();
@@ -2130,7 +2130,7 @@ ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c)
std::lock_guard l{coll_lock};
auto p = coll_map.find(c);
if (p == coll_map.end()) {
- auto *r = new OpSequencer(cct, ++next_osr_id, c);
+ auto r = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, c);
coll_map[c] = r;
return r;
} else {
diff --git a/src/os/filestore/FileStore.h b/src/os/filestore/FileStore.h
index ffd39a86003..e0a21e619a0 100644
--- a/src/os/filestore/FileStore.h
+++ b/src/os/filestore/FileStore.h
@@ -359,8 +359,10 @@ private:
}
}
+ private:
+ FRIEND_MAKE_REF(OpSequencer);
OpSequencer(CephContext* cct, int i, coll_t cid)
- : CollectionImpl(cid),
+ : CollectionImpl(cct, cid),
cct(cct),
osr_name_str(stringify(cid)),
id(i),
diff --git a/src/os/kstore/KStore.cc b/src/os/kstore/KStore.cc
index db28d355a8a..ce18d967701 100644
--- a/src/os/kstore/KStore.cc
+++ b/src/os/kstore/KStore.cc
@@ -565,7 +565,7 @@ int KStore::OnodeHashLRU::trim(int max)
#define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
KStore::Collection::Collection(KStore *ns, coll_t cid)
- : CollectionImpl(cid),
+ : CollectionImpl(ns->cct, cid),
store(ns),
osr(new OpSequencer()),
onode_map(store->cct)
@@ -895,7 +895,7 @@ int KStore::_open_collections(int *errors)
it->next()) {
coll_t cid;
if (cid.parse(it->key())) {
- CollectionRef c(new Collection(this, cid));
+ auto c = ceph::make_ref<Collection>(this, cid);
bufferlist bl = it->value();
auto p = bl.cbegin();
try {
@@ -1107,7 +1107,7 @@ ObjectStore::CollectionHandle KStore::open_collection(const coll_t& cid)
ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid)
{
- auto *c = new Collection(this, cid);
+ auto c = ceph::make_ref<Collection>(this, cid);
std::unique_lock l{coll_lock};
new_coll_map[cid] = c;
return c;
diff --git a/src/os/kstore/KStore.h b/src/os/kstore/KStore.h
index c79a586962c..06d2d61f200 100644
--- a/src/os/kstore/KStore.h
+++ b/src/os/kstore/KStore.h
@@ -161,9 +161,11 @@ public:
void flush() override;
bool flush_commit(Context *c) override;
+ private:
+ FRIEND_MAKE_REF(Collection);
Collection(KStore *ns, coll_t c);
};
- typedef boost::intrusive_ptr<Collection> CollectionRef;
+ using CollectionRef = ceph::ref_t<Collection>;
class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
CollectionRef c;
diff --git a/src/os/memstore/MemStore.cc b/src/os/memstore/MemStore.cc
index a4d1e981495..05d16edb6cc 100644
--- a/src/os/memstore/MemStore.cc
+++ b/src/os/memstore/MemStore.cc
@@ -157,7 +157,7 @@ int MemStore::_load()
int r = cbl.read_file(fn.c_str(), &err);
if (r < 0)
return r;
- CollectionRef c(new Collection(cct, *q));
+ auto c = ceph::make_ref<Collection>(cct, *q);
auto p = cbl.cbegin();
c->decode(p);
coll_map[*q] = c;
@@ -258,7 +258,7 @@ MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
ObjectStore::CollectionHandle MemStore::create_new_collection(const coll_t& cid)
{
std::lock_guard l{coll_lock};
- Collection *c = new Collection(cct, cid);
+ auto c = ceph::make_ref<Collection>(cct, cid);
new_coll_map[cid] = c;
return c;
}
@@ -1573,8 +1573,6 @@ struct MemStore::PageSetObject : public Object {
static thread_local PageSet::page_vector tls_pages;
#endif
- explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
-
size_t get_size() const override { return data_len; }
int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
@@ -1597,6 +1595,10 @@ struct MemStore::PageSetObject : public Object {
decode_base(p);
DECODE_FINISH(p);
}
+
+private:
+ FRIEND_MAKE_REF(PageSetObject);
+ explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
};
#if defined(__GLIBCXX__)
@@ -1800,6 +1802,6 @@ int MemStore::PageSetObject::truncate(uint64_t size)
MemStore::ObjectRef MemStore::Collection::create_object() const {
if (use_page_set)
- return new PageSetObject(cct->_conf->memstore_page_size);
+ return ceph::make_ref<PageSetObject>(cct->_conf->memstore_page_size);
return new BufferlistObject();
}
diff --git a/src/os/memstore/MemStore.h b/src/os/memstore/MemStore.h
index ec7cf53b501..55f55ee4bfa 100644
--- a/src/os/memstore/MemStore.h
+++ b/src/os/memstore/MemStore.h
@@ -36,11 +36,8 @@ public:
bufferlist omap_header;
map<string,bufferlist> omap;
- typedef boost::intrusive_ptr<Object> Ref;
- friend void intrusive_ptr_add_ref(Object *o) { o->get(); }
- friend void intrusive_ptr_release(Object *o) { o->put(); }
+ using Ref = ceph::ref_t<Object>;
- Object() : RefCountedObject(nullptr, 0) {}
// interface for object data
virtual size_t get_size() const = 0;
virtual int read(uint64_t offset, uint64_t len, bufferlist &bl) = 0;
@@ -90,8 +87,10 @@ public:
}
f->close_section();
}
+ protected:
+ Object() = default;
};
- typedef Object::Ref ObjectRef;
+ using ObjectRef = Object::Ref;
struct PageSetObject;
struct Collection : public CollectionImpl {
@@ -110,8 +109,6 @@ public:
ceph::make_mutex("MemStore::Collection::sequencer_mutex")};
typedef boost::intrusive_ptr<Collection> Ref;
- friend void intrusive_ptr_add_ref(Collection *c) { c->get(); }
- friend void intrusive_ptr_release(Collection *c) { c->put(); }
ObjectRef create_object() const;
@@ -184,8 +181,10 @@ public:
return true;
}
+ private:
+ FRIEND_MAKE_REF(Collection);
explicit Collection(CephContext *cct, coll_t c)
- : CollectionImpl(c),
+ : CollectionImpl(cct, c),
cct(cct),
use_page_set(cct->_conf->memstore_page_set) {}
};
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 2d8760e0093..96aed0b706e 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -450,7 +450,7 @@ HeartbeatStampsRef OSDService::get_hb_stamps(unsigned peer)
hb_stamps.resize(peer + 1);
}
if (!hb_stamps[peer]) {
- hb_stamps[peer].reset(new HeartbeatStamps(peer));
+ hb_stamps[peer] = ceph::make_ref<HeartbeatStamps>(peer);
}
return hb_stamps[peer];
}
@@ -4539,20 +4539,18 @@ void OSD::_add_heartbeat_peer(int p)
auto stamps = service.get_hb_stamps(p);
- Session *sb = new Session(cct, cons.first.get());
+ auto sb = ceph::make_ref<Session>(cct, cons.first.get());
sb->peer = p;
sb->stamps = stamps;
- RefCountedPtr sbref{sb, false};
hi->hb_interval_start = ceph_clock_now();
hi->con_back = cons.first.get();
- hi->con_back->set_priv(sbref);
+ hi->con_back->set_priv(sb);
- Session *sf = new Session(cct, cons.second.get());
+ auto sf = ceph::make_ref<Session>(cct, cons.second.get());
sf->peer = p;
sf->stamps = stamps;
- RefCountedPtr sfref{sf, false};
hi->con_front = cons.second.get();
- hi->con_front->set_priv(sfref);
+ hi->con_front->set_priv(sf);
dout(10) << "_add_heartbeat_peer: new peer osd." << p
<< " " << hi->con_back->get_peer_addr()
@@ -5691,11 +5689,9 @@ void OSD::ms_handle_fast_connect(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
- auto priv = con->get_priv();
- auto s = static_cast<Session*>(priv.get());
- if (!s) {
- s = new Session{cct, con};
- con->set_priv(RefCountedPtr{s, false});
+ if (auto s = ceph::ref_cast<Session>(con->get_priv()); !s) {
+ s = ceph::make_ref<Session>(cct, con);
+ con->set_priv(s);
dout(10) << " new session (outgoing) " << s << " con=" << s->con
<< " addr=" << s->con->get_peer_addr() << dendl;
// we don't connect to clients
@@ -5709,11 +5705,9 @@ void OSD::ms_handle_fast_accept(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
- auto priv = con->get_priv();
- auto s = static_cast<Session*>(priv.get());
- if (!s) {
- s = new Session{cct, con};
- con->set_priv(RefCountedPtr{s, false});
+ if (auto s = ceph::ref_cast<Session>(con->get_priv()); !s) {
+ s = ceph::make_ref<Session>(cct, con);
+ con->set_priv(s);
dout(10) << "new session (incoming)" << s << " con=" << con
<< " addr=" << con->get_peer_addr()
<< " must have raced with connect" << dendl;
@@ -5725,9 +5719,8 @@ void OSD::ms_handle_fast_accept(Connection *con)
bool OSD::ms_handle_reset(Connection *con)
{
- auto s = con->get_priv();
- auto session = static_cast<Session*>(s.get());
- dout(2) << "ms_handle_reset con " << con << " session " << session << dendl;
+ auto session = ceph::ref_cast<Session>(con->get_priv());
+ dout(2) << "ms_handle_reset con " << con << " session " << session.get() << dendl;
if (!session)
return false;
session->wstate.reset(con);
@@ -5736,7 +5729,7 @@ bool OSD::ms_handle_reset(Connection *con)
// note that we break session->con *before* the session_handle_reset
// cleanup below. this avoids a race between us and
// PG::add_backoff, Session::check_backoff, etc.
- session_handle_reset(SessionRef{session});
+ session_handle_reset(session);
return true;
}
@@ -5745,9 +5738,8 @@ bool OSD::ms_handle_refused(Connection *con)
if (!cct->_conf->osd_fast_fail_on_connection_refused)
return false;
- auto priv = con->get_priv();
- auto session = static_cast<Session*>(priv.get());
- dout(2) << "ms_handle_refused con " << con << " session " << session << dendl;
+ auto session = ceph::ref_cast<Session>(con->get_priv());
+ dout(2) << "ms_handle_refused con " << con << " session " << session.get() << dendl;
if (!session)
return false;
int type = con->get_peer_type();
@@ -6320,8 +6312,7 @@ void OSD::handle_command(MMonCommand *m)
void OSD::handle_command(MCommand *m)
{
ConnectionRef con = m->get_connection();
- auto priv = con->get_priv();
- auto session = static_cast<Session *>(priv.get());
+ auto session = ceph::ref_cast<Session>(con->get_priv());
if (!session) {
con->send_message(new MCommandReply(m, -EPERM));
m->put();
@@ -6329,7 +6320,6 @@ void OSD::handle_command(MCommand *m)
}
OSDCap& caps = session->caps;
- priv.reset();
if (!caps.allow_all() || m->get_source().is_mon()) {
con->send_message(new MCommandReply(m, -EPERM));
@@ -7154,8 +7144,7 @@ void OSDService::maybe_share_map(
{
// NOTE: we assume caller hold something that keeps the Connection itself
// pinned (e.g., an OpRequest's MessageRef).
- auto priv = con->get_priv();
- auto session = static_cast<Session*>(priv.get());
+ auto session = ceph::ref_cast<Session>(con->get_priv());
if (!session) {
return;
}
@@ -7191,7 +7180,7 @@ void OSDService::maybe_share_map(
session->sent_epoch_lock.unlock();
}
-void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
+void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap)
{
ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock));
@@ -7327,11 +7316,10 @@ void OSD::ms_fast_dispatch(Message *m)
int OSD::ms_handle_authentication(Connection *con)
{
int ret = 0;
- auto priv = con->get_priv();
- Session *s = static_cast<Session*>(priv.get());
+ auto s = ceph::ref_cast<Session>(con->get_priv());
if (!s) {
- s = new Session(cct, con);
- con->set_priv(RefCountedPtr{s, false});
+ s = ceph::make_ref<Session>(cct, con);
+ con->set_priv(s);
s->entity_name = con->get_peer_entity_name();
dout(10) << __func__ << " new session " << s << " con " << s->con
<< " entity " << s->entity_name
@@ -8017,9 +8005,8 @@ void OSD::handle_osd_map(MOSDMap *m)
return;
}
- auto priv = m->get_connection()->get_priv();
- if (auto session = static_cast<Session *>(priv.get());
- session && !(session->entity_name.is_mon() ||
+ auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
+ if (session && !(session->entity_name.is_mon() ||
session->entity_name.is_osd())) {
//not enough perms!
dout(10) << "got osd map from Session " << session
@@ -9055,8 +9042,7 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map,
<< dendl;
ConnectionRef con = m->get_connection();
con->mark_down();
- auto priv = con->get_priv();
- if (auto s = static_cast<Session*>(priv.get()); s) {
+ if (auto s = ceph::ref_cast<Session>(con->get_priv()); s) {
if (!is_fast_dispatch)
s->session_dispatch_lock.lock();
clear_session_waiting_on_map(s);
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 0468c66c416..3c8d9678a23 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1334,36 +1334,35 @@ private:
// -- sessions --
private:
- void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
+ void dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap);
ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
- set<SessionRef> session_waiting_for_map;
+ set<ceph::ref_t<Session>> session_waiting_for_map;
/// Caller assumes refs for included Sessions
- void get_sessions_waiting_for_map(set<SessionRef> *out) {
+ void get_sessions_waiting_for_map(set<ceph::ref_t<Session>> *out) {
std::lock_guard l(session_waiting_lock);
out->swap(session_waiting_for_map);
}
- void register_session_waiting_on_map(SessionRef session) {
+ void register_session_waiting_on_map(const ceph::ref_t<Session>& session) {
std::lock_guard l(session_waiting_lock);
session_waiting_for_map.insert(session);
}
- void clear_session_waiting_on_map(SessionRef session) {
+ void clear_session_waiting_on_map(const ceph::ref_t<Session>& session) {
std::lock_guard l(session_waiting_lock);
session_waiting_for_map.erase(session);
}
void dispatch_sessions_waiting_on_map() {
- set<SessionRef> sessions_to_check;
+ set<ceph::ref_t<Session>> sessions_to_check;
get_sessions_waiting_for_map(&sessions_to_check);
for (auto i = sessions_to_check.begin();
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
std::lock_guard l{(*i)->session_dispatch_lock};
- SessionRef session = *i;
- dispatch_session_waiting(session, osdmap);
+ dispatch_session_waiting(*i, osdmap);
}
}
- void session_handle_reset(SessionRef session) {
+ void session_handle_reset(const ceph::ref_t<Session>& session) {
std::lock_guard l(session->session_dispatch_lock);
clear_session_waiting_on_map(session);
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 3d02bab0580..ae28ad704d1 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -605,24 +605,22 @@ void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
snap_mapper.update_bits(split_bits);
}
-void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
+void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
{
- ConnectionRef con = s->con;
+ auto con = s->con;
if (!con) // OSD::ms_handle_reset clears s->con without a lock
return;
- BackoffRef b(s->have_backoff(info.pgid, begin));
+ auto b = s->have_backoff(info.pgid, begin);
if (b) {
derr << __func__ << " already have backoff for " << s << " begin " << begin
<< " " << *b << dendl;
ceph_abort();
}
std::lock_guard l(backoff_lock);
- {
- b = new Backoff(info.pgid, this, s, ++s->backoff_seq, begin, end);
- backoffs[begin].insert(b);
- s->add_backoff(b);
- dout(10) << __func__ << " session " << s << " added " << *b << dendl;
- }
+ b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
+ backoffs[begin].insert(b);
+ s->add_backoff(b);
+ dout(10) << __func__ << " session " << s << " added " << *b << dendl;
con->send_message(
new MOSDBackoff(
info.pgid,
@@ -636,7 +634,7 @@ void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
{
dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
- vector<BackoffRef> bv;
+ vector<ceph::ref_t<Backoff>> bv;
{
std::lock_guard l(backoff_lock);
auto p = backoffs.lower_bound(begin);
@@ -698,7 +696,7 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
void PG::clear_backoffs()
{
dout(10) << __func__ << " " << dendl;
- map<hobject_t,set<BackoffRef>> ls;
+ map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
{
std::lock_guard l(backoff_lock);
ls.swap(backoffs);
@@ -722,7 +720,7 @@ void PG::clear_backoffs()
}
// called by Session::clear_backoffs()
-void PG::rm_backoff(BackoffRef b)
+void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
{
dout(10) << __func__ << " " << *b << dendl;
std::lock_guard l(backoff_lock);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 585c93e51b0..1eb149d5545 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -356,7 +356,7 @@ public:
__u8 &);
static bool _has_removal_flag(ObjectStore *store, spg_t pgid);
- void rm_backoff(BackoffRef b);
+ void rm_backoff(const ceph::ref_t<Backoff>& b);
void update_snap_mapper_bits(uint32_t bits) {
snap_mapper.update_bits(bits);
@@ -1063,16 +1063,16 @@ protected:
// -- backoff --
ceph::mutex backoff_lock = // orders inside Backoff::lock
ceph::make_mutex("PG::backoff_lock");
- map<hobject_t,set<BackoffRef>> backoffs;
+ map<hobject_t,set<ceph::ref_t<Backoff>>> backoffs;
- void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end);
+ void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end);
void release_backoffs(const hobject_t& begin, const hobject_t& end);
void release_backoffs(const hobject_t& o) {
release_backoffs(o, o);
}
void clear_backoffs();
- void add_pg_backoff(SessionRef s) {
+ void add_pg_backoff(const ceph::ref_t<Session>& s) {
hobject_t begin = info.pgid.pgid.get_hobj_start();
hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
add_backoff(s, begin, end);
diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h
index cbd6f500b91..fac4109c122 100644
--- a/src/osd/PeeringState.h
+++ b/src/osd/PeeringState.h
@@ -109,10 +109,6 @@ struct HeartbeatStamps : public RefCountedObject {
/// highest up_from we've seen from this rank
epoch_t up_from = 0;
- HeartbeatStamps(int o)
- : RefCountedObject(NULL, 0),
- osd(o) {}
-
void print(ostream& out) const {
std::lock_guard l(lock);
out << "hbstamp(osd." << osd << " up_from " << up_from
@@ -163,8 +159,13 @@ struct HeartbeatStamps : public RefCountedObject {
peer_clock_delta_ub = delta_ub;
}
+private:
+ FRIEND_MAKE_REF(HeartbeatStamps);
+ HeartbeatStamps(int o)
+ : RefCountedObject(NULL),
+ osd(o) {}
};
-typedef boost::intrusive_ptr<HeartbeatStamps> HeartbeatStampsRef;
+using HeartbeatStampsRef = ceph::ref_t<HeartbeatStamps>;
inline ostream& operator<<(ostream& out, const HeartbeatStamps& hb)
{
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index fa58df8231e..eb06312d0a7 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -1439,7 +1439,7 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
void PrimaryLogPG::handle_backoff(OpRequestRef& op)
{
auto m = op->get_req<MOSDBackoff>();
- SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
+ auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
if (!session)
return; // drop it.
hobject_t begin = info.pgid.pgid.get_hobj_start();
@@ -1490,7 +1490,7 @@ void PrimaryLogPG::do_request(
const Message *m = op->get_req();
int msg_type = m->get_type();
if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
- SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
+ auto session = ceph::ref_cast<Session>(m->get_connection()->get_priv());
if (!session)
return; // drop it.
@@ -1674,7 +1674,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
bool can_backoff =
m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
- SessionRef session;
+ ceph::ref_t<Session> session;
if (can_backoff) {
session = static_cast<Session*>(m->get_connection()->get_priv().get());
if (!session.get()) {
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index f7d923f9748..187fb6a1556 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -281,10 +281,10 @@ void ReplicatedBackend::objects_read_async(
class C_OSD_OnOpCommit : public Context {
ReplicatedBackend *pg;
- ReplicatedBackend::InProgressOpRef op;
+ ceph::ref_t<ReplicatedBackend::InProgressOp> op;
public:
- C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
- : pg(pg), op(op) {}
+ C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
+ : pg(pg), op(std::move(op)) {}
void finish(int) override {
pg->op_commit(op);
}
@@ -479,7 +479,7 @@ void ReplicatedBackend::submit_transaction(
auto insert_res = in_progress_ops.insert(
make_pair(
tid,
- new InProgressOp(
+ ceph::make_ref<InProgressOp>(
tid, on_all_commit,
orig_op, at_version)
)
@@ -529,8 +529,7 @@ void ReplicatedBackend::submit_transaction(
}
}
-void ReplicatedBackend::op_commit(
- InProgressOpRef& op)
+void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
{
if (op->on_commit == nullptr) {
// aborted
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
index f70a0501179..4e272e492ee 100644
--- a/src/osd/ReplicatedBackend.h
+++ b/src/osd/ReplicatedBackend.h
@@ -339,18 +339,17 @@ private:
Context *on_commit;
OpRequestRef op;
eversion_t v;
- InProgressOp(
- ceph_tid_t tid, Context *on_commit,
- OpRequestRef op, eversion_t v)
- : RefCountedObject(nullptr, 0),
- tid(tid), on_commit(on_commit),
- op(op), v(v) {}
bool done() const {
return waiting_for_commit.empty();
}
+ private:
+ FRIEND_MAKE_REF(InProgressOp);
+ InProgressOp(ceph_tid_t tid, Context *on_commit, OpRequestRef op, eversion_t v)
+ :
+ tid(tid), on_commit(on_commit),
+ op(op), v(v) {}
};
- typedef boost::intrusive_ptr<InProgressOp> InProgressOpRef;
- map<ceph_tid_t, InProgressOpRef> in_progress_ops;
+ map<ceph_tid_t, ceph::ref_t<InProgressOp>> in_progress_ops;
public:
friend class C_OSD_OnOpCommit;
@@ -403,7 +402,7 @@ private:
std::optional<pg_hit_set_history_t> &hset_history,
InProgressOp *op,
ObjectStore::Transaction &op_t);
- void op_commit(InProgressOpRef& op);
+ void op_commit(const ceph::ref_t<InProgressOp>& op);
void do_repop_reply(OpRequestRef op);
void do_repop(OpRequestRef op);
diff --git a/src/osd/Session.cc b/src/osd/Session.cc
index 44b5817a374..c3699593e5a 100644
--- a/src/osd/Session.cc
+++ b/src/osd/Session.cc
@@ -11,7 +11,7 @@
void Session::clear_backoffs()
{
- map<spg_t,map<hobject_t,set<BackoffRef>>> ls;
+ map<spg_t,map<hobject_t,set<ceph::ref_t<Backoff>>>> ls;
{
std::lock_guard l(backoff_lock);
ls.swap(backoffs);
@@ -85,7 +85,7 @@ void Session::ack_backoff(
bool Session::check_backoff(
CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m)
{
- BackoffRef b(have_backoff(pgid, oid));
+ auto b = have_backoff(pgid, oid);
if (b) {
dout(10) << __func__ << " session " << this << " has backoff " << *b
<< " for " << *m << dendl;
diff --git a/src/osd/Session.h b/src/osd/Session.h
index 432d3ee8744..27000a95a56 100644
--- a/src/osd/Session.h
+++ b/src/osd/Session.h
@@ -26,10 +26,6 @@
//#define PG_DEBUG_REFS
-struct Session;
-typedef boost::intrusive_ptr<Session> SessionRef;
-struct Backoff;
-typedef boost::intrusive_ptr<Backoff> BackoffRef;
class PG;
#ifdef PG_DEBUG_REFS
#include "common/tracked_int_ptr.hpp"
@@ -103,20 +99,9 @@ struct Backoff : public RefCountedObject {
// - both null (teardown), or
// - only session is set (and state == DELETING)
PGRef pg; ///< owning pg
- SessionRef session; ///< owning session
+ ceph::ref_t<class Session> session; ///< owning session
hobject_t begin, end; ///< [) range to block, unless ==, then single obj
- Backoff(spg_t pgid, PGRef pg, SessionRef s,
- uint64_t i,
- const hobject_t& b, const hobject_t& e)
- : RefCountedObject(g_ceph_context, 0),
- pgid(pgid),
- id(i),
- pg(pg),
- session(s),
- begin(b),
- end(e) {}
-
friend ostream& operator<<(ostream& out, const Backoff& b) {
return out << "Backoff(" << &b << " " << b.pgid << " " << b.id
<< " " << b.get_state_name()
@@ -124,6 +109,19 @@ struct Backoff : public RefCountedObject {
<< " session " << b.session
<< " pg " << b.pg << ")";
}
+
+private:
+ FRIEND_MAKE_REF(Backoff);
+ Backoff(spg_t pgid, PGRef pg, ceph::ref_t<Session> s,
+ uint64_t i,
+ const hobject_t& b, const hobject_t& e)
+ : RefCountedObject(g_ceph_context),
+ pgid(pgid),
+ id(i),
+ pg(pg),
+ session(std::move(s)),
+ begin(b),
+ end(e) {}
};
@@ -140,12 +138,12 @@ struct Session : public RefCountedObject {
boost::intrusive::list<OpRequest> waiting_on_map;
ceph::spinlock sent_epoch_lock;
- epoch_t last_sent_epoch;
+ epoch_t last_sent_epoch = 0;
/// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock");
std::atomic<int> backoff_count= {0}; ///< simple count of backoffs
- map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs;
+ map<spg_t,map<hobject_t,set<ceph::ref_t<Backoff>>>> backoffs;
std::atomic<uint64_t> backoff_seq = {0};
@@ -153,14 +151,6 @@ struct Session : public RefCountedObject {
int peer = -1;
HeartbeatStampsRef stamps;
- explicit Session(CephContext *cct, Connection *con_) :
- RefCountedObject(cct),
- con(con_),
- socket_addr(con_->get_peer_socket_addr()),
- wstate(cct),
- last_sent_epoch(0)
- {}
-
entity_addr_t& get_peer_socket_addr() {
return socket_addr;
}
@@ -172,7 +162,7 @@ struct Session : public RefCountedObject {
const hobject_t& start,
const hobject_t& end);
- BackoffRef have_backoff(spg_t pgid, const hobject_t& oid) {
+ ceph::ref_t<Backoff> have_backoff(spg_t pgid, const hobject_t& oid) {
if (!backoff_count.load()) {
return nullptr;
}
@@ -203,15 +193,15 @@ struct Session : public RefCountedObject {
bool check_backoff(
CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m);
- void add_backoff(BackoffRef b) {
+ void add_backoff(ceph::ref_t<Backoff> b) {
std::lock_guard l(backoff_lock);
ceph_assert(!backoff_count == backoffs.empty());
- backoffs[b->pgid][b->begin].insert(b);
+ backoffs[b->pgid][b->begin].insert(std::move(b));
++backoff_count;
}
// called by PG::release_*_backoffs and PG::clear_backoffs()
- void rm_backoff(BackoffRef b) {
+ void rm_backoff(const ceph::ref_t<Backoff>& b) {
std::lock_guard l(backoff_lock);
ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
ceph_assert(b->session == this);
@@ -236,6 +226,15 @@ struct Session : public RefCountedObject {
ceph_assert(!backoff_count == backoffs.empty());
}
void clear_backoffs();
+
+private:
+ FRIEND_MAKE_REF(Session);
+ explicit Session(CephContext *cct, Connection *con_) :
+ RefCountedObject(cct),
+ con(con_),
+ socket_addr(con_->get_peer_socket_addr()),
+ wstate(cct)
+ {}
};
#endif
diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc
index 076f5fc39a6..c554a379402 100644
--- a/src/test/direct_messenger/DirectMessenger.cc
+++ b/src/test/direct_messenger/DirectMessenger.cc
@@ -28,13 +28,15 @@ class DirectConnection : public Connection {
/// clear this pointer) before dropping its own reference
std::atomic<Connection*> reply_connection{nullptr};
- public:
+ private:
+ FRIEND_MAKE_REF(DirectConnection);
DirectConnection(CephContext *cct, DirectMessenger *m,
DispatchStrategy *dispatchers)
: Connection(cct, m),
dispatchers(dispatchers)
{}
+ public:
/// sets the Connection that will receive replies to outgoing messages
void set_direct_reply_connection(ConnectionRef conn);
@@ -100,8 +102,7 @@ static ConnectionRef create_loopback(DirectMessenger *m,
entity_name_t name,
DispatchStrategy *dispatchers)
{
- auto loopback = boost::intrusive_ptr<DirectConnection>(
- new DirectConnection(m->cct, m, dispatchers));
+ auto loopback = ceph::make_ref<DirectConnection>(m->cct, m, dispatchers);
// loopback replies go to itself
loopback->set_direct_reply_connection(loopback);
loopback->set_peer_type(name.type());
@@ -131,8 +132,7 @@ int DirectMessenger::set_direct_peer(DirectMessenger *peer)
peer_inst = peer->get_myinst();
// allocate a Connection that dispatches to the peer messenger
- auto direct_connection = boost::intrusive_ptr<DirectConnection>(
- new DirectConnection(cct, peer, peer->dispatchers.get()));
+ auto direct_connection = ceph::make_ref<DirectConnection>(cct, peer, peer->dispatchers.get());
direct_connection->set_peer_addr(peer_inst.addr);
direct_connection->set_peer_type(peer_inst.name.type());
diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc
index 57325380186..64d05eef703 100644
--- a/src/test/journal/RadosTestFixture.cc
+++ b/src/test/journal/RadosTestFixture.cc
@@ -68,15 +68,15 @@ int RadosTestFixture::create(const std::string &oid, uint8_t order,
return cls::journal::client::create(m_ioctx, oid, order, splay_width, -1);
}
-journal::JournalMetadataPtr RadosTestFixture::create_metadata(
+ceph::ref_t<journal::JournalMetadata> RadosTestFixture::create_metadata(
const std::string &oid, const std::string &client_id,
double commit_interval, int max_concurrent_object_sets) {
journal::Settings settings;
settings.commit_interval = commit_interval;
settings.max_concurrent_object_sets = max_concurrent_object_sets;
- journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
- m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings));
+ auto metadata = ceph::make_ref<journal::JournalMetadata>(
+ m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings);
m_metadatas.push_back(metadata);
return metadata;
}
@@ -109,13 +109,13 @@ bufferlist RadosTestFixture::create_payload(const std::string &payload) {
return bl;
}
-int RadosTestFixture::init_metadata(journal::JournalMetadataPtr metadata) {
+int RadosTestFixture::init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata) {
C_SaferCond cond;
metadata->init(&cond);
return cond.wait();
}
-bool RadosTestFixture::wait_for_update(journal::JournalMetadataPtr metadata) {
+bool RadosTestFixture::wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata) {
std::unique_lock locker{m_listener.mutex};
while (m_listener.updates[metadata.get()] == 0) {
if (m_listener.cond.wait_for(locker, 10s) == std::cv_status::timeout) {
diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h
index 9600c839bd2..8ec66293102 100644
--- a/src/test/journal/RadosTestFixture.h
+++ b/src/test/journal/RadosTestFixture.h
@@ -23,7 +23,7 @@ public:
int create(const std::string &oid, uint8_t order = 14,
uint8_t splay_width = 2);
- journal::JournalMetadataPtr create_metadata(const std::string &oid,
+ ceph::ref_t<journal::JournalMetadata> create_metadata(const std::string &oid,
const std::string &client_id = "client",
double commit_internal = 0.1,
int max_concurrent_object_sets = 0);
@@ -52,9 +52,9 @@ public:
}
};
- int init_metadata(journal::JournalMetadataPtr metadata);
+ int init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata);
- bool wait_for_update(journal::JournalMetadataPtr metadata);
+ bool wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata);
static std::string _pool_name;
static librados::Rados _rados;
@@ -70,5 +70,5 @@ public:
Listener m_listener;
- std::list<journal::JournalMetadataPtr> m_metadatas;
+ std::list<ceph::ref_t<journal::JournalMetadata>> m_metadatas;
};
diff --git a/src/test/journal/test_FutureImpl.cc b/src/test/journal/test_FutureImpl.cc
index e4e127d6ee9..1ff346dcf3b 100644
--- a/src/test/journal/test_FutureImpl.cc
+++ b/src/test/journal/test_FutureImpl.cc
@@ -9,46 +9,39 @@
class TestFutureImpl : public RadosTestFixture {
public:
struct FlushHandler : public journal::FutureImpl::FlushHandler {
- uint64_t refs;
- uint64_t flushes;
- FlushHandler() : refs(0), flushes(0) {}
- void get() override {
- ++refs;
- }
- void put() override {
- ceph_assert(refs > 0);
- --refs;
- }
- void flush(const journal::FutureImplPtr &future) override {
+ uint64_t flushes = 0;
+ void flush(const ceph::ref_t<journal::FutureImpl>& future) override {
++flushes;
}
+ FlushHandler() = default;
};
- journal::FutureImplPtr create_future(uint64_t tag_tid, uint64_t entry_tid,
- uint64_t commit_tid,
- const journal::FutureImplPtr &prev =
- journal::FutureImplPtr()) {
- journal::FutureImplPtr future(new journal::FutureImpl(tag_tid,
- entry_tid,
- commit_tid));
+ TestFutureImpl() {
+ m_flush_handler = std::make_shared<FlushHandler>();
+ }
+
+ auto create_future(uint64_t tag_tid, uint64_t entry_tid,
+ uint64_t commit_tid,
+ ceph::ref_t<journal::FutureImpl> prev = nullptr) {
+ auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, commit_tid);
future->init(prev);
return future;
}
- void flush(const journal::FutureImplPtr &future) {
+ void flush(const ceph::ref_t<journal::FutureImpl>& future) {
}
- FlushHandler m_flush_handler;
+ std::shared_ptr<FlushHandler> m_flush_handler;
};
TEST_F(TestFutureImpl, Getters) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
+ auto future = create_future(234, 123, 456);
ASSERT_EQ(234U, future->get_tag_tid());
ASSERT_EQ(123U, future->get_entry_tid());
ASSERT_EQ(456U, future->get_commit_tid());
@@ -58,68 +51,68 @@ TEST_F(TestFutureImpl, Attach) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
- ASSERT_FALSE(future->attach(&m_flush_handler));
- ASSERT_EQ(1U, m_flush_handler.refs);
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
+ ASSERT_EQ(2U, m_flush_handler.use_count());
}
TEST_F(TestFutureImpl, AttachWithPendingFlush) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
+ auto future = create_future(234, 123, 456);
future->flush(NULL);
- ASSERT_TRUE(future->attach(&m_flush_handler));
- ASSERT_EQ(1U, m_flush_handler.refs);
+ ASSERT_TRUE(future->attach(m_flush_handler));
+ ASSERT_EQ(2U, m_flush_handler.use_count());
}
TEST_F(TestFutureImpl, Detach) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
- ASSERT_FALSE(future->attach(&m_flush_handler));
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
future->detach();
- ASSERT_EQ(0U, m_flush_handler.refs);
+ ASSERT_EQ(1U, m_flush_handler.use_count());
}
TEST_F(TestFutureImpl, DetachImplicit) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
- ASSERT_FALSE(future->attach(&m_flush_handler));
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
future.reset();
- ASSERT_EQ(0U, m_flush_handler.refs);
+ ASSERT_EQ(1U, m_flush_handler.use_count());
}
TEST_F(TestFutureImpl, Flush) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
- ASSERT_FALSE(future->attach(&m_flush_handler));
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
C_SaferCond cond;
future->flush(&cond);
- ASSERT_EQ(1U, m_flush_handler.flushes);
+ ASSERT_EQ(1U, m_flush_handler->flushes);
future->safe(-EIO);
ASSERT_EQ(-EIO, cond.wait());
}
@@ -128,14 +121,14 @@ TEST_F(TestFutureImpl, FlushWithoutContext) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
- ASSERT_FALSE(future->attach(&m_flush_handler));
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
future->flush(NULL);
- ASSERT_EQ(1U, m_flush_handler.flushes);
+ ASSERT_EQ(1U, m_flush_handler->flushes);
future->safe(-EIO);
ASSERT_TRUE(future->is_complete());
ASSERT_EQ(-EIO, future->get_return_value());
@@ -145,25 +138,23 @@ TEST_F(TestFutureImpl, FlushChain) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future1 = create_future(234, 123, 456);
- journal::FutureImplPtr future2 = create_future(234, 124, 457,
- future1);
- journal::FutureImplPtr future3 = create_future(235, 1, 458,
- future2);
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
+ auto future3 = create_future(235, 1, 458, future2);
- FlushHandler flush_handler;
- ASSERT_FALSE(future1->attach(&m_flush_handler));
- ASSERT_FALSE(future2->attach(&flush_handler));
- ASSERT_FALSE(future3->attach(&m_flush_handler));
+ auto flush_handler = std::make_shared<FlushHandler>();
+ ASSERT_FALSE(future1->attach(m_flush_handler));
+ ASSERT_FALSE(future2->attach(flush_handler));
+ ASSERT_FALSE(future3->attach(m_flush_handler));
C_SaferCond cond;
future3->flush(&cond);
- ASSERT_EQ(1U, m_flush_handler.flushes);
- ASSERT_EQ(1U, flush_handler.flushes);
+ ASSERT_EQ(1U, m_flush_handler->flushes);
+ ASSERT_EQ(1U, flush_handler->flushes);
future3->safe(0);
ASSERT_FALSE(future3->is_complete());
@@ -182,20 +173,19 @@ TEST_F(TestFutureImpl, FlushInProgress) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future1 = create_future(234, 123, 456);
- journal::FutureImplPtr future2 = create_future(234, 124, 457,
- future1);
- ASSERT_FALSE(future1->attach(&m_flush_handler));
- ASSERT_FALSE(future2->attach(&m_flush_handler));
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
+ ASSERT_FALSE(future1->attach(m_flush_handler));
+ ASSERT_FALSE(future2->attach(m_flush_handler));
future1->set_flush_in_progress();
ASSERT_TRUE(future1->is_flush_in_progress());
future1->flush(NULL);
- ASSERT_EQ(0U, m_flush_handler.flushes);
+ ASSERT_EQ(0U, m_flush_handler->flushes);
future1->safe(0);
}
@@ -204,10 +194,10 @@ TEST_F(TestFutureImpl, FlushAlreadyComplete) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 123, 456);
+ auto future = create_future(234, 123, 456);
future->safe(-EIO);
C_SaferCond cond;
@@ -219,10 +209,10 @@ TEST_F(TestFutureImpl, Wait) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 1, 456);
+ auto future = create_future(234, 1, 456);
C_SaferCond cond;
future->wait(&cond);
@@ -234,10 +224,10 @@ TEST_F(TestFutureImpl, WaitAlreadyComplete) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future = create_future(234, 1, 456);
+ auto future = create_future(234, 1, 456);
future->safe(-EEXIST);
C_SaferCond cond;
@@ -249,12 +239,11 @@ TEST_F(TestFutureImpl, SafePreservesError) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future1 = create_future(234, 123, 456);
- journal::FutureImplPtr future2 = create_future(234, 124, 457,
- future1);
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
future1->safe(-EIO);
future2->safe(-EEXIST);
@@ -266,12 +255,11 @@ TEST_F(TestFutureImpl, ConsistentPreservesError) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::FutureImplPtr future1 = create_future(234, 123, 456);
- journal::FutureImplPtr future2 = create_future(234, 124, 457,
- future1);
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
future2->safe(-EEXIST);
future1->safe(-EIO);
diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc
index 7cecb6fb1ce..4108d4da3ec 100644
--- a/src/test/journal/test_JournalMetadata.cc
+++ b/src/test/journal/test_JournalMetadata.cc
@@ -18,25 +18,25 @@ public:
RadosTestFixture::TearDown();
}
- journal::JournalMetadataPtr create_metadata(const std::string &oid,
- const std::string &client_id,
- double commit_interval = 0.1,
- int max_concurrent_object_sets = 0) {
- journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata(
+ auto create_metadata(const std::string &oid,
+ const std::string &client_id,
+ double commit_interval = 0.1,
+ int max_concurrent_object_sets = 0) {
+ auto metadata = RadosTestFixture::create_metadata(
oid, client_id, commit_interval, max_concurrent_object_sets);
m_metadata_list.push_back(metadata);
metadata->add_listener(&m_listener);
return metadata;
}
- typedef std::list<journal::JournalMetadataPtr> MetadataList;
+ typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList;
MetadataList m_metadata_list;
};
TEST_F(TestJournalMetadata, JournalDNE) {
std::string oid = get_temp_oid();
- journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ auto metadata1 = create_metadata(oid, "client1");
ASSERT_EQ(-ENOENT, init_metadata(metadata1));
}
@@ -46,10 +46,10 @@ TEST_F(TestJournalMetadata, ClientDNE) {
ASSERT_EQ(0, create(oid, 14, 2));
ASSERT_EQ(0, client_register(oid, "client1", ""));
- journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ auto metadata1 = create_metadata(oid, "client1");
ASSERT_EQ(0, init_metadata(metadata1));
- journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client2");
+ auto metadata2 = create_metadata(oid, "client2");
ASSERT_EQ(-ENOENT, init_metadata(metadata2));
}
@@ -59,10 +59,10 @@ TEST_F(TestJournalMetadata, Committed) {
ASSERT_EQ(0, create(oid, 14, 2));
ASSERT_EQ(0, client_register(oid, "client1", ""));
- journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1", 600);
+ auto metadata1 = create_metadata(oid, "client1", 600);
ASSERT_EQ(0, init_metadata(metadata1));
- journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
+ auto metadata2 = create_metadata(oid, "client1");
ASSERT_EQ(0, init_metadata(metadata2));
ASSERT_TRUE(wait_for_update(metadata2));
@@ -104,7 +104,7 @@ TEST_F(TestJournalMetadata, UpdateActiveObject) {
ASSERT_EQ(0, create(oid, 14, 2));
ASSERT_EQ(0, client_register(oid, "client1", ""));
- journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ auto metadata1 = create_metadata(oid, "client1");
ASSERT_EQ(0, init_metadata(metadata1));
ASSERT_TRUE(wait_for_update(metadata1));
@@ -124,7 +124,7 @@ TEST_F(TestJournalMetadata, DisconnectLaggyClient) {
ASSERT_EQ(0, client_register(oid, "client2", "laggy"));
int max_concurrent_object_sets = 100;
- journal::JournalMetadataPtr metadata =
+ auto metadata =
create_metadata(oid, "client1", 0.1, max_concurrent_object_sets);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
@@ -186,7 +186,7 @@ TEST_F(TestJournalMetadata, AssertActiveTag) {
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid, "client1", ""));
- journal::JournalMetadataPtr metadata = create_metadata(oid, "client1");
+ auto metadata = create_metadata(oid, "client1");
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc
index 94490c2357a..1601705dacf 100644
--- a/src/test/journal/test_JournalPlayer.cc
+++ b/src/test/journal/test_JournalPlayer.cc
@@ -32,9 +32,6 @@ public:
: entries_available(false), complete(false),
complete_result(0) {}
- void get() override {}
- void put() override {}
-
void handle_entries_available() override {
std::lock_guard locker{lock};
entries_available = true;
@@ -57,7 +54,7 @@ public:
RadosTestFixture::TearDown();
}
- journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+ auto create_metadata(const std::string &oid) {
return RadosTestFixture::create_metadata(oid, "client", 0.1,
max_fetch_bytes);
}
@@ -75,7 +72,7 @@ public:
}
journal::JournalPlayer *create_player(const std::string &oid,
- const journal::JournalMetadataPtr &metadata) {
+ const ceph::ref_t<journal::JournalMetadata>& metadata) {
journal::JournalPlayer *player(new journal::JournalPlayer(
m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr));
m_players.push_back(player);
@@ -156,7 +153,7 @@ TYPED_TEST(TestJournalPlayer, Prefetch) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -202,7 +199,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -237,7 +234,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -277,7 +274,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -316,7 +313,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -350,7 +347,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -400,7 +397,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -436,7 +433,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -475,7 +472,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -511,7 +508,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -548,7 +545,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -586,7 +583,7 @@ TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
ASSERT_EQ(0, metadata->set_active_set(2));
@@ -637,7 +634,7 @@ TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
ASSERT_EQ(0, metadata->set_active_set(2));
metadata->set_minimum_set(2);
@@ -686,7 +683,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -736,7 +733,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -791,7 +788,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -827,7 +824,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -886,7 +883,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -922,7 +919,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, commit_position));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -964,7 +961,7 @@ TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, {}));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
@@ -983,7 +980,7 @@ TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
ASSERT_EQ(0, this->client_register(oid));
ASSERT_EQ(0, this->client_commit(oid, {}));
- journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ auto metadata = this->create_metadata(oid);
ASSERT_EQ(0, this->init_metadata(metadata));
journal::JournalPlayer *player = this->create_player(oid, metadata);
diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc
index 41518f8e9be..466ee274128 100644
--- a/src/test/journal/test_JournalRecorder.cc
+++ b/src/test/journal/test_JournalRecorder.cc
@@ -14,7 +14,7 @@ public:
using JournalRecorderPtr = std::unique_ptr<journal::JournalRecorder,
std::function<void(journal::JournalRecorder*)>>;
JournalRecorderPtr create_recorder(
- const std::string &oid, const journal::JournalMetadataPtr &metadata) {
+ const std::string &oid, const ceph::ref_t<journal::JournalMetadata>& metadata) {
JournalRecorderPtr recorder{
new journal::JournalRecorder(m_ioctx, oid + ".", metadata, 0),
[](journal::JournalRecorder* recorder) {
@@ -34,7 +34,7 @@ TEST_F(TestJournalRecorder, Append) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
JournalRecorderPtr recorder = create_recorder(oid, metadata);
@@ -51,7 +51,7 @@ TEST_F(TestJournalRecorder, AppendKnownOverflow) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_EQ(0U, metadata->get_active_set());
@@ -73,7 +73,7 @@ TEST_F(TestJournalRecorder, AppendDelayedOverflow) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_EQ(0U, metadata->get_active_set());
@@ -98,7 +98,7 @@ TEST_F(TestJournalRecorder, FutureFlush) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
JournalRecorderPtr recorder = create_recorder(oid, metadata);
@@ -118,7 +118,7 @@ TEST_F(TestJournalRecorder, Flush) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
JournalRecorderPtr recorder = create_recorder(oid, metadata);
@@ -142,7 +142,7 @@ TEST_F(TestJournalRecorder, OverflowCommitObjectNumber) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_EQ(0U, metadata->get_active_set());
diff --git a/src/test/journal/test_JournalTrimmer.cc b/src/test/journal/test_JournalTrimmer.cc
index 6d6917e4300..aaf10979fdf 100644
--- a/src/test/journal/test_JournalTrimmer.cc
+++ b/src/test/journal/test_JournalTrimmer.cc
@@ -28,7 +28,7 @@ public:
RadosTestFixture::TearDown();
}
- int append_payload(journal::JournalMetadataPtr metadata,
+ int append_payload(const ceph::ref_t<journal::JournalMetadata>& metadata,
const std::string &oid, uint64_t object_num,
const std::string &payload, uint64_t *commit_tid) {
int r = append(oid + "." + stringify(object_num), create_payload(payload));
@@ -39,16 +39,15 @@ public:
return r;
}
- journal::JournalMetadataPtr create_metadata(const std::string &oid) {
- journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata(
- oid);
+ auto create_metadata(const std::string &oid) {
+ auto metadata = RadosTestFixture::create_metadata(oid);
m_metadata_list.push_back(metadata);
metadata->add_listener(&m_listener);
return metadata;
}
journal::JournalTrimmer *create_trimmer(const std::string &oid,
- const journal::JournalMetadataPtr &metadata) {
+ const ceph::ref_t<journal::JournalMetadata>& metadata) {
journal::JournalTrimmer *trimmer(new journal::JournalTrimmer(
m_ioctx, oid + ".", metadata));
m_trimmers.push_back(trimmer);
@@ -61,7 +60,7 @@ public:
return m_ioctx.operate(oid, &op);
}
- typedef std::list<journal::JournalMetadataPtr> MetadataList;
+ typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList;
MetadataList m_metadata_list;
std::list<journal::JournalTrimmer*> m_trimmers;
};
@@ -71,7 +70,7 @@ TEST_F(TestJournalTrimmer, Committed) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
@@ -114,7 +113,7 @@ TEST_F(TestJournalTrimmer, CommittedWithOtherClient) {
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_register(oid, "client2", "slow client"));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
@@ -149,7 +148,7 @@ TEST_F(TestJournalTrimmer, RemoveObjects) {
ASSERT_EQ(0, create(oid, 12, 2));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
@@ -181,7 +180,7 @@ TEST_F(TestJournalTrimmer, RemoveObjectsWithOtherClient) {
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_register(oid, "client2", "other client"));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc
index 8e3f3323bba..5ac3d8b1211 100644
--- a/src/test/journal/test_ObjectPlayer.cc
+++ b/src/test/journal/test_ObjectPlayer.cc
@@ -10,19 +10,16 @@
#include "test/journal/RadosTestFixture.h"
template <typename T>
-class TestObjectPlayer : public RadosTestFixture {
+class TestObjectPlayer : public RadosTestFixture, public T {
public:
- static const uint32_t max_fetch_bytes = T::max_fetch_bytes;
-
- journal::ObjectPlayerPtr create_object(const std::string &oid,
- uint8_t order) {
- journal::ObjectPlayerPtr object(new journal::ObjectPlayer(
+ auto create_object(const std::string &oid, uint8_t order) {
+ auto object = ceph::make_ref<journal::ObjectPlayer>(
m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order,
- max_fetch_bytes));
+ T::max_fetch_bytes);
return object;
}
- int fetch(journal::ObjectPlayerPtr object_player) {
+ int fetch(const ceph::ref_t<journal::ObjectPlayer>& object_player) {
while (true) {
C_SaferCond ctx;
object_player->set_refetch_state(
@@ -36,7 +33,7 @@ public:
return 0;
}
- int watch_and_wait_for_entries(journal::ObjectPlayerPtr object_player,
+ int watch_and_wait_for_entries(const ceph::ref_t<journal::ObjectPlayer>& object_player,
journal::ObjectPlayer::Entries *entries,
size_t count) {
for (size_t i = 0; i < 50; ++i) {
@@ -63,7 +60,7 @@ public:
template <uint32_t _max_fetch_bytes>
struct TestObjectPlayerParams {
- static const uint32_t max_fetch_bytes = _max_fetch_bytes;
+ static inline const uint32_t max_fetch_bytes = _max_fetch_bytes;
};
typedef ::testing::Types<TestObjectPlayerParams<0>,
@@ -81,7 +78,7 @@ TYPED_TEST(TestObjectPlayer, Fetch) {
encode(entry2, bl);
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
@@ -104,7 +101,7 @@ TYPED_TEST(TestObjectPlayer, FetchLarge) {
encode(entry2, bl);
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 12);
+ auto object = this->create_object(oid, 12);
ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
@@ -126,7 +123,7 @@ TYPED_TEST(TestObjectPlayer, FetchDeDup) {
encode(entry2, bl);
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
@@ -143,7 +140,7 @@ TYPED_TEST(TestObjectPlayer, FetchEmpty) {
bufferlist bl;
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
ASSERT_EQ(0, this->fetch(object));
ASSERT_TRUE(object->empty());
@@ -161,7 +158,7 @@ TYPED_TEST(TestObjectPlayer, FetchCorrupt) {
encode(entry2, bl);
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
ASSERT_EQ(-EBADMSG, this->fetch(object));
journal::ObjectPlayer::Entries entries;
@@ -182,7 +179,7 @@ TYPED_TEST(TestObjectPlayer, FetchAppend) {
encode(entry1, bl);
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
@@ -215,7 +212,7 @@ TYPED_TEST(TestObjectPlayer, PopEntry) {
encode(entry2, bl);
ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
@@ -234,7 +231,7 @@ TYPED_TEST(TestObjectPlayer, PopEntry) {
TYPED_TEST(TestObjectPlayer, Watch) {
std::string oid = this->get_temp_oid();
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
C_SaferCond cond1;
object->watch(&cond1, 0.1);
@@ -272,7 +269,7 @@ TYPED_TEST(TestObjectPlayer, Watch) {
TYPED_TEST(TestObjectPlayer, Unwatch) {
std::string oid = this->get_temp_oid();
- journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ auto object = this->create_object(oid, 14);
C_SaferCond watch_ctx;
object->watch(&watch_ctx, 600);
diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc
index 87efca571a6..72e4fd9c194 100644
--- a/src/test/journal/test_ObjectRecorder.cc
+++ b/src/test/journal/test_ObjectRecorder.cc
@@ -76,12 +76,10 @@ public:
}
}
}
- journal::ObjectRecorderPtr create_object(const std::string& oid,
- uint8_t order,
- ceph::mutex* lock) {
- journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
+ auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) {
+ auto object = ceph::make_ref<journal::ObjectRecorder>(
m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
- order, m_max_in_flight_appends));
+ order, m_max_in_flight_appends);
{
std::lock_guard locker{*lock};
object->set_append_batch_options(m_flush_interval,
@@ -115,16 +113,15 @@ public:
double m_flush_age = 600;
uint64_t m_max_in_flight_appends = 0;
using ObjectRecorders =
- std::list<std::pair<journal::ObjectRecorderPtr, ceph::mutex*>>;
+ std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>;
ObjectRecorders m_object_recorders;
Handler m_handler;
};
journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
const std::string &payload) {
- journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
- 456));
- future->init(journal::FutureImplPtr());
+ auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
+ future->init(ceph::ref_t<journal::FutureImpl>());
bufferlist bl;
bl.append(payload);
@@ -136,12 +133,12 @@ TEST_F(TestObjectRecorder, Append) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
@@ -170,12 +167,12 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
@@ -203,12 +200,12 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
@@ -236,12 +233,12 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
@@ -268,12 +265,12 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 12, &lock);
+ auto object = flusher.create_object(oid, 12, &lock);
std::string payload(2048, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
@@ -301,12 +298,12 @@ TEST_F(TestObjectRecorder, Flush) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
@@ -331,12 +328,12 @@ TEST_F(TestObjectRecorder, FlushFuture) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
@@ -359,12 +356,12 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
@@ -388,12 +385,12 @@ TEST_F(TestObjectRecorder, Close) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
- journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
@@ -418,14 +415,14 @@ TEST_F(TestObjectRecorder, Overflow) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
- journal::ObjectRecorderPtr object1 = flusher.create_object(oid, 12, &lock1);
+ auto object1 = flusher.create_object(oid, 12, &lock1);
std::string payload(1 << 11, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
@@ -445,7 +442,7 @@ TEST_F(TestObjectRecorder, Overflow) {
ASSERT_TRUE(flusher.wait_for_overflow());
- journal::ObjectRecorderPtr object2 = flusher.create_object(oid, 12, &lock2);
+ auto object2 = flusher.create_object(oid, 12, &lock2);
journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
payload);
diff --git a/src/test/librbd/fsx.cc b/src/test/librbd/fsx.cc
index 4318b1ce462..599e2596e1e 100644
--- a/src/test/librbd/fsx.cc
+++ b/src/test/librbd/fsx.cc
@@ -277,11 +277,6 @@ struct ReplayHandler : public journal::ReplayHandler {
on_finish(on_finish) {
}
- void get() override {
- }
- void put() override {
- }
-
void handle_entries_available() override {
while (true) {
journal::ReplayEntry replay_entry;
diff --git a/src/test/librbd/journal/test_Entries.cc b/src/test/librbd/journal/test_Entries.cc
index a79d2c0cbc8..353886626d7 100644
--- a/src/test/librbd/journal/test_Entries.cc
+++ b/src/test/librbd/journal/test_Entries.cc
@@ -32,11 +32,6 @@ public:
: entries_available(false), complete(false) {
}
- void get() override {
- }
- void put() override {
- }
-
void handle_entries_available() override {
std::lock_guard locker{lock};
entries_available = true;
diff --git a/src/test/mds/TestSessionFilter.cc b/src/test/mds/TestSessionFilter.cc
index 150663da0e7..9fa33f4517f 100644
--- a/src/test/mds/TestSessionFilter.cc
+++ b/src/test/mds/TestSessionFilter.cc
@@ -74,15 +74,13 @@ TEST(MDSSessionFilter, IdEquality)
SessionFilter filter;
std::stringstream ss;
filter.parse({"id=123"}, &ss);
- Session *a = new Session(nullptr);;
- Session *b = new Session(nullptr);;
+ auto a = ceph::make_ref<Session>(nullptr);;
+ auto b = ceph::make_ref<Session>(nullptr);;
a->info.inst.name.parse("client.123");
b->info.inst.name.parse("client.456");
ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;}));
ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;}));
- a->put();
- b->put();
}
TEST(MDSSessionFilter, StateEquality)
@@ -90,15 +88,13 @@ TEST(MDSSessionFilter, StateEquality)
SessionFilter filter;
std::stringstream ss;
filter.parse({"state=closing"}, &ss);
- Session *a = new Session(nullptr);
+ auto a = ceph::make_ref<Session>(nullptr);
a->set_state(Session::STATE_CLOSING);
- Session *b = new Session(nullptr);
+ auto b = ceph::make_ref<Session>(nullptr);
b->set_state(Session::STATE_OPENING);
ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;}));
ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;}));
- a->put();
- b->put();
}
TEST(MDSSessionFilter, AuthEquality)
@@ -106,15 +102,13 @@ TEST(MDSSessionFilter, AuthEquality)
SessionFilter filter;
std::stringstream ss;
filter.parse({"auth_name=rhubarb"}, &ss);
- Session *a = new Session(nullptr);
+ auto a = ceph::make_ref<Session>(nullptr);
a->info.auth_name.set_id("rhubarb");
- Session *b = new Session(nullptr);
+ auto b = ceph::make_ref<Session>(nullptr);
b->info.auth_name.set_id("custard");
ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;}));
ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;}));
- a->put();
- b->put();
}
TEST(MDSSessionFilter, MetadataEquality)
@@ -124,17 +118,15 @@ TEST(MDSSessionFilter, MetadataEquality)
int r = filter.parse({"client_metadata.root=/rhubarb"}, &ss);
ASSERT_EQ(r, 0);
client_metadata_t meta;
- Session *a = new Session(nullptr);
+ auto a = ceph::make_ref<Session>(nullptr);
meta.kv_map = {{"root", "/rhubarb"}};
a->set_client_metadata(meta);
- Session *b = new Session(nullptr);
+ auto b = ceph::make_ref<Session>(nullptr);
meta.kv_map = {{"root", "/custard"}};
b->set_client_metadata(meta);
ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return false;}));
ASSERT_FALSE(filter.match(*b, [](client_t c) -> bool {return false;}));
- a->put();
- b->put();
}
TEST(MDSSessionFilter, ReconnectingEquality)
@@ -143,9 +135,8 @@ TEST(MDSSessionFilter, ReconnectingEquality)
std::stringstream ss;
int r = filter.parse({"reconnecting=true"}, &ss);
ASSERT_EQ(r, 0);
- Session *a = new Session(nullptr);
+ auto a = ceph::make_ref<Session>(nullptr);
ASSERT_TRUE(filter.match(*a, [](client_t c) -> bool {return true;}));
ASSERT_FALSE(filter.match(*a, [](client_t c) -> bool {return false;}));
- a->put();
}
diff --git a/src/test/objectstore/test_bluestore_types.cc b/src/test/objectstore/test_bluestore_types.cc
index 25320222619..f0339fe943e 100644
--- a/src/test/objectstore/test_bluestore_types.cc
+++ b/src/test/objectstore/test_bluestore_types.cc
@@ -341,7 +341,7 @@ TEST(Blob, put_ref)
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::Collection coll(&store, oc, bc, coll_t());
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Blob b;
b.shared_blob = new BlueStore::SharedBlob(nullptr);
b.shared_blob->get(); // hack to avoid dtor from running
@@ -349,19 +349,19 @@ TEST(Blob, put_ref)
b.dirty_blob().allocated_test(
bluestore_pextent_t(bluestore_pextent_t::INVALID_OFFSET, 0x8000));
b.dirty_blob().allocated_test(bluestore_pextent_t(0x4071f000, 0x5000));
- b.get_ref(&coll, 0, 0x1200);
- b.get_ref(&coll, 0xae00, 0x4200);
+ b.get_ref(coll.get(), 0, 0x1200);
+ b.get_ref(coll.get(), 0xae00, 0x4200);
ASSERT_EQ(0x5400u, b.get_referenced_bytes());
cout << b << std::endl;
PExtentVector r;
- ASSERT_FALSE(b.put_ref(&coll, 0, 0x1200, &r));
+ ASSERT_FALSE(b.put_ref(coll.get(), 0, 0x1200, &r));
ASSERT_EQ(0x4200u, b.get_referenced_bytes());
cout << " r " << r << std::endl;
cout << b << std::endl;
r.clear();
- ASSERT_TRUE(b.put_ref(&coll, 0xae00, 0x4200, &r));
+ ASSERT_TRUE(b.put_ref(coll.get(), 0xae00, 0x4200, &r));
ASSERT_EQ(0u, b.get_referenced_bytes());
cout << " r " << r << std::endl;
cout << b << std::endl;
@@ -373,7 +373,7 @@ TEST(Blob, put_ref)
g_ceph_context, "lru", NULL);
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
{
BlueStore::Blob B;
@@ -822,7 +822,7 @@ TEST(Blob, put_ref)
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Blob B;
B.shared_blob = new BlueStore::SharedBlob(nullptr);
B.shared_blob->get(); // hack to avoid dtor from running
@@ -911,7 +911,7 @@ TEST(Blob, split)
g_ceph_context, "lru", NULL);
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
{
BlueStore::Blob L, R;
L.shared_blob = new BlueStore::SharedBlob(coll.get());
@@ -969,7 +969,7 @@ TEST(Blob, legacy_decode)
g_ceph_context, "lru", NULL);
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
bufferlist bl, bl2;
{
BlueStore::Blob B;
@@ -1050,7 +1050,7 @@ TEST(ExtentMap, seek_lextent)
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Onode onode(coll.get(), ghobject_t(), "");
BlueStore::ExtentMap em(&onode);
BlueStore::BlobRef br(new BlueStore::Blob);
@@ -1102,7 +1102,7 @@ TEST(ExtentMap, has_any_lextents)
g_ceph_context, "lru", NULL);
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Onode onode(coll.get(), ghobject_t(), "");
BlueStore::ExtentMap em(&onode);
BlueStore::BlobRef b(new BlueStore::Blob);
@@ -1153,7 +1153,7 @@ TEST(ExtentMap, compress_extent_map)
BlueStore::BufferCacheShard *bc = BlueStore::BufferCacheShard::create(
g_ceph_context, "lru", NULL);
-BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Onode onode(coll.get(), ghobject_t(), "");
BlueStore::ExtentMap em(&onode);
BlueStore::BlobRef b1(new BlueStore::Blob);
@@ -1210,7 +1210,7 @@ TEST(GarbageCollector, BasicTest)
g_ceph_context, "lru", NULL);
BlueStore store(g_ceph_context, "", 4096);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Onode onode(coll.get(), ghobject_t(), "");
BlueStore::ExtentMap em(&onode);
@@ -1295,7 +1295,7 @@ TEST(GarbageCollector, BasicTest)
*/
{
BlueStore store(g_ceph_context, "", 0x10000);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Onode onode(coll.get(), ghobject_t(), "");
BlueStore::ExtentMap em(&onode);
@@ -1411,7 +1411,7 @@ TEST(GarbageCollector, BasicTest)
*/
{
BlueStore store(g_ceph_context, "", 0x10000);
- BlueStore::CollectionRef coll(new BlueStore::Collection(&store, oc, bc, coll_t()));
+ auto coll = ceph::make_ref<BlueStore::Collection>(&store, oc, bc, coll_t());
BlueStore::Onode onode(coll.get(), ghobject_t(), "");
BlueStore::ExtentMap em(&onode);
diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc
index db506207c4b..5d283420a5e 100644
--- a/src/tools/rbd/action/Journal.cc
+++ b/src/tools/rbd/action/Journal.cc
@@ -461,9 +461,6 @@ protected:
JournalPlayer *journal;
explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {}
- void get() override {}
- void put() override {}
-
void handle_entries_available() override {
journal->handle_replay_ready();
}
diff --git a/src/tools/rbd_mirror/BaseRequest.h b/src/tools/rbd_mirror/BaseRequest.h
index 5053eb830d9..f1ebc0cf7e5 100644
--- a/src/tools/rbd_mirror/BaseRequest.h
+++ b/src/tools/rbd_mirror/BaseRequest.h
@@ -13,7 +13,7 @@ namespace mirror {
class BaseRequest : public RefCountedObject {
public:
BaseRequest(const std::string& name, CephContext *cct, Context *on_finish)
- : RefCountedObject(cct, 1), m_name(name), m_cct(cct),
+ : RefCountedObject(cct), m_name(name), m_cct(cct),
m_on_finish(on_finish) {
}
diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc
index 1111b3b8438..cc419f58da3 100644
--- a/src/tools/rbd_mirror/ImageReplayer.cc
+++ b/src/tools/rbd_mirror/ImageReplayer.cc
@@ -62,8 +62,6 @@ template <typename I>
struct ReplayHandler : public ::journal::ReplayHandler {
ImageReplayer<I> *replayer;
ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
- void get() override {}
- void put() override {}
void handle_entries_available() override {
replayer->handle_replay_ready();