diff options
Diffstat (limited to 'src/crimson/osd/object_context_loader.h')
-rw-r--r-- | src/crimson/osd/object_context_loader.h | 255 |
1 files changed, 231 insertions, 24 deletions
diff --git a/src/crimson/osd/object_context_loader.h b/src/crimson/osd/object_context_loader.h index 277708eca4f..49f8f1572bf 100644 --- a/src/crimson/osd/object_context_loader.h +++ b/src/crimson/osd/object_context_loader.h @@ -1,9 +1,14 @@ #pragma once #include <seastar/core/future.hh> +#include <seastar/util/defer.hh> +#include "crimson/common/coroutine.h" #include "crimson/common/errorator.h" +#include "crimson/common/log.h" #include "crimson/osd/object_context.h" +#include "crimson/osd/osd_operation.h" #include "crimson/osd/pg_backend.h" +#include "osd/object_state_fmt.h" namespace crimson::osd { class ObjectContextLoader { @@ -29,6 +34,208 @@ public: ::crimson::osd::IOInterruptCondition, load_obc_ertr>; + class Manager { + ObjectContextLoader &loader; + hobject_t target; + + Manager() = delete; + template <typename T> + Manager(ObjectContextLoader &loader, T &&t) + : loader(loader), target(std::forward<T>(t)) {} + Manager(const Manager &) = delete; + Manager &operator=(const Manager &o) = delete; + + struct options_t { + bool resolve_clone = true; + } options; + + struct state_t { + RWState::State state = RWState::RWNONE; + ObjectContextRef obc; + bool is_empty() const { return !obc; } + + void lock_excl_sync() { + bool locked = obc->lock.try_lock_for_excl(); + ceph_assert(locked); + state = RWState::RWEXCL; + } + + void demote_excl_to(RWState::State lock_type) { + assert(state == RWState::RWEXCL); + switch (lock_type) { + case RWState::RWWRITE: + obc->lock.demote_to_write(); + state = RWState::RWWRITE; + break; + case RWState::RWREAD: + obc->lock.demote_to_read(); + state = RWState::RWREAD; + break; + case RWState::RWNONE: + obc->lock.unlock_for_excl(); + state = RWState::RWNONE; + break; + case RWState::RWEXCL: + //noop + break; + default: + ceph_assert(0 == "impossible"); + } + } + + auto lock_to(RWState::State lock_type) { + assert(state == RWState::RWNONE); + switch (lock_type) { + case RWState::RWWRITE: + return interruptor::make_interruptible( + obc->lock.lock_for_write().then([this] { + state = RWState::RWWRITE; + })); + case RWState::RWREAD: + return interruptor::make_interruptible( + obc->lock.lock_for_read().then([this] { + state = RWState::RWREAD; + })); + case RWState::RWNONE: + // noop + return interruptor::now(); + case RWState::RWEXCL: + return interruptor::make_interruptible( + obc->lock.lock_for_excl().then([this] { + state = RWState::RWEXCL; + })); + default: + ceph_assert(0 == "impossible"); + return interruptor::now(); + } + } + + void release_lock() { + switch (state) { + case RWState::RWREAD: + obc->lock.unlock_for_read(); + break; + case RWState::RWWRITE: + obc->lock.unlock_for_write(); + break; + case RWState::RWEXCL: + obc->lock.unlock_for_excl(); + break; + case RWState::RWNONE: + // noop + break; + default: + ceph_assert(0 == "invalid"); + } + state = RWState::RWNONE; + } + }; + state_t head_state; + state_t target_state; + + friend ObjectContextLoader; + + void set_state_obc(state_t &s, ObjectContextRef _obc) { + s.obc = std::move(_obc); + s.obc->append_to(loader.obc_set_accessing); + } + + void release_state(state_t &s) { + LOG_PREFIX(ObjectContextLoader::release_state); + if (s.is_empty()) return; + + s.release_lock(); + SUBDEBUGDPP(osd, "releasing obc {}, {}", loader.dpp, *(s.obc), s.obc->obs); + s.obc->remove_from(loader.obc_set_accessing); + s = state_t(); + } + public: + Manager(Manager &&rhs) : loader(rhs.loader) { + std::swap(target, rhs.target); + std::swap(options, rhs.options); + std::swap(head_state, rhs.head_state); + std::swap(target_state, rhs.target_state); + } + + Manager &operator=(Manager &&o) { + this->~Manager(); + new(this) Manager(std::move(o)); + return *this; + } + + ObjectContextRef &get_obc() { + ceph_assert(!target_state.is_empty()); + ceph_assert(target_state.obc->is_loaded()); + return target_state.obc; + } + + ObjectContextRef &get_head_obc() { + ceph_assert(!head_state.is_empty()); + ceph_assert(head_state.obc->is_loaded()); + return head_state.obc; + } + + void release() { + release_state(head_state); + release_state(target_state); + } + + auto get_releaser() { + return seastar::defer([this] { + release(); + }); + } + + ~Manager() { + release(); + } + }; + + class Orderer { + friend ObjectContextLoader; + ObjectContextRef orderer_obc; + public: + CommonOBCPipeline &obc_pp() { + ceph_assert(orderer_obc); + return orderer_obc->obc_pipeline; + } + + ~Orderer() { + LOG_PREFIX(ObjectContextLoader::~Orderer); + SUBDEBUG(osd, "releasing obc {}, {}", *(orderer_obc)); + } + }; + + Orderer get_obc_orderer(const hobject_t &oid) { + Orderer ret; + std::tie(ret.orderer_obc, std::ignore) = + obc_registry.get_cached_obc(oid.get_head()); + return ret; + } + + Manager get_obc_manager(const hobject_t &oid, bool resolve_clone = true) { + Manager ret(*this, oid); + ret.options.resolve_clone = resolve_clone; + return ret; + } + + Manager get_obc_manager( + Orderer &orderer, const hobject_t &oid, bool resolve_clone = true) { + Manager ret = get_obc_manager(oid, resolve_clone); + ret.set_state_obc(ret.head_state, orderer.orderer_obc); + return ret; + } + + using load_and_lock_ertr = load_obc_ertr; + using load_and_lock_iertr = interruptible::interruptible_errorator< + IOInterruptCondition, load_and_lock_ertr>; + using load_and_lock_fut = load_and_lock_iertr::future<>; +private: + load_and_lock_fut load_and_lock_head(Manager &, RWState::State); + load_and_lock_fut load_and_lock_clone(Manager &, RWState::State, bool lock_head=true); +public: + load_and_lock_fut load_and_lock(Manager &, RWState::State); + using interruptor = ::crimson::interruptible::interruptor< ::crimson::osd::IOInterruptCondition>; @@ -43,8 +250,13 @@ public: // See SnapTrimObjSubEvent::remove_or_update - in_removed_snaps_queue usage. template<RWState::State State> load_obc_iertr::future<> with_obc(hobject_t oid, - with_obc_func_t&& func, - bool resolve_clone = true); + with_obc_func_t func, + bool resolve_clone = true) { + auto manager = get_obc_manager(oid, resolve_clone); + co_await load_and_lock(manager, State); + co_await std::invoke( + func, manager.get_head_obc(), manager.get_obc()); + } // Use this variant in the case where the head object // obc is already locked and only the clone obc is needed. @@ -53,10 +265,20 @@ public: template<RWState::State State> load_obc_iertr::future<> with_clone_obc_only(ObjectContextRef head, hobject_t clone_oid, - with_obc_func_t&& func, - bool resolve_clone = true); - - load_obc_iertr::future<> reload_obc(ObjectContext& obc) const; + with_obc_func_t func, + bool resolve_clone = true) { + LOG_PREFIX(ObjectContextLoader::with_clone_obc_only); + SUBDEBUGDPP(osd, "{}", dpp, clone_oid); + auto manager = get_obc_manager(clone_oid, resolve_clone); + // We populate head_state here with the passed obc assuming that + // it has been loaded and locked appropriately. We do not populate + // head_state.state because we won't be taking or releasing any + // locks on head as part of this call. + manager.head_state.obc = head; + manager.head_state.obc->append_to(obc_set_accessing); + co_await load_and_lock_clone(manager, State, false); + co_await std::invoke(func, head, manager.get_obc()); + } void notify_on_change(bool is_primary); @@ -66,24 +288,9 @@ private: DoutPrefixProvider& dpp; obc_accessing_list_t obc_set_accessing; - template<RWState::State State> - load_obc_iertr::future<> with_clone_obc(const hobject_t& oid, - with_obc_func_t&& func, - bool resolve_clone); - - template<RWState::State State> - load_obc_iertr::future<> with_head_obc(const hobject_t& oid, - with_obc_func_t&& func); - - template<RWState::State State, bool track, typename Func> - load_obc_iertr::future<> with_locked_obc(const hobject_t& oid, - Func&& func); - - template<RWState::State State> - load_obc_iertr::future<ObjectContextRef> - get_or_load_obc(ObjectContextRef obc, - bool existed); - load_obc_iertr::future<> load_obc(ObjectContextRef obc); }; + +using ObjectContextManager = ObjectContextLoader::Manager; + } |