diff options
author | Guang Yang <yguang@yahoo-inc.com> | 2014-08-18 13:46:32 +0200 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2015-01-14 04:21:23 +0100 |
commit | 56feee792ee6cc083e1dfc74fcb7aa181286df80 (patch) | |
tree | ccab1b908e67835bcc9a97675749ea08444e59a5 /src/cls/rgw/cls_rgw_client.cc | |
parent | Adjust rgw bucket prepare/complete OP to work with multiple bucket index shards. (diff) | |
download | ceph-56feee792ee6cc083e1dfc74fcb7aa181286df80.tar.xz ceph-56feee792ee6cc083e1dfc74fcb7aa181286df80.zip |
Adjust bucket listing to work with multiple shards.
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
Diffstat (limited to 'src/cls/rgw/cls_rgw_client.cc')
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 90 |
1 files changed, 69 insertions, 21 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 47c9dcb8db8..0d698e3c815 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,6 +11,32 @@ using namespace librados; +/** + * This class represents the bucket index object operation callback context. + */ +template <typename T> +class ClsBucketIndexOpCtx : public ObjectOperationCompletion { +private: + T *data; + int *ret_code; +public: + ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); } + ~ClsBucketIndexOpCtx() {} + void handle_completion(int r, bufferlist& outbl) { + if (r >= 0) { + try { + bufferlist::iterator iter = outbl.begin(); + ::decode((*data), iter); + } catch (buffer::error& err) { + r = -EIO; + } + } + if (ret_code) { + *ret_code = r; + } + } +}; + /* * Callback implementation for AIO request. */ @@ -91,7 +117,7 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx, break; } - int num_completions, r; + int num_completions, r = 0; while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) { if (r >= 0 && ret >= 0) { for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { @@ -159,35 +185,57 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& o.exec("rgw", "bucket_complete_op", in); } - -int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, - string& filter_prefix, uint32_t num_entries, - rgw_bucket_dir *dir, bool *is_truncated) -{ - bufferlist in, out; +static bool issue_bucket_list_op(librados::IoCtx& io_ctx, + const string& oid, const string& start_obj, const string& filter_prefix, + uint32_t num_entries, BucketIndexAioManager *manager, + struct rgw_cls_list_ret *pdata) { + bufferlist in; struct rgw_cls_list_op call; call.start_obj = start_obj; call.filter_prefix = filter_prefix; call.num_entries = num_entries; ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL)); + + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op, NULL); + if (r >= 0) + manager->add_pending(arg->id, c); + return r; +} + +int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj, + const string& filter_prefix, uint32_t num_entries, + map<string, struct rgw_cls_list_ret>& list_results, uint32_t max_aio) +{ + int ret = 0; + BucketIndexAioManager manager; + map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin(); + for (; iter != list_results.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); + if (ret < 0) + break; } - if (dir) - *dir = ret.dir; - if (is_truncated) - *is_truncated = ret.is_truncated; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) { + int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } - return r; + return ret; } int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, |