diff options
author | Yuval Lifshitz <ylifshit@ibm.com> | 2024-08-15 10:00:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-15 10:00:20 +0200 |
commit | cb8d669457b6f7f562c0189200a9be68b2ee3726 (patch) | |
tree | 67f171f7aaf776ab3b9176cdffeff4c91430fdeb | |
parent | Merge pull request #59219 from yuvalif/wip-yuval-50610 (diff) | |
parent | test/cls_2pc_queue: prevent list+remove race between consumers (diff) | |
download | ceph-cb8d669457b6f7f562c0189200a9be68b2ee3726.tar.xz ceph-cb8d669457b6f7f562c0189200a9be68b2ee3726.zip |
Merge pull request #58911 from yuvalif/wip-yuval-67229
test/cls_2pc_queue: prevent list+remove race between consumers
Reviewed-By: Casey Bodley <cbodley@ibm.com>
-rw-r--r-- | qa/suites/rgw/verify/tasks/cls.yaml | 5 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue.cc | 4 | ||||
-rw-r--r-- | src/test/cls_2pc_queue/test_cls_2pc_queue.cc | 46 |
3 files changed, 40 insertions, 15 deletions
diff --git a/qa/suites/rgw/verify/tasks/cls.yaml b/qa/suites/rgw/verify/tasks/cls.yaml index 8034715353f..26f948d42ec 100644 --- a/qa/suites/rgw/verify/tasks/cls.yaml +++ b/qa/suites/rgw/verify/tasks/cls.yaml @@ -1,3 +1,8 @@ +overrides: + ceph: + conf: + osd: + debug objclass: 20 tasks: - workunit: clients: diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 759d360b014..6e6b6e02db5 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -616,10 +616,10 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i list_op.end_marker = rem_2pc_op.end_marker; ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove); if (ret < 0) { - CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret); + CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: returned: %d", ret); return ret; } - CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove); + CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: counted: %u", rem_2pc_op.entries_to_remove); } cls_queue_remove_op rem_op; diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc index f0c71c7492e..9d988a49808 100644 --- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -964,34 +964,54 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) } const auto max_elements = 128; - std::vector<std::thread> consumers(max_workers/2); - for (auto& c : consumers) { - c = std::thread([this, &queue_name, &producer_count] { + std::vector<std::thread> readers(max_workers/2); + for (auto& c : readers) { + c = std::thread([this, &queue_name, &producer_count, &retry_happened] { librados::ObjectWriteOperation op; const std::string marker; bool truncated = true; std::string end_marker; std::vector<cls_queue_entry> entries; while (producer_count > 0 || truncated) { + if (!retry_happened) { + // queue was never full, let it fill + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); if (entries.empty()) { - // queue is empty, let it fill - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } else { - cls_2pc_queue_remove_entries(op, end_marker, max_elements); - ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + // another consumer has emptied the queue + return; } } }); } + + auto deleter = std::thread([this, &queue_name, &producer_count, &retry_happened] { + librados::ObjectWriteOperation op; + const std::string marker; + bool truncated = true; + std::string end_marker; + std::vector<cls_queue_entry> entries; + while (producer_count > 0 || truncated) { + if (!retry_happened) { + // queue was never full, let it fill + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); + ASSERT_EQ(0, ret); + ASSERT_FALSE(entries.empty()); + cls_2pc_queue_remove_entries(op, end_marker, max_elements); + ASSERT_EQ(0, ioctx.operate(queue_name, &op)); + } + }); std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); }); - std::for_each(consumers.begin(), consumers.end(), [](auto& c) { c.join(); }); - if (!retry_happened) { - std::cerr << "Queue was never full - all reservations were successful." << - "Please decrease the amount of consumer threads" << std::endl; - } + std::for_each(readers.begin(), readers.end(), [](auto& c) { c.join(); }); + deleter.join(); + ASSERT_TRUE(retry_happened); // make sure that queue is empty and no reservations remain cls_2pc_reservations reservations; ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations)); |