summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYuval Lifshitz <ylifshit@ibm.com>2024-08-15 10:00:20 +0200
committerGitHub <noreply@github.com>2024-08-15 10:00:20 +0200
commitcb8d669457b6f7f562c0189200a9be68b2ee3726 (patch)
tree67f171f7aaf776ab3b9176cdffeff4c91430fdeb
parentMerge pull request #59219 from yuvalif/wip-yuval-50610 (diff)
parenttest/cls_2pc_queue: prevent list+remove race between consumers (diff)
downloadceph-cb8d669457b6f7f562c0189200a9be68b2ee3726.tar.xz
ceph-cb8d669457b6f7f562c0189200a9be68b2ee3726.zip
Merge pull request #58911 from yuvalif/wip-yuval-67229
test/cls_2pc_queue: prevent list+remove race between consumers Reviewed-By: Casey Bodley <cbodley@ibm.com>
-rw-r--r--qa/suites/rgw/verify/tasks/cls.yaml5
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue.cc4
-rw-r--r--src/test/cls_2pc_queue/test_cls_2pc_queue.cc46
3 files changed, 40 insertions, 15 deletions
diff --git a/qa/suites/rgw/verify/tasks/cls.yaml b/qa/suites/rgw/verify/tasks/cls.yaml
index 8034715353f..26f948d42ec 100644
--- a/qa/suites/rgw/verify/tasks/cls.yaml
+++ b/qa/suites/rgw/verify/tasks/cls.yaml
@@ -1,3 +1,8 @@
+overrides:
+ ceph:
+ conf:
+ osd:
+ debug objclass: 20
tasks:
- workunit:
clients:
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc
index 759d360b014..6e6b6e02db5 100644
--- a/src/cls/2pc_queue/cls_2pc_queue.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue.cc
@@ -616,10 +616,10 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
list_op.end_marker = rem_2pc_op.end_marker;
ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove);
if (ret < 0) {
- CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret);
+ CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: returned: %d", ret);
return ret;
}
- CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove);
+ CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: counted: %u", rem_2pc_op.entries_to_remove);
}
cls_queue_remove_op rem_op;
diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
index f0c71c7492e..9d988a49808 100644
--- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
+++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
@@ -964,34 +964,54 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
}
const auto max_elements = 128;
- std::vector<std::thread> consumers(max_workers/2);
- for (auto& c : consumers) {
- c = std::thread([this, &queue_name, &producer_count] {
+ std::vector<std::thread> readers(max_workers/2);
+ for (auto& c : readers) {
+ c = std::thread([this, &queue_name, &producer_count, &retry_happened] {
librados::ObjectWriteOperation op;
const std::string marker;
bool truncated = true;
std::string end_marker;
std::vector<cls_queue_entry> entries;
while (producer_count > 0 || truncated) {
+ if (!retry_happened) {
+ // queue was never full, let it fill
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ continue;
+ }
const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
if (entries.empty()) {
- // queue is empty, let it fill
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- } else {
- cls_2pc_queue_remove_entries(op, end_marker, max_elements);
- ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ // another consumer has emptied the queue
+ return;
}
}
});
}
+
+ auto deleter = std::thread([this, &queue_name, &producer_count, &retry_happened] {
+ librados::ObjectWriteOperation op;
+ const std::string marker;
+ bool truncated = true;
+ std::string end_marker;
+ std::vector<cls_queue_entry> entries;
+ while (producer_count > 0 || truncated) {
+ if (!retry_happened) {
+ // queue was never full, let it fill
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ continue;
+ }
+ const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
+ ASSERT_EQ(0, ret);
+ ASSERT_FALSE(entries.empty());
+ cls_2pc_queue_remove_entries(op, end_marker, max_elements);
+ ASSERT_EQ(0, ioctx.operate(queue_name, &op));
+ }
+ });
std::for_each(producers.begin(), producers.end(), [](auto& p) { p.join(); });
- std::for_each(consumers.begin(), consumers.end(), [](auto& c) { c.join(); });
- if (!retry_happened) {
- std::cerr << "Queue was never full - all reservations were successful." <<
- "Please decrease the amount of consumer threads" << std::endl;
- }
+ std::for_each(readers.begin(), readers.end(), [](auto& c) { c.join(); });
+ deleter.join();
+ ASSERT_TRUE(retry_happened);
// make sure that queue is empty and no reservations remain
cls_2pc_reservations reservations;
ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));