summaryrefslogtreecommitdiffstats
path: root/src/journal/JournalMetadata.cc
diff options
context:
space:
mode:
authorKefu Chai <kchai@redhat.com>2019-07-07 06:35:23 +0200
committerKefu Chai <kchai@redhat.com>2019-08-03 05:27:19 +0200
commitf39b32ebfa3b88f43bcc7f30d77511371ac09e1c (patch)
tree6fc8e7e1ee7cc033ac0b0af7622afc798c9e9ccb /src/journal/JournalMetadata.cc
parentcommon/mutex_debug: add lockdep parameter to mutex's ctor (diff)
downloadceph-f39b32ebfa3b88f43bcc7f30d77511371ac09e1c.tar.xz
ceph-f39b32ebfa3b88f43bcc7f30d77511371ac09e1c.zip
journal: s/Mutex/ceph::mutex/
* FutureImpl::m_lock is an exception. as, before this change, the lock was initialized like `m_lock("FutureImpl::m_lock", false, false)`, see the declaration of `Mutex(const std::string &n, bool r = false, bool ld=true, bool bt=false)` so `m_lock` is actually not using the extra features offered by `Mutex` like runtime lockdeps check. and `mutex_debugging_base` does not allow us to disable lockdeps individually. but it does use the `is_locked()` method. so instead of using `ceph::mutex` directly, a cutomized `ceph::mutex` is added for `CEPH_DEBUG_MUTEX` build. Signed-off-by: Kefu Chai <kchai@redhat.com>
Diffstat (limited to 'src/journal/JournalMetadata.cc')
-rw-r--r--src/journal/JournalMetadata.cc95
1 files changed, 46 insertions, 49 deletions
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
index f35eccb131f..7fcad836c19 100644
--- a/src/journal/JournalMetadata.cc
+++ b/src/journal/JournalMetadata.cc
@@ -401,7 +401,7 @@ struct C_AssertActiveTag : public Context {
} // anonymous namespace
JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock, librados::IoCtx &ioctx,
+ ceph::mutex *timer_lock, librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
const Settings &settings)
@@ -409,7 +409,7 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
m_client_id(client_id), m_settings(settings), m_order(0),
m_splay_width(0), m_pool_id(-1), m_initialized(false),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
- m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
+ m_commit_tid(0), m_watch_ctx(this),
m_watch_handle(0), m_minimum_set(0), m_active_set(0),
m_update_notifications(0), m_commit_position_ctx(NULL),
m_commit_position_task_ctx(NULL) {
@@ -418,13 +418,13 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
}
JournalMetadata::~JournalMetadata() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_initialized);
}
void JournalMetadata::init(Context *on_finish) {
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_initialized);
m_initialized = true;
}
@@ -437,7 +437,7 @@ void JournalMetadata::init(Context *on_finish) {
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
<< cpp_strerror(r) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_watch_handle = 0;
on_finish->complete(r);
return;
@@ -459,7 +459,7 @@ void JournalMetadata::shut_down(Context *on_finish) {
uint64_t watch_handle = 0;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_initialized = false;
std::swap(watch_handle, m_watch_handle);
}
@@ -592,23 +592,23 @@ void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
}
void JournalMetadata::add_listener(JournalMetadataListener *listener) {
- Mutex::Locker locker(m_lock);
- while (m_update_notifications > 0) {
- m_update_cond.Wait(m_lock);
- }
+ std::unique_lock locker{m_lock};
+ m_update_cond.wait(locker, [this] {
+ return m_update_notifications <= 0;
+ });
m_listeners.push_back(listener);
}
void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
- Mutex::Locker locker(m_lock);
- while (m_update_notifications > 0) {
- m_update_cond.Wait(m_lock);
- }
+ std::unique_lock locker{m_lock};
+ m_update_cond.wait(locker, [this] {
+ return m_update_notifications <= 0;
+ });
m_listeners.remove(listener);
}
void JournalMetadata::set_minimum_set(uint64_t object_set) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
<< ", new=" << object_set << dendl;
@@ -637,7 +637,7 @@ int JournalMetadata::set_active_set(uint64_t object_set) {
}
void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
<< ", new=" << object_set << dendl;
@@ -661,7 +661,7 @@ void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
}
void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
m_async_op_tracker,
@@ -681,8 +681,7 @@ void JournalMetadata::flush_commit_position() {
void JournalMetadata::flush_commit_position(Context *on_safe) {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::scoped_lock locker{*m_timer_lock, m_lock};
if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
// nothing to flush
if (on_safe != nullptr) {
@@ -703,7 +702,7 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
}
void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
if (allocated_entry_tid <= entry_tid) {
allocated_entry_tid = entry_tid + 1;
@@ -712,7 +711,7 @@ void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
uint64_t *entry_tid) const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
if (it == m_allocated_entry_tids.end()) {
@@ -740,7 +739,7 @@ void JournalMetadata::refresh(Context *on_complete) {
ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (on_complete != nullptr) {
m_refresh_ctxs.push_back(on_complete);
}
@@ -755,7 +754,7 @@ void JournalMetadata::refresh(Context *on_complete) {
void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
- m_lock.Lock();
+ m_lock.lock();
if (r == 0) {
Client client(m_client_id, bufferlist());
RegisteredClients::iterator it = refresh->registered_clients.find(client);
@@ -770,14 +769,14 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
m_client = *it;
++m_update_notifications;
- m_lock.Unlock();
+ m_lock.unlock();
for (Listeners::iterator it = m_listeners.begin();
it != m_listeners.end(); ++it) {
(*it)->handle_update(this);
}
- m_lock.Lock();
+ m_lock.lock();
if (--m_update_notifications == 0) {
- m_update_cond.Signal();
+ m_update_cond.notify_all();
}
} else {
lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
@@ -791,7 +790,7 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
if (m_refreshes_in_progress == 0) {
std::swap(refresh_ctxs, m_refresh_ctxs);
}
- m_lock.Unlock();
+ m_lock.unlock();
for (auto ctx : refresh_ctxs) {
ctx->complete(r);
@@ -801,8 +800,8 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
void JournalMetadata::cancel_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- ceph_assert(m_timer_lock->is_locked());
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_commit_position_ctx != nullptr);
ceph_assert(m_commit_position_task_ctx != nullptr);
m_timer->cancel_event(m_commit_position_task_ctx);
@@ -812,8 +811,8 @@ void JournalMetadata::cancel_commit_task() {
void JournalMetadata::schedule_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- ceph_assert(m_timer_lock->is_locked());
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == nullptr) {
m_commit_position_task_ctx =
@@ -823,8 +822,8 @@ void JournalMetadata::schedule_commit_task() {
}
void JournalMetadata::handle_commit_position_task() {
- ceph_assert(m_timer_lock->is_locked());
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ldout(m_cct, 20) << __func__ << ": "
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;
@@ -838,13 +837,13 @@ void JournalMetadata::handle_commit_position_task() {
Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
Contexts flush_commit_position_ctxs;
- m_lock.Lock();
+ m_lock.lock();
ceph_assert(m_flush_commits_in_progress > 0);
--m_flush_commits_in_progress;
if (m_flush_commits_in_progress == 0) {
std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
}
- m_lock.Unlock();
+ m_lock.unlock();
commit_position_ctx->complete(0);
for (auto ctx : flush_commit_position_ctxs) {
@@ -856,9 +855,9 @@ void JournalMetadata::handle_commit_position_task() {
ctx = new FunctionContext([this, ctx](int r) {
// manually kick of a refresh in case the notification is missed
// and ignore the next notification that we are about to send
- m_lock.Lock();
+ m_lock.lock();
++m_ignore_watch_notifies;
- m_lock.Unlock();
+ m_lock.unlock();
refresh(ctx);
});
@@ -877,12 +876,12 @@ void JournalMetadata::handle_commit_position_task() {
}
void JournalMetadata::schedule_watch_reset() {
- ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
m_timer->add_event_after(1, new C_WatchReset(this));
}
void JournalMetadata::handle_watch_reset() {
- ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
if (!m_initialized) {
return;
}
@@ -911,7 +910,7 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_ignore_watch_notifies > 0) {
--m_ignore_watch_notifies;
return;
@@ -930,8 +929,7 @@ void JournalMetadata::handle_watch_error(int err) {
lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
}
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::scoped_lock locker{*m_timer_lock, m_lock};
// release old watch on error
if (m_watch_handle != 0) {
@@ -947,7 +945,7 @@ void JournalMetadata::handle_watch_error(int err) {
uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
uint64_t tag_tid,
uint64_t entry_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t commit_tid = ++m_commit_tid;
m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
entry_tid);
@@ -962,7 +960,7 @@ uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
uint64_t object_num) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
auto it = m_pending_commit_tids.find(commit_tid);
ceph_assert(it != m_pending_commit_tids.end());
@@ -978,7 +976,7 @@ void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
void JournalMetadata::get_commit_entry(uint64_t commit_tid,
uint64_t *object_num,
uint64_t *tag_tid, uint64_t *entry_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
auto it = m_pending_commit_tids.find(commit_tid);
ceph_assert(it != m_pending_commit_tids.end());
@@ -995,8 +993,7 @@ void JournalMetadata::committed(uint64_t commit_tid,
ObjectSetPosition commit_position;
Context *stale_ctx = nullptr;
{
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::scoped_lock locker{*m_timer_lock, m_lock};
ceph_assert(commit_tid > m_commit_position_tid);
if (!m_commit_position.object_positions.empty()) {
@@ -1103,7 +1100,7 @@ void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
Context *ctx = on_finish;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
@@ -1158,7 +1155,7 @@ std::ostream &operator<<(std::ostream &os,
std::ostream &operator<<(std::ostream &os,
const JournalMetadata &jm) {
- Mutex::Locker locker(jm.m_lock);
+ std::lock_guard locker{jm.m_lock};
os << "[oid=" << jm.m_oid << ", "
<< "initialized=" << jm.m_initialized << ", "
<< "order=" << (int)jm.m_order << ", "