diff options
Diffstat (limited to 'src/common/io_exerciser/RadosIo.cc')
-rw-r--r-- | src/common/io_exerciser/RadosIo.cc | 601 |
1 files changed, 377 insertions, 224 deletions
diff --git a/src/common/io_exerciser/RadosIo.cc b/src/common/io_exerciser/RadosIo.cc index 44b82260263..a78c074228b 100644 --- a/src/common/io_exerciser/RadosIo.cc +++ b/src/common/io_exerciser/RadosIo.cc @@ -1,300 +1,453 @@ #include "RadosIo.h" +#include <fmt/format.h> +#include <json_spirit/json_spirit.h> + +#include <ranges> + #include "DataGenerator.h" +#include "common/ceph_json.h" +#include "common/json/OSDStructures.h" using RadosIo = ceph::io_exerciser::RadosIo; -RadosIo::RadosIo(librados::Rados& rados, - boost::asio::io_context& asio, - const std::string& pool, - const std::string& oid, - uint64_t block_size, - int seed, - int threads, - ceph::mutex& lock, - ceph::condition_variable& cond) : - Model(oid, block_size), - rados(rados), - asio(asio), - om(std::make_unique<ObjectModel>(oid, block_size, seed)), - db(data_generation::DataGenerator::create_generator( - data_generation::GenerationType::HeaderedSeededRandom, *om)), - pool(pool), - threads(threads), - lock(lock), - cond(cond), - outstanding_io(0) -{ +namespace { +template <typename S> +int send_osd_command(int osd, S& s, librados::Rados& rados, const char* name, + ceph::buffer::list& inbl, ceph::buffer::list* outbl, + Formatter* f) { + encode_json(name, s, f); + + std::ostringstream oss; + f->flush(oss); + int rc = rados.osd_command(osd, oss.str(), inbl, outbl, nullptr); + return rc; +} + +template <typename S> +int send_mon_command(S& s, librados::Rados& rados, const char* name, + ceph::buffer::list& inbl, ceph::buffer::list* outbl, + Formatter* f) { + encode_json(name, s, f); + + std::ostringstream oss; + f->flush(oss); + int rc = rados.mon_command(oss.str(), inbl, outbl, nullptr); + return rc; +} +} // namespace + +RadosIo::RadosIo(librados::Rados& rados, boost::asio::io_context& asio, + const std::string& pool, const std::string& oid, + const std::optional<std::vector<int>>& cached_shard_order, + uint64_t block_size, int seed, int threads, ceph::mutex& lock, + ceph::condition_variable& cond) + : Model(oid, block_size), + rados(rados), + asio(asio), + om(std::make_unique<ObjectModel>(oid, block_size, seed)), + db(data_generation::DataGenerator::create_generator( + data_generation::GenerationType::HeaderedSeededRandom, *om)), + pool(pool), + cached_shard_order(cached_shard_order), + threads(threads), + lock(lock), + cond(cond), + outstanding_io(0) { int rc; rc = rados.ioctx_create(pool.c_str(), io); ceph_assert(rc == 0); allow_ec_overwrites(true); } -RadosIo::~RadosIo() -{ -} +RadosIo::~RadosIo() {} -void RadosIo::start_io() -{ +void RadosIo::start_io() { std::lock_guard l(lock); outstanding_io++; } -void RadosIo::finish_io() -{ +void RadosIo::finish_io() { std::lock_guard l(lock); ceph_assert(outstanding_io > 0); outstanding_io--; cond.notify_all(); } -void RadosIo::wait_for_io(int count) -{ +void RadosIo::wait_for_io(int count) { std::unique_lock l(lock); while (outstanding_io > count) { cond.wait(l); } } -void RadosIo::allow_ec_overwrites(bool allow) -{ +void RadosIo::allow_ec_overwrites(bool allow) { int rc; bufferlist inbl, outbl; - std::string cmdstr = - "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool + "\", \ + std::string cmdstr = "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool + + "\", \ \"var\": \"allow_ec_overwrites\", \"val\": \"" + - (allow ? "true" : "false") + "\"}"; + (allow ? "true" : "false") + "\"}"; rc = rados.mon_command(cmdstr, inbl, &outbl, nullptr); ceph_assert(rc == 0); } -RadosIo::AsyncOpInfo::AsyncOpInfo(uint64_t offset1, uint64_t length1, - uint64_t offset2, uint64_t length2, - uint64_t offset3, uint64_t length3 ) : - offset1(offset1), length1(length1), - offset2(offset2), length2(length2), - offset3(offset3), length3(length3) -{ +template <int N> +RadosIo::AsyncOpInfo<N>::AsyncOpInfo(const std::array<uint64_t, N>& offset, + const std::array<uint64_t, N>& length) + : offset(offset), length(length) {} -} - -bool RadosIo::readyForIoOp(IoOp &op) -{ - ceph_assert(ceph_mutex_is_locked_by_me(lock)); //Must be called with lock held +bool RadosIo::readyForIoOp(IoOp& op) { + ceph_assert( + ceph_mutex_is_locked_by_me(lock)); // Must be called with lock held if (!om->readyForIoOp(op)) { return false; } - switch (op.op) { - case OpType::Done: - case OpType::BARRIER: - return outstanding_io == 0; - default: - return outstanding_io < threads; + + switch (op.getOpType()) { + case OpType::Done: + case OpType::Barrier: + return outstanding_io == 0; + default: + return outstanding_io < threads; } } -void RadosIo::applyIoOp(IoOp &op) -{ - std::shared_ptr<AsyncOpInfo> op_info; - +void RadosIo::applyIoOp(IoOp& op) { om->applyIoOp(op); // If there are thread concurrent I/Os in flight then wait for // at least one I/O to complete - wait_for_io(threads-1); - - switch (op.op) { - case OpType::Done: - [[ fallthrough ]]; - case OpType::BARRIER: - // Wait for all outstanding I/O to complete - wait_for_io(0); - break; - - case OpType::CREATE: - { + wait_for_io(threads - 1); + + switch (op.getOpType()) { + case OpType::Done: + [[fallthrough]]; + case OpType::Barrier: + // Wait for all outstanding I/O to complete + wait_for_io(0); + break; + + case OpType::Create: { start_io(); - op_info = std::make_shared<AsyncOpInfo>(0, op.length1); - op_info->bl1 = db->generate_data(0, op.length1); - op_info->wop.write_full(op_info->bl1); - auto create_cb = [this] (boost::system::error_code ec, - version_t ver) { + uint64_t opSize = static_cast<CreateOp&>(op).size; + std::shared_ptr<AsyncOpInfo<1>> op_info = + std::make_shared<AsyncOpInfo<1>>(std::array<uint64_t, 1>{0}, + std::array<uint64_t, 1>{opSize}); + op_info->bufferlist[0] = db->generate_data(0, opSize); + op_info->wop.write_full(op_info->bufferlist[0]); + auto create_cb = [this](boost::system::error_code ec, version_t ver) { ceph_assert(ec == boost::system::errc::success); finish_io(); }; - librados::async_operate(asio, io, oid, - &op_info->wop, 0, nullptr, create_cb); + librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr, + create_cb); + break; } - break; - case OpType::REMOVE: - { + case OpType::Remove: { start_io(); - op_info = std::make_shared<AsyncOpInfo>(); + auto op_info = std::make_shared<AsyncOpInfo<0>>(); op_info->wop.remove(); - auto remove_cb = [this] (boost::system::error_code ec, - version_t ver) { + auto remove_cb = [this](boost::system::error_code ec, version_t ver) { ceph_assert(ec == boost::system::errc::success); finish_io(); }; - librados::async_operate(asio, io, oid, - &op_info->wop, 0, nullptr, remove_cb); + librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr, + remove_cb); + break; } - break; + case OpType::Read: + [[fallthrough]]; + case OpType::Read2: + [[fallthrough]]; + case OpType::Read3: + [[fallthrough]]; + case OpType::Write: + [[fallthrough]]; + case OpType::Write2: + [[fallthrough]]; + case OpType::Write3: + [[fallthrough]]; + case OpType::FailedWrite: + [[fallthrough]]; + case OpType::FailedWrite2: + [[fallthrough]]; + case OpType::FailedWrite3: + applyReadWriteOp(op); + break; + case OpType::InjectReadError: + [[fallthrough]]; + case OpType::InjectWriteError: + [[fallthrough]]; + case OpType::ClearReadErrorInject: + [[fallthrough]]; + case OpType::ClearWriteErrorInject: + applyInjectOp(op); + break; + default: + ceph_abort_msg("Unrecognised Op"); + break; + } +} - case OpType::READ: - { - start_io(); - op_info = std::make_shared<AsyncOpInfo>(op.offset1, op.length1); - op_info->rop.read(op.offset1 * block_size, - op.length1 * block_size, - &op_info->bl1, nullptr); - auto read_cb = [this, op_info] (boost::system::error_code ec, - version_t ver, - bufferlist bl) { - ceph_assert(ec == boost::system::errc::success); - ceph_assert(db->validate(op_info->bl1, - op_info->offset1, - op_info->length1)); - finish_io(); - }; - librados::async_operate(asio, io, oid, - &op_info->rop, 0, nullptr, read_cb); - num_io++; +void RadosIo::applyReadWriteOp(IoOp& op) { + auto applyReadOp = [this]<OpType opType, int N>( + ReadWriteOp<opType, N> readOp) { + auto op_info = + std::make_shared<AsyncOpInfo<N>>(readOp.offset, readOp.length); + + for (int i = 0; i < N; i++) { + op_info->rop.read(readOp.offset[i] * block_size, + readOp.length[i] * block_size, &op_info->bufferlist[i], + nullptr); } - break; + auto read_cb = [this, op_info](boost::system::error_code ec, version_t ver, + bufferlist bl) { + ceph_assert(ec == boost::system::errc::success); + for (int i = 0; i < N; i++) { + ceph_assert(db->validate(op_info->bufferlist[i], op_info->offset[i], + op_info->length[i])); + } + finish_io(); + }; + librados::async_operate(asio, io, oid, &op_info->rop, 0, nullptr, read_cb); + num_io++; + }; - case OpType::READ2: - { - start_io(); - op_info = std::make_shared<AsyncOpInfo>(op.offset1, - op.length1, - op.offset2, - op.length2); - - op_info->rop.read(op.offset1 * block_size, - op.length1 * block_size, - &op_info->bl1, nullptr); - op_info->rop.read(op.offset2 * block_size, - op.length2 * block_size, - &op_info->bl2, nullptr); - auto read2_cb = [this, op_info] (boost::system::error_code ec, - version_t ver, - bufferlist bl) { - ceph_assert(ec == boost::system::errc::success); - ceph_assert(db->validate(op_info->bl1, - op_info->offset1, - op_info->length1)); - ceph_assert(db->validate(op_info->bl2, - op_info->offset2, - op_info->length2)); - finish_io(); - }; - librados::async_operate(asio, io, oid, - &op_info->rop, 0, nullptr, read2_cb); - num_io++; + auto applyWriteOp = [this]<OpType opType, int N>( + ReadWriteOp<opType, N> writeOp) { + auto op_info = + std::make_shared<AsyncOpInfo<N>>(writeOp.offset, writeOp.length); + for (int i = 0; i < N; i++) { + op_info->bufferlist[i] = + db->generate_data(writeOp.offset[i], writeOp.length[i]); + op_info->wop.write(writeOp.offset[i] * block_size, + op_info->bufferlist[i]); } - break; + auto write_cb = [this](boost::system::error_code ec, version_t ver) { + ceph_assert(ec == boost::system::errc::success); + finish_io(); + }; + librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr, write_cb); + num_io++; + }; - case OpType::READ3: - { - start_io(); - op_info = std::make_shared<AsyncOpInfo>(op.offset1, op.length1, - op.offset2, op.length2, - op.offset3, op.length3); - op_info->rop.read(op.offset1 * block_size, - op.length1 * block_size, - &op_info->bl1, nullptr); - op_info->rop.read(op.offset2 * block_size, - op.length2 * block_size, - &op_info->bl2, nullptr); - op_info->rop.read(op.offset3 * block_size, - op.length3 * block_size, - &op_info->bl3, nullptr); - auto read3_cb = [this, op_info] (boost::system::error_code ec, - version_t ver, - bufferlist bl) { - ceph_assert(ec == boost::system::errc::success); - ceph_assert(db->validate(op_info->bl1, - op_info->offset1, - op_info->length1)); - ceph_assert(db->validate(op_info->bl2, - op_info->offset2, - op_info->length2)); - ceph_assert(db->validate(op_info->bl3, - op_info->offset3, - op_info->length3)); - finish_io(); - }; - librados::async_operate(asio, io, oid, - &op_info->rop, 0, nullptr, read3_cb); - num_io++; + auto applyFailedWriteOp = [this]<OpType opType, int N>( + ReadWriteOp<opType, N> writeOp) { + auto op_info = + std::make_shared<AsyncOpInfo<N>>(writeOp.offset, writeOp.length); + for (int i = 0; i < N; i++) { + op_info->bufferlist[i] = + db->generate_data(writeOp.offset[i], writeOp.length[i]); + op_info->wop.write(writeOp.offset[i] * block_size, + op_info->bufferlist[i]); } - break; + auto write_cb = [this, writeOp](boost::system::error_code ec, + version_t ver) { + ceph_assert(ec != boost::system::errc::success); + finish_io(); + }; + librados::async_operate(asio, io, oid, &op_info->wop, 0, nullptr, write_cb); + num_io++; + }; - case OpType::WRITE: - { + switch (op.getOpType()) { + case OpType::Read: { start_io(); - op_info = std::make_shared<AsyncOpInfo>(op.offset1, op.length1); - op_info->bl1 = db->generate_data(op.offset1, op.length1); - - op_info->wop.write(op.offset1 * block_size, op_info->bl1); - auto write_cb = [this] (boost::system::error_code ec, - version_t ver) { - ceph_assert(ec == boost::system::errc::success); - finish_io(); - }; - librados::async_operate(asio, io, oid, - &op_info->wop, 0, nullptr, write_cb); - num_io++; + SingleReadOp& readOp = static_cast<SingleReadOp&>(op); + applyReadOp(readOp); + break; } - break; - - case OpType::WRITE2: - { + case OpType::Read2: { start_io(); - op_info = std::make_shared<AsyncOpInfo>(op.offset1, op.length1, - op.offset2, op.length2); - op_info->bl1 = db->generate_data(op.offset1, op.length1); - op_info->bl2 = db->generate_data(op.offset2, op.length2); - op_info->wop.write(op.offset1 * block_size, op_info->bl1); - op_info->wop.write(op.offset2 * block_size, op_info->bl2); - auto write2_cb = [this] (boost::system::error_code ec, - version_t ver) { - ceph_assert(ec == boost::system::errc::success); - finish_io(); - }; - librados::async_operate(asio, io, oid, - &op_info->wop, 0, nullptr, write2_cb); - num_io++; + DoubleReadOp& readOp = static_cast<DoubleReadOp&>(op); + applyReadOp(readOp); + break; + } + case OpType::Read3: { + start_io(); + TripleReadOp& readOp = static_cast<TripleReadOp&>(op); + applyReadOp(readOp); + break; + } + case OpType::Write: { + start_io(); + SingleWriteOp& writeOp = static_cast<SingleWriteOp&>(op); + applyWriteOp(writeOp); + break; + } + case OpType::Write2: { + start_io(); + DoubleWriteOp& writeOp = static_cast<DoubleWriteOp&>(op); + applyWriteOp(writeOp); + break; + } + case OpType::Write3: { + start_io(); + TripleWriteOp& writeOp = static_cast<TripleWriteOp&>(op); + applyWriteOp(writeOp); + break; } - break; - case OpType::WRITE3: - { + case OpType::FailedWrite: { start_io(); - op_info = std::make_shared<AsyncOpInfo>(op.offset1, op.length1, - op.offset2, op.length2, - op.offset3, op.length3); - op_info->bl1 = db->generate_data(op.offset1, op.length1); - op_info->bl2 = db->generate_data(op.offset2, op.length2); - op_info->bl3 = db->generate_data(op.offset3, op.length3); - op_info->wop.write(op.offset1 * block_size, op_info->bl1); - op_info->wop.write(op.offset2 * block_size, op_info->bl2); - op_info->wop.write(op.offset3 * block_size, op_info->bl3); - auto write3_cb = [this] (boost::system::error_code ec, - version_t ver) { - ceph_assert(ec == boost::system::errc::success); - finish_io(); - }; - librados::async_operate(asio, io, oid, - &op_info->wop, 0, nullptr, write3_cb); - num_io++; + SingleFailedWriteOp& writeOp = static_cast<SingleFailedWriteOp&>(op); + applyFailedWriteOp(writeOp); + break; + } + case OpType::FailedWrite2: { + start_io(); + DoubleFailedWriteOp& writeOp = static_cast<DoubleFailedWriteOp&>(op); + applyFailedWriteOp(writeOp); + break; + } + case OpType::FailedWrite3: { + start_io(); + TripleFailedWriteOp& writeOp = static_cast<TripleFailedWriteOp&>(op); + applyFailedWriteOp(writeOp); + break; } - break; - default: - break; + default: + ceph_abort_msg( + fmt::format("Unsupported Read/Write operation ({})", op.getOpType())); + break; } } + +void RadosIo::applyInjectOp(IoOp& op) { + bufferlist osdmap_inbl, inject_inbl, osdmap_outbl, inject_outbl; + auto formatter = std::make_unique<JSONFormatter>(false); + + int osd = -1; + std::vector<int> shard_order; + + ceph::messaging::osd::OSDMapRequest osdMapRequest{pool, get_oid(), ""}; + int rc = send_mon_command(osdMapRequest, rados, "OSDMapRequest", osdmap_inbl, + &osdmap_outbl, formatter.get()); + ceph_assert(rc == 0); + + JSONParser p; + bool success = p.parse(osdmap_outbl.c_str(), osdmap_outbl.length()); + ceph_assert(success); + + ceph::messaging::osd::OSDMapReply reply; + reply.decode_json(&p); + + osd = reply.acting_primary; + shard_order = reply.acting; + + switch (op.getOpType()) { + case OpType::InjectReadError: { + InjectReadErrorOp& errorOp = static_cast<InjectReadErrorOp&>(op); + + if (errorOp.type == 0) { + ceph::messaging::osd::InjectECErrorRequest<InjectOpType::ReadEIO> + injectErrorRequest{pool, oid, errorOp.shard, + errorOp.type, errorOp.when, errorOp.duration}; + int rc = send_osd_command(osd, injectErrorRequest, rados, + "InjectECErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else if (errorOp.type == 1) { + ceph::messaging::osd::InjectECErrorRequest< + InjectOpType::ReadMissingShard> + injectErrorRequest{pool, oid, errorOp.shard, + errorOp.type, errorOp.when, errorOp.duration}; + int rc = send_osd_command(osd, injectErrorRequest, rados, + "InjectECErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else { + ceph_abort_msg("Unsupported inject type"); + } + break; + } + case OpType::InjectWriteError: { + InjectWriteErrorOp& errorOp = static_cast<InjectWriteErrorOp&>(op); + + if (errorOp.type == 0) { + ceph::messaging::osd::InjectECErrorRequest< + InjectOpType::WriteFailAndRollback> + injectErrorRequest{pool, oid, errorOp.shard, + errorOp.type, errorOp.when, errorOp.duration}; + int rc = send_osd_command(osd, injectErrorRequest, rados, + "InjectECErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else if (errorOp.type == 3) { + ceph::messaging::osd::InjectECErrorRequest<InjectOpType::WriteOSDAbort> + injectErrorRequest{pool, oid, errorOp.shard, + errorOp.type, errorOp.when, errorOp.duration}; + int rc = send_osd_command(osd, injectErrorRequest, rados, + "InjectECErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + + // This inject is sent directly to the shard we want to inject the error + // on + osd = shard_order[errorOp.shard]; + } else { + ceph_abort("Unsupported inject type"); + } + + break; + } + case OpType::ClearReadErrorInject: { + ClearReadErrorInjectOp& errorOp = + static_cast<ClearReadErrorInjectOp&>(op); + + if (errorOp.type == 0) { + ceph::messaging::osd::InjectECClearErrorRequest<InjectOpType::ReadEIO> + clearErrorInject{pool, oid, errorOp.shard, errorOp.type}; + int rc = send_osd_command(osd, clearErrorInject, rados, + "InjectECClearErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else if (errorOp.type == 1) { + ceph::messaging::osd::InjectECClearErrorRequest< + InjectOpType::ReadMissingShard> + clearErrorInject{pool, oid, errorOp.shard, errorOp.type}; + int rc = send_osd_command(osd, clearErrorInject, rados, + "InjectECClearErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else { + ceph_abort("Unsupported inject type"); + } + + break; + } + case OpType::ClearWriteErrorInject: { + ClearReadErrorInjectOp& errorOp = + static_cast<ClearReadErrorInjectOp&>(op); + + if (errorOp.type == 0) { + ceph::messaging::osd::InjectECClearErrorRequest< + InjectOpType::WriteFailAndRollback> + clearErrorInject{pool, oid, errorOp.shard, errorOp.type}; + int rc = send_osd_command(osd, clearErrorInject, rados, + "InjectECClearErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else if (errorOp.type == 3) { + ceph::messaging::osd::InjectECClearErrorRequest< + InjectOpType::WriteOSDAbort> + clearErrorInject{pool, oid, errorOp.shard, errorOp.type}; + int rc = send_osd_command(osd, clearErrorInject, rados, + "InjectECClearErrorRequest", inject_inbl, + &inject_outbl, formatter.get()); + ceph_assert(rc == 0); + } else { + ceph_abort("Unsupported inject type"); + } + + break; + } + default: + ceph_abort_msg( + fmt::format("Unsupported inject operation ({})", op.getOpType())); + break; + } +}
\ No newline at end of file |