diff options
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue.cc | 24 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_client.cc | 5 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_client.h | 2 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_ops.h | 22 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_notify.cc | 6 | ||||
-rw-r--r-- | src/test/cls_2pc_queue/test_cls_2pc_queue.cc | 6 | ||||
-rw-r--r-- | src/test/cls_queue/test_cls_queue.cc | 18 | ||||
-rw-r--r-- | src/test/rgw/bucket_notification/test_bn.py | 25 |
8 files changed, 85 insertions, 23 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 45710f9abe3..019f2c96dea 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -75,7 +75,7 @@ static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist * auto in_iter = head.bl_urgent_data.cbegin(); decode(urgent_data, in_iter); } catch (ceph::buffer::error& err) { - CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what()); + CLS_LOG(1, "ERROR: cls_2pc_queue_get_topic_stats: failed to decode header of queue: %s", err.what()); return -EINVAL; } op_ret.queue_entries = urgent_data.committed_entries; @@ -581,9 +581,9 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); - cls_queue_remove_op op; + cls_2pc_queue_remove_op rem_2pc_op; try { - decode(op, in_iter); + decode(rem_2pc_op, in_iter); } catch (ceph::buffer::error& err) { CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what()); return -EINVAL; @@ -594,10 +594,26 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i if (ret < 0) { return ret; } - ret = queue_remove_entries(hctx, op, head); + cls_queue_remove_op rem_op; + rem_op.end_marker = std::move(rem_2pc_op.end_marker); + ret = queue_remove_entries(hctx, rem_op, head); if (ret < 0) { return ret; } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode header of queue: %s", err.what()); + return -EINVAL; + } + urgent_data.committed_entries -= rem_2pc_op.entries_to_remove; + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); } diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc index 42632cba61a..5b457b291ca 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.cc +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -226,10 +226,11 @@ void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, i op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval); } -void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) { +void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove) { bufferlist in; - cls_queue_remove_op rem_op; + cls_2pc_queue_remove_op rem_op; rem_op.end_marker = end_marker; + rem_op.entries_to_remove = entries_to_remove; encode(rem_op, in); op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in); } diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h index 20043edd200..c806d30f59e 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.h +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -87,5 +87,5 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time); // remove all entries up to the given marker -void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); +void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove); diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h index d0b84193d5c..bb61ef341ac 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_ops.h +++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h @@ -115,3 +115,25 @@ struct cls_2pc_queue_reservations_ret { } }; WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret) + +struct cls_2pc_queue_remove_op { + std::string end_marker; + uint32_t entries_to_remove; + + cls_2pc_queue_remove_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(end_marker, bl); + encode(entries_to_remove, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(end_marker, bl); + decode(entries_to_remove, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_remove_op)
\ No newline at end of file diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 324fb4460bb..184d9b38f6d 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -305,6 +305,7 @@ private: is_idle = false; auto has_error = false; auto remove_entries = false; + uint64_t entries_to_remove = 0; auto entry_idx = 1U; tokens_waiter waiter(io_context); for (auto& entry : entries) { @@ -313,12 +314,13 @@ private: break; } // TODO pass entry pointer instead of by-value - spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) { + spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &entries_to_remove, &has_error, &waiter, entry](yield_context yield) { const auto token = waiter.make_token(); if (process_entry(entry, yield)) { ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl; remove_entries = true; + ++entries_to_remove; } else { if (set_min_marker(end_marker, entry.marker) < 0) { ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl; @@ -344,7 +346,7 @@ private: ClsLockType::EXCLUSIVE, lock_cookie, "" /*no tag*/); - cls_2pc_queue_remove_entries(op, end_marker); + cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove); // check ownership and deleted entries in one batch const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); if (ret == -ENOENT) { diff --git a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc index a0e83aacb63..d3570bf9c55 100644 --- a/src/test/cls_2pc_queue/test_cls_2pc_queue.cc +++ b/src/test/cls_2pc_queue/test_cls_2pc_queue.cc @@ -716,7 +716,7 @@ TEST_F(TestCls2PCQueue, MultiProducer) const auto ret = cls_2pc_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); consume_count += entries.size(); - cls_2pc_queue_remove_entries(op, end_marker); + cls_2pc_queue_remove_entries(op, end_marker, max_elements); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } }); @@ -771,7 +771,7 @@ TEST_F(TestCls2PCQueue, AsyncConsumer) ASSERT_EQ(rc, 0); ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0); consume_count += entries.size(); - cls_2pc_queue_remove_entries(wop, end_marker); + cls_2pc_queue_remove_entries(wop, end_marker, max_elements); marker = end_marker; } @@ -849,7 +849,7 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer) // queue is empty, let it fill std::this_thread::sleep_for(std::chrono::milliseconds(100)); } else { - cls_2pc_queue_remove_entries(op, end_marker); + cls_2pc_queue_remove_entries(op, end_marker, max_elements); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } } diff --git a/src/test/cls_queue/test_cls_queue.cc b/src/test/cls_queue/test_cls_queue.cc index 5dbbccb82fd..cca615afb0a 100644 --- a/src/test/cls_queue/test_cls_queue.cc +++ b/src/test/cls_queue/test_cls_queue.cc @@ -191,7 +191,7 @@ TEST_F(TestClsQueue, DequeueMarker) ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0); if (marker.offset > 0 && marker.offset % 2 == 0) { after_deleted_marker = marker; - cls_queue_remove_entries(op, marker.to_str()); + cls_queue_remove_entries(op, marker.to_str()); } } ASSERT_EQ(0, ioctx.operate(queue_name, &op)); @@ -243,7 +243,7 @@ TEST_F(TestClsQueue, DequeueEmpty) std::vector<cls_queue_entry> entries; const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } @@ -289,7 +289,7 @@ TEST_F(TestClsQueue, DeleteAll) std::vector<cls_queue_entry> entries; auto ret = cls_queue_list_entries(ioctx, queue_name, marker, total_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); // list again to make sure that queue is empty ret = cls_queue_list_entries(ioctx, queue_name, marker, 10, entries, &truncated, end_marker); @@ -328,7 +328,7 @@ TEST_F(TestClsQueue, EnqueueDequeue) const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); consume_count += entries.size(); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } }); @@ -387,7 +387,7 @@ TEST_F(TestClsQueue, QueueFullDequeue) auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); consume_count += entries.size(); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } }); @@ -431,7 +431,7 @@ TEST_F(TestClsQueue, MultiProducer) const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); consume_count += entries.size(); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } }); @@ -478,7 +478,7 @@ TEST_F(TestClsQueue, MultiConsumer) const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); consume_count += entries.size(); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } }); @@ -521,7 +521,7 @@ TEST_F(TestClsQueue, NoLockMultiConsumer) while (!done || truncated) { const auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker); ASSERT_EQ(0, ret); - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); } }); @@ -587,7 +587,7 @@ TEST_F(TestClsQueue, WrapAround) total_bl.pop_front(); } marker = end_marker; - cls_queue_remove_entries(op, end_marker); + cls_queue_remove_entries(op, end_marker); ASSERT_EQ(0, ioctx.operate(queue_name, &op)); // fill half+1 of the queue diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 3fd249f61f0..a51064dbb52 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -3031,7 +3031,10 @@ def test_ps_s3_persistent_topic_stats(): # create random port for the http server host = get_ip() - port = random.randint(10000, 20000) + http_port = random.randint(10000, 20000) + + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5) # create bucket bucket_name = gen_bucket_name() @@ -3039,7 +3042,7 @@ def test_ps_s3_persistent_topic_stats(): topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic - endpoint_address = 'http://'+host+':'+str(port) + endpoint_address = 'http://'+host+':'+str(http_port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() @@ -3053,6 +3056,10 @@ def test_ps_s3_persistent_topic_stats(): response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) + delay = 20 + time.sleep(delay) + http_server.close() + # topic stats result = admin(['topic', 'stats', '--topic', topic_name]) parsed_result = json.loads(result[0]) @@ -3101,11 +3108,25 @@ def test_ps_s3_persistent_topic_stats(): assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects) assert_equal(result[1], 0) + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, http_port, num_workers=10, delay=0.5) + + print('wait for '+str(delay)+'sec for the messages...') + time.sleep(delay) + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket conn.delete_bucket(bucket_name) + time.sleep(delay) + http_server.close() @attr('manual_test') def test_ps_s3_persistent_notification_pushback(): |