summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/osd_operation.h
diff options
context:
space:
mode:
authorSamuel Just <sjust@redhat.com>2020-01-16 20:41:25 +0100
committerXuehan Xu <xxhdx1985126@163.com>2020-04-26 09:46:35 +0200
commit12a19749e306fb0b9aad8861253369d5ccb17c93 (patch)
tree5d5acec0f609142eaf264e645364a7f946560fae /src/crimson/osd/osd_operation.h
parentcrimson/osd: introduce scheduler implementations and operation throttler (diff)
downloadceph-12a19749e306fb0b9aad8861253369d5ccb17c93.tar.xz
ceph-12a19749e306fb0b9aad8861253369d5ccb17c93.zip
crimson/osd/osd_operation: add support for AggregateBlocker
Signed-off-by: Samuel Just <sjust@redhat.com>
Diffstat (limited to '')
-rw-r--r--src/crimson/osd/osd_operation.h62
1 files changed, 55 insertions, 7 deletions
diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h
index a4382c09586..6e368384d57 100644
--- a/src/crimson/osd/osd_operation.h
+++ b/src/crimson/osd/osd_operation.h
@@ -60,21 +60,39 @@ class Blocker;
* Provides an abstraction for registering and unregistering a blocker
* for the duration of a future becoming available.
*/
-template <typename... T>
-class blocking_future {
+template <typename Fut>
+class blocking_future_detail {
friend class Operation;
friend class Blocker;
Blocker *blocker;
- seastar::future<T...> fut;
- blocking_future(Blocker *b, seastar::future<T...> &&f)
+ Fut fut;
+ blocking_future_detail(Blocker *b, Fut &&f)
: blocker(b), fut(std::move(f)) {}
template <typename... V, typename... U>
- friend blocking_future<V...> make_ready_blocking_future(U&&... args);
+ friend blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args);
+
+ template <typename U>
+ friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+
+ template <typename U>
+ friend class blocking_future_detail;
+
+public:
+ template <typename F>
+ auto then(F &&f) && {
+ using result = decltype(std::declval<Fut>().then(f));
+ return blocking_future_detail<seastar::futurize_t<result>>(
+ blocker,
+ std::move(fut).then(std::forward<F>(f)));
+ }
};
+template <typename... T>
+using blocking_future = blocking_future_detail<seastar::future<T...>>;
+
template <typename... V, typename... U>
-blocking_future<V...> make_ready_blocking_future(U&&... args) {
+blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args) {
return blocking_future<V...>(
nullptr,
seastar::make_ready_future<V...>(std::forward<U>(args)...));
@@ -91,7 +109,7 @@ protected:
public:
template <typename... T>
blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) {
- return blocking_future(this, std::move(f));
+ return blocking_future<T...>(this, std::move(f));
}
void dump(ceph::Formatter *f) const;
@@ -111,6 +129,36 @@ public:
virtual ~BlockerT() = default;
};
+class AggregateBlocker : public BlockerT<AggregateBlocker> {
+ vector<Blocker*> parent_blockers;
+protected:
+ void dump_detail(ceph::Formatter *f) const final;
+public:
+ AggregateBlocker(vector<Blocker*> &&parent_blockers)
+ : parent_blockers(std::move(parent_blockers)) {}
+ static constexpr const char *type_name = "AggregateBlocker";
+};
+
+template <typename T>
+blocking_future<> join_blocking_futures(T &&t) {
+ vector<Blocker*> blockers;
+ blockers.reserve(t.size());
+ for (auto &&bf: t) {
+ blockers.push_back(bf.blocker);
+ bf.blocker = nullptr;
+ }
+ auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+ return agg->make_blocking_future(
+ seastar::parallel_for_each(
+ std::forward<T>(t),
+ [](auto &&bf) {
+ return std::move(bf.fut);
+ }).then([agg=std::move(agg)] {
+ return seastar::now();
+ }));
+}
+
+
/**
* Common base for all crimson-osd operations. Mainly provides
* an interface for registering ops in flight and dumping