summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue.cc24
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.cc5
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.h2
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_ops.h22
-rw-r--r--src/rgw/driver/rados/rgw_notify.cc6
-rw-r--r--src/test/cls_2pc_queue/test_cls_2pc_queue.cc6
-rw-r--r--src/test/cls_queue/test_cls_queue.cc18
-rw-r--r--src/test/rgw/bucket_notification/test_bn.py25
8 files changed, 85 insertions, 23 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc
index 45710f9abe3..019f2c96dea 100644
--- a/src/cls/2pc_queue/cls_2pc_queue.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue.cc
@@ -75,7 +75,7 @@ static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *
auto in_iter = head.bl_urgent_data.cbegin();
decode(urgent_data, in_iter);
} catch (ceph::buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what());
+ CLS_LOG(1, "ERROR: cls_2pc_queue_get_topic_stats: failed to decode header of queue: %s", err.what());
return -EINVAL;
}
op_ret.queue_entries = urgent_data.committed_entries;
@@ -581,9 +581,9 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in,
static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto in_iter = in->cbegin();
- cls_queue_remove_op op;
+ cls_2pc_queue_remove_op rem_2pc_op;
try {
- decode(op, in_iter);
+ decode(rem_2pc_op, in_iter);
} catch (ceph::buffer::error& err) {
CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what());
return -EINVAL;
@@ -594,10 +594,26 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
if (ret < 0) {
return ret;
}
- ret = queue_remove_entries(hctx, op, head);
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = std::move(rem_2pc_op.end_marker);
+ ret = queue_remove_entries(hctx, rem_op, head);
if (ret < 0) {
return ret;
}
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode header of queue: %s", err.what());
+ return -EINVAL;
+ }
+ urgent_data.committed_entries -= rem_2pc_op.entries_to_remove;
+ // write back head
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+
return queue_write_head(hctx, head);
}
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc
index 42632cba61a..5b457b291ca 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_client.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc
@@ -226,10 +226,11 @@ void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, i
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
}
-void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
+void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove) {
bufferlist in;
- cls_queue_remove_op rem_op;
+ cls_2pc_queue_remove_op rem_op;
rem_op.end_marker = end_marker;
+ rem_op.entries_to_remove = entries_to_remove;
encode(rem_op, in);
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
}
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h
index 20043edd200..c806d30f59e 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_client.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.h
@@ -87,5 +87,5 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op,
ceph::coarse_real_time stale_time);
// remove all entries up to the given marker
-void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove);
diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h
index d0b84193d5c..bb61ef341ac 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_ops.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h
@@ -115,3 +115,25 @@ struct cls_2pc_queue_reservations_ret {
}
};
WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
+
+struct cls_2pc_queue_remove_op {
+ std::string end_marker;
+ uint32_t entries_to_remove;
+
+ cls_2pc_queue_remove_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(end_marker, bl);
+ encode(entries_to_remove, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(end_marker, bl);
+ decode(entries_to_remove, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_remove_op) \ No newline at end of file
diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
index 324fb4460bb..184d9b38f6d 100644
--- a/src/rgw/driver/rados/rgw_notify.cc
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -305,6 +305,7 @@ private:
is_idle = false;
auto has_error = false;
auto remove_entries = false;
+ uint64_t entries_to_remove = 0;
auto entry_idx = 1U;
tokens_waiter waiter(io_context);
for (auto& entry : entries) {
@@ -313,12 +314,13 @@ private:
break;
}
// TODO pass entry pointer instead of by-value
- spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
+ spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &entries_to_remove, &has_error, &waiter, entry](yield_context yield) {
const auto token = waiter.make_token();
if (process_entry(entry, yield)) {
ldpp_dout(this, 20) << "INFO: processing of entry: " <<
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
remove_entries = true;
+ ++entries_to_remove;
} else {
if (set_min_marker(end_marker, entry.marker) < 0) {
ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
@@ -344,7 +346,7 @@ private:
ClsLockType::EXCLUSIVE,
lock_cookie,
"" /*no tag*/);
- cls_2pc_queue_remove_entries(op, end_marker);
+ cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
// check ownership and deleted entries in one batch
const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
index a0e83aacb63..d3570bf9c55 100644
--- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
+++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc
@@ -716,7 +716,7 @@ TEST_F(TestCls2PCQueue, MultiProducer)
const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
consume_count += entries.size();
- cls_2pc_queue_remove_entries(op, end_marker);
+ cls_2pc_queue_remove_entries(op, end_marker, max_elements);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
});
@@ -771,7 +771,7 @@ TEST_F(TestCls2PCQueue, AsyncConsumer)
ASSERT_EQ(rc, 0);
ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
consume_count += entries.size();
- cls_2pc_queue_remove_entries(wop, end_marker);
+ cls_2pc_queue_remove_entries(wop, end_marker, max_elements);
marker = end_marker;
}
@@ -849,7 +849,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
// queue is empty, let it fill
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
- cls_2pc_queue_remove_entries(op, end_marker);
+ cls_2pc_queue_remove_entries(op, end_marker, max_elements);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
}
diff --git a/src/test/cls_queue/test_cls_queue.cc b/src/test/cls_queue/test_cls_queue.cc
index 5dbbccb82fd..cca615afb0a 100644
--- a/src/test/cls_queue/test_cls_queue.cc
+++ b/src/test/cls_queue/test_cls_queue.cc
@@ -191,7 +191,7 @@ TEST_F(TestClsQueue, DequeueMarker)
ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
if (marker.offset > 0 && marker.offset % 2 == 0) {
after_deleted_marker = marker;
- cls_queue_remove_entries(op, marker.to_str());
+ cls_queue_remove_entries(op, marker.to_str());
}
}
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
@@ -243,7 +243,7 @@ TEST_F(TestClsQueue, DequeueEmpty)
std::vector<cls_queue_entry> entries;
const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
@@ -289,7 +289,7 @@ TEST_F(TestClsQueue, DeleteAll)
std::vector<cls_queue_entry> entries;
auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
// list again to make sure that queue is empty
ret = cls_queue_list_entries(ioctx, queue_name, marker, 10, entries, &truncated, end_marker);
@@ -328,7 +328,7 @@ TEST_F(TestClsQueue, EnqueueDequeue)
const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
consume_count += entries.size();
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
});
@@ -387,7 +387,7 @@ TEST_F(TestClsQueue, QueueFullDequeue)
auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
consume_count += entries.size();
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
});
@@ -431,7 +431,7 @@ TEST_F(TestClsQueue, MultiProducer)
const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
consume_count += entries.size();
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
});
@@ -478,7 +478,7 @@ TEST_F(TestClsQueue, MultiConsumer)
const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
consume_count += entries.size();
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
});
@@ -521,7 +521,7 @@ TEST_F(TestClsQueue, NoLockMultiConsumer)
while (!done || truncated) {
const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
});
@@ -587,7 +587,7 @@ TEST_F(TestClsQueue, WrapAround)
total_bl.pop_front();
}
marker = end_marker;
- cls_queue_remove_entries(op, end_marker);
+ cls_queue_remove_entries(op, end_marker);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
// fill half+1 of the queue
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 3fd249f61f0..a51064dbb52 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -3031,7 +3031,10 @@ def test_ps_s3_persistent_topic_stats():
# create random port for the http server
host = get_ip()
- port = random.randint(10000, 20000)
+ http_port = random.randint(10000, 20000)
+
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5)
# create bucket
bucket_name = gen_bucket_name()
@@ -3039,7 +3042,7 @@ def test_ps_s3_persistent_topic_stats():
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_address = 'http://'+host+':'+str(http_port)
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
@@ -3053,6 +3056,10 @@ def test_ps_s3_persistent_topic_stats():
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
+ delay = 20
+ time.sleep(delay)
+ http_server.close()
+
# topic stats
result = admin(['topic', 'stats', '--topic', topic_name])
parsed_result = json.loads(result[0])
@@ -3101,11 +3108,25 @@ def test_ps_s3_persistent_topic_stats():
assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
assert_equal(result[1], 0)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5)
+
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
# cleanup
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
+ time.sleep(delay)
+ http_server.close()
@attr('manual_test')
def test_ps_s3_persistent_notification_pushback():