diff options
author | Samuel Just <sjust@redhat.com> | 2024-09-27 00:15:48 +0200 |
---|---|---|
committer | Samuel Just <sjust@redhat.com> | 2024-10-15 05:37:26 +0200 |
commit | 304e20e9bcf6f29b0f0f22089665d78099265fec (patch) | |
tree | 16d05f51af28a7bc9a1437e14416d7d7eaae2f9a /src/crimson | |
parent | crimson: introduce PG::run_executer,submit_executer (diff) | |
download | ceph-304e20e9bcf6f29b0f0f22089665d78099265fec.tar.xz ceph-304e20e9bcf6f29b0f0f22089665d78099265fec.zip |
crimson: switch ClientRequest::do_request to use *_executer rather than do_osd_ops
Signed-off-by: Samuel Just <sjust@redhat.com>
Diffstat (limited to 'src/crimson')
-rw-r--r-- | src/crimson/osd/osd_operations/client_request.cc | 141 |
1 files changed, 117 insertions, 24 deletions
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 6eed04df6a5..c226222fa0c 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -502,36 +502,129 @@ ClientRequest::do_process( co_return; } - auto [submitted, all_completed] = co_await pg->do_osd_ops( - m, r_conn, obc, op_info, snapc - ).handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return std::make_tuple( - interruptor::now(), - PG::do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>()); + auto ox = seastar::make_lw_shared<OpsExecuter>( + pg, obc, op_info, *m, r_conn, snapc); + auto ret = co_await pg->run_executer( + ox, obc, op_info, m->ops + ).si_then([]() -> std::optional<std::error_code> { + return std::nullopt; + }).handle_error_interruptible(crimson::ct_error::all_same_way( + [](auto e) -> std::optional<std::error_code> { + return e; }) ); - co_await std::move(submitted); - co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this); + auto should_log_error = [](std::error_code e) -> bool { + switch (e.value()) { + case EDQUOT: + case ENOSPC: + case EAGAIN: + return false; + default: + return true; + } + }; - auto reply = co_await std::move( - all_completed - ).handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return MURef<MOSDOpReply>(); - }) - ); + if (ret && !should_log_error(*ret)) { + co_await reply_op_error(pg, -ret->value()); + co_return; + } + + { + auto all_completed = interruptor::now(); + if (ret) { + assert(should_log_error(*ret)); + if (op_info.may_write()) { + auto rep_tid = pg->shard_services.get_tid(); + auto version = co_await pg->submit_error_log( + m, op_info, obc, *ret, rep_tid); + + all_completed = pg->complete_error_log( + rep_tid, version); + } + // simply return the error below, leaving all_completed alone + } else { + auto submitted = interruptor::now(); + std::tie(submitted, all_completed) = co_await pg->submit_executer( + std::move(ox), m->ops); + co_await std::move(submitted); + } + co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this); + + co_await std::move(all_completed); + } co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this); - DEBUGDPP("{}.{}: sending response", - *pg, *this, this_instance_id); - // TODO: gate the crosscore sending - co_await interruptor::make_interruptible( - get_foreign_connection().send_with_throttling(std::move(reply)) - ); + + if (ret) { + int err = -ret->value(); + DEBUGDPP("{}: replying with error {}", *pg, *this, err); + + auto reply = crimson::make_message<MOSDOpReply>( + m.get(), err, pg->get_osdmap_epoch(), 0, false); + + if (!m->ops.empty() && m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { + reply->set_result(0); + } + + // For all ops except for CMPEXT, the correct error value is encoded + // in e. For CMPEXT, osdop.rval has the actual error value. + if (err == -ct_error::cmp_fail_error_value) { + assert(!m->ops.empty()); + for (auto &osdop : m->ops) { + if (osdop.rval < 0) { + reply->set_result(osdop.rval); + break; + } + } + } + + reply->set_enoent_reply_versions( + pg->peering_state.get_info().last_update, + pg->peering_state.get_info().last_user_version); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply))); + } else { + int result = m->ops.empty() ? 0 : m->ops.back().rval.code; + if (op_info.may_read() && result >= 0) { + for (auto &osdop : m->ops) { + if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { + result = osdop.rval.code; + break; + } + } + } else if (result > 0 && op_info.may_write() && !op_info.allows_returnvec()) { + result = 0; + } else if (result < 0 && + (m->ops.empty() ? + 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { + result = 0; + } + auto reply = crimson::make_message<MOSDOpReply>( + m.get(), + result, + pg->get_osdmap_epoch(), + 0, + false); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + if (obc->obs.exists) { + reply->set_reply_versions(pg->peering_state.get_info().last_update, + obc->obs.oi.user_version); + } else { + reply->set_reply_versions(pg->peering_state.get_info().last_update, + pg->peering_state.get_info().last_user_version); + } + + DEBUGDPP("{}.{}: sending response {}", + *pg, *this, this_instance_id, *m); + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply)) + ); + } } bool ClientRequest::is_misdirected(const PG& pg) const |