diff options
author | Ali Masarwa <ali.saed.masarwa@gmail.com> | 2023-07-13 13:17:48 +0200 |
---|---|---|
committer | Ali Masarwa <ali.saed.masarwa@gmail.com> | 2023-07-17 17:13:50 +0200 |
commit | 10addc6485348d03dbd8ea022ce4221ff2f44cd0 (patch) | |
tree | 10e5d1970d9c501442aba1fcb86171e61d723814 /src/cls | |
parent | Merge pull request #52087 from AliMasarweh/wip-alimasa-persistant-q-observabi... (diff) | |
download | ceph-10addc6485348d03dbd8ea022ce4221ff2f44cd0.tar.xz ceph-10addc6485348d03dbd8ea022ce4221ff2f44cd0.zip |
RGW: fix issue in observability over the persistent topics queue
when releasing entires, we don't decrease them in the urgent data
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
Diffstat (limited to 'src/cls')
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue.cc | 24 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_client.cc | 5 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_client.h | 2 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_ops.h | 22 |
4 files changed, 46 insertions, 7 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index 45710f9abe3..019f2c96dea 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -75,7 +75,7 @@ static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist * auto in_iter = head.bl_urgent_data.cbegin(); decode(urgent_data, in_iter); } catch (ceph::buffer::error& err) { - CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what()); + CLS_LOG(1, "ERROR: cls_2pc_queue_get_topic_stats: failed to decode header of queue: %s", err.what()); return -EINVAL; } op_ret.queue_entries = urgent_data.committed_entries; @@ -581,9 +581,9 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { auto in_iter = in->cbegin(); - cls_queue_remove_op op; + cls_2pc_queue_remove_op rem_2pc_op; try { - decode(op, in_iter); + decode(rem_2pc_op, in_iter); } catch (ceph::buffer::error& err) { CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what()); return -EINVAL; @@ -594,10 +594,26 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i if (ret < 0) { return ret; } - ret = queue_remove_entries(hctx, op, head); + cls_queue_remove_op rem_op; + rem_op.end_marker = std::move(rem_2pc_op.end_marker); + ret = queue_remove_entries(hctx, rem_op, head); if (ret < 0) { return ret; } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode header of queue: %s", err.what()); + return -EINVAL; + } + urgent_data.committed_entries -= rem_2pc_op.entries_to_remove; + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); } diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc index 42632cba61a..5b457b291ca 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.cc +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -226,10 +226,11 @@ void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, i op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval); } -void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) { +void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove) { bufferlist in; - cls_queue_remove_op rem_op; + cls_2pc_queue_remove_op rem_op; rem_op.end_marker = end_marker; + rem_op.entries_to_remove = entries_to_remove; encode(rem_op, in); op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in); } diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h index 20043edd200..c806d30f59e 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.h +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -87,5 +87,5 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time); // remove all entries up to the given marker -void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); +void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove); diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h index d0b84193d5c..bb61ef341ac 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_ops.h +++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h @@ -115,3 +115,25 @@ struct cls_2pc_queue_reservations_ret { } }; WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret) + +struct cls_2pc_queue_remove_op { + std::string end_marker; + uint32_t entries_to_remove; + + cls_2pc_queue_remove_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(end_marker, bl); + encode(entries_to_remove, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(end_marker, bl); + decode(entries_to_remove, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_remove_op)
\ No newline at end of file |