diff options
author | Samuel Just <sjust@redhat.com> | 2020-01-16 20:41:25 +0100 |
---|---|---|
committer | Xuehan Xu <xxhdx1985126@163.com> | 2020-04-26 09:46:35 +0200 |
commit | 12a19749e306fb0b9aad8861253369d5ccb17c93 (patch) | |
tree | 5d5acec0f609142eaf264e645364a7f946560fae /src/crimson/osd/osd_operation.h | |
parent | crimson/osd: introduce scheduler implementations and operation throttler (diff) | |
download | ceph-12a19749e306fb0b9aad8861253369d5ccb17c93.tar.xz ceph-12a19749e306fb0b9aad8861253369d5ccb17c93.zip |
crimson/osd/osd_operation: add support for AggregateBlocker
Signed-off-by: Samuel Just <sjust@redhat.com>
Diffstat (limited to '')
-rw-r--r-- | src/crimson/osd/osd_operation.h | 62 |
1 files changed, 55 insertions, 7 deletions
diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index a4382c09586..6e368384d57 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -60,21 +60,39 @@ class Blocker; * Provides an abstraction for registering and unregistering a blocker * for the duration of a future becoming available. */ -template <typename... T> -class blocking_future { +template <typename Fut> +class blocking_future_detail { friend class Operation; friend class Blocker; Blocker *blocker; - seastar::future<T...> fut; - blocking_future(Blocker *b, seastar::future<T...> &&f) + Fut fut; + blocking_future_detail(Blocker *b, Fut &&f) : blocker(b), fut(std::move(f)) {} template <typename... V, typename... U> - friend blocking_future<V...> make_ready_blocking_future(U&&... args); + friend blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args); + + template <typename U> + friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u); + + template <typename U> + friend class blocking_future_detail; + +public: + template <typename F> + auto then(F &&f) && { + using result = decltype(std::declval<Fut>().then(f)); + return blocking_future_detail<seastar::futurize_t<result>>( + blocker, + std::move(fut).then(std::forward<F>(f))); + } }; +template <typename... T> +using blocking_future = blocking_future_detail<seastar::future<T...>>; + template <typename... V, typename... U> -blocking_future<V...> make_ready_blocking_future(U&&... args) { +blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args) { return blocking_future<V...>( nullptr, seastar::make_ready_future<V...>(std::forward<U>(args)...)); @@ -91,7 +109,7 @@ protected: public: template <typename... T> blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) { - return blocking_future(this, std::move(f)); + return blocking_future<T...>(this, std::move(f)); } void dump(ceph::Formatter *f) const; @@ -111,6 +129,36 @@ public: virtual ~BlockerT() = default; }; +class AggregateBlocker : public BlockerT<AggregateBlocker> { + vector<Blocker*> parent_blockers; +protected: + void dump_detail(ceph::Formatter *f) const final; +public: + AggregateBlocker(vector<Blocker*> &&parent_blockers) + : parent_blockers(std::move(parent_blockers)) {} + static constexpr const char *type_name = "AggregateBlocker"; +}; + +template <typename T> +blocking_future<> join_blocking_futures(T &&t) { + vector<Blocker*> blockers; + blockers.reserve(t.size()); + for (auto &&bf: t) { + blockers.push_back(bf.blocker); + bf.blocker = nullptr; + } + auto agg = std::make_unique<AggregateBlocker>(std::move(blockers)); + return agg->make_blocking_future( + seastar::parallel_for_each( + std::forward<T>(t), + [](auto &&bf) { + return std::move(bf.fut); + }).then([agg=std::move(agg)] { + return seastar::now(); + })); +} + + /** * Common base for all crimson-osd operations. Mainly provides * an interface for registering ops in flight and dumping |