diff options
29 files changed, 1933 insertions, 43 deletions
diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 46c2a3f9b49..161c78c9712 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -50,6 +50,12 @@ cls_method_handle_t h_rgw_user_usage_log_trim; cls_method_handle_t h_rgw_gc_set_entry; cls_method_handle_t h_rgw_gc_list; cls_method_handle_t h_rgw_gc_remove; +cls_method_handle_t h_rgw_lc_set_entry; +cls_method_handle_t h_rgw_lc_rm_entry; +cls_method_handle_t h_rgw_lc_get_next_entry; +cls_method_handle_t h_rgw_lc_put_head; +cls_method_handle_t h_rgw_lc_get_head; +cls_method_handle_t h_rgw_lc_list_entries; #define ROUND_BLOCK_SIZE 4096 @@ -2818,7 +2824,7 @@ static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_u rgw_user_bucket ub(puser->to_str(), entry.bucket); rgw_usage_log_entry& le = (*usage)[ub]; le.aggregate(entry); - + return 0; } @@ -2962,7 +2968,7 @@ static int gc_omap_remove(cls_method_context_t hctx, int type, const string& key static bool key_in_index(const string& key, int index_type) { - const string& prefix = gc_index_prefixes[index_type]; + const string& prefix = gc_index_prefixes[index_type]; return (key.compare(0, prefix.size(), prefix) == 0); } @@ -3221,6 +3227,154 @@ static int rgw_cls_gc_remove(cls_method_context_t hctx, bufferlist *in, bufferli return gc_remove(hctx, op.tags); } +static int rgw_cls_lc_set_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_rgw_lc_set_entry_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n"); + return -EINVAL; + } + + bufferlist bl; + ::encode(op.entry, bl); + + int ret = cls_cxx_map_set_val(hctx, op.entry.first, &bl); + return ret; +} + +static int rgw_cls_lc_rm_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_rgw_lc_rm_entry_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n"); + return -EINVAL; + } + + bufferlist bl; + ::encode(op.entry, bl); + + int ret = cls_cxx_map_remove_key(hctx, op.entry.first); + return ret; +} + +static int rgw_cls_lc_get_next_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + cls_rgw_lc_get_next_entry_ret op_ret; + cls_rgw_lc_get_next_entry_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n"); + return -EINVAL; + } + + map<string, bufferlist> vals; + string filter_prefix; + int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, 1, &vals); + if (ret < 0) + return ret; + map<string, bufferlist>::iterator it; + pair<string, int> entry; + if (!vals.empty()) { + it=vals.begin(); + in_iter = it->second.begin(); + try { + ::decode(entry, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_get_next_entry(): failed to decode entry\n"); + return -EIO; + } + } + op_ret.entry = entry; + ::encode(op_ret, *out); + return 0; +} + +static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_rgw_lc_list_entries_op op; + bufferlist::iterator in_iter = in->begin(); + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n"); + return -EINVAL; + } + cls_rgw_lc_list_entries_ret op_ret; + bufferlist::iterator iter; + map<string, bufferlist> vals; + string filter_prefix; + int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, &vals); + if (ret < 0) + return ret; + map<string, bufferlist>::iterator it; + pair<string, int> entry; + for (it = vals.begin(); it != vals.end(); it++) { + iter = it->second.begin(); + try { + ::decode(entry, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_list_entries(): failed to decode entry\n"); + return -EIO; + } + op_ret.entries.insert(entry); + } + ::encode(op_ret, *out); + return 0; +} + +static int rgw_cls_lc_put_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_rgw_lc_put_head_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n"); + return -EINVAL; + } + + bufferlist bl; + ::encode(op.head, bl); + int ret = cls_cxx_map_write_header(hctx,&bl); + return ret; +} + +static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist bl; + int ret = cls_cxx_map_read_header(hctx, &bl); + if (ret < 0) + return ret; + cls_rgw_lc_obj_head head; + if (bl.length() != 0) { + bufferlist::iterator iter = bl.begin(); + try { + ::decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: rgw_cls_lc_get_head(): failed to decode entry %s\n",err.what()); + return -EINVAL; + } + } else { + head.start_date = 0; + head.marker.clear(); + } + cls_rgw_lc_get_head_ret op_ret; + op_ret.head = head; + ::encode(op_ret, *out); + return 0; +} + void __cls_init() { CLS_LOG(1, "Loaded rgw class!"); @@ -3265,6 +3419,14 @@ void __cls_init() cls_register_cxx_method(h_class, "gc_list", CLS_METHOD_RD, rgw_cls_gc_list, &h_rgw_gc_list); cls_register_cxx_method(h_class, "gc_remove", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_gc_remove, &h_rgw_gc_remove); + /* lifecycle bucket list */ + cls_register_cxx_method(h_class, "lc_set_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_set_entry, &h_rgw_lc_set_entry); + cls_register_cxx_method(h_class, "lc_rm_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_rm_entry, &h_rgw_lc_rm_entry); + cls_register_cxx_method(h_class, "lc_get_next_entry", CLS_METHOD_RD, rgw_cls_lc_get_next_entry, &h_rgw_lc_get_next_entry); + cls_register_cxx_method(h_class, "lc_put_head", CLS_METHOD_RD| CLS_METHOD_WR, rgw_cls_lc_put_head, &h_rgw_lc_put_head); + cls_register_cxx_method(h_class, "lc_get_head", CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head); + cls_register_cxx_method(h_class, "lc_list_entries", CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries); + return; } diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 1cf1156385b..41647845691 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -650,3 +650,105 @@ void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list<string>& t ::encode(call, in); op.exec("rgw", "gc_remove", in); } + +int cls_rgw_lc_get_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, "rgw", "lc_get_head", in, out); + if (r < 0) + return r; + + cls_rgw_lc_get_head_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + head = ret.head; + + return r; +} + +int cls_rgw_lc_put_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head) +{ + bufferlist in, out; + cls_rgw_lc_put_head_op call; + call.head = head; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_put_head", in, out); + return r; +} + +int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry) +{ + bufferlist in, out; + cls_rgw_lc_get_next_entry_op call; + call.marker = marker; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_get_next_entry", in, out); + if (r < 0) + return r; + + cls_rgw_lc_get_next_entry_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + entry = ret.entry; + + return r; +} + +int cls_rgw_lc_rm_entry(IoCtx& io_ctx, string& oid, pair<string, int>& entry) +{ + bufferlist in, out; + cls_rgw_lc_rm_entry_op call; + call.entry = entry; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_rm_entry", in, out); + return r; +} + +int cls_rgw_lc_set_entry(IoCtx& io_ctx, string& oid, pair<string, int>& entry) +{ + bufferlist in, out; + cls_rgw_lc_rm_entry_op call; + call.entry = entry; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_set_entry", in, out); + return r; +} + +int cls_rgw_lc_list(IoCtx& io_ctx, string& oid, + const string& marker, + uint32_t max_entries, + map<string, int>& entries) +{ + bufferlist in, out; + cls_rgw_lc_list_entries_op op; + + entries.clear(); + + op.marker = marker; + op.max_entries = max_entries; + + ::encode(op, in); + + int r = io_ctx.exec(oid, "rgw", "lc_list_entries", in, out); + if (r < 0) + return r; + + cls_rgw_lc_list_entries_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + entries.insert(ret.entries.begin(),ret.entries.end()); + + return r; +} diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 1b02a5eabf4..4c68385f5a8 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -474,4 +474,20 @@ int cls_rgw_gc_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32 void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list<string>& tags); +/* lifecycle */ +int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head); +int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head); +int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry); +int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry); +int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry); +int cls_rgw_lc_list(librados::IoCtx& io_ctx, string& oid, + const string& marker, + uint32_t max_entries, + map<string, int>& entries); + + + + + + #endif diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 15a638a3923..4aed26b4f5e 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -937,4 +937,162 @@ struct cls_rgw_bi_log_list_ret { }; WRITE_CLASS_ENCODER(cls_rgw_bi_log_list_ret) +struct cls_rgw_lc_get_next_entry_op { + string marker; + cls_rgw_lc_get_next_entry_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_get_next_entry_op) + +struct cls_rgw_lc_get_next_entry_ret { + pair<string, int> entry; + + cls_rgw_lc_get_next_entry_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_get_next_entry_ret) + +struct cls_rgw_lc_rm_entry_op { + pair<string, int> entry; + cls_rgw_lc_rm_entry_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_rm_entry_op) + +struct cls_rgw_lc_set_entry_op { + pair<string, int> entry; + cls_rgw_lc_set_entry_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_set_entry_op) + +struct cls_rgw_lc_put_head_op { + cls_rgw_lc_obj_head head; + + + cls_rgw_lc_put_head_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(head, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(head, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_put_head_op) + +struct cls_rgw_lc_get_head_ret { + cls_rgw_lc_obj_head head; + + cls_rgw_lc_get_head_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(head, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(head, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_get_head_ret) + +struct cls_rgw_lc_list_entries_op { + string marker; + uint32_t max_entries; + + cls_rgw_lc_list_entries_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(marker, bl); + ::encode(max_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(marker, bl); + ::decode(max_entries, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_op) + +struct cls_rgw_lc_list_entries_ret { + map<string, int> entries; + + cls_rgw_lc_list_entries_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entries, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret) + #endif /* CEPH_CLS_RGW_OPS_H */ diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index cf143ce92cc..7527db7c727 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -532,7 +532,7 @@ struct rgw_bi_log_entry { void dump(Formatter *f) const; void decode_json(JSONObj *obj); static void generate_test_instances(list<rgw_bi_log_entry*>& o); - + bool is_versioned() { return ((bilog_flags & RGW_BILOG_FLAG_VERSIONED_OP) != 0); } @@ -794,7 +794,7 @@ struct rgw_user_bucket { return true; else if (!comp) return bucket.compare(ub2.bucket) < 0; - + return false; } }; @@ -930,4 +930,28 @@ struct cls_rgw_gc_obj_info }; WRITE_CLASS_ENCODER(cls_rgw_gc_obj_info) +struct cls_rgw_lc_obj_head +{ + time_t start_date; + string marker; + + cls_rgw_lc_obj_head() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(start_date, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(start_date, bl); + ::decode(marker, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_obj_head) + #endif diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 39406bc183b..bcba3e432c0 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1282,6 +1282,8 @@ OPTION(rgw_bucket_index_max_aio, OPT_U32, 8) */ OPTION(rgw_enable_quota_threads, OPT_BOOL, true) OPTION(rgw_enable_gc_threads, OPT_BOOL, true) +OPTION(rgw_enable_lc_threads, OPT_BOOL, true) + OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id") OPTION(rgw_enable_apis, OPT_STR, "s3, s3website, swift, swift_auth, admin") @@ -1293,6 +1295,11 @@ OPTION(rgw_port, OPT_STR, "") // port to listen, format as "8080" "5000", if no OPTION(rgw_dns_name, OPT_STR, "") // hostname suffix on buckets OPTION(rgw_dns_s3website_name, OPT_STR, "") // hostname suffix on buckets for s3-website endpoint OPTION(rgw_content_length_compat, OPT_BOOL, false) // Check both HTTP_CONTENT_LENGTH and CONTENT_LENGTH in fcgi env +OPTION(rgw_lifecycle_enabled, OPT_BOOL, true) //rgw lifecycle enabled +OPTION(rgw_lifecycle_thread, OPT_INT, 1) //start lifecycle thread number per radosgw +OPTION(rgw_lifecycle_work_time, OPT_STR, "00:00-06:00") //job process lc at 00:00-06:00s +OPTION(rgw_lc_lock_max_time, OPT_INT, 60) // total run time for a single gc processor work +OPTION(rgw_lc_max_objs, OPT_INT, 32) OPTION(rgw_script_uri, OPT_STR, "") // alternative value for SCRIPT_URI if not set in request OPTION(rgw_request_uri, OPT_STR, "") // alternative value for REQUEST_URI if not set in request OPTION(rgw_swift_url, OPT_STR, "") // the swift url, being published by the internal swift auth diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index fa66eb4ba53..6c449233dc5 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -41,6 +41,8 @@ set(rgw_a_srcs rgw_ldap.cc rgw_loadgen.cc rgw_log.cc + rgw_lc.cc + rgw_lc_s3.cc rgw_metadata.cc rgw_multi.cc rgw_multi_del.cc diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index 96c9c1a7bc3..90606ad08d6 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -28,6 +28,8 @@ librgw_la_SOURCES = \ rgw/rgw_auth.cc \ rgw/rgw_coroutine.cc \ rgw/rgw_cr_rados.cc \ + rgw/rgw_lc.cc \ + rgw/rgw_lc_s3.cc \ rgw/rgw_tools.cc \ rgw/rgw_basic_types.cc \ rgw/rgw_bucket.cc \ @@ -194,6 +196,8 @@ noinst_HEADERS += \ rgw/rgw_auth.h \ rgw/rgw_auth_decoimpl.h \ rgw/rgw_b64.h \ + rgw/rgw_lc.h \ + rgw/rgw_lc_s3.h \ rgw/rgw_client_io.h \ rgw/rgw_coroutine.h \ rgw/rgw_cr_rados.h \ diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index cded711ff1d..1b15a7014c6 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -457,6 +457,7 @@ namespace rgw { store = RGWStoreManager::get_storage(g_ceph_context, g_conf->rgw_enable_gc_threads, + g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads, g_conf->rgw_run_sync_thread); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index addf36c579a..4e851d257ee 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -26,7 +26,8 @@ #include "rgw_rados.h" #include "rgw_acl.h" #include "rgw_acl_s3.h" - +#include "rgw_lc.h" +#include "rgw_log.h" #include "rgw_formats.h" #include "rgw_usage.h" #include "rgw_replica_log.h" @@ -45,7 +46,7 @@ using namespace std; static RGWRados *store = NULL; -void _usage() +void _usage() { cout << "usage: radosgw-admin <cmd> [options...]" << std::endl; cout << "commands:\n"; @@ -131,6 +132,8 @@ void _usage() cout << " gc list dump expired garbage collection objects (specify\n"; cout << " --include-all to list all entries, including unexpired)\n"; cout << " gc process manually process garbage\n"; + cout << " lc list list all bucket lifecycle progress\n"; + cout << " lc process manually process lifecycle\n"; cout << " metadata get get metadata info\n"; cout << " metadata put put metadata info\n"; cout << " metadata rm remove metadata info\n"; @@ -310,6 +313,8 @@ enum { OPT_QUOTA_DISABLE, OPT_GC_LIST, OPT_GC_PROCESS, + OPT_LC_LIST, + OPT_LC_PROCESS, OPT_ORPHANS_FIND, OPT_ORPHANS_FINISH, OPT_ORPHANS_LIST_JOBS, @@ -326,7 +331,7 @@ enum { OPT_ZONEGROUPMAP_GET, OPT_ZONEGROUPMAP_SET, OPT_ZONEGROUPMAP_UPDATE, - OPT_ZONE_CREATE, + OPT_ZONE_CREATE, OPT_ZONE_DELETE, OPT_ZONE_GET, OPT_ZONE_MODIFY, @@ -399,16 +404,17 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ strcmp(cmd, "data") == 0 || strcmp(cmd, "datalog") == 0 || strcmp(cmd, "error") == 0 || - strcmp(cmd, "gc") == 0 || + strcmp(cmd, "gc") == 0 || strcmp(cmd, "key") == 0 || strcmp(cmd, "log") == 0 || + strcmp(cmd, "lc") == 0 || strcmp(cmd, "mdlog") == 0 || strcmp(cmd, "metadata") == 0 || strcmp(cmd, "object") == 0 || strcmp(cmd, "objects") == 0 || strcmp(cmd, "olh") == 0 || strcmp(cmd, "opstate") == 0 || - strcmp(cmd, "orphans") == 0 || + strcmp(cmd, "orphans") == 0 || strcmp(cmd, "period") == 0 || strcmp(cmd, "pool") == 0 || strcmp(cmd, "pools") == 0 || @@ -660,6 +666,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_GC_LIST; if (strcmp(cmd, "process") == 0) return OPT_GC_PROCESS; + } else if (strcmp(prev_cmd, "lc") == 0) { + if (strcmp(cmd, "list") == 0) + return OPT_LC_LIST; + if (strcmp(cmd, "process") == 0) + return OPT_LC_PROCESS; } else if (strcmp(prev_cmd, "orphans") == 0) { if (strcmp(cmd, "find") == 0) return OPT_ORPHANS_FIND; @@ -854,7 +865,7 @@ int bucket_stats(rgw_bucket& bucket, int shard_id, Formatter *formatter) formatter->dump_string("bucket", bucket.name); formatter->dump_string("pool", bucket.data_pool); formatter->dump_string("index_pool", bucket.index_pool); - + formatter->dump_string("id", bucket.bucket_id); formatter->dump_string("marker", bucket.marker); ::encode_json("owner", bucket_info.owner, formatter); @@ -957,7 +968,7 @@ static int read_decode_json(const string& infile, T& t) } return 0; } - + template <class T, class K> static int read_decode_json(const string& infile, T& t, K *k) { @@ -1196,7 +1207,7 @@ int check_obj_locator_underscore(RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_o f->dump_string("oid", oid); f->dump_string("locator", locator); - + RGWObjectCtx obj_ctx(store); RGWRados::Object op_target(store, bucket_info, obj_ctx, obj); @@ -1294,7 +1305,7 @@ int do_check_object_locator(const string& tenant_name, const string& bucket_name list_op.params.ns = ns; list_op.params.enforce_ns = true; list_op.params.list_versions = true; - + f->open_array_section("check_objects"); do { ret = list_op.list_objects(max_entries - count, &result, &common_prefixes, &truncated); @@ -1311,7 +1322,7 @@ int do_check_object_locator(const string& tenant_name, const string& bucket_name if (key.name[0] == '_') { ret = check_obj_locator_underscore(bucket_info, obj, key, fix, remove_bad, f); - + if (ret >= 0) { ret = check_obj_tail_locator_underscore(bucket_info, obj, key, fix, f); } @@ -1672,7 +1683,7 @@ static void get_md_sync_status(list<string>& status) } status.push_back(status_str); - + uint64_t full_total = 0; uint64_t full_complete = 0; @@ -1811,7 +1822,7 @@ static void get_data_sync_status(const string& source_zone, list<string>& status } push_ss(ss, status, tab) << status_str; - + uint64_t full_total = 0; uint64_t full_complete = 0; @@ -1959,7 +1970,7 @@ static void sync_status(Formatter *formatter) tab_dump("data sync", width, data_status); } -int main(int argc, char **argv) +int main(int argc, char **argv) { vector<const char*> args; argv_to_vec(argc, (const char **)argv, args); @@ -2437,7 +2448,7 @@ int main(int argc, char **argv) bool raw_period_pull = opt_cmd == OPT_PERIOD_PULL && remote.empty() && !url.empty(); bool raw_storage_op = (opt_cmd == OPT_ZONEGROUP_ADD || opt_cmd == OPT_ZONEGROUP_CREATE || opt_cmd == OPT_ZONEGROUP_DELETE || - opt_cmd == OPT_ZONEGROUP_GET || opt_cmd == OPT_ZONEGROUP_LIST || + opt_cmd == OPT_ZONEGROUP_GET || opt_cmd == OPT_ZONEGROUP_LIST || opt_cmd == OPT_ZONEGROUP_SET || opt_cmd == OPT_ZONEGROUP_DEFAULT || opt_cmd == OPT_ZONEGROUP_RENAME || opt_cmd == OPT_ZONEGROUP_MODIFY || opt_cmd == OPT_ZONEGROUP_REMOVE || @@ -2459,7 +2470,7 @@ int main(int argc, char **argv) if (raw_storage_op) { store = RGWStoreManager::get_raw_storage(g_ceph_context); } else { - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); } if (!store) { cerr << "couldn't init storage provider" << std::endl; @@ -2720,7 +2731,7 @@ int main(int argc, char **argv) if (ret < 0) { cerr << "list periods failed: " << cpp_strerror(-ret) << std::endl; return -ret; - } + } formatter->open_object_section("realm_periods_list"); encode_json("current_period", period_id, formatter); encode_json("periods", periods, formatter); @@ -3203,7 +3214,7 @@ int main(int argc, char **argv) cerr << "failed to read zonegroupmap info: " << cpp_strerror(ret); return ret; } - + encode_json("zonegroup-map", zonegroupmap, formatter); formatter->flush(cout); } @@ -3977,7 +3988,7 @@ int main(int argc, char **argv) list_op.params.ns = ns; list_op.params.enforce_ns = false; list_op.params.list_versions = true; - + do { ret = list_op.list_objects(max_entries - count, &result, &common_prefixes, &truncated); if (ret < 0) { @@ -4088,7 +4099,7 @@ int main(int argc, char **argv) formatter->open_object_section("log"); struct rgw_log_entry entry; - + // peek at first entry to get bucket metadata r = store->log_show_next(h, &entry); if (r < 0) { @@ -4155,7 +4166,7 @@ next: } } } - + if (opt_cmd == OPT_POOL_ADD) { if (pool_name.empty()) { cerr << "need to specify pool to add!" << std::endl; @@ -4203,7 +4214,7 @@ next: uint64_t end_epoch = (uint64_t)-1; int ret; - + if (!start_date.empty()) { ret = utime_t::parse_date(start_date, &start_epoch, NULL); if (ret < 0) { @@ -4260,7 +4271,7 @@ next: if (ret < 0) { cerr << "ERROR: read_usage() returned ret=" << ret << std::endl; return 1; - } + } } if (opt_cmd == OPT_OLH_GET || opt_cmd == OPT_OLH_READLOG) { @@ -4704,6 +4715,39 @@ next: } } + if (opt_cmd == OPT_LC_LIST) { + formatter->open_array_section("life cycle progress"); + map<string, int> bucket_lc_map; + string marker; +#define MAX_LC_LIST_ENTRIES 100 + do { + int ret = store->list_lc_progress(marker, max_entries, &bucket_lc_map); + if (ret < 0) { + cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl; + return 1; + } + map<string, int>::iterator iter; + for (iter = bucket_lc_map.begin(); iter != bucket_lc_map.end(); ++iter) { + formatter->open_object_section("bucket_lc_info"); + formatter->dump_string("bucket", iter->first); + string lc_status = LC_STATUS[iter->second]; + formatter->dump_string("status", lc_status); + formatter->close_section(); // objs + formatter->flush(cout); + marker = iter->first; + } + } while (!bucket_lc_map.empty()); + } + + + if (opt_cmd == OPT_LC_PROCESS) { + int ret = store->process_lc(); + if (ret < 0) { + cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl; + return 1; + } + } + if (opt_cmd == OPT_ORPHANS_FIND) { RGWOrphanSearch search(store, max_concurrent_ios, orphan_stale_secs); @@ -4913,7 +4957,7 @@ next: list<cls_log_entry> entries; - meta_log->init_list_entries(i, start_time.to_real_time(), end_time.to_real_time(), marker, &handle); + meta_log->init_list_entries(i, start_time.to_real_time(), end_time.to_real_time(), marker, &handle); bool truncated; do { int ret = meta_log->list_entries(handle, 1000, entries, NULL, &truncated); @@ -4934,7 +4978,7 @@ next: if (specified_shard_id) break; } - + formatter->close_section(); formatter->flush(cout); @@ -4964,7 +5008,7 @@ next: if (specified_shard_id) break; } - + formatter->close_section(); formatter->flush(cout); @@ -5454,7 +5498,7 @@ next: formatter->close_section(); formatter->flush(cout); } - + if (opt_cmd == OPT_DATALOG_STATUS) { RGWDataChangesLog *log = store->data_log; int i = (specified_shard_id ? shard_id : 0); @@ -5475,7 +5519,7 @@ next: formatter->close_section(); formatter->flush(cout); } - + if (opt_cmd == OPT_DATALOG_TRIM) { utime_t start_time, end_time; diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index 26d5aa77390..f483fd12aa8 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -746,6 +746,7 @@ void RGWHTTPArgs::append(const string& name, const string& val) (name.compare("location") == 0) || (name.compare("logging") == 0) || (name.compare("usage") == 0) || + (name.compare("lifecycle") == 0) || (name.compare("delete") == 0) || (name.compare("uploads") == 0) || (name.compare("partNumber") == 0) || diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 792f1ecbc2c..8f551cdaa30 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -61,6 +61,7 @@ using ceph::crypto::MD5; #define RGW_SYS_PARAM_PREFIX "rgwx-" #define RGW_ATTR_ACL RGW_ATTR_PREFIX "acl" +#define RGW_ATTR_LC RGW_ATTR_PREFIX "lc" #define RGW_ATTR_CORS RGW_ATTR_PREFIX "cors" #define RGW_ATTR_ETAG RGW_ATTR_PREFIX "etag" #define RGW_ATTR_BUCKETS RGW_ATTR_PREFIX "buckets" diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc new file mode 100644 index 00000000000..3d110e829fe --- /dev/null +++ b/src/rgw/rgw_lc.cc @@ -0,0 +1,546 @@ +#include <string.h> +#include <iostream> +#include <map> + +#include "include/types.h" + +#include "common/Formatter.h" +#include <common/errno.h> +#include "auth/Crypto.h" +#include "include/rados/librados.hpp" +#include "cls/rgw/cls_rgw_client.h" +#include "cls/refcount/cls_refcount_client.h" +#include "cls/lock/cls_lock_client.h" +#include <common/dout.h> +#include "rgw_common.h" +#include "rgw_bucket.h" +#include "rgw_lc.h" +#include "rgw_lc_s3.h" + + + +#define dout_subsys ceph_subsys_rgw + +const char* LC_STATUS[] = { + "UNINITIAL", + "PROCESSING", + "FAILED", + "COMPLETE" +}; + +using namespace std; +using namespace librados; +void RGWLifecycleConfiguration::add_rule(LCRule *rule) +{ + string id; + rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups + rule_map.insert(pair<string, LCRule>(id, *rule)); + _add_rule(rule); +} + +void RGWLifecycleConfiguration::_add_rule(LCRule *rule) +{ + string prefix; + LCExpiration expiration; + int days; + if (!rule->get_prefix(prefix)) { + ldout(cct, 5) << "ERROR: rule->get_prefix() failed" << dendl; + } + if (!rule->get_expiration(expiration)) { + ldout(cct, 5) << "ERROR: rule->get_expiration() failed" << dendl; + } + if (!expiration.get_days(&days)) { + ldout(cct, 5) << "ERROR: expiration->get_days() failed" << dendl; + } + prefix_map[prefix] = days; +} + +void *RGWLC::LCWorker::entry() { + do { + utime_t start = ceph_clock_now(cct); + if (should_work(start)) { + dout(5) << "life cycle: start" << dendl; + int r = lc->process(); + if (r < 0) { + dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl; + } + dout(5) << "life cycle: stop" << dendl; + } + if (lc->going_down()) + break; + + utime_t end = ceph_clock_now(cct); + int secs = shedule_next_start_time(end); + time_t next_time = end + secs; + char buf[30]; + char *nt = ctime_r(&next_time, buf); + dout(5) << "shedule life cycle next start time: " << nt <<dendl; + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(secs, 0)); + lock.Unlock(); + } while (!lc->going_down()); + + return NULL; +} + +void RGWLC::initialize(CephContext *_cct, RGWRados *_store) { + cct = _cct; + store = _store; + max_objs = cct->_conf->rgw_lc_max_objs; + if (max_objs > HASH_PRIME) + max_objs = HASH_PRIME; + + obj_names = new string[max_objs]; + + for (int i = 0; i < max_objs; i++) { + obj_names[i] = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", i); + obj_names[i].append(buf); + } +} + +void RGWLC::finalize() +{ + delete[] obj_names; +} + +bool RGWLC::if_already_run_today(time_t& start_date) +{ + struct tm bdt; + time_t begin_of_day; + utime_t now = ceph_clock_now(cct); + localtime_r(&start_date, &bdt); + bdt.tm_hour = 0; + bdt.tm_min = 0; + bdt.tm_sec = 0; + begin_of_day = mktime(&bdt); + if (now - begin_of_day < 24*60*60) + return true; + else + return false; +} + +static std::vector<std::string> &split(const std::string &s, char delim, std::vector<std::string> &elems) { + std::stringstream ss(s); + std::string item; + while (std::getline(ss, item, delim)) { + elems.push_back(item); + } + return elems; +} + +static std::vector<std::string> split(const std::string &s, char delim) { + std::vector<std::string> elems; + split(s, delim, elems); + return elems; +} + +int RGWLC::bucket_lc_prepare(int index) +{ + map<string, int > entries; + + string marker; + +#define MAX_LC_LIST_ENTRIES 100 + do { + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries); + if (ret < 0) + return ret; + map<string, int>::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + pair<string, int > entry(iter->first, lc_uninitial); + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl; + break; + } + marker = iter->first; + } + } while (!entries.empty()); + + return 0; +} + +int RGWLC::bucket_lc_process(string& shard_id) +{ + RGWLifecycleConfiguration config(cct); + RGWBucketInfo bucket_info; + map<string, bufferlist> bucket_attrs; + string prefix, delimiter, marker, next_marker, no_ns, end_marker, list_versions; + bool is_truncated; + bool default_config = false; + int default_days = 0; + vector<RGWObjEnt> objs; + RGWObjectCtx obj_ctx(store); + map<string, bool> common_prefixes; + vector<std::string> result; + result = split(shard_id, ':'); + string bucket_tenant = result[0]; + string bucket_name = result[1]; + string bucket_id = result[2]; + int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs); + if (ret < 0) { + ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl; + return ret; + } + + ret = bucket_info.bucket.bucket_id.compare(bucket_id) ; + if (ret !=0) { + ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl; + return -ENOENT; + } + + RGWRados::Bucket target(store, bucket_info); + RGWRados::Bucket::List list_op(&target); + + list_op.params.prefix = prefix; + list_op.params.delim = delimiter; + list_op.params.marker = marker; + list_op.params.end_marker = end_marker; + list_op.params.list_versions = false; + + map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); + if (aiter == bucket_attrs.end()) + return 0; + + bufferlist::iterator iter(&aiter->second); + try { + config.decode(iter); + } catch (const buffer::error& e) { + ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl; + return -1; + } + + map<string, int>& prefix_map = config.get_prefix_map(); + for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) { + if (prefix_iter->first.empty()) { + default_config = true; + default_days = prefix_iter->second; + continue; + } + } + + if (default_config) { + do { + + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); + if (ret < 0) { + if (ret == -ENOENT) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl; + return ret; + } + + vector<RGWObjEnt>::iterator obj_iter; + int pos = 0; + utime_t now = ceph_clock_now(cct); + for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) { + bool prefix_match = false; + int match_days = 0; + map<string, int>& prefix_map = config.get_prefix_map(); + + for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) { + if (prefix_iter->first.empty()) { + continue; + } + pos = (*obj_iter).key.name.find(prefix_iter->first, 0); + if (pos != 0) { + continue; + } + prefix_match = true; + match_days = prefix_iter->second; + break; + } + int days = 0; + if (prefix_match) { + days = match_days; + } else if (default_config) { + days = default_days; + } else { + continue; + } + if (now - ceph::real_clock::to_time_t((*obj_iter).mtime) >= days*24*60*60) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, obj, &state, false); + if (ret < 0) { + return ret; + } + if (state->mtime != (*obj_iter).mtime) //Check mtime again to avoid delete a recently update object as much as possible + continue; + ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key); + if (ret < 0) { + ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl; + } else { + ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name <<dendl; + } + } + } + } while (is_truncated); + } else { + for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) { + if (prefix_iter->first.empty()) { + continue; + } + list_op.params.prefix = prefix_iter->first; + + do { + + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); + + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl; + return ret; + } + + vector<RGWObjEnt>::iterator obj_iter; + int days = prefix_iter->second; + utime_t now = ceph_clock_now(cct); + + for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) { + if (now - ceph::real_clock::to_time_t((*obj_iter).mtime) >= days*24*60*60) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, obj, &state, false); + if (ret < 0) { + return ret; + } + if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible + continue; + ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key); + if (ret < 0) { + ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl; + } else { + ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl; + } + } + } + } while (is_truncated); + } + } + + return ret; +} + +int RGWLC::bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, + pair<string, int >& entry, int& result) +{ + rados::cls::lock::Lock l(lc_index_lock_name); + do { + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another lc processor */ + dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl; + sleep(10); + continue; + } + if (ret < 0) + return 0; + dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl; + if (result == -ENOENT) { + ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl; + goto clean; + } + } else if (result < 0) { + entry.second = lc_failed; + } else { + entry.second = lc_complete; + } + + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl; + } +clean: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl; + return 0; + } while (true); +} + +int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map) +{ + int index = 0; + progress_map->clear(); + for(; index <max_objs; index++) { + map<string, int > entries; + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries); + if (ret < 0) + return ret; + map<string, int>::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + progress_map->insert(*iter); + } + } + return 0; +} + +int RGWLC::process() +{ + int max_secs = cct->_conf->rgw_lc_lock_max_time; + + unsigned start; + int ret = get_random_bytes((char *)&start, sizeof(start)); + if (ret < 0) + return ret; + + for (int i = 0; i < max_objs; i++) { + int index = (i + start) % max_objs; + ret = process(index, max_secs); + if (ret < 0) + return ret; + } + + return 0; +} + +int RGWLC::process(int index, int max_lock_secs) +{ + rados::cls::lock::Lock l(lc_index_lock_name); + do { + utime_t now = ceph_clock_now(g_ceph_context); + pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + if (max_lock_secs <= 0) + return -EAGAIN; + + utime_t time(max_lock_secs, 0); + l.set_duration(time); + + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another lc processor */ + dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl; + sleep(10); + continue; + } + if (ret < 0) + return 0; + + string marker; + cls_rgw_lc_obj_head head; + ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl; + goto exit; + } + + if(!if_already_run_today(head.start_date)) { + head.start_date = now; + head.marker.clear(); + ret = bucket_lc_prepare(index); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl; + goto exit; + } + } + + ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl; + goto exit; + } + + if (entry.first.empty()) + goto exit; + + entry.second = lc_processing; + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl; + goto exit; + } + + head.marker = entry.first; + ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl; + goto exit; + } + l.unlock(&store->lc_pool_ctx, obj_names[index]); + ret = bucket_lc_process(entry.first); + ret = bucket_lc_post(index, max_lock_secs, head, entry, ret); + return 0; +exit: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + return 0; + + }while(1); + +} + +void RGWLC::start_processor() +{ + worker = new LCWorker(cct, this); + worker->create("lifecycle_thread"); +} + +void RGWLC::stop_processor() +{ + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + +void RGWLC::LCWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + +bool RGWLC::going_down() +{ + return false; +} + +bool RGWLC::LCWorker::should_work(utime_t& now) +{ + int start_hour; + int start_minite; + int end_hour; + int end_minite; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite); + struct tm bdt; + time_t tt = now.sec(); + localtime_r(&tt, &bdt); + if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minite)|| + (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minite)) { + return true; + } else { + return false; + } + +} + +int RGWLC::LCWorker::shedule_next_start_time(utime_t& now) +{ + int start_hour; + int start_minite; + int end_hour; + int end_minite; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite); + struct tm bdt; + time_t tt = now.sec(); + time_t nt; + localtime_r(&tt, &bdt); + bdt.tm_hour = start_hour; + bdt.tm_min = start_minite; + bdt.tm_sec = 0; + nt = mktime(&bdt); + return (nt+24*60*60 - tt); +} + diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h new file mode 100644 index 00000000000..cdc5ff2f6ba --- /dev/null +++ b/src/rgw/rgw_lc.h @@ -0,0 +1,227 @@ +#ifndef CEPH_RGW_LC_H +#define CEPH_RGW_LC_H + +#include <map> +#include <string> +#include <iostream> +#include <include/types.h> + +#include "common/debug.h" + +#include "include/types.h" +#include "include/atomic.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "rgw_common.h" +#include "rgw_rados.h" +#include "cls/rgw/cls_rgw_types.h" + +using namespace std; +#define HASH_PRIME 7877 +static string lc_oid_prefix = "lc"; +static string lc_index_lock_name = "lc_process"; + +extern const char* LC_STATUS[]; + +typedef enum { + lc_uninitial = 0, + lc_processing, + lc_failed, + lc_complete, +}LC_BUCKET_STATUS; + +class LCExpiration +{ +protected: + string days; +public: + LCExpiration() {} + ~LCExpiration() {} + + void encode(bufferlist& bl) const { + ENCODE_START(2, 2, bl); + ::encode(days, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); + ::decode(days, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; +// static void generate_test_instances(list<ACLOwner*>& o); + void set_days(const string& _days) { days = _days; } + bool get_days(int* _days) {*_days = atoi(days.c_str()); return true; } +}; +WRITE_CLASS_ENCODER(LCExpiration) + +class LCRule +{ +protected: + string id; + string prefix; + string status; + LCExpiration expiration; + +public: + + LCRule(){}; + ~LCRule(){}; + + bool get_id(string& _id) { + _id = id; + return true; + } + + bool get_status(string& _status) { + _status = status; + return true; + } + + bool get_prefix(string& _prefix) { + _prefix = prefix; + return true; + } + + bool get_expiration(LCExpiration& _expriation) { + _expriation = expiration; + return true; + } + + void set_id(string*_id) { + id = *_id; + } + + void set_prefix(string*_prefix) { + prefix = *_prefix; + } + + void set_status(string*_status) { + status = *_status; + } + + void set_expiration(LCExpiration*_expiration) { + expiration = *_expiration; + } + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(id, bl); + ::encode(prefix, bl); + ::encode(status, bl); + ::encode(expiration, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl); + ::decode(id, bl); + ::decode(prefix, bl); + ::decode(status, bl); + ::decode(expiration, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(LCRule) + +class RGWLifecycleConfiguration +{ +protected: + CephContext *cct; + map<string, int> prefix_map; + multimap<string, LCRule> rule_map; + void _add_rule(LCRule *rule); +public: + RGWLifecycleConfiguration(CephContext *_cct) : cct(_cct) {} + RGWLifecycleConfiguration() : cct(NULL) {} + + void set_ctx(CephContext *ctx) { + cct = ctx; + } + + virtual ~RGWLifecycleConfiguration() {} + +// int get_perm(string& id, int perm_mask); +// int get_group_perm(ACLGroupTypeEnum group, int perm_mask); + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(rule_map, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl); + ::decode(rule_map, bl); + multimap<string, LCRule>::iterator iter; + for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) { + LCRule& rule = iter->second; + _add_rule(&rule); + } + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; +// static void generate_test_instances(list<RGWAccessControlList*>& o); + + void add_rule(LCRule* rule); + + multimap<string, LCRule>& get_rule_map() { return rule_map; } + map<string, int>& get_prefix_map() { return prefix_map; } +/* + void create_default(string id, string name) { + ACLGrant grant; + grant.set_canon(id, name, RGW_PERM_FULL_CONTROL); + add_grant(&grant); + } +*/ +}; +WRITE_CLASS_ENCODER(RGWLifecycleConfiguration) + +class RGWLC { + CephContext *cct; + RGWRados *store; + int max_objs; + string *obj_names; + + class LCWorker : public Thread { + CephContext *cct; + RGWLC *lc; + Mutex lock; + Cond cond; + + public: + LCWorker(CephContext *_cct, RGWLC *_lc) : cct(_cct), lc(_lc), lock("LCWorker") {} + void *entry(); + void stop(); + bool should_work(utime_t& now); + int shedule_next_start_time(utime_t& now); + }; + + public: + LCWorker *worker; +public: + RGWLC() : cct(NULL), store(NULL), worker(NULL) {} + ~RGWLC() { + stop_processor(); + finalize(); + } + + void initialize(CephContext *_cct, RGWRados *_store); + void finalize(); + + int process(); + int process(int index, int max_secs); + bool if_already_run_today(time_t& start_date); + int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map); + int bucket_lc_prepare(int index); + int bucket_lc_process(string& shard_id); + int bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, + pair<string, int >& entry, int& result); + bool going_down(); + void start_processor(); + void stop_processor(); +}; + + + +#endif diff --git a/src/rgw/rgw_lc_s3.cc b/src/rgw/rgw_lc_s3.cc new file mode 100644 index 00000000000..cee710bb9a7 --- /dev/null +++ b/src/rgw/rgw_lc_s3.cc @@ -0,0 +1,112 @@ +#include <string.h> + +#include <iostream> +#include <map> + +#include "include/types.h" + +#include "rgw_lc_s3.h" + + +#define dout_subsys ceph_subsys_rgw + +using namespace std; + +bool LCExpiration_S3::xml_end(const char * el) { + LCDays_S3 *lc_days = static_cast<LCDays_S3 *>(find_first("Days")); + + // ID is mandatory + if (!lc_days) + return false; + days = lc_days->get_data(); + return true; +} + +bool RGWLifecycleConfiguration_S3::xml_end(const char *el) { + XMLObjIter iter = find("Rule"); + LCRule_S3 *rule = static_cast<LCRule_S3 *>(iter.get_next()); + while (rule) { + add_rule(rule); + rule = static_cast<LCRule_S3 *>(iter.get_next()); + } + return true; +} + +bool LCRule_S3::xml_end(const char *el) { + LCID_S3 *lc_id; + LCPrefix_S3 *lc_prefix; + LCStatus_S3 *lc_status; + LCExpiration_S3 *lc_expiration; + + id.clear(); + prefix.clear(); + status.clear(); + + lc_id = static_cast<LCID_S3 *>(find_first("ID")); + if (!lc_id) + return false; + id = lc_id->get_data(); + + lc_prefix = static_cast<LCPrefix_S3 *>(find_first("Prefix")); + if (!lc_prefix) + return false; + prefix = lc_prefix->get_data(); + + lc_status = static_cast<LCStatus_S3 *>(find_first("Status")); + if (!lc_status) + return false; + status = lc_status->get_data(); + + lc_expiration = static_cast<LCExpiration_S3 *>(find_first("Expiration")); + if (!lc_expiration) + return false; + expiration = *lc_expiration; + + return true; +} + +void LCRule_S3::to_xml(CephContext *cct, ostream& out) { + LCExpiration_S3& expir = static_cast<LCExpiration_S3&>(expiration); + out << "<Rule>" ; + out << "<ID>" << id << "</ID>"; + out << "<Prefix>" << prefix << "</Prefix>"; + out << "<Status>" << status << "</Status>"; + expir.to_xml(out); + out << "</Rule>"; +} + +int RGWLifecycleConfiguration_S3::rebuild(RGWRados *store, RGWLifecycleConfiguration& dest) +{ + multimap<string, LCRule>::iterator iter; + for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) { + LCRule& src_rule = iter->second; + bool rule_ok = true; + + if (rule_ok) { + dest.add_rule(&src_rule); + } + } + + return 0; +} + +XMLObj *RGWLCXMLParser_S3::alloc_obj(const char *el) +{ + XMLObj * obj = NULL; + if (strcmp(el, "LifecycleConfiguration") == 0) { + obj = new RGWLifecycleConfiguration_S3(cct); + } else if (strcmp(el, "Rule") == 0) { + obj = new LCRule_S3(); + } else if (strcmp(el, "ID") == 0) { + obj = new LCID_S3(); + } else if (strcmp(el, "Prefix") == 0) { + obj = new LCPrefix_S3(); + } else if (strcmp(el, "Status") == 0) { + obj = new LCStatus_S3(); + } else if (strcmp(el, "Expiration") == 0) { + obj = new LCExpiration_S3(); + } else if (strcmp(el, "Days") == 0) { + obj = new LCDays_S3(); + } + return obj; +} diff --git a/src/rgw/rgw_lc_s3.h b/src/rgw/rgw_lc_s3.h new file mode 100644 index 00000000000..1de47d5fb60 --- /dev/null +++ b/src/rgw/rgw_lc_s3.h @@ -0,0 +1,104 @@ +#ifndef CEPH_RGW_LC_S3_H +#define CEPH_RGW_LC_S3_H + +#include <map> +#include <string> +#include <iostream> +#include <include/types.h> + +#include <expat.h> + +#include "include/str_list.h" +#include "rgw_lc.h" +#include "rgw_xml.h" + + + +using namespace std; + +class LCRule_S3 : public LCRule, public XMLObj +{ +public: + LCRule_S3() {} + ~LCRule_S3() {} + + void to_xml(CephContext *cct, ostream& out); + bool xml_end(const char *el); + bool xml_start(const char *el, const char **attr); +}; + +class LCID_S3 : public XMLObj +{ +public: + LCID_S3() {} + ~LCID_S3() {} + string& to_str() { return data; } +}; + +class LCPrefix_S3 : public XMLObj +{ +public: + LCPrefix_S3() {} + ~LCPrefix_S3() {} + string& to_str() { return data; } +}; + +class LCStatus_S3 : public XMLObj +{ +public: + LCStatus_S3() {} + ~LCStatus_S3() {} + string& to_str() { return data; } +}; + +class LCDays_S3 : public XMLObj +{ +public: + LCDays_S3() {} + ~LCDays_S3() {} + string& to_str() { return data; } +}; + +class LCExpiration_S3 : public LCExpiration, public XMLObj +{ +public: + LCExpiration_S3() {} + ~LCExpiration_S3() {} + + bool xml_end(const char *el); + void to_xml(ostream& out) { + out << "<Expiration>" << "<Days>" << days << "</Days>"<< "</Expiration>"; + } +}; + +class RGWLCXMLParser_S3 : public RGWXMLParser +{ + CephContext *cct; + + XMLObj *alloc_obj(const char *el); +public: + RGWLCXMLParser_S3(CephContext *_cct) : cct(_cct) {} +}; + +class RGWLifecycleConfiguration_S3 : public RGWLifecycleConfiguration, public XMLObj +{ +public: + RGWLifecycleConfiguration_S3(CephContext *_cct) : RGWLifecycleConfiguration(_cct) {} + ~RGWLifecycleConfiguration_S3() {} + + bool xml_end(const char *el); + + void to_xml(ostream& out) { + out << "<LifecycleConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">"; + multimap<string, LCRule>::iterator iter; + for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) { + LCRule_S3& rule = static_cast<LCRule_S3&>(iter->second); + rule.to_xml(cct, out); + } + out << "</LifecycleConfiguration>"; + } + int rebuild(RGWRados *store, RGWLifecycleConfiguration& dest); +}; + + +#endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 85c044ae275..323a5e7a01d 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -302,7 +302,7 @@ int main(int argc, const char **argv) FCGX_Init(); RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, - g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_quota_threads, + g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads, g_conf->rgw_run_sync_thread); if (!store) { mutex.Lock(); diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index f044db7f875..97a17bd3e28 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -78,7 +78,7 @@ int main(const int argc, const char **argv) common_init_finish(g_ceph_context); - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); if (!store) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 6ae7627021c..6bd66a03e86 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -27,15 +27,22 @@ #include "rgw_cors_s3.h" #include "rgw_rest_conn.h" #include "rgw_rest_s3.h" +#include "rgw_lc.h" +#include "rgw_lc_s3.h" #include "rgw_client_io.h" +#include "cls/lock/cls_lock_client.h" +#include "cls/rgw/cls_rgw_client.h" + #include "include/assert.h" #define dout_subsys ceph_subsys_rgw using namespace std; +using namespace librados; using ceph::crypto::MD5; + static string mp_ns = RGW_OBJ_NS_MULTIPART; static string shadow_ns = RGW_OBJ_NS_SHADOW; @@ -3662,11 +3669,41 @@ int RGWPutACLs::verify_permission() return 0; } +int RGWPutLC::verify_permission() +{ + bool perm; + perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP); + if (!perm) + return -EACCES; + + return 0; +} + +int RGWDeleteLC::verify_permission() +{ + bool perm; + perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP); + if (!perm) + return -EACCES; + + return 0; +} + void RGWPutACLs::pre_exec() { rgw_bucket_object_pre_exec(s); } +void RGWPutLC::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWDeleteLC::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + void RGWPutACLs::execute() { bufferlist bl; @@ -3758,6 +3795,152 @@ void RGWPutACLs::execute() } } +static void get_lc_oid(struct req_state *s, string& oid) +{ + string shard_id = s->bucket.name + ':' +s->bucket.bucket_id; + int max_objs = (s->cct->_conf->rgw_lc_max_objs > HASH_PRIME)?HASH_PRIME:s->cct->_conf->rgw_lc_max_objs; + int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs; + oid = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", index); + oid.append(buf); + return; +} +void RGWPutLC::execute() +{ + bufferlist bl; + + RGWLifecycleConfiguration_S3 *config = NULL; + RGWLCXMLParser_S3 parser(s->cct); + RGWLifecycleConfiguration_S3 new_config(s->cct); + ret = 0; + + if (!parser.init()) { + ret = -EINVAL; + return; + } + + ret = get_params(); + if (ret < 0) + return; + + ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl; + + if (!parser.parse(data, len, 1)) { + ret = -EACCES; + return; + } + config = static_cast<RGWLifecycleConfiguration_S3 *>(parser.find_first("LifecycleConfiguration")); + if (!config) { + ret = -EINVAL; + return; + } + + if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) { + ldout(s->cct, 15) << "Old LifecycleConfiguration"; + config->to_xml(*_dout); + *_dout << dendl; + } + + ret = config->rebuild(store, new_config); + if (ret < 0) + return; + + if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) { + ldout(s->cct, 15) << "New LifecycleConfiguration:"; + new_config.to_xml(*_dout); + *_dout << dendl; + } + + new_config.encode(bl); + map<string, bufferlist> attrs; + attrs = s->bucket_attrs; + attrs[RGW_ATTR_LC] = bl; + ret =rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker); + if (ret < 0) + return; + string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id; + string oid; + get_lc_oid(s, oid); + pair<string, int> entry(shard_id, lc_uninitial); + int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time; + rados::cls::lock::Lock l(lc_index_lock_name); + utime_t time(max_lock_secs, 0); + l.set_duration(time); + librados::IoCtx *ctx = store->get_lc_pool_ctx(); + do { + ret = l.lock_exclusive(ctx, oid); + if (ret == -EBUSY) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl; + sleep(5); + continue; + } + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl; + break; + } + ret = cls_rgw_lc_set_entry(*ctx, oid, entry); + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl; + } + break; + }while(1); + l.unlock(ctx, oid); + return; +} + +void RGWDeleteLC::execute() +{ + bufferlist bl; + map<string, bufferlist> orig_attrs, attrs; + map<string, bufferlist>::iterator iter; + rgw_obj obj; + store->get_bucket_instance_obj(s->bucket, obj); + store->set_atomic(s->obj_ctx, obj); + ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker); + if (op_ret < 0) + return; + + for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) { + const string& name = iter->first; + dout(10) << "DeleteLC : attr: " << name << dendl; + if (name.compare(0, (sizeof(RGW_ATTR_LC) - 1), RGW_ATTR_LC) != 0) { + if (attrs.find(name) == attrs.end()) { + attrs[name] = iter->second; + } + } + } + ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker); + string shard_id = s->bucket.name + ':' +s->bucket.bucket_id; + pair<string, int> entry(shard_id, lc_uninitial); + string oid; + get_lc_oid(s, oid); + int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time; + librados::IoCtx *ctx = store->get_lc_pool_ctx(); + rados::cls::lock::Lock l(lc_index_lock_name); + utime_t time(max_lock_secs, 0); + l.set_duration(time); + do { + ret = l.lock_exclusive(ctx, oid); + if (ret == -EBUSY) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl; + sleep(5); + continue; + } + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl; + break; + } + ret = cls_rgw_lc_rm_entry(*ctx, oid, entry); + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl; + } + break; + }while(1); + l.unlock(ctx, oid); + return; +} + int RGWGetCORS::verify_permission() { if (false == s->auth_identity->is_owner_of(s->bucket_owner.get_id())) { diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index eeb633fd4e7..1fbb3483243 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1005,6 +1005,58 @@ public: virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; } }; +class RGWPutLC : public RGWOp { +protected: + int ret; + size_t len; + char *data; + +public: + RGWPutLC() { + ret = 0; + len = 0; + data = NULL; + } + virtual ~RGWPutLC() { + free(data); + } + + int verify_permission(); + void pre_exec(); + void execute(); + +// virtual int get_policy_from_state(RGWRados *store, struct req_state *s, stringstream& ss) { return 0; } + virtual int get_params() = 0; + virtual void send_response() = 0; + virtual const string name() { return "put_lifecycle"; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; } +}; + +class RGWDeleteLC : public RGWOp { +protected: + int ret; + size_t len; + char *data; + +public: + RGWDeleteLC() { + ret = 0; + len = 0; + data = NULL; + } + virtual ~RGWDeleteLC() { + free(data); + } + + int verify_permission(); + void pre_exec(); + void execute(); + + virtual void send_response() = 0; + virtual const string name() { return "delete_lifecycle"; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; } +}; + class RGWGetCORS : public RGWOp { protected: diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index b38ff7cdb0e..0abfa6306e8 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -22,6 +22,8 @@ #include "rgw_cache.h" #include "rgw_acl.h" #include "rgw_acl_s3.h" /* for dumping s3policy in debug log */ +#include "rgw_lc.h" +#include "rgw_lc_s3.h" #include "rgw_metadata.h" #include "rgw_bucket.h" #include "rgw_rest_conn.h" @@ -58,6 +60,8 @@ using namespace librados; #include "rgw_log.h" #include "rgw_gc.h" +#include "rgw_lc.h" + #include "rgw_object_expirer_core.h" #include "rgw_sync.h" #include "rgw_data_sync.h" @@ -1486,6 +1490,7 @@ int RGWZoneParams::fix_pool_names() metadata_heap = fix_zone_pool_name(pool_names, name, ".rgw.meta", metadata_heap.name); control_pool = fix_zone_pool_name(pool_names, name, ".rgw.control", control_pool.name); gc_pool = fix_zone_pool_name(pool_names, name ,".rgw.gc", gc_pool.name); + lc_pool = fix_zone_pool_name(pool_names, name ,".rgw.lc", lc_pool.name); log_pool = fix_zone_pool_name(pool_names, name, ".rgw.log", log_pool.name); intent_log_pool = fix_zone_pool_name(pool_names, name, ".rgw.intent-log", intent_log_pool.name); usage_log_pool = fix_zone_pool_name(pool_names, name, ".rgw.usage", usage_log_pool.name); @@ -3733,6 +3738,10 @@ int RGWRados::init_complete() if (ret < 0) return ret; + ret = open_lc_pool_ctx(); + if (ret < 0) + return ret; + ret = open_objexp_pool_ctx(); if (ret < 0) return ret; @@ -3797,6 +3806,12 @@ int RGWRados::init_complete() data_notifier = new RGWDataNotifier(this); data_notifier->start(); + lc = new RGWLC(); + lc->initialize(cct, this); + + if (use_lc_thread) + lc->start_processor(); + quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads); bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards : @@ -3983,6 +3998,24 @@ int RGWRados::open_gc_pool_ctx() return r; } +int RGWRados::open_lc_pool_ctx() +{ + const char *lc_pool = get_zone_params().lc_pool.name.c_str(); + librados::Rados *rad = get_rados_handle(); + int r = rad->ioctx_create(lc_pool, lc_pool_ctx); + if (r == -ENOENT) { + r = rad->pool_create(lc_pool); + if (r == -EEXIST) + r = 0; + if (r < 0) + return r; + + r = rad->ioctx_create(lc_pool, lc_pool_ctx); + } + + return r; +} + int RGWRados::open_objexp_pool_ctx() { const char * const pool_name = get_zone_params().log_pool.name.c_str(); @@ -11232,6 +11265,16 @@ int RGWRados::process_gc() return gc->process(); } +int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map) +{ + return lc->list_lc_progress(marker, max_entries, progress_map); +} + +int RGWRados::process_lc() +{ + return lc->process(); +} + int RGWRados::process_expire_objects() { obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now(cct)); @@ -12254,7 +12297,7 @@ uint64_t RGWRados::next_bucket_id() return ++max_bucket_id; } -RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread) +RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) { int use_cache = cct->_conf->rgw_cache_enabled; RGWRados *store = NULL; @@ -12264,7 +12307,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t store = new RGWCache<RGWRados>; } - if (store->initialize(cct, use_gc_thread, quota_threads, run_sync_thread) < 0) { + if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) { delete store; return NULL; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 5b8d2ec5027..4af138f0483 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -29,6 +29,7 @@ class ACLOwner; class RGWGC; class RGWMetaNotifier; class RGWDataNotifier; +class RGWLC; class RGWObjectExpirer; class RGWMetaSyncProcessorThread; class RGWDataSyncProcessorThread; @@ -856,6 +857,7 @@ struct RGWZoneParams : RGWSystemMetaObj { rgw_bucket metadata_heap; rgw_bucket control_pool; rgw_bucket gc_pool; + rgw_bucket lc_pool; rgw_bucket log_pool; rgw_bucket intent_log_pool; rgw_bucket usage_log_pool; @@ -897,6 +899,7 @@ struct RGWZoneParams : RGWSystemMetaObj { ::encode(domain_root, bl); ::encode(control_pool, bl); ::encode(gc_pool, bl); + ::encode(lc_pool, bl); ::encode(log_pool, bl); ::encode(intent_log_pool, bl); ::encode(usage_log_pool, bl); @@ -917,6 +920,7 @@ struct RGWZoneParams : RGWSystemMetaObj { ::decode(domain_root, bl); ::decode(control_pool, bl); ::decode(gc_pool, bl); + ::decode(lc_pool, bl); ::decode(log_pool, bl); ::decode(intent_log_pool, bl); ::decode(usage_log_pool, bl); @@ -1718,6 +1722,7 @@ class RGWRados friend class RGWGC; friend class RGWMetaNotifier; friend class RGWDataNotifier; + friend class RGWLC; friend class RGWObjectExpirer; friend class RGWMetaSyncProcessorThread; friend class RGWDataSyncProcessorThread; @@ -1727,6 +1732,7 @@ class RGWRados /** Open the pool used as root for this gateway */ int open_root_pool_ctx(); int open_gc_pool_ctx(); + int open_lc_pool_ctx(); int open_objexp_pool_ctx(); int open_pool_ctx(const string& pool, librados::IoCtx& io_ctx); @@ -1764,8 +1770,10 @@ class RGWRados }; RGWGC *gc; + RGWLC *lc; RGWObjectExpirer *obj_expirer; bool use_gc_thread; + bool use_lc_thread; bool quota_threads; bool run_sync_thread; @@ -1826,6 +1834,7 @@ protected: tombstone_cache_t *obj_tombstone_cache; librados::IoCtx gc_pool_ctx; // .rgw.gc + librados::IoCtx lc_pool_ctx; // .rgw.lc librados::IoCtx objexp_pool_ctx; bool pools_initialized; @@ -1848,7 +1857,7 @@ protected: RGWPeriod current_period; public: RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), - gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false), + gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false), run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL), data_notifier(NULL), meta_sync_processor_thread(NULL), meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"), @@ -1872,6 +1881,9 @@ public: return max_req_id.inc(); } + librados::IoCtx* get_lc_pool_ctx() { + return &lc_pool_ctx; + } void set_context(CephContext *_cct) { cct = _cct; } @@ -1996,9 +2008,10 @@ public: CephContext *ctx() { return cct; } /** do all necessary setup of the storage device */ - int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads, bool _run_sync_thread) { + int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread) { set_context(_cct); use_gc_thread = _use_gc_thread; + use_lc_thread = _use_lc_thread; quota_threads = _quota_threads; run_sync_thread = _run_sync_thread; return initialize(); @@ -2862,6 +2875,9 @@ public: int process_expire_objects(); int defer_gc(void *ctx, rgw_obj& obj); + int process_lc(); + int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map); + int bucket_check_index(rgw_bucket& bucket, map<RGWObjCategory, RGWStorageStats> *existing_stats, map<RGWObjCategory, RGWStorageStats> *calculated_stats); @@ -3023,15 +3039,15 @@ public: class RGWStoreManager { public: RGWStoreManager() {} - static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread) { - RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads, run_sync_thread); + static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) { + RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread); return store; } static RGWRados *get_raw_storage(CephContext *cct) { RGWRados *store = init_raw_storage_provider(cct); return store; } - static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread); + static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread); static RGWRados *init_raw_storage_provider(CephContext *cct); static void close_storage(RGWRados *store); diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 8f38e987ea7..4edf347742b 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -100,6 +100,7 @@ void RGWRealmReloader::reload() // recreate and initialize a new store store = RGWStoreManager::get_storage(cct, cct->_conf->rgw_enable_gc_threads, + cct->_conf->rgw_enable_lc_threads, cct->_conf->rgw_enable_quota_threads, cct->_conf->rgw_run_sync_thread); diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index e485d99f40c..77e67a768e0 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -1149,8 +1149,31 @@ int RGWPutACLs_ObjStore::get_params() return op_ret; } -static int read_all_chunked_input(req_state *s, char **pdata, int *plen, - int max_read) +int RGWPutLC_ObjStore::get_params() +{ + size_t cl = 0; + if (s->length) + cl = atoll(s->length); + if (cl) { + data = (char *)malloc(cl + 1); + if (!data) { + ret = -ENOMEM; + return ret; + } + int read_len; + int r = STREAM_IO(s)->read(data, cl, &read_len, s->aws4_auth_needs_complete); + len = read_len; + if (r < 0) + return r; + data[len] = '\0'; + } else { + len = 0; + } + + return ret; +} + +static int read_all_chunked_input(req_state *s, char **pdata, int *plen, int max_read) { #define READ_CHUNK 4096 #define MAX_READ_CHUNK (128 * 1024) diff --git a/src/rgw/rgw_rest.h b/src/rgw/rgw_rest.h index 3e23945fb2d..831b214df75 100644 --- a/src/rgw/rgw_rest.h +++ b/src/rgw/rgw_rest.h @@ -278,6 +278,21 @@ public: virtual int get_params(); }; +class RGWPutLC_ObjStore : public RGWPutLC { +public: + RGWPutLC_ObjStore() {} + ~RGWPutLC_ObjStore() {} + + int get_params(); +}; + +class RGWDeleteLC_ObjStore : public RGWDeleteLC { +public: + RGWDeleteLC_ObjStore() {} + ~RGWDeleteLC_ObjStore() {} + +}; + class RGWGetCORS_ObjStore : public RGWGetCORS { public: RGWGetCORS_ObjStore() {} diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 5acba5e0825..679d423568d 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -2260,6 +2260,27 @@ void RGWPutACLs_ObjStore_S3::send_response() dump_start(s); } +void RGWPutLC_ObjStore_S3::send_response() +{ + if (ret) + set_req_state_err(s, ret); + dump_errno(s); + end_header(s, this, "application/xml"); + dump_start(s); +} + +void RGWDeleteLC_ObjStore_S3::send_response() +{ + if (ret == 0) + ret = STATUS_NO_CONTENT; + if (ret) { + set_req_state_err(s, ret); + } + dump_errno(s); + end_header(s, this, "application/xml"); + dump_start(s); +} + void RGWGetCORS_ObjStore_S3::send_response() { if (op_ret) { @@ -2844,6 +2865,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put() return new RGWPutCORS_ObjStore_S3; } else if (is_request_payment_op()) { return new RGWSetRequestPayment_ObjStore_S3; + } else if(is_lc_op()) { + return new RGWPutLC_ObjStore_S3; } return new RGWCreateBucket_ObjStore_S3; } @@ -2852,6 +2875,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete() { if (is_cors_op()) { return new RGWDeleteCORS_ObjStore_S3; + } else if(is_lc_op()) { + return new RGWDeleteLC_ObjStore_S3; } if (s->info.args.sub_resource_exists("website")) { diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index cecc14c9765..43c254e3825 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -257,6 +257,22 @@ public: int get_params(); }; +class RGWPutLC_ObjStore_S3 : public RGWPutLC_ObjStore { +public: + RGWPutLC_ObjStore_S3() {} + ~RGWPutLC_ObjStore_S3() {} + + void send_response(); +}; + +class RGWDeleteLC_ObjStore_S3 : public RGWDeleteLC_ObjStore { +public: + RGWDeleteLC_ObjStore_S3() {} + ~RGWDeleteLC_ObjStore_S3() {} + + void send_response(); +}; + class RGWGetCORS_ObjStore_S3 : public RGWGetCORS_ObjStore { public: RGWGetCORS_ObjStore_S3() {} @@ -497,6 +513,9 @@ protected: bool is_cors_op() { return s->info.args.exists("cors"); } + bool is_lc_op() { + return s->info.args.exists("lifecycle"); + } bool is_obj_update_op() { return is_acl_op() || is_cors_op(); } diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 217ac6ec03b..3a18587f876 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -83,6 +83,8 @@ gc list dump expired garbage collection objects (specify --include-all to list all entries, including unexpired) gc process manually process garbage + lc list list all bucket lifecycle progress + lc process manually process lifecycle metadata get get metadata info metadata put put metadata info metadata rm remove metadata info diff --git a/src/test/test_rgw_admin_opstate.cc b/src/test/test_rgw_admin_opstate.cc index 3499aa46abd..451d6df22f2 100644 --- a/src/test/test_rgw_admin_opstate.cc +++ b/src/test/test_rgw_admin_opstate.cc @@ -807,7 +807,7 @@ int main(int argc, char *argv[]){ global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); common_init_finish(g_ceph_context); - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); g_test = new admin_log::test_helper(); finisher = new Finisher(g_ceph_context); #ifdef GTEST |