// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
// path: src/common/async/completion.h (blob d8065934e016bbd758adfe43de693803e256ede2)
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2018 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_ASYNC_COMPLETION_H
#define CEPH_ASYNC_COMPLETION_H

#include <cstddef>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>

#include <boost/asio/bind_executor.hpp>
#include <boost/asio/defer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/post.hpp>

#include "bind_handler.h"
#include "forward_handler.h"

namespace ceph::async {

/**
 * Abstract completion handler interface for use with boost::asio.
 *
 * Memory management is performed using the Handler's 'associated allocator',
 * which carries the additional requirement that its memory be released before
 * the Handler is invoked. This allows memory allocated for one asynchronous
 * operation to be reused in its continuation. Because of this requirement, any
 * calls to invoke the completion must first release ownership of it. To enforce
 * this, the static functions defer()/dispatch()/post() take the completion by
 * rvalue-reference to std::unique_ptr<Completion>, i.e. std::move(completion).
 *
 * Handlers may also have an 'associated executor', so the calls to defer(),
 * dispatch(), and post() are forwarded to that executor. If there is no
 * associated executor (which is generally the case unless one was bound with
 * boost::asio::bind_executor()), the executor passed to Completion::create()
 * is used as a default.
 *
 * Example use:
 *
 *   // declare a Completion type with Signature = void(int, string)
 *   using MyCompletion = ceph::async::Completion<void(int, string)>;
 *
 *   // create a completion with the given callback:
 *   std::unique_ptr<MyCompletion> c;
 *   c = MyCompletion::create(ex, [] (int a, const string& b) {});
 *
 *   // bind arguments to the callback and post to its associated executor:
 *   MyCompletion::post(std::move(c), 5, "hello");
 *
 *
 * Additional user data may be stored along with the Completion to take
 * advantage of the handler allocator optimization. This is accomplished by
 * specifying its type in the template parameter T. For example, the type
 * Completion<void(), int> contains a public member variable 'int user_data'.
 * Any additional arguments to Completion::create() will be forwarded to type
 * T's constructor.
 *
 * If the AsBase<T> type tag is used, as in Completion<void(), AsBase<T>>,
 * the Completion will inherit from T instead of declaring it as a member
 * variable.
 *
 * When invoking the completion handler via defer(), dispatch(), or post(),
 * care must be taken when passing arguments that refer to user data, because
 * its memory is destroyed prior to invocation. In such cases, the user data
 * should be moved/copied out of the Completion first.
 */
// primary template is only declared here; the usable definition is the
// partial specialization for Signature = void(Args...) further down. T selects
// the optional user data type and defaults to void (no user data)
template <typename Signature, typename T = void>
class Completion;


/// type tag for UserData: UserData<AsBase<T>> inherits from T instead of
/// holding a T as a member variable
template <typename T> struct AsBase {};

namespace detail {

/// optional user data to be stored with the Completion. the primary template
/// holds a T by value in the public member 'user_data'; every constructor
/// argument is perfectly forwarded to T's constructor (no arguments
/// value-initializes it)
template <typename T>
struct UserData {
  T user_data;
  template <typename ...Args>
  UserData(Args&& ...args)
    : user_data(std::forward<Args>(args)...)
  {}
};
/// AsBase specialization inherits publicly from T, so T's members are
/// reachable directly on the UserData object. constructor arguments are
/// perfectly forwarded to T's constructor
template <typename T>
struct UserData<AsBase<T>> : public T {
  template <typename ...Args>
  UserData(Args&& ...args)
    : T(std::forward<Args>(args)...)
  {}
};
/// void specialization stores nothing. declared with the same class-key
/// (struct) as the primary template so the specializations agree
template <>
struct UserData<void> {};

} // namespace detail


// template specialization to pull the Signature's args apart. Args... are the
// completion handler's argument types; T selects the optional user data, which
// is made available through the detail::UserData<T> base class
template <typename T, typename ...Args>
class Completion<void(Args...), T> : public detail::UserData<T> {
 protected:
  // internal interfaces for type-erasure on the Handler/Executor. uses
  // tuple<Args...> to provide perfect forwarding because you can't make
  // virtual function templates. the static defer()/dispatch()/post() below
  // release the unique_ptr and then call the matching destroy_*() hook, so the
  // implementation is responsible for freeing the object
  virtual void destroy_defer(std::tuple<Args...>&& args) = 0;
  virtual void destroy_dispatch(std::tuple<Args...>&& args) = 0;
  virtual void destroy_post(std::tuple<Args...>&& args) = 0;
  // free the object without invoking the handler; called by operator delete
  virtual void destroy() = 0;

  // constructor is protected, use create(). any constructor arguments are
  // forwarded to UserData
  template <typename ...TArgs>
  Completion(TArgs&& ...args)
    : detail::UserData<T>(std::forward<TArgs>(args)...)
  {}
 public:
  virtual ~Completion() = default;

  // use the virtual destroy() interface on delete. this allows the derived
  // class to manage its memory using Handler allocators, without having to use
  // a custom Deleter for std::unique_ptr<>
  static void operator delete(void *p) {
    static_cast<Completion*>(p)->destroy();
  }

  /// completion factory function that uses the handler's associated allocator.
  /// any additional arguments are forwarded to T's constructor
  template <typename Executor1, typename Handler, typename ...TArgs>
  static std::unique_ptr<Completion>
  create(const Executor1& ex1, Handler&& handler, TArgs&& ...args);

  /// take ownership of the completion, bind any arguments to the completion
  /// handler, then defer() it on its associated executor
  template <typename ...Args2>
  static void defer(std::unique_ptr<Completion>&& c, Args2&&...args);

  /// take ownership of the completion, bind any arguments to the completion
  /// handler, then dispatch() it on its associated executor
  template <typename ...Args2>
  static void dispatch(std::unique_ptr<Completion>&& c, Args2&&...args);

  /// take ownership of the completion, bind any arguments to the completion
  /// handler, then post() it to its associated executor
  template <typename ...Args2>
  static void post(std::unique_ptr<Completion>&& c, Args2&&...args);
};

namespace detail {

// concrete Completion that knows how to invoke the completion handler. this
// observes all of the 'Requirements on asynchronous operations' specified by
// the C++ Networking TS
//
// Executor1 is the default executor passed to create(), Handler is the stored
// completion handler type, T is the user data selector forwarded to the
// Completion base, and Args... are the handler's argument types
template <typename Executor1, typename Handler, typename T, typename ...Args>
class CompletionImpl final : public Completion<void(Args...), T> {
  // use Handler's associated executor (or Executor1 by default) for callbacks
  using Executor2 = boost::asio::associated_executor_t<Handler, Executor1>;
  // maintain work on both executors
  using Work1 = boost::asio::executor_work_guard<Executor1>;
  using Work2 = boost::asio::executor_work_guard<Executor2>;
  // note: 'work' is declared before 'handler', so it is initialized first
  std::pair<Work1, Work2> work;
  Handler handler;

  // use Handler's associated allocator
  using Alloc2 = boost::asio::associated_allocator_t<Handler>;
  using Traits2 = std::allocator_traits<Alloc2>;
  // rebind it so that it allocates CompletionImpl objects
  using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>;
  using RebindTraits2 = std::allocator_traits<RebindAlloc2>;

  // placement new for the handler allocator
  static void* operator new(size_t, RebindAlloc2 alloc2) {
    return RebindTraits2::allocate(alloc2, 1);
  }
  // placement delete for when the constructor throws during placement new
  static void operator delete(void *p, RebindAlloc2 alloc2) {
    RebindTraits2::deallocate(alloc2, static_cast<CompletionImpl*>(p), 1);
  }

  // bind the handler to executor 'ex', pair it with its argument tuple in a
  // CompletionHandler, and wrap the result with forward_handler() (both
  // helpers come from the bind_handler.h/forward_handler.h includes)
  static auto bind_and_forward(const Executor2& ex, Handler&& h,
                               std::tuple<Args...>&& args) {
    return forward_handler(CompletionHandler{
        boost::asio::bind_executor(ex, std::move(h)), std::move(args)});
  }

  // the three destroy_*() hooks share the same sequence:
  //  1. move the work guards into a local so they outlive this object
  //  2. copy out the handler's executor and (rebound) allocator
  //  3. move the handler and arguments into the bound function object 'f'
  //  4. destroy and deallocate this object, so its memory is released before
  //     the handler is submitted
  //  5. submit 'f' with the corresponding boost::asio free function
  // no member may be touched after step 4
  void destroy_defer(std::tuple<Args...>&& args) override {
    auto w = std::move(work);
    auto ex2 = w.second.get_executor();
    RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
    auto f = bind_and_forward(ex2, std::move(handler), std::move(args));
    RebindTraits2::destroy(alloc2, this);
    RebindTraits2::deallocate(alloc2, this, 1);
    // unlike destroy_dispatch()/destroy_post(), 'f' is explicitly re-bound to
    // ex2 here before it is submitted.
    // NOTE(review): the reason for the extra bind_executor() is not evident
    // from this file - confirm before changing
    boost::asio::defer(boost::asio::bind_executor(ex2, std::move(f)));
  }
  void destroy_dispatch(std::tuple<Args...>&& args) override {
    auto w = std::move(work);
    auto ex2 = w.second.get_executor();
    RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
    auto f = bind_and_forward(ex2, std::move(handler), std::move(args));
    RebindTraits2::destroy(alloc2, this);
    RebindTraits2::deallocate(alloc2, this, 1);
    boost::asio::dispatch(std::move(f));
  }
  void destroy_post(std::tuple<Args...>&& args) override {
    auto w = std::move(work);
    auto ex2 = w.second.get_executor();
    RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
    auto f = bind_and_forward(ex2, std::move(handler), std::move(args));
    RebindTraits2::destroy(alloc2, this);
    RebindTraits2::deallocate(alloc2, this, 1);
    boost::asio::post(std::move(f));
  }
  // free this object through the handler allocator without invoking the
  // handler. reached from operator delete.
  // NOTE(review): when entered via a delete-expression the destructor has
  // presumably already run before operator delete is called, yet this reads
  // 'handler' and destroys the object again - confirm this is intended
  void destroy() override {
    RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
    RebindTraits2::destroy(alloc2, this);
    RebindTraits2::deallocate(alloc2, this, 1);
  }

  // constructor is private, use create(). extra constructor arguments are
  // forwarded to UserData. inside the initializer list 'handler' names the
  // constructor parameter; the work guards are built from it before it is
  // moved into the member, because 'work' is declared first
  template <typename ...TArgs>
  CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args)
    : Completion<void(Args...), T>(std::forward<TArgs>(args)...),
      work(ex1, boost::asio::make_work_guard(handler, ex1)),
      handler(std::move(handler))
  {}

 public:
  // allocate and construct a CompletionImpl with the handler's associated
  // allocator (via the placement operator new above) and return it in a
  // std::unique_ptr. the allocator is read before the handler is moved into
  // the new object
  template <typename ...TArgs>
  static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) {
    auto alloc2 = boost::asio::get_associated_allocator(handler);
    using Ptr = std::unique_ptr<CompletionImpl>;
    return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler),
                                           std::forward<TArgs>(args)...)};
  }

  // route plain delete through destroy() so the memory is returned to the
  // handler allocator
  static void operator delete(void *p) {
    static_cast<CompletionImpl*>(p)->destroy();
  }
};

} // namespace detail


/// completion factory function that uses the handler's associated allocator.
/// builds a concrete detail::CompletionImpl and returns it type-erased as a
/// std::unique_ptr<Completion>. any additional arguments are forwarded to T's
/// constructor.
///
/// @param ex       default executor, used when the handler has no associated
///                 executor of its own
/// @param handler  completion handler; rvalues are moved into the completion,
///                 anything else is copied so the completion always owns it
/// @param args     forwarded to the user data constructor
template <typename T, typename ...Args>
template <typename Executor1, typename Handler, typename ...TArgs>
std::unique_ptr<Completion<void(Args...), T>>
Completion<void(Args...), T>::create(const Executor1& ex,
                                     Handler&& handler, TArgs&& ...args)
{
  // Handler is deduced from a forwarding reference, so for an lvalue argument
  // it is a reference type. instantiating CompletionImpl with that type would
  // store a reference to the caller's handler (for a const lvalue) or fail to
  // compile (for a non-const lvalue), and the associated executor/allocator
  // traits would be looked up on the reference type. always use the decayed
  // type so the completion owns its handler
  using Handler2 = std::decay_t<Handler>;
  using Impl = detail::CompletionImpl<Executor1, Handler2, T, Args...>;
  if constexpr (std::is_same_v<Handler, Handler2>) {
    // plain rvalue: hand it straight through without an extra move
    return Impl::create(ex, std::move(handler),
                        std::forward<TArgs>(args)...);
  } else {
    // lvalue or cv-qualified argument: make an owned copy first
    Handler2 owned(std::forward<Handler>(handler));
    return Impl::create(ex, std::move(owned),
                        std::forward<TArgs>(args)...);
  }
}

/// give up ownership of the completion, pack the arguments into a tuple, and
/// pass them to the type-erased destroy_defer() hook
template <typename T, typename ...Args>
template <typename ...Args2>
void Completion<void(Args...), T>::defer(std::unique_ptr<Completion>&& ptr,
                                         Args2&& ...args)
{
  // ownership is released first: the hook frees the completion itself
  Completion* const self = ptr.release();
  auto bound = std::make_tuple(std::forward<Args2>(args)...);
  self->destroy_defer(std::move(bound));
}

/// give up ownership of the completion, pack the arguments into a tuple, and
/// pass them to the type-erased destroy_dispatch() hook
template <typename T, typename ...Args>
template <typename ...Args2>
void Completion<void(Args...), T>::dispatch(std::unique_ptr<Completion>&& ptr,
                                            Args2&& ...args)
{
  // ownership is released first: the hook frees the completion itself
  Completion* const self = ptr.release();
  auto bound = std::make_tuple(std::forward<Args2>(args)...);
  self->destroy_dispatch(std::move(bound));
}

/// give up ownership of the completion, pack the arguments into a tuple, and
/// pass them to the type-erased destroy_post() hook
template <typename T, typename ...Args>
template <typename ...Args2>
void Completion<void(Args...), T>::post(std::unique_ptr<Completion>&& ptr,
                                        Args2&& ...args)
{
  // ownership is released first: the hook frees the completion itself
  Completion* const self = ptr.release();
  auto bound = std::make_tuple(std::forward<Args2>(args)...);
  self->destroy_post(std::move(bound));
}


/// free-function spelling of Completion<Signature, T>::create(): builds a
/// completion using the handler's associated allocator. any additional
/// arguments are forwarded to T's constructor
template <typename Signature, typename T, typename Executor1,
          typename Handler, typename ...TArgs>
std::unique_ptr<Completion<Signature, T>>
create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args)
{
  using CompletionType = Completion<Signature, T>;
  return CompletionType::create(ex,
                                std::forward<Handler>(handler),
                                std::forward<TArgs>(args)...);
}

/// free-function spelling of Completion<Signature, T>::defer(): takes
/// ownership of the completion, binds the arguments to its handler, and
/// defer()s it on the associated executor
template <typename Signature, typename T, typename ...Args>
void defer(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
{
  using CompletionType = Completion<Signature, T>;
  CompletionType::defer(std::move(ptr), std::forward<Args>(args)...);
}

/// free-function spelling of Completion<Signature, T>::dispatch(): takes
/// ownership of the completion, binds the arguments to its handler, and
/// dispatch()es it on the associated executor
template <typename Signature, typename T, typename ...Args>
void dispatch(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
{
  using CompletionType = Completion<Signature, T>;
  CompletionType::dispatch(std::move(ptr), std::forward<Args>(args)...);
}

/// free-function spelling of Completion<Signature, T>::post(): takes
/// ownership of the completion, binds the arguments to its handler, and
/// post()s it to the associated executor
template <typename Signature, typename T, typename ...Args>
void post(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
{
  using CompletionType = Completion<Signature, T>;
  CompletionType::post(std::move(ptr), std::forward<Args>(args)...);
}

} // namespace ceph::async

#endif // CEPH_ASYNC_COMPLETION_H