diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2015-01-22 02:30:32 +0100 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2015-01-22 02:30:32 +0100 |
commit | 6f44f7a0a9847e419ce2783164633efa71218380 (patch) | |
tree | ded64c846bf4a006a9ccd55e2f7b17d4396dac6c /src/cls/rgw/cls_rgw_client.cc | |
parent | Merge pull request #3246 from ceph/wip-9780-9781 (diff) | |
download | ceph-6f44f7a0a9847e419ce2783164633efa71218380.tar.xz ceph-6f44f7a0a9847e419ce2783164633efa71218380.zip |
Revert "Revert "Merge remote-tracking branch 'origin/wip-bi-sharding-3' into next""
Following a merge of next to master, the feature got reverted (because
it was reverted on next). Undoing.
This reverts commit 6613358ddc5339c8e33c409387fd6044db0b6f26.
Diffstat (limited to 'src/cls/rgw/cls_rgw_client.cc')
-rw-r--r-- | src/cls/rgw/cls_rgw_client.cc | 304 |
1 files changed, 183 insertions, 121 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index c13c1a1559c..545b36bcff5 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,19 +11,131 @@ using namespace librados; +const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; +const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; + +/** + * This class represents the bucket index object operation callback context. + */ +template <typename T> +class ClsBucketIndexOpCtx : public ObjectOperationCompletion { +private: + T *data; + int *ret_code; +public: + ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); } + ~ClsBucketIndexOpCtx() {} + void handle_completion(int r, bufferlist& outbl) { + if (r >= 0) { + try { + bufferlist::iterator iter = outbl.begin(); + ::decode((*data), iter); + } catch (buffer::error& err) { + r = -EIO; + } + } + if (ret_code) { + *ret_code = r; + } + } +}; + +void BucketIndexAioManager::do_completion(int id) { + Mutex::Locker l(lock); + + map<int, librados::AioCompletion*>::iterator iter = pendings.find(id); + assert(iter != pendings.end()); + completions[id] = iter->second; + pendings.erase(iter); + + // If the caller needs a list of finished objects, store them + // for further processing + map<int, string>::iterator miter = pending_objs.find(id); + if (miter != pending_objs.end()) { + completion_objs[id] = miter->second; + pending_objs.erase(miter); + } + + cond.Signal(); +} + +bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, + int *num_completions, int *ret_code, map<int, string> *objs) { + lock.Lock(); + if (pendings.empty() && completions.empty()) { + lock.Unlock(); + return false; + } + + if (completions.empty()) { + // Wait for AIO completion + cond.Wait(lock); + } + + // Clear the completed AIOs + map<int, librados::AioCompletion*>::iterator iter = completions.begin(); + for (; iter != completions.end(); ++iter) { + int r = iter->second->get_return_value(); + if (objs && r == 0) { /* update list of successfully completed objs */ + map<int, string>::iterator liter = completion_objs.find(iter->first); + if (liter != completion_objs.end()) { + (*objs)[liter->first] = liter->second; + } + } + if (ret_code && (r < 0 && r != valid_ret_code)) + (*ret_code) = r; + iter->second->release(); + } + if (num_completions) + (*num_completions) = completions.size(); + completions.clear(); + lock.Unlock(); + + return true; +} + void cls_rgw_bucket_init(ObjectWriteOperation& o) { bufferlist in; o.exec("rgw", "bucket_init_index", in); } -void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) -{ +static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, + const string& oid, BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.create(true); + op.exec("rgw", "bucket_init_index", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, + const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { bufferlist in; struct rgw_cls_tag_timeout_op call; - call.tag_timeout = tag_timeout; + call.tag_timeout = timeout; ::encode(call, in); - o.exec("rgw", "bucket_set_tag_timeout", in); + ObjectWriteOperation op; + op.exec("rgw", "bucket_set_tag_timeout", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_index_init_op(io_ctx, oid, &manager); +} + +void CLSRGWIssueBucketIndexInit::cleanup() +{ + // Do best effort removal + for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) { + io_ctx.remove(citer->second); + } +} + +int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager); } void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -59,70 +171,89 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& o.exec("rgw", "bucket_complete_op", in); } - -int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, - string& filter_prefix, uint32_t num_entries, - rgw_bucket_dir *dir, bool *is_truncated) -{ - bufferlist in, out; +static bool issue_bucket_list_op(librados::IoCtx& io_ctx, + const string& oid, const string& start_obj, const string& filter_prefix, + uint32_t num_entries, BucketIndexAioManager *manager, + struct rgw_cls_list_ret *pdata) { + bufferlist in; struct rgw_cls_list_op call; call.start_obj = start_obj; call.filter_prefix = filter_prefix; call.num_entries = num_entries; ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); +} - if (dir) - *dir = ret.dir; - if (is_truncated) - *is_truncated = ret.is_truncated; +int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]); +} - return r; +static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id, + BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, + struct cls_rgw_bi_log_list_ret *pdata) { + bufferlist in; + cls_rgw_bi_log_list_op call; + call.marker = marker_mgr.get(shard_id, ""); + call.max = max; + ::encode(call, in); + + librados::ObjectReadOperation op; + op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); } -int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header) +int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid) { - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out); - if (r < 0) - return r; + return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); +} - struct rgw_cls_check_index_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } +static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id, + BucketIndexShardsManager& start_marker_mgr, + BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { + bufferlist in; + cls_rgw_bi_log_trim_op call; + call.start_marker = start_marker_mgr.get(shard_id, ""); + call.end_marker = end_marker_mgr.get(shard_id, ""); + ::encode(call, in); + ObjectWriteOperation op; + op.exec("rgw", "bi_log_trim", in); + return manager->aio_operate(io_ctx, oid, &op); +} - if (existing_header) - *existing_header = ret.existing_header; - if (calculated_header) - *calculated_header = ret.calculated_header; +int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid) +{ + return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); +} - return 0; +static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, + struct rgw_cls_check_index_ret *pdata) { + bufferlist in; + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>( + pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); } -int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid) +int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) { - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out); - if (r < 0) - return r; + return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]); +} - return 0; +static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.exec("rgw", "bucket_rebuild_index", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_rebuild_index_op(io_ctx, oid, &manager); } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -136,28 +267,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header) +int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid) { - bufferlist in, out; - struct rgw_cls_list_op call; - call.num_entries = 0; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } - - if (header) - *header = ret.dir.header; - - return r; + return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]); } class GetDirHeaderCompletion : public ObjectOperationCompletion { @@ -198,56 +310,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB return 0; } -int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, - list<rgw_bi_log_entry>& entries, bool *truncated) -{ - bufferlist in, out; - cls_rgw_bi_log_list_op call; - call.marker = marker; - call.max = max; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out); - if (r < 0) - return r; - - cls_rgw_bi_log_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } - - entries = ret.entries; - - if (truncated) - *truncated = ret.truncated; - - return r; -} - -int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker) -{ - do { - int r; - bufferlist in, out; - cls_rgw_bi_log_trim_op call; - call.start_marker = start_marker; - call.end_marker = end_marker; - ::encode(call, in); - r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out); - - if (r == -ENODATA) - break; - - if (r < 0) - return r; - - } while (1); - - return 0; -} - int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, |