1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#include <seastar/core/future.hh>
#include "crimson/osd/osd_operations/internal_client_request.h"
#include "osd/object_state_fmt.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
}
}
namespace crimson {
template <>
struct EventBackendRegistry<osd::InternalClientRequest> {
static std::tuple<> get_backends() {
return {};
}
};
}
SET_SUBSYS(osd);
namespace crimson::osd {
InternalClientRequest::InternalClientRequest(Ref<PG> pg)
: pg(pg), start_epoch(pg->get_osdmap_epoch())
{
assert(bool(this->pg));
assert(this->pg->is_primary());
}
InternalClientRequest::~InternalClientRequest()
{
LOG_PREFIX(InternalClientRequest::~InternalClientRequest);
DEBUGI("{}: destroying", *this);
}
void InternalClientRequest::print(std::ostream &) const
{
}
void InternalClientRequest::dump_detail(Formatter *f) const
{
}
CommonPGPipeline& InternalClientRequest::client_pp()
{
return pg->request_pg_pipeline;
}
InternalClientRequest::interruptible_future<>
InternalClientRequest::with_interruption()
{
LOG_PREFIX(InternalClientRequest::with_interruption);
assert(pg->is_active());
obc_orderer = pg->obc_loader.get_obc_orderer(get_target_oid());
auto obc_manager = pg->obc_loader.get_obc_manager(
*obc_orderer,
get_target_oid());
co_await enter_stage<interruptor>(obc_orderer->obc_pp().process);
bool unfound = co_await do_recover_missing(
pg, get_target_oid(), osd_reqid_t());
if (unfound) {
throw std::system_error(
std::make_error_code(std::errc::operation_canceled),
fmt::format("{} is unfound, drop it!", get_target_oid()));
}
DEBUGI("{}: generating ops", *this);
auto osd_ops = create_osd_ops();
DEBUGI("InternalClientRequest: got {} OSDOps to execute",
std::size(osd_ops));
[[maybe_unused]] const int ret = op_info.set_from_op(
std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
assert(ret == 0);
co_await pg->obc_loader.load_and_lock(
obc_manager, pg->get_lock_type(op_info)
).handle_error_interruptible(
crimson::ct_error::assert_all("unexpected error")
);
auto params = get_do_osd_ops_params();
OpsExecuter ox(
pg, obc_manager.get_obc(), op_info, params, params.get_connection(),
SnapContext{});
co_await pg->run_executer(
ox, obc_manager.get_obc(), op_info, osd_ops
).handle_error_interruptible(
crimson::ct_error::all_same_way(
[this, FNAME](auto e) {
ERRORDPPI("{}: got unexpected error {}", *pg, *this, e);
ceph_assert(0 == "should not return an error");
return interruptor::now();
})
);
auto [submitted, completed] = co_await pg->submit_executer(
std::move(ox), osd_ops);
co_await std::move(submitted);
co_await enter_stage<interruptor>(obc_orderer->obc_pp().wait_repop);
co_await std::move(completed);
DEBUGDPP("{}: complete", *pg, *this);
co_await interruptor::make_interruptible(handle.complete());
co_return;
}
seastar::future<> InternalClientRequest::start()
{
track_event<StartEvent>();
LOG_PREFIX(InternalClientRequest::start);
DEBUGI("{}: in repeat", *this);
return interruptor::with_interruption([this]() mutable {
return with_interruption();
}, [](std::exception_ptr eptr) {
return seastar::now();
}, pg, start_epoch).then([this] {
track_event<CompletionEvent>();
}).handle_exception_type([](std::system_error &error) {
logger().debug("error {}, message: {}", error.code(), error.what());
return seastar::now();
}).finally([this] {
logger().debug("{}: exit", *this);
return handle.complete();
});
}
} // namespace crimson::osd
|