summaryrefslogtreecommitdiffstats
path: root/src/cls
diff options
context:
space:
mode:
Diffstat (limited to 'src/cls')
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue.cc37
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.cc36
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.h7
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_const.h1
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_types.h24
-rw-r--r--src/cls/queue/cls_queue_ops.h22
6 files changed, 117 insertions, 10 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc
index fba76395542..45710f9abe3 100644
--- a/src/cls/2pc_queue/cls_2pc_queue.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue.cc
@@ -55,6 +55,36 @@ static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in,
return 0;
}
+static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_queue_get_stats_ret op_ret;
+
+ // get head
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+ const auto remaining_size = (head.tail.offset >= head.front.offset) ?
+ (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
+ head.front.offset - head.tail.offset;
+ op_ret.queue_size = head.queue_size - head.max_head_size - remaining_size;
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what());
+ return -EINVAL;
+ }
+ op_ret.queue_entries = urgent_data.committed_entries;
+
+ encode(op_ret, *out);
+
+ return 0;
+}
+
static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
cls_2pc_queue_reserve_op res_op;
try {
@@ -112,7 +142,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
cls_2pc_reservations::iterator last_reservation;
std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
std::forward_as_tuple(urgent_data.last_id),
- std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+ std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
if (!result) {
// an old reservation that was never committed or aborted is in the map
// caller should try again assuming other IDs are ok
@@ -148,7 +178,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
}
std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
std::forward_as_tuple(urgent_data.last_id),
- std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+ std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
if (!result) {
// an old reservation that was never committed or aborted is in the map
// caller should try again assuming other IDs are ok
@@ -268,6 +298,7 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe
}
urgent_data.reserved_size -= res.size;
+ urgent_data.committed_entries += res.entries;
if (xattr_reservations.empty()) {
// remove the reservation from urgent data
@@ -577,6 +608,7 @@ CLS_INIT(2pc_queue)
cls_handle_t h_class;
cls_method_handle_t h_2pc_queue_init;
cls_method_handle_t h_2pc_queue_get_capacity;
+ cls_method_handle_t h_2pc_queue_get_topic_stats;
cls_method_handle_t h_2pc_queue_reserve;
cls_method_handle_t h_2pc_queue_commit;
cls_method_handle_t h_2pc_queue_abort;
@@ -589,6 +621,7 @@ CLS_INIT(2pc_queue)
cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
+ cls_register_cxx_method(h_class, TPC_QUEUE_GET_TOPIC_STATS, CLS_METHOD_RD, cls_2pc_queue_get_topic_stats, &h_2pc_queue_get_topic_stats);
cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc
index 6868b2b6f83..42632cba61a 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_client.cc
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc
@@ -31,6 +31,21 @@ int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
return 0;
}
+int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size) {
+ cls_queue_get_stats_ret op_ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ committed_entries = op_ret.queue_entries;
+ size = op_ret.queue_size;
+
+ return 0;
+}
+
#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
bufferlist in, out;
@@ -44,12 +59,31 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uin
#endif
// optionally async method for getting capacity (bytes)
-// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
+// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
bufferlist in;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
}
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_get_topic_stats(IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size) {
+ bufferlist in, out;
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, out);
+ if (r < 0 ) {
+ return r;
+ }
+
+ return cls_2pc_queue_get_topic_stats_result(out, committed_entries, size);
+}
+#endif
+
+// optionally async method for getting number of commited entries and size (bytes)
+// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
+void cls_2pc_queue_get_topic_stats(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+ bufferlist in;
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, obl, prval);
+}
+
int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
cls_2pc_queue_reserve_ret op_ret;
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h
index e0bdeafd590..20043edd200 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_client.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.h
@@ -19,6 +19,8 @@ void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& q
#ifndef CLS_CLIENT_HIDE_IOCTX
// return capacity (bytes)
int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size);
+// return the number of committed entries and size (bytes)
+int cls_2pc_queue_get_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size);
// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// return a reservation id if reservations is possible, 0 otherwise
@@ -37,7 +39,12 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string&
// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+// optionally async method for getting capacity (bytes)
+// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
+void cls_2pc_queue_get_topic_stats(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+
int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);
+int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size);
// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// notes:
diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h
index 160c5b66e9f..ea7afa943ca 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_const.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_const.h
@@ -4,6 +4,7 @@
#define TPC_QUEUE_INIT "2pc_queue_init"
#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity"
+#define TPC_QUEUE_GET_TOPIC_STATS "2pc_queue_get_topic_stats"
#define TPC_QUEUE_RESERVE "2pc_queue_reserve"
#define TPC_QUEUE_COMMIT "2pc_queue_commit"
#define TPC_QUEUE_ABORT "2pc_queue_abort"
diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h
index 7c94cdebfe0..2413fd7043d 100644
--- a/src/cls/2pc_queue/cls_2pc_queue_types.h
+++ b/src/cls/2pc_queue/cls_2pc_queue_types.h
@@ -8,25 +8,30 @@ struct cls_2pc_reservation
{
using id_t = uint32_t;
inline static const id_t NO_ID{0};
- uint64_t size; // how many entries are reserved
+ uint64_t size; // how much size to reserve (bytes)
ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations)
+ uint32_t entries; // how many entries are reserved
- cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) :
- size(_size), timestamp(_timestamp) {}
+ cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp, uint32_t _entries) :
+ size(_size), timestamp(_timestamp), entries(_entries) {}
cls_2pc_reservation() = default;
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(size, bl);
encode(timestamp, bl);
+ encode(entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(size, bl);
decode(timestamp, bl);
+ if (struct_v >= 2) {
+ decode(entries, bl);
+ }
DECODE_FINISH(bl);
}
};
@@ -40,22 +45,27 @@ struct cls_2pc_urgent_data
cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
cls_2pc_reservations reservations; // reservation list (keyed by id)
bool has_xattrs{false};
+ uint32_t committed_entries{0}; // how many entries have been committed so far
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(reserved_size, bl);
encode(last_id, bl);
encode(reservations, bl);
encode(has_xattrs, bl);
+ encode(committed_entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(reserved_size, bl);
decode(last_id, bl);
decode(reservations, bl);
decode(has_xattrs, bl);
+ if (struct_v >= 2) {
+ decode(committed_entries, bl);
+ }
DECODE_FINISH(bl);
}
};
diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h
index 64891cffb39..8209659bda9 100644
--- a/src/cls/queue/cls_queue_ops.h
+++ b/src/cls/queue/cls_queue_ops.h
@@ -136,4 +136,26 @@ struct cls_queue_get_capacity_ret {
};
WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret)
+struct cls_queue_get_stats_ret {
+ uint64_t queue_size;
+ uint32_t queue_entries;
+
+ cls_queue_get_stats_ret() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(queue_size, bl);
+ encode(queue_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(queue_size, bl);
+ decode(queue_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_get_stats_ret)
+
#endif /* CEPH_CLS_QUEUE_OPS_H */