45 files changed, 3168 insertions, 282 deletions
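This commit wires Elasticsearch-backed metadata search into RGW: a new radosgw-es binary and query compiler (rgw_es_query.h/.cc), S3 bucket "mdsearch" endpoints, sync-module hooks (init/init_sync/get_rest_filter), and case-insensitive tier_config maps. As orientation, here is a minimal sketch of driving the new query compiler the way rgw_es_main.cc below does; the expression is illustrative and the expected output is hand-traced from the dump() methods in rgw_es_query.cc, not captured from a real run:

    #include <iostream>
    #include "common/ceph_json.h"
    #include "rgw_es_query.h"

    // Compile a user-facing expression on a custom (x-amz-meta-) attribute
    // into the JSON query that would be handed to Elasticsearch.
    ESQueryCompiler es_query("x-amz-meta-color == blue", nullptr, "x-amz-meta-");

    string err;
    if (es_query.compile(&err)) {
      JSONFormatter f;
      encode_json("root", es_query, &f);
      f.flush(std::cout);
    }

    // Expected shape (pretty-printed): the custom prefix is stripped and the
    // comparison becomes a nested query matching attribute name and value:
    //
    //   { "root": { "query": { "nested": {
    //       "path": "meta.custom-string",
    //       "query": { "bool": { "must": [
    //         { "match": { "meta.custom-string.name": "color" } },
    //         { "term": { "meta.custom-string.value": "blue" } }
    //       ] } }
    //   } } } }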
diff --git a/alpine/APKBUILD.in b/alpine/APKBUILD.in index f1ab9aa27d6..51245ea5a1f 100644 --- a/alpine/APKBUILD.in +++ b/alpine/APKBUILD.in @@ -292,7 +292,7 @@ radosgw() { pkgdesc="Rados REST gateway which implements Amazon's S3 and OpenStack's Swift APIs." depends="ceph-common" - _pkg $_bindir radosgw radosgw-admin radosgw-token radosgw-object-expirer + _pkg $_bindir radosgw radosgw-admin radosgw-token radosgw-es radosgw-object-expirer mkdir -p $subpkgdir$_localstatedir/lib/ceph/radosgw } diff --git a/ceph.spec.in b/ceph.spec.in index ce517b3a785..592aa6e0c1a 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -1375,6 +1375,7 @@ fi %defattr(-,root,root,-) %{_bindir}/radosgw %{_bindir}/radosgw-token +%{_bindir}/radosgw-es %{_bindir}/radosgw-object-expirer %{_mandir}/man8/radosgw.8* %dir %{_localstatedir}/lib/ceph/radosgw diff --git a/debian/radosgw.install b/debian/radosgw.install index 192f2329bfa..9f2c8debb32 100644 --- a/debian/radosgw.install +++ b/debian/radosgw.install @@ -1,4 +1,5 @@ usr/bin/radosgw usr/bin/radosgw-token +usr/bin/radosgw-es usr/bin/radosgw-object-expirer usr/share/man/man8/radosgw.8 diff --git a/qa/tasks/rgw_multisite.py b/qa/tasks/rgw_multisite.py index 0cab6249325..cd120c44140 100644 --- a/qa/tasks/rgw_multisite.py +++ b/qa/tasks/rgw_multisite.py @@ -10,6 +10,7 @@ from copy import deepcopy from util.rgw import rgwadmin, wait_for_radosgw from util.rados import create_ec_pool, create_replicated_pool from rgw_multi import multisite +from rgw_multi.zone_rados import RadosZone as RadosZone from teuthology.orchestra import run from teuthology import misc @@ -366,6 +367,7 @@ def create_zonegroup(cluster, gateways, period, config): def create_zone(ctx, cluster, gateways, creds, zonegroup, config): """ create a zone with the given configuration """ zone = multisite.Zone(config['name'], zonegroup, cluster) + zone = RadosZone(config['name'], zonegroup, cluster) # collect Gateways for the zone's endpoints endpoints = config.get('endpoints') @@ -389,6 +391,14 @@ def create_zone(ctx, cluster, gateways, creds, zonegroup, config): create_zone_pools(ctx, zone) if ctx.rgw.compression_type: configure_zone_compression(zone, ctx.rgw.compression_type) + + zonegroup.zones_by_type.setdefault(zone.tier_type(), []).append(zone) + + if zone.is_read_only(): + zonegroup.ro_zones.append(zone) + else: + zonegroup.rw_zones.append(zone) + return zone def create_zone_pools(ctx, zone): diff --git a/src/common/ceph_json.h b/src/common/ceph_json.h index 873dcd1afe5..cd868a88f4e 100644 --- a/src/common/ceph_json.h +++ b/src/common/ceph_json.h @@ -207,8 +207,8 @@ void decode_json_obj(vector<T>& l, JSONObj *obj) } } -template<class K, class V> -void decode_json_obj(map<K, V>& m, JSONObj *obj) +template<class K, class V, class C = std::less<K> > +void decode_json_obj(map<K, V, C>& m, JSONObj *obj) { m.clear(); @@ -381,11 +381,11 @@ static void encode_json(const char *name, const std::vector<T>& l, ceph::Formatt f->close_section(); } -template<class K, class V> -static void encode_json(const char *name, const std::map<K, V>& m, ceph::Formatter *f) +template<class K, class V, class C = std::less<K> > +static void encode_json(const char *name, const std::map<K, V, C>& m, ceph::Formatter *f) { f->open_array_section(name); - for (typename std::map<K, V>::const_iterator i = m.begin(); i != m.end(); ++i) { + for (typename std::map<K, V, C>::const_iterator i = m.begin(); i != m.end(); ++i) { f->open_object_section("entry"); encode_json("key", i->first, f); encode_json("val", i->second, f); diff --git 
a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 6d2a5928164..c0e0c907b75 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -55,6 +55,7 @@ set(rgw_a_srcs rgw_cors_s3.cc rgw_dencoder.cc rgw_env.cc + rgw_es_query.cc rgw_formats.cc rgw_frontend.cc rgw_gc.cc @@ -73,6 +74,7 @@ set(rgw_a_srcs rgw_data_sync.cc rgw_sync_module.cc rgw_sync_module_es.cc + rgw_sync_module_es_rest.cc rgw_sync_module_log.cc rgw_period_history.cc rgw_period_puller.cc @@ -194,6 +196,17 @@ target_link_libraries(radosgw-admin rgw_a librados ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${SSL_LIBRARIES} ${BLKID_LIBRARIES}) install(TARGETS radosgw-admin DESTINATION bin) +set(radosgw_es_srcs + rgw_es_main.cc) +add_executable(radosgw-es ${radosgw_es_srcs}) +target_link_libraries(radosgw-es rgw_a librados + cls_rgw_client cls_lock_client cls_refcount_client + cls_log_client cls_statelog_client cls_timeindex_client + cls_version_client cls_replica_log_client cls_user_client + global ${FCGI_LIBRARY} ${LIB_RESOLV} + ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${SSL_LIBRARIES} ${BLKID_LIBRARIES}) +install(TARGETS radosgw-es DESTINATION bin) + set(radosgw_token_srcs rgw_token.cc) add_executable(radosgw-token ${radosgw_token_srcs}) diff --git a/src/rgw/rgw_acl.h b/src/rgw/rgw_acl.h index 26c84d121a2..0225797e5b7 100644 --- a/src/rgw/rgw_acl.h +++ b/src/rgw/rgw_acl.h @@ -377,6 +377,7 @@ public: DECODE_FINISH(bl); } void dump(Formatter *f) const; + void decode_json(JSONObj *obj); static void generate_test_instances(list<ACLOwner*>& o); void set_id(const rgw_user& _id) { id = _id; } void set_name(const string& name) { display_name = name; } diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index fe88d824749..dfe61e0ded8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2182,7 +2182,7 @@ static void sync_status(Formatter *formatter) tab_dump("data sync", width, data_status); } -static void parse_tier_config_param(const string& s, map<string, string>& out) +static void parse_tier_config_param(const string& s, map<string, string, ltstr_nocase>& out) { list<string> confs; get_str_list(s, ",", confs); @@ -2514,8 +2514,8 @@ int main(int argc, const char **argv) string tier_type; bool tier_type_specified = false; - map<string, string> tier_config_add; - map<string, string> tier_config_rm; + map<string, string, ltstr_nocase> tier_config_add; + map<string, string, ltstr_nocase> tier_config_rm; boost::optional<string> index_pool; boost::optional<string> data_pool; @@ -3935,6 +3935,7 @@ int main(int argc, const char **argv) zone.system_key.id = access_key; zone.system_key.key = secret_key; zone.realm_id = realm_id; + zone.tier_config = tier_config_add; ret = zone.create(); if (ret < 0) { diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index ddbd8fae708..365cd86cb4a 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -550,7 +550,7 @@ bool parse_iso8601(const char *s, struct tm *t, uint32_t *pns, bool extended_for trim_whitespace(p, str); int len = str.size(); - if (len == 1 && str[0] == 'Z') + if (len == 0 || (len == 1 && str[0] == 'Z')) return true; if (str[0] != '.' 
|| diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index bc049346e73..0a7cc342c09 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -357,6 +357,10 @@ class RGWHTTPArgs } }; +const char *rgw_conf_get(const map<string, string, ltstr_nocase>& conf_map, const char *name, const char *def_val); +int rgw_conf_get_int(const map<string, string, ltstr_nocase>& conf_map, const char *name, int def_val); +bool rgw_conf_get_bool(const map<string, string, ltstr_nocase>& conf_map, const char *name, bool def_val); + class RGWEnv; class RGWConf { @@ -464,8 +468,11 @@ enum RGWOpType { /* rgw specific */ RGW_OP_ADMIN_SET_METADATA, RGW_OP_GET_OBJ_LAYOUT, - - RGW_OP_BULK_UPLOAD + RGW_OP_BULK_UPLOAD, + RGW_OP_METADATA_SEARCH, + RGW_OP_CONFIG_BUCKET_META_SEARCH, + RGW_OP_GET_BUCKET_META_SEARCH, + RGW_OP_DEL_BUCKET_META_SEARCH, }; class RGWAccessControlPolicy; @@ -1168,9 +1175,11 @@ struct RGWBucketInfo bool swift_versioning; string swift_ver_location; + map<string, uint32_t> mdsearch_config; + void encode(bufferlist& bl) const { - ENCODE_START(17, 4, bl); + ENCODE_START(18, 4, bl); ::encode(bucket, bl); ::encode(owner.id, bl); ::encode(flags, bl); @@ -1194,10 +1203,11 @@ struct RGWBucketInfo ::encode(swift_ver_location, bl); } ::encode(creation_time, bl); + ::encode(mdsearch_config, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START_LEGACY_COMPAT_LEN_32(17, 4, 4, bl); + DECODE_START_LEGACY_COMPAT_LEN_32(18, 4, 4, bl); ::decode(bucket, bl); if (struct_v >= 2) { string s; @@ -1254,6 +1264,9 @@ struct RGWBucketInfo if (struct_v >= 17) { ::decode(creation_time, bl); } + if (struct_v >= 18) { + ::decode(mdsearch_config, bl); + } DECODE_FINISH(bl); } void dump(Formatter *f) const; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index b23e306c124..9b2aefe70e4 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -22,6 +22,8 @@ #include "cls/lock/cls_lock_client.h" +#include "auth/Crypto.h" + #define dout_subsys ceph_subsys_rgw #undef dout_prefix @@ -470,12 +472,15 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { map<int, RGWDataChangesLogInfo> shards_info; public: RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards, + uint64_t instance_id, rgw_data_sync_status *status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store), pool(store->get_zone_params().log_pool), num_shards(num_shards), status(status) { lock_name = "sync_lock"; + status->sync_info.instance_id = instance_id; + #define COOKIE_LEN 16 char buf[COOKIE_LEN + 1]; @@ -680,7 +685,9 @@ int RGWRemoteDataLog::init_sync_status(int num_shards) } RGWDataSyncEnv sync_env_local = sync_env; sync_env_local.http_manager = &http_manager; - ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, &sync_status)); + uint64_t instance_id; + get_random_bytes((char *)&instance_id, sizeof(instance_id)); + ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status)); http_manager.stop(); return ret; } @@ -1432,6 +1439,8 @@ class RGWDataSyncCR : public RGWCoroutine { bool *reset_backoff; RGWDataSyncDebugLogger logger; + + RGWDataSyncModule *data_sync_module{nullptr}; public: RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), @@ -1439,6 +1448,7 @@ public: marker_tracker(NULL), shard_crs_lock("RGWDataSyncCR::shard_crs_lock"), reset_backoff(_reset_backoff), logger(sync_env, "Data", 
"all") { + } ~RGWDataSyncCR() override { @@ -1453,6 +1463,8 @@ public: /* read sync status */ yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status)); + data_sync_module = sync_env->sync_module->get_data_handler(); + if (retcode == -ENOENT) { sync_status.sync_info.num_shards = num_shards; } else if (retcode < 0 && retcode != -ENOENT) { @@ -1463,7 +1475,9 @@ public: /* state: init status */ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) { ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl; - yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, &sync_status)); + uint64_t instance_id; + get_random_bytes((char *)&instance_id, sizeof(instance_id)); + yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl; return set_cr_error(retcode); @@ -1473,7 +1487,15 @@ public: *reset_backoff = true; } + data_sync_module->init(sync_env, sync_status.sync_info.instance_id); + if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) { + /* call sync module init here */ + yield call(data_sync_module->init_sync(sync_env)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl; + return set_cr_error(retcode); + } /* state: building full sync maps */ ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl; yield call(new RGWListBucketIndexesCR(sync_env, &sync_status)); @@ -1550,7 +1572,7 @@ public: } }; -int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) +int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) { instance->reset(new RGWDefaultSyncModuleInstance()); return 0; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 75ae358198d..00e09447455 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -29,17 +29,23 @@ struct rgw_data_sync_info { uint16_t state; uint32_t num_shards; + uint64_t instance_id{0}; + void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); ::encode(state, bl); ::encode(num_shards, bl); + ::encode(instance_id, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); ::decode(state, bl); ::decode(num_shards, bl); + if (struct_v >= 2) { + ::decode(instance_id, bl); + } DECODE_FINISH(bl); } @@ -61,6 +67,7 @@ struct rgw_data_sync_info { } encode_json("status", s, f); encode_json("num_shards", num_shards, f); + encode_json("instance_id", instance_id, f); } void decode_json(JSONObj *obj) { std::string s; @@ -73,6 +80,7 @@ struct rgw_data_sync_info { state = StateInit; } JSONDecoder::decode_json("num_shards", num_shards, obj); + JSONDecoder::decode_json("instance_id", num_shards, obj); } rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} @@ -520,7 +528,7 @@ class RGWDefaultSyncModule : public RGWSyncModule { public: RGWDefaultSyncModule() {} bool supports_data_export() override { return true; } - int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override; }; 
// DataLogTrimCR factory function diff --git a/src/rgw/rgw_env.cc b/src/rgw/rgw_env.cc index 737cda4cc97..d1f876dcbee 100644 --- a/src/rgw/rgw_env.cc +++ b/src/rgw/rgw_env.cc @@ -41,35 +41,50 @@ void RGWEnv::init(CephContext *cct, char **envp) init(cct); } -const char *RGWEnv::get(const char *name, const char *def_val) +const char *rgw_conf_get(const map<string, string, ltstr_nocase>& conf_map, const char *name, const char *def_val) { - map<string, string, ltstr_nocase>::iterator iter = env_map.find(name); - if (iter == env_map.end()) + auto iter = conf_map.find(name); + if (iter == conf_map.end()) return def_val; return iter->second.c_str(); } -int RGWEnv::get_int(const char *name, int def_val) +const char *RGWEnv::get(const char *name, const char *def_val) { - map<string, string, ltstr_nocase>::iterator iter = env_map.find(name); - if (iter == env_map.end()) + return rgw_conf_get(env_map, name, def_val); +} + +int rgw_conf_get_int(const map<string, string, ltstr_nocase>& conf_map, const char *name, int def_val) +{ + auto iter = conf_map.find(name); + if (iter == conf_map.end()) return def_val; const char *s = iter->second.c_str(); return atoi(s); } -bool RGWEnv::get_bool(const char *name, bool def_val) +int RGWEnv::get_int(const char *name, int def_val) { - map<string, string, ltstr_nocase>::iterator iter = env_map.find(name); - if (iter == env_map.end()) + return rgw_conf_get_int(env_map, name, def_val); +} + +bool rgw_conf_get_bool(const map<string, string, ltstr_nocase>& conf_map, const char *name, bool def_val) +{ + auto iter = conf_map.find(name); + if (iter == conf_map.end()) return def_val; const char *s = iter->second.c_str(); return rgw_str_to_bool(s, def_val); } +bool RGWEnv::get_bool(const char *name, bool def_val) +{ + return rgw_conf_get_bool(env_map, name, def_val); +} + size_t RGWEnv::get_size(const char *name, size_t def_val) { map<string, string, ltstr_nocase>::iterator iter = env_map.find(name); diff --git a/src/rgw/rgw_es_main.cc b/src/rgw/rgw_es_main.cc new file mode 100644 index 00000000000..78b2865ed33 --- /dev/null +++ b/src/rgw/rgw_es_main.cc @@ -0,0 +1,77 @@ +#include <list> +#include <string> +#include <iostream> + +#include "global/global_init.h" +#include "global/global_context.h" + +#include "common/ceph_argparse.h" +#include "common/ceph_json.h" +#include "rgw_es_query.h" + +using namespace std; + +int main(int argc, char *argv[]) +{ + vector<const char*> args; + argv_to_vec(argc, (const char **)argv, args); + env_to_vec(args); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, 0); + + common_init_finish(g_ceph_context); + + list<string> infix; + + string expr; + + if (argc > 1) { + expr = argv[1]; + } else { + expr = "age >= 30"; + } + + ESQueryCompiler es_query(expr, nullptr, "x-amz-meta-"); + + map<string, string, ltstr_nocase> aliases = { { "key", "name" }, + { "etag", "meta.etag" }, + { "size", "meta.size" }, + { "mtime", "meta.mtime" }, + { "lastmodified", "meta.mtime" }, + { "contenttype", "meta.contenttype" }, + }; + es_query.set_field_aliases(&aliases); + + map<string, ESEntityTypeMap::EntityType> generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR}, + {"name", ESEntityTypeMap::ES_ENTITY_STR}, + {"instance", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.contenttype", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE}, + {"meta.size", ESEntityTypeMap::ES_ENTITY_INT} }; + ESEntityTypeMap gm(generic_map); + 
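// Hand-traced note (not from a run): with the alias map above, a user query
// like "size >= 10" is rewritten by unalias_field() to "meta.size", which
// generic_map types as ES_ENTITY_INT, so val_from_str() parses the value via
// strict_strtoll() and the compiled range query compares integers, not strings.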
es_query.set_generic_type_map(&gm); + + map<string, ESEntityTypeMap::EntityType> custom_map = { {"str", ESEntityTypeMap::ES_ENTITY_STR}, + {"int", ESEntityTypeMap::ES_ENTITY_INT}, + {"date", ESEntityTypeMap::ES_ENTITY_DATE} }; + ESEntityTypeMap em(custom_map); + es_query.set_custom_type_map(&em); + + string err; + + bool valid = es_query.compile(&err); + if (!valid) { + cout << "failed to compile query: " << err << std::endl; + return EINVAL; + } + + JSONFormatter f; + encode_json("root", es_query, &f); + + f.flush(cout); + + return 0; +} + diff --git a/src/rgw/rgw_es_query.cc b/src/rgw/rgw_es_query.cc new file mode 100644 index 00000000000..9a36cbc0d0f --- /dev/null +++ b/src/rgw/rgw_es_query.cc @@ -0,0 +1,660 @@ +#include <list> +#include <map> +#include <string> +#include <iostream> +#include <boost/algorithm/string.hpp> + +#include "common/ceph_json.h" +#include "rgw_common.h" +#include "rgw_es_query.h" + +using namespace std; + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +bool pop_front(list<string>& l, string *s) +{ + if (l.empty()) { + return false; + } + *s = l.front(); + l.pop_front(); + return true; +} + +map<string, int> operator_map = { + { "or", 1 }, + { "and", 2 }, + { "<", 3 }, + { "<=", 3 }, + { "==", 3 }, + { ">=", 3 }, + { ">", 3 }, +}; + +bool is_operator(const string& s) +{ + return (operator_map.find(s) != operator_map.end()); +} + +int operand_value(const string& op) +{ + auto i = operator_map.find(op); + if (i == operator_map.end()) { + return 0; + } + + return i->second; +} + +int check_precedence(const string& op1, const string& op2) +{ + return operand_value(op1) - operand_value(op2); +} + +static bool infix_to_prefix(list<string>& source, list<string> *out) +{ + list<string> operator_stack; + list<string> operand_stack; + + operator_stack.push_front("("); + source.push_back(")"); + + for (string& entity : source) { + if (entity == "(") { + operator_stack.push_front(entity); + } else if (entity == ")") { + string popped_operator; + if (!pop_front(operator_stack, &popped_operator)) { + return false; + } + + while (popped_operator != "(") { + operand_stack.push_front(popped_operator); + if (!pop_front(operator_stack, &popped_operator)) { + return false; + } + } + + } else if (is_operator(entity)) { + string popped_operator; + if (!pop_front(operator_stack, &popped_operator)) { + return false; + } + + int precedence = check_precedence(popped_operator, entity); + + while (precedence >= 0) { + operand_stack.push_front(popped_operator); + if (!pop_front(operator_stack, &popped_operator)) { + return false; + } + precedence = check_precedence(popped_operator, entity); + } + + operator_stack.push_front(popped_operator); + operator_stack.push_front(entity); + } else { + operand_stack.push_front(entity); + } + + } + + if (!operator_stack.empty()) { + return false; + } + + out->swap(operand_stack); + return true; +} + +class ESQueryNode { +protected: + ESQueryCompiler *compiler; +public: + ESQueryNode(ESQueryCompiler *_compiler) : compiler(_compiler) {} + virtual ~ESQueryNode() {} + + virtual bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) = 0; + + virtual void dump(Formatter *f) const = 0; +}; + +static bool alloc_node(ESQueryCompiler *compiler, ESQueryStack *s, ESQueryNode **pnode, string *perr); + +class ESQueryNode_Bool : public ESQueryNode { + string op; + ESQueryNode *first{nullptr}; + ESQueryNode *second{nullptr}; +public: + ESQueryNode_Bool(ESQueryCompiler *compiler) : ESQueryNode(compiler) {} + 
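// Worked example for infix_to_prefix() above (hand-traced, not from a run).
// The routine is a shunting-yard variant emitting <op> <value> <field> order,
// which is exactly the pop order ESQueryNode_Op::init() uses further down
// (op first, then str_val, then field):
//
//   infix:  name == foo and size > 5
//   prefix: and > 5 size == foo name
//
// alloc_node() then peeks "and", builds an ESQueryNode_Bool, and recursively
// consumes "> 5 size" and "== foo name" as its two subtrees.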
ESQueryNode_Bool(ESQueryCompiler *compiler, const string& _op, ESQueryNode *_first, ESQueryNode *_second) :ESQueryNode(compiler), op(_op), first(_first), second(_second) {} + bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) override { + bool valid = s->pop(&op); + if (!valid) { + *perr = "incorrect expression"; + return false; + } + valid = alloc_node(compiler, s, &first, perr) && + alloc_node(compiler, s, &second, perr); + if (!valid) { + return false; + } + *pnode = this; + return true; + } + virtual ~ESQueryNode_Bool() { + delete first; + delete second; + } + + void dump(Formatter *f) const { + f->open_object_section("bool"); + const char *section = (op == "and" ? "must" : "should"); + f->open_array_section(section); + encode_json("entry", *first, f); + encode_json("entry", *second, f); + f->close_section(); + f->close_section(); + } + +}; + +class ESQueryNodeLeafVal { +public: + ESQueryNodeLeafVal() = default; + virtual ~ESQueryNodeLeafVal() {} + + virtual bool init(const string& str_val, string *perr) = 0; + virtual void encode_json(const string& field, Formatter *f) const = 0; +}; + +class ESQueryNodeLeafVal_Str : public ESQueryNodeLeafVal { + string val; +public: + ESQueryNodeLeafVal_Str() {} + bool init(const string& str_val, string *perr) override { + val = str_val; + return true; + } + void encode_json(const string& field, Formatter *f) const { + ::encode_json(field.c_str(), val.c_str(), f); + } +}; + +class ESQueryNodeLeafVal_Int : public ESQueryNodeLeafVal { + int64_t val; +public: + ESQueryNodeLeafVal_Int() {} + bool init(const string& str_val, string *perr) override { + string err; + val = strict_strtoll(str_val.c_str(), 10, &err); + if (!err.empty()) { + *perr = string("failed to parse integer: ") + err; + return false; + } + return true; + } + void encode_json(const string& field, Formatter *f) const { + ::encode_json(field.c_str(), val, f); + } +}; + +class ESQueryNodeLeafVal_Date : public ESQueryNodeLeafVal { + ceph::real_time val; +public: + ESQueryNodeLeafVal_Date() {} + bool init(const string& str_val, string *perr) override { + if (parse_time(str_val.c_str(), &val) < 0) { + *perr = string("failed to parse date: ") + str_val; + return false; + } + return true; + } + void encode_json(const string& field, Formatter *f) const { + string s; + rgw_to_iso8601(val, &s); + ::encode_json(field.c_str(), s, f); + } +}; + +class ESQueryNode_Op : public ESQueryNode { +protected: + string op; + string field; + string str_val; + ESQueryNodeLeafVal *val{nullptr}; + ESEntityTypeMap::EntityType entity_type{ESEntityTypeMap::ES_ENTITY_NONE}; + bool allow_restricted{false}; + + bool val_from_str(string *perr) { + switch (entity_type) { + case ESEntityTypeMap::ES_ENTITY_DATE: + val = new ESQueryNodeLeafVal_Date; + break; + case ESEntityTypeMap::ES_ENTITY_INT: + val = new ESQueryNodeLeafVal_Int; + break; + default: + val = new ESQueryNodeLeafVal_Str; + } + return val->init(str_val, perr); + } + bool do_init(ESQueryNode **pnode, string *perr) { + field = compiler->unalias_field(field); + ESQueryNode *effective_node; + if (!handle_nested(&effective_node, perr)) { + return false; + } + if (!val_from_str(perr)) { + return false; + } + *pnode = effective_node; + return true; + } + +public: + ESQueryNode_Op(ESQueryCompiler *compiler) : ESQueryNode(compiler) {} + ~ESQueryNode_Op() { + delete val; + } + virtual bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) override { + bool valid = s->pop(&op) && + s->pop(&str_val) && + s->pop(&field); + if (!valid) { + *perr = "invalid 
expression"; + return false; + } + return do_init(pnode, perr); + } + bool handle_nested(ESQueryNode **pnode, string *perr); + + void set_allow_restricted(bool allow) { + allow_restricted = allow; + } + + virtual void dump(Formatter *f) const = 0; +}; + +class ESQueryNode_Op_Equal : public ESQueryNode_Op { +public: + ESQueryNode_Op_Equal(ESQueryCompiler *compiler) : ESQueryNode_Op(compiler) {} + ESQueryNode_Op_Equal(ESQueryCompiler *compiler, const string& f, const string& v) : ESQueryNode_Op(compiler) { + op = "=="; + field = f; + str_val = v; + } + + bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) override { + if (op.empty()) { + return ESQueryNode_Op::init(s, pnode, perr); + } + return do_init(pnode, perr); + } + + virtual void dump(Formatter *f) const { + f->open_object_section("term"); + val->encode_json(field, f); + f->close_section(); + } +}; + +class ESQueryNode_Op_Range : public ESQueryNode_Op { + string range_str; +public: + ESQueryNode_Op_Range(ESQueryCompiler *compiler, const string& rs) : ESQueryNode_Op(compiler), range_str(rs) {} + + virtual void dump(Formatter *f) const { + f->open_object_section("range"); + f->open_object_section(field.c_str()); + val->encode_json(range_str, f); + f->close_section(); + f->close_section(); + } +}; + +class ESQueryNode_Op_Nested_Parent : public ESQueryNode_Op { +public: + ESQueryNode_Op_Nested_Parent(ESQueryCompiler *compiler) : ESQueryNode_Op(compiler) {} + + virtual string get_custom_leaf_field_name() = 0; +}; + +template <class T> +class ESQueryNode_Op_Nested : public ESQueryNode_Op_Nested_Parent { + string name; + ESQueryNode *next; +public: + ESQueryNode_Op_Nested(ESQueryCompiler *compiler, const string& _name, ESQueryNode *_next) : ESQueryNode_Op_Nested_Parent(compiler), + name(_name), next(_next) {} + ~ESQueryNode_Op_Nested() { + delete next; + } + + virtual void dump(Formatter *f) const { + f->open_object_section("nested"); + string s = string("meta.custom-") + type_str(); + encode_json("path", s.c_str(), f); + f->open_object_section("query"); + f->open_object_section("bool"); + f->open_array_section("must"); + f->open_object_section("entry"); + f->open_object_section("match"); + string n = s + ".name"; + encode_json(n.c_str(), name.c_str(), f); + f->close_section(); + f->close_section(); + encode_json("entry", *next, f); + f->close_section(); + f->close_section(); + f->close_section(); + f->close_section(); + } + + string type_str() const; + string get_custom_leaf_field_name() { + return string("meta.custom-") + type_str() + ".value"; + } +}; + +template<> +string ESQueryNode_Op_Nested<string>::type_str() const { + return "string"; +} + +template<> +string ESQueryNode_Op_Nested<int64_t>::type_str() const { + return "int"; +} + +template<> +string ESQueryNode_Op_Nested<ceph::real_time>::type_str() const { + return "date"; +} + +bool ESQueryNode_Op::handle_nested(ESQueryNode **pnode, string *perr) +{ + string field_name = field; + const string& custom_prefix = compiler->get_custom_prefix(); + if (!boost::algorithm::starts_with(field_name, custom_prefix)) { + *pnode = this; + auto m = compiler->get_generic_type_map(); + if (m) { + bool found = m->find(field_name, &entity_type) && + (allow_restricted || !compiler->is_restricted(field_name)); + if (!found) { + *perr = string("unexpected generic field '") + field_name + "'"; + } + return found; + } + *perr = "query parser does not support generic types"; + return false; + } + + field_name = field_name.substr(custom_prefix.size()); + auto m = compiler->get_custom_type_map(); + 
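// Hand-traced example (not from a run): for "x-amz-meta-color == blue" with
// custom_prefix "x-amz-meta-", field_name is now "color". With no entry in
// the custom type map, entity_type stays ES_ENTITY_NONE, the value is kept
// as a string, and the node gets wrapped in ESQueryNode_Op_Nested<string>,
// so the final comparison targets meta.custom-string.name / .value.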
if (m) { + m->find(field_name, &entity_type); + /* ignoring returned bool, for now just treat it as string */ + } + + ESQueryNode_Op_Nested_Parent *new_node; + switch (entity_type) { + case ESEntityTypeMap::ES_ENTITY_INT: + new_node = new ESQueryNode_Op_Nested<int64_t>(compiler, field_name, this); + break; + case ESEntityTypeMap::ES_ENTITY_DATE: + new_node = new ESQueryNode_Op_Nested<ceph::real_time>(compiler, field_name, this); + break; + default: + new_node = new ESQueryNode_Op_Nested<string>(compiler, field_name, this); + } + + field = new_node->get_custom_leaf_field_name(); + *pnode = new_node; + + return true; +} + +static bool is_bool_op(const string& str) +{ + return (str == "or" || str == "and"); +} + +static bool alloc_node(ESQueryCompiler *compiler, ESQueryStack *s, ESQueryNode **pnode, string *perr) +{ + string op; + bool valid = s->peek(&op); + if (!valid) { + *perr = "incorrect expression"; + return false; + } + + ESQueryNode *node; + + if (is_bool_op(op)) { + node = new ESQueryNode_Bool(compiler); + } else if (op == "==") { + node = new ESQueryNode_Op_Equal(compiler); + } else { + static map<string, string> range_op_map = { + { "<", "lt"}, + { "<=", "lte"}, + { ">=", "gte"}, + { ">", "gt"}, + }; + + auto iter = range_op_map.find(op); + if (iter == range_op_map.end()) { + *perr = string("invalid operator: ") + op; + return false; + } + + node = new ESQueryNode_Op_Range(compiler, iter->second); + } + + if (!node->init(s, pnode, perr)) { + delete node; + return false; + } + return true; +} + + +bool is_key_char(char c) +{ + switch (c) { + case '(': + case ')': + case '<': + case '>': + case '@': + case ',': + case ';': + case ':': + case '\\': + case '"': + case '/': + case '[': + case ']': + case '?': + case '=': + case '{': + case '}': + case ' ': + case '\t': + return false; + }; + return (isascii(c) > 0); +} + +static bool is_op_char(char c) +{ + switch (c) { + case '<': + case '=': + case '>': + return true; + }; + return false; +} + +static bool is_val_char(char c) +{ + if (isspace(c)) { + return false; + } + return (c != ')'); +} + +void ESInfixQueryParser::skip_whitespace(const char *str, int size, int& pos) { + while (pos < size && isspace(str[pos])) { + ++pos; + } +} + +bool ESInfixQueryParser::get_next_token(bool (*filter)(char)) { + skip_whitespace(str, size, pos); + int token_start = pos; + while (pos < size && filter(str[pos])) { + ++pos; + } + if (pos == token_start) { + return false; + } + string token = string(str + token_start, pos - token_start); + args.push_back(token); + return true; +} + +bool ESInfixQueryParser::parse_condition() { + /* + * condition: <key> <operator> <val> + * + * whereas key: needs to conform to http header field restrictions + * operator: one of the following: < <= == >= > + * val: ascii, terminated by either space or ')' (or end of string) + */ + + /* parse key */ + bool valid = get_next_token(is_key_char) && + get_next_token(is_op_char) && + get_next_token(is_val_char); + + if (!valid) { + return false; + } + + return true; +} + +bool ESInfixQueryParser::parse_and_or() { + skip_whitespace(str, size, pos); + if (pos + 3 <= size && strncmp(str + pos, "and", 3) == 0) { + pos += 3; + args.push_back("and"); + return true; + } + + if (pos + 2 <= size && strncmp(str + pos, "or", 2) == 0) { + pos += 2; + args.push_back("or"); + return true; + } + + return false; +} + +bool ESInfixQueryParser::parse_specific_char(const char *pchar) { + skip_whitespace(str, size, pos); + if (pos >= size) { + return false; + } + if (str[pos] != *pchar) { + 
return false; + } + + args.push_back(pchar); + ++pos; + return true; +} + +bool ESInfixQueryParser::parse_open_bracket() { + return parse_specific_char("("); +} + +bool ESInfixQueryParser::parse_close_bracket() { + return parse_specific_char(")"); +} + +bool ESInfixQueryParser::parse(list<string> *result) { + /* + * expression: [(]<condition>[[and/or]<condition>][)][and/or]... + */ + + while (pos < size) { + parse_open_bracket(); + if (!parse_condition()) { + return false; + } + parse_close_bracket(); + parse_and_or(); + } + + result->swap(args); + + return true; +} + +bool ESQueryCompiler::convert(list<string>& infix, string *perr) { + list<string> prefix; + if (!infix_to_prefix(infix, &prefix)) { + *perr = "invalid query"; + return false; + } + stack.assign(prefix); + if (!alloc_node(this, &stack, &query_root, perr)) { + return false; + } + if (!stack.done()) { + *perr = "invalid query"; + return false; + } + return true; +} + +ESQueryCompiler::~ESQueryCompiler() { + delete query_root; +} + +bool ESQueryCompiler::compile(string *perr) { + list<string> infix; + if (!parser.parse(&infix)) { + *perr = "failed to parse query"; + return false; + } + + if (!convert(infix, perr)) { + return false; + } + + for (auto& c : eq_conds) { + ESQueryNode_Op_Equal *eq_node = new ESQueryNode_Op_Equal(this, c.first, c.second); + eq_node->set_allow_restricted(true); /* can access restricted fields */ + ESQueryNode *effective_node; + if (!eq_node->init(nullptr, &effective_node, perr)) { + delete eq_node; + return false; + } + query_root = new ESQueryNode_Bool(this, "and", effective_node, query_root); + } + + return true; +} + +void ESQueryCompiler::dump(Formatter *f) const { + encode_json("query", *query_root, f); +} + diff --git a/src/rgw/rgw_es_query.h b/src/rgw/rgw_es_query.h new file mode 100644 index 00000000000..4375d93251c --- /dev/null +++ b/src/rgw/rgw_es_query.h @@ -0,0 +1,162 @@ +#ifndef CEPH_RGW_ES_QUERY_H +#define CEPH_RGW_ES_QUERY_H + +#include "rgw_string.h" + +class ESQueryStack { + list<string> l; + list<string>::iterator iter; + +public: + ESQueryStack(list<string>& src) { + assign(src); + } + + ESQueryStack() {} + + void assign(list<string>& src) { + l.swap(src); + iter = l.begin(); + } + + bool peek(string *dest) { + if (done()) { + return false; + } + *dest = *iter; + return true; + } + + bool pop(string *dest) { + bool valid = peek(dest); + if (!valid) { + return false; + } + ++iter; + return true; + } + + bool done() { + return (iter == l.end()); + } +}; + +class ESInfixQueryParser { + string query; + int size; + const char *str; + int pos{0}; + list<string> args; + + void skip_whitespace(const char *str, int size, int& pos); + bool get_next_token(bool (*filter)(char)); + + bool parse_condition(); + bool parse_and_or(); + bool parse_specific_char(const char *pchar); + bool parse_open_bracket(); + bool parse_close_bracket(); + +public: + ESInfixQueryParser(const string& _query) : query(_query), size(query.size()), str(query.c_str()) {} + bool parse(list<string> *result); +}; + +class ESQueryNode; + +struct ESEntityTypeMap { + enum EntityType { + ES_ENTITY_NONE = 0, + ES_ENTITY_STR = 1, + ES_ENTITY_INT = 2, + ES_ENTITY_DATE = 3, + }; + + map<string, EntityType> m; + + ESEntityTypeMap(map<string, EntityType>& _m) : m(_m) {} + + bool find(const string& entity, EntityType *ptype) { + auto i = m.find(entity); + if (i != m.end()) { + *ptype = i->second; + return true; + } + + *ptype = ES_ENTITY_NONE; + return false; + } +}; + +class ESQueryCompiler { + ESInfixQueryParser parser; + 
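// Compilation pipeline (see rgw_es_query.cc): the parser tokenizes the infix
// expression, infix_to_prefix() reorders it, the stack below feeds
// alloc_node() to build the tree rooted at query_root, and compile() finally
// ANDs in any prepend_eq_conds supplied by the caller (these may reference
// restricted fields, e.g. an implicit bucket restriction).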
ESQueryStack stack; + ESQueryNode *query_root{nullptr}; + + string custom_prefix; + + bool convert(list<string>& infix, string *perr); + + list<pair<string, string> > eq_conds; + + ESEntityTypeMap *generic_type_map{nullptr}; + ESEntityTypeMap *custom_type_map{nullptr}; + + map<string, string, ltstr_nocase> *field_aliases{nullptr}; + set<string> *restricted_fields{nullptr}; + +public: + ESQueryCompiler(const string& query, list<pair<string, string> > *prepend_eq_conds, const string& _custom_prefix) : parser(query), custom_prefix(_custom_prefix) { + if (prepend_eq_conds) { + eq_conds = std::move(*prepend_eq_conds); + } + } + ~ESQueryCompiler(); + + bool compile(string *perr); + void dump(Formatter *f) const; + + void set_generic_type_map(ESEntityTypeMap *entity_map) { + generic_type_map = entity_map; + } + + ESEntityTypeMap *get_generic_type_map() { + return generic_type_map; + } + const string& get_custom_prefix() { return custom_prefix; } + + void set_custom_type_map(ESEntityTypeMap *entity_map) { + custom_type_map = entity_map; + } + + ESEntityTypeMap *get_custom_type_map() { + return custom_type_map; + } + + void set_field_aliases(map<string, string, ltstr_nocase> *fa) { + field_aliases = fa; + } + + string unalias_field(const string& field) { + if (!field_aliases) { + return field; + } + auto i = field_aliases->find(field); + if (i == field_aliases->end()) { + return field; + } + + return i->second; + } + + void set_restricted_fields(set<string> *rf) { + restricted_fields = rf; + } + + bool is_restricted(const string& f) { + return (restricted_fields && restricted_fields->find(f) != restricted_fields->end()); + } +}; + + +#endif diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 81c4c6ef6bc..f9f8d2ac346 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -314,7 +314,7 @@ int RGWHTTPClient::get_req_retcode() /* * init request, will be used later with RGWHTTPManager */ -int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data) +int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint) { assert(!req_data); _req_data->get(); @@ -349,7 +349,7 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re } curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); - if (is_upload_request(method)) { + if (send_data_hint || is_upload_request(method)) { curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); } if (has_send_len) { @@ -746,11 +746,11 @@ void RGWHTTPManager::manage_pending_requests() } } -int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url) +int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint) { rgw_http_req_data *req_data = new rgw_http_req_data; - int ret = client->init_request(method, url, req_data); + int ret = client->init_request(method, url, req_data, send_data_hint); if (ret < 0) { req_data->put(); req_data = NULL; diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index cbe4f3d0312..63ea9000194 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -42,7 +42,8 @@ protected: int init_request(const char *method, const char *url, - rgw_http_req_data *req_data); + rgw_http_req_data *req_data, + bool send_data_hint = false); virtual int receive_header(void *ptr, size_t len) { return 0; @@ -264,7 +265,8 @@ public: int 
set_threaded(); void stop(); - int add_request(RGWHTTPClient *client, const char *method, const char *url); + int add_request(RGWHTTPClient *client, const char *method, const char *url, + bool send_data_hint = false); int remove_request(RGWHTTPClient *client); /* only for non threaded case */ diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 2a183b59195..f5f3612239e 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -201,6 +201,13 @@ void ACLOwner::dump(Formatter *f) const encode_json("display_name", display_name, f); } +void ACLOwner::decode_json(JSONObj *obj) { + string id_str; + JSONDecoder::decode_json("id", id_str, obj); + id.from_str(id_str); + JSONDecoder::decode_json("display_name", display_name, obj); +} + void RGWAccessControlPolicy::dump(Formatter *f) const { encode_json("acl", acl, f); @@ -730,6 +737,7 @@ void RGWBucketInfo::dump(Formatter *f) const encode_json("swift_versioning", swift_versioning, f); encode_json("swift_ver_location", swift_ver_location, f); encode_json("index_type", (uint32_t)index_type, f); + encode_json("mdsearch_config", mdsearch_config, f); } void RGWBucketInfo::decode_json(JSONObj *obj) { @@ -761,6 +769,7 @@ void RGWBucketInfo::decode_json(JSONObj *obj) { uint32_t it; JSONDecoder::decode_json("index_type", it, obj); index_type = (RGWBucketIndexType)it; + JSONDecoder::decode_json("mdsearch_config", mdsearch_config, obj); } void rgw_obj_key::dump(Formatter *f) const diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index f018faba8d6..5c3dbb0582b 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -182,6 +182,12 @@ static RGWRESTMgr *set_logging(RGWRESTMgr *mgr) return mgr; } +static RGWRESTMgr *rest_filter(RGWRados *store, int dialect, RGWRESTMgr *orig) +{ + RGWSyncModuleInstanceRef sync_module = store->get_sync_module(); + return sync_module->get_rest_filter(dialect, orig); +} + RGWRealmReloader *preloader = NULL; static void reloader_handler(int signum) @@ -377,7 +383,8 @@ int main(int argc, const char **argv) const bool swift_at_root = g_conf->rgw_swift_url_prefix == "/"; if (apis_map.count("s3") > 0 || s3website_enabled) { if (! swift_at_root) { - rest.register_default_mgr(set_logging(new RGWRESTMgr_S3(s3website_enabled))); + rest.register_default_mgr(set_logging(rest_filter(store, RGW_REST_S3, + new RGWRESTMgr_S3(s3website_enabled)))); } else { derr << "Cannot have the S3 or S3 Website enabled together with " << "Swift API placed in the root of hierarchy" << dendl; @@ -401,7 +408,8 @@ int main(int argc, const char **argv) if (! 
swift_at_root) { rest.register_resource(g_conf->rgw_swift_url_prefix, - set_logging(swift_resource)); + set_logging(rest_filter(store, RGW_REST_SWIFT, + swift_resource))); } else { if (store->get_zonegroup().zones.size() > 1) { derr << "Placing Swift API in the root of URL hierarchy while running" diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index ac025c6e7d9..d4a9326c3dd 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -631,7 +631,7 @@ rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store, return e; } -static void rgw_bucket_object_pre_exec(struct req_state *s) +void rgw_bucket_object_pre_exec(struct req_state *s) { if (s->expect_cont) dump_continue(s); @@ -6308,6 +6308,77 @@ void RGWGetObjLayout::execute() } +int RGWConfigBucketMetaSearch::verify_permission() +{ + if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) { + return -EACCES; + } + + return 0; +} + +void RGWConfigBucketMetaSearch::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWConfigBucketMetaSearch::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + ldout(s->cct, 20) << "NOTICE: get_params() returned ret=" << op_ret << dendl; + return; + } + + s->bucket_info.mdsearch_config = mdsearch_config; + + op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs); + if (op_ret < 0) { + ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl; + return; + } +} + +int RGWGetBucketMetaSearch::verify_permission() +{ + if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) { + return -EACCES; + } + + return 0; +} + +void RGWGetBucketMetaSearch::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +int RGWDelBucketMetaSearch::verify_permission() +{ + if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) { + return -EACCES; + } + + return 0; +} + +void RGWDelBucketMetaSearch::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWDelBucketMetaSearch::execute() +{ + s->bucket_info.mdsearch_config.clear(); + + op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs); + if (op_ret < 0) { + ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl; + return; + } +} + + RGWHandler::~RGWHandler() { } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 1b4f02f1b7f..62c71fd98e7 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -97,6 +97,9 @@ public: }; + +void rgw_bucket_object_pre_exec(struct req_state *s); + /** * Provide the base class for all ops. 
*/ @@ -2012,4 +2015,49 @@ public: }; +class RGWConfigBucketMetaSearch : public RGWOp { +protected: + std::map<std::string, uint32_t> mdsearch_config; +public: + RGWConfigBucketMetaSearch() {} + + int verify_permission(); + void pre_exec(); + void execute(); + + virtual int get_params() = 0; + virtual void send_response() = 0; + virtual const string name() { return "config_bucket_meta_search"; } + virtual RGWOpType get_type() { return RGW_OP_CONFIG_BUCKET_META_SEARCH; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; } +}; + +class RGWGetBucketMetaSearch : public RGWOp { +public: + RGWGetBucketMetaSearch() {} + + int verify_permission(); + void pre_exec(); + void execute() {} + + virtual void send_response() = 0; + virtual const string name() { return "get_bucket_meta_search"; } + virtual RGWOpType get_type() { return RGW_OP_GET_BUCKET_META_SEARCH; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; } +}; + +class RGWDelBucketMetaSearch : public RGWOp { +public: + RGWDelBucketMetaSearch() {} + + int verify_permission(); + void pre_exec(); + void execute(); + + virtual void send_response() = 0; + virtual const string name() { return "delete_bucket_meta_search"; } + virtual RGWOpType get_type() { return RGW_OP_DEL_BUCKET_META_SEARCH; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; } +}; + #endif /* CEPH_RGW_OP_H */ diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 6e0cca14c16..cf426ff7ca7 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1154,7 +1154,7 @@ struct RGWZoneParams : RGWSystemMetaObj { string realm_id; - map<string, string> tier_config; + map<string, string, ltstr_nocase> tier_config; RGWZoneParams() : RGWSystemMetaObj() {} RGWZoneParams(const string& name) : RGWSystemMetaObj(name){} diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index ba3363cbd04..1cccbab7d02 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -616,17 +616,18 @@ int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime) return status; } -int RGWRESTStreamRWRequest::get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj) +int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr) { string urlsafe_bucket, urlsafe_object; url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket); url_encode(obj.key.name, urlsafe_object); string resource = urlsafe_bucket + "/" + urlsafe_object; - return get_resource(key, extra_headers, resource); + return send_request(&key, extra_headers, resource, nullptr, mgr); } -int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr) +int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, + bufferlist *send_data, RGWHTTPManager *mgr) { string new_url = url; if (new_url[new_url.size() - 1] != '/') @@ -671,10 +672,12 @@ int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>& new_info.init_meta_info(NULL); - int ret = sign_request(key, new_env, new_info); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to sign request" << dendl; - return ret; + if (key) { + int ret = sign_request(*key, new_env, new_info); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to sign request" << dendl; + return ret; + } } map<string, string, ltstr_nocase>& m = new_env.get_map(); @@ -683,12 +686,18 @@ int 
RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>& headers.push_back(pair<string, string>(iter->first, iter->second)); } + bool send_data_hint = false; + if (send_data) { + outbl.claim(*send_data); + send_data_hint = true; + } + RGWHTTPManager *pmanager = &http_manager; if (mgr) { pmanager = mgr; } - int r = pmanager->add_request(this, new_info.method, new_url.c_str()); + int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint); if (r < 0) return r; @@ -701,7 +710,7 @@ int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>& return 0; } -int RGWRESTStreamRWRequest::complete(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs) +int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs) { set_str_from_headers(out_headers, "ETAG", etag); if (status >= 0) { diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index e09bb2fd57b..09393f8ea4a 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -103,10 +103,10 @@ public: lock("RGWRESTStreamReadRequest"), cb(_cb), chunk_ofs(0), ofs(0), http_manager(_cct), method(_method), write_ofs(0) { } - ~RGWRESTStreamRWRequest() override {} - int get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj); - int get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr = NULL); - int complete(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs); + virtual ~RGWRESTStreamRWRequest() override {} + int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL); + int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = NULL /* optional input data */, RGWHTTPManager *mgr = NULL); + int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs); void set_outbl(bufferlist& _outbl) { outbl.swap(_outbl); diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 71a11554d2c..6496f129d79 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -203,7 +203,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER"); } - int r = (*req)->get_obj(key, extra_headers, obj); + int r = (*req)->send_request(key, extra_headers, obj); if (r < 0) { delete *req; *req = nullptr; @@ -215,7 +215,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req, string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs) { - int ret = req->complete(etag, mtime, psize, attrs); + int ret = req->complete_request(etag, mtime, psize, attrs); delete req; return ret; @@ -225,6 +225,7 @@ int RGWRESTConn::get_resource(const string& resource, param_vec_t *extra_params, map<string, string> *extra_headers, bufferlist& bl, + bufferlist *send_data, RGWHTTPManager *mgr) { string url; @@ -249,15 +250,15 @@ int RGWRESTConn::get_resource(const string& resource, headers.insert(extra_headers->begin(), extra_headers->end()); } - ret = req.get_resource(key, headers, resource, mgr); + ret = req.send_request(&key, headers, resource, send_data, mgr); if (ret < 0) { - ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << 
dendl; + ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; } string etag; map<string, string> attrs; - return req.complete(etag, NULL, NULL, attrs); + return req.complete_request(etag, NULL, NULL, attrs); } RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn, @@ -296,22 +297,22 @@ void RGWRESTReadResource::init_common(param_vec_t *extra_headers) int RGWRESTReadResource::read() { - int ret = req.get_resource(conn->get_key(), headers, resource, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); if (ret < 0) { - ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl; + ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; } string etag; map<string, string> attrs; - return req.complete(etag, NULL, NULL, attrs); + return req.complete_request(etag, NULL, NULL, attrs); } int RGWRESTReadResource::aio_read() { - int ret = req.get_resource(conn->get_key(), headers, resource, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); if (ret < 0) { - ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl; + ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; } @@ -357,23 +358,23 @@ void RGWRESTSendResource::init_common(param_vec_t *extra_headers) int RGWRESTSendResource::send(bufferlist& outbl) { req.set_outbl(outbl); - int ret = req.get_resource(conn->get_key(), headers, resource, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); if (ret < 0) { - ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl; + ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; } string etag; map<string, string> attrs; - return req.complete(etag, NULL, NULL, attrs); + return req.complete_request(etag, NULL, NULL, attrs); } int RGWRESTSendResource::aio_send(bufferlist& outbl) { req.set_outbl(outbl); - int ret = req.get_resource(conn->get_key(), headers, resource, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); if (ret < 0) { - ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl; + ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; } diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index bec829d6939..f4a1005b6a3 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -100,9 +100,13 @@ public: int get_resource(const string& resource, param_vec_t *extra_params, map<string, string>* extra_headers, - bufferlist& bl, RGWHTTPManager *mgr = NULL); + bufferlist& bl, + bufferlist *send_data = nullptr, + RGWHTTPManager *mgr = nullptr); template <class T> + int get_json_resource(const string& resource, param_vec_t *params, bufferlist *in_data, T& t); + template <class T> int get_json_resource(const string& resource, param_vec_t *params, T& t); template <class T> int get_json_resource(const string& resource, const rgw_http_param_pair *pp, T& t); @@ -110,10 +114,10 @@ public: template<class T> -int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, T& t) +int RGWRESTConn::get_json_resource(const string& 
resource, param_vec_t *params, bufferlist *in_data, T& t) { bufferlist bl; - int ret = get_resource(resource, params, NULL, bl); + int ret = get_resource(resource, params, nullptr, bl, in_data); if (ret < 0) { return ret; } @@ -127,6 +131,12 @@ int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, } template<class T> +int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, T& t) +{ + return get_json_resource(resource, params, nullptr, t); +} + +template<class T> int RGWRESTConn::get_json_resource(const string& resource, const rgw_http_param_pair *pp, T& t) { param_vec_t params = make_param_list(pp); diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 7174a0a4e20..eef63a57ce9 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -29,6 +29,8 @@ #include "rgw_auth_keystone.h" #include "rgw_auth_registry.h" +#include "rgw_es_query.h" + #include <typeinfo> // for 'typeid' #include "rgw_ldap.h" @@ -2713,6 +2715,115 @@ void RGWGetObjLayout_ObjStore_S3::send_response() rgw_flush_formatter(s, &f); } +int RGWConfigBucketMetaSearch_ObjStore_S3::get_params() +{ + auto iter = s->info.x_meta_map.find("x-amz-meta-search"); + if (iter == s->info.x_meta_map.end()) { + s->err.message = "X-Rgw-Meta-Search header not provided"; + ldout(s->cct, 5) << s->err.message << dendl; + return -EINVAL; + } + + list<string> expressions; + get_str_list(iter->second, ",", expressions); + + for (auto& expression : expressions) { + vector<string> args; + get_str_vec(expression, ";", args); + + if (args.empty()) { + s->err.message = "invalid empty expression"; + ldout(s->cct, 5) << s->err.message << dendl; + return -EINVAL; + } + if (args.size() > 2) { + s->err.message = string("invalid expression: ") + expression; + ldout(s->cct, 5) << s->err.message << dendl; + return -EINVAL; + } + + string key = boost::algorithm::to_lower_copy(rgw_trim_whitespace(args[0])); + string val; + if (args.size() > 1) { + val = boost::algorithm::to_lower_copy(rgw_trim_whitespace(args[1])); + } + + if (!boost::algorithm::starts_with(key, RGW_AMZ_META_PREFIX)) { + s->err.message = string("invalid expression, key must start with '" RGW_AMZ_META_PREFIX "' : ") + expression; + ldout(s->cct, 5) << s->err.message << dendl; + return -EINVAL; + } + + key = key.substr(sizeof(RGW_AMZ_META_PREFIX) - 1); + + ESEntityTypeMap::EntityType entity_type; + + if (val.empty() || val == "str" || val == "string") { + entity_type = ESEntityTypeMap::ES_ENTITY_STR; + } else if (val == "int" || val == "integer") { + entity_type = ESEntityTypeMap::ES_ENTITY_INT; + } else if (val == "date" || val == "datetime") { + entity_type = ESEntityTypeMap::ES_ENTITY_DATE; + } else { + s->err.message = string("invalid entity type: ") + val; + ldout(s->cct, 5) << s->err.message << dendl; + return -EINVAL; + } + + mdsearch_config[key] = entity_type; + } + + return 0; +} + +void RGWConfigBucketMetaSearch_ObjStore_S3::send_response() +{ + if (op_ret) + set_req_state_err(s, op_ret); + dump_errno(s); + end_header(s, this); +} + +void RGWGetBucketMetaSearch_ObjStore_S3::send_response() +{ + if (op_ret) + set_req_state_err(s, op_ret); + dump_errno(s); + end_header(s, NULL, "application/xml"); + + Formatter *f = s->formatter; + f->open_array_section("GetBucketMetaSearchResult"); + for (auto& e : s->bucket_info.mdsearch_config) { + f->open_object_section("Entry"); + string k = string("x-amz-meta-") + e.first; + f->dump_string("Key", k.c_str()); + const char *type; + switch (e.second) { + case 
ESEntityTypeMap::ES_ENTITY_INT: + type = "int"; + break; + case ESEntityTypeMap::ES_ENTITY_DATE: + type = "date"; + break; + default: + type = "str"; + } + f->dump_string("Type", type); + f->close_section(); + } + f->close_section(); + rgw_flush_formatter(s, f); +} + +void RGWDelBucketMetaSearch_ObjStore_S3::send_response() +{ + if (op_ret) + set_req_state_err(s, op_ret); + dump_errno(s); + end_header(s, this); +} + + RGWOp *RGWHandler_REST_Service_S3::op_get() { if (is_usage_op()) { @@ -2756,10 +2867,11 @@ RGWOp *RGWHandler_REST_Service_S3::op_post() RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data) { // Non-website mode - if (get_data) + if (get_data) { return new RGWListBucket_ObjStore_S3; - else + } else { return new RGWStatBucket_ObjStore_S3; + } } RGWOp *RGWHandler_REST_Bucket_S3::op_get() @@ -2780,6 +2892,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get() return new RGWGetBucketWebsite_ObjStore_S3; } + if (s->info.args.exists("mdsearch")) { + return new RGWGetBucketMetaSearch_ObjStore_S3; + } + if (is_acl_op()) { return new RGWGetACLs_ObjStore_S3; } else if (is_cors_op()) { @@ -2849,6 +2965,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete() return new RGWDeleteBucketWebsite_ObjStore_S3; } + if (s->info.args.exists("mdsearch")) { + return new RGWDelBucketMetaSearch_ObjStore_S3; + } + return new RGWDeleteBucket_ObjStore_S3; } @@ -2858,6 +2978,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_post() return new RGWDeleteMultiObj_ObjStore_S3; } + if (s->info.args.exists("mdsearch")) { + return new RGWConfigBucketMetaSearch_ObjStore_S3; + } + return new RGWPostObj_ObjStore_S3; } diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index 8dcf242b283..d426b3d40ca 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -418,6 +418,30 @@ public: void send_response(); }; +class RGWConfigBucketMetaSearch_ObjStore_S3 : public RGWConfigBucketMetaSearch { +public: + RGWConfigBucketMetaSearch_ObjStore_S3() {} + ~RGWConfigBucketMetaSearch_ObjStore_S3() {} + + int get_params() override; + void send_response() override; +}; + +class RGWGetBucketMetaSearch_ObjStore_S3 : public RGWGetBucketMetaSearch { +public: + RGWGetBucketMetaSearch_ObjStore_S3() {} + ~RGWGetBucketMetaSearch_ObjStore_S3() {} + + void send_response() override; +}; + +class RGWDelBucketMetaSearch_ObjStore_S3 : public RGWDelBucketMetaSearch { +public: + RGWDelBucketMetaSearch_ObjStore_S3() {} + ~RGWDelBucketMetaSearch_ObjStore_S3() {} + + void send_response() override; +}; class RGW_Auth_S3 { private: diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 9d8d250132b..a9ef1ac784f 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1508,10 +1508,12 @@ public: collect_children(); } while ((int)entries.size() == max_entries && can_adjust_marker); + ldout(cct, 20) << __func__ << "(): " << *this << ": num_spawned()=" << num_spawned() << dendl; while (num_spawned() > 1) { yield wait_for_child(); collect_children(); } + ldout(cct, 20) << __func__ << "(): " << *this << ": num_spawned()=" << num_spawned() << dendl; if (!lost_lock) { /* update marker to reflect we're done with full sync */ diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index f6f070b1c36..278d1df3893 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -16,6 +16,12 @@ public: RGWDataSyncModule() {} virtual ~RGWDataSyncModule() {} + virtual void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {} + + virtual RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) { + return
nullptr; + } + virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; @@ -23,11 +29,16 @@ public: rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; }; +class RGWRESTMgr; + class RGWSyncModuleInstance { public: RGWSyncModuleInstance() {} virtual ~RGWSyncModuleInstance() {} virtual RGWDataSyncModule *get_data_handler() = 0; + virtual RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) { + return orig; + } }; typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef; @@ -39,7 +50,7 @@ public: virtual ~RGWSyncModule() {} virtual bool supports_data_export() = 0; - virtual int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) = 0; + virtual int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) = 0; }; typedef std::shared_ptr<RGWSyncModule> RGWSyncModuleRef; @@ -80,7 +91,7 @@ public: return module.get()->supports_data_export(); } - int create_instance(CephContext *cct, const string& name, map<string, string>& config, RGWSyncModuleInstanceRef *instance) { + int create_instance(CephContext *cct, const string& name, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) { RGWSyncModuleRef module; if (!get_module(name, &module)) { return -ENOENT; diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index d500e9b937b..443057f8115 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -4,34 +4,258 @@ #include "rgw_data_sync.h" #include "rgw_boost_asio_yield.h" #include "rgw_sync_module_es.h" +#include "rgw_sync_module_es_rest.h" #include "rgw_rest_conn.h" #include "rgw_cr_rest.h" +#include "rgw_op.h" +#include "rgw_es_query.h" + +#include "include/str_list.h" #define dout_subsys ceph_subsys_rgw + +/* + * whitelist utility. Config string is a list of entries, where an entry is either an item, + * a prefix, or a suffix. An item would be the name of the entity that we'd look up, + * a prefix would be a string ending with an asterisk, a suffix would be a string starting + * with an asterisk. 
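 *
 * To make the matching rules concrete, here is a minimal Python sketch of the
 * same semantics (an illustration only -- the authoritative logic is the
 * ItemList class below, and the helper name wl_match is invented for the
 * example):
 *
 *   def wl_match(config, entry):
 *       items = [e.strip() for e in config.split(',') if e.strip()]
 *       for item in items:
 *           if item == '*':
 *               return True                      # wildcard approves everything
 *           if item.startswith('*'):
 *               if entry.endswith(item[1:]):     # suffix entry
 *                   return True
 *           elif item.endswith('*'):
 *               if entry.startswith(item[:-1]):  # prefix entry
 *                   return True
 *           elif entry == item:                  # exact item
 *               return True
 *       return False
 *
 *   assert wl_match('bucket1, foo*, *bar', 'foo-logs')     # prefix match
 *   assert wl_match('bucket1, foo*, *bar', 'sidebar')      # suffix match
 *   assert not wl_match('bucket1, foo*, *bar', 'bucket2')  # no match
 *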
For example: + * + * bucket1, bucket2, foo*, *bar + */ +class ItemList { + bool approve_all{false}; + + set<string> entries; + set<string> prefixes; + set<string> suffixes; + + void parse(const string& str) { + list<string> l; + + get_str_list(str, ",", l); + + for (auto& entry : l) { + entry = rgw_trim_whitespace(entry); + if (entry.empty()) { + continue; + } + + if (entry == "*") { + approve_all = true; + return; + } + + if (entry[0] == '*') { + suffixes.insert(entry.substr(1)); + continue; + } + + if (entry.back() == '*') { + prefixes.insert(entry.substr(0, entry.size() - 1)); + continue; + } + + entries.insert(entry); + } + } + +public: + ItemList() {} + void init(const string& str, bool def_val) { + if (str.empty()) { + approve_all = def_val; + } else { + parse(str); + } + } + + bool exists(const string& entry) { + if (approve_all) { + return true; + } + + if (entries.find(entry) != entries.end()) { + return true; + } + + auto i = prefixes.upper_bound(entry); + if (i != prefixes.begin()) { + --i; + if (boost::algorithm::starts_with(entry, *i)) { + return true; + } + } + + for (i = suffixes.begin(); i != suffixes.end(); ++i) { + if (boost::algorithm::ends_with(entry, *i)) { + return true; + } + } + + return false; + } +}; + +#define ES_NUM_SHARDS_MIN 5 + +#define ES_NUM_SHARDS_DEFAULT 16 +#define ES_NUM_REPLICAS_DEFAULT 1 + struct ElasticConfig { + uint64_t sync_instance{0}; string id; - RGWRESTConn *conn{nullptr}; + string index_path; + std::unique_ptr<RGWRESTConn> conn; + bool explicit_custom_meta{true}; + string override_index_path; + ItemList index_buckets; + ItemList allow_owners; + uint32_t num_shards{0}; + uint32_t num_replicas{0}; + + void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) { + string elastic_endpoint = rgw_conf_get(config, "endpoint", ""); + id = string("elastic:") + elastic_endpoint; + conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint })); + explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true); + index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */ + allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */ + override_index_path = rgw_conf_get(config, "override_index_path", ""); + num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT); + if (num_shards < ES_NUM_SHARDS_MIN) { + num_shards = ES_NUM_SHARDS_MIN; + } + num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT); + } + + void init_instance(RGWRealm& realm, uint64_t instance_id) { + sync_instance = instance_id; + + if (!override_index_path.empty()) { + index_path = override_index_path; + return; + } + + char buf[32]; + snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF)); + + index_path = "/rgw-" + realm.get_name() + buf; + } + + string get_index_path() { + return index_path; + } + + string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) { + return index_path + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? 
"null" : key.instance); + } + + bool should_handle_operation(RGWBucketInfo& bucket_info) { + return index_buckets.exists(bucket_info.bucket.name) && + allow_owners.exists(bucket_info.owner.to_str()); + } }; -static string es_get_obj_path(const RGWRealm& realm, const RGWBucketInfo& bucket_info, const rgw_obj_key& key) -{ - string path = "/rgw-" + realm.get_name() + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance; - return path; -} +using ElasticConfigRef = std::shared_ptr<ElasticConfig>; + +struct es_dump_type { + const char *type; + const char *format; + bool analyzed; + + es_dump_type(const char *t, const char *f = nullptr, bool a = false) : type(t), format(f), analyzed(a) {} + + void dump(Formatter *f) const { + encode_json("type", type, f); + if (format) { + encode_json("format", format, f); + } + if (!analyzed && strcmp(type, "string") == 0) { + encode_json("index", "not_analyzed", f); + } + } +}; + +struct es_index_mappings { + void dump_custom(Formatter *f, const char *section, const char *type, const char *format) const { + f->open_object_section(section); + ::encode_json("type", "nested", f); + f->open_object_section("properties"); + encode_json("name", es_dump_type("string"), f); + encode_json("value", es_dump_type(type, format), f); + f->close_section(); // entry + f->close_section(); // custom-string + } + void dump(Formatter *f) const { + f->open_object_section("object"); + f->open_object_section("properties"); + encode_json("bucket", es_dump_type("string"), f); + encode_json("name", es_dump_type("string"), f); + encode_json("instance", es_dump_type("string"), f); + encode_json("versioned_epoch", es_dump_type("long"), f); + f->open_object_section("meta"); + f->open_object_section("properties"); + encode_json("cache_control", es_dump_type("string"), f); + encode_json("content_disposition", es_dump_type("string"), f); + encode_json("content_encoding", es_dump_type("string"), f); + encode_json("content_language", es_dump_type("string"), f); + encode_json("content_type", es_dump_type("string"), f); + encode_json("etag", es_dump_type("string"), f); + encode_json("expires", es_dump_type("string"), f); + f->open_object_section("mtime"); + ::encode_json("type", "date", f); + ::encode_json("format", "strict_date_optional_time||epoch_millis", f); + f->close_section(); // mtime + encode_json("size", es_dump_type("long"), f); + dump_custom(f, "custom-string", "string", nullptr); + dump_custom(f, "custom-int", "long", nullptr); + dump_custom(f, "custom-date", "date", "strict_date_optional_time||epoch_millis"); + f->close_section(); // properties + f->close_section(); // meta + f->close_section(); // properties + f->close_section(); // object + } +}; + +struct es_index_settings { + uint32_t num_replicas; + uint32_t num_shards; + + es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {} + + void dump(Formatter *f) const { + encode_json("number_of_replicas", num_replicas, f); + encode_json("number_of_shards", num_shards, f); + } +}; + +struct es_index_config { + es_index_settings settings; + es_index_mappings mappings; + + es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {} + + void dump(Formatter *f) const { + encode_json("settings", settings, f); + encode_json("mappings", mappings, f); + } +}; struct es_obj_metadata { CephContext *cct; + ElasticConfigRef es_conf; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; uint64_t size; map<string, bufferlist> 
attrs; + uint64_t versioned_epoch; - es_obj_metadata(CephContext *_cct, const RGWBucketInfo& _bucket_info, + es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size, - map<string, bufferlist>& _attrs) : cct(_cct), bucket_info(_bucket_info), key(_key), - mtime(_mtime), size(_size), attrs(std::move(_attrs)) {} + map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key), + mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {} void dump(Formatter *f) const { map<string, string> out_attrs; @@ -88,6 +312,7 @@ struct es_obj_metadata { ::encode_json("bucket", bucket_info.bucket.name, f); ::encode_json("name", key.name, f); ::encode_json("instance", key.instance, f); + ::encode_json("versioned_epoch", versioned_epoch, f); ::encode_json("owner", policy.get_owner(), f); ::encode_json("permissions", permissions, f); f->open_object_section("meta"); @@ -99,34 +324,133 @@ struct es_obj_metadata { for (auto i : out_attrs) { ::encode_json(i.first.c_str(), i.second, f); } - if (!custom_meta.empty()) { - f->open_object_section("custom"); - for (auto i : custom_meta) { - ::encode_json(i.first.c_str(), i.second, f); + map<string, string> custom_str; + map<string, string> custom_int; + map<string, string> custom_date; + + for (auto i : custom_meta) { + auto config = bucket_info.mdsearch_config.find(i.first); + if (config == bucket_info.mdsearch_config.end()) { + if (!es_conf->explicit_custom_meta) { + /* default custom meta is of type string */ + custom_str[i.first] = i.second; + } else { + ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; + } + continue; + } + switch (config->second) { + case ESEntityTypeMap::ES_ENTITY_DATE: + custom_date[i.first] = i.second; + break; + case ESEntityTypeMap::ES_ENTITY_INT: + custom_int[i.first] = i.second; + break; + default: + custom_str[i.first] = i.second; + } + } + + if (!custom_str.empty()) { + f->open_array_section("custom-string"); + for (auto i : custom_str) { + f->open_object_section("entity"); + ::encode_json("name", i.first.c_str(), f); + ::encode_json("value", i.second, f); + f->close_section(); + } + f->close_section(); + } + if (!custom_int.empty()) { + f->open_array_section("custom-int"); + for (auto i : custom_int) { + f->open_object_section("entity"); + ::encode_json("name", i.first.c_str(), f); + ::encode_json("value", i.second, f); + f->close_section(); + } + f->close_section(); + } + if (!custom_date.empty()) { + f->open_array_section("custom-date"); + for (auto i : custom_date) { + /* + * try to explicitly parse the date field, otherwise elasticsearch could reject the whole doc, + * which would cause the whole sync to fail + */ + real_time t; + int r = parse_time(i.second.c_str(), &t); + if (r < 0) { + ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; + continue; + } + + string time_str; + rgw_to_iso8601(t, &time_str); + + f->open_object_section("entity"); + ::encode_json("name", i.first.c_str(), f); + ::encode_json("value", time_str.c_str(), f); + f->close_section(); } f->close_section(); } f->close_section(); } +}; + +class RGWElasticInitConfigCBCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + ElasticConfigRef conf; +public: + RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env, +
ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + conf(_conf) {} + int operate() override { + reenter(this) { + ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl; + yield { + string path = conf->get_index_path(); + + es_index_settings settings(conf->num_replicas, conf->num_shards); + es_index_mappings mappings; + + es_index_config index_conf(settings, mappings); + + call(new RGWPutRESTResourceCR<es_index_config, int>(sync_env->cct, conf->conn.get(), + sync_env->http_manager, + path, nullptr /* params */, + index_conf, nullptr /* result */)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; + } }; class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { - const ElasticConfig& conf; + ElasticConfigRef conf; + uint64_t versioned_epoch; public: RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {} + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf), + versioned_epoch(_versioned_epoch) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime - << " attrs=" << attrs << dendl; + ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone + << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime + << " attrs=" << attrs << dendl; yield { - string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); - es_obj_metadata doc(sync_env->cct, bucket_info, key, mtime, size, attrs); + string path = conf->get_obj_path(bucket_info, key); + es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch); - call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf.conn, + call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(), sync_env->http_manager, path, nullptr /* params */, doc, nullptr /* result */)); @@ -139,22 +463,22 @@ public: } return 0; } - }; class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { - const ElasticConfig& conf; + ElasticConfigRef conf; + uint64_t versioned_epoch; public: RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), - conf(_conf) { + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + conf(_conf), versioned_epoch(_versioned_epoch) { } ~RGWElasticHandleRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf); + return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch); } }; @@ -163,21 +487,21 @@ class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; - const ElasticConfig& conf; + ElasticConfigRef conf; public: RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, - const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), 
sync_env(_sync_env), + ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_info(_bucket_info), key(_key), mtime(_mtime), conf(_conf) {} int operate() override { reenter(this) { - ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; + ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone + << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { - string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key); + string path = conf->get_obj_path(bucket_info, key); - call(new RGWDeleteRESTResourceCR(sync_env->cct, conf.conn, + call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), sync_env->http_manager, path, nullptr /* params */)); } @@ -192,49 +516,88 @@ public: }; class RGWElasticDataSyncModule : public RGWDataSyncModule { - ElasticConfig conf; + ElasticConfigRef conf; public: - RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) { - conf.id = string("elastic:") + elastic_endpoint; - conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint }); + RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) { + conf->init(cct, config); } - ~RGWElasticDataSyncModule() override { - delete conf.conn; + ~RGWElasticDataSyncModule() override {} + + void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { + conf->init_instance(sync_env->store->get_realm(), instance_id); } + RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { + ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; + return new RGWElasticInitConfigCBCR(sync_env, conf); + } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf); + ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + if (!conf->should_handle_operation(bucket_info)) { + ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; + return nullptr; + } + return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch); } RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + if (!conf->should_handle_operation(bucket_info)) { + ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; + return nullptr; + } return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, 
conf); } RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; return NULL; } -}; + RGWRESTConn *get_rest_conn() { + return conf->conn.get(); + } -class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance { - RGWElasticDataSyncModule data_handler; -public: - RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) : data_handler(cct, endpoint) {} - RGWDataSyncModule *get_data_handler() override { - return &data_handler; + string get_index_path() { + return conf->get_index_path(); } }; -int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) { +RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config) +{ + data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config)); +} + +RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() +{ + return data_handler.get(); +} + +RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() +{ + return data_handler->get_rest_conn(); +} + +string RGWElasticSyncModuleInstance::get_index_path() { + return data_handler->get_index_path(); +} + +RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { + if (dialect != RGW_REST_S3) { + return orig; + } + delete orig; + return new RGWRESTMgr_MDSearch_S3(); +} + +int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) { string endpoint; auto i = config.find("endpoint"); if (i != config.end()) { endpoint = i->second; } - instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint)); + instance->reset(new RGWElasticSyncModuleInstance(cct, config)); return 0; } diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h index 73c8368571d..43e591e42fc 100644 --- a/src/rgw/rgw_sync_module_es.h +++ b/src/rgw/rgw_sync_module_es.h @@ -9,7 +9,20 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override; +}; + +class RGWElasticDataSyncModule; +class RGWRESTConn; + +class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance { + std::unique_ptr<RGWElasticDataSyncModule> data_handler; +public: + RGWElasticSyncModuleInstance(CephContext *cct, const std::map<std::string, std::string, ltstr_nocase>& config); + RGWDataSyncModule *get_data_handler() override; + RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; + RGWRESTConn *get_rest_conn(); + std::string get_index_path(); }; #endif diff --git a/src/rgw/rgw_sync_module_es_rest.cc b/src/rgw/rgw_sync_module_es_rest.cc new file mode 100644 index 00000000000..200335ffa63 --- 
/dev/null +++ b/src/rgw/rgw_sync_module_es_rest.cc @@ -0,0 +1,412 @@ +#include "rgw_sync_module_es.h" +#include "rgw_sync_module_es_rest.h" +#include "rgw_es_query.h" +#include "rgw_op.h" +#include "rgw_rest.h" +#include "rgw_rest_s3.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +struct es_index_obj_response { + string bucket; + rgw_obj_key key; + uint64_t versioned_epoch{0}; + ACLOwner owner; + set<string> read_permissions; + + struct { + uint64_t size{0}; + ceph::real_time mtime; + string etag; + string content_type; + map<string, string> custom_str; + map<string, int64_t> custom_int; + map<string, string> custom_date; + + template <class T> + struct _custom_entry { + string name; + T value; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("name", name, obj); + JSONDecoder::decode_json("value", value, obj); + } + }; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("size", size, obj); + string mtime_str; + JSONDecoder::decode_json("mtime", mtime_str, obj); + parse_time(mtime_str.c_str(), &mtime); + JSONDecoder::decode_json("etag", etag, obj); + JSONDecoder::decode_json("content_type", content_type, obj); + list<_custom_entry<string> > str_entries; + JSONDecoder::decode_json("custom-string", str_entries, obj); + for (auto& e : str_entries) { + custom_str[e.name] = e.value; + } + list<_custom_entry<int64_t> > int_entries; + JSONDecoder::decode_json("custom-int", int_entries, obj); + for (auto& e : int_entries) { + custom_int[e.name] = e.value; + } + list<_custom_entry<string> > date_entries; + JSONDecoder::decode_json("custom-date", date_entries, obj); + for (auto& e : date_entries) { + custom_date[e.name] = e.value; + } + } + } meta; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("bucket", bucket, obj); + JSONDecoder::decode_json("name", key.name, obj); + JSONDecoder::decode_json("instance", key.instance, obj); + JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj); + JSONDecoder::decode_json("permissions", read_permissions, obj); + JSONDecoder::decode_json("owner", owner, obj); + JSONDecoder::decode_json("meta", meta, obj); + } +}; + +struct es_search_response { + uint32_t took; + bool timed_out; + struct { + uint32_t total; + uint32_t successful; + uint32_t failed; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + JSONDecoder::decode_json("successful", successful, obj); + JSONDecoder::decode_json("failed", failed, obj); + } + } shards; + struct obj_hit { + string index; + string type; + string id; + // double score + es_index_obj_response source; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("_index", index, obj); + JSONDecoder::decode_json("_type", type, obj); + JSONDecoder::decode_json("_id", id, obj); + JSONDecoder::decode_json("_source", source, obj); + } + }; + struct { + uint32_t total; + // double max_score; + list<obj_hit> hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + // JSONDecoder::decode_json("max_score", max_score, obj); + JSONDecoder::decode_json("hits", hits, obj); + } + } hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("took", took, obj); + JSONDecoder::decode_json("timed_out", timed_out, obj); + JSONDecoder::decode_json("_shards", shards, obj); + JSONDecoder::decode_json("hits", hits, obj); + } +}; + +class RGWMetadataSearchOp : public RGWOp { + RGWSyncModuleInstanceRef sync_module_ref; + RGWElasticSyncModuleInstance *es_module; +protected: + string expression; + 
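+ /*
+  * For reference, the /_search reply that es_search_response above decodes
+  * has roughly the following shape. This is a hand-written, trimmed sample --
+  * the field set follows the decode_json() hooks, the values are made up --
+  * shown as a small self-contained Python check:
+  *
+  *   import json
+  *
+  *   sample = json.loads('''
+  *   { "took": 3, "timed_out": false,
+  *     "_shards": { "total": 16, "successful": 16, "failed": 0 },
+  *     "hits": { "total": 1, "hits": [
+  *       { "_index": "rgw-gold", "_type": "object", "_id": "bucket1:obj0:",
+  *         "_source": {
+  *           "bucket": "bucket1", "name": "obj0", "instance": "",
+  *           "versioned_epoch": 0,
+  *           "owner": { "id": "testid", "display_name": "M. Tester" },
+  *           "permissions": [ "testid" ],
+  *           "meta": { "size": 10, "mtime": "2017-01-01T00:00:00.000Z",
+  *                     "etag": "xyz",
+  *                     "custom-string": [ { "name": "color", "value": "blue" } ] } } } ] } }
+  *   ''')
+  *
+  *   for hit in sample['hits']['hits']:
+  *       src = hit['_source']
+  *       print src['bucket'], src['name'], src['meta']['size']
+  */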
string custom_prefix; +#define MAX_KEYS_DEFAULT 100 + uint64_t max_keys{MAX_KEYS_DEFAULT}; + string marker_str; + uint64_t marker{0}; + string next_marker; + bool is_truncated{false}; + string err; + + es_search_response response; + +public: + RGWMetadataSearchOp(const RGWSyncModuleInstanceRef& sync_module) : sync_module_ref(sync_module) { + es_module = static_cast<RGWElasticSyncModuleInstance *>(sync_module_ref.get()); + } + + int verify_permission() { + return 0; + } + virtual int get_params() = 0; + void pre_exec(); + void execute(); + + virtual void send_response() = 0; + virtual const string name() { return "metadata_search"; } + virtual RGWOpType get_type() { return RGW_OP_METADATA_SEARCH; } + virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; } +}; + +void RGWMetadataSearchOp::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWMetadataSearchOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) + return; + + list<pair<string, string> > conds; + + if (!s->user->system) { + conds.push_back(make_pair("permissions", s->user->user_id.to_str())); + } + + if (!s->bucket_name.empty()) { + conds.push_back(make_pair("bucket", s->bucket_name)); + } + + ESQueryCompiler es_query(expression, &conds, custom_prefix); + + static map<string, string, ltstr_nocase> aliases = { + { "bucket", "bucket" }, /* forces lowercase */ + { "name", "name" }, + { "key", "name" }, + { "instance", "instance" }, + { "etag", "meta.etag" }, + { "size", "meta.size" }, + { "mtime", "meta.mtime" }, + { "lastmodified", "meta.mtime" }, + { "contenttype", "meta.contenttype" }, + }; + es_query.set_field_aliases(&aliases); + + static map<string, ESEntityTypeMap::EntityType> generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR}, + {"name", ESEntityTypeMap::ES_ENTITY_STR}, + {"instance", ESEntityTypeMap::ES_ENTITY_STR}, + {"permissions", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.contenttype", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE}, + {"meta.size", ESEntityTypeMap::ES_ENTITY_INT} }; + ESEntityTypeMap gm(generic_map); + es_query.set_generic_type_map(&gm); + + static set<string> restricted_fields = { {"permissions"} }; + es_query.set_restricted_fields(&restricted_fields); + + map<string, ESEntityTypeMap::EntityType> custom_map; + for (auto& i : s->bucket_info.mdsearch_config) { + custom_map[i.first] = (ESEntityTypeMap::EntityType)i.second; + } + + ESEntityTypeMap em(custom_map); + es_query.set_custom_type_map(&em); + + bool valid = es_query.compile(&err); + if (!valid) { + ldout(s->cct, 10) << "invalid query, failed generating request json" << dendl; + op_ret = -EINVAL; + return; + } + + JSONFormatter f; + encode_json("root", es_query, &f); + + RGWRESTConn *conn = es_module->get_rest_conn(); + + bufferlist in; + bufferlist out; + + stringstream ss; + + f.flush(ss); + in.append(ss.str()); + + string resource = es_module->get_index_path() + "/_search"; + param_vec_t params; + static constexpr int BUFSIZE = 32; + char buf[BUFSIZE]; + snprintf(buf, sizeof(buf), "%lld", (long long)max_keys); + params.push_back(param_pair_t("size", buf)); + if (marker > 0) { + params.push_back(param_pair_t("from", marker_str.c_str())); + } + ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl; + op_ret = conn->get_resource(resource, ¶ms, nullptr, out, &in); + if (op_ret < 0) { + ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << 
dendl; + return; + } + + ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl; + + JSONParser jparser; + if (!jparser.parse(out.c_str(), out.length())) { + ldout(s->cct, 0) << "ERROR: failed to parse elasticsearch response" << dendl; + op_ret = -EINVAL; + return; + } + + try { + decode_json_obj(response, &jparser); + } catch (JSONDecoder::err& e) { + ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl; + op_ret = -EINVAL; + return; + } + +} + +class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp { +public: + RGWMetadataSearch_ObjStore_S3(const RGWSyncModuleInstanceRef& _sync_module) : RGWMetadataSearchOp(_sync_module) { + custom_prefix = "x-amz-meta-"; + } + + int get_params() override { + expression = s->info.args.get("query"); + bool exists; + string max_keys_str = s->info.args.get("max-keys", &exists); +#define MAX_KEYS_MAX 10000 + if (exists) { + string err; + max_keys = strict_strtoll(max_keys_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + if (max_keys > MAX_KEYS_MAX) { + max_keys = MAX_KEYS_MAX; + } + } + marker_str = s->info.args.get("marker", &exists); + if (exists) { + string err; + marker = strict_strtoll(marker_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + } + uint64_t nm = marker + max_keys; + static constexpr int BUFSIZE = 32; + char buf[BUFSIZE]; + snprintf(buf, sizeof(buf), "%lld", (long long)nm); + next_marker = buf; + return 0; + } + void send_response() override { + if (op_ret) { + s->err.message = err; + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/xml"); + + if (op_ret < 0) { + return; + } + + is_truncated = (response.hits.hits.size() >= max_keys); + + s->formatter->open_object_section("SearchMetadataResponse"); + s->formatter->dump_string("Marker", marker_str); + s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false")); + if (is_truncated) { + s->formatter->dump_string("NextMarker", next_marker); + } + if (s->format == RGW_FORMAT_JSON) { + s->formatter->open_array_section("Objects"); + } + for (auto& i : response.hits.hits) { + s->formatter->open_object_section("Contents"); + es_index_obj_response& e = i.source; + s->formatter->dump_string("Bucket", e.bucket); + s->formatter->dump_string("Key", e.key.name); + string instance = (!e.key.instance.empty() ? 
e.key.instance : "null"); + s->formatter->dump_string("Instance", instance.c_str()); + s->formatter->dump_int("VersionedEpoch", e.versioned_epoch); + dump_time(s, "LastModified", &e.meta.mtime); + s->formatter->dump_int("Size", e.meta.size); + s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str()); + s->formatter->dump_string("ContentType", e.meta.content_type.c_str()); + dump_owner(s, e.owner.get_id(), e.owner.get_display_name()); + s->formatter->open_array_section("CustomMetadata"); + for (auto& m : e.meta.custom_str) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_string("Value", m.second); + s->formatter->close_section(); + } + for (auto& m : e.meta.custom_int) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_int("Value", m.second); + s->formatter->close_section(); + } + for (auto& m : e.meta.custom_date) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_string("Value", m.second); + s->formatter->close_section(); + } + s->formatter->close_section(); + rgw_flush_formatter(s, s->formatter); + s->formatter->close_section(); + }; + if (s->format == RGW_FORMAT_JSON) { + s->formatter->close_section(); + } + s->formatter->close_section(); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 { +protected: + RGWOp *op_get() { + if (s->info.args.exists("query")) { + return new RGWMetadataSearch_ObjStore_S3(store->get_sync_module()); + } + if (!s->init_state.url_bucket.empty() && + s->info.args.exists("mdsearch")) { + return new RGWGetBucketMetaSearch_ObjStore_S3; + } + return nullptr; + } + RGWOp *op_head() { + return nullptr; + } + RGWOp *op_post() { + return nullptr; + } +public: + RGWHandler_REST_MDSearch_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_MDSearch_S3() {} +}; + + +RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(struct req_state* const s, + const rgw::auth::StrategyRegistry& auth_registry, + const std::string& frontend_prefix) +{ + int ret = + RGWHandler_REST_S3::init_from_header(s, + RGW_FORMAT_XML, true); + if (ret < 0) { + return nullptr; + } + + if (!s->object.empty()) { + return nullptr; + } + + RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3(auth_registry); + + ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name() + << dendl; + return handler; +} + diff --git a/src/rgw/rgw_sync_module_es_rest.h b/src/rgw/rgw_sync_module_es_rest.h new file mode 100644 index 00000000000..4dd0698f235 --- /dev/null +++ b/src/rgw/rgw_sync_module_es_rest.h @@ -0,0 +1,17 @@ +#ifndef CEPH_RGW_SYNC_MODULE_ES_REST_H +#define CEPH_RGW_SYNC_MODULE_ES_REST_H + +#include "rgw_rest.h" + +class RGWElasticSyncModuleInstance; + +class RGWRESTMgr_MDSearch_S3 : public RGWRESTMgr { +public: + explicit RGWRESTMgr_MDSearch_S3() {} + + RGWHandler_REST *get_handler(struct req_state* s, + const rgw::auth::StrategyRegistry& auth_registry, + const std::string& frontend_prefix) override; +}; + +#endif diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index f253d707429..67212ec593a 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -64,7 +64,7 @@ public: } }; -int RGWLogSyncModule::create_instance(CephContext *cct, map<string, string>& config, 
RGWSyncModuleInstanceRef *instance) { +int RGWLogSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) { string prefix; auto i = config.find("prefix"); if (i != config.end()) { diff --git a/src/rgw/rgw_sync_module_log.h b/src/rgw/rgw_sync_module_log.h index 5b7bae6552d..9afc9108e67 100644 --- a/src/rgw/rgw_sync_module_log.h +++ b/src/rgw/rgw_sync_module_log.h @@ -9,7 +9,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override; }; #endif diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py new file mode 100644 index 00000000000..1099664df20 --- /dev/null +++ b/src/test/rgw/rgw_multi/conn.py @@ -0,0 +1,16 @@ +import boto +import boto.s3.connection + + +def get_gateway_connection(gateway, credentials): + """ connect to the given gateway """ + if gateway.connection is None: + gateway.connection = boto.connect_s3( + aws_access_key_id = credentials.access_key, + aws_secret_access_key = credentials.secret, + host = gateway.host, + port = gateway.port, + is_secure = False, + calling_format = boto.s3.connection.OrdinaryCallingFormat()) + return gateway.connection + diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py index 2d392cf8893..58bd98224b1 100644 --- a/src/test/rgw/rgw_multi/multisite.py +++ b/src/test/rgw/rgw_multi/multisite.py @@ -2,6 +2,8 @@ from abc import ABCMeta, abstractmethod from cStringIO import StringIO import json +from conn import get_gateway_connection + class Cluster: """ interface to run commands against a distinct ceph cluster """ __metaclass__ = ABCMeta @@ -154,6 +156,40 @@ class Zone(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemO def realm(self): return self.zonegroup.realm() if self.zonegroup else None + def is_read_only(self): + return False + + def tier_type(self): + raise NotImplementedError + + def has_buckets(self): + return True + + def get_conn(self, credentials): + return ZoneConn(self, credentials) # not implemented, but can be used + +class ZoneConn(object): + def __init__(self, zone, credentials): + self.zone = zone + self.name = zone.name + """ connect to the zone's first gateway """ + if isinstance(credentials, list): + self.credentials = credentials[0] + else: + self.credentials = credentials + + if self.zone.gateways is not None: + self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials) + + def get_connection(self): + return self.conn + + def get_bucket(self, bucket_name, credentials): + raise NotImplementedError + + def check_bucket_eq(self, zone, bucket_name): + raise NotImplementedError + class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify): def __init__(self, name, period = None, data = None, zonegroup_id = None, zones = None, master_zone = None): self.name = name @@ -161,6 +197,14 @@ class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, Sy self.zones = zones or [] self.master_zone = master_zone super(ZoneGroup, self).__init__(data, zonegroup_id) + self.rw_zones = [] + self.ro_zones = [] + self.zones_by_type = {} + for z in self.zones: + if z.is_read_only(): + self.ro_zones.append(z) + else: + self.rw_zones.append(z) def zonegroup_arg(self): """ command-line 
argument to specify this zonegroup """ diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 3633b22a336..fff682d37e4 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -4,6 +4,7 @@ import string import sys import time import logging + try: from itertools import izip_longest as zip_longest except ImportError: @@ -19,6 +20,8 @@ from nose.plugins.skip import SkipTest from .multisite import Zone +from .conn import get_gateway_connection + class Config: """ test configuration """ def __init__(self, **kwargs): @@ -41,6 +44,10 @@ def init_multi(_realm, _user, _config=None): user = _user global config config = _config or Config() + realm_meta_checkpoint(realm) + +def get_realm(): + return realm log = logging.getLogger(__name__) @@ -73,6 +80,15 @@ def mdlog_list(zone, period = None): mdlog_json = mdlog_json.decode('utf-8') return json.loads(mdlog_json) +def meta_sync_status(zone): + while True: + cmd = ['metadata', 'sync', 'status'] + zone.zone_args() + meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True) + if retcode == 0: + break + assert(retcode == 2) # ENOENT + time.sleep(5) + def mdlog_autotrim(zone): zone.cluster.admin(['mdlog', 'autotrim']) @@ -316,7 +332,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn return True -def zone_data_checkpoint(target_zone, source_zone): +def zone_data_checkpoint(target_zone, source_zone_conn): if target_zone == source_zone: return @@ -375,22 +391,44 @@ def gen_bucket_name(): num_buckets += 1 return run_prefix + '-' + str(num_buckets) -def check_all_buckets_exist(zone, buckets): - conn = get_zone_connection(zone, user.credentials) +class ZonegroupConns: + def __init__(self, zonegroup): + self.zonegroup = zonegroup + self.zones = [] + self.ro_zones = [] + self.rw_zones = [] + self.master_zone = None + for z in zonegroup.zones: + zone_conn = z.get_conn(user.credentials) + self.zones.append(zone_conn) + if z.is_read_only(): + self.ro_zones.append(zone_conn) + else: + self.rw_zones.append(zone_conn) + + if z == zonegroup.master_zone: + self.master_zone = zone_conn + +def check_all_buckets_exist(zone_conn, buckets): + if not zone_conn.zone.has_buckets(): + return True + for b in buckets: try: - conn.get_bucket(b) + zone_conn.get_bucket(b) except: log.critical('zone %s does not contain bucket %s', zone.name, b) return False return True -def check_all_buckets_dont_exist(zone, buckets): - conn = get_zone_connection(zone, user.credentials) +def check_all_buckets_dont_exist(zone_conn, buckets): + if not zone_conn.zone.has_buckets(): + return True + for b in buckets: try: - conn.get_bucket(b) + zone_conn.get_bucket(b) except: continue @@ -399,78 +437,80 @@ def check_all_buckets_dont_exist(zone, buckets): return True -def create_bucket_per_zone(zonegroup): +def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1): buckets = [] - zone_bucket = {} - for zone in zonegroup.zones: - conn = get_zone_connection(zone, user.credentials) - bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone.name, bucket_name) - bucket = conn.create_bucket(bucket_name) - buckets.append(bucket_name) - zone_bucket[zone] = bucket + zone_bucket = [] + for zone in zonegroup_conns.rw_zones: + for i in xrange(buckets_per_zone): + bucket_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone.name, bucket_name) + bucket = zone.create_bucket(bucket_name) + buckets.append(bucket_name) + zone_bucket.append((zone, 
bucket)) return buckets, zone_bucket def create_bucket_per_zone_in_realm(): buckets = [] - zone_bucket = {} + zone_bucket = [] for zonegroup in realm.current_period.zonegroups: - b, z = create_bucket_per_zone(zonegroup) + zg_conn = ZonegroupConns(zonegroup) + b, z = create_bucket_per_zone(zg_conn) buckets.extend(b) - zone_bucket.update(z) + zone_bucket.extend(z) return buckets, zone_bucket def test_bucket_create(): zonegroup = realm.master_zonegroup() - buckets, _ = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, _ = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) - for zone in zonegroup.zones: + for zone in zonegroup_conns.zones: assert check_all_buckets_exist(zone, buckets) def test_bucket_recreate(): zonegroup = realm.master_zonegroup() - buckets, _ = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, _ = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) - for zone in zonegroup.zones: + + for zone in zonegroup_conns.zones: assert check_all_buckets_exist(zone, buckets) # recreate buckets on all zones, make sure they weren't removed - for zone in zonegroup.zones: + for zone in zonegroup_conns.rw_zones: for bucket_name in buckets: - conn = get_zone_connection(zone, user.credentials) - bucket = conn.create_bucket(bucket_name) + bucket = zone.create_bucket(bucket_name) - for zone in zonegroup.zones: + for zone in zonegroup_conns.zones: assert check_all_buckets_exist(zone, buckets) zonegroup_meta_checkpoint(zonegroup) - for zone in zonegroup.zones: + for zone in zonegroup_conns.zones: assert check_all_buckets_exist(zone, buckets) def test_bucket_remove(): zonegroup = realm.master_zonegroup() - buckets, zone_bucket = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) - for zone in zonegroup.zones: + for zone in zonegroup_conns.zones: assert check_all_buckets_exist(zone, buckets) - for zone, bucket_name in zone_bucket.items(): - conn = get_zone_connection(zone, user.credentials) - conn.delete_bucket(bucket_name) + for zone, bucket_name in zone_bucket: + zone.conn.delete_bucket(bucket_name) zonegroup_meta_checkpoint(zonegroup) - for zone in zonegroup.zones: + for zone in zonegroup_conns.zones: assert check_all_buckets_dont_exist(zone, buckets) def get_bucket(zone, bucket_name): - conn = get_zone_connection(zone, user.credentials) - return conn.get_bucket(bucket_name) + return zone.conn.get_bucket(bucket_name) def get_key(zone, bucket_name, obj_name): b = get_bucket(zone, bucket_name) @@ -480,115 +520,67 @@ def new_key(zone, bucket_name, obj_name): b = get_bucket(zone, bucket_name) return b.new_key(obj_name) -def check_object_eq(k1, k2, check_extra = True): - assert k1 - assert k2 - log.debug('comparing key name=%s', k1.name) - eq(k1.name, k2.name) - eq(k1.get_contents_as_string(), k2.get_contents_as_string()) - eq(k1.metadata, k2.metadata) - eq(k1.cache_control, k2.cache_control) - eq(k1.content_type, k2.content_type) - eq(k1.content_encoding, k2.content_encoding) - eq(k1.content_disposition, k2.content_disposition) - eq(k1.content_language, k2.content_language) - eq(k1.etag, k2.etag) - eq(k1.last_modified, k2.last_modified) - if check_extra: - eq(k1.owner.id, k2.owner.id) - eq(k1.owner.display_name, k2.owner.display_name) - eq(k1.storage_class, k2.storage_class) - eq(k1.size, k2.size) - eq(k1.version_id, k2.version_id) - 
eq(k1.encrypted, k2.encrypted) - -def check_bucket_eq(zone1, zone2, bucket_name): - log.info('comparing bucket=%s zones={%s, %s}', bucket_name, zone1.name, zone2.name) - b1 = get_bucket(zone1, bucket_name) - b2 = get_bucket(zone2, bucket_name) - - log.debug('bucket1 objects:') - for o in b1.get_all_versions(): - log.debug('o=%s', o.name) - log.debug('bucket2 objects:') - for o in b2.get_all_versions(): - log.debug('o=%s', o.name) - - for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): - if k1 is None: - log.critical('key=%s is missing from zone=%s', k2.name, zone1.name) - assert False - if k2 is None: - log.critical('key=%s is missing from zone=%s', k1.name, zone2.name) - assert False - - check_object_eq(k1, k2) - - # now get the keys through a HEAD operation, verify that the available data is the same - k1_head = b1.get_key(k1.name) - k2_head = b2.get_key(k2.name) - - check_object_eq(k1_head, k2_head, False) - - log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, zone1.name, zone2.name) - +def check_bucket_eq(zone_conn1, zone_conn2, bucket): + return zone_conn2.check_bucket_eq(zone_conn1, bucket.name) def test_object_sync(): zonegroup = realm.master_zonegroup() - buckets, zone_bucket = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) objnames = [ 'myobj', '_myobj', ':', '&' ] content = 'asdasd' # don't wait for meta sync just yet - for zone, bucket_name in zone_bucket.items(): + for zone, bucket_name in zone_bucket: for objname in objnames: k = new_key(zone, bucket_name, objname) k.set_contents_from_string(content) zonegroup_meta_checkpoint(zonegroup) - for source_zone, bucket in zone_bucket.items(): - for target_zone in zonegroup.zones: - if source_zone == target_zone: + for source_conn, bucket in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: continue - zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket) + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) def test_object_delete(): zonegroup = realm.master_zonegroup() - buckets, zone_bucket = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) objname = 'myobj' content = 'asdasd' # don't wait for meta sync just yet - for zone, bucket in zone_bucket.items(): + for zone, bucket in zone_bucket: k = new_key(zone, bucket, objname) k.set_contents_from_string(content) zonegroup_meta_checkpoint(zonegroup) # check object exists - for source_zone, bucket in zone_bucket.items(): - for target_zone in zonegroup.zones: - if source_zone == target_zone: + for source_conn, bucket in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: continue - zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket) + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) # check object removal - for source_zone, bucket in zone_bucket.items(): - k = get_key(source_zone, bucket, objname) + for source_conn, bucket in zone_bucket: + k = get_key(source_conn, bucket, objname) k.delete() - for target_zone in zonegroup.zones: - if source_zone == target_zone: + for target_conn in 
zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: continue - zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket) + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) def get_latest_object_version(key): for k in key.bucket.list_versions(key.name): @@ -598,28 +590,29 @@ def get_latest_object_version(key): def test_versioned_object_incremental_sync(): zonegroup = realm.master_zonegroup() - buckets, zone_bucket = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) # enable versioning - for zone, bucket in zone_bucket.items(): + for _, bucket in zone_bucket: bucket.configure_versioning(True) zonegroup_meta_checkpoint(zonegroup) # upload a dummy object to each bucket and wait for sync. this forces each # bucket to finish a full sync and switch to incremental - for source_zone, bucket in zone_bucket.items(): - new_key(source_zone, bucket, 'dummy').set_contents_from_string('') - for target_zone in zonegroup.zones: - if source_zone == target_zone: + for source_conn, bucket in zone_bucket: + new_key(source_conn, bucket, 'dummy').set_contents_from_string('') + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: continue - zone_bucket_checkpoint(target_zone, source_zone, bucket.name) + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) - for _, bucket in zone_bucket.items(): + for _, bucket in zone_bucket: # create and delete multiple versions of an object from each zone - for zone in zonegroup.zones: - obj = 'obj-' + zone.name - k = new_key(zone, bucket, obj) + for zone_conn in zonegroup_conns.rw_zones: + obj = 'obj-' + zone_conn.name + k = new_key(zone_conn, bucket, obj) k.set_contents_from_string('version1') v = get_latest_object_version(k) @@ -639,16 +632,16 @@ def test_versioned_object_incremental_sync(): log.debug('version3 id=%s', v.version_id) k.bucket.delete_key(obj, version_id=v.version_id) - for source_zone, bucket in zone_bucket.items(): - for target_zone in zonegroup.zones: - if source_zone == target_zone: + for source_conn, bucket in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: continue - zone_bucket_checkpoint(target_zone, source_zone, bucket.name) - check_bucket_eq(source_zone, target_zone, bucket) + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) def test_bucket_versioning(): buckets, zone_bucket = create_bucket_per_zone_in_realm() - for zone, bucket in zone_bucket.items(): + for _, bucket in zone_bucket: bucket.configure_versioning(True) res = bucket.get_versioning_status() key = 'Versioning' @@ -656,19 +649,20 @@ def test_bucket_versioning(): def test_bucket_acl(): buckets, zone_bucket = create_bucket_per_zone_in_realm() - for zone, bucket in zone_bucket.items(): + for _, bucket in zone_bucket: assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner bucket.set_acl('public-read') assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers def test_bucket_delete_notempty(): zonegroup = realm.master_zonegroup() - buckets, zone_bucket = create_bucket_per_zone(zonegroup) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) - for zone, bucket_name in 
zone_bucket.items(): + for zone_conn, bucket_name in zone_bucket: # upload an object to each bucket on its own zone - conn = get_zone_connection(zone, user.credentials) + conn = zone_conn.get_connection() bucket = conn.get_bucket(bucket_name) k = bucket.new_key('foo') k.set_contents_from_string('bar') @@ -681,8 +675,8 @@ def test_bucket_delete_notempty(): assert False # expected 409 BucketNotEmpty # assert that each bucket still exists on the master - c1 = get_zone_connection(zonegroup.master_zone, user.credentials) - for _, bucket_name in zone_bucket.items(): + c1 = zonegroup_conns.master_zone.conn + for _, bucket_name in zone_bucket: assert c1.get_bucket(bucket_name) def test_multi_period_incremental_sync(): @@ -694,13 +688,8 @@ def test_multi_period_incremental_sync(): mdlog_periods = [realm.current_period.id] # create a bucket in each zone - buckets = [] - for zone in zonegroup.zones: - conn = get_zone_connection(zone, user.credentials) - bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone.name, bucket_name) - bucket = conn.create_bucket(bucket_name) - buckets.append(bucket_name) + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) zonegroup_meta_checkpoint(zonegroup) @@ -714,14 +703,12 @@ def test_multi_period_incremental_sync(): set_master_zone(z2) mdlog_periods += [realm.current_period.id] - # create another bucket in each zone, except for z3 - for zone in zonegroup.zones: - if zone == z3: + for zone_conn, _ in zone_bucket: + if zone_conn.zone == z3: continue - conn = get_zone_connection(zone, user.credentials) bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone.name, bucket_name) - bucket = conn.create_bucket(bucket_name) + log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) + bucket = zone_conn.conn.create_bucket(bucket_name) buckets.append(bucket_name) # wait for zone 1 to sync @@ -731,24 +718,26 @@ def test_multi_period_incremental_sync(): set_master_zone(z1) mdlog_periods += [realm.current_period.id] - # create another bucket in each zone, except for z3 - for zone in zonegroup.zones: - if zone == z3: + for zone_conn, _ in zone_bucket: + if zone_conn.zone == z3: continue - conn = get_zone_connection(zone, user.credentials) bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone.name, bucket_name) - bucket = conn.create_bucket(bucket_name) + log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) + bucket = zone_conn.conn.create_bucket(bucket_name) buckets.append(bucket_name) # restart zone 3 gateway and wait for sync z3.start() zonegroup_meta_checkpoint(zonegroup) - # verify that we end up with the same buckets + # verify that we end up with the same objects for bucket_name in buckets: - for source_zone, target_zone in combinations(zonegroup.zones, 2): - check_bucket_eq(source_zone, target_zone, bucket_name) + for source_conn, _ in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + + target_conn.check_bucket_eq(source_conn, bucket_name) # verify that mdlogs are not empty and match for each period for period in mdlog_periods: @@ -777,6 +766,7 @@ def test_multi_period_incremental_sync(): def test_zonegroup_remove(): zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) if len(zonegroup.zones) < 2: raise SkipTest("test_zonegroup_remove skipped. 
Requires 2 or more zones in master zonegroup.") diff --git a/src/test/rgw/rgw_multi/tests_es.py b/src/test/rgw/rgw_multi/tests_es.py new file mode 100644 index 00000000000..bc9f67b73a0 --- /dev/null +++ b/src/test/rgw/rgw_multi/tests_es.py @@ -0,0 +1,274 @@ +import json +import logging + +import boto +import boto.s3.connection + +import datetime +import dateutil + +from nose.tools import eq_ as eq + +from rgw_multi.multisite import * +from rgw_multi.tests import * +from rgw_multi.zone_es import * + +log = logging.getLogger(__name__) + + +def check_es_configured(): + realm = get_realm() + zonegroup = realm.master_zonegroup() + + es_zones = zonegroup.zones_by_type.get("elasticsearch") + if not es_zones: + raise SkipTest("Requires at least one ES zone") + +def is_es_zone(zone_conn): + if not zone_conn: + return False + + return zone_conn.zone.tier_type() == "elasticsearch" + +def verify_search(bucket_name, src_keys, result_keys, f): + check_keys = [] + for k in src_keys: + if bucket_name: + if bucket_name != k.bucket.name: + continue + if f(k): + check_keys.append(k) + check_keys.sort(key = lambda l: (l.bucket.name, l.name, l.version_id)) + + log.debug('check keys:' + dump_json(check_keys)) + log.debug('result keys:' + dump_json(result_keys)) + + for k1, k2 in zip_longest(check_keys, result_keys): + assert k1 + assert k2 + check_object_eq(k1, k2) + +def do_check_mdsearch(conn, bucket, src_keys, req_str, src_filter): + if bucket: + bucket_name = bucket.name + else: + bucket_name = '' + req = MDSearch(conn, bucket_name, req_str) + result_keys = req.search(sort_key = lambda k: (k.bucket.name, k.name, k.version_id)) + verify_search(bucket_name, src_keys, result_keys, src_filter) + +def init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = None): + check_es_configured() + + realm = get_realm() + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns, buckets_per_zone = buckets_per_zone) + + if bucket_init_cb: + for zone_conn, bucket in zone_bucket: + bucket_init_cb(zone_conn, bucket) + + src_keys = [] + + owner = None + + obj_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) + + # don't wait for meta sync just yet + for zone, bucket in zone_bucket: + for count in xrange(0, num_keys): + objname = obj_prefix + str(count) + k = new_key(zone, bucket.name, objname) + # k.set_contents_from_string(content + 'x' * count) + if not create_obj: + continue + + create_obj(k, count) + + if not owner: + for list_key in bucket.list_versions(): + owner = list_key.owner + break + + k = bucket.get_key(k.name, version_id = k.version_id) + k.owner = owner # owner is not set when doing get_key() + + src_keys.append(k) + + zonegroup_meta_checkpoint(zonegroup) + + sources = [] + targets = [] + for target_conn in zonegroup_conns.zones: + if not is_es_zone(target_conn): + sources.append(target_conn) + continue + + targets.append(target_conn) + + buckets = [] + # make sure all targets are synced + for source_conn, bucket in zone_bucket: + buckets.append(bucket) + for target_conn in targets: + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + + return targets, sources, buckets, src_keys + +def test_es_object_search(): + min_size = 10 + content = 'a' * min_size + + def create_obj(k, i): + k.set_contents_from_string(content + 'x' * i) + + targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 2) + + for target_conn in targets: + + # bucket 
checks + for bucket in buckets: + # check name + do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name) + + # check on all buckets + for key in src_keys: + # limiting to checking specific key name, otherwise could get results from + # other runs / tests + do_check_mdsearch(target_conn.conn, None, src_keys , 'name == ' + key.name, lambda k: k.name == key.name) + + # check on specific bucket + for bucket in buckets: + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name < ' + key.name, lambda k: k.name < key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name <= ' + key.name, lambda k: k.name <= key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + key.name, lambda k: k.name == key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name >= ' + key.name, lambda k: k.name >= key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name > ' + key.name, lambda k: k.name > key.name) + + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + src_keys[0].name + ' or name >= ' + src_keys[2].name, + lambda k: k.name == src_keys[0].name or k.name >= src_keys[2].name) + + # check etag + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag < ' + key.etag[1:-1], lambda k: k.etag < key.etag) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag == ' + key.etag[1:-1], lambda k: k.etag == key.etag) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag > ' + key.etag[1:-1], lambda k: k.etag > key.etag) + + # check size + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size < ' + str(key.size), lambda k: k.size < key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size <= ' + str(key.size), lambda k: k.size <= key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size == ' + str(key.size), lambda k: k.size == key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size >= ' + str(key.size), lambda k: k.size >= key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size > ' + str(key.size), lambda k: k.size > key.size) + +def date_from_str(s): + return dateutil.parser.parse(s) + +def test_es_object_search_custom(): + min_size = 10 + content = 'a' * min_size + + def bucket_init(zone_conn, bucket): + req = MDSearchConfig(zone_conn.conn, bucket.name) + req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date') + + def create_obj(k, i): + date = datetime.datetime.now() + datetime.timedelta(seconds=1) * i + date_str = date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + k.set_contents_from_string(content + 'x' * i, headers = { 'X-Amz-Meta-Foo-Str': str(i * 5), + 'X-Amz-Meta-Foo-Int': str(i * 5), + 'X-Amz-Meta-Foo-Date': date_str}) + + targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init) + + + for target_conn in targets: + + # bucket checks + for bucket in buckets: + str_vals = [] + for key in src_keys: + # check string values + val = key.get_metadata('foo-str') + str_vals.append(val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 
'x-amz-meta-foo-str < ' + val, lambda k: k.get_metadata('foo-str') < val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + val, lambda k: k.get_metadata('foo-str') <= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str == ' + val, lambda k: k.get_metadata('foo-str') == val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + val, lambda k: k.get_metadata('foo-str') >= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str > ' + val, lambda k: k.get_metadata('foo-str') > val) + + # check int values + sval = key.get_metadata('foo-int') + val = int(sval) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int < ' + sval, lambda k: int(k.get_metadata('foo-int')) < val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int <= ' + sval, lambda k: int(k.get_metadata('foo-int')) <= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int == ' + sval, lambda k: int(k.get_metadata('foo-int')) == val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int >= ' + sval, lambda k: int(k.get_metadata('foo-int')) >= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int > ' + sval, lambda k: int(k.get_metadata('foo-int')) > val) + + # check date values + sval = key.get_metadata('foo-date') + val = date_from_str(sval) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date < ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) < val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date <= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) <= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date == ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) == val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date >= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) >= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date > ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) > val) + + # 'or' query + for i in xrange(len(src_keys) / 2): + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + str_vals[i] + ' or x-amz-meta-foo-str >= ' + str_vals[-i], + lambda k: k.get_metadata('foo-str') <= str_vals[i] or k.get_metadata('foo-str') >= str_vals[-i] ) + + # 'and' query + for i in xrange(len(src_keys) / 2): + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + str_vals[i] + ' and x-amz-meta-foo-str <= ' + str_vals[i + 1], + lambda k: k.get_metadata('foo-str') >= str_vals[i] and k.get_metadata('foo-str') <= str_vals[i + 1] ) + # more complicated query + for i in xrange(len(src_keys) / 2): + do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name + ' and x-amz-meta-foo-str >= ' + str_vals[i] + + ' and (x-amz-meta-foo-str <= ' + str_vals[i + 1] + ')', + lambda k: k.bucket.name == bucket.name and (k.get_metadata('foo-str') >= str_vals[i] and + k.get_metadata('foo-str') <= str_vals[i + 1]) ) + +def test_es_bucket_conf(): + min_size = 0 + + def bucket_init(zone_conn, bucket): + req = MDSearchConfig(zone_conn.conn, bucket.name) + req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date') + + targets, sources, buckets, _ = init_env(None, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = 
bucket_init) + + for source_conn in sources: + for bucket in buckets: + req = MDSearchConfig(source_conn.conn, bucket.name) + conf = req.get_config() + + d = {} + + for entry in conf: + d[entry['Key']] = entry['Type'] + + eq(len(d), 3) + eq(d['x-amz-meta-foo-str'], 'str') + eq(d['x-amz-meta-foo-int'], 'int') + eq(d['x-amz-meta-foo-date'], 'date') + + req.del_config() + + conf = req.get_config() + + eq(len(conf), 0) + + break # no need to iterate over all zones diff --git a/src/test/rgw/rgw_multi/tools.py b/src/test/rgw/rgw_multi/tools.py new file mode 100644 index 00000000000..da32516435e --- /dev/null +++ b/src/test/rgw/rgw_multi/tools.py @@ -0,0 +1,82 @@ +import json +import boto + +def append_attr_value(d, attr, attrv): + if attrv and len(str(attrv)) > 0: + d[attr] = attrv + +def append_attr(d, k, attr): + try: + attrv = getattr(k, attr) + except AttributeError: + return + append_attr_value(d, attr, attrv) + +def get_attrs(k, attrs): + d = {} + for a in attrs: + append_attr(d, k, a) + + return d + +def append_query_arg(s, n, v): + if not v: + return s + nv = '{n}={v}'.format(n=n, v=v) + if not s: + return nv + return '{s}&{nv}'.format(s=s, nv=nv) + +class KeyJSONEncoder(boto.s3.key.Key): + @staticmethod + def default(k, versioned=False): + attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control', + 'content_type', 'content_disposition', 'content_language', + 'owner', 'storage_class', 'md5', 'version_id', 'encrypted', + 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag'] + d = get_attrs(k, attrs) + d['etag'] = k.etag[1:-1] + if versioned: + d['is_latest'] = k.is_latest + return d + +class DeleteMarkerJSONEncoder(boto.s3.key.Key): + @staticmethod + def default(k): + attrs = ['name', 'version_id', 'last_modified', 'owner'] + d = get_attrs(k, attrs) + d['delete_marker'] = True + d['is_latest'] = k.is_latest + return d + +class UserJSONEncoder(boto.s3.user.User): + @staticmethod + def default(k): + attrs = ['id', 'display_name'] + return get_attrs(k, attrs) + +class BucketJSONEncoder(boto.s3.bucket.Bucket): + @staticmethod + def default(k): + attrs = ['name', 'creation_date'] + return get_attrs(k, attrs) + +class BotoJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, boto.s3.key.Key): + return KeyJSONEncoder.default(obj) + if isinstance(obj, boto.s3.deletemarker.DeleteMarker): + return DeleteMarkerJSONEncoder.default(obj) + if isinstance(obj, boto.s3.user.User): + return UserJSONEncoder.default(obj) + if isinstance(obj, boto.s3.prefix.Prefix): + return (lambda x: {'prefix': x.name})(obj) + if isinstance(obj, boto.s3.bucket.Bucket): + return BucketJSONEncoder.default(obj) + return json.JSONEncoder.default(self, obj) + + +def dump_json(o, cls=BotoJSONEncoder): + return json.dumps(o, cls=cls, indent=4) + + diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py new file mode 100644 index 00000000000..dccab4093d1 --- /dev/null +++ b/src/test/rgw/rgw_multi/zone_es.py @@ -0,0 +1,256 @@ +import json +import requests.compat +import logging + +import boto +import boto.s3.connection + +import dateutil.parser + +from nose.tools import eq_ as eq +try: + from itertools import izip_longest as zip_longest +except ImportError: + from itertools import zip_longest + +from .multisite import * +from .tools import * + +log = logging.getLogger(__name__) + +def get_key_ver(k): + if not k.version_id: + return 'null' + return k.version_id + +def check_object_eq(k1, k2, check_extra = True): + assert k1 + assert k2 + log.debug('comparing key 
name=%s', k1.name) + eq(k1.name, k2.name) + eq(k1.metadata, k2.metadata) + # eq(k1.cache_control, k2.cache_control) + eq(k1.content_type, k2.content_type) + # eq(k1.content_encoding, k2.content_encoding) + # eq(k1.content_disposition, k2.content_disposition) + # eq(k1.content_language, k2.content_language) + eq(k1.etag, k2.etag) + mtime1 = dateutil.parser.parse(k1.last_modified) + mtime2 = dateutil.parser.parse(k2.last_modified) + assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution + if check_extra: + eq(k1.owner.id, k2.owner.id) + eq(k1.owner.display_name, k2.owner.display_name) + # eq(k1.storage_class, k2.storage_class) + eq(k1.size, k2.size) + eq(get_key_ver(k1), get_key_ver(k2)) + # eq(k1.encrypted, k2.encrypted) + +def make_request(conn, method, bucket, key, query_args, headers): + result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers) + if result.status // 100 != 2: # floor division, so the 2xx check also holds on python3 + raise boto.exception.S3ResponseError(result.status, result.reason, result.read()) + return result + +def append_query_arg(s, n, v): + if not v: + return s + nv = '{n}={v}'.format(n=n, v=v) + if not s: + return nv + return '{s}&{nv}'.format(s=s, nv=nv) + +class MDSearch: + def __init__(self, conn, bucket_name, query, query_args = None, marker = None): + self.conn = conn + self.bucket_name = bucket_name or '' + if bucket_name: + self.bucket = boto.s3.bucket.Bucket(name=bucket_name) + else: + self.bucket = None + self.query = query + self.query_args = query_args + self.max_keys = None + self.marker = marker + + def raw_search(self): + q = self.query or '' + query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q)) + if self.max_keys is not None: + query_args = append_query_arg(query_args, 'max-keys', self.max_keys) + if self.marker: + query_args = append_query_arg(query_args, 'marker', self.marker) + + query_args = append_query_arg(query_args, 'format', 'json') + + headers = {} + + result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers) + + l = [] + + result_dict = json.loads(result.read()) + + for entry in result_dict['Objects']: + bucket = self.conn.get_bucket(entry['Bucket'], validate = False) + k = boto.s3.key.Key(bucket, entry['Key']) + + k.version_id = entry['Instance'] + k.etag = entry['ETag'] + k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName']) + k.last_modified = entry['LastModified'] + k.size = entry['Size'] + k.content_type = entry['ContentType'] + k.versioned_epoch = entry['VersionedEpoch'] + + k.metadata = {} + for e in entry['CustomMetadata']: + k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response + + l.append(k) + + return result_dict, l + + def search(self, drain = True, sort = True, sort_key = None): + l = [] + + is_done = False + + while not is_done: + result, result_keys = self.raw_search() + + l = l + result_keys + + is_done = not (drain and (result['IsTruncated'] == "true")) + self.marker = result['Marker'] # advance the marker so the next raw_search() continues after this page + + if sort: + if not sort_key: + sort_key = lambda k: (k.name, -k.versioned_epoch) + l.sort(key = sort_key) + + return l + + +class MDSearchConfig: + def __init__(self, conn, bucket_name): + self.conn = conn + self.bucket_name = bucket_name or '' + if bucket_name: + self.bucket = boto.s3.bucket.Bucket(name=bucket_name) + else: + self.bucket = None + + def send_request(self, conf, method): + query_args = 'mdsearch' + headers = None + if conf: + headers = { 'X-Amz-Meta-Search': conf } + + query_args = append_query_arg(query_args, 'format', 'json') + + return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers) + + def get_config(self): + result = self.send_request(None, 'GET') + return json.loads(result.read()) + + def set_config(self, conf): + self.send_request(conf, 'POST') + + def del_config(self): + self.send_request(None, 'DELETE') + + +class ESZoneBucket: + def __init__(self, zone_conn, name, conn): + self.zone_conn = zone_conn + self.name = name + self.conn = conn + + self.bucket = boto.s3.bucket.Bucket(name=name) + + def get_all_versions(self): + + marker = None + is_done = False + + req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker) + + for k in req.search(): + yield k + + + + +class ESZone(Zone): + def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None): + self.es_endpoint = es_endpoint + super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways) + + def is_read_only(self): + return True + + def tier_type(self): + return "elasticsearch" + + def create(self, cluster, args = None, check_retcode = True): + """ create the object with the given arguments """ + + if args is None: + args = []  # must be a list, command-line args are appended below + + tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ]) + + args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ] + + return self.json_command(cluster, 'create', args, check_retcode=check_retcode) + + def has_buckets(self): + return False + + class Conn(ZoneConn): + def __init__(self, zone, credentials): + super(ESZone.Conn, self).__init__(zone, credentials) + + def get_bucket(self, bucket_name): + return ESZoneBucket(self, bucket_name, self.conn) + + def create_bucket(self, name): + # should not be here, a bug in the test suite + log.critical('Conn.create_bucket() should not be called in ES zone') + assert False + + def check_bucket_eq(self, zone_conn, bucket_name): + assert(zone_conn.zone.tier_type() == "rados") + + log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + b1 = self.get_bucket(bucket_name) + b2 = zone_conn.get_bucket(bucket_name) + + log.debug('bucket1 objects:') + for o in b1.get_all_versions(): + log.debug('o=%s', o.name) + log.debug('bucket2 objects:') + for o in b2.get_all_versions(): + log.debug('o=%s', o.name) + + for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): + if k1 is None: + log.critical('key=%s is missing from zone=%s', k2.name, self.name) + assert False + if k2 is None: + log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name) + assert False + + check_object_eq(k1, k2) + + + log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + + return True + + def get_conn(self, credentials): + return self.Conn(self, credentials) + + diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py new file mode 100644 index 00000000000..8fae22ea8a9 --- /dev/null +++ b/src/test/rgw/rgw_multi/zone_rados.py @@ -0,0 +1,89 @@ +import logging + +try: + from itertools import izip_longest as zip_longest +except ImportError: + from itertools import zip_longest + +from nose.tools import eq_ as eq + +from .multisite import * + +log = logging.getLogger(__name__) + +def check_object_eq(k1, k2, check_extra = True): + assert k1 + assert k2 + 
log.debug('comparing key name=%s', k1.name) + eq(k1.name, k2.name) + eq(k1.get_contents_as_string(), k2.get_contents_as_string()) + eq(k1.metadata, k2.metadata) + eq(k1.cache_control, k2.cache_control) + eq(k1.content_type, k2.content_type) + eq(k1.content_encoding, k2.content_encoding) + eq(k1.content_disposition, k2.content_disposition) + eq(k1.content_language, k2.content_language) + eq(k1.etag, k2.etag) + eq(k1.last_modified, k2.last_modified) + if check_extra: + eq(k1.owner.id, k2.owner.id) + eq(k1.owner.display_name, k2.owner.display_name) + eq(k1.storage_class, k2.storage_class) + eq(k1.size, k2.size) + eq(k1.version_id, k2.version_id) + eq(k1.encrypted, k2.encrypted) + + +class RadosZone(Zone): + def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None): + super(RadosZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways) + + def tier_type(self): + return "rados" + + + class Conn(ZoneConn): + def __init__(self, zone, credentials): + super(RadosZone.Conn, self).__init__(zone, credentials) + + def get_bucket(self, name): + return self.conn.get_bucket(name) + + def create_bucket(self, name): + return self.conn.create_bucket(name) + + def check_bucket_eq(self, zone_conn, bucket_name): + log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + b1 = self.get_bucket(bucket_name) + b2 = zone_conn.get_bucket(bucket_name) + + log.debug('bucket1 objects:') + for o in b1.get_all_versions(): + log.debug('o=%s', o.name) + log.debug('bucket2 objects:') + for o in b2.get_all_versions(): + log.debug('o=%s', o.name) + + for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): + if k1 is None: + log.critical('key=%s is missing from zone=%s', k2.name, self.name) + assert False + if k2 is None: + log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name) + assert False + + check_object_eq(k1, k2) + + # now get the keys through a HEAD operation, verify that the available data is the same + k1_head = b1.get_key(k1.name) + k2_head = b2.get_key(k2.name) + + check_object_eq(k1_head, k2_head, False) + + log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + + return True + + def get_conn(self, credentials): + return self.Conn(self, credentials) + diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py index e380acc9903..21154585ceb 100644 --- a/src/test/rgw/test_multi.py +++ b/src/test/rgw/test_multi.py @@ -13,8 +13,12 @@ except ImportError: import nose.core from rgw_multi import multisite +from rgw_multi.zone_rados import RadosZone as RadosZone +from rgw_multi.zone_es import ESZone as ESZone + # make tests from rgw_multi.tests available to nose from rgw_multi.tests import * +from rgw_multi.tests_es import * mstart_path = os.getenv('MSTART_PATH') if mstart_path is None: @@ -146,6 +150,7 @@ def init(parse_args): cfg = configparser.RawConfigParser({ 'num_zonegroups': 1, 'num_zones': 3, + 'num_es_zones': 0, 'gateways_per_zone': 2, 'no_bootstrap': 'false', 'log_level': 20, @@ -155,6 +160,7 @@ def init(parse_args): 'checkpoint_retries': 60, 'checkpoint_delay': 5, 'reconfigure_delay': 5, + 'es_endpoint': None, }) try: path = os.environ['RGW_MULTI_TEST_CONF'] @@ -175,6 +181,7 @@ def init(parse_args): section = 'DEFAULT' parser.add_argument('--num-zonegroups', type=int, default=cfg.getint(section, 'num_zonegroups')) parser.add_argument('--num-zones', type=int, default=cfg.getint(section, 'num_zones')) + 
parser.add_argument('--num-es-zones', type=int, default=cfg.getint(section, 'num_es_zones')) parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone')) parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap')) parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level')) @@ -184,6 +191,7 @@ def init(parse_args): parser.add_argument('--checkpoint-retries', type=int, default=cfg.getint(section, 'checkpoint_retries')) parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay')) parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay')) + parser.add_argument('--es-endpoint', type=str, default=cfg.get(section, 'es_endpoint')) argv = [] @@ -193,6 +201,9 @@ def init(parse_args): args = parser.parse_args(argv) bootstrap = not args.no_bootstrap + # a non-zero num_es_zones requires es_endpoint to be set as well + assert(args.num_es_zones == 0 or args.es_endpoint) + setup_logging(args.log_level, args.log_file, args.file_log_level) # start first cluster @@ -217,6 +228,8 @@ def init(parse_args): period = multisite.Period(realm=realm) realm.current_period = period + num_zones = args.num_zones + args.num_es_zones + for zg in range(0, args.num_zonegroups): zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period) period.zonegroups.append(zonegroup) @@ -225,7 +238,7 @@ def init(parse_args): if is_master_zg: period.master_zonegroup = zonegroup - for z in range(0, args.num_zones): + for z in range(0, num_zones): is_master = z == 0 # start a cluster, or use c1 for first zone cluster = None @@ -253,8 +266,14 @@ def init(parse_args): else: zonegroup.get(cluster) + es_zone = (z >= args.num_zones) + # create the zone in its zonegroup - zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster) + if es_zone: + zone = ESZone(zone_name(zg, z), args.es_endpoint, zonegroup, cluster) + else: + zone = RadosZone(zone_name(zg, z), zonegroup, cluster) + if bootstrap: arg = admin_creds.credential_args() if is_master: @@ -268,6 +287,13 @@ def init(parse_args): if is_master: zonegroup.master_zone = zone + zonegroup.zones_by_type.setdefault(zone.tier_type(), []).append(zone) + + if zone.is_read_only(): + zonegroup.ro_zones.append(zone) + else: + zonegroup.rw_zones.append(zone) + # update/commit the period if bootstrap: period.update(zone, commit=True)
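
The MDSearch and MDSearchConfig classes added in zone_es.py wrap rgw's metadata-search REST interface (the 'query' and 'mdsearch' query args on the S3 endpoint). A minimal usage sketch, assuming an existing boto connection `conn` and a bucket named 'mybucket' (both hypothetical, as is the x-amz-meta-color attribute):

    from rgw_multi.zone_es import MDSearch, MDSearchConfig

    # declare which custom attributes get indexed for the bucket
    config = MDSearchConfig(conn, 'mybucket')
    config.set_config('x-amz-meta-color; string')

    # query the index; the expression grammar matches the tests above
    req = MDSearch(conn, 'mybucket', 'name >= obj3 and x-amz-meta-color == blue')
    for key in req.search(sort_key=lambda k: (k.name, k.version_id)):
        print(key.name)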
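
ESZone.create() builds its '--tier-type'/'--tier-config' arguments for the zone create admin command, so the equivalent manual setup is roughly the following invocation (zone/realm selection flags elided, endpoint illustrative):

    radosgw-admin zone create --rgw-zone={zone-name} \
        --tier-type=elasticsearch \
        --tier-config=endpoint=http://localhost:9200,explicit_custom_meta=false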
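
To exercise these tests, the new num_es_zones/es_endpoint options can be supplied either as the command-line flags parsed above or through the config file read from the RGW_MULTI_TEST_CONF environment variable. A minimal sketch (the endpoint value is illustrative, not a shipped default):

    [DEFAULT]
    num_zones = 2
    num_es_zones = 1
    es_endpoint = http://localhost:9200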