// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include "MDSRank.h" #include "MDCache.h" #include "Locker.h" #include "MDBalancer.h" #include "Migrator.h" #include "CInode.h" #include "CDir.h" #include "CDentry.h" #include "Mutation.h" #include "MDSContext.h" #include "MDLog.h" #include "MDSMap.h" #include "events/EUpdate.h" #include "events/EOpen.h" #include "msg/Messenger.h" #include "osdc/Objecter.h" #include "messages/MInodeFileCaps.h" #include "messages/MLock.h" #include "messages/MClientLease.h" #include "messages/MClientReply.h" #include "messages/MClientCaps.h" #include "messages/MClientCapRelease.h" #include "messages/MMDSSlaveRequest.h" #include #include "common/config.h" #define dout_subsys ceph_subsys_mds #undef dout_prefix #define dout_context g_ceph_context #define dout_prefix _prefix(_dout, mds) static ostream& _prefix(std::ostream *_dout, MDSRank *mds) { return *_dout << "mds." 
<< mds->get_nodeid() << ".locker "; } class LockerContext : public MDSInternalContextBase { protected: Locker *locker; MDSRank *get_mds() override { return locker->mds; } public: explicit LockerContext(Locker *locker_) : locker(locker_) { ceph_assert(locker != NULL); } }; class LockerLogContext : public MDSLogContextBase { protected: Locker *locker; MDSRank *get_mds() override { return locker->mds; } public: explicit LockerLogContext(Locker *locker_) : locker(locker_) { ceph_assert(locker != NULL); } }; Locker::Locker(MDSRank *m, MDCache *c) : mds(m), mdcache(c), need_snapflush_inodes(member_offset(CInode, item_caps)) {} void Locker::dispatch(const Message::const_ref &m) { switch (m->get_type()) { // inter-mds locking case MSG_MDS_LOCK: handle_lock(MLock::msgref_cast(m)); break; // inter-mds caps case MSG_MDS_INODEFILECAPS: handle_inode_file_caps(MInodeFileCaps::msgref_cast(m)); break; // client sync case CEPH_MSG_CLIENT_CAPS: handle_client_caps(MClientCaps::msgref_cast(m)); break; case CEPH_MSG_CLIENT_CAPRELEASE: handle_client_cap_release(MClientCapRelease::msgref_cast(m)); break; case CEPH_MSG_CLIENT_LEASE: handle_client_lease(MClientLease::msgref_cast(m)); break; default: derr << "locker unknown message " << m->get_type() << dendl; ceph_abort_msg("locker unknown message"); } } void Locker::tick() { scatter_tick(); caps_tick(); } /* * locks vs rejoin * * * */ void Locker::send_lock_message(SimpleLock *lock, int msg) { for (const auto &it : lock->get_parent()->get_replicas()) { if (mds->is_cluster_degraded() && mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN) continue; auto m = MLock::create(lock, msg, mds->get_nodeid()); mds->send_message_mds(m, it.first); } } void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data) { for (const auto &it : lock->get_parent()->get_replicas()) { if (mds->is_cluster_degraded() && mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN) continue; auto m = MLock::create(lock, msg, mds->get_nodeid()); 
m->set_data(data); mds->send_message_mds(m, it.first); } } void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov) { // rdlock ancestor snaps CInode *t = in; while (t->get_projected_parent_dn()) { t = t->get_projected_parent_dn()->get_dir()->get_inode(); lov.add_rdlock(&t->snaplock); } lov.add_rdlock(&in->snaplock); } void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov, file_layout_t **layout) { //rdlock ancestor snaps CInode *t = in; lov.add_rdlock(&in->snaplock); lov.add_rdlock(&in->policylock); bool found_layout = false; while (t) { lov.add_rdlock(&t->snaplock); if (!found_layout) { lov.add_rdlock(&t->policylock); if (t->get_projected_inode()->has_layout()) { *layout = &t->get_projected_inode()->layout; found_layout = true; } } if (t->get_projected_parent_dn() && t->get_projected_parent_dn()->get_dir()) t = t->get_projected_parent_dn()->get_dir()->get_inode(); else t = NULL; } } struct MarkEventOnDestruct { MDRequestRef& mdr; const char* message; bool mark_event; MarkEventOnDestruct(MDRequestRef& _mdr, const char *_message) : mdr(_mdr), message(_message), mark_event(true) {} ~MarkEventOnDestruct() { if (mark_event) mdr->mark_event(message); } }; /* If this function returns false, the mdr has been placed * on the appropriate wait list */ bool Locker::acquire_locks(MDRequestRef& mdr, MutationImpl::LockOpVec& lov, CInode *auth_pin_freeze, bool auth_pin_nonblock) { if (mdr->done_locking && !mdr->is_slave()) { // not on slaves! master requests locks piecemeal. dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl; return true; // at least we had better be! 
} dout(10) << "acquire_locks " << *mdr << dendl; MarkEventOnDestruct marker(mdr, "failed to acquire_locks"); client_t client = mdr->get_client(); set mustpin; // items to authpin // xlocks for (int i = 0, size = lov.size(); i < size; ++i) { auto& p = lov[i]; SimpleLock *lock = p.lock; MDSCacheObject *object = lock->get_parent(); if (p.is_xlock()) { if ((lock->get_type() == CEPH_LOCK_ISNAP || lock->get_type() == CEPH_LOCK_IPOLICY) && mds->is_cluster_degraded() && mdr->is_master() && !mdr->is_queued_for_replay()) { // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout // get processed in proper order. bool wait = false; if (object->is_auth()) { if (!mdr->locks.count(lock)) { set ls; object->list_replicas(ls); for (auto m : ls) { if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) { wait = true; break; } } } } else { // if the lock is the latest locked one, it's possible that slave mds got the lock // while there are recovering mds. if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin()) wait = true; } if (wait) { dout(10) << " must xlock " << *lock << " " << *object << ", waiting for cluster recovered" << dendl; mds->locker->drop_locks(mdr.get(), NULL); mdr->drop_local_auth_pins(); mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr)); return false; } } dout(20) << " must xlock " << *lock << " " << *object << dendl; mustpin.insert(object); // augment xlock with a versionlock? if (lock->get_type() == CEPH_LOCK_DN) { CDentry *dn = static_cast(object); if (!dn->is_auth()) continue; if (mdr->is_master()) { // master. wrlock versionlock so we can pipeline dentry updates to journal. lov.add_wrlock(&dn->versionlock); } else { // slave. exclusively lock the dentry version (i.e. block other journal updates). // this makes rollback safe. lov.add_xlock(&dn->versionlock); } } if (lock->get_type() > CEPH_LOCK_IVERSION) { // inode version lock? 
CInode *in = static_cast(object); if (!in->is_auth()) continue; if (mdr->is_master()) { // master. wrlock versionlock so we can pipeline inode updates to journal. lov.add_wrlock(&in->versionlock); } else { // slave. exclusively lock the inode version (i.e. block other journal updates). // this makes rollback safe. lov.add_xlock(&in->versionlock); } } } else if (p.is_wrlock()) { dout(20) << " must wrlock " << *lock << " " << *object << dendl; if (object->is_auth()) { mustpin.insert(object); } else if (!object->is_auth() && !lock->can_wrlock(client) && // we might have to request a scatter !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned dout(15) << " will also auth_pin " << *object << " in case we need to request a scatter" << dendl; mustpin.insert(object); } } else if (p.is_remote_wrlock()) { dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " " << *lock << " " << *object << dendl; mustpin.insert(object); } else if (p.is_rdlock()) { dout(20) << " must rdlock " << *lock << " " << *object << dendl; if (object->is_auth()) { mustpin.insert(object); } else if (!object->is_auth() && !lock->can_rdlock(client)) { // we might have to request an rdlock dout(15) << " will also auth_pin " << *object << " in case we need to request a rdlock" << dendl; mustpin.insert(object); } } else { ceph_assert(0 == "locker unknown lock operation"); } } lov.sort_and_merge(); // AUTH PINS map > mustpin_remote; // mds -> (object set) // can i auth pin them all now? 
marker.message = "failed to authpin local pins"; for (const auto &p : mustpin) { MDSCacheObject *object = p; dout(10) << " must authpin " << *object << dendl; if (mdr->is_auth_pinned(object)) { if (object != (MDSCacheObject*)auth_pin_freeze) continue; if (mdr->more()->is_remote_frozen_authpin) { if (mdr->more()->rename_inode == auth_pin_freeze) continue; // unfreeze auth pin for the wrong inode mustpin_remote[mdr->more()->rename_inode->authority().first].size(); } } if (!object->is_auth()) { if (!mdr->locks.empty()) drop_locks(mdr.get()); if (object->is_ambiguous_auth()) { // wait marker.message = "waiting for single auth, object is being migrated"; dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl; object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr)); mdr->drop_local_auth_pins(); return false; } mustpin_remote[object->authority().first].insert(object); continue; } int err = 0; if (!object->can_auth_pin(&err)) { // wait drop_locks(mdr.get()); mdr->drop_local_auth_pins(); if (auth_pin_nonblock) { dout(10) << " can't auth_pin (freezing?) 
" << *object << ", nonblocking" << dendl; mdr->aborted = true; return false; } if (err == MDSCacheObject::ERR_EXPORTING_TREE) { marker.message = "failed to authpin, subtree is being exported"; } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) { marker.message = "failed to authpin, dir is being fragmented"; } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) { marker.message = "failed to authpin, inode is being exported"; } dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl; object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); if (!mdr->remote_auth_pins.empty()) notify_freeze_waiter(object); return false; } } // ok, grab local auth pins for (const auto& p : mustpin) { MDSCacheObject *object = p; if (mdr->is_auth_pinned(object)) { dout(10) << " already auth_pinned " << *object << dendl; } else if (object->is_auth()) { dout(10) << " auth_pinning " << *object << dendl; mdr->auth_pin(object); } } // request remote auth_pins if (!mustpin_remote.empty()) { marker.message = "requesting remote authpins"; for (const auto& p : mdr->remote_auth_pins) { if (mustpin.count(p.first)) { ceph_assert(p.second == p.first->authority().first); map >::iterator q = mustpin_remote.find(p.second); if (q != mustpin_remote.end()) q->second.insert(p.first); } } for (map >::iterator p = mustpin_remote.begin(); p != mustpin_remote.end(); ++p) { dout(10) << "requesting remote auth_pins from mds." << p->first << dendl; // wait for active auth if (mds->is_cluster_degraded() && !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) { dout(10) << " mds." 
<< p->first << " is not active" << dendl; if (mdr->more()->waiting_on_slave.empty()) mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr)); return false; } auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN); for (set::iterator q = p->second.begin(); q != p->second.end(); ++q) { dout(10) << " req remote auth_pin of " << **q << dendl; MDSCacheObjectInfo info; (*q)->set_object_info(info); req->get_authpins().push_back(info); if (*q == auth_pin_freeze) (*q)->set_object_info(req->get_authpin_freeze()); mdr->pin(*q); } if (auth_pin_nonblock) req->mark_nonblock(); mds->send_message_mds(req, p->first); // put in waiting list ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0); mdr->more()->waiting_on_slave.insert(p->first); } return false; } // caps i'll need to issue set issue_set; bool result = false; // acquire locks. // make sure they match currently acquired locks. auto existing = mdr->locks.begin(); for (const auto& p : lov) { bool need_wrlock = p.is_wrlock(); bool need_remote_wrlock = p.is_remote_wrlock(); // already locked? if (existing != mdr->locks.end() && existing->lock == p.lock) { // right kind? auto it = existing++; auto have = *it; // don't reference if (have.is_xlock() && p.is_xlock()) { dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl; continue; } if (have.is_remote_wrlock() && (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) { dout(10) << " unlocking remote_wrlock on wrong mds." 
<< have.wrlock_target << " " << *have.lock << " " << *have.lock->get_parent() << dendl; remote_wrlock_finish(it, mdr.get()); have.clear_remote_wrlock(); } if (need_wrlock || need_remote_wrlock) { if (need_wrlock == have.is_wrlock() && need_remote_wrlock == have.is_remote_wrlock()) { if (need_wrlock) dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl; if (need_remote_wrlock) dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl; continue; } if (have.is_wrlock()) { if (!need_wrlock) dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl; else if (need_remote_wrlock) // acquire remote_wrlock first dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl; bool need_issue = false; wrlock_finish(it, mdr.get(), &need_issue); if (need_issue) issue_set.insert(static_cast(have.lock->get_parent())); } } else if (have.is_rdlock() && p.is_rdlock()) { dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl; continue; } } // hose any stray locks while (existing != mdr->locks.end()) { auto it = existing++; auto stray = *it; // don't reference dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl; bool need_issue = false; if (stray.is_xlock()) { xlock_finish(it, mdr.get(), &need_issue); } else if (stray.is_rdlock()) { rdlock_finish(it, mdr.get(), &need_issue); } else { // may have acquired both wrlock and remore wrlock if (stray.is_wrlock()) wrlock_finish(it, mdr.get(), &need_issue); if (stray.is_remote_wrlock()) remote_wrlock_finish(it, mdr.get()); } if (need_issue) issue_set.insert(static_cast(stray.lock->get_parent())); } // lock if (mdr->locking && p.lock != mdr->locking) { cancel_locking(mdr.get(), &issue_set); } if (p.is_xlock()) { marker.message = "failed to xlock, waiting"; if (!xlock_start(p.lock, mdr)) goto out; dout(10) << " got xlock 
on " << *p.lock << " " << *p.lock->get_parent() << dendl; } else if (need_wrlock || need_remote_wrlock) { if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) { marker.message = "waiting for remote wrlocks"; remote_wrlock_start(p, p.wrlock_target, mdr); goto out; } if (need_wrlock) { marker.message = "failed to wrlock, waiting"; if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) { marker.message = "failed to wrlock, dropping remote wrlock and waiting"; // can't take the wrlock because the scatter lock is gathering. need to // release the remote wrlock, so that the gathering process can finish. auto it = mdr->locks.end(); ++it; remote_wrlock_finish(it, mdr.get()); remote_wrlock_start(p, p.wrlock_target, mdr); goto out; } // nowait if we have already gotten remote wrlock if (!wrlock_start(p, mdr, need_remote_wrlock)) goto out; dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl; } } else { ceph_assert(mdr->is_master()); if (p.lock->needs_recover()) { if (mds->is_cluster_degraded()) { if (!mdr->is_queued_for_replay()) { // see comments in SimpleLock::set_state_rejoin() and // ScatterLock::encode_state_for_rejoin() drop_locks(mdr.get()); mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr)); dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent() << ", waiting for cluster recovered" << dendl; marker.message = "rejoin recovering lock, waiting for cluster recovered"; return false; } } else { p.lock->clear_need_recover(); } } marker.message = "failed to rdlock, waiting"; if (!rdlock_start(p, mdr)) goto out; dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl; } } // any extra unneeded locks? 
while (existing != mdr->locks.end()) { auto it = existing++; auto stray = *it; dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl; bool need_issue = false; if (stray.is_xlock()) { xlock_finish(it, mdr.get(), &need_issue); } else if (stray.is_rdlock()) { rdlock_finish(it, mdr.get(), &need_issue); } else { // may have acquired both wrlock and remore wrlock if (stray.is_wrlock()) wrlock_finish(it, mdr.get(), &need_issue); if (stray.is_remote_wrlock()) remote_wrlock_finish(it, mdr.get()); } if (need_issue) issue_set.insert(static_cast(stray.lock->get_parent())); } mdr->done_locking = true; mdr->set_mds_stamp(ceph_clock_now()); result = true; marker.message = "acquired locks"; out: issue_caps_set(issue_set); return result; } void Locker::notify_freeze_waiter(MDSCacheObject *o) { CDir *dir = NULL; if (CInode *in = dynamic_cast(o)) { if (!in->is_root()) dir = in->get_parent_dir(); } else if (CDentry *dn = dynamic_cast(o)) { dir = dn->get_dir(); } else { dir = dynamic_cast(o); ceph_assert(dir); } if (dir) { if (dir->is_freezing_dir()) mdcache->fragment_freeze_inc_num_waiters(dir); if (dir->is_freezing_tree()) { while (!dir->is_freezing_tree_root()) dir = dir->get_parent_dir(); mdcache->migrator->export_freeze_inc_num_waiters(dir); } } } void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry) { for (const auto &p : mut->locks) { if (!p.is_xlock()) continue; MDSCacheObject *obj = p.lock->get_parent(); ceph_assert(obj->is_auth()); if (skip_dentry && (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION)) continue; dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl; p.lock->set_xlock_done(); } } void Locker::_drop_locks(MutationImpl *mut, set *pneed_issue, bool drop_rdlocks) { set slaves; for (auto it = mut->locks.begin(); it != mut->locks.end(); ) { SimpleLock *lock = it->lock; MDSCacheObject *obj = lock->get_parent(); if (it->is_xlock()) { if (obj->is_auth()) { bool ni = false; 
xlock_finish(it++, mut, &ni); if (ni) pneed_issue->insert(static_cast(obj)); } else { ceph_assert(lock->get_sm()->can_remote_xlock); slaves.insert(obj->authority().first); lock->put_xlock(); mut->locks.erase(it++); } } else if (it->is_wrlock() || it->is_remote_wrlock()) { if (it->is_remote_wrlock()) { slaves.insert(it->wrlock_target); it->clear_remote_wrlock(); } if (it->is_wrlock()) { bool ni = false; wrlock_finish(it++, mut, &ni); if (ni) pneed_issue->insert(static_cast(obj)); } else { mut->locks.erase(it++); } } else if (drop_rdlocks && it->is_rdlock()) { bool ni = false; rdlock_finish(it++, mut, &ni); if (ni) pneed_issue->insert(static_cast(obj)); } else { ++it; } } for (set::iterator p = slaves.begin(); p != slaves.end(); ++p) { if (!mds->is_cluster_degraded() || mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) { dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl; auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_DROPLOCKS); mds->send_message_mds(slavereq, *p); } } } void Locker::cancel_locking(MutationImpl *mut, set *pneed_issue) { SimpleLock *lock = mut->locking; ceph_assert(lock); dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl; if (lock->get_parent()->is_auth()) { bool need_issue = false; if (lock->get_state() == LOCK_PREXLOCK) { _finish_xlock(lock, -1, &need_issue); } else if (lock->get_state() == LOCK_LOCK_XLOCK && lock->get_num_xlocks() == 0) { lock->set_state(LOCK_XLOCKDONE); eval_gather(lock, true, &need_issue); } if (need_issue) pneed_issue->insert(static_cast(lock->get_parent())); } mut->finish_locking(lock); } void Locker::drop_locks(MutationImpl *mut, set *pneed_issue) { // leftover locks set my_need_issue; if (!pneed_issue) pneed_issue = &my_need_issue; if (mut->locking) cancel_locking(mut, pneed_issue); _drop_locks(mut, pneed_issue, true); if (pneed_issue == &my_need_issue) issue_caps_set(*pneed_issue); mut->done_locking = false; } void 
Locker::drop_non_rdlocks(MutationImpl *mut, set *pneed_issue) { set my_need_issue; if (!pneed_issue) pneed_issue = &my_need_issue; _drop_locks(mut, pneed_issue, false); if (pneed_issue == &my_need_issue) issue_caps_set(*pneed_issue); } void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut) { set need_issue; for (auto it = mut->locks.begin(); it != mut->locks.end(); ) { if (!it->is_rdlock()) { ++it; continue; } SimpleLock *lock = it->lock; // make later mksnap/setlayout (at other mds) wait for this unsafe request if (lock->get_type() == CEPH_LOCK_ISNAP || lock->get_type() == CEPH_LOCK_IPOLICY) { ++it; continue; } bool ni = false; rdlock_finish(it++, mut, &ni); if (ni) need_issue.insert(static_cast(lock->get_parent())); } issue_caps_set(need_issue); } void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut) { set need_issue; for (auto it = mut->locks.begin(); it != mut->locks.end(); ) { SimpleLock *lock = it->lock; if (lock->get_type() == CEPH_LOCK_IDFT) { ++it; continue; } bool ni = false; wrlock_finish(it++, mut, &ni); if (ni) need_issue.insert(static_cast(lock->get_parent())); } issue_caps_set(need_issue); } // generics void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSInternalContextBase::vec *pfinishers) { dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(!lock->is_stable()); int next = lock->get_next_state(); CInode *in = 0; bool caps = lock->get_cap_shift(); if (lock->get_type() != CEPH_LOCK_DN) in = static_cast(lock->get_parent()); bool need_issue = false; int loner_issued = 0, other_issued = 0, xlocker_issued = 0; ceph_assert(!caps || in != NULL); if (caps && in->is_head()) { in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, lock->get_cap_shift(), lock->get_cap_mask()); dout(10) << " next state is " << lock->get_state_name(next) << " issued/allows loner " << gcap_string(loner_issued) << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next)) << " xlocker " << 
gcap_string(xlocker_issued) << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next)) << " other " << gcap_string(other_issued) << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next)) << dendl; if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) || (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) || (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued))) need_issue = true; } #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH))) bool auth = lock->get_parent()->is_auth(); if (!lock->is_gathering() && (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) && (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) && (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) && (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) && !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind! 
(!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 && (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 && (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) && lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds lock->get_state() != LOCK_MIX_SYNC2 ) { dout(7) << "eval_gather finished gather on " << *lock << " on " << *lock->get_parent() << dendl; if (lock->get_sm() == &sm_filelock) { ceph_assert(in); if (in->state_test(CInode::STATE_RECOVERING)) { dout(7) << "eval_gather finished gather, but still recovering" << dendl; return; } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) { dout(7) << "eval_gather finished gather, but need to recover" << dendl; mds->mdcache->queue_file_recover(in); mds->mdcache->do_file_recover(); return; } } if (!lock->get_parent()->is_auth()) { // replica: tell auth mds_rank_t auth = lock->get_parent()->authority().first; if (lock->get_parent()->is_rejoining() && mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) { dout(7) << "eval_gather finished gather, but still rejoining " << *lock->get_parent() << dendl; return; } if (!mds->is_cluster_degraded() || mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { switch (lock->get_state()) { case LOCK_SYNC_LOCK: mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth); break; case LOCK_MIX_SYNC: { auto reply = MLock::create(lock, LOCK_AC_SYNCACK, mds->get_nodeid()); lock->encode_locked_state(reply->get_data()); mds->send_message_mds(reply, auth); next = LOCK_MIX_SYNC2; (static_cast(lock))->start_flush(); } break; case LOCK_MIX_SYNC2: (static_cast(lock))->finish_flush(); (static_cast(lock))->clear_flushed(); case LOCK_SYNC_MIX2: // do nothing, we already acked break; case LOCK_SYNC_MIX: { auto reply = MLock::create(lock, LOCK_AC_MIXACK, mds->get_nodeid()); mds->send_message_mds(reply, auth); next = LOCK_SYNC_MIX2; } break; case LOCK_MIX_LOCK: { bufferlist data; 
lock->encode_locked_state(data); mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth); (static_cast(lock))->start_flush(); // we'll get an AC_LOCKFLUSHED to complete } break; default: ceph_abort(); } } } else { // auth // once the first (local) stage of mix->lock gather complete we can // gather from replicas if (lock->get_state() == LOCK_MIX_LOCK && lock->get_parent()->is_replicated()) { dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl; send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); lock->set_state(LOCK_MIX_LOCK2); return; } if (lock->is_dirty() && !lock->is_flushed()) { scatter_writebehind(static_cast(lock)); mds->mdlog->flush(); return; } lock->clear_flushed(); switch (lock->get_state()) { // to mixed case LOCK_TSYN_MIX: case LOCK_SYNC_MIX: case LOCK_EXCL_MIX: case LOCK_XSYN_MIX: in->start_scatter(static_cast(lock)); if (lock->get_parent()->is_replicated()) { bufferlist softdata; lock->encode_locked_state(softdata); send_lock_message(lock, LOCK_AC_MIX, softdata); } (static_cast(lock))->clear_scatter_wanted(); break; case LOCK_XLOCK: case LOCK_XLOCKDONE: if (next != LOCK_SYNC) break; // fall-thru // to sync case LOCK_EXCL_SYNC: case LOCK_LOCK_SYNC: case LOCK_MIX_SYNC: case LOCK_XSYN_SYNC: if (lock->get_parent()->is_replicated()) { bufferlist softdata; lock->encode_locked_state(softdata); send_lock_message(lock, LOCK_AC_SYNC, softdata); } break; } } lock->set_state(next); if (lock->get_parent()->is_auth() && lock->is_stable()) lock->get_parent()->auth_unpin(lock); // drop loner before doing waiters if (caps && in->is_head() && in->is_auth() && in->get_wanted_loner() != in->get_loner()) { dout(10) << " trying to drop loner" << dendl; if (in->try_drop_loner()) { dout(10) << " dropped loner" << dendl; need_issue = true; } } if (pfinishers) lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK, *pfinishers); else 
lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK); if (caps && in->is_head()) need_issue = true; if (lock->get_parent()->is_auth() && lock->is_stable()) try_eval(lock, &need_issue); } if (need_issue) { if (pneed_issue) *pneed_issue = true; else if (in->is_head()) issue_caps(in); } } bool Locker::eval(CInode *in, int mask, bool caps_imported) { bool need_issue = caps_imported; MDSInternalContextBase::vec finishers; dout(10) << "eval " << mask << " " << *in << dendl; // choose loner? if (in->is_auth() && in->is_head()) { client_t orig_loner = in->get_loner(); if (in->choose_ideal_loner()) { dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl; need_issue = true; mask = -1; } else if (in->get_wanted_loner() != in->get_loner()) { dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl; mask = -1; } } retry: if (mask & CEPH_LOCK_IFILE) eval_any(&in->filelock, &need_issue, &finishers, caps_imported); if (mask & CEPH_LOCK_IAUTH) eval_any(&in->authlock, &need_issue, &finishers, caps_imported); if (mask & CEPH_LOCK_ILINK) eval_any(&in->linklock, &need_issue, &finishers, caps_imported); if (mask & CEPH_LOCK_IXATTR) eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported); if (mask & CEPH_LOCK_INEST) eval_any(&in->nestlock, &need_issue, &finishers, caps_imported); if (mask & CEPH_LOCK_IFLOCK) eval_any(&in->flocklock, &need_issue, &finishers, caps_imported); if (mask & CEPH_LOCK_IPOLICY) eval_any(&in->policylock, &need_issue, &finishers, caps_imported); // drop loner? if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) { if (in->try_drop_loner()) { need_issue = true; if (in->get_wanted_loner() >= 0) { dout(10) << "eval end set loner to client." 
<< in->get_loner() << dendl; bool ok = in->try_set_loner(); ceph_assert(ok); mask = -1; goto retry; } } } finish_contexts(g_ceph_context, finishers); if (need_issue && in->is_head()) issue_caps(in); dout(10) << "eval done" << dendl; return need_issue; } class C_Locker_Eval : public LockerContext { MDSCacheObject *p; int mask; public: C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) { // We are used as an MDSCacheObject waiter, so should // only be invoked by someone already holding the big lock. ceph_assert(locker->mds->mds_lock.is_locked_by_me()); p->get(MDSCacheObject::PIN_PTRWAITER); } void finish(int r) override { locker->try_eval(p, mask); p->put(MDSCacheObject::PIN_PTRWAITER); } }; void Locker::try_eval(MDSCacheObject *p, int mask) { // unstable and ambiguous auth? if (p->is_ambiguous_auth()) { dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl; p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask)); return; } if (p->is_auth() && p->is_frozen()) { dout(7) << "try_eval frozen, waiting on " << *p << dendl; p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask)); return; } if (mask & CEPH_LOCK_DN) { ceph_assert(mask == CEPH_LOCK_DN); bool need_issue = false; // ignore this, no caps on dentries CDentry *dn = static_cast(p); eval_any(&dn->lock, &need_issue); } else { CInode *in = static_cast(p); eval(in, mask); } } void Locker::try_eval(SimpleLock *lock, bool *pneed_issue) { MDSCacheObject *p = lock->get_parent(); // unstable and ambiguous auth? 
if (p->is_ambiguous_auth()) { dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl; p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type())); return; } if (!p->is_auth()) { dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl; return; } if (p->is_frozen()) { dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl; p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type())); return; } /* * We could have a situation like: * * - mds A authpins item on mds B * - mds B starts to freeze tree containing item * - mds A tries wrlock_start on A, sends REQSCATTER to B * - mds B lock is unstable, sets scatter_wanted * - mds B lock stabilizes, calls try_eval. * * We can defer while freezing without causing a deadlock. Honor * scatter_wanted flag here. This will never get deferred by the * checks above due to the auth_pin held by the master. */ if (lock->is_scatterlock()) { ScatterLock *slock = static_cast(lock); if (slock->get_scatter_wanted() && slock->get_state() != LOCK_MIX) { scatter_mix(slock, pneed_issue); if (!lock->is_stable()) return; } else if (slock->get_unscatter_wanted() && slock->get_state() != LOCK_LOCK) { simple_lock(slock, pneed_issue); if (!lock->is_stable()) { return; } } } if (lock->get_type() != CEPH_LOCK_DN && lock->get_type() != CEPH_LOCK_ISNAP && p->is_freezing()) { dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl; p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type())); return; } eval(lock, pneed_issue); } void Locker::eval_cap_gather(CInode *in, set *issue_set) { bool need_issue = false; MDSInternalContextBase::vec finishers; // kick locks now if (!in->filelock.is_stable()) eval_gather(&in->filelock, false, &need_issue, &finishers); if (!in->authlock.is_stable()) eval_gather(&in->authlock, false, &need_issue, &finishers); if (!in->linklock.is_stable()) 
eval_gather(&in->linklock, false, &need_issue, &finishers);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock, false, &need_issue, &finishers);

  if (need_issue && in->is_head()) {
    if (issue_set)
      issue_set->insert(in);
    else
      issue_caps(in);
  }

  finish_contexts(g_ceph_context, finishers);
}

// Like eval_cap_gather, but for the scatter-style locks (file, nest,
// dirfragtree): kick any unstable one, issue caps on a head inode if
// needed, then complete the collected waiters.
void Locker::eval_scatter_gathers(CInode *in)
{
  bool need_issue = false;
  MDSInternalContextBase::vec finishers;

  dout(10) << "eval_scatter_gathers " << *in << dendl;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->nestlock.is_stable())
    eval_gather(&in->nestlock, false, &need_issue, &finishers);
  if (!in->dirfragtreelock.is_stable())
    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  finish_contexts(g_ceph_context, finishers);
}

// Dispatch evaluation to the handler for the lock's type:
// file_eval for IFILE, scatter_eval for IDFT/INEST, simple_eval otherwise.
void Locker::eval(SimpleLock *lock, bool *need_issue)
{
  switch (lock->get_type()) {
  case CEPH_LOCK_IFILE:
    return file_eval(static_cast(lock), need_issue);
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    return scatter_eval(static_cast(lock), need_issue);
  default:
    return simple_eval(lock, need_issue);
  }
}


// ------------------
// rdlock

// Try to move `lock` toward a readable state.  Only a stable lock is acted on:
//  - On the auth it is switched to SYNC, or to XSYN for a non-directory file
//    lock in EXCL with a target loner when !as_anon.  Returns true.
//  - On a replica, REQRDLOCK is sent to the auth when that rank is reachable.
//    Returns false.
// For an unstable IFILE lock whose inode is still recovering, that inode is
// bumped in the recovery queue.  Returns true only in the local-auth case.
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        //else
        simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
          file_xsyn(lock);
        else
          simple_sync(lock);
      } else
        simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}

// Non-blocking readability check for `client`.  Returns true if the lock can
// be rdlocked now, possibly after one _rdlock_kick.  No rdlock reference is
// taken here.  Otherwise, if `con` is non-null it is queued as a
// WAIT_STABLE|WAIT_RD waiter, and false is returned.
bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
{
  dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;

  // can read?  (only checked; no ref is actually taken here)
  if (lock->can_rdlock(client))
    return true;

  _rdlock_kick(lock, false);

  if (lock->can_rdlock(client))
    return true;

  // wait!
  if (con) {
    dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
  }
  return false;
}

// Acquire a rdlock on `lock` for request `mut` and record it in mut->locks.
// Reads of a snapped version (mut->snapid != CEPH_NOSNAP) are forced to
// as_anon, i.e. checked as client -1.  For a non-head, auth inode whose lock
// is in LOCK_SNAP_SYNC, the head inode's corresponding lock may first be
// rdlocked (as_anon) to bring it to SYNC.  If the lock cannot be taken, a
// C_MDS_RetryRequest waiter is queued and false is returned.
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  //  UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    if (in && !in->is_head() && in->is_auth() &&
        lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
        if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}

// Flush the MDS log when we are auth and the lock is unstable while still
// locked, e.g. in xlockdone or during a cap flush (see the inline note).
void Locker::nudge_log(SimpleLock *lock)
{
  dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
  if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked())    // as with xlockdone, or cap flush
    mds->mdlog->flush();
}

// Release a rdlock held via iterator `it`.  The entry is erased from
// mut->locks when `mut` is non-null.  When the last rdlock goes away:
//  - an unstable lock has its gather re-evaluated;
//  - a stable lock on an auth object is re-evaluated via try_eval.
void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_rdlock());
  SimpleLock *lock = it->lock;
  // drop ref
  lock->put_rdlock();
  if (mut)
    mut->locks.erase(it);

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // last one?
if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}


// Return true iff every (rdlock) op in `lov` could be rdlocked right now,
// checked as client -1 so no xlocker exemption applies.  Takes no references.
bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
{
  dout(10) << "can_rdlock_set " << dendl;
  for (const auto& p : lov) {
    ceph_assert(p.is_rdlock());
    if (!p.lock->can_rdlock(-1)) {
      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
      return false;
    }
  }
  return true;
}

// Take a rdlock on every op in `lov`, with no availability check, and record
// each in mut->locks.  Presumably callers have checked can_rdlock_set() first.
void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
{
  dout(10) << "rdlock_take_set " << dendl;
  for (const auto& p : lov) {
    ceph_assert(p.is_rdlock());
    p.lock->get_rdlock();
    mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
  }
}

// ------------------
// wrlock

// Grab a wrlock via get_wrlock(true), with no can_wrlock() check, and record
// it in mut->locks.  Version locks (IVERSION/DVERSION) are delegated to
// local_wrlock_grab.
void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_grab(static_cast(lock), mut);

  dout(7) << "wrlock_force on " << *lock
          << " on " << *lock->get_parent() << dendl;
  lock->get_wrlock(true);
  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
}

// Acquire a wrlock on `lock` for request `mut`.
//  - Auth: if the lock should be scattered (only when !nowait: the inode has
//    a subtree/exporting dirfrag, or scatter is wanted), drive it to LOCK_MIX
//    via scatter_mix; otherwise use simple_lock.
//  - Replica: send REQSCATTER to the auth.
// With nowait, return false rather than queueing a retry waiter, and do not
// change the state of a dirty lock.  Version locks are delegated to
// local_wrlock_start.
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast(lock->get_parent());
  client_t client = mut->get_client();
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
                      (in->has_subtree_or_exporting_dirfrag() ||
                       static_cast(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }

    if (lock->get_type() == CEPH_LOCK_IFILE &&
        in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
        return false;

      if (want_scatter)
        scatter_mix(static_cast(lock));
      else
        simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
        return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}

// Release a wrlock held via `it`.
//  - If the entry is also remote-wrlocked, it stays in mut->locks and
//    clear_wrlock() is called on it; otherwise it is erased.
//  - When no wrlocks remain, the lock is re-evaluated.
// Version locks are delegated to local_wrlock_finish.
void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_wrlock());
  SimpleLock* lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(it, mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();

  if (it->is_remote_wrlock())
    it->clear_wrlock();
  else
    mut->locks.erase(it);

  if (!lock->is_wrlocked())
  {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}


// remote wrlock

// Ask rank `target` to wrlock `lock` on behalf of `mut`, via an OP_WRLOCK
// slave request.  If the cluster is degraded and `target` is not yet
// clientreplay/active/stopping, wait for it instead.  That wait is only
// registered when no slave replies are already pending.  The target is
// recorded in the mutation's slaves and waiting_on_slave sets.
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    if (mut->more()->waiting_on_slave.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->slaves.insert(target);
  auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
  mut->more()->waiting_on_slave.insert(target);
}

// Drop a remote wrlock held via `it` and send OP_UNWRLOCK to its target.
// The message is only sent if the cluster is not degraded or the target is
// at least in REJOIN.  If the entry also holds a local wrlock, only the
// remote part is cleared.
void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_remote_wrlock());
  SimpleLock *lock = it->lock;
  mds_rank_t target = it->wrlock_target;

  if (it->is_wrlock())
    it->clear_remote_wrlock();
  else
    mut->locks.erase(it);

  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds."
<< target << " " << *lock->get_parent() << dendl; if (!mds->is_cluster_degraded() || mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) { auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK); slavereq->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(slavereq->get_object_info()); mds->send_message_mds(slavereq, target); } } // ------------------ // xlock bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut) { if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_xlock_start(static_cast(lock), mut); dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl; client_t client = mut->get_client(); // auth? if (lock->get_parent()->is_auth()) { // auth while (1) { if (lock->can_xlock(client)) { lock->set_state(LOCK_XLOCK); lock->get_xlock(mut, client); mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK); mut->finish_locking(lock); return true; } if (lock->get_type() == CEPH_LOCK_IFILE) { CInode *in = static_cast(lock->get_parent()); if (in->state_test(CInode::STATE_RECOVERING)) { mds->mdcache->recovery_queue.prioritize(in); } } if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE || lock->get_xlock_by_client() != client || lock->is_waiter_for(SimpleLock::WAIT_STABLE))) break; if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) { mut->start_locking(lock); simple_xlock(lock); } else { simple_lock(lock); } } lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); nudge_log(lock); return false; } else { // replica ceph_assert(lock->get_sm()->can_remote_xlock); ceph_assert(!mut->slave_request); // wait for single auth if (lock->get_parent()->is_ambiguous_auth()) { lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mut)); return false; } // wait for active auth mds_rank_t auth = 
lock->get_parent()->authority().first; if (mds->is_cluster_degraded() && !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { dout(7) << " mds." << auth << " is not active" << dendl; if (mut->more()->waiting_on_slave.empty()) mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut)); return false; } // send lock request mut->more()->slaves.insert(auth); mut->start_locking(lock, auth); auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK); r->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(r->get_object_info()); mds->send_message_mds(r, auth); ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0); mut->more()->waiting_on_slave.insert(auth); return false; } } void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue) { ceph_assert(!lock->is_stable()); if (lock->get_type() != CEPH_LOCK_DN && lock->get_type() != CEPH_LOCK_ISNAP && lock->get_num_rdlocks() == 0 && lock->get_num_wrlocks() == 0 && !lock->is_leased() && lock->get_state() != LOCK_XLOCKSNAP) { CInode *in = static_cast(lock->get_parent()); client_t loner = in->get_target_loner(); if (loner >= 0 && (xlocker < 0 || xlocker == loner)) { lock->set_state(LOCK_EXCL); lock->get_parent()->auth_unpin(lock); lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD); if (lock->get_cap_shift()) *pneed_issue = true; if (lock->get_parent()->is_auth() && lock->is_stable()) try_eval(lock, pneed_issue); return; } } // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue); } void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue) { ceph_assert(it->is_xlock()); SimpleLock *lock = it->lock; if (lock->get_type() == CEPH_LOCK_IVERSION || lock->get_type() == CEPH_LOCK_DVERSION) return local_xlock_finish(it, mut); dout(10) << "xlock_finish on " << *lock << " " 
<< *lock->get_parent() << dendl; client_t xlocker = lock->get_xlock_by_client(); // drop ref lock->put_xlock(); ceph_assert(mut); mut->locks.erase(it); bool do_issue = false; // remote xlock? if (!lock->get_parent()->is_auth()) { ceph_assert(lock->get_sm()->can_remote_xlock); // tell auth dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl; mds_rank_t auth = lock->get_parent()->authority().first; if (!mds->is_cluster_degraded() || mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK); slavereq->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(slavereq->get_object_info()); mds->send_message_mds(slavereq, auth); } // others waiting? lock->finish_waiters(SimpleLock::WAIT_STABLE | SimpleLock::WAIT_WR | SimpleLock::WAIT_RD, 0); } else { if (lock->get_num_xlocks() == 0) { if (lock->get_state() == LOCK_LOCK_XLOCK) lock->set_state(LOCK_XLOCKDONE); _finish_xlock(lock, xlocker, &do_issue); } } if (do_issue) { CInode *in = static_cast(lock->get_parent()); if (in->is_head()) { if (pneed_issue) *pneed_issue = true; else issue_caps(in); } } } void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut) { ceph_assert(it->is_xlock()); SimpleLock *lock = it->lock; dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl; lock->put_xlock(); mut->locks.erase(it); MDSCacheObject *p = lock->get_parent(); ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode) if (!lock->is_stable()) lock->get_parent()->auth_unpin(lock); lock->set_state(LOCK_LOCK); } void Locker::xlock_import(SimpleLock *lock) { dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl; lock->get_parent()->auth_pin(lock); } // file i/o ----------------------------------------- version_t Locker::issue_file_data_version(CInode *in) { dout(7) << "issue_file_data_version on " 
<< *in << dendl; return in->inode.file_data_version; } class C_Locker_FileUpdate_finish : public LockerLogContext { CInode *in; MutationRef mut; unsigned flags; client_t client; MClientCaps::ref ack; public: C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f, const MClientCaps::ref &ack, client_t c=-1) : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) { in->get(CInode::PIN_PTRWAITER); } void finish(int r) override { locker->file_update_finish(in, mut, flags, client, ack); in->put(CInode::PIN_PTRWAITER); } }; enum { UPDATE_SHAREMAX = 1, UPDATE_NEEDSISSUE = 2, UPDATE_SNAPFLUSH = 4, }; void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags, client_t client, const MClientCaps::ref &ack) { dout(10) << "file_update_finish on " << *in << dendl; in->pop_and_dirty_projected_inode(mut->ls); mut->apply(); if (ack) { Session *session = mds->get_session(client); if (session) { // "oldest flush tid" > 0 means client uses unique TID for each flush if (ack->get_oldest_flush_tid() > 0) session->add_completed_flush(ack->get_client_tid()); mds->send_message_client_counted(ack, session); } else { dout(10) << " no session for client." 
<< client << " " << *ack << dendl; } } set need_issue; drop_locks(mut.get(), &need_issue); if (in->is_head()) { if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) { Capability *cap = in->get_client_cap(client); if (cap && (cap->wanted() & ~cap->pending())) issue_caps(in, cap); } if ((flags & UPDATE_SHAREMAX) && in->is_auth() && (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER))) share_inode_max_size(in); } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) { dout(10) << " client_snap_caps " << in->client_snap_caps << dendl; // check for snap writeback completion bool gather = false; auto p = in->client_snap_caps.begin(); while (p != in->client_snap_caps.end()) { auto q = p->second.find(client); if (q != p->second.end()) { SimpleLock *lock = in->get_lock(p->first); ceph_assert(lock); dout(10) << " completing client_snap_caps for " << ccap_string(p->first) << " lock " << *lock << " on " << *in << dendl; lock->put_wrlock(); p->second.erase(q); if (p->second.empty()) { gather = true; in->client_snap_caps.erase(p++); } else ++p; } } if (gather) { if (in->client_snap_caps.empty()) { in->item_open_file.remove_myself(); in->item_caps.remove_myself(); } eval_cap_gather(in, &need_issue); } } issue_caps_set(need_issue); mds->balancer->hit_inode(in, META_POP_IWR); // auth unpin after issuing caps mut->cleanup(); } Capability* Locker::issue_new_caps(CInode *in, int mode, Session *session, SnapRealm *realm, bool is_replay) { dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl; bool is_new; // if replay, try to reconnect cap, and otherwise do nothing. 
if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}


// Issue caps on every inode in `inset`.
void Locker::issue_caps_set(set& inset)
{
  for (set::iterator p = inset.begin(); p != inset.end(); ++p)
    issue_caps(*p);
}

// Bring the client caps on head inode `in` in line with what the current lock
// states allow.  Applies to every non-stale cap, or only to `only_cap` if given.
//  - Allowed caps: loner, others, plus xlocker-only caps for locks the client
//    has xlocked.
//  - Grant wanted+liked caps that are allowed, or revoke pending caps that are
//    no longer allowed (nothing new is issued while revoking).
//  - New caps only have their state updated locally; other caps get an
//    MClientCaps GRANT or REVOKE message.
// Returns true iff no cap needed (re)issuing.
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
            << " allowed=" << ccap_string(loner_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << ", others allowed=" << ccap_string(all_allowed)
            << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << " on " << *in << dendl;
  }

  ceph_assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps
  map::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // sessions without the inline-data feature may not get FILE_RD/FILE_WR
    // on an inode that carries inline data
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
        !(session && session->get_connection() &&
          session->get_connection()->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
             << " pending " << ccap_string(pending)
             << " allowed " << ccap_string(allowed)
             << " wanted " << ccap_string(wanted)
             << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
        dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
        continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
        (pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
        seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
        seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
        // haven't send caps to client yet
        if (before & ~after)
          cap->confirm_receipt(seq, after);
      } else {
        dout(7) << " sending MClientCaps to client." << it->first
                << " seq " << cap->get_last_seq()
                << " new pending " << ccap_string(after) << " was " << ccap_string(before)
                << dendl;

        int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
        if (op == CEPH_CAP_OP_REVOKE) {
          // track the revocation: global and per-client lists, revoke
          // timestamp, and a reset warning counter
          revoking_caps.push_back(&cap->item_revoking_caps);
          revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
          cap->set_last_revoke_stamp(ceph_clock_now());
          cap->reset_num_revoke_warnings();
        }

        auto m = MClientCaps::create(op, in->ino(),
                                     in->find_snaprealm()->inode->ino(),
                                     cap->get_cap_id(),
                                     cap->get_last_seq(),
                                     after, wanted, 0,
                                     cap->get_mseq(),
                                     mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);

        mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}

// Send a CEPH_CAP_OP_TRUNC cap message to every client holding a cap on `in`.
// On the auth, for regular files, max_size is then re-checked.
void Locker::issue_truncate(CInode *in)
{
  dout(7) << "issue_truncate on " << *in << dendl;

  for (auto &p : in->client_caps) {
    Capability *cap = &p.second;
    auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
                                 in->ino(),
                                 in->find_snaprealm()->inode->ino(),
                                 cap->get_cap_id(), cap->get_last_seq(),
                                 cap->pending(), cap->wanted(), 0,
                                 cap->get_mseq(),
                                 mds->get_osd_epoch_barrier());
    in->encode_cap_message(m, cap);
    mds->send_message_client_counted(m, p.first);
  }

  // should we increase max_size?
if (in->is_auth() && in->is_file())
    check_inode_max_size(in);
}


// Revoke everything but CEPH_CAP_PIN from a cap (presumably a stale one),
// then kick the gathers of any unstable cap-related lock and re-evaluate.
// If the inode is in the middle of a cap export, only mark it
// STATE_EVALSTALECAPS and return.
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // this client had a writeable range on the inode; flag it for recovery
    if (in->is_auth() &&
        in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      request_inode_file_caps(in);
    }
  }
}

// Mark every cap held by `session` stale and revoke it.
void Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;
    cap->mark_stale();
    revoke_stale_caps(cap);
  }
}

// Clear the stale flag on every stale cap held by `session` and re-issue it.
// An inode in the middle of a cap export is instead marked
// STATE_EVALSTALECAPS.  On the auth, caps are re-issued only if eval() did
// not already issue them.
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;
    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    if (cap->is_stale()) {
      dout(10) << " clearing stale flag on " << *in << dendl;
      cap->clear_stale();

      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
        // if export succeeds, the cap will be removed. if export fails,
        // we need to re-issue the cap if it's not stale.
        in->state_set(CInode::STATE_EVALSTALECAPS);
        continue;
      }

      // eval() returns true when it has already issued caps itself
      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
        issue_caps(in, cap);
    }
  }
}

// Remove every dentry lease held by `session`.  The iterator is advanced
// before removal, presumably because remove_client_lease() unlinks the lease
// from the session's list.
void Locker::remove_stale_leases(Session *session)
{
  dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
  xlist::iterator p = session->leases.begin();
  while (!p.end()) {
    ClientLease *l = *p;
    ++p;
    CDentry *parent = static_cast(l->parent);
    dout(15) << " removing lease on " << *parent << dendl;
    parent->remove_client_lease(l, this);
  }
}


// Waiter that retries request_inode_file_caps(in) if we are still a replica
// when woken.  The inode is pinned (PIN_PTRWAITER) until finish().
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};

// Replica side: tell the auth MDS which caps our clients want on `in`.  The
// wanted set excludes PIN and is masked by the caps ever allowed.  Nothing is
// sent if it equals what was last requested.  Defers while auth is ambiguous,
// or while the auth rank is in REJOIN in a degraded cluster.
// NOTE(review): replica_caps_wanted is updated even when the auth is not
// reachable and no message is sent -- confirm that rejoin/recovery resends it.
void Locker::request_inode_file_caps(CInode *in)
{
  ceph_assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(MInodeFileCaps::create(in->ino(), in->replica_caps_wanted), auth);
  }
}

// Auth side of request_inode_file_caps: record the caps that replica rank
// `from` wants on the inode, then re-evaluate its cap locks.
void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  ceph_assert(in);
  ceph_assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  in->set_mds_caps_wanted(from, m->get_caps());

  try_eval(in, CEPH_CAP_LOCKS);
}


// Waiter that retries check_inode_max_size() with the saved arguments
// (force_wrlock = false) if we are still auth when woken.  The inode is
// pinned (PIN_PTRWAITER) until finish().
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};

// Compute the max_size to hand out for a file of `size` bytes.
//  - Start from 2 * (size + 1).
//  - When mds_client_writeable_range_max_inc_objs > 0, cap it at size plus
//    that many objects of the layout's object_size.
//  - Round up to the layout's size increment.
uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
{
  uint64_t new_max = (size + 1) << 1;
  uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
  if (max_inc > 0) {
    max_inc *= pi->layout.object_size;
    new_max = std::min(new_max, size + max_inc);
  }
  return round_up_to(new_max, pi->get_layout_size_increment());
}

// Fill *new_ranges with a writeable range for each client that holds or
// wants FILE_WR or FILE_BUFFER on `in`.
//  - Each range is [0, max(calc_new_max_size(size), previous range end)].
//  - Clients without such caps get no entry.
// *max_increased is set when any client's range grows or is newly added.
void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
                                    CInode::mempool_inode::client_range_map *new_ranges,
                                    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if(latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
for (const auto &p : in->get_client_caps()) {
    if ((p.second.issued() | p.second.wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p.first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p.first)) {
        client_writeable_range_t& oldr = latest->client_ranges[p.first];
        if (ms > oldr.range.last)
          *max_increased = true;
        nr.range.last = std::max(ms, oldr.range.last);
        nr.follows = oldr.follows;
      } else {
        *max_increased = true;
        nr.range.last = ms;
        nr.follows = in->first - 1;
      }
    }
  }
}

// On the auth, for regular file `in`: project and journal an inode update if
// the clients' writeable ranges (max_size) change, and/or if size or mtime
// grow (checked only when new_size > 0).
// Returns false when:
//  - nothing changed; or
//  - the update had to be deferred because the inode is frozen, or the
//    filelock cannot be wrlocked and force_wrlock is unset.  A
//    C_MDL_CheckMaxSize waiter is queued in that case.
// Returns true once the log entry has been submitted.
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only ever move forward here
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, std::max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
           << " update_size " << update_size
           << " on " << *in << dendl;

  if (in->is_frozen()) {
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
        file_excl(&in->filelock);
      else
        simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
        pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
                                                              UPDATE_SHAREMAX, MClientCaps::ref()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}


// Send a GRANT cap message to each qualifying client, bumping the cap's
// last_seq first.
//  - Covers every client, or only `only_cap`'s client when given.
//  - Skips suppressed caps and clients whose pending caps lack FILE_WR and
//    FILE_BUFFER.
// encode_cap_message presumably fills in the inode fields, including
// max_size -- TODO confirm.
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = &it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      cap->inc_last_seq();
      auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
                                   in->ino(),
                                   in->find_snaprealm()->inode->ino(),
                                   cap->get_cap_id(),
                                   cap->get_last_seq(),
                                   cap->pending(),
                                   cap->wanted(), 0,
                                   cap->get_mseq(),
                                   mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}

bool Locker::_need_flush_mdlog(CInode *in, int wanted)
{
  /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
   * pending mutations.
*/ if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) && in->filelock.is_unstable_and_locked()) || ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) && in->authlock.is_unstable_and_locked()) || ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) && in->linklock.is_unstable_and_locked()) || ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) && in->xattrlock.is_unstable_and_locked())) return true; return false; } void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq) { if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) { dout(10) << " wanted " << ccap_string(cap->wanted()) << " -> " << ccap_string(wanted) << dendl; cap->set_wanted(wanted); } else if (wanted & ~cap->wanted()) { dout(10) << " wanted " << ccap_string(cap->wanted()) << " -> " << ccap_string(wanted) << " (added caps even though we had seq mismatch!)" << dendl; cap->set_wanted(wanted | cap->wanted()); } else { dout(10) << " NOT changing wanted " << ccap_string(cap->wanted()) << " -> " << ccap_string(wanted) << " (issue_seq " << issue_seq << " != last_issue " << cap->get_last_issue() << ")" << dendl; return; } CInode *cur = cap->get_inode(); if (!cur->is_auth()) { request_inode_file_caps(cur); return; } if (cap->wanted()) { if (cur->state_test(CInode::STATE_RECOVERING) && (cap->wanted() & (CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR))) { mds->mdcache->recovery_queue.prioritize(cur); } if (mdcache->open_file_table.should_log_open(cur)) { ceph_assert(cur->last == CEPH_NOSNAP); EOpen *le = new EOpen(mds->mdlog); mds->mdlog->start_entry(le); le->add_clean_inode(cur); mds->mdlog->submit_entry(le); } } } void Locker::snapflush_nudge(CInode *in) { ceph_assert(in->last != CEPH_NOSNAP); if (in->client_snap_caps.empty()) return; CInode *head = mdcache->get_inode(in->ino()); ceph_assert(head); ceph_assert(head->is_auth()); if (head->client_need_snapflush.empty()) return; SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE); if (hlock->get_state() == 
LOCK_SYNC || !hlock->is_stable()) { hlock = NULL; for (int i = 0; i < num_cinode_locks; i++) { SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock); if (lock->get_state() != LOCK_SYNC && lock->is_stable()) { hlock = lock; break; } } } if (hlock) { _rdlock_kick(hlock, true); } else { // also, requeue, in case of unstable lock need_snapflush_inodes.push_back(&in->item_caps); } } void Locker::mark_need_snapflush_inode(CInode *in) { ceph_assert(in->last != CEPH_NOSNAP); if (!in->item_caps.is_on_list()) { need_snapflush_inodes.push_back(&in->item_caps); utime_t now = ceph_clock_now(); in->last_dirstat_prop = now; dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl; } } void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last) { dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl; for (auto p = head_in->client_need_snapflush.begin(); p != head_in->client_need_snapflush.end() && p->first < last; ) { snapid_t snapid = p->first; auto &clients = p->second; ++p; // be careful, q loop below depends on this if (clients.count(client)) { dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl; CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1); ceph_assert(sin); ceph_assert(sin->first <= snapid); _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref()); head_in->remove_need_snapflush(sin, snapid, client); } } } bool Locker::should_defer_client_cap_frozen(CInode *in) { /* * This policy needs to be AT LEAST as permissive as allowing a client request * to go forward, or else a client request can release something, the release * gets deferred, but the request gets processed and deadlocks because when the * caps can't get revoked. * * Currently, a request wait if anything locked is freezing (can't * auth_pin), which would avoid any deadlock with cap release. Thus @in * _MUST_ be in the lock/auth_pin set. 
* * auth_pins==0 implies no unstable lock and not auth pinnned by * client request, otherwise continue even it's freezing. */ return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen(); } void Locker::handle_client_caps(const MClientCaps::const_ref &m) { client_t client = m->get_source().num(); snapid_t follows = m->get_snap_follows(); auto op = m->get_op(); auto dirty = m->get_dirty(); dout(7) << "handle_client_caps " << " on " << m->get_ino() << " tid " << m->get_client_tid() << " follows " << follows << " op " << ceph_cap_op_name(op) << " flags 0x" << std::hex << m->flags << std::dec << dendl; Session *session = mds->get_session(m); if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) { if (!session) { dout(5) << " no session, dropping " << *m << dendl; return; } if (session->is_closed() || session->is_closing() || session->is_killing()) { dout(7) << " session closed|closing|killing, dropping " << *m << dendl; return; } if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) && dirty && m->get_client_tid() > 0 && !session->have_completed_flush(m->get_client_tid())) { mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty, op == CEPH_CAP_OP_FLUSHSNAP); } mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); return; } if (m->get_client_tid() > 0 && session && session->have_completed_flush(m->get_client_tid())) { dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid() << " for client." 
<< client << dendl; MClientCaps::ref ack; if (op == CEPH_CAP_OP_FLUSHSNAP) { ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier()); } else { ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier()); } ack->set_snap_follows(follows); ack->set_client_tid(m->get_client_tid()); mds->send_message_client_counted(ack, m->get_connection()); if (op == CEPH_CAP_OP_FLUSHSNAP) { return; } else { // fall-thru because the message may release some caps dirty = false; op = CEPH_CAP_OP_UPDATE; } } // "oldest flush tid" > 0 means client uses unique TID for each flush if (m->get_oldest_flush_tid() > 0 && session) { if (session->trim_completed_flushes(m->get_oldest_flush_tid())) { mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name); if (session->get_num_trim_flushes_warnings() > 0 && session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes) session->reset_num_trim_flushes_warnings(); } else { if (session->get_num_completed_flushes() >= (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) { session->inc_num_trim_flushes_warnings(); stringstream ss; ss << "client." 
<< session->get_client() << " does not advance its oldest_flush_tid (" << m->get_oldest_flush_tid() << "), " << session->get_num_completed_flushes() << " completed flushes recorded in session"; mds->clog->warn() << ss.str(); dout(20) << __func__ << " " << ss.str() << dendl; } } } CInode *head_in = mdcache->get_inode(m->get_ino()); if (!head_in) { if (mds->is_clientreplay()) { dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", will try again after replayed client requests" << dendl; mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m)); return; } /* * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree * Sequence of events that cause this are: * - client sends caps message to mds.a * - mds finishes subtree migration, send cap export to client * - mds trim its cache * - mds receives cap messages from client */ dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl; return; } if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) { // Pause RADOS operations until we see the required epoch mds->objecter->set_epoch_barrier(m->osd_epoch_barrier); } if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) { // Record the barrier so that we will retransmit it to clients mds->set_osd_epoch_barrier(m->osd_epoch_barrier); } dout(10) << " head inode " << *head_in << dendl; Capability *cap = 0; cap = head_in->get_client_cap(client); if (!cap) { dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl; return; } ceph_assert(cap); // freezing|frozen? 
if (should_defer_client_cap_frozen(head_in)) { dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl; head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m)); return; } if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) { dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq() << ", dropping" << dendl; return; } bool need_unpin = false; // flushsnap? if (op == CEPH_CAP_OP_FLUSHSNAP) { if (!head_in->is_auth()) { dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl; goto out; } SnapRealm *realm = head_in->find_snaprealm(); snapid_t snap = realm->get_snap_following(follows); dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl; auto p = head_in->client_need_snapflush.begin(); if (p != head_in->client_need_snapflush.end() && p->first < snap) { head_in->auth_pin(this); // prevent subtree frozen need_unpin = true; _do_null_snapflush(head_in, client, snap); } CInode *in = head_in; if (snap != CEPH_NOSNAP) { in = mdcache->pick_inode_snap(head_in, snap - 1); if (in != head_in) dout(10) << " snapped inode " << *in << dendl; } // we can prepare the ack now, since this FLUSHEDSNAP is independent of any // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst // case we get a dup response, so whatever.) MClientCaps::ref ack; if (dirty) { ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier()); ack->set_snap_follows(follows); ack->set_client_tid(m->get_client_tid()); ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); } if (in == head_in || (head_in->client_need_snapflush.count(snap) && head_in->client_need_snapflush[snap].count(client))) { dout(7) << " flushsnap snap " << snap << " client." << client << " on " << *in << dendl; // this cap now follows a later snap (i.e. the one initiating this flush, or later) if (in == head_in) cap->client_follows = snap < CEPH_NOSNAP ? 
snap : realm->get_newest_seq(); _do_snap_update(in, snap, dirty, follows, client, m, ack); if (in != head_in) head_in->remove_need_snapflush(in, snap, client); } else { dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl; if (ack) mds->send_message_client_counted(ack, m->get_connection()); } goto out; } if (cap->get_cap_id() != m->get_cap_id()) { dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl; } else { CInode *in = head_in; if (follows > 0) { in = mdcache->pick_inode_snap(head_in, follows); // intermediate snap inodes while (in != head_in) { ceph_assert(in->last != CEPH_NOSNAP); if (in->is_auth() && dirty) { dout(10) << " updating intermediate snapped inode " << *in << dendl; _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref()); } in = mdcache->pick_inode_snap(head_in, in->last); } } // head inode, and cap MClientCaps::ref ack; int caps = m->get_caps(); if (caps & ~cap->issued()) { dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl; caps &= cap->issued(); } cap->confirm_receipt(m->get_seq(), caps); dout(10) << " follows " << follows << " retains " << ccap_string(m->get_caps()) << " dirty " << ccap_string(dirty) << " on " << *in << dendl; // missing/skipped snapflush? // The client MAY send a snapflush if it is issued WR/EXCL caps, but // presently only does so when it has actual dirty metadata. But, we // set up the need_snapflush stuff based on the issued caps. // We can infer that the client WONT send a FLUSHSNAP once they have // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap // update/release). 
if (!head_in->client_need_snapflush.empty()) { if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) { head_in->auth_pin(this); // prevent subtree frozen need_unpin = true; _do_null_snapflush(head_in, client); } else { dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl; } } bool need_snapflush = cap->need_snapflush(); if (dirty && in->is_auth()) { dout(7) << " flush client." << client << " dirty " << ccap_string(dirty) << " seq " << m->get_seq() << " on " << *in << dendl; ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier()); ack->set_client_tid(m->get_client_tid()); ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); // client flushes and releases caps at the same time. make sure MDCache::cow_inode() // properly setup CInode::client_need_snapflush if ((dirty & ~cap->issued()) && !need_snapflush) cap->mark_needsnapflush(); } // filter wanted based on what we could ever give out (given auth/replica status) bool need_flush = m->flags & MClientCaps::FLAG_SYNC; int new_wanted = m->get_wanted(); if (new_wanted != cap->wanted()) { if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) { // exapnding caps. make sure we aren't waiting for a log flush need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending()); } adjust_cap_wanted(cap, new_wanted, m->get_issue_seq()); } bool updated = in->is_auth() && _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush); if (cap->need_snapflush() && (!need_snapflush || !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))) cap->clear_needsnapflush(); if (updated) { eval(in, CEPH_CAP_LOCKS); if (!need_flush && (cap->wanted() & ~cap->pending())) need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending()); } else { // no update, ack now. 
if (ack) mds->send_message_client_counted(ack, m->get_connection()); bool did_issue = eval(in, CEPH_CAP_LOCKS); if (!did_issue && (cap->wanted() & ~cap->pending())) issue_caps(in, cap); if (cap->get_last_seq() == 0 && (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) { cap->issue_norevoke(cap->issued()); share_inode_max_size(in, cap); } } if (need_flush) mds->mdlog->flush(); } out: if (need_unpin) head_in->auth_unpin(this); } class C_Locker_RetryRequestCapRelease : public LockerContext { client_t client; ceph_mds_request_release item; public: C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) : LockerContext(l), client(c), item(it) { } void finish(int r) override { string dname; MDRequestRef null_ref; locker->process_request_cap_release(null_ref, client, item, dname); } }; void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item, std::string_view dname) { inodeno_t ino = (uint64_t)item.ino; uint64_t cap_id = item.cap_id; int caps = item.caps; int wanted = item.wanted; int seq = item.seq; int issue_seq = item.issue_seq; int mseq = item.mseq; CInode *in = mdcache->get_inode(ino); if (!in) return; if (dname.length()) { frag_t fg = in->pick_dirfrag(dname); CDir *dir = in->get_dirfrag(fg); if (dir) { CDentry *dn = dir->lookup(dname); if (dn) { ClientLease *l = dn->get_client_lease(client); if (l) { dout(10) << "process_cap_release removing lease on " << *dn << dendl; dn->remove_client_lease(l, this); } else { dout(7) << "process_cap_release client." << client << " doesn't have lease on " << *dn << dendl; } } else { dout(7) << "process_cap_release client." << client << " released lease on dn " << dir->dirfrag() << "/" << dname << " which dne" << dendl; } } } Capability *cap = in->get_client_cap(client); if (!cap) return; dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in << (mdr ? 
"" : " (DEFERRED, no mdr)") << dendl; if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl; return; } if (cap->get_cap_id() != cap_id) { dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl; return; } if (should_defer_client_cap_frozen(in)) { dout(7) << " frozen, deferring" << dendl; in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item)); return; } if (caps & ~cap->issued()) { dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl; caps &= cap->issued(); } cap->confirm_receipt(seq, caps); if (!in->client_need_snapflush.empty() && (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) { _do_null_snapflush(in, client); } adjust_cap_wanted(cap, wanted, issue_seq); if (mdr) cap->inc_suppress(); eval(in, CEPH_CAP_LOCKS); if (mdr) cap->dec_suppress(); // take note; we may need to reissue on this cap later if (mdr) mdr->cap_releases[in->vino()] = cap->get_last_seq(); } class C_Locker_RetryKickIssueCaps : public LockerContext { CInode *in; client_t client; ceph_seq_t seq; public: C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) : LockerContext(l), in(i), client(c), seq(s) { in->get(CInode::PIN_PTRWAITER); } void finish(int r) override { locker->kick_issue_caps(in, client, seq); in->put(CInode::PIN_PTRWAITER); } }; void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq) { Capability *cap = in->get_client_cap(client); if (!cap || cap->get_last_sent() != seq) return; if (in->is_frozen()) { dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl; in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryKickIssueCaps(this, in, client, seq)); return; } dout(10) << "kick_issue_caps released at current seq " << seq << ", reissuing" << dendl; issue_caps(in, cap); } void Locker::kick_cap_releases(MDRequestRef& mdr) { client_t client = mdr->get_client(); 
for (map::iterator p = mdr->cap_releases.begin(); p != mdr->cap_releases.end(); ++p) { CInode *in = mdcache->get_inode(p->first); if (!in) continue; kick_issue_caps(in, client, p->second); } } /** * m and ack might be NULL, so don't dereference them unless dirty != 0 */ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack) { dout(10) << "_do_snap_update dirty " << ccap_string(dirty) << " follows " << follows << " snap " << snap << " on " << *in << dendl; if (snap == CEPH_NOSNAP) { // hmm, i guess snap was already deleted? just ack! dout(10) << " wow, the snap following " << follows << " was already deleted. nothing to record, just ack." << dendl; if (ack) mds->send_message_client_counted(ack, m->get_connection()); return; } EUpdate *le = new EUpdate(mds->mdlog, "snap flush"); mds->mdlog->start_entry(le); MutationRef mut = new MutationImpl(); mut->ls = mds->mdlog->get_current_segment(); // normal metadata updates that we can apply to the head as well. // update xattrs? 
CInode::mempool_xattr_map *px = nullptr; bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) && m->xattrbl.length() && m->head.xattr_version > in->get_projected_inode()->xattr_version; CInode::mempool_old_inode *oi = 0; if (in->is_multiversion()) { oi = in->pick_old_inode(snap); } CInode::mempool_inode *i; if (oi) { dout(10) << " writing into old inode" << dendl; auto &pi = in->project_inode(); pi.inode.version = in->pre_dirty(); if (snap > oi->first) in->split_old_inode(snap); i = &oi->inode; if (xattrs) px = &oi->xattrs; } else { auto &pi = in->project_inode(xattrs); pi.inode.version = in->pre_dirty(); i = &pi.inode; if (xattrs) px = pi.xattrs.get(); } _update_cap_fields(in, dirty, m, i); // xattr if (xattrs) { dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version << " len " << m->xattrbl.length() << dendl; i->xattr_version = m->head.xattr_version; auto p = m->xattrbl.cbegin(); decode(*px, p); } { auto it = i->client_ranges.find(client); if (it != i->client_ranges.end()) { if (in->last == snap) { dout(10) << " removing client_range entirely" << dendl; i->client_ranges.erase(it); } else { dout(10) << " client_range now follows " << snap << dendl; it->second.follows = snap; } } } mut->auth_pin(in); mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); // "oldest flush tid" > 0 means client uses unique TID for each flush if (ack && ack->get_oldest_flush_tid() > 0) le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), ack->get_oldest_flush_tid()); mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH, ack, client)); } void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi) { if (dirty == 0) return; /* m must be valid if there are dirty caps */ ceph_assert(m); uint64_t features = m->get_connection()->get_features(); if 
(m->get_ctime() > pi->ctime) { dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime() << " for " << *in << dendl; pi->ctime = m->get_ctime(); if (m->get_ctime() > pi->rstat.rctime) pi->rstat.rctime = m->get_ctime(); } if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) && m->get_change_attr() > pi->change_attr) { dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr() << " for " << *in << dendl; pi->change_attr = m->get_change_attr(); } // file if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { utime_t atime = m->get_atime(); utime_t mtime = m->get_mtime(); uint64_t size = m->get_size(); version_t inline_version = m->inline_version; if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) || ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) { dout(7) << " mtime " << pi->mtime << " -> " << mtime << " for " << *in << dendl; pi->mtime = mtime; if (mtime > pi->rstat.rctime) pi->rstat.rctime = mtime; } if (in->inode.is_file() && // ONLY if regular file size > pi->size) { dout(7) << " size " << pi->size << " -> " << size << " for " << *in << dendl; pi->size = size; pi->rstat.rbytes = size; } if (in->inode.is_file() && (dirty & CEPH_CAP_FILE_WR) && inline_version > pi->inline_data.version) { pi->inline_data.version = inline_version; if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0) pi->inline_data.get_data() = m->inline_data; else pi->inline_data.free_data(); } if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) { dout(7) << " atime " << pi->atime << " -> " << atime << " for " << *in << dendl; pi->atime = atime; } if ((dirty & CEPH_CAP_FILE_EXCL) && ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) { dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq() << " for " << *in << dendl; pi->time_warp_seq = m->get_time_warp_seq(); } } // auth if (dirty & CEPH_CAP_AUTH_EXCL) { if (m->head.uid != pi->uid) { dout(7) << " uid " << pi->uid << " -> " << m->head.uid << " for " << *in << 
dendl; pi->uid = m->head.uid; } if (m->head.gid != pi->gid) { dout(7) << " gid " << pi->gid << " -> " << m->head.gid << " for " << *in << dendl; pi->gid = m->head.gid; } if (m->head.mode != pi->mode) { dout(7) << " mode " << oct << pi->mode << " -> " << m->head.mode << dec << " for " << *in << dendl; pi->mode = m->head.mode; } if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) { dout(7) << " btime " << oct << pi->btime << " -> " << m->get_btime() << dec << " for " << *in << dendl; pi->btime = m->get_btime(); } } } /* * update inode based on cap flush|flushsnap|wanted. * adjust max_size, if needed. * if we update, return true; otherwise, false (no updated needed). */ bool Locker::_do_cap_update(CInode *in, Capability *cap, int dirty, snapid_t follows, const MClientCaps::const_ref &m, const MClientCaps::ref &ack, bool *need_flush) { dout(10) << "_do_cap_update dirty " << ccap_string(dirty) << " issued " << ccap_string(cap ? cap->issued() : 0) << " wanted " << ccap_string(cap ? cap->wanted() : 0) << " on " << *in << dendl; ceph_assert(in->is_auth()); client_t client = m->get_source().num(); CInode::mempool_inode *latest = in->get_projected_inode(); // increase or zero max_size? uint64_t size = m->get_size(); bool change_max = false; uint64_t old_max = latest->client_ranges.count(client) ? 
latest->client_ranges[client].range.last : 0; uint64_t new_max = old_max; if (in->is_file()) { bool forced_change_max = false; dout(20) << "inode is file" << dendl; if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) { dout(20) << "client has write caps; m->get_max_size=" << m->get_max_size() << "; old_max=" << old_max << dendl; if (m->get_max_size() > new_max) { dout(10) << "client requests file_max " << m->get_max_size() << " > max " << old_max << dendl; change_max = true; forced_change_max = true; new_max = calc_new_max_size(latest, m->get_max_size()); } else { new_max = calc_new_max_size(latest, size); if (new_max > old_max) change_max = true; else new_max = old_max; } } else { if (old_max) { change_max = true; new_max = 0; } } if (in->last == CEPH_NOSNAP && change_max && !in->filelock.can_wrlock(client) && !in->filelock.can_force_wrlock(client)) { dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl; if (in->filelock.is_stable()) { bool need_issue = false; if (cap) cap->inc_suppress(); if (in->get_mds_caps_wanted().empty() && (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) { if (in->filelock.get_state() != LOCK_EXCL) file_excl(&in->filelock, &need_issue); } else simple_lock(&in->filelock, &need_issue); if (need_issue) issue_caps(in); if (cap) cap->dec_suppress(); } if (!in->filelock.can_wrlock(client) && !in->filelock.can_force_wrlock(client)) { C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in, forced_change_max ? new_max : 0, 0, utime_t()); in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms); change_max = false; } } } if (m->flockbl.length()) { int32_t num_locks; auto bli = m->flockbl.cbegin(); decode(num_locks, bli); for ( int i=0; i < num_locks; ++i) { ceph_filelock decoded_lock; decode(decoded_lock, bli); in->get_fcntl_lock_state()->held_locks. 
insert(pair(decoded_lock.start, decoded_lock)); ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; } decode(num_locks, bli); for ( int i=0; i < num_locks; ++i) { ceph_filelock decoded_lock; decode(decoded_lock, bli); in->get_flock_lock_state()->held_locks. insert(pair(decoded_lock.start, decoded_lock)); ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; } } if (!dirty && !change_max) return false; Session *session = mds->get_session(m); if (session->check_access(in, MAY_WRITE, m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) { dout(10) << "check_access failed, dropping cap update on " << *in << dendl; return false; } // do the update. EUpdate *le = new EUpdate(mds->mdlog, "cap update"); mds->mdlog->start_entry(le); bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) && m->xattrbl.length() && m->head.xattr_version > in->get_projected_inode()->xattr_version; auto &pi = in->project_inode(xattr); pi.inode.version = in->pre_dirty(); MutationRef mut(new MutationImpl()); mut->ls = mds->mdlog->get_current_segment(); _update_cap_fields(in, dirty, m, &pi.inode); if (change_max) { dout(7) << " max_size " << old_max << " -> " << new_max << " for " << *in << dendl; if (new_max) { auto &cr = pi.inode.client_ranges[client]; cr.range.first = 0; cr.range.last = new_max; cr.follows = in->first - 1; } else pi.inode.client_ranges.erase(client); } if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) wrlock_force(&in->filelock, mut); // wrlock for duration of journal // auth if (dirty & CEPH_CAP_AUTH_EXCL) wrlock_force(&in->authlock, mut); // xattrs update? 
if (xattr) { dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl; pi.inode.xattr_version = m->head.xattr_version; auto p = m->xattrbl.cbegin(); decode(*pi.xattrs, p); wrlock_force(&in->xattrlock, mut); } mut->auth_pin(in); mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); // "oldest flush tid" > 0 means client uses unique TID for each flush if (ack && ack->get_oldest_flush_tid() > 0) le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), ack->get_oldest_flush_tid()); unsigned update_flags = 0; if (change_max) update_flags |= UPDATE_SHAREMAX; if (cap) update_flags |= UPDATE_NEEDSISSUE; mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags, ack, client)); if (need_flush && !*need_flush && ((change_max && new_max) || // max INCREASE _need_flush_mdlog(in, dirty))) *need_flush = true; return true; } void Locker::handle_client_cap_release(const MClientCapRelease::const_ref &m) { client_t client = m->get_source().num(); dout(10) << "handle_client_cap_release " << *m << dendl; if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) { mds->wait_for_replay(new C_MDS_RetryMessage(mds, m)); return; } if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) { // Pause RADOS operations until we see the required epoch mds->objecter->set_epoch_barrier(m->osd_epoch_barrier); } if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) { // Record the barrier so that we will retransmit it to clients mds->set_osd_epoch_barrier(m->osd_epoch_barrier); } Session *session = mds->get_session(m); for (const auto &cap : m->caps) { _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq); } if (session) { session->notify_cap_release(m->caps.size()); } } class C_Locker_RetryCapRelease : public LockerContext { 
client_t client; inodeno_t ino; uint64_t cap_id; ceph_seq_t migrate_seq; ceph_seq_t issue_seq; public: C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id, ceph_seq_t mseq, ceph_seq_t seq) : LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {} void finish(int r) override { locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq); } }; void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, ceph_seq_t mseq, ceph_seq_t seq) { CInode *in = mdcache->get_inode(ino); if (!in) { dout(7) << "_do_cap_release missing ino " << ino << dendl; return; } Capability *cap = in->get_client_cap(client); if (!cap) { dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl; return; } dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl; if (cap->get_cap_id() != cap_id) { dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl; return; } if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl; return; } if (should_defer_client_cap_frozen(in)) { dout(7) << " freezing|frozen, deferring" << dendl; in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq)); return; } if (seq != cap->get_last_issue()) { dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl; // clean out any old revoke history cap->clean_revoke_from(seq); eval_cap_gather(in); return; } remove_client_cap(in, client); } void Locker::remove_client_cap(CInode *in, client_t client) { // clean out any pending snapflush state if (!in->client_need_snapflush.empty()) _do_null_snapflush(in, client); in->remove_client_cap(client); if (in->is_auth()) { // make sure we clear out the client byte range if (in->get_projected_inode()->client_ranges.count(client) && !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray 
check_inode_max_size(in); } else { request_inode_file_caps(in); } try_eval(in, CEPH_CAP_LOCKS); } /** * Return true if any currently revoking caps exceed the * session_timeout threshold. */ bool Locker::any_late_revoking_caps(xlist const &revoking, double timeout) const { xlist::const_iterator p = revoking.begin(); if (p.end()) { // No revoking caps at the moment return false; } else { utime_t now = ceph_clock_now(); utime_t age = now - (*p)->get_last_revoke_stamp(); if (age <= timeout) { return false; } else { return true; } } } void Locker::get_late_revoking_clients(std::list *result, double timeout) const { if (!any_late_revoking_caps(revoking_caps, timeout)) { // Fast path: no misbehaving clients, execute in O(1) return; } // Slow path: execute in O(N_clients) for (auto &p : revoking_caps_by_client) { if (any_late_revoking_caps(p.second, timeout)) { result->push_back(p.first); } } } // Hard-code instead of surfacing a config settings because this is // really a hack that should go away at some point when we have better // inspection tools for getting at detailed cap state (#7316) #define MAX_WARN_CAPS 100 void Locker::caps_tick() { utime_t now = ceph_clock_now(); if (!need_snapflush_inodes.empty()) { // snap inodes that needs flush are auth pinned, they affect // subtree/difrarg freeze. utime_t cutoff = now; cutoff -= g_conf()->mds_freeze_tree_timeout / 3; CInode *last = need_snapflush_inodes.back(); while (!need_snapflush_inodes.empty()) { CInode *in = need_snapflush_inodes.front(); if (in->last_dirstat_prop >= cutoff) break; in->item_caps.remove_myself(); snapflush_nudge(in); if (in == last) break; } } dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl; now = ceph_clock_now(); int n = 0; for (xlist::iterator p = revoking_caps.begin(); !p.end(); ++p) { Capability *cap = *p; utime_t age = now - cap->get_last_revoke_stamp(); dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." 
<< cap->get_inode()->ino() << dendl; if (age <= mds->mdsmap->get_session_timeout()) { dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl; break; } else { ++n; if (n > MAX_WARN_CAPS) { dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late" << "revoking, ignoring subsequent caps" << dendl; break; } } // exponential backoff of warning intervals if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) { cap->inc_num_revoke_warnings(); stringstream ss; ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino " << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending()) << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago"; mds->clog->warn() << ss.str(); dout(20) << __func__ << " " << ss.str() << dendl; } else { dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl; } } } void Locker::handle_client_lease(const MClientLease::const_ref &m) { dout(10) << "handle_client_lease " << *m << dendl; ceph_assert(m->get_source().is_client()); client_t client = m->get_source().num(); CInode *in = mdcache->get_inode(m->get_ino(), m->get_last()); if (!in) { dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl; return; } CDentry *dn = 0; frag_t fg = in->pick_dirfrag(m->dname); CDir *dir = in->get_dirfrag(fg); if (dir) dn = dir->lookup(m->dname); if (!dn) { dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl; return; } dout(10) << " on " << *dn << dendl; // replica and lock ClientLease *l = dn->get_client_lease(client); if (!l) { dout(7) << "handle_client_lease didn't have lease for client." 
<< client << " of " << *dn << dendl; return; } switch (m->get_action()) { case CEPH_MDS_LEASE_REVOKE_ACK: case CEPH_MDS_LEASE_RELEASE: if (l->seq != m->get_seq()) { dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl; } else { dout(7) << "handle_client_lease client." << client << " on " << *dn << dendl; dn->remove_client_lease(l, this); } break; case CEPH_MDS_LEASE_RENEW: { dout(7) << "handle_client_lease client." << client << " renew on " << *dn << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl; if (dn->lock.can_lease(client)) { auto reply = MClientLease::create(*m); int pool = 1; // fixme.. do something smart! reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]); reply->h.seq = ++l->seq; reply->clear_payload(); utime_t now = ceph_clock_now(); now += mdcache->client_lease_durations[pool]; mdcache->touch_client_lease(l, pool, now); mds->send_message_client_counted(reply, m->get_connection()); } } break; default: ceph_abort(); // implement me break; } } void Locker::issue_client_lease(CDentry *dn, client_t client, bufferlist &bl, utime_t now, Session *session) { CInode *diri = dn->get_dir()->get_inode(); if (!diri->is_stray() && // do not issue dn leases in stray dir! ((!diri->filelock.can_lease(client) && (diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) && dn->lock.can_lease(client)) { int pool = 1; // fixme.. do something smart! 
// issue a dentry lease ClientLease *l = dn->add_client_lease(client, session); session->touch_lease(l); now += mdcache->client_lease_durations[pool]; mdcache->touch_client_lease(l, pool, now); LeaseStat lstat; lstat.mask = 1 | CEPH_LOCK_DN; // old and new bit values lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]); lstat.seq = ++l->seq; encode_lease(bl, session->info, lstat); dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms " << " on " << *dn << dendl; } else { // null lease LeaseStat lstat; encode_lease(bl, session->info, lstat); dout(20) << "issue_client_lease no/null lease on " << *dn << dendl; } } void Locker::revoke_client_leases(SimpleLock *lock) { int n = 0; CDentry *dn = static_cast(lock->get_parent()); for (map::iterator p = dn->client_lease_map.begin(); p != dn->client_lease_map.end(); ++p) { ClientLease *l = p->second; n++; ceph_assert(lock->get_type() == CEPH_LOCK_DN); CDentry *dn = static_cast(lock->get_parent()); int mask = 1 | CEPH_LOCK_DN; // old and new bits // i should also revoke the dir ICONTENT lease, if they have it! CInode *diri = dn->get_dir()->get_inode(); auto lease = MClientLease::create(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name()); mds->send_message_client_counted(lease, l->client); } } void Locker::encode_lease(bufferlist& bl, const session_info_t& info, const LeaseStat& ls) { if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) { ENCODE_START(1, 1, bl); encode(ls.mask, bl); encode(ls.duration_ms, bl); encode(ls.seq, bl); ENCODE_FINISH(bl); } else { encode(ls.mask, bl); encode(ls.duration_ms, bl); encode(ls.seq, bl); } } // locks ---------------------------------------------------------------- SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info) { switch (lock_type) { case CEPH_LOCK_DN: { // be careful; info.dirfrag may have incorrect frag; recalculate based on dname. 
CInode *diri = mdcache->get_inode(info.dirfrag.ino); frag_t fg; CDir *dir = 0; CDentry *dn = 0; if (diri) { fg = diri->pick_dirfrag(info.dname); dir = diri->get_dirfrag(fg); if (dir) dn = dir->lookup(info.dname, info.snapid); } if (!dn) { dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl; return 0; } return &dn->lock; } case CEPH_LOCK_IAUTH: case CEPH_LOCK_ILINK: case CEPH_LOCK_IDFT: case CEPH_LOCK_IFILE: case CEPH_LOCK_INEST: case CEPH_LOCK_IXATTR: case CEPH_LOCK_ISNAP: case CEPH_LOCK_IFLOCK: case CEPH_LOCK_IPOLICY: { CInode *in = mdcache->get_inode(info.ino, info.snapid); if (!in) { dout(7) << "get_lock don't have ino " << info.ino << dendl; return 0; } switch (lock_type) { case CEPH_LOCK_IAUTH: return &in->authlock; case CEPH_LOCK_ILINK: return &in->linklock; case CEPH_LOCK_IDFT: return &in->dirfragtreelock; case CEPH_LOCK_IFILE: return &in->filelock; case CEPH_LOCK_INEST: return &in->nestlock; case CEPH_LOCK_IXATTR: return &in->xattrlock; case CEPH_LOCK_ISNAP: return &in->snaplock; case CEPH_LOCK_IFLOCK: return &in->flocklock; case CEPH_LOCK_IPOLICY: return &in->policylock; } } default: dout(7) << "get_lock don't know lock_type " << lock_type << dendl; ceph_abort(); break; } return 0; } void Locker::handle_lock(const MLock::const_ref &m) { // nobody should be talking to us during recovery. 
ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping()); SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info()); if (!lock) { dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl; return; } switch (lock->get_type()) { case CEPH_LOCK_DN: case CEPH_LOCK_IAUTH: case CEPH_LOCK_ILINK: case CEPH_LOCK_ISNAP: case CEPH_LOCK_IXATTR: case CEPH_LOCK_IFLOCK: case CEPH_LOCK_IPOLICY: handle_simple_lock(lock, m); break; case CEPH_LOCK_IDFT: case CEPH_LOCK_INEST: //handle_scatter_lock((ScatterLock*)lock, m); //break; case CEPH_LOCK_IFILE: handle_file_lock(static_cast(lock), m); break; default: dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl; ceph_abort(); break; } } // ========================================================================== // simple lock /** This function may take a reference to m if it needs one, but does * not put references. */ void Locker::handle_reqrdlock(SimpleLock *lock, const MLock::const_ref &m) { MDSCacheObject *parent = lock->get_parent(); if (parent->is_auth() && lock->get_state() != LOCK_SYNC && !parent->is_frozen()) { dout(7) << "handle_reqrdlock got rdlock request on " << *lock << " on " << *parent << dendl; ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this! 
if (lock->is_stable()) { simple_sync(lock); } else { dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl; lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m)); } } else { dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock << " on " << *parent << dendl; // replica should retry } } void Locker::handle_simple_lock(SimpleLock *lock, const MLock::const_ref &m) { int from = m->get_asker(); dout(10) << "handle_simple_lock " << *m << " on " << *lock << " " << *lock->get_parent() << dendl; if (mds->is_rejoin()) { if (lock->get_parent()->is_rejoining()) { dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent() << ", dropping " << *m << dendl; return; } } switch (m->get_action()) { // -- replica -- case LOCK_AC_SYNC: ceph_assert(lock->get_state() == LOCK_LOCK); lock->decode_locked_state(m->get_data()); lock->set_state(LOCK_SYNC); lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); break; case LOCK_AC_LOCK: ceph_assert(lock->get_state() == LOCK_SYNC); lock->set_state(LOCK_SYNC_LOCK); if (lock->is_leased()) revoke_client_leases(lock); eval_gather(lock, true); if (lock->is_unstable_and_locked()) mds->mdlog->flush(); break; // -- auth -- case LOCK_AC_LOCKACK: ceph_assert(lock->get_state() == LOCK_SYNC_LOCK || lock->get_state() == LOCK_SYNC_EXCL); ceph_assert(lock->is_gathering(from)); lock->remove_gather(from); if (lock->is_gathering()) { dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from << ", still gathering " << lock->get_gather_set() << dendl; } else { dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from << ", last one" << dendl; eval_gather(lock); } break; case LOCK_AC_REQRDLOCK: handle_reqrdlock(lock, m); break; } } /* unused, currently. 
class C_Locker_SimpleEval : public Context { Locker *locker; SimpleLock *lock; public: C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {} void finish(int r) { locker->try_simple_eval(lock); } }; void Locker::try_simple_eval(SimpleLock *lock) { // unstable and ambiguous auth? if (!lock->is_stable() && lock->get_parent()->is_ambiguous_auth()) { dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl; //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock)); return; } if (!lock->get_parent()->is_auth()) { dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl; return; } if (!lock->get_parent()->can_auth_pin()) { dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl; //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock)); return; } if (lock->is_stable()) simple_eval(lock); } */ void Locker::simple_eval(SimpleLock *lock, bool *need_issue) { dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); if (lock->get_parent()->is_freezing_or_frozen()) { // dentry/snap lock in unreadable state can block path traverse if ((lock->get_type() != CEPH_LOCK_DN && lock->get_type() != CEPH_LOCK_ISNAP) || lock->get_state() == LOCK_SYNC || lock->get_parent()->is_frozen()) return; } if (mdcache->is_readonly()) { if (lock->get_state() != LOCK_SYNC) { dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; simple_sync(lock, need_issue); } return; } CInode *in = 0; int wanted = 0; if (lock->get_cap_shift()) { in = static_cast(lock->get_parent()); in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift()); } 
// -> excl? if (lock->get_state() != LOCK_EXCL && in && in->get_target_loner() >= 0 && (wanted & CEPH_CAP_GEXCL)) { dout(7) << "simple_eval stable, going to excl " << *lock << " on " << *lock->get_parent() << dendl; simple_excl(lock, need_issue); } // stable -> sync? else if (lock->get_state() != LOCK_SYNC && !lock->is_wrlocked() && ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) || (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) { dout(7) << "simple_eval stable, syncing " << *lock << " on " << *lock->get_parent() << dendl; simple_sync(lock, need_issue); } } // mid bool Locker::simple_sync(SimpleLock *lock, bool *need_issue) { dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); CInode *in = 0; if (lock->get_cap_shift()) in = static_cast(lock->get_parent()); int old_state = lock->get_state(); if (old_state != LOCK_TSYN) { switch (lock->get_state()) { case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break; case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break; case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break; case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break; default: ceph_abort(); } int gather = 0; if (lock->is_wrlocked()) gather++; if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) { send_lock_message(lock, LOCK_AC_SYNC); lock->init_gather(); gather++; } if (in && in->is_head()) { if (in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } } bool need_recover = false; if (lock->get_type() == CEPH_LOCK_IFILE) { ceph_assert(in); if (in->state_test(CInode::STATE_NEEDSRECOVER)) { mds->mdcache->queue_file_recover(in); need_recover = true; gather++; } } if (!gather && lock->is_dirty()) { lock->get_parent()->auth_pin(lock); scatter_writebehind(static_cast(lock)); mds->mdlog->flush(); return false; } if (gather) { 
lock->get_parent()->auth_pin(lock); if (need_recover) mds->mdcache->do_file_recover(); return false; } } if (lock->get_parent()->is_replicated()) { // FIXME bufferlist data; lock->encode_locked_state(data); send_lock_message(lock, LOCK_AC_SYNC, data); } lock->set_state(LOCK_SYNC); lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); if (in && in->is_head()) { if (need_issue) *need_issue = true; else issue_caps(in); } return true; } void Locker::simple_excl(SimpleLock *lock, bool *need_issue) { dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); CInode *in = 0; if (lock->get_cap_shift()) in = static_cast(lock->get_parent()); switch (lock->get_state()) { case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; default: ceph_abort(); } int gather = 0; if (lock->is_rdlocked()) gather++; if (lock->is_wrlocked()) gather++; if (lock->get_parent()->is_replicated() && lock->get_state() != LOCK_LOCK_EXCL && lock->get_state() != LOCK_XSYN_EXCL) { send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); gather++; } if (in && in->is_head()) { if (in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } } if (gather) { lock->get_parent()->auth_pin(lock); } else { lock->set_state(LOCK_EXCL); lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE); if (in) { if (need_issue) *need_issue = true; else issue_caps(in); } } } void Locker::simple_lock(SimpleLock *lock, bool *need_issue) { dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); ceph_assert(lock->get_state() != LOCK_LOCK); CInode *in = 0; if (lock->get_cap_shift()) in = static_cast(lock->get_parent()); int old_state = lock->get_state(); 
switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break; case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break; case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); (static_cast(lock))->clear_unscatter_wanted(); break; case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break; default: ceph_abort(); } int gather = 0; if (lock->is_leased()) { gather++; revoke_client_leases(lock); } if (lock->is_rdlocked()) gather++; if (in && in->is_head()) { if (in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } } bool need_recover = false; if (lock->get_type() == CEPH_LOCK_IFILE) { ceph_assert(in); if(in->state_test(CInode::STATE_NEEDSRECOVER)) { mds->mdcache->queue_file_recover(in); need_recover = true; gather++; } } if (lock->get_parent()->is_replicated() && lock->get_state() == LOCK_MIX_LOCK && gather) { dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl; } else { // move to second stage of gather now, so we don't send the lock action later. 
if (lock->get_state() == LOCK_MIX_LOCK) lock->set_state(LOCK_MIX_LOCK2); if (lock->get_parent()->is_replicated() && lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK gather++; send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); } } if (!gather && lock->is_dirty()) { lock->get_parent()->auth_pin(lock); scatter_writebehind(static_cast(lock)); mds->mdlog->flush(); return; } if (gather) { lock->get_parent()->auth_pin(lock); if (need_recover) mds->mdcache->do_file_recover(); } else { lock->set_state(LOCK_LOCK); lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE); } } void Locker::simple_xlock(SimpleLock *lock) { dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); //assert(lock->is_stable()); ceph_assert(lock->get_state() != LOCK_XLOCK); CInode *in = 0; if (lock->get_cap_shift()) in = static_cast(lock->get_parent()); if (lock->is_stable()) lock->get_parent()->auth_pin(lock); switch (lock->get_state()) { case LOCK_LOCK: case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break; default: ceph_abort(); } int gather = 0; if (lock->is_rdlocked()) gather++; if (lock->is_wrlocked()) gather++; if (in && in->is_head()) { if (in->issued_caps_need_gather(lock)) { issue_caps(in); gather++; } } if (!gather) { lock->set_state(LOCK_PREXLOCK); //assert("shouldn't be called if we are already xlockable" == 0); } } // ========================================================================== // scatter lock /* Some notes on scatterlocks. - The scatter/gather is driven by the inode lock. The scatter always brings in the latest metadata from the fragments. - When in a scattered/MIX state, fragments are only allowed to update/be written to if the accounted stat matches the inode's current version. 
 - That means, on gather, we _only_ assimilate diffs for frag metadata
   that match the current version, because those are the only ones
   written during this scatter/gather cycle.  (Others didn't permit
   it.)  We increment the version and journal this to disk.

 - When possible, we also simultaneously update our local frag
   accounted stats to match.

 - On scatter, the new inode info is broadcast to frags, both local
   and remote.  If possible (auth and !frozen), the dirfrag auth
   should update the accounted state (if it isn't already up to date).
   Note that this may occur on both the local inode auth node and
   inode replicas, so there are two potential paths.  If it is NOT
   possible, they need to mark_stale to prevent any possible writes.

 - A scatter can be to MIX (potentially writable) or to SYNC (read
   only).  Both are opportunities to update the frag accounted stats,
   even though only the MIX case is affected by a stale dirfrag.

 - Because many scatter/gather cycles can potentially go by without a
   frag being able to update its accounted stats (due to being frozen
   by exports/refragments in progress), the frag may have (even very)
   old stat versions.  That's fine.  If/when we do want to update it,
   we can update accounted_* and the version first.
*/ class C_Locker_ScatterWB : public LockerLogContext { ScatterLock *lock; MutationRef mut; public: C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) : LockerLogContext(l), lock(sl), mut(m) {} void finish(int r) override { locker->scatter_writebehind_finish(lock, mut); } }; void Locker::scatter_writebehind(ScatterLock *lock) { CInode *in = static_cast(lock->get_parent()); dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl; // journal MutationRef mut(new MutationImpl()); mut->ls = mds->mdlog->get_current_segment(); // forcefully take a wrlock lock->get_wrlock(true); mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK); in->pre_cow_old_inode(); // avoid cow mayhem auto &pi = in->project_inode(); pi.inode.version = in->pre_dirty(); in->finish_scatter_gather_update(lock->get_type()); lock->start_flush(); EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind"); mds->mdlog->start_entry(le); mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY); mdcache->journal_dirty_inode(mut.get(), &le->metablob, in); in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob); mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut)); } void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut) { CInode *in = static_cast(lock->get_parent()); dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl; in->pop_and_dirty_projected_inode(mut->ls); lock->finish_flush(); // if replicas may have flushed in a mix->lock state, send another // message so they can finish_flush(). 
if (in->is_replicated()) { switch (lock->get_state()) { case LOCK_MIX_LOCK: case LOCK_MIX_LOCK2: case LOCK_MIX_EXCL: case LOCK_MIX_TSYN: send_lock_message(lock, LOCK_AC_LOCKFLUSHED); } } mut->apply(); drop_locks(mut.get()); mut->cleanup(); if (lock->is_stable()) lock->finish_waiters(ScatterLock::WAIT_STABLE); //scatter_eval_gather(lock); } void Locker::scatter_eval(ScatterLock *lock, bool *need_issue) { dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); if (lock->get_parent()->is_freezing_or_frozen()) { dout(20) << " freezing|frozen" << dendl; return; } if (mdcache->is_readonly()) { if (lock->get_state() != LOCK_SYNC) { dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; simple_sync(lock, need_issue); } return; } if (!lock->is_rdlocked() && lock->get_state() != LOCK_MIX && lock->get_scatter_wanted()) { dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock << " on " << *lock->get_parent() << dendl; scatter_mix(lock, need_issue); return; } if (lock->get_type() == CEPH_LOCK_INEST) { // in general, we want to keep INEST writable at all times. if (!lock->is_rdlocked()) { if (lock->get_parent()->is_replicated()) { if (lock->get_state() != LOCK_MIX) scatter_mix(lock, need_issue); } else { if (lock->get_state() != LOCK_LOCK) simple_lock(lock, need_issue); } } return; } CInode *in = static_cast(lock->get_parent()); if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) { // i _should_ be sync. 
if (!lock->is_wrlocked() && lock->get_state() != LOCK_SYNC) { dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl; simple_sync(lock, need_issue); } } } /* * mark a scatterlock to indicate that the dir fnode has some dirty data */ void Locker::mark_updated_scatterlock(ScatterLock *lock) { lock->mark_dirty(); if (lock->get_updated_item()->is_on_list()) { dout(10) << "mark_updated_scatterlock " << *lock << " - already on list since " << lock->get_update_stamp() << dendl; } else { updated_scatterlocks.push_back(lock->get_updated_item()); utime_t now = ceph_clock_now(); lock->set_update_stamp(now); dout(10) << "mark_updated_scatterlock " << *lock << " - added at " << now << dendl; } } /* * this is called by scatter_tick and LogSegment::try_to_trim() when * trying to flush dirty scattered data (i.e. updated fnode) back to * the inode. * * we need to lock|scatter in order to push fnode changes into the * inode.dirstat. */ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange) { CInode *p = static_cast(lock->get_parent()); if (p->is_frozen() || p->is_freezing()) { dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl; if (c) p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c); else if (lock->is_dirty()) // just requeue. not ideal.. starvation prone.. updated_scatterlocks.push_back(lock->get_updated_item()); return; } if (p->is_ambiguous_auth()) { dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl; if (c) p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c); else if (lock->is_dirty()) // just requeue. not ideal.. starvation prone.. updated_scatterlocks.push_back(lock->get_updated_item()); return; } if (p->is_auth()) { int count = 0; while (true) { if (lock->is_stable()) { // can we do it now? // (only if we're not replicated.. if we are, we really do need // to nudge the lock state!) 
/* actually, even if we're not replicated, we can't stay in MIX, because another mds could discover and replicate us at any time. if that happens while we're flushing, they end up in MIX but their inode has the old scatterstat version. if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) { dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl; scatter_writebehind(lock); if (c) lock->add_waiter(SimpleLock::WAIT_STABLE, c); return; } */ if (mdcache->is_readonly()) { if (lock->get_state() != LOCK_SYNC) { dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl; simple_sync(static_cast(lock)); } break; } // adjust lock state dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl; switch (lock->get_type()) { case CEPH_LOCK_IFILE: if (p->is_replicated() && lock->get_state() != LOCK_MIX) scatter_mix(static_cast(lock)); else if (lock->get_state() != LOCK_LOCK) simple_lock(static_cast(lock)); else simple_sync(static_cast(lock)); break; case CEPH_LOCK_IDFT: case CEPH_LOCK_INEST: if (p->is_replicated() && lock->get_state() != LOCK_MIX) scatter_mix(lock); else if (lock->get_state() != LOCK_LOCK) simple_lock(lock); else simple_sync(lock); break; default: ceph_abort(); } ++count; if (lock->is_stable() && count == 2) { dout(10) << "scatter_nudge oh, stable after two cycles." << dendl; // this should only realy happen when called via // handle_file_lock due to AC_NUDGE, because the rest of the // time we are replicated or have dirty data and won't get // called. bailing here avoids an infinite loop. ceph_assert(!c); break; } } else { dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl; if (c) lock->add_waiter(SimpleLock::WAIT_STABLE, c); return; } } } else { dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " << *lock << " on " << *p << dendl; // request unscatter? 
mds_rank_t auth = lock->get_parent()->authority().first; if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) { mds->send_message_mds(MLock::create(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth); } // wait... if (c) lock->add_waiter(SimpleLock::WAIT_STABLE, c); // also, requeue, in case we had wrong auth or something if (lock->is_dirty()) updated_scatterlocks.push_back(lock->get_updated_item()); } } void Locker::scatter_tick() { dout(10) << "scatter_tick" << dendl; // updated utime_t now = ceph_clock_now(); int n = updated_scatterlocks.size(); while (!updated_scatterlocks.empty()) { ScatterLock *lock = updated_scatterlocks.front(); if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping if (!lock->is_dirty()) { updated_scatterlocks.pop_front(); dout(10) << " removing from updated_scatterlocks " << *lock << " " << *lock->get_parent() << dendl; continue; } if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval) break; updated_scatterlocks.pop_front(); scatter_nudge(lock, 0); } mds->mdlog->flush(); } void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue) { dout(10) << "scatter_tempsync " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); ceph_abort_msg("not fully implemented, at least not for filelock"); CInode *in = static_cast(lock->get_parent()); switch (lock->get_state()) { case LOCK_SYNC: ceph_abort(); // this shouldn't happen case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break; case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break; default: ceph_abort(); } int gather = 0; if (lock->is_wrlocked()) gather++; if (lock->get_cap_shift() && in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } if (lock->get_state() == LOCK_MIX_TSYN && in->is_replicated()) { lock->init_gather(); send_lock_message(lock, LOCK_AC_LOCK); gather++; } if (gather) { 
in->auth_pin(lock); } else { // do tempsync lock->set_state(LOCK_TSYN); lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE); if (lock->get_cap_shift()) { if (need_issue) *need_issue = true; else issue_caps(in); } } } // ========================================================================== // local lock void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut) { dout(7) << "local_wrlock_grab on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->can_wrlock()); lock->get_wrlock(mut->get_client()); auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK); ceph_assert(ret.second); } bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut) { dout(7) << "local_wrlock_start on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); if (lock->can_wrlock()) { lock->get_wrlock(mut->get_client()); auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK); ceph_assert(it->is_wrlock()); return true; } else { lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); return false; } } void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut) { ceph_assert(it->is_wrlock()); LocalLock *lock = static_cast(it->lock); dout(7) << "local_wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; lock->put_wrlock(); mut->locks.erase(it); if (lock->get_num_wrlocks() == 0) { lock->finish_waiters(SimpleLock::WAIT_STABLE | SimpleLock::WAIT_WR | SimpleLock::WAIT_RD); } } bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut) { dout(7) << "local_xlock_start on " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); if (!lock->can_xlock_local()) { lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); return false; } 
lock->get_xlock(mut, mut->get_client()); mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK); return true; } void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut) { ceph_assert(it->is_xlock()); LocalLock *lock = static_cast(it->lock); dout(7) << "local_xlock_finish on " << *lock << " on " << *lock->get_parent() << dendl; lock->put_xlock(); mut->locks.erase(it); lock->finish_waiters(SimpleLock::WAIT_STABLE | SimpleLock::WAIT_WR | SimpleLock::WAIT_RD); } // ========================================================================== // file lock void Locker::file_eval(ScatterLock *lock, bool *need_issue) { CInode *in = static_cast(lock->get_parent()); int loner_wanted, other_wanted; int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE); dout(7) << "file_eval wanted=" << gcap_string(wanted) << " loner_wanted=" << gcap_string(loner_wanted) << " other_wanted=" << gcap_string(other_wanted) << " filelock=" << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(lock->get_parent()->is_auth()); ceph_assert(lock->is_stable()); if (lock->get_parent()->is_freezing_or_frozen()) return; if (mdcache->is_readonly()) { if (lock->get_state() != LOCK_SYNC) { dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl; simple_sync(lock, need_issue); } return; } // excl -> *? 
if (lock->get_state() == LOCK_EXCL) { dout(20) << " is excl" << dendl; int loner_issued, other_issued, xlocker_issued; in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE); dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued) << " other_issued=" << gcap_string(other_issued) << " xlocker_issued=" << gcap_string(xlocker_issued) << dendl; if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) || (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) || (in->inode.is_dir() && in->multiple_nonstale_caps())) { // FIXME.. :/ dout(20) << " should lose it" << dendl; // we should lose it. // loner other want // R R SYNC // R R|W MIX // R W MIX // R|W R MIX // R|W R|W MIX // R|W W MIX // W R MIX // W R|W MIX // W W MIX // -> any writer means MIX; RD doesn't matter. if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) || lock->is_waiter_for(SimpleLock::WAIT_WR)) scatter_mix(lock, need_issue); else if (!lock->is_wrlocked()) // let excl wrlocks drain first simple_sync(lock, need_issue); else dout(10) << " waiting for wrlock to drain" << dendl; } } // * -> excl? else if (lock->get_state() != LOCK_EXCL && !lock->is_rdlocked() && //!lock->is_waiter_for(SimpleLock::WAIT_WR) && ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) || (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) && in->get_target_loner() >= 0) { dout(7) << "file_eval stable, bump to loner " << *lock << " on " << *lock->get_parent() << dendl; file_excl(lock, need_issue); } // * -> mixed? else if (lock->get_state() != LOCK_MIX && !lock->is_rdlocked() && //!lock->is_waiter_for(SimpleLock::WAIT_WR) && (lock->get_scatter_wanted() || (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) { dout(7) << "file_eval stable, bump to mixed " << *lock << " on " << *lock->get_parent() << dendl; scatter_mix(lock, need_issue); } // * -> sync? else if (lock->get_state() != LOCK_SYNC && !lock->is_wrlocked() && // drain wrlocks first! 
!lock->is_waiter_for(SimpleLock::WAIT_WR) && !(wanted & CEPH_CAP_GWR) && !((lock->get_state() == LOCK_MIX) && in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are //((wanted & CEPH_CAP_RD) || //in->is_replicated() || //lock->is_leased() || //(!loner && lock->get_state() == LOCK_EXCL)) && ) { dout(7) << "file_eval stable, bump to sync " << *lock << " on " << *lock->get_parent() << dendl; simple_sync(lock, need_issue); } } void Locker::scatter_mix(ScatterLock *lock, bool *need_issue) { dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl; CInode *in = static_cast(lock->get_parent()); ceph_assert(in->is_auth()); ceph_assert(lock->is_stable()); if (lock->get_state() == LOCK_LOCK) { in->start_scatter(lock); if (in->is_replicated()) { // data bufferlist softdata; lock->encode_locked_state(softdata); // bcast to replicas send_lock_message(lock, LOCK_AC_MIX, softdata); } // change lock lock->set_state(LOCK_MIX); lock->clear_scatter_wanted(); if (lock->get_cap_shift()) { if (need_issue) *need_issue = true; else issue_caps(in); } } else { // gather? 
switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break; case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break; case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break; case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break; default: ceph_abort(); } int gather = 0; if (lock->is_rdlocked()) gather++; if (in->is_replicated()) { if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK send_lock_message(lock, LOCK_AC_MIX); lock->init_gather(); gather++; } } if (lock->is_leased()) { revoke_client_leases(lock); gather++; } if (lock->get_cap_shift() && in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } bool need_recover = false; if (in->state_test(CInode::STATE_NEEDSRECOVER)) { mds->mdcache->queue_file_recover(in); need_recover = true; gather++; } if (gather) { lock->get_parent()->auth_pin(lock); if (need_recover) mds->mdcache->do_file_recover(); } else { in->start_scatter(lock); lock->set_state(LOCK_MIX); lock->clear_scatter_wanted(); if (in->is_replicated()) { bufferlist softdata; lock->encode_locked_state(softdata); send_lock_message(lock, LOCK_AC_MIX, softdata); } if (lock->get_cap_shift()) { if (need_issue) *need_issue = true; else issue_caps(in); } } } } void Locker::file_excl(ScatterLock *lock, bool *need_issue) { CInode *in = static_cast(lock->get_parent()); dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl; ceph_assert(in->is_auth()); ceph_assert(lock->is_stable()); ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) || (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break; case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break; case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break; case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break; default: ceph_abort(); } int gather = 0; if 
(lock->is_rdlocked()) gather++; if (lock->is_wrlocked()) gather++; if (in->is_replicated() && lock->get_state() != LOCK_LOCK_EXCL && lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock. send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); gather++; } if (lock->is_leased()) { revoke_client_leases(lock); gather++; } if (in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } bool need_recover = false; if (in->state_test(CInode::STATE_NEEDSRECOVER)) { mds->mdcache->queue_file_recover(in); need_recover = true; gather++; } if (gather) { lock->get_parent()->auth_pin(lock); if (need_recover) mds->mdcache->do_file_recover(); } else { lock->set_state(LOCK_EXCL); if (need_issue) *need_issue = true; else issue_caps(in); } } void Locker::file_xsyn(SimpleLock *lock, bool *need_issue) { dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl; CInode *in = static_cast(lock->get_parent()); ceph_assert(in->is_auth()); ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()); switch (lock->get_state()) { case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break; default: ceph_abort(); } int gather = 0; if (lock->is_wrlocked()) gather++; if (in->is_head() && in->issued_caps_need_gather(lock)) { if (need_issue) *need_issue = true; else issue_caps(in); gather++; } if (gather) { lock->get_parent()->auth_pin(lock); } else { lock->set_state(LOCK_XSYN); lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); if (need_issue) *need_issue = true; else issue_caps(in); } } void Locker::file_recover(ScatterLock *lock) { CInode *in = static_cast(lock->get_parent()); dout(7) << "file_recover " << *lock << " on " << *in << dendl; ceph_assert(in->is_auth()); //assert(lock->is_stable()); ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover() int gather = 0; /* if (in->is_replicated() 
lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) { send_lock_message(lock, LOCK_AC_LOCK); lock->init_gather(); gather++; } */ if (in->is_head() && in->issued_caps_need_gather(lock)) { issue_caps(in); gather++; } lock->set_state(LOCK_SCAN); if (gather) in->state_set(CInode::STATE_NEEDSRECOVER); else mds->mdcache->queue_file_recover(in); } // messenger void Locker::handle_file_lock(ScatterLock *lock, const MLock::const_ref &m) { CInode *in = static_cast(lock->get_parent()); int from = m->get_asker(); if (mds->is_rejoin()) { if (in->is_rejoining()) { dout(7) << "handle_file_lock still rejoining " << *in << ", dropping " << *m << dendl; return; } } dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action()) << " on " << *lock << " from mds." << from << " " << *in << dendl; bool caps = lock->get_cap_shift(); switch (m->get_action()) { // -- replica -- case LOCK_AC_SYNC: ceph_assert(lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_MIX || lock->get_state() == LOCK_MIX_SYNC2); if (lock->get_state() == LOCK_MIX) { lock->set_state(LOCK_MIX_SYNC); eval_gather(lock, true); if (lock->is_unstable_and_locked()) mds->mdlog->flush(); break; } (static_cast(lock))->finish_flush(); (static_cast(lock))->clear_flushed(); // ok lock->decode_locked_state(m->get_data()); lock->set_state(LOCK_SYNC); lock->get_rdlock(); if (caps) issue_caps(in); lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE); lock->put_rdlock(); break; case LOCK_AC_LOCK: switch (lock->get_state()) { case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break; case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break; default: ceph_abort(); } eval_gather(lock, true); if (lock->is_unstable_and_locked()) mds->mdlog->flush(); break; case LOCK_AC_LOCKFLUSHED: (static_cast(lock))->finish_flush(); (static_cast(lock))->clear_flushed(); // wake up scatter_nudge waiters if (lock->is_stable()) lock->finish_waiters(SimpleLock::WAIT_STABLE); break; case LOCK_AC_MIX: 
ceph_assert(lock->get_state() == LOCK_SYNC || lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_SYNC_MIX2); if (lock->get_state() == LOCK_SYNC) { // MIXED lock->set_state(LOCK_SYNC_MIX); eval_gather(lock, true); if (lock->is_unstable_and_locked()) mds->mdlog->flush(); break; } // ok lock->set_state(LOCK_MIX); lock->decode_locked_state(m->get_data()); if (caps) issue_caps(in); lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE); break; // -- auth -- case LOCK_AC_LOCKACK: ceph_assert(lock->get_state() == LOCK_SYNC_LOCK || lock->get_state() == LOCK_MIX_LOCK || lock->get_state() == LOCK_MIX_LOCK2 || lock->get_state() == LOCK_MIX_EXCL || lock->get_state() == LOCK_SYNC_EXCL || lock->get_state() == LOCK_SYNC_MIX || lock->get_state() == LOCK_MIX_TSYN); ceph_assert(lock->is_gathering(from)); lock->remove_gather(from); if (lock->get_state() == LOCK_MIX_LOCK || lock->get_state() == LOCK_MIX_LOCK2 || lock->get_state() == LOCK_MIX_EXCL || lock->get_state() == LOCK_MIX_TSYN) { lock->decode_locked_state(m->get_data()); // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not // delay calling scatter_writebehind(). 
lock->clear_flushed(); } if (lock->is_gathering()) { dout(7) << "handle_file_lock " << *in << " from " << from << ", still gathering " << lock->get_gather_set() << dendl; } else { dout(7) << "handle_file_lock " << *in << " from " << from << ", last one" << dendl; eval_gather(lock); } break; case LOCK_AC_SYNCACK: ceph_assert(lock->get_state() == LOCK_MIX_SYNC); ceph_assert(lock->is_gathering(from)); lock->remove_gather(from); lock->decode_locked_state(m->get_data()); if (lock->is_gathering()) { dout(7) << "handle_file_lock " << *in << " from " << from << ", still gathering " << lock->get_gather_set() << dendl; } else { dout(7) << "handle_file_lock " << *in << " from " << from << ", last one" << dendl; eval_gather(lock); } break; case LOCK_AC_MIXACK: ceph_assert(lock->get_state() == LOCK_SYNC_MIX); ceph_assert(lock->is_gathering(from)); lock->remove_gather(from); if (lock->is_gathering()) { dout(7) << "handle_file_lock " << *in << " from " << from << ", still gathering " << lock->get_gather_set() << dendl; } else { dout(7) << "handle_file_lock " << *in << " from " << from << ", last one" << dendl; eval_gather(lock); } break; // requests.... case LOCK_AC_REQSCATTER: if (lock->is_stable()) { /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing) * because the replica should be holding an auth_pin if they're * doing this (and thus, we are freezing, not frozen, and indefinite * starvation isn't an issue). */ dout(7) << "handle_file_lock got scatter request on " << *lock << " on " << *lock->get_parent() << dendl; if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter scatter_mix(lock); } else { dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock << " on " << *lock->get_parent() << dendl; lock->set_scatter_wanted(); } break; case LOCK_AC_REQUNSCATTER: if (lock->is_stable()) { /* NOTE: we can do this _even_ if !can_auth_pin (i.e. 
freezing) * because the replica should be holding an auth_pin if they're * doing this (and thus, we are freezing, not frozen, and indefinite * starvation isn't an issue). */ dout(7) << "handle_file_lock got unscatter request on " << *lock << " on " << *lock->get_parent() << dendl; if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter simple_lock(lock); // FIXME tempsync? } else { dout(7) << "handle_file_lock ignoring unscatter request on " << *lock << " on " << *lock->get_parent() << dendl; lock->set_unscatter_wanted(); } break; case LOCK_AC_REQRDLOCK: handle_reqrdlock(lock, m); break; case LOCK_AC_NUDGE: if (!lock->get_parent()->is_auth()) { dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock << " on " << *lock->get_parent() << dendl; } else if (!lock->get_parent()->is_replicated()) { dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock << " on " << *lock->get_parent() << dendl; } else { dout(7) << "handle_file_lock trying nudge on " << *lock << " on " << *lock->get_parent() << dendl; scatter_nudge(lock, 0, true); mds->mdlog->flush(); } break; default: ceph_abort(); } }