summaryrefslogtreecommitdiffstats
path: root/src/crimson
diff options
context:
space:
mode:
authorSamuel Just <sjust@redhat.com>2024-09-27 00:15:48 +0200
committerSamuel Just <sjust@redhat.com>2024-10-15 05:37:26 +0200
commit304e20e9bcf6f29b0f0f22089665d78099265fec (patch)
tree16d05f51af28a7bc9a1437e14416d7d7eaae2f9a /src/crimson
parentcrimson: introduce PG::run_executer,submit_executer (diff)
downloadceph-304e20e9bcf6f29b0f0f22089665d78099265fec.tar.xz
ceph-304e20e9bcf6f29b0f0f22089665d78099265fec.zip
crimson: switch ClientRequest::do_request to use *_executer rather than do_osd_ops
Signed-off-by: Samuel Just <sjust@redhat.com>
Diffstat (limited to 'src/crimson')
-rw-r--r--src/crimson/osd/osd_operations/client_request.cc141
1 files changed, 117 insertions, 24 deletions
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
index 6eed04df6a5..c226222fa0c 100644
--- a/src/crimson/osd/osd_operations/client_request.cc
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -502,36 +502,129 @@ ClientRequest::do_process(
co_return;
}
- auto [submitted, all_completed] = co_await pg->do_osd_ops(
- m, r_conn, obc, op_info, snapc
- ).handle_error_interruptible(
- crimson::ct_error::eagain::handle([] {
- ceph_assert(0 == "not handled");
- return std::make_tuple(
- interruptor::now(),
- PG::do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>());
+ auto ox = seastar::make_lw_shared<OpsExecuter>(
+ pg, obc, op_info, *m, r_conn, snapc);
+ auto ret = co_await pg->run_executer(
+ ox, obc, op_info, m->ops
+ ).si_then([]() -> std::optional<std::error_code> {
+ return std::nullopt;
+ }).handle_error_interruptible(crimson::ct_error::all_same_way(
+ [](auto e) -> std::optional<std::error_code> {
+ return e;
})
);
- co_await std::move(submitted);
- co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+ auto should_log_error = [](std::error_code e) -> bool {
+ switch (e.value()) {
+ case EDQUOT:
+ case ENOSPC:
+ case EAGAIN:
+ return false;
+ default:
+ return true;
+ }
+ };
- auto reply = co_await std::move(
- all_completed
- ).handle_error_interruptible(
- crimson::ct_error::eagain::handle([] {
- ceph_assert(0 == "not handled");
- return MURef<MOSDOpReply>();
- })
- );
+ if (ret && !should_log_error(*ret)) {
+ co_await reply_op_error(pg, -ret->value());
+ co_return;
+ }
+
+ {
+ auto all_completed = interruptor::now();
+ if (ret) {
+ assert(should_log_error(*ret));
+ if (op_info.may_write()) {
+ auto rep_tid = pg->shard_services.get_tid();
+ auto version = co_await pg->submit_error_log(
+ m, op_info, obc, *ret, rep_tid);
+
+ all_completed = pg->complete_error_log(
+ rep_tid, version);
+ }
+ // simply return the error below, leaving all_completed alone
+ } else {
+ auto submitted = interruptor::now();
+ std::tie(submitted, all_completed) = co_await pg->submit_executer(
+ std::move(ox), m->ops);
+ co_await std::move(submitted);
+ }
+ co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+
+ co_await std::move(all_completed);
+ }
co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
- DEBUGDPP("{}.{}: sending response",
- *pg, *this, this_instance_id);
- // TODO: gate the crosscore sending
- co_await interruptor::make_interruptible(
- get_foreign_connection().send_with_throttling(std::move(reply))
- );
+
+ if (ret) {
+ int err = -ret->value();
+ DEBUGDPP("{}: replying with error {}", *pg, *this, err);
+
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), err, pg->get_osdmap_epoch(), 0, false);
+
+ if (!m->ops.empty() && m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+ reply->set_result(0);
+ }
+
+ // For all ops except for CMPEXT, the correct error value is encoded
+ // in e. For CMPEXT, osdop.rval has the actual error value.
+ if (err == -ct_error::cmp_fail_error_value) {
+ assert(!m->ops.empty());
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0) {
+ reply->set_result(osdop.rval);
+ break;
+ }
+ }
+ }
+
+ reply->set_enoent_reply_versions(
+ pg->peering_state.get_info().last_update,
+ pg->peering_state.get_info().last_user_version);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+
+ // TODO: gate the crosscore sending
+ co_await interruptor::make_interruptible(
+ get_foreign_connection().send_with_throttling(std::move(reply)));
+ } else {
+ int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+ if (op_info.may_read() && result >= 0) {
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = osdop.rval.code;
+ break;
+ }
+ }
+ } else if (result > 0 && op_info.may_write() && !op_info.allows_returnvec()) {
+ result = 0;
+ } else if (result < 0 &&
+ (m->ops.empty() ?
+ 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = 0;
+ }
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(),
+ result,
+ pg->get_osdmap_epoch(),
+ 0,
+ false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ if (obc->obs.exists) {
+ reply->set_reply_versions(pg->peering_state.get_info().last_update,
+ obc->obs.oi.user_version);
+ } else {
+ reply->set_reply_versions(pg->peering_state.get_info().last_update,
+ pg->peering_state.get_info().last_user_version);
+ }
+
+ DEBUGDPP("{}.{}: sending response {}",
+ *pg, *this, this_instance_id, *m);
+ // TODO: gate the crosscore sending
+ co_await interruptor::make_interruptible(
+ get_foreign_connection().send_with_throttling(std::move(reply))
+ );
+ }
}
bool ClientRequest::is_misdirected(const PG& pg) const