diff options
Diffstat (limited to 'src/cls')
-rw-r--r-- | src/cls/refcount/cls_refcount.cc | 243 | ||||
-rw-r--r-- | src/cls/refcount/cls_refcount_client.cc | 60 | ||||
-rw-r--r-- | src/cls/refcount/cls_refcount_client.h | 34 | ||||
-rw-r--r-- | src/cls/refcount/cls_refcount_ops.h | 112 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw.cc | 151 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 62 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.h | 13 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_ops.h | 43 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_types.h | 109 |
9 files changed, 793 insertions, 34 deletions
diff --git a/src/cls/refcount/cls_refcount.cc b/src/cls/refcount/cls_refcount.cc new file mode 100644 index 00000000000..aa536ac0f7e --- /dev/null +++ b/src/cls/refcount/cls_refcount.cc @@ -0,0 +1,243 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <iostream> + +#include <string.h> +#include <stdlib.h> +#include <errno.h> + +#include "include/types.h" +#include "include/utime.h" +#include "objclass/objclass.h" +#include "cls/refcount/cls_refcount_ops.h" +#include "common/Clock.h" + +#include "global/global_context.h" + +CLS_VER(1,0) +CLS_NAME(refcount) + +cls_handle_t h_class; +cls_method_handle_t h_refcount_get; +cls_method_handle_t h_refcount_put; +cls_method_handle_t h_refcount_set; +cls_method_handle_t h_refcount_read; + + +#define REFCOUNT_ATTR "refcount" + +struct obj_refcount { + map<string, bool> refs; + + obj_refcount() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(refs, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(refs, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(obj_refcount) + +static string wildcard_tag; + +static int read_refcount(cls_method_context_t hctx, bool implicit_ref, obj_refcount *objr) +{ + bufferlist bl; + objr->refs.clear(); + int ret = cls_cxx_getxattr(hctx, REFCOUNT_ATTR, &bl); + if (ret == -ENOENT || ret == -ENODATA) { + if (implicit_ref) { + objr->refs[wildcard_tag] = true; + } + return 0; + } + if (ret < 0) + return ret; + + try { + bufferlist::iterator iter = bl.begin(); + ::decode(*objr, iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: read_refcount(): failed to decode refcount entry\n"); + return -EIO; + } + + return 0; +} + +static int set_refcount(cls_method_context_t hctx, map<string, bool>& refs) +{ + bufferlist bl; + struct obj_refcount objr; + + objr.refs = refs; + + ::encode(objr, bl); + + int ret = cls_cxx_setxattr(hctx, REFCOUNT_ATTR, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_rc_refcount_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_refcount_get_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_rc_refcount_get(): failed to decode entry\n"); + return -EINVAL; + } + + obj_refcount objr; + int ret = read_refcount(hctx, op.implicit_ref, &objr); + if (ret < 0) + return ret; + + CLS_LOG(10, "cls_rc_refcount_get() tag=%s\n", op.tag.c_str()); + + objr.refs[op.tag] = true; + + ret = set_refcount(hctx, objr.refs); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_rc_refcount_put(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_refcount_put_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_rc_refcount_put(): failed to decode entry\n"); + return -EINVAL; + } + + obj_refcount objr; + int ret = read_refcount(hctx, op.implicit_ref, &objr); + if (ret < 0) + return ret; + + if (!objr.refs.size()) {// shouldn't happen! + CLS_LOG(0, "ERROR: cls_rc_refcount_put() was called without any references!\n"); + return -EINVAL; + } + + CLS_LOG(10, "cls_rc_refcount_put() tag=%s\n", op.tag.c_str()); + + bool found = false; + map<string, bool>::iterator iter = objr.refs.find(op.tag); + if (iter != objr.refs.end()) { + found = true; + } else if (op.implicit_ref) { + iter = objr.refs.find(wildcard_tag); + if (iter != objr.refs.end()) { + found = true; + } + } + + if (!found) + return 0; + + objr.refs.erase(iter); + + if (!objr.refs.size()) { + return cls_cxx_remove(hctx); + } + + ret = set_refcount(hctx, objr.refs); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_rc_refcount_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_refcount_set_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_refcount_set(): failed to decode entry\n"); + return -EINVAL; + } + + if (!op.refs.size()) { + return cls_cxx_remove(hctx); + } + + obj_refcount objr; + list<string>::iterator iter; + for (iter = op.refs.begin(); iter != op.refs.end(); ++iter) { + objr.refs[*iter] = true; + } + + int ret = set_refcount(hctx, objr.refs); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_rc_refcount_read(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_refcount_read_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_rc_refcount_read(): failed to decode entry\n"); + return -EINVAL; + } + + obj_refcount objr; + + cls_refcount_read_ret read_ret; + int ret = read_refcount(hctx, op.implicit_ref, &objr); + if (ret < 0) + return ret; + + map<string, bool>::iterator iter; + for (iter = objr.refs.begin(); iter != objr.refs.end(); ++iter) { + read_ret.refs.push_back(iter->first); + } + + ::encode(read_ret, *out); + + return 0; +} + +void __cls_init() +{ + CLS_LOG(1, "Loaded refcount class!"); + + cls_register("refcount", &h_class); + + /* refcount */ + cls_register_cxx_method(h_class, "get", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, cls_rc_refcount_get, &h_refcount_get); + cls_register_cxx_method(h_class, "put", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, cls_rc_refcount_put, &h_refcount_put); + cls_register_cxx_method(h_class, "set", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, cls_rc_refcount_set, &h_refcount_set); + cls_register_cxx_method(h_class, "read", CLS_METHOD_RD | CLS_METHOD_PUBLIC, cls_rc_refcount_read, &h_refcount_read); + + return; +} + diff --git a/src/cls/refcount/cls_refcount_client.cc b/src/cls/refcount/cls_refcount_client.cc new file mode 100644 index 00000000000..f88ff5edd4b --- /dev/null +++ b/src/cls/refcount/cls_refcount_client.cc @@ -0,0 +1,60 @@ +#include <errno.h> + +#include "include/types.h" +#include "cls/refcount/cls_refcount_ops.h" +#include "include/rados/librados.hpp" + +using namespace librados; + + +void cls_refcount_get(librados::ObjectWriteOperation& op, const string& tag, bool implicit_ref) +{ + bufferlist in; + cls_refcount_get_op call; + call.tag = tag; + call.implicit_ref = implicit_ref; + ::encode(call, in); + op.exec("refcount", "get", in); +} + +void cls_refcount_put(librados::ObjectWriteOperation& op, const string& tag, bool implicit_ref) +{ + bufferlist in; + cls_refcount_put_op call; + call.tag = tag; + call.implicit_ref = implicit_ref; + ::encode(call, in); + op.exec("refcount", "put", in); +} + +void cls_refcount_set(librados::ObjectWriteOperation& op, list<string>& refs) +{ + bufferlist in; + cls_refcount_set_op call; + call.refs = refs; + ::encode(call, in); + op.exec("refcount", "set", in); +} + +int cls_refcount_read(librados::IoCtx& io_ctx, string& oid, list<string> *refs, bool implicit_ref) +{ + bufferlist in, out; + cls_refcount_read_op call; + call.implicit_ref = implicit_ref; + ::encode(call, in); + int r = io_ctx.exec(oid, "refcount", "read", in, out); + if (r < 0) + return r; + + cls_refcount_read_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + *refs = ret.refs; + + return r; +} diff --git a/src/cls/refcount/cls_refcount_client.h b/src/cls/refcount/cls_refcount_client.h new file mode 100644 index 00000000000..9fbb8c0592f --- /dev/null +++ b/src/cls/refcount/cls_refcount_client.h @@ -0,0 +1,34 @@ +#ifndef CEPH_CLS_REFCOUNT_CLIENT_H +#define CEPH_CLS_REFCOUNT_CLIENT_H + +#include "include/types.h" +#include "include/rados/librados.hpp" + +/* + * refcount objclass + * + * The refcount objclass implements a refcounting scheme that allows having multiple references + * to a single rados object. The canonical way to use it is to add a reference and to remove a + * reference using a specific tag. This way we ensure that refcounting operations are idempotent, + * that is, a single client can only increase/decrease the refcount once using a single tag, so + * any replay of operations (implicit or explicit) is possible. + * + * So, the regular usage would be to create an object, to increase the refcount. Then, when + * wanting to have another reference to it, increase the refcount using a different tag. When + * removing a reference it is required to drop the refcount (using the same tag that was used + * for that reference). When the refcount drops to zero, the object is removed automaticfally. + * + * In order to maintain backwards compatibility with objects that were created without having + * their refcount increased, the implicit_ref was added. Any object that was created without + * having it's refcount increased (explicitly) is having an implicit refcount of 1. Since + * we don't have a tag for this refcount, we consider this tag as a wildcard. So if the refcount + * is being decreased by an unknown tag and we still have one wildcard tag, we'll accept it + * as the relevant tag, and the refcount will be decreased. + */ + +void cls_refcount_get(librados::ObjectWriteOperation& op, const string& tag, bool implicit_ref = false); +void cls_refcount_put(librados::ObjectWriteOperation& op, const string& tag, bool implicit_ref = false); +void cls_refcount_set(librados::ObjectWriteOperation& op, list<string>& refs); +int cls_refcount_read(librados::IoCtx& io_ctx, string& oid, list<string> *refs, bool implicit_ref = false); + +#endif diff --git a/src/cls/refcount/cls_refcount_ops.h b/src/cls/refcount/cls_refcount_ops.h new file mode 100644 index 00000000000..9e32490de55 --- /dev/null +++ b/src/cls/refcount/cls_refcount_ops.h @@ -0,0 +1,112 @@ +#ifndef CEPH_CLS_REFCOUNT_OPS_H +#define CEPH_CLS_REFCOUNT_OPS_H + +#include <map> + +#include "include/types.h" + +struct cls_refcount_get_op { + string tag; + bool implicit_ref; + + cls_refcount_get_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(tag, bl); + ::encode(implicit_ref, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(tag, bl); + ::decode(implicit_ref, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_refcount_get_op) + +struct cls_refcount_put_op { + string tag; + bool implicit_ref; // assume wildcard reference for + // objects without a set ref + + cls_refcount_put_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(tag, bl); + ::encode(implicit_ref, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(tag, bl); + ::decode(implicit_ref, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_refcount_put_op) + +struct cls_refcount_set_op { + list<string> refs; + + cls_refcount_set_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(refs, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(refs, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_refcount_set_op) + +struct cls_refcount_read_op { + bool implicit_ref; // assume wildcard reference for + // objects without a set ref + + cls_refcount_read_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(implicit_ref, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(implicit_ref, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_refcount_read_op) + +struct cls_refcount_read_ret { + list<string> refs; + + cls_refcount_read_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(refs, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(refs, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_refcount_read_ret) + + +#endif diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 12012a45614..1a9c40076e5 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -20,7 +20,10 @@ CLS_NAME(rgw) cls_handle_t h_class; cls_method_handle_t h_rgw_bucket_init_index; +cls_method_handle_t h_rgw_bucket_set_tag_timeout; cls_method_handle_t h_rgw_bucket_list; +cls_method_handle_t h_rgw_bucket_check_index; +cls_method_handle_t h_rgw_bucket_rebuild_index; cls_method_handle_t h_rgw_bucket_prepare_op; cls_method_handle_t h_rgw_bucket_complete_op; cls_method_handle_t h_rgw_dir_suggest_changes; @@ -96,6 +99,84 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) return 0; } +static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *existing_header, struct rgw_bucket_dir_header *calc_header) +{ + bufferlist header_bl; + int rc = cls_cxx_map_read_header(hctx, &header_bl); + if (rc < 0) + return rc; + bufferlist::iterator header_iter = header_bl.begin(); + try { + ::decode(*existing_header, header_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_bucket_list(): failed to decode header\n"); + return -EINVAL; + } + + calc_header->tag_timeout = existing_header->tag_timeout; + + bufferlist bl; + + map<string, bufferlist> keys; + string start_obj; + string filter_prefix; + +#define CHECK_CHUNK_SIZE 1000 + do { + rc = cls_cxx_map_get_vals(hctx, start_obj, filter_prefix, CHECK_CHUNK_SIZE, &keys); + if (rc < 0) + return rc; + + std::map<string, bufferlist>::iterator kiter = keys.begin(); + for (; kiter != keys.end(); ++kiter) { + struct rgw_bucket_dir_entry entry; + bufferlist::iterator eiter = kiter->second.begin(); + try { + ::decode(entry, eiter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_bucket_list(): failed to decode entry, key=%s\n", kiter->first.c_str()); + return -EIO; + } + struct rgw_bucket_category_stats& stats = calc_header->stats[entry.meta.category]; + stats.num_entries++; + stats.total_size += entry.meta.size; + stats.total_size_rounded += get_rounded_size(entry.meta.size); + + start_obj = kiter->first; + } + } while (keys.size() == CHECK_CHUNK_SIZE); + + return 0; +} + +int rgw_bucket_check_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + struct rgw_cls_check_index_ret ret; + + int rc = check_index(hctx, &ret.existing_header, &ret.calculated_header); + if (rc < 0) + return rc; + + ::encode(ret, *out); + + return 0; +} + +int rgw_bucket_rebuild_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + struct rgw_bucket_dir_header existing_header; + struct rgw_bucket_dir_header calc_header; + int rc = check_index(hctx, &existing_header, &calc_header); + if (rc < 0) + return rc; + + bufferlist header_bl; + ::encode(calc_header, header_bl); + rc = cls_cxx_map_write_header(hctx, &header_bl); + return rc; +} + + int rgw_bucket_init_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { bufferlist bl; @@ -124,6 +205,39 @@ int rgw_bucket_init_index(cls_method_context_t hctx, bufferlist *in, bufferlist return rc; } +int rgw_bucket_set_tag_timeout(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + // decode request + rgw_cls_tag_timeout_op op; + bufferlist::iterator iter = in->begin(); + try { + ::decode(op, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_bucket_set_tag_timeout(): failed to decode request\n"); + return -EINVAL; + } + + bufferlist header_bl; + struct rgw_bucket_dir_header header; + int rc = cls_cxx_map_read_header(hctx, &header_bl); + if (rc < 0) + return rc; + bufferlist::iterator header_iter = header_bl.begin(); + try { + ::decode(header, header_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode header\n"); + return -EINVAL; + } + + header.tag_timeout = op.tag_timeout; + + header_bl.clear(); + ::encode(header, header_bl); + rc = cls_cxx_map_write_header(hctx, &header_bl); + return rc; +} + int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { // decode request @@ -253,7 +367,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist if (op.tag.size() && op.op == CLS_RGW_OP_CANCEL) { CLS_LOG(1, "rgw_bucket_complete_op(): cancel requested\n"); cancel = true; - } else if (op.epoch <= entry.epoch) { + } else if (op.epoch && op.epoch <= entry.epoch) { CLS_LOG(1, "rgw_bucket_complete_op(): skipping request, old epoch\n"); cancel = true; } @@ -331,6 +445,8 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis if (rc < 0) return rc; + uint64_t tag_timeout; + try { bufferlist::iterator header_iter = header_bl.begin(); ::decode(header, header_iter); @@ -339,13 +455,14 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis return -EINVAL; } + tag_timeout = (header.tag_timeout ? header.tag_timeout : CEPH_RGW_TAG_TIMEOUT); + bufferlist::iterator in_iter = in->begin(); - __u8 op; - rgw_bucket_dir_entry cur_change; - rgw_bucket_dir_entry cur_disk; - bufferlist op_bl; while (!in_iter.end()) { + __u8 op; + rgw_bucket_dir_entry cur_change; + rgw_bucket_dir_entry cur_disk; try { ::decode(op, in_iter); ::decode(cur_change, in_iter); @@ -373,31 +490,40 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis cur_disk.pending_map.begin(); while(iter != cur_disk.pending_map.end()) { map<string, struct rgw_bucket_pending_info>::iterator cur_iter=iter++; - if (cur_time > (cur_iter->second.timestamp + CEPH_RGW_TAG_TIMEOUT)) { + if (cur_time > (cur_iter->second.timestamp + tag_timeout)) { cur_disk.pending_map.erase(cur_iter); } } } + CLS_LOG(20, "cur_disk.pending_map.empty()=%d op=%d cur_disk.exists=%d cur_change.pending_map.size()=%d cur_change.exists=%d\n", + cur_disk.pending_map.empty(), (int)op, cur_disk.exists, + cur_change.pending_map.size(), cur_change.exists); + if (cur_disk.pending_map.empty()) { - struct rgw_bucket_category_stats& stats = - header.stats[cur_disk.meta.category]; if (cur_disk.exists) { - stats.num_entries--; - stats.total_size -= cur_disk.meta.size; - stats.total_size_rounded -= get_rounded_size(cur_disk.meta.size); + struct rgw_bucket_category_stats& old_stats = header.stats[cur_disk.meta.category]; + CLS_LOG(10, "total_entries: %d -> %d\n", old_stats.num_entries, old_stats.num_entries - 1); + old_stats.num_entries--; + old_stats.total_size -= cur_disk.meta.size; + old_stats.total_size_rounded -= get_rounded_size(cur_disk.meta.size); header_changed = true; } + struct rgw_bucket_category_stats& stats = + header.stats[cur_change.meta.category]; switch(op) { case CEPH_RGW_REMOVE: + CLS_LOG(10, "CEPH_RGW_REMOVE name=%s\n", cur_change.name.c_str()); ret = cls_cxx_map_remove_key(hctx, cur_change.name); if (ret < 0) return ret; break; case CEPH_RGW_UPDATE: + CLS_LOG(10, "CEPH_RGW_UPDATE name=%s total_entries: %d -> %d\n", cur_change.name.c_str(), stats.num_entries, stats.num_entries + 1); stats.num_entries++; stats.total_size += cur_change.meta.size; stats.total_size_rounded += get_rounded_size(cur_change.meta.size); + header_changed = true; bufferlist cur_state_bl; ::encode(cur_change, cur_state_bl); ret = cls_cxx_map_set_val(hctx, cur_change.name, &cur_state_bl); @@ -1009,7 +1135,10 @@ void __cls_init() /* bucket index */ cls_register_cxx_method(h_class, "bucket_init_index", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_init_index, &h_rgw_bucket_init_index); + cls_register_cxx_method(h_class, "bucket_set_tag_timeout", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_set_tag_timeout, &h_rgw_bucket_set_tag_timeout); cls_register_cxx_method(h_class, "bucket_list", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_bucket_list, &h_rgw_bucket_list); + cls_register_cxx_method(h_class, "bucket_check_index", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_bucket_check_index, &h_rgw_bucket_check_index); + cls_register_cxx_method(h_class, "bucket_rebuild_index", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_rebuild_index, &h_rgw_bucket_rebuild_index); cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op); cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_complete_op, &h_rgw_bucket_complete_op); cls_register_cxx_method(h_class, "dir_suggest_changes", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes); diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 29cf389431b..b873a866903 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -6,6 +6,20 @@ using namespace librados; +void cls_rgw_bucket_init(ObjectWriteOperation& o) +{ + bufferlist in; + o.exec("rgw", "bucket_init_index", in); +} + +void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) +{ + bufferlist in; + struct rgw_cls_tag_timeout_op call; + call.tag_timeout = tag_timeout; + ::encode(call, in); + o.exec("rgw", "bucket_set_tag_timeout", in); +} void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, uint8_t op, string& tag, string& name, string& locator) @@ -20,7 +34,6 @@ void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, uint8_t op, string& tag, o.exec("rgw", "bucket_prepare_op", in); } - void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, uint8_t op, string& tag, uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta) { @@ -36,6 +49,7 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, uint8_t op, string& tag o.exec("rgw", "bucket_complete_op", in); } + int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, string& filter_prefix, uint32_t num_entries, rgw_bucket_dir *dir, bool *is_truncated) @@ -66,6 +80,52 @@ int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, return r; } +int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, + rgw_bucket_dir_header *existing_header, + rgw_bucket_dir_header *calculated_header) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out); + if (r < 0) + return r; + + struct rgw_cls_check_index_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + if (existing_header) + *existing_header = ret.existing_header; + if (calculated_header) + *calculated_header = ret.calculated_header; + + return 0; +} + +int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out); + if (r < 0) + return r; + + return 0; +} + +void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) +{ + updates.append(op); + ::encode(dirent, updates); +} + +void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) +{ + o.exec("rgw", "dir_suggest_changes", updates); +} + int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header) { bufferlist in, out; diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 50b8125e8b1..d2218b9de7b 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -6,6 +6,10 @@ #include "cls_rgw_types.h" /* bucket index */ +void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); + +void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout); + void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, uint8_t op, string& tag, string& name, string& locator); @@ -16,8 +20,17 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj, string& filter_prefix, uint32_t num_entries, rgw_bucket_dir *dir, bool *is_truncated); +int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, + rgw_bucket_dir_header *existing_header, + rgw_bucket_dir_header *calculated_header); +int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); + int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); +void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); + +void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates); + /* usage logging */ int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index b9d40857d3c..35c96c0a6da 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -6,6 +6,25 @@ #include "include/types.h" #include "cls/rgw/cls_rgw_types.h" +struct rgw_cls_tag_timeout_op +{ + uint64_t tag_timeout; + + rgw_cls_tag_timeout_op() {} + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ::encode(tag_timeout, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &bl) { + DECODE_START(1, bl); + ::decode(tag_timeout, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_cls_tag_timeout_op) + struct rgw_cls_obj_prepare_op { uint8_t op; @@ -126,6 +145,30 @@ struct rgw_cls_list_ret }; WRITE_CLASS_ENCODER(rgw_cls_list_ret) +struct rgw_cls_check_index_ret +{ + rgw_bucket_dir_header existing_header; + rgw_bucket_dir_header calculated_header; + + rgw_cls_check_index_ret() {} + + void encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + ::encode(existing_header, bl); + ::encode(calculated_header, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &bl) { + DECODE_START(1, bl); + ::decode(existing_header, bl); + ::decode(calculated_header, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + static void generate_test_instances(list<rgw_cls_list_ret*>& o); +}; +WRITE_CLASS_ENCODER(rgw_cls_check_index_ret) + struct rgw_cls_usage_log_add_op { rgw_usage_log_info info; diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index f587bc95bc6..96e61f1b765 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -161,15 +161,24 @@ WRITE_CLASS_ENCODER(rgw_bucket_category_stats) struct rgw_bucket_dir_header { map<uint8_t, rgw_bucket_category_stats> stats; + uint64_t tag_timeout; + + rgw_bucket_dir_header() : tag_timeout(0) {} void encode(bufferlist &bl) const { - ENCODE_START(2, 2, bl); + ENCODE_START(3, 2, bl); ::encode(stats, bl); + ::encode(tag_timeout, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl); ::decode(stats, bl); + if (struct_v > 2) { + ::decode(tag_timeout, bl); + } else { + tag_timeout = 0; + } DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -198,24 +207,17 @@ struct rgw_bucket_dir { }; WRITE_CLASS_ENCODER(rgw_bucket_dir) - -struct rgw_usage_log_entry { - string owner; - string bucket; - uint64_t epoch; +struct rgw_usage_data { uint64_t bytes_sent; uint64_t bytes_received; uint64_t ops; uint64_t successful_ops; - rgw_usage_log_entry() : bytes_sent(0), bytes_received(0), ops(0), successful_ops(0) {} - rgw_usage_log_entry(string& o, string& b, uint64_t s, uint64_t r) : owner(o), bucket(b), bytes_sent(s), bytes_received(r), ops(0), successful_ops(0) {} + rgw_usage_data() : bytes_sent(0), bytes_received(0), ops(0), successful_ops(0) {} + rgw_usage_data(uint64_t sent, uint64_t received) : bytes_sent(sent), bytes_received(received), ops(0), successful_ops(0) {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - ::encode(owner, bl); - ::encode(bucket, bl); - ::encode(epoch, bl); ::encode(bytes_sent, bl); ::encode(bytes_received, bl); ::encode(ops, bl); @@ -223,12 +225,8 @@ struct rgw_usage_log_entry { ENCODE_FINISH(bl); } - - void decode(bufferlist::iterator& bl) { + void decode(bufferlist::iterator& bl) { DECODE_START(1, bl); - ::decode(owner, bl); - ::decode(bucket, bl); - ::decode(epoch, bl); ::decode(bytes_sent, bl); ::decode(bytes_received, bl); ::decode(ops, bl); @@ -236,16 +234,83 @@ struct rgw_usage_log_entry { DECODE_FINISH(bl); } - void aggregate(const rgw_usage_log_entry& e) { + void aggregate(const rgw_usage_data& usage) { + bytes_sent += usage.bytes_sent; + bytes_received += usage.bytes_received; + ops += usage.ops; + successful_ops += usage.successful_ops; + } +}; +WRITE_CLASS_ENCODER(rgw_usage_data) + + +struct rgw_usage_log_entry { + string owner; + string bucket; + uint64_t epoch; + rgw_usage_data total_usage; /* this one is kept for backwards compatibility */ + map<string, rgw_usage_data> usage_map; + + rgw_usage_log_entry() {} + rgw_usage_log_entry(string& o, string& b) : owner(o), bucket(b) {} + + void encode(bufferlist& bl) const { + ENCODE_START(2, 1, bl); + ::encode(owner, bl); + ::encode(bucket, bl); + ::encode(epoch, bl); + ::encode(total_usage.bytes_sent, bl); + ::encode(total_usage.bytes_received, bl); + ::encode(total_usage.ops, bl); + ::encode(total_usage.successful_ops, bl); + ::encode(usage_map, bl); + ENCODE_FINISH(bl); + } + + + void decode(bufferlist::iterator& bl) { + DECODE_START(2, bl); + ::decode(owner, bl); + ::decode(bucket, bl); + ::decode(epoch, bl); + ::decode(total_usage.bytes_sent, bl); + ::decode(total_usage.bytes_received, bl); + ::decode(total_usage.ops, bl); + ::decode(total_usage.successful_ops, bl); + if (struct_v < 2) { + usage_map[""] = total_usage; + } else { + ::decode(usage_map, bl); + } + DECODE_FINISH(bl); + } + + void aggregate(const rgw_usage_log_entry& e, map<string, bool> *categories = NULL) { if (owner.empty()) { owner = e.owner; bucket = e.bucket; epoch = e.epoch; } - bytes_sent += e.bytes_sent; - bytes_received += e.bytes_received; - ops += e.ops; - successful_ops += e.successful_ops; + map<string, rgw_usage_data>::const_iterator iter; + for (iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) { + if (!categories || !categories->size() || categories->count(iter->first)) { + add(iter->first, iter->second); + } + } + } + + void sum(rgw_usage_data& usage, map<string, bool>& categories) const { + usage = rgw_usage_data(); + for (map<string, rgw_usage_data>::const_iterator iter = usage_map.begin(); iter != usage_map.end(); ++iter) { + if (!categories.size() || categories.count(iter->first)) { + usage.aggregate(iter->second); + } + } + } + + void add(const string& category, const rgw_usage_data& data) { + usage_map[category].aggregate(data); + total_usage.aggregate(data); } }; WRITE_CLASS_ENCODER(rgw_usage_log_entry) |