diff options
Diffstat (limited to 'src/cls')
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue.cc | 37 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_client.cc | 36 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_client.h | 7 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_const.h | 1 | ||||
-rw-r--r-- | src/cls/2pc_queue/cls_2pc_queue_types.h | 24 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_ops.h | 22 |
6 files changed, 117 insertions, 10 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index fba76395542..45710f9abe3 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -55,6 +55,36 @@ static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, return 0; } +static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_queue_get_stats_ret op_ret; + + // get head + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + const auto remaining_size = (head.tail.offset >= head.front.offset) ? + (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) : + head.front.offset - head.tail.offset; + op_ret.queue_size = head.queue_size - head.max_head_size - remaining_size; + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what()); + return -EINVAL; + } + op_ret.queue_entries = urgent_data.committed_entries; + + encode(op_ret, *out); + + return 0; +} + static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { cls_2pc_queue_reserve_op res_op; try { @@ -112,7 +142,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff cls_2pc_reservations::iterator last_reservation; std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct, std::forward_as_tuple(urgent_data.last_id), - std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now())); + std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries)); if (!result) { // an old reservation that was never committed or aborted is in the map // caller should try again assuming other IDs are ok @@ -148,7 +178,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff } std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct, std::forward_as_tuple(urgent_data.last_id), - std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now())); + std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries)); if (!result) { // an old reservation that was never committed or aborted is in the map // caller should try again assuming other IDs are ok @@ -268,6 +298,7 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe } urgent_data.reserved_size -= res.size; + urgent_data.committed_entries += res.entries; if (xattr_reservations.empty()) { // remove the reservation from urgent data @@ -577,6 +608,7 @@ CLS_INIT(2pc_queue) cls_handle_t h_class; cls_method_handle_t h_2pc_queue_init; cls_method_handle_t h_2pc_queue_get_capacity; + cls_method_handle_t h_2pc_queue_get_topic_stats; cls_method_handle_t h_2pc_queue_reserve; cls_method_handle_t h_2pc_queue_commit; cls_method_handle_t h_2pc_queue_abort; @@ -589,6 +621,7 @@ CLS_INIT(2pc_queue) cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init); cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity); + cls_register_cxx_method(h_class, TPC_QUEUE_GET_TOPIC_STATS, CLS_METHOD_RD, cls_2pc_queue_get_topic_stats, &h_2pc_queue_get_topic_stats); cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve); cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit); cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort); diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc index 6868b2b6f83..42632cba61a 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.cc +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -31,6 +31,21 @@ int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) { return 0; } +int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size) { + cls_queue_get_stats_ret op_ret; + auto iter = bl.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + committed_entries = op_ret.queue_entries; + size = op_ret.queue_size; + + return 0; +} + #ifndef CLS_CLIENT_HIDE_IOCTX int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) { bufferlist in, out; @@ -44,12 +59,31 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uin #endif // optionally async method for getting capacity (bytes) -// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results +// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) { bufferlist in; op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval); } +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_get_topic_stats(IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size) { + bufferlist in, out; + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, out); + if (r < 0 ) { + return r; + } + + return cls_2pc_queue_get_topic_stats_result(out, committed_entries, size); +} +#endif + +// optionally async method for getting number of commited entries and size (bytes) +// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results +void cls_2pc_queue_get_topic_stats(ObjectReadOperation& op, bufferlist* obl, int* prval) { + bufferlist in; + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, obl, prval); +} + int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) { cls_2pc_queue_reserve_ret op_ret; diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h index e0bdeafd590..20043edd200 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.h +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -19,6 +19,8 @@ void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& q #ifndef CLS_CLIENT_HIDE_IOCTX // return capacity (bytes) int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size); +// return the number of committed entries and size (bytes) +int cls_2pc_queue_get_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size); // make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) // return a reservation id if reservations is possible, 0 otherwise @@ -37,7 +39,12 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& // after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); +// optionally async method for getting capacity (bytes) +// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results +void cls_2pc_queue_get_topic_stats(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); + int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size); +int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size); // optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) // notes: diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h index 160c5b66e9f..ea7afa943ca 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_const.h +++ b/src/cls/2pc_queue/cls_2pc_queue_const.h @@ -4,6 +4,7 @@ #define TPC_QUEUE_INIT "2pc_queue_init" #define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity" +#define TPC_QUEUE_GET_TOPIC_STATS "2pc_queue_get_topic_stats" #define TPC_QUEUE_RESERVE "2pc_queue_reserve" #define TPC_QUEUE_COMMIT "2pc_queue_commit" #define TPC_QUEUE_ABORT "2pc_queue_abort" diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h index 7c94cdebfe0..2413fd7043d 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_types.h +++ b/src/cls/2pc_queue/cls_2pc_queue_types.h @@ -8,25 +8,30 @@ struct cls_2pc_reservation { using id_t = uint32_t; inline static const id_t NO_ID{0}; - uint64_t size; // how many entries are reserved + uint64_t size; // how much size to reserve (bytes) ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations) + uint32_t entries; // how many entries are reserved - cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) : - size(_size), timestamp(_timestamp) {} + cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp, uint32_t _entries) : + size(_size), timestamp(_timestamp), entries(_entries) {} cls_2pc_reservation() = default; void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(size, bl); encode(timestamp, bl); + encode(entries, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(size, bl); decode(timestamp, bl); + if (struct_v >= 2) { + decode(entries, bl); + } DECODE_FINISH(bl); } }; @@ -40,22 +45,27 @@ struct cls_2pc_urgent_data cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id cls_2pc_reservations reservations; // reservation list (keyed by id) bool has_xattrs{false}; + uint32_t committed_entries{0}; // how many entries have been committed so far void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(reserved_size, bl); encode(last_id, bl); encode(reservations, bl); encode(has_xattrs, bl); + encode(committed_entries, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(reserved_size, bl); decode(last_id, bl); decode(reservations, bl); decode(has_xattrs, bl); + if (struct_v >= 2) { + decode(committed_entries, bl); + } DECODE_FINISH(bl); } }; diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h index 64891cffb39..8209659bda9 100644 --- a/src/cls/queue/cls_queue_ops.h +++ b/src/cls/queue/cls_queue_ops.h @@ -136,4 +136,26 @@ struct cls_queue_get_capacity_ret { }; WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret) +struct cls_queue_get_stats_ret { + uint64_t queue_size; + uint32_t queue_entries; + + cls_queue_get_stats_ret() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(queue_size, bl); + encode(queue_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(queue_size, bl); + decode(queue_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_get_stats_ret) + #endif /* CEPH_CLS_QUEUE_OPS_H */ |