1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <type_traits>
#include "include/rados/librados.hpp"
#include "librados/librados_asio.h"
#include "rgw_aio.h"
namespace rgw {
namespace {
// forward declaration: state's constructor registers cb as the completion
// callback, while cb's own definition needs the complete state type
void cb(librados::completion_t, void* arg);
struct state {
Aio* aio;
librados::AioCompletion* c;
state(Aio* aio, AioResult& r)
: aio(aio),
c(librados::Rados::aio_create_completion(&r, &cb)) {}
};
// Completion callback registered by state's constructor. `arg` is the
// AioResult that was passed to aio_create_completion(); the matching state
// object lives in that result's user_data storage.
void cb(librados::completion_t, void* arg) {
  // state is placement-constructed into user_data and never destroyed
  static_assert(std::is_trivially_destructible_v<state>);
  static_assert(sizeof(AioResult::user_data) >= sizeof(state));

  auto* result = static_cast<AioResult*>(arg);
  auto* st = reinterpret_cast<state*>(&result->user_data);
  librados::AioCompletion* completion = st->c;

  // record the operation's return value, drop the completion, then hand the
  // finished result back to the Aio instance
  result->result = completion->get_return_value();
  completion->release();
  st->aio->put(*result);
}
// Builds an OpFunc that submits `op` through r.obj.aio_operate() using a
// librados AioCompletion whose callback is cb(). The operation is moved into
// the returned closure.
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op) {
  return [op = std::move(op)] (Aio* aio, AioResult& r) mutable {
    using OpType = std::decay_t<Op>;
    constexpr bool is_read_op =
        std::is_same_v<OpType, librados::ObjectReadOperation>;

    // the per-request state is constructed inside the result's own storage
    auto* st = new (&r.user_data) state(aio, r);

    if constexpr (is_read_op) {
      // reads additionally pass r.data as the output buffer
      r.result = r.obj.aio_operate(st->c, &op, &r.data);
    } else {
      r.result = r.obj.aio_operate(st->c, &op);
    }

    if (r.result >= 0) {
      return;  // submitted; completion is reported through cb()
    }
    // submission returned an error: release the completion and report the
    // result here (presumably cb() is not invoked in this case)
    st->c->release();
    aio->put(r);
  };
}
#ifdef HAVE_BOOST_CONTEXT
struct Handler {
Aio* throttle = nullptr;
AioResult& r;
// write callback
void operator()(boost::system::error_code ec) const {
r.result = -ec.value();
throttle->put(r);
}
// read callback
void operator()(boost::system::error_code ec, bufferlist bl) const {
r.result = -ec.value();
r.data = std::move(bl);
throttle->put(r);
}
};
// Builds an OpFunc that submits `op` with librados::async_operate() on the
// given io_context. The operation is moved into the returned closure; the
// io_context is captured by reference and the yield_context by value.
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op, boost::asio::io_context& context,
                         spawn::yield_context yield) {
  return [op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
    using boost::asio::async_completion;
    using boost::asio::bind_executor;
    using boost::asio::get_associated_executor;

    // arrange for the completion Handler to run on the yield_context's strand
    // executor so it can safely call back into Aio without locking
    async_completion<spawn::yield_context, void()> completion(yield);
    auto strand_ex = get_associated_executor(completion.completion_handler);

    auto& target = r.obj.get_ref();
    auto bound_handler = bind_executor(strand_ex, Handler{aio, r});
    librados::async_operate(context, target.pool.ioctx(), target.obj.oid,
                            &op, 0, std::move(bound_handler));
  };
}
#endif // HAVE_BOOST_CONTEXT
// Entry point used by Aio::librados_op(): picks the submission path for `op`
// based on the optional_yield. The operation must be a non-const rvalue of a
// type derived from librados::ObjectOperation.
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op, optional_yield y) {
  using OpType = std::decay_t<Op>;
  static_assert(std::is_base_of_v<librados::ObjectOperation, OpType>);
  // the overloads below move from `op`, so only accept mutable rvalues
  static_assert(!std::is_lvalue_reference_v<Op>);
  static_assert(!std::is_const_v<Op>);

#ifdef HAVE_BOOST_CONTEXT
  // a yield context is available: use the async_operate()-based overload
  if (y) {
    return aio_abstract(std::forward<Op>(op),
                        y.get_io_context(),
                        y.get_yield_context());
  }
#endif
  // otherwise fall back to the AioCompletion-based overload
  return aio_abstract(std::forward<Op>(op));
}
} // anonymous namespace
// Wraps a librados read operation as an Aio::OpFunc. The operation is moved
// into the returned function; `y` is forwarded to aio_abstract().
Aio::OpFunc Aio::librados_op(librados::ObjectReadOperation&& op,
                             optional_yield y) {
  auto read_func = aio_abstract(std::move(op), y);
  return read_func;
}
// Wraps a librados write operation as an Aio::OpFunc. The operation is moved
// into the returned function; `y` is forwarded to aio_abstract().
Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op,
                             optional_yield y) {
  auto write_func = aio_abstract(std::move(op), y);
  return write_func;
}
} // namespace rgw
|