diff options
author | Guang Yang <yguang@yahoo-inc.com> | 2014-08-29 12:22:50 +0200 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2015-01-14 04:21:23 +0100 |
commit | 9c5acd67c4cfb28b22662e9ae3a67657cd689080 (patch) | |
tree | 237d42daa043e170fa61de583d1831ebbfb0e7a8 /src/cls/rgw | |
parent | Adjust bucket listing to work with multiple shards. (diff) | |
download | ceph-9c5acd67c4cfb28b22662e9ae3a67657cd689080.tar.xz ceph-9c5acd67c4cfb28b22662e9ae3a67657cd689080.zip |
Adjust bucket stats/index checking/index rebuild/tag timeout implementation to work with multiple shards.
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
Diffstat (limited to 'src/cls/rgw')
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 190 | ||||
-rw-r--r-- | src/cls/rgw/cls_rgw_client.h | 54 |
2 files changed, 190 insertions, 54 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 0d698e3c815..ed937bc52eb 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -143,13 +143,50 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx, return ret; } -void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) -{ +static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, + const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { bufferlist in; struct rgw_cls_tag_timeout_op call; - call.tag_timeout = tag_timeout; + call.tag_timeout = timeout; ::encode(call, in); - o.exec("rgw", "bucket_set_tag_timeout", in); + ObjectWriteOperation op; + op.exec("rgw", "bucket_set_tag_timeout", in); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op); + if (r >= 0) { + manager->add_pending(arg->id, c); + } + return r; +} + +int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>& bucket_objs, + uint64_t tag_timeout, uint32_t max_aio) +{ + int ret = 0; + vector<string>::const_iterator iter = bucket_objs.begin(); + BucketIndexAioManager manager; + for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + if (ret < 0) + break; + } + + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { + int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + if(issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; } void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -238,39 +275,91 @@ int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj, return ret; } -int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header) -{ - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out); - if (r < 0) - return r; +static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, + struct rgw_cls_check_index_ret *pdata) { + bufferlist in; + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>( + pdata, NULL)); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op, NULL); + if (r >= 0) { + manager->add_pending(arg->id, c); + } + return r; +} - struct rgw_cls_check_index_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; +int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, + map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio) +{ + int ret = 0; + BucketIndexAioManager manager; + map<string, struct rgw_cls_check_index_ret>::iterator iter = bucket_objs_ret.begin(); + for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); + if (ret < 0) + break; } - if (existing_header) - *existing_header = ret.existing_header; - if (calculated_header) - *calculated_header = ret.calculated_header; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) { + int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; +} - return 0; +static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.exec("rgw", "bucket_rebuild_index", in); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op); + if (r >= 0) { + manager->add_pending(arg->id, c); + } + return r; } -int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid) +int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector<string>& bucket_objs, + uint32_t max_aio) { - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out); - if (r < 0) - return r; + int ret = 0; + BucketIndexAioManager manager; + vector<string>::const_iterator iter = bucket_objs.begin(); + for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); + if (ret < 0) + break; + } - return 0; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { + int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -284,28 +373,33 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header) +int cls_rgw_get_dir_header(IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers, + uint32_t max_aio) { - bufferlist in, out; - struct rgw_cls_list_op call; - call.num_entries = 0; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; + int ret = 0; + BucketIndexAioManager manager; + map<string, rgw_cls_list_ret>::iterator iter = dir_headers.begin(); + for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); + if (ret < 0) + break; } - if (header) - *header = ret.dir.header; - - return r; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) { + int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; } class GetDirHeaderCompletion : public ObjectOperationCompletion { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index a49b6422e7e..b28425e1fba 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -77,6 +77,37 @@ public: virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0; }; +class BucketIndexShardsManager { +private: + // Per shard setting manager, for example, marker. + map<string, string> value_by_shards; + const static char KEY_VALUE_SEPARATOR = '#'; + const static char SHARDS_SEPARATOR = ','; +public: + void add_item(const string& shard, const string& value) { + value_by_shards[shard] = value; + } + void to_string(string *out) const { + if (out) { + map<string, string>::const_iterator iter = value_by_shards.begin(); + // No shards + if (value_by_shards.size() == 1) { + *out = iter->second; + } else { + for (; iter != value_by_shards.end(); ++iter) { + if (out->length()) { + // Not the first item, append a separator first + out->append(1, SHARDS_SEPARATOR); + } + out->append(iter->first); + out->append(1, KEY_VALUE_SEPARATOR); + out->append(iter->second); + } + } + } + } +}; + /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); @@ -92,7 +123,8 @@ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx, const vector<string>& bucket_objs, uint32_t max_aio); -void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout); +int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, + const vector<string>& bucket_objs, uint64_t tag_timeout, uint32_t max_aio); void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, string& name, string& locator, bool log_op); @@ -122,12 +154,22 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj, map<string, struct rgw_cls_list_ret>& list_results, uint32_t max_aio); -int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header); -int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); +/** + * Check the bucket index. + * + * io_ctx - IO context for rados. + * bucket_objs_ret - check result for all shards. + * max_aio - the maximum number of AIO (for throttling). + * + * Return 0 on success, a failure code otherwise. + */ +int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, + map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio); +int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector<string>& bucket_objs, + uint32_t max_aio); -int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); +int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers, + uint32_t max_aio); int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); |