summary | refs | log | tree | commit | diff | stats
path: root/src/cls/rgw/cls_rgw_client.cc
diff options
context:
space:
mode:
author: Yehuda Sadeh <yehuda@redhat.com> 2015-01-22 02:30:32 +0100
committer: Yehuda Sadeh <yehuda@redhat.com> 2015-01-22 02:30:32 +0100
commit: 6f44f7a0a9847e419ce2783164633efa71218380 (patch)
tree: ded64c846bf4a006a9ccd55e2f7b17d4396dac6c /src/cls/rgw/cls_rgw_client.cc
parent: Merge pull request #3246 from ceph/wip-9780-9781 (diff)
download: ceph-6f44f7a0a9847e419ce2783164633efa71218380.tar.xz
download: ceph-6f44f7a0a9847e419ce2783164633efa71218380.zip
Revert "Revert "Merge remote-tracking branch 'origin/wip-bi-sharding-3' into next""
Following a merge of next to master, the feature got reverted (because it was reverted on next). Undoing. This reverts commit 6613358ddc5339c8e33c409387fd6044db0b6f26.
Diffstat (limited to 'src/cls/rgw/cls_rgw_client.cc')
-rw-r--r-- src/cls/rgw/cls_rgw_client.cc 304
1 files changed, 183 insertions, 121 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index c13c1a1559c..545b36bcff5 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -11,19 +11,131 @@
using namespace librados;
+const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; // NOTE(review): presumably separates a shard key from its value in the serialized form -- confirm in the header
+const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; // NOTE(review): presumably separates one shard's entry from the next -- confirm in the header
+
+/**
+ * Completion callback for a bucket index object operation: on success (r >= 0) it decodes the reply buffer into *data, maps a decode failure to -EIO, and stores the final code in *ret_code when that pointer is non-null.
+ */
+template <typename T>
+class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
+private:
+ T *data; // destination for the decoded reply; asserted non-null in the constructor
+ int *ret_code; // optional out-parameter for the result code; may be NULL
+public:
+ ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
+ ~ClsBucketIndexOpCtx() {}
+ void handle_completion(int r, bufferlist& outbl) {
+ if (r >= 0) { // decode only when the operation itself did not fail
+ try {
+ bufferlist::iterator iter = outbl.begin();
+ ::decode((*data), iter);
+ } catch (buffer::error& err) {
+ r = -EIO; // a reply that cannot be decoded is reported as an I/O error
+ }
+ }
+ if (ret_code) { // with a NULL ret_code the result (including -EIO) is dropped here
+ *ret_code = r;
+ }
+ }
+};
+
+void BucketIndexAioManager::do_completion(int id) { // move request `id` from the pending set to the completed set and signal a waiter
+ Mutex::Locker l(lock);
+
+ map<int, librados::AioCompletion*>::iterator iter = pendings.find(id);
+ assert(iter != pendings.end()); // `id` must currently be registered as pending
+ completions[id] = iter->second;
+ pendings.erase(iter);
+
+ // If an object name was recorded for this request, carry it over to
+ // completion_objs, which wait_for_completions() reads when filling *objs
+ map<int, string>::iterator miter = pending_objs.find(id);
+ if (miter != pending_objs.end()) {
+ completion_objs[id] = miter->second;
+ pending_objs.erase(miter);
+ }
+
+ cond.Signal(); // wait_for_completions() waits on this condition
+}
+
+bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, // reap finished AIOs; returns false only when nothing is pending or completed
+ int *num_completions, int *ret_code, map<int, string> *objs) { // the three pointer out-parameters are optional (each is NULL-checked before use)
+ lock.Lock();
+ if (pendings.empty() && completions.empty()) {
+ lock.Unlock();
+ return false;
+ }
+
+ if (completions.empty()) {
+ // Wait for AIO completion; NOTE(review): single wait, not re-checked -- if it returns with completions still empty the loop below is a no-op and the function still returns true
+ cond.Wait(lock);
+ }
+
+ // Release every completed AIO, collecting its result into the out-parameters
+ map<int, librados::AioCompletion*>::iterator iter = completions.begin();
+ for (; iter != completions.end(); ++iter) {
+ int r = iter->second->get_return_value();
+ if (objs && r == 0) { /* update list of successfully completed objs */
+ map<int, string>::iterator liter = completion_objs.find(iter->first);
+ if (liter != completion_objs.end()) {
+ (*objs)[liter->first] = liter->second;
+ }
+ }
+ if (ret_code && (r < 0 && r != valid_ret_code)) // valid_ret_code is a negative result the caller does not treat as a failure
+ (*ret_code) = r; // overwritten by each failing completion, so the last one in map order wins
+ iter->second->release();
+ }
+ if (num_completions)
+ (*num_completions) = completions.size();
+ completions.clear(); // NOTE(review): the matching completion_objs entries are not erased in this function
+ lock.Unlock();
+
+ return true;
+}
+
void cls_rgw_bucket_init(ObjectWriteOperation& o)
{
bufferlist in;
o.exec("rgw", "bucket_init_index", in);
}
-void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
-{
+static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+ const string& oid, BucketIndexAioManager *manager) { // queue an async create + "bucket_init_index" class call on object `oid`
+ bufferlist in; // the class method is invoked with an empty input buffer
+ librados::ObjectWriteOperation op;
+ op.create(true); // NOTE(review): `true` presumably requests an exclusive create (fail if the object exists) -- confirm against librados
+ op.exec("rgw", "bucket_init_index", in);
+ return manager->aio_operate(io_ctx, oid, &op); // submission and completion tracking are delegated to the AIO manager
+}
+
+static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
+ const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { // queue an async "bucket_set_tag_timeout" class call on object `oid`
bufferlist in;
struct rgw_cls_tag_timeout_op call;
- call.tag_timeout = tag_timeout;
+ call.tag_timeout = timeout; // the only field of the request that is set here
::encode(call, in);
- o.exec("rgw", "bucket_set_tag_timeout", in);
+ ObjectWriteOperation op; // built locally now instead of being supplied by the caller
+ op.exec("rgw", "bucket_set_tag_timeout", in);
+ return manager->aio_operate(io_ctx, oid, &op); // submission is delegated to the AIO manager
+}
+
+int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid) // issue the index-init request for one object; shard_id is unused here
+{
+ return issue_bucket_index_init_op(io_ctx, oid, &manager); // NOTE(review): the helper returns bool, so this int is only ever 0 or 1 -- confirm how callers interpret it
+}
+
+void CLSRGWIssueBucketIndexInit::cleanup() // remove the objects in [objs_container.begin(), iter)
+{
+ // Do best effort removal: the result of each remove() call is ignored
+ for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) { // NOTE(review): `iter` is not declared in this file; presumably it marks where issuing stopped -- confirm in the header
+ io_ctx.remove(citer->second);
+ }
+}
+
+int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid) // issue the set-tag-timeout request for one object; shard_id is unused here
+{
+ return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager); // NOTE(review): the helper returns bool, so this int is only ever 0 or 1
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -59,70 +171,89 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string&
o.exec("rgw", "bucket_complete_op", in);
}
-
-int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
- string& filter_prefix, uint32_t num_entries,
- rgw_bucket_dir *dir, bool *is_truncated)
-{
- bufferlist in, out;
+static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
+ const string& oid, const string& start_obj, const string& filter_prefix,
+ uint32_t num_entries, BucketIndexAioManager *manager,
+ struct rgw_cls_list_ret *pdata) { // queue an async "bucket_list" read on `oid`; the completion callback decodes the reply into *pdata
+ bufferlist in;
struct rgw_cls_list_op call;
call.start_obj = start_obj;
call.filter_prefix = filter_prefix;
call.num_entries = num_entries;
::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL)); // NULL ret_code: a decode failure inside the callback is not reported back
+ return manager->aio_operate(io_ctx, oid, &op); // submission is delegated to the AIO manager
+}
- if (dir)
- *dir = ret.dir;
- if (is_truncated)
- *is_truncated = ret.is_truncated;
+int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid) // issue the list request for one shard
+{
+ return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]); // the decoded reply for this shard is written to result[shard_id]
+}
- return r;
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+ struct cls_rgw_bi_log_list_ret *pdata) { // queue an async "bi_log_list" read on `oid`; the completion callback decodes the reply into *pdata
+ bufferlist in;
+ cls_rgw_bi_log_list_op call;
+ call.marker = marker_mgr.get(shard_id, ""); // NOTE(review): the "" is presumably the fallback when this shard has no marker -- confirm in BucketIndexShardsManager
+ call.max = max;
+ ::encode(call, in);
+
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL)); // NULL ret_code: a decode failure inside the callback is not reported back
+ return manager->aio_operate(io_ctx, oid, &op); // submission is delegated to the AIO manager
}
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header)
+int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid) // issue the bucket-index-log list request for one shard
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
- if (r < 0)
- return r;
+ return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); // the decoded reply for this shard is written to result[shard_id]
+}
- struct rgw_cls_check_index_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& start_marker_mgr,
+ BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { // queue one async "bi_log_trim" class call on `oid` using this shard's start/end markers
+ bufferlist in;
+ cls_rgw_bi_log_trim_op call;
+ call.start_marker = start_marker_mgr.get(shard_id, ""); // NOTE(review): the "" is presumably the fallback when this shard has no marker -- confirm
+ call.end_marker = end_marker_mgr.get(shard_id, "");
+ ::encode(call, in);
+ ObjectWriteOperation op;
+ op.exec("rgw", "bi_log_trim", in);
+ return manager->aio_operate(io_ctx, oid, &op); // a single request: the removed cls_rgw_bi_log_trim() repeated the call until -ENODATA, and that loop is not reproduced here
+}
- if (existing_header)
- *existing_header = ret.existing_header;
- if (calculated_header)
- *calculated_header = ret.calculated_header;
+int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid) // issue the bucket-index-log trim request for one shard
+{
+ return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); // NOTE(review): the helper returns bool, so this int is only ever 0 or 1
+}
- return 0;
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+ struct rgw_cls_check_index_ret *pdata) { // queue an async "bucket_check_index" read on `oid`; the completion callback decodes the reply into *pdata
+ bufferlist in; // the class method is invoked with an empty input buffer
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+ pdata, NULL)); // NULL ret_code: a decode failure inside the callback is not reported back
+ return manager->aio_operate(io_ctx, oid, &op); // submission is delegated to the AIO manager
}
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) // issue the index-check request for one shard
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
- if (r < 0)
- return r;
+ return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]); // the decoded reply for this shard is written to result[shard_id]
+}
- return 0;
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+ BucketIndexAioManager *manager) { // queue an async "bucket_rebuild_index" class call on object `oid`
+ bufferlist in; // the class method is invoked with an empty input buffer
+ librados::ObjectWriteOperation op;
+ op.exec("rgw", "bucket_rebuild_index", in);
+ return manager->aio_operate(io_ctx, oid, &op); // submission is delegated to the AIO manager
+}
+
+int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid) // issue the index-rebuild request for one object; shard_id is unused here
+{
+ return issue_bucket_rebuild_index_op(io_ctx, oid, &manager); // NOTE(review): the helper returns bool, so this int is only ever 0 or 1
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -136,28 +267,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
o.exec("rgw", "dir_suggest_changes", updates);
}
-int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
+int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid) // issue a header fetch for one shard by reusing the "bucket_list" request
{
- bufferlist in, out;
- struct rgw_cls_list_op call;
- call.num_entries = 0;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
-
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- if (header)
- *header = ret.dir.header;
-
- return r;
+ return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]); // empty start_obj/filter_prefix and num_entries 0, as in the removed code, which took only ret.dir.header from the reply
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
@@ -198,56 +310,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB
return 0;
}
-int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
- list<rgw_bi_log_entry>& entries, bool *truncated)
-{
- bufferlist in, out;
- cls_rgw_bi_log_list_op call;
- call.marker = marker;
- call.max = max;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out);
- if (r < 0)
- return r;
-
- cls_rgw_bi_log_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- entries = ret.entries;
-
- if (truncated)
- *truncated = ret.truncated;
-
- return r;
-}
-
-int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
-{
- do {
- int r;
- bufferlist in, out;
- cls_rgw_bi_log_trim_op call;
- call.start_marker = start_marker;
- call.end_marker = end_marker;
- ::encode(call, in);
- r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out);
-
- if (r == -ENODATA)
- break;
-
- if (r < 0)
- return r;
-
- } while (1);
-
- return 0;
-}
-
int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,