diff options
Diffstat (limited to 'src/librados/librados_asio.h')
-rw-r--r-- | src/librados/librados_asio.h | 72 |
1 files changed, 66 insertions, 6 deletions
diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index 0aedc376575..3e5b7c57c6f 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -14,6 +14,9 @@ #ifndef LIBRADOS_ASIO_H #define LIBRADOS_ASIO_H +#include <boost/asio/associated_cancellation_slot.hpp> +#include <boost/asio/cancellation_type.hpp> + #include "include/rados/librados.hpp" #include "common/async/completion.h" #include "librados/AioCompletionImpl.h" @@ -74,6 +77,7 @@ struct Invoker<void> { template <typename Result> struct AsyncOp : Invoker<Result> { unique_aio_completion_ptr aio_completion; + boost::asio::cancellation_slot slot; using Signature = typename Invoker<Result>::Signature; using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>; @@ -83,6 +87,7 @@ struct AsyncOp : Invoker<Result> { auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)}; // move result out of Completion memory being freed auto op = std::move(p->user_data); + op.slot.clear(); // clear our cancellation handler // access AioCompletionImpl directly to avoid locking const librados::AioCompletionImpl* pc = op.aio_completion->pc; const int ret = pc->rval; @@ -94,11 +99,46 @@ struct AsyncOp : Invoker<Result> { op.dispatch(std::move(p), ec, ver); } + struct op_cancellation { + AioCompletion* completion = nullptr; + bool is_read = false; + + void operator()(boost::asio::cancellation_type type) { + if (completion == nullptr) { + return; // no AioCompletion attached + } else if (type == boost::asio::cancellation_type::none) { + return; // no cancellation requested + } else if (is_read) { + // read operations produce no side effects, so can satisfy the + // requirements of 'total' cancellation. the weaker requirements + // of 'partial' and 'terminal' are also satisfied + completion->cancel(); + } else if (type == boost::asio::cancellation_type::terminal) { + // write operations only support 'terminal' cancellation because we + // can't guarantee that no osd has succeeded (or will succeed) in + // applying the write + completion->cancel(); + } + } + }; + template <typename Executor1, typename CompletionHandler> - static auto create(const Executor1& ex1, CompletionHandler&& handler) { + static auto create(const Executor1& ex1, bool is_read, + CompletionHandler&& handler) { + op_cancellation* cancel_handler = nullptr; + auto slot = boost::asio::get_associated_cancellation_slot(handler); + if (slot.is_connected()) { + cancel_handler = &slot.template emplace<op_cancellation>(); + } + auto p = Completion::create(ex1, std::move(handler)); p->user_data.aio_completion.reset( Rados::aio_create_completion(p.get(), aio_dispatch)); + if (cancel_handler) { + cancel_handler->completion = p->user_data.aio_completion.get(); + cancel_handler->is_read = is_read; + p->user_data.slot = std::move(slot); + } return p; } }; @@ -108,6 +148,9 @@ struct AsyncOp : Invoker<Result> { /// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a /// given handler with signature (error_code, version_t, bufferlist). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. template <typename ExecutionContext, typename CompletionToken> auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, size_t len, uint64_t off, CompletionToken&& token) @@ -117,7 +160,8 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate<CompletionToken, Signature>( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, size_t len, uint64_t off) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = true; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off); @@ -132,6 +176,9 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid, /// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a /// given handler with signature (error_code, version_t). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. template <typename ExecutionContext, typename CompletionToken> auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, const bufferlist &bl, size_t len, uint64_t off, @@ -142,7 +189,8 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate<CompletionToken, Signature>( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, const bufferlist &bl, size_t len, uint64_t off) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off); @@ -157,6 +205,9 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid, /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a /// given handler with signature (error_code, version_t, bufferlist). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. template <typename ExecutionContext, typename CompletionToken> auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, ObjectReadOperation *read_op, int flags, @@ -167,7 +218,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate<CompletionToken, Signature>( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, ObjectReadOperation *read_op, int flags) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = true; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_operate(oid, op.aio_completion.get(), read_op, @@ -183,6 +235,9 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, /// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a /// given handler with signature (error_code, version_t). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. template <typename ExecutionContext, typename CompletionToken> auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, ObjectWriteOperation *write_op, int flags, @@ -194,7 +249,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, [] (auto handler, auto ex, IoCtx& io, const std::string& oid, ObjectWriteOperation *write_op, int flags, const jspan_context* trace_ctx) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx); @@ -209,6 +265,9 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid, /// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a /// given handler with signature (error_code, version_t, bufferlist). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. template <typename ExecutionContext, typename CompletionToken> auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid, bufferlist& bl, uint64_t timeout_ms, CompletionToken &&token) @@ -218,7 +277,8 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid, return boost::asio::async_initiate<CompletionToken, Signature>( [] (auto handler, auto ex, IoCtx& io, const std::string& oid, bufferlist& bl, uint64_t timeout_ms) { - auto p = Op::create(ex, std::move(handler)); + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); auto& op = p->user_data; int ret = io.aio_notify(oid, op.aio_completion.get(), |