summaryrefslogtreecommitdiffstats
path: root/src/cls/rgw
diff options
context:
space:
mode:
authorGuang Yang <yguang@yahoo-inc.com>2014-08-29 12:22:50 +0200
committerYehuda Sadeh <yehuda@redhat.com>2015-01-14 04:21:23 +0100
commit9c5acd67c4cfb28b22662e9ae3a67657cd689080 (patch)
tree237d42daa043e170fa61de583d1831ebbfb0e7a8 /src/cls/rgw
parentAdjust bucket listing to work with multiple shards. (diff)
downloadceph-9c5acd67c4cfb28b22662e9ae3a67657cd689080.tar.xz
ceph-9c5acd67c4cfb28b22662e9ae3a67657cd689080.zip
Adjust bucket stats/index checking/index rebuild/tag timeout implementation to work with multiple shards.
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
Diffstat (limited to 'src/cls/rgw')
-rw-r--r--src/cls/rgw/cls_rgw_client.cc190
-rw-r--r--src/cls/rgw/cls_rgw_client.h54
2 files changed, 190 insertions, 54 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index 0d698e3c815..ed937bc52eb 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -143,13 +143,50 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
return ret;
}
-void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
-{
+static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
+ const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
bufferlist in;
struct rgw_cls_tag_timeout_op call;
- call.tag_timeout = tag_timeout;
+ call.tag_timeout = timeout;
::encode(call, in);
- o.exec("rgw", "bucket_set_tag_timeout", in);
+ ObjectWriteOperation op;
+ op.exec("rgw", "bucket_set_tag_timeout", in);
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op);
+ if (r >= 0) {
+ manager->add_pending(arg->id, c);
+ }
+ return r;
+}
+
+int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
+ uint64_t tag_timeout, uint32_t max_aio)
+{
+ int ret = 0;
+ vector<string>::const_iterator iter = bucket_objs.begin();
+ BucketIndexAioManager manager;
+ for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ if(issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -238,39 +275,91 @@ int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj,
return ret;
}
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header)
-{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
- if (r < 0)
- return r;
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+ struct rgw_cls_check_index_ret *pdata) {
+ bufferlist in;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+ pdata, NULL));
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ if (r >= 0) {
+ manager->add_pending(arg->id, c);
+ }
+ return r;
+}
- struct rgw_cls_check_index_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
+int cls_rgw_bucket_check_index_op(IoCtx& io_ctx,
+ map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio)
+{
+ int ret = 0;
+ BucketIndexAioManager manager;
+ map<string, struct rgw_cls_check_index_ret>::iterator iter = bucket_objs_ret.begin();
+ for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+ if (ret < 0)
+ break;
}
- if (existing_header)
- *existing_header = ret.existing_header;
- if (calculated_header)
- *calculated_header = ret.calculated_header;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
+}
- return 0;
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ librados::ObjectWriteOperation op;
+ op.exec("rgw", "bucket_rebuild_index", in);
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op);
+ if (r >= 0) {
+ manager->add_pending(arg->id, c);
+ }
+ return r;
}
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector<string>& bucket_objs,
+ uint32_t max_aio)
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
- if (r < 0)
- return r;
+ int ret = 0;
+ BucketIndexAioManager manager;
+ vector<string>::const_iterator iter = bucket_objs.begin();
+ for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+ if (ret < 0)
+ break;
+ }
- return 0;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -284,28 +373,33 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
o.exec("rgw", "dir_suggest_changes", updates);
}
-int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
+int cls_rgw_get_dir_header(IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+ uint32_t max_aio)
{
- bufferlist in, out;
- struct rgw_cls_list_op call;
- call.num_entries = 0;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
-
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
+ int ret = 0;
+ BucketIndexAioManager manager;
+ map<string, rgw_cls_list_ret>::iterator iter = dir_headers.begin();
+ for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+ if (ret < 0)
+ break;
}
- if (header)
- *header = ret.dir.header;
-
- return r;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index a49b6422e7e..b28425e1fba 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -77,6 +77,37 @@ public:
virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
};
+class BucketIndexShardsManager {
+private:
+ // Per shard setting manager, for example, marker.
+ map<string, string> value_by_shards;
+ const static char KEY_VALUE_SEPARATOR = '#';
+ const static char SHARDS_SEPARATOR = ',';
+public:
+ void add_item(const string& shard, const string& value) {
+ value_by_shards[shard] = value;
+ }
+ void to_string(string *out) const {
+ if (out) {
+ map<string, string>::const_iterator iter = value_by_shards.begin();
+ // No shards
+ if (value_by_shards.size() == 1) {
+ *out = iter->second;
+ } else {
+ for (; iter != value_by_shards.end(); ++iter) {
+ if (out->length()) {
+ // Not the first item, append a separator first
+ out->append(1, SHARDS_SEPARATOR);
+ }
+ out->append(iter->first);
+ out->append(1, KEY_VALUE_SEPARATOR);
+ out->append(iter->second);
+ }
+ }
+ }
+ }
+};
+
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
@@ -92,7 +123,8 @@ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
const vector<string>& bucket_objs, uint32_t max_aio);
-void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
+int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx,
+ const vector<string>& bucket_objs, uint64_t tag_timeout, uint32_t max_aio);
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator, bool log_op);
@@ -122,12 +154,22 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj,
map<string, struct rgw_cls_list_ret>& list_results,
uint32_t max_aio);
-int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header);
-int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
+/**
+ * Check the bucket index.
+ *
+ * io_ctx - IO context for rados.
+ * bucket_objs_ret - check result for all shards.
+ * max_aio - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx,
+ map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio);
+int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
+ uint32_t max_aio);
-int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
+int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+ uint32_t max_aio);
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);