summaryrefslogtreecommitdiffstats
path: root/src/cls/rgw/cls_rgw_client.cc
diff options
context:
space:
mode:
authorGuang Yang <yguang@yahoo-inc.com>2014-08-18 13:46:32 +0200
committerYehuda Sadeh <yehuda@redhat.com>2015-01-14 04:21:23 +0100
commit56feee792ee6cc083e1dfc74fcb7aa181286df80 (patch)
treeccab1b908e67835bcc9a97675749ea08444e59a5 /src/cls/rgw/cls_rgw_client.cc
parentAdjust rgw bucket prepare/complete OP to work with multiple bucket index shards. (diff)
downloadceph-56feee792ee6cc083e1dfc74fcb7aa181286df80.tar.xz
ceph-56feee792ee6cc083e1dfc74fcb7aa181286df80.zip
Adjust bucket listing to work with multiple shards.
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
Diffstat (limited to 'src/cls/rgw/cls_rgw_client.cc')
-rw-r--r--src/cls/rgw/cls_rgw_client.cc90
1 files changed, 69 insertions, 21 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index 47c9dcb8db8..0d698e3c815 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -11,6 +11,32 @@
using namespace librados;
+/**
+ * This class represents the bucket index object operation callback context.
+ */
+template <typename T>
+class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
+private:
+ T *data;
+ int *ret_code;
+public:
+ ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
+ ~ClsBucketIndexOpCtx() {}
+ void handle_completion(int r, bufferlist& outbl) {
+ if (r >= 0) {
+ try {
+ bufferlist::iterator iter = outbl.begin();
+ ::decode((*data), iter);
+ } catch (buffer::error& err) {
+ r = -EIO;
+ }
+ }
+ if (ret_code) {
+ *ret_code = r;
+ }
+ }
+};
+
/*
* Callback implementation for AIO request.
*/
@@ -91,7 +117,7 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
break;
}
- int num_completions, r;
+ int num_completions, r = 0;
while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
if (r >= 0 && ret >= 0) {
for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
@@ -159,35 +185,57 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string&
o.exec("rgw", "bucket_complete_op", in);
}
-
-int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
- string& filter_prefix, uint32_t num_entries,
- rgw_bucket_dir *dir, bool *is_truncated)
-{
- bufferlist in, out;
+static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
+ const string& oid, const string& start_obj, const string& filter_prefix,
+ uint32_t num_entries, BucketIndexAioManager *manager,
+ struct rgw_cls_list_ret *pdata) {
+ bufferlist in;
struct rgw_cls_list_op call;
call.start_obj = start_obj;
call.filter_prefix = filter_prefix;
call.num_entries = num_entries;
::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
+
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ if (r >= 0)
+ manager->add_pending(arg->id, c);
+ return r;
+}
+
+int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj,
+ const string& filter_prefix, uint32_t num_entries,
+ map<string, struct rgw_cls_list_ret>& list_results, uint32_t max_aio)
+{
+ int ret = 0;
+ BucketIndexAioManager manager;
+ map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ for (; iter != list_results.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+ if (ret < 0)
+ break;
}
- if (dir)
- *dir = ret.dir;
- if (is_truncated)
- *is_truncated = ret.is_truncated;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
- return r;
+ return ret;
}
int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,