summaryrefslogtreecommitdiffstats
path: root/src/cls
diff options
context:
space:
mode:
authorAli Masarwa <ali.saed.masarwa@gmail.com>2023-07-13 13:17:48 +0200
committerAli Masarwa <ali.saed.masarwa@gmail.com>2023-07-17 17:13:50 +0200
commit10addc6485348d03dbd8ea022ce4221ff2f44cd0 (patch)
tree10e5d1970d9c501442aba1fcb86171e61d723814 /src/cls
parentMerge pull request #52087 from AliMasarweh/wip-alimasa-persistant-q-observabi... (diff)
downloadceph-10addc6485348d03dbd8ea022ce4221ff2f44cd0.tar.xz
ceph-10addc6485348d03dbd8ea022ce4221ff2f44cd0.zip
RGW: fix issue in observability over the persistent topics queue
when releasing entires, we don't decrease them in the urgent data Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
Diffstat (limited to 'src/cls')
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue.cc24
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.cc5
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.h2
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_ops.h22
4 files changed, 46 insertions, 7 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc
index 45710f9abe3..019f2c96dea 100644
--- a/src/cls/2pc_queue/cls_2pc_queue.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue.cc
@@ -75,7 +75,7 @@ static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *
auto in_iter = head.bl_urgent_data.cbegin();
decode(urgent_data, in_iter);
} catch (ceph::buffer::error& err) {
- CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what());
+ CLS_LOG(1, "ERROR: cls_2pc_queue_get_topic_stats: failed to decode header of queue: %s", err.what());
return -EINVAL;
}
op_ret.queue_entries = urgent_data.committed_entries;
@@ -581,9 +581,9 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in,
static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
auto in_iter = in->cbegin();
- cls_queue_remove_op op;
+ cls_2pc_queue_remove_op rem_2pc_op;
try {
- decode(op, in_iter);
+ decode(rem_2pc_op, in_iter);
} catch (ceph::buffer::error& err) {
CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what());
return -EINVAL;
@@ -594,10 +594,26 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
if (ret < 0) {
return ret;
}
- ret = queue_remove_entries(hctx, op, head);
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = std::move(rem_2pc_op.end_marker);
+ ret = queue_remove_entries(hctx, rem_op, head);
if (ret < 0) {
return ret;
}
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode header of queue: %s", err.what());
+ return -EINVAL;
+ }
+ urgent_data.committed_entries -= rem_2pc_op.entries_to_remove;
+ // write back head
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+
return queue_write_head(hctx, head);
}
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc
index 42632cba61a..5b457b291ca 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_client.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc
@@ -226,10 +226,11 @@ void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, i
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
}
-void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
+void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove) {
bufferlist in;
- cls_queue_remove_op rem_op;
+ cls_2pc_queue_remove_op rem_op;
rem_op.end_marker = end_marker;
+ rem_op.entries_to_remove = entries_to_remove;
encode(rem_op, in);
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
}
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h
index 20043edd200..c806d30f59e 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_client.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.h
@@ -87,5 +87,5 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op,
ceph::coarse_real_time stale_time);
// remove all entries up to the given marker
-void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove);
diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h
index d0b84193d5c..bb61ef341ac 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_ops.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h
@@ -115,3 +115,25 @@ struct cls_2pc_queue_reservations_ret {
}
};
WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
+
+struct cls_2pc_queue_remove_op {
+ std::string end_marker;
+ uint32_t entries_to_remove;
+
+ cls_2pc_queue_remove_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(end_marker, bl);
+ encode(entries_to_remove, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(end_marker, bl);
+ decode(entries_to_remove, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_remove_op) \ No newline at end of file