 alpine/APKBUILD.in                   |   2
 ceph.spec.in                         |   1
 debian/radosgw.install               |   1
 qa/tasks/rgw_multisite.py            |  10
 src/common/ceph_json.h               |  10
 src/rgw/CMakeLists.txt               |  13
 src/rgw/rgw_acl.h                    |   1
 src/rgw/rgw_admin.cc                 |   7
 src/rgw/rgw_common.cc                |   2
 src/rgw/rgw_common.h                 |  21
 src/rgw/rgw_data_sync.cc             |  28
 src/rgw/rgw_data_sync.h              |  14
 src/rgw/rgw_env.cc                   |  33
 src/rgw/rgw_es_main.cc               |  77
 src/rgw/rgw_es_query.cc              | 660
 src/rgw/rgw_es_query.h               | 162
 src/rgw/rgw_http_client.cc           |   8
 src/rgw/rgw_http_client.h            |   6
 src/rgw/rgw_json_enc.cc              |   9
 src/rgw/rgw_main.cc                  |  12
 src/rgw/rgw_op.cc                    |  73
 src/rgw/rgw_op.h                     |  48
 src/rgw/rgw_rados.h                  |   2
 src/rgw/rgw_rest_client.cc           |  27
 src/rgw/rgw_rest_client.h            |   8
 src/rgw/rgw_rest_conn.cc             |  31
 src/rgw/rgw_rest_conn.h              |  16
 src/rgw/rgw_rest_s3.cc               | 128
 src/rgw/rgw_rest_s3.h                |  24
 src/rgw/rgw_sync.cc                  |   2
 src/rgw/rgw_sync_module.h            |  15
 src/rgw/rgw_sync_module_es.cc        | 465
 src/rgw/rgw_sync_module_es.h         |  15
 src/rgw/rgw_sync_module_es_rest.cc   | 412
 src/rgw/rgw_sync_module_es_rest.h    |  17
 src/rgw/rgw_sync_module_log.cc       |   2
 src/rgw/rgw_sync_module_log.h        |   2
 src/test/rgw/rgw_multi/conn.py       |  16
 src/test/rgw/rgw_multi/multisite.py  |  44
 src/test/rgw/rgw_multi/tests.py      | 296
 src/test/rgw/rgw_multi/tests_es.py   | 274
 src/test/rgw/rgw_multi/tools.py      |  82
 src/test/rgw/rgw_multi/zone_es.py    | 256
 src/test/rgw/rgw_multi/zone_rados.py |  89
 src/test/rgw/test_multi.py           |  29
 45 files changed, 3168 insertions(+), 282 deletions(-)
diff --git a/alpine/APKBUILD.in b/alpine/APKBUILD.in
index f1ab9aa27d6..51245ea5a1f 100644
--- a/alpine/APKBUILD.in
+++ b/alpine/APKBUILD.in
@@ -292,7 +292,7 @@ radosgw() {
pkgdesc="Rados REST gateway which implements Amazon's S3 and OpenStack's Swift APIs."
depends="ceph-common"
- _pkg $_bindir radosgw radosgw-admin radosgw-token radosgw-object-expirer
+ _pkg $_bindir radosgw radosgw-admin radosgw-token radosgw-es radosgw-object-expirer
mkdir -p $subpkgdir$_localstatedir/lib/ceph/radosgw
}
diff --git a/ceph.spec.in b/ceph.spec.in
index ce517b3a785..592aa6e0c1a 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -1375,6 +1375,7 @@ fi
%defattr(-,root,root,-)
%{_bindir}/radosgw
%{_bindir}/radosgw-token
+%{_bindir}/radosgw-es
%{_bindir}/radosgw-object-expirer
%{_mandir}/man8/radosgw.8*
%dir %{_localstatedir}/lib/ceph/radosgw
diff --git a/debian/radosgw.install b/debian/radosgw.install
index 192f2329bfa..9f2c8debb32 100644
--- a/debian/radosgw.install
+++ b/debian/radosgw.install
@@ -1,4 +1,5 @@
usr/bin/radosgw
usr/bin/radosgw-token
+usr/bin/radosgw-es
usr/bin/radosgw-object-expirer
usr/share/man/man8/radosgw.8
diff --git a/qa/tasks/rgw_multisite.py b/qa/tasks/rgw_multisite.py
index 0cab6249325..cd120c44140 100644
--- a/qa/tasks/rgw_multisite.py
+++ b/qa/tasks/rgw_multisite.py
@@ -10,6 +10,7 @@ from copy import deepcopy
from util.rgw import rgwadmin, wait_for_radosgw
from util.rados import create_ec_pool, create_replicated_pool
from rgw_multi import multisite
+from rgw_multi.zone_rados import RadosZone as RadosZone
from teuthology.orchestra import run
from teuthology import misc
@@ -366,6 +367,7 @@ def create_zonegroup(cluster, gateways, period, config):
def create_zone(ctx, cluster, gateways, creds, zonegroup, config):
""" create a zone with the given configuration """
zone = multisite.Zone(config['name'], zonegroup, cluster)
+ zone = RadosZone(config['name'], zonegroup, cluster)
# collect Gateways for the zone's endpoints
endpoints = config.get('endpoints')
@@ -389,6 +391,14 @@ def create_zone(ctx, cluster, gateways, creds, zonegroup, config):
create_zone_pools(ctx, zone)
if ctx.rgw.compression_type:
configure_zone_compression(zone, ctx.rgw.compression_type)
+
+ zonegroup.zones_by_type.setdefault(zone.tier_type(), []).append(zone)
+
+ if zone.is_read_only():
+ zonegroup.ro_zones.append(zone)
+ else:
+ zonegroup.rw_zones.append(zone)
+
return zone
def create_zone_pools(ctx, zone):
diff --git a/src/common/ceph_json.h b/src/common/ceph_json.h
index 873dcd1afe5..cd868a88f4e 100644
--- a/src/common/ceph_json.h
+++ b/src/common/ceph_json.h
@@ -207,8 +207,8 @@ void decode_json_obj(vector<T>& l, JSONObj *obj)
}
}
-template<class K, class V>
-void decode_json_obj(map<K, V>& m, JSONObj *obj)
+template<class K, class V, class C = std::less<K> >
+void decode_json_obj(map<K, V, C>& m, JSONObj *obj)
{
m.clear();
@@ -381,11 +381,11 @@ static void encode_json(const char *name, const std::vector<T>& l, ceph::Formatt
f->close_section();
}
-template<class K, class V>
-static void encode_json(const char *name, const std::map<K, V>& m, ceph::Formatter *f)
+template<class K, class V, class C = std::less<K> >
+static void encode_json(const char *name, const std::map<K, V, C>& m, ceph::Formatter *f)
{
f->open_array_section(name);
- for (typename std::map<K, V>::const_iterator i = m.begin(); i != m.end(); ++i) {
+ for (typename std::map<K, V, C>::const_iterator i = m.begin(); i != m.end(); ++i) {
f->open_object_section("entry");
encode_json("key", i->first, f);
encode_json("val", i->second, f);
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 6d2a5928164..c0e0c907b75 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -55,6 +55,7 @@ set(rgw_a_srcs
rgw_cors_s3.cc
rgw_dencoder.cc
rgw_env.cc
+ rgw_es_query.cc
rgw_formats.cc
rgw_frontend.cc
rgw_gc.cc
@@ -73,6 +74,7 @@ set(rgw_a_srcs
rgw_data_sync.cc
rgw_sync_module.cc
rgw_sync_module_es.cc
+ rgw_sync_module_es_rest.cc
rgw_sync_module_log.cc
rgw_period_history.cc
rgw_period_puller.cc
@@ -194,6 +196,17 @@ target_link_libraries(radosgw-admin rgw_a librados
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${SSL_LIBRARIES} ${BLKID_LIBRARIES})
install(TARGETS radosgw-admin DESTINATION bin)
+set(radosgw_es_srcs
+ rgw_es_main.cc)
+add_executable(radosgw-es ${radosgw_es_srcs})
+target_link_libraries(radosgw-es rgw_a librados
+ cls_rgw_client cls_lock_client cls_refcount_client
+ cls_log_client cls_statelog_client cls_timeindex_client
+ cls_version_client cls_replica_log_client cls_user_client
+ global ${FCGI_LIBRARY} ${LIB_RESOLV}
+ ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${SSL_LIBRARIES} ${BLKID_LIBRARIES})
+install(TARGETS radosgw-es DESTINATION bin)
+
set(radosgw_token_srcs
rgw_token.cc)
add_executable(radosgw-token ${radosgw_token_srcs})
diff --git a/src/rgw/rgw_acl.h b/src/rgw/rgw_acl.h
index 26c84d121a2..0225797e5b7 100644
--- a/src/rgw/rgw_acl.h
+++ b/src/rgw/rgw_acl.h
@@ -377,6 +377,7 @@ public:
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
+ void decode_json(JSONObj *obj);
static void generate_test_instances(list<ACLOwner*>& o);
void set_id(const rgw_user& _id) { id = _id; }
void set_name(const string& name) { display_name = name; }
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index fe88d824749..dfe61e0ded8 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -2182,7 +2182,7 @@ static void sync_status(Formatter *formatter)
tab_dump("data sync", width, data_status);
}
-static void parse_tier_config_param(const string& s, map<string, string>& out)
+static void parse_tier_config_param(const string& s, map<string, string, ltstr_nocase>& out)
{
list<string> confs;
get_str_list(s, ",", confs);
@@ -2514,8 +2514,8 @@ int main(int argc, const char **argv)
string tier_type;
bool tier_type_specified = false;
- map<string, string> tier_config_add;
- map<string, string> tier_config_rm;
+ map<string, string, ltstr_nocase> tier_config_add;
+ map<string, string, ltstr_nocase> tier_config_rm;
boost::optional<string> index_pool;
boost::optional<string> data_pool;
@@ -3935,6 +3935,7 @@ int main(int argc, const char **argv)
zone.system_key.id = access_key;
zone.system_key.key = secret_key;
zone.realm_id = realm_id;
+ zone.tier_config = tier_config_add;
ret = zone.create();
if (ret < 0) {
diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc
index ddbd8fae708..365cd86cb4a 100644
--- a/src/rgw/rgw_common.cc
+++ b/src/rgw/rgw_common.cc
@@ -550,7 +550,7 @@ bool parse_iso8601(const char *s, struct tm *t, uint32_t *pns, bool extended_for
trim_whitespace(p, str);
int len = str.size();
- if (len == 1 && str[0] == 'Z')
+ if (len == 0 || (len == 1 && str[0] == 'Z'))
return true;
if (str[0] != '.' ||
diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h
index bc049346e73..0a7cc342c09 100644
--- a/src/rgw/rgw_common.h
+++ b/src/rgw/rgw_common.h
@@ -357,6 +357,10 @@ class RGWHTTPArgs
}
};
+const char *rgw_conf_get(const map<string, string, ltstr_nocase>& conf_map, const char *name, const char *def_val);
+int rgw_conf_get_int(const map<string, string, ltstr_nocase>& conf_map, const char *name, int def_val);
+bool rgw_conf_get_bool(const map<string, string, ltstr_nocase>& conf_map, const char *name, bool def_val);
+
class RGWEnv;
class RGWConf {
@@ -464,8 +468,11 @@ enum RGWOpType {
/* rgw specific */
RGW_OP_ADMIN_SET_METADATA,
RGW_OP_GET_OBJ_LAYOUT,
-
- RGW_OP_BULK_UPLOAD
+ RGW_OP_BULK_UPLOAD,
+ RGW_OP_METADATA_SEARCH,
+ RGW_OP_CONFIG_BUCKET_META_SEARCH,
+ RGW_OP_GET_BUCKET_META_SEARCH,
+ RGW_OP_DEL_BUCKET_META_SEARCH,
};
class RGWAccessControlPolicy;
@@ -1168,9 +1175,11 @@ struct RGWBucketInfo
bool swift_versioning;
string swift_ver_location;
+ map<string, uint32_t> mdsearch_config;
+
void encode(bufferlist& bl) const {
- ENCODE_START(17, 4, bl);
+ ENCODE_START(18, 4, bl);
::encode(bucket, bl);
::encode(owner.id, bl);
::encode(flags, bl);
@@ -1194,10 +1203,11 @@ struct RGWBucketInfo
::encode(swift_ver_location, bl);
}
::encode(creation_time, bl);
+ ::encode(mdsearch_config, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START_LEGACY_COMPAT_LEN_32(17, 4, 4, bl);
+ DECODE_START_LEGACY_COMPAT_LEN_32(18, 4, 4, bl);
::decode(bucket, bl);
if (struct_v >= 2) {
string s;
@@ -1254,6 +1264,9 @@ struct RGWBucketInfo
if (struct_v >= 17) {
::decode(creation_time, bl);
}
+ if (struct_v >= 18) {
+ ::decode(mdsearch_config, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index b23e306c124..9b2aefe70e4 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -22,6 +22,8 @@
#include "cls/lock/cls_lock_client.h"
+#include "auth/Crypto.h"
+
#define dout_subsys ceph_subsys_rgw
#undef dout_prefix
@@ -470,12 +472,15 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
map<int, RGWDataChangesLogInfo> shards_info;
public:
RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
+ uint64_t instance_id,
rgw_data_sync_status *status)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
pool(store->get_zone_params().log_pool),
num_shards(num_shards), status(status) {
lock_name = "sync_lock";
+ status->sync_info.instance_id = instance_id;
+
#define COOKIE_LEN 16
char buf[COOKIE_LEN + 1];
@@ -680,7 +685,9 @@ int RGWRemoteDataLog::init_sync_status(int num_shards)
}
RGWDataSyncEnv sync_env_local = sync_env;
sync_env_local.http_manager = &http_manager;
- ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, &sync_status));
+ uint64_t instance_id;
+ get_random_bytes((char *)&instance_id, sizeof(instance_id));
+ ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
http_manager.stop();
return ret;
}
@@ -1432,6 +1439,8 @@ class RGWDataSyncCR : public RGWCoroutine {
bool *reset_backoff;
RGWDataSyncDebugLogger logger;
+
+ RGWDataSyncModule *data_sync_module{nullptr};
public:
RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
@@ -1439,6 +1448,7 @@ public:
marker_tracker(NULL),
shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
+
}
~RGWDataSyncCR() override {
@@ -1453,6 +1463,8 @@ public:
/* read sync status */
yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
+ data_sync_module = sync_env->sync_module->get_data_handler();
+
if (retcode == -ENOENT) {
sync_status.sync_info.num_shards = num_shards;
} else if (retcode < 0 && retcode != -ENOENT) {
@@ -1463,7 +1475,9 @@ public:
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
- yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, &sync_status));
+ uint64_t instance_id;
+ get_random_bytes((char *)&instance_id, sizeof(instance_id));
+ yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
return set_cr_error(retcode);
@@ -1473,7 +1487,15 @@ public:
*reset_backoff = true;
}
+ data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
+
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
+ /* call sync module init here */
+ yield call(data_sync_module->init_sync(sync_env));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
/* state: building full sync maps */
ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
@@ -1550,7 +1572,7 @@ public:
}
};
-int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance)
+int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
{
instance->reset(new RGWDefaultSyncModuleInstance());
return 0;
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index 75ae358198d..00e09447455 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -29,17 +29,23 @@ struct rgw_data_sync_info {
uint16_t state;
uint32_t num_shards;
+ uint64_t instance_id{0};
+
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(state, bl);
::encode(num_shards, bl);
+ ::encode(instance_id, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
::decode(state, bl);
::decode(num_shards, bl);
+ if (struct_v >= 2) {
+ ::decode(instance_id, bl);
+ }
DECODE_FINISH(bl);
}
@@ -61,6 +67,7 @@ struct rgw_data_sync_info {
}
encode_json("status", s, f);
encode_json("num_shards", num_shards, f);
+ encode_json("instance_id", instance_id, f);
}
void decode_json(JSONObj *obj) {
std::string s;
@@ -73,6 +80,7 @@ struct rgw_data_sync_info {
state = StateInit;
}
JSONDecoder::decode_json("num_shards", num_shards, obj);
+ JSONDecoder::decode_json("instance_id", num_shards, obj);
}
rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
@@ -520,7 +528,7 @@ class RGWDefaultSyncModule : public RGWSyncModule {
public:
RGWDefaultSyncModule() {}
bool supports_data_export() override { return true; }
- int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
};
// DataLogTrimCR factory function
diff --git a/src/rgw/rgw_env.cc b/src/rgw/rgw_env.cc
index 737cda4cc97..d1f876dcbee 100644
--- a/src/rgw/rgw_env.cc
+++ b/src/rgw/rgw_env.cc
@@ -41,35 +41,50 @@ void RGWEnv::init(CephContext *cct, char **envp)
init(cct);
}
-const char *RGWEnv::get(const char *name, const char *def_val)
+const char *rgw_conf_get(const map<string, string, ltstr_nocase>& conf_map, const char *name, const char *def_val)
{
- map<string, string, ltstr_nocase>::iterator iter = env_map.find(name);
- if (iter == env_map.end())
+ auto iter = conf_map.find(name);
+ if (iter == conf_map.end())
return def_val;
return iter->second.c_str();
}
-int RGWEnv::get_int(const char *name, int def_val)
+const char *RGWEnv::get(const char *name, const char *def_val)
{
- map<string, string, ltstr_nocase>::iterator iter = env_map.find(name);
- if (iter == env_map.end())
+ return rgw_conf_get(env_map, name, def_val);
+}
+
+int rgw_conf_get_int(const map<string, string, ltstr_nocase>& conf_map, const char *name, int def_val)
+{
+ auto iter = conf_map.find(name);
+ if (iter == conf_map.end())
return def_val;
const char *s = iter->second.c_str();
return atoi(s);
}
-bool RGWEnv::get_bool(const char *name, bool def_val)
+int RGWEnv::get_int(const char *name, int def_val)
{
- map<string, string, ltstr_nocase>::iterator iter = env_map.find(name);
- if (iter == env_map.end())
+ return rgw_conf_get_int(env_map, name, def_val);
+}
+
+bool rgw_conf_get_bool(const map<string, string, ltstr_nocase>& conf_map, const char *name, bool def_val)
+{
+ auto iter = conf_map.find(name);
+ if (iter == conf_map.end())
return def_val;
const char *s = iter->second.c_str();
return rgw_str_to_bool(s, def_val);
}
+bool RGWEnv::get_bool(const char *name, bool def_val)
+{
+ return rgw_conf_get_bool(env_map, name, def_val);
+}
+
size_t RGWEnv::get_size(const char *name, size_t def_val)
{
map<string, string, ltstr_nocase>::iterator iter = env_map.find(name);
diff --git a/src/rgw/rgw_es_main.cc b/src/rgw/rgw_es_main.cc
new file mode 100644
index 00000000000..78b2865ed33
--- /dev/null
+++ b/src/rgw/rgw_es_main.cc
@@ -0,0 +1,77 @@
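+/*
+ * radosgw-es: standalone tool that compiles a metadata search expression
+ * (argv[1], default "age >= 30") into the corresponding Elasticsearch
+ * JSON query and prints it to stdout.
+ */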
+#include <list>
+#include <string>
+#include <iostream>
+
+#include "global/global_init.h"
+#include "global/global_context.h"
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_json.h"
+#include "rgw_es_query.h"
+
+using namespace std;
+
+int main(int argc, char *argv[])
+{
+ vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+ env_to_vec(args);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY, 0);
+
+ common_init_finish(g_ceph_context);
+
+ list<string> infix;
+
+ string expr;
+
+ if (argc > 1) {
+ expr = argv[1];
+ } else {
+ expr = "age >= 30";
+ }
+
+ ESQueryCompiler es_query(expr, nullptr, "x-amz-meta-");
+
+ map<string, string, ltstr_nocase> aliases = { { "key", "name" },
+ { "etag", "meta.etag" },
+ { "size", "meta.size" },
+ { "mtime", "meta.mtime" },
+ { "lastmodified", "meta.mtime" },
+ { "contenttype", "meta.contenttype" },
+ };
+ es_query.set_field_aliases(&aliases);
+
+ map<string, ESEntityTypeMap::EntityType> generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR},
+ {"name", ESEntityTypeMap::ES_ENTITY_STR},
+ {"instance", ESEntityTypeMap::ES_ENTITY_STR},
+ {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR},
+ {"meta.contenttype", ESEntityTypeMap::ES_ENTITY_STR},
+ {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE},
+ {"meta.size", ESEntityTypeMap::ES_ENTITY_INT} };
+ ESEntityTypeMap gm(generic_map);
+ es_query.set_generic_type_map(&gm);
+
+ map<string, ESEntityTypeMap::EntityType> custom_map = { {"str", ESEntityTypeMap::ES_ENTITY_STR},
+ {"int", ESEntityTypeMap::ES_ENTITY_INT},
+ {"date", ESEntityTypeMap::ES_ENTITY_DATE} };
+ ESEntityTypeMap em(custom_map);
+ es_query.set_custom_type_map(&em);
+
+ string err;
+
+ bool valid = es_query.compile(&err);
+ if (!valid) {
+ cout << "failed to compile query: " << err << std::endl;
+ return EINVAL;
+ }
+
+ JSONFormatter f;
+ encode_json("root", es_query, &f);
+
+ f.flush(cout);
+
+ return 0;
+}
+
diff --git a/src/rgw/rgw_es_query.cc b/src/rgw/rgw_es_query.cc
new file mode 100644
index 00000000000..9a36cbc0d0f
--- /dev/null
+++ b/src/rgw/rgw_es_query.cc
@@ -0,0 +1,660 @@
+#include <list>
+#include <map>
+#include <string>
+#include <iostream>
+#include <boost/algorithm/string.hpp>
+
+#include "common/ceph_json.h"
+#include "rgw_common.h"
+#include "rgw_es_query.h"
+
+using namespace std;
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+bool pop_front(list<string>& l, string *s)
+{
+ if (l.empty()) {
+ return false;
+ }
+ *s = l.front();
+ l.pop_front();
+ return true;
+}
+
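+/* operator precedence: a larger value binds tighter; "or" < "and" < comparison operators */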
+map<string, int> operator_map = {
+ { "or", 1 },
+ { "and", 2 },
+ { "<", 3 },
+ { "<=", 3 },
+ { "==", 3 },
+ { ">=", 3 },
+ { ">", 3 },
+};
+
+bool is_operator(const string& s)
+{
+ return (operator_map.find(s) != operator_map.end());
+}
+
+int operand_value(const string& op)
+{
+ auto i = operator_map.find(op);
+ if (i == operator_map.end()) {
+ return 0;
+ }
+
+ return i->second;
+}
+
+int check_precedence(const string& op1, const string& op2)
+{
+ return operand_value(op1) - operand_value(op2);
+}
+
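+/* shunting-yard style conversion of an infix token list to prefix
+ * notation; returns false on unbalanced parentheses or a malformed
+ * expression */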
+static bool infix_to_prefix(list<string>& source, list<string> *out)
+{
+ list<string> operator_stack;
+ list<string> operand_stack;
+
+ operator_stack.push_front("(");
+ source.push_back(")");
+
+ for (string& entity : source) {
+ if (entity == "(") {
+ operator_stack.push_front(entity);
+ } else if (entity == ")") {
+ string popped_operator;
+ if (!pop_front(operator_stack, &popped_operator)) {
+ return false;
+ }
+
+ while (popped_operator != "(") {
+ operand_stack.push_front(popped_operator);
+ if (!pop_front(operator_stack, &popped_operator)) {
+ return false;
+ }
+ }
+
+ } else if (is_operator(entity)) {
+ string popped_operator;
+ if (!pop_front(operator_stack, &popped_operator)) {
+ return false;
+ }
+
+ int precedence = check_precedence(popped_operator, entity);
+
+ while (precedence >= 0) {
+ operand_stack.push_front(popped_operator);
+ if (!pop_front(operator_stack, &popped_operator)) {
+ return false;
+ }
+ precedence = check_precedence(popped_operator, entity);
+ }
+
+ operator_stack.push_front(popped_operator);
+ operator_stack.push_front(entity);
+ } else {
+ operand_stack.push_front(entity);
+ }
+
+ }
+
+ if (!operator_stack.empty()) {
+ return false;
+ }
+
+ out->swap(operand_stack);
+ return true;
+}
+
+class ESQueryNode {
+protected:
+ ESQueryCompiler *compiler;
+public:
+ ESQueryNode(ESQueryCompiler *_compiler) : compiler(_compiler) {}
+ virtual ~ESQueryNode() {}
+
+ virtual bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) = 0;
+
+ virtual void dump(Formatter *f) const = 0;
+};
+
+static bool alloc_node(ESQueryCompiler *compiler, ESQueryStack *s, ESQueryNode **pnode, string *perr);
+
+class ESQueryNode_Bool : public ESQueryNode {
+ string op;
+ ESQueryNode *first{nullptr};
+ ESQueryNode *second{nullptr};
+public:
+ ESQueryNode_Bool(ESQueryCompiler *compiler) : ESQueryNode(compiler) {}
+ ESQueryNode_Bool(ESQueryCompiler *compiler, const string& _op, ESQueryNode *_first, ESQueryNode *_second) :ESQueryNode(compiler), op(_op), first(_first), second(_second) {}
+ bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) override {
+ bool valid = s->pop(&op);
+ if (!valid) {
+ *perr = "incorrect expression";
+ return false;
+ }
+ valid = alloc_node(compiler, s, &first, perr) &&
+ alloc_node(compiler, s, &second, perr);
+ if (!valid) {
+ return false;
+ }
+ *pnode = this;
+ return true;
+ }
+ virtual ~ESQueryNode_Bool() {
+ delete first;
+ delete second;
+ }
+
+ void dump(Formatter *f) const {
+ f->open_object_section("bool");
+ const char *section = (op == "and" ? "must" : "should");
+ f->open_array_section(section);
+ encode_json("entry", *first, f);
+ encode_json("entry", *second, f);
+ f->close_section();
+ f->close_section();
+ }
+
+};
+
+class ESQueryNodeLeafVal {
+public:
+ ESQueryNodeLeafVal() = default;
+ virtual ~ESQueryNodeLeafVal() {}
+
+ virtual bool init(const string& str_val, string *perr) = 0;
+ virtual void encode_json(const string& field, Formatter *f) const = 0;
+};
+
+class ESQueryNodeLeafVal_Str : public ESQueryNodeLeafVal {
+ string val;
+public:
+ ESQueryNodeLeafVal_Str() {}
+ bool init(const string& str_val, string *perr) override {
+ val = str_val;
+ return true;
+ }
+ void encode_json(const string& field, Formatter *f) const {
+ ::encode_json(field.c_str(), val.c_str(), f);
+ }
+};
+
+class ESQueryNodeLeafVal_Int : public ESQueryNodeLeafVal {
+ int64_t val;
+public:
+ ESQueryNodeLeafVal_Int() {}
+ bool init(const string& str_val, string *perr) override {
+ string err;
+ val = strict_strtoll(str_val.c_str(), 10, &err);
+ if (!err.empty()) {
+ *perr = string("failed to parse integer: ") + err;
+ return false;
+ }
+ return true;
+ }
+ void encode_json(const string& field, Formatter *f) const {
+ ::encode_json(field.c_str(), val, f);
+ }
+};
+
+class ESQueryNodeLeafVal_Date : public ESQueryNodeLeafVal {
+ ceph::real_time val;
+public:
+ ESQueryNodeLeafVal_Date() {}
+ bool init(const string& str_val, string *perr) override {
+ if (parse_time(str_val.c_str(), &val) < 0) {
+ *perr = string("failed to parse date: ") + str_val;
+ return false;
+ }
+ return true;
+ }
+ void encode_json(const string& field, Formatter *f) const {
+ string s;
+ rgw_to_iso8601(val, &s);
+ ::encode_json(field.c_str(), s, f);
+ }
+};
+
+class ESQueryNode_Op : public ESQueryNode {
+protected:
+ string op;
+ string field;
+ string str_val;
+ ESQueryNodeLeafVal *val{nullptr};
+ ESEntityTypeMap::EntityType entity_type{ESEntityTypeMap::ES_ENTITY_NONE};
+ bool allow_restricted{false};
+
+ bool val_from_str(string *perr) {
+ switch (entity_type) {
+ case ESEntityTypeMap::ES_ENTITY_DATE:
+ val = new ESQueryNodeLeafVal_Date;
+ break;
+ case ESEntityTypeMap::ES_ENTITY_INT:
+ val = new ESQueryNodeLeafVal_Int;
+ break;
+ default:
+ val = new ESQueryNodeLeafVal_Str;
+ }
+ return val->init(str_val, perr);
+ }
+ bool do_init(ESQueryNode **pnode, string *perr) {
+ field = compiler->unalias_field(field);
+ ESQueryNode *effective_node;
+ if (!handle_nested(&effective_node, perr)) {
+ return false;
+ }
+ if (!val_from_str(perr)) {
+ return false;
+ }
+ *pnode = effective_node;
+ return true;
+ }
+
+public:
+ ESQueryNode_Op(ESQueryCompiler *compiler) : ESQueryNode(compiler) {}
+ ~ESQueryNode_Op() {
+ delete val;
+ }
+ virtual bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) override {
+ bool valid = s->pop(&op) &&
+ s->pop(&str_val) &&
+ s->pop(&field);
+ if (!valid) {
+ *perr = "invalid expression";
+ return false;
+ }
+ return do_init(pnode, perr);
+ }
+ bool handle_nested(ESQueryNode **pnode, string *perr);
+
+ void set_allow_restricted(bool allow) {
+ allow_restricted = allow;
+ }
+
+ virtual void dump(Formatter *f) const = 0;
+};
+
+class ESQueryNode_Op_Equal : public ESQueryNode_Op {
+public:
+ ESQueryNode_Op_Equal(ESQueryCompiler *compiler) : ESQueryNode_Op(compiler) {}
+ ESQueryNode_Op_Equal(ESQueryCompiler *compiler, const string& f, const string& v) : ESQueryNode_Op(compiler) {
+ op = "==";
+ field = f;
+ str_val = v;
+ }
+
+ bool init(ESQueryStack *s, ESQueryNode **pnode, string *perr) override {
+ if (op.empty()) {
+ return ESQueryNode_Op::init(s, pnode, perr);
+ }
+ return do_init(pnode, perr);
+ }
+
+ virtual void dump(Formatter *f) const {
+ f->open_object_section("term");
+ val->encode_json(field, f);
+ f->close_section();
+ }
+};
+
+class ESQueryNode_Op_Range : public ESQueryNode_Op {
+ string range_str;
+public:
+ ESQueryNode_Op_Range(ESQueryCompiler *compiler, const string& rs) : ESQueryNode_Op(compiler), range_str(rs) {}
+
+ virtual void dump(Formatter *f) const {
+ f->open_object_section("range");
+ f->open_object_section(field.c_str());
+ val->encode_json(range_str, f);
+ f->close_section();
+ f->close_section();
+ }
+};
+
+class ESQueryNode_Op_Nested_Parent : public ESQueryNode_Op {
+public:
+ ESQueryNode_Op_Nested_Parent(ESQueryCompiler *compiler) : ESQueryNode_Op(compiler) {}
+
+ virtual string get_custom_leaf_field_name() = 0;
+};
+
+template <class T>
+class ESQueryNode_Op_Nested : public ESQueryNode_Op_Nested_Parent {
+ string name;
+ ESQueryNode *next;
+public:
+ ESQueryNode_Op_Nested(ESQueryCompiler *compiler, const string& _name, ESQueryNode *_next) : ESQueryNode_Op_Nested_Parent(compiler),
+ name(_name), next(_next) {}
+ ~ESQueryNode_Op_Nested() {
+ delete next;
+ }
+
+ virtual void dump(Formatter *f) const {
+ f->open_object_section("nested");
+ string s = string("meta.custom-") + type_str();
+ encode_json("path", s.c_str(), f);
+ f->open_object_section("query");
+ f->open_object_section("bool");
+ f->open_array_section("must");
+ f->open_object_section("entry");
+ f->open_object_section("match");
+ string n = s + ".name";
+ encode_json(n.c_str(), name.c_str(), f);
+ f->close_section();
+ f->close_section();
+ encode_json("entry", *next, f);
+ f->close_section();
+ f->close_section();
+ f->close_section();
+ f->close_section();
+ }
+
+ string type_str() const;
+ string get_custom_leaf_field_name() {
+ return string("meta.custom-") + type_str() + ".value";
+ }
+};
+
+template<>
+string ESQueryNode_Op_Nested<string>::type_str() const {
+ return "string";
+}
+
+template<>
+string ESQueryNode_Op_Nested<int64_t>::type_str() const {
+ return "int";
+}
+
+template<>
+string ESQueryNode_Op_Nested<ceph::real_time>::type_str() const {
+ return "date";
+}
+
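+/* fields under the custom prefix (e.g. "x-amz-meta-") are stored in ES as
+ * nested name/value entries, so wrap the op node in the corresponding
+ * nested query */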
+bool ESQueryNode_Op::handle_nested(ESQueryNode **pnode, string *perr)
+{
+ string field_name = field;
+ const string& custom_prefix = compiler->get_custom_prefix();
+ if (!boost::algorithm::starts_with(field_name, custom_prefix)) {
+ *pnode = this;
+ auto m = compiler->get_generic_type_map();
+ if (m) {
+ bool found = m->find(field_name, &entity_type) &&
+ (allow_restricted || !compiler->is_restricted(field_name));
+ if (!found) {
+ *perr = string("unexpected generic field '") + field_name + "'";
+ }
+ return found;
+ }
+ *perr = "query parser does not support generic types";
+ return false;
+ }
+
+ field_name = field_name.substr(custom_prefix.size());
+ auto m = compiler->get_custom_type_map();
+ if (m) {
+ m->find(field_name, &entity_type);
+ /* ignore the returned bool; an unknown custom type is treated as a string */
+ }
+
+ ESQueryNode_Op_Nested_Parent *new_node;
+ switch (entity_type) {
+ case ESEntityTypeMap::ES_ENTITY_INT:
+ new_node = new ESQueryNode_Op_Nested<int64_t>(compiler, field_name, this);
+ break;
+ case ESEntityTypeMap::ES_ENTITY_DATE:
+ new_node = new ESQueryNode_Op_Nested<ceph::real_time>(compiler, field_name, this);
+ break;
+ default:
+ new_node = new ESQueryNode_Op_Nested<string>(compiler, field_name, this);
+ }
+
+ field = new_node->get_custom_leaf_field_name();
+ *pnode = new_node;
+
+ return true;
+}
+
+static bool is_bool_op(const string& str)
+{
+ return (str == "or" || str == "and");
+}
+
+static bool alloc_node(ESQueryCompiler *compiler, ESQueryStack *s, ESQueryNode **pnode, string *perr)
+{
+ string op;
+ bool valid = s->peek(&op);
+ if (!valid) {
+ *perr = "incorrect expression";
+ return false;
+ }
+
+ ESQueryNode *node;
+
+ if (is_bool_op(op)) {
+ node = new ESQueryNode_Bool(compiler);
+ } else if (op == "==") {
+ node = new ESQueryNode_Op_Equal(compiler);
+ } else {
+ static map<string, string> range_op_map = {
+ { "<", "lt"},
+ { "<=", "lte"},
+ { ">=", "gte"},
+ { ">", "gt"},
+ };
+
+ auto iter = range_op_map.find(op);
+ if (iter == range_op_map.end()) {
+ *perr = string("invalid operator: ") + op;
+ return false;
+ }
+
+ node = new ESQueryNode_Op_Range(compiler, iter->second);
+ }
+
+ if (!node->init(s, pnode, perr)) {
+ delete node;
+ return false;
+ }
+ return true;
+}
+
+
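+/* key characters: printable ASCII, excluding HTTP header separator
+ * characters and whitespace */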
+bool is_key_char(char c)
+{
+ switch (c) {
+ case '(':
+ case ')':
+ case '<':
+ case '>':
+ case '@':
+ case ',':
+ case ';':
+ case ':':
+ case '\\':
+ case '"':
+ case '/':
+ case '[':
+ case ']':
+ case '?':
+ case '=':
+ case '{':
+ case '}':
+ case ' ':
+ case '\t':
+ return false;
+ };
+ return (isascii(c) > 0);
+}
+
+static bool is_op_char(char c)
+{
+ switch (c) {
+ case '<':
+ case '=':
+ case '>':
+ return true;
+ };
+ return false;
+}
+
+static bool is_val_char(char c)
+{
+ if (isspace(c)) {
+ return false;
+ }
+ return (c != ')');
+}
+
+void ESInfixQueryParser::skip_whitespace(const char *str, int size, int& pos) {
+ while (pos < size && isspace(str[pos])) {
+ ++pos;
+ }
+}
+
+bool ESInfixQueryParser::get_next_token(bool (*filter)(char)) {
+ skip_whitespace(str, size, pos);
+ int token_start = pos;
+ while (pos < size && filter(str[pos])) {
+ ++pos;
+ }
+ if (pos == token_start) {
+ return false;
+ }
+ string token = string(str + token_start, pos - token_start);
+ args.push_back(token);
+ return true;
+}
+
+bool ESInfixQueryParser::parse_condition() {
+ /*
+ * condition: <key> <operator> <val>
+ *
+ * where key: needs to conform to HTTP header field restrictions
+ * operator: one of the following: < <= == >= >
+ * val: ascii, terminated by either space or ')' (or end of string)
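+ * e.g. "age >= 30" produces the three tokens "age", ">=", "30"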
+ */
+
+ /* parse key */
+ bool valid = get_next_token(is_key_char) &&
+ get_next_token(is_op_char) &&
+ get_next_token(is_val_char);
+
+ if (!valid) {
+ return false;
+ }
+
+ return true;
+}
+
+bool ESInfixQueryParser::parse_and_or() {
+ skip_whitespace(str, size, pos);
+ if (pos + 3 <= size && strncmp(str + pos, "and", 3) == 0) {
+ pos += 3;
+ args.push_back("and");
+ return true;
+ }
+
+ if (pos + 2 <= size && strncmp(str + pos, "or", 2) == 0) {
+ pos += 2;
+ args.push_back("or");
+ return true;
+ }
+
+ return false;
+}
+
+bool ESInfixQueryParser::parse_specific_char(const char *pchar) {
+ skip_whitespace(str, size, pos);
+ if (pos >= size) {
+ return false;
+ }
+ if (str[pos] != *pchar) {
+ return false;
+ }
+
+ args.push_back(pchar);
+ ++pos;
+ return true;
+}
+
+bool ESInfixQueryParser::parse_open_bracket() {
+ return parse_specific_char("(");
+}
+
+bool ESInfixQueryParser::parse_close_bracket() {
+ return parse_specific_char(")");
+}
+
+bool ESInfixQueryParser::parse(list<string> *result) {
+ /*
+ * expression: [(]<condition>[[and/or]<condition>][)][and/or]...
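+ * e.g. (age >= 30) and (name == john)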
+ */
+
+ while (pos < size) {
+ parse_open_bracket();
+ if (!parse_condition()) {
+ return false;
+ }
+ parse_close_bracket();
+ parse_and_or();
+ }
+
+ result->swap(args);
+
+ return true;
+}
+
+bool ESQueryCompiler::convert(list<string>& infix, string *perr) {
+ list<string> prefix;
+ if (!infix_to_prefix(infix, &prefix)) {
+ *perr = "invalid query";
+ return false;
+ }
+ stack.assign(prefix);
+ if (!alloc_node(this, &stack, &query_root, perr)) {
+ return false;
+ }
+ if (!stack.done()) {
+ *perr = "invalid query";
+ return false;
+ }
+ return true;
+}
+
+ESQueryCompiler::~ESQueryCompiler() {
+ delete query_root;
+}
+
+bool ESQueryCompiler::compile(string *perr) {
+ list<string> infix;
+ if (!parser.parse(&infix)) {
+ *perr = "failed to parse query";
+ return false;
+ }
+
+ if (!convert(infix, perr)) {
+ return false;
+ }
+
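+ /* AND any caller-supplied equality conditions onto the compiled query;
+  * unlike user input, these may reference restricted fields */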
+ for (auto& c : eq_conds) {
+ ESQueryNode_Op_Equal *eq_node = new ESQueryNode_Op_Equal(this, c.first, c.second);
+ eq_node->set_allow_restricted(true); /* can access restricted fields */
+ ESQueryNode *effective_node;
+ if (!eq_node->init(nullptr, &effective_node, perr)) {
+ delete eq_node;
+ return false;
+ }
+ query_root = new ESQueryNode_Bool(this, "and", effective_node, query_root);
+ }
+
+ return true;
+}
+
+void ESQueryCompiler::dump(Formatter *f) const {
+ encode_json("query", *query_root, f);
+}
+
diff --git a/src/rgw/rgw_es_query.h b/src/rgw/rgw_es_query.h
new file mode 100644
index 00000000000..4375d93251c
--- /dev/null
+++ b/src/rgw/rgw_es_query.h
@@ -0,0 +1,162 @@
+#ifndef CEPH_RGW_ES_QUERY_H
+#define CEPH_RGW_ES_QUERY_H
+
+#include "rgw_string.h"
+
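+/* read-once cursor over the prefix-notation token list produced by
+ * infix_to_prefix() */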
+class ESQueryStack {
+ list<string> l;
+ list<string>::iterator iter;
+
+public:
+ ESQueryStack(list<string>& src) {
+ assign(src);
+ }
+
+ ESQueryStack() {}
+
+ void assign(list<string>& src) {
+ l.swap(src);
+ iter = l.begin();
+ }
+
+ bool peek(string *dest) {
+ if (done()) {
+ return false;
+ }
+ *dest = *iter;
+ return true;
+ }
+
+ bool pop(string *dest) {
+ bool valid = peek(dest);
+ if (!valid) {
+ return false;
+ }
+ ++iter;
+ return true;
+ }
+
+ bool done() {
+ return (iter == l.end());
+ }
+};
+
+class ESInfixQueryParser {
+ string query;
+ int size;
+ const char *str;
+ int pos{0};
+ list<string> args;
+
+ void skip_whitespace(const char *str, int size, int& pos);
+ bool get_next_token(bool (*filter)(char));
+
+ bool parse_condition();
+ bool parse_and_or();
+ bool parse_specific_char(const char *pchar);
+ bool parse_open_bracket();
+ bool parse_close_bracket();
+
+public:
+ ESInfixQueryParser(const string& _query) : query(_query), size(query.size()), str(query.c_str()) {}
+ bool parse(list<string> *result);
+};
+
+class ESQueryNode;
+
+struct ESEntityTypeMap {
+ enum EntityType {
+ ES_ENTITY_NONE = 0,
+ ES_ENTITY_STR = 1,
+ ES_ENTITY_INT = 2,
+ ES_ENTITY_DATE = 3,
+ };
+
+ map<string, EntityType> m;
+
+ ESEntityTypeMap(map<string, EntityType>& _m) : m(_m) {}
+
+ bool find(const string& entity, EntityType *ptype) {
+ auto i = m.find(entity);
+ if (i != m.end()) {
+ *ptype = i->second;
+ return true;
+ }
+
+ *ptype = ES_ENTITY_NONE;
+ return false;
+ }
+};
+
+class ESQueryCompiler {
+ ESInfixQueryParser parser;
+ ESQueryStack stack;
+ ESQueryNode *query_root{nullptr};
+
+ string custom_prefix;
+
+ bool convert(list<string>& infix, string *perr);
+
+ list<pair<string, string> > eq_conds;
+
+ ESEntityTypeMap *generic_type_map{nullptr};
+ ESEntityTypeMap *custom_type_map{nullptr};
+
+ map<string, string, ltstr_nocase> *field_aliases{nullptr};
+ set<string> *restricted_fields{nullptr};
+
+public:
+ ESQueryCompiler(const string& query, list<pair<string, string> > *prepend_eq_conds, const string& _custom_prefix) : parser(query), custom_prefix(_custom_prefix) {
+ if (prepend_eq_conds) {
+ eq_conds = std::move(*prepend_eq_conds);
+ }
+ }
+ ~ESQueryCompiler();
+
+ bool compile(string *perr);
+ void dump(Formatter *f) const;
+
+ void set_generic_type_map(ESEntityTypeMap *entity_map) {
+ generic_type_map = entity_map;
+ }
+
+ ESEntityTypeMap *get_generic_type_map() {
+ return generic_type_map;
+ }
+ const string& get_custom_prefix() { return custom_prefix; }
+
+ void set_custom_type_map(ESEntityTypeMap *entity_map) {
+ custom_type_map = entity_map;
+ }
+
+ ESEntityTypeMap *get_custom_type_map() {
+ return custom_type_map;
+ }
+
+ void set_field_aliases(map<string, string, ltstr_nocase> *fa) {
+ field_aliases = fa;
+ }
+
+ string unalias_field(const string& field) {
+ if (!field_aliases) {
+ return field;
+ }
+ auto i = field_aliases->find(field);
+ if (i == field_aliases->end()) {
+ return field;
+ }
+
+ return i->second;
+ }
+
+ void set_restricted_fields(set<string> *rf) {
+ restricted_fields = rf;
+ }
+
+ bool is_restricted(const string& f) {
+ return (restricted_fields && restricted_fields->find(f) != restricted_fields->end());
+ }
+};
+
+
+#endif
diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc
index 81c4c6ef6bc..f9f8d2ac346 100644
--- a/src/rgw/rgw_http_client.cc
+++ b/src/rgw/rgw_http_client.cc
@@ -314,7 +314,7 @@ int RGWHTTPClient::get_req_retcode()
/*
* init request, will be used later with RGWHTTPManager
*/
-int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data)
+int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
{
assert(!req_data);
_req_data->get();
@@ -349,7 +349,7 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
}
curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
- if (is_upload_request(method)) {
+ if (send_data_hint || is_upload_request(method)) {
curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
}
if (has_send_len) {
@@ -746,11 +746,11 @@ void RGWHTTPManager::manage_pending_requests()
}
}
-int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url)
+int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
{
rgw_http_req_data *req_data = new rgw_http_req_data;
- int ret = client->init_request(method, url, req_data);
+ int ret = client->init_request(method, url, req_data, send_data_hint);
if (ret < 0) {
req_data->put();
req_data = NULL;
diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h
index cbe4f3d0312..63ea9000194 100644
--- a/src/rgw/rgw_http_client.h
+++ b/src/rgw/rgw_http_client.h
@@ -42,7 +42,8 @@ protected:
int init_request(const char *method,
const char *url,
- rgw_http_req_data *req_data);
+ rgw_http_req_data *req_data,
+ bool send_data_hint = false);
virtual int receive_header(void *ptr, size_t len) {
return 0;
@@ -264,7 +265,8 @@ public:
int set_threaded();
void stop();
- int add_request(RGWHTTPClient *client, const char *method, const char *url);
+ int add_request(RGWHTTPClient *client, const char *method, const char *url,
+ bool send_data_hint = false);
int remove_request(RGWHTTPClient *client);
/* only for non threaded case */
diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc
index 2a183b59195..f5f3612239e 100644
--- a/src/rgw/rgw_json_enc.cc
+++ b/src/rgw/rgw_json_enc.cc
@@ -201,6 +201,13 @@ void ACLOwner::dump(Formatter *f) const
encode_json("display_name", display_name, f);
}
+void ACLOwner::decode_json(JSONObj *obj) {
+ string id_str;
+ JSONDecoder::decode_json("id", id_str, obj);
+ id.from_str(id_str);
+ JSONDecoder::decode_json("display_name", display_name, obj);
+}
+
void RGWAccessControlPolicy::dump(Formatter *f) const
{
encode_json("acl", acl, f);
@@ -730,6 +737,7 @@ void RGWBucketInfo::dump(Formatter *f) const
encode_json("swift_versioning", swift_versioning, f);
encode_json("swift_ver_location", swift_ver_location, f);
encode_json("index_type", (uint32_t)index_type, f);
+ encode_json("mdsearch_config", mdsearch_config, f);
}
void RGWBucketInfo::decode_json(JSONObj *obj) {
@@ -761,6 +769,7 @@ void RGWBucketInfo::decode_json(JSONObj *obj) {
uint32_t it;
JSONDecoder::decode_json("index_type", it, obj);
index_type = (RGWBucketIndexType)it;
+ JSONDecoder::decode_json("mdsearch_config", mdsearch_config, obj);
}
void rgw_obj_key::dump(Formatter *f) const
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index f018faba8d6..5c3dbb0582b 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -182,6 +182,12 @@ static RGWRESTMgr *set_logging(RGWRESTMgr *mgr)
return mgr;
}
+static RGWRESTMgr *rest_filter(RGWRados *store, int dialect, RGWRESTMgr *orig)
+{
+ RGWSyncModuleInstanceRef sync_module = store->get_sync_module();
+ return sync_module->get_rest_filter(dialect, orig);
+}
+
RGWRealmReloader *preloader = NULL;
static void reloader_handler(int signum)
@@ -377,7 +383,8 @@ int main(int argc, const char **argv)
const bool swift_at_root = g_conf->rgw_swift_url_prefix == "/";
if (apis_map.count("s3") > 0 || s3website_enabled) {
if (! swift_at_root) {
- rest.register_default_mgr(set_logging(new RGWRESTMgr_S3(s3website_enabled)));
+ rest.register_default_mgr(set_logging(rest_filter(store, RGW_REST_S3,
+ new RGWRESTMgr_S3(s3website_enabled))));
} else {
derr << "Cannot have the S3 or S3 Website enabled together with "
<< "Swift API placed in the root of hierarchy" << dendl;
@@ -401,7 +408,8 @@ int main(int argc, const char **argv)
if (! swift_at_root) {
rest.register_resource(g_conf->rgw_swift_url_prefix,
- set_logging(swift_resource));
+ set_logging(rest_filter(store, RGW_REST_SWIFT,
+ swift_resource)));
} else {
if (store->get_zonegroup().zones.size() > 1) {
derr << "Placing Swift API in the root of URL hierarchy while running"
diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc
index ac025c6e7d9..d4a9326c3dd 100644
--- a/src/rgw/rgw_op.cc
+++ b/src/rgw/rgw_op.cc
@@ -631,7 +631,7 @@ rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
return e;
}
-static void rgw_bucket_object_pre_exec(struct req_state *s)
+void rgw_bucket_object_pre_exec(struct req_state *s)
{
if (s->expect_cont)
dump_continue(s);
@@ -6308,6 +6308,77 @@ void RGWGetObjLayout::execute()
}
+int RGWConfigBucketMetaSearch::verify_permission()
+{
+ if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
+void RGWConfigBucketMetaSearch::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
+void RGWConfigBucketMetaSearch::execute()
+{
+ op_ret = get_params();
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "NOTICE: get_params() returned ret=" << op_ret << dendl;
+ return;
+ }
+
+ s->bucket_info.mdsearch_config = mdsearch_config;
+
+ op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+ if (op_ret < 0) {
+ ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
+ return;
+ }
+}
+
+int RGWGetBucketMetaSearch::verify_permission()
+{
+ if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
+void RGWGetBucketMetaSearch::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
+int RGWDelBucketMetaSearch::verify_permission()
+{
+ if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
+void RGWDelBucketMetaSearch::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
+void RGWDelBucketMetaSearch::execute()
+{
+ s->bucket_info.mdsearch_config.clear();
+
+ op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
+ if (op_ret < 0) {
+ ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
+ return;
+ }
+}
+
+
RGWHandler::~RGWHandler()
{
}
diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h
index 1b4f02f1b7f..62c71fd98e7 100644
--- a/src/rgw/rgw_op.h
+++ b/src/rgw/rgw_op.h
@@ -97,6 +97,9 @@ public:
};
+
+void rgw_bucket_object_pre_exec(struct req_state *s);
+
/**
* Provide the base class for all ops.
*/
@@ -2012,4 +2015,49 @@ public:
};
+class RGWConfigBucketMetaSearch : public RGWOp {
+protected:
+ std::map<std::string, uint32_t> mdsearch_config;
+public:
+ RGWConfigBucketMetaSearch() {}
+
+ int verify_permission();
+ void pre_exec();
+ void execute();
+
+ virtual int get_params() = 0;
+ virtual void send_response() = 0;
+ virtual const string name() { return "config_bucket_meta_search"; }
+ virtual RGWOpType get_type() { return RGW_OP_CONFIG_BUCKET_META_SEARCH; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
+class RGWGetBucketMetaSearch : public RGWOp {
+public:
+ RGWGetBucketMetaSearch() {}
+
+ int verify_permission();
+ void pre_exec();
+ void execute() {}
+
+ virtual void send_response() = 0;
+ virtual const string name() { return "get_bucket_meta_search"; }
+ virtual RGWOpType get_type() { return RGW_OP_GET_BUCKET_META_SEARCH; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; }
+};
+
+class RGWDelBucketMetaSearch : public RGWOp {
+public:
+ RGWDelBucketMetaSearch() {}
+
+ int verify_permission();
+ void pre_exec();
+ void execute();
+
+ virtual void send_response() = 0;
+ virtual const string name() { return "delete_bucket_meta_search"; }
+ virtual RGWOpType get_type() { return RGW_OP_DEL_BUCKET_META_SEARCH; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
#endif /* CEPH_RGW_OP_H */
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 6e0cca14c16..cf426ff7ca7 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -1154,7 +1154,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
string realm_id;
- map<string, string> tier_config;
+ map<string, string, ltstr_nocase> tier_config;
RGWZoneParams() : RGWSystemMetaObj() {}
RGWZoneParams(const string& name) : RGWSystemMetaObj(name){}
diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc
index ba3363cbd04..1cccbab7d02 100644
--- a/src/rgw/rgw_rest_client.cc
+++ b/src/rgw/rgw_rest_client.cc
@@ -616,17 +616,18 @@ int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
return status;
}
-int RGWRESTStreamRWRequest::get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
{
string urlsafe_bucket, urlsafe_object;
url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
url_encode(obj.key.name, urlsafe_object);
string resource = urlsafe_bucket + "/" + urlsafe_object;
- return get_resource(key, extra_headers, resource);
+ return send_request(&key, extra_headers, resource, nullptr, mgr);
}
-int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ bufferlist *send_data, RGWHTTPManager *mgr)
{
string new_url = url;
if (new_url[new_url.size() - 1] != '/')
@@ -671,10 +672,12 @@ int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>&
new_info.init_meta_info(NULL);
- int ret = sign_request(key, new_env, new_info);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
- return ret;
+ if (key) {
+ int ret = sign_request(*key, new_env, new_info);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+ return ret;
+ }
}
map<string, string, ltstr_nocase>& m = new_env.get_map();
@@ -683,12 +686,18 @@ int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>&
headers.push_back(pair<string, string>(iter->first, iter->second));
}
+ bool send_data_hint = false;
+ if (send_data) {
+ outbl.claim(*send_data);
+ send_data_hint = true;
+ }
+
RGWHTTPManager *pmanager = &http_manager;
if (mgr) {
pmanager = mgr;
}
- int r = pmanager->add_request(this, new_info.method, new_url.c_str());
+ int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
if (r < 0)
return r;
@@ -701,7 +710,7 @@ int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>&
return 0;
}
-int RGWRESTStreamRWRequest::complete(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
+int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
{
set_str_from_headers(out_headers, "ETAG", etag);
if (status >= 0) {
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h
index e09bb2fd57b..09393f8ea4a 100644
--- a/src/rgw/rgw_rest_client.h
+++ b/src/rgw/rgw_rest_client.h
@@ -103,10 +103,10 @@ public:
lock("RGWRESTStreamReadRequest"), cb(_cb),
chunk_ofs(0), ofs(0), http_manager(_cct), method(_method), write_ofs(0) {
}
- ~RGWRESTStreamRWRequest() override {}
- int get_obj(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
- int get_resource(RGWAccessKey& key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr = NULL);
- int complete(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
+ virtual ~RGWRESTStreamRWRequest() override {}
+ int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL);
+ int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = NULL /* optional input data */, RGWHTTPManager *mgr = NULL);
+ int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
void set_outbl(bufferlist& _outbl) {
outbl.swap(_outbl);
diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc
index 71a11554d2c..6496f129d79 100644
--- a/src/rgw/rgw_rest_conn.cc
+++ b/src/rgw/rgw_rest_conn.cc
@@ -203,7 +203,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
}
- int r = (*req)->get_obj(key, extra_headers, obj);
+ int r = (*req)->send_request(key, extra_headers, obj);
if (r < 0) {
delete *req;
*req = nullptr;
@@ -215,7 +215,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req, string& etag, real_time *mtime,
uint64_t *psize, map<string, string>& attrs)
{
- int ret = req->complete(etag, mtime, psize, attrs);
+ int ret = req->complete_request(etag, mtime, psize, attrs);
delete req;
return ret;
@@ -225,6 +225,7 @@ int RGWRESTConn::get_resource(const string& resource,
param_vec_t *extra_params,
map<string, string> *extra_headers,
bufferlist& bl,
+ bufferlist *send_data,
RGWHTTPManager *mgr)
{
string url;
@@ -249,15 +250,15 @@ int RGWRESTConn::get_resource(const string& resource,
headers.insert(extra_headers->begin(), extra_headers->end());
}
- ret = req.get_resource(key, headers, resource, mgr);
+ ret = req.send_request(&key, headers, resource, send_data, mgr);
if (ret < 0) {
- ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl;
+ ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
}
string etag;
map<string, string> attrs;
- return req.complete(etag, NULL, NULL, attrs);
+ return req.complete_request(etag, NULL, NULL, attrs);
}
RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn,
@@ -296,22 +297,22 @@ void RGWRESTReadResource::init_common(param_vec_t *extra_headers)
int RGWRESTReadResource::read()
{
- int ret = req.get_resource(conn->get_key(), headers, resource, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
if (ret < 0) {
- ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl;
+ ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
}
string etag;
map<string, string> attrs;
- return req.complete(etag, NULL, NULL, attrs);
+ return req.complete_request(etag, NULL, NULL, attrs);
}
int RGWRESTReadResource::aio_read()
{
- int ret = req.get_resource(conn->get_key(), headers, resource, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
if (ret < 0) {
- ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl;
+ ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
}
@@ -357,23 +358,23 @@ void RGWRESTSendResource::init_common(param_vec_t *extra_headers)
int RGWRESTSendResource::send(bufferlist& outbl)
{
req.set_outbl(outbl);
- int ret = req.get_resource(conn->get_key(), headers, resource, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
if (ret < 0) {
- ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl;
+ ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
}
string etag;
map<string, string> attrs;
- return req.complete(etag, NULL, NULL, attrs);
+ return req.complete_request(etag, NULL, NULL, attrs);
}
int RGWRESTSendResource::aio_send(bufferlist& outbl)
{
req.set_outbl(outbl);
- int ret = req.get_resource(conn->get_key(), headers, resource, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
if (ret < 0) {
- ldout(cct, 5) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl;
+ ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
}
diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h
index bec829d6939..f4a1005b6a3 100644
--- a/src/rgw/rgw_rest_conn.h
+++ b/src/rgw/rgw_rest_conn.h
@@ -100,9 +100,13 @@ public:
int get_resource(const string& resource,
param_vec_t *extra_params,
map<string, string>* extra_headers,
- bufferlist& bl, RGWHTTPManager *mgr = NULL);
+ bufferlist& bl,
+ bufferlist *send_data = nullptr,
+ RGWHTTPManager *mgr = nullptr);
template <class T>
+ int get_json_resource(const string& resource, param_vec_t *params, bufferlist *in_data, T& t);
+ template <class T>
int get_json_resource(const string& resource, param_vec_t *params, T& t);
template <class T>
int get_json_resource(const string& resource, const rgw_http_param_pair *pp, T& t);
@@ -110,10 +114,10 @@ public:
template<class T>
-int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, T& t)
+int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, bufferlist *in_data, T& t)
{
bufferlist bl;
- int ret = get_resource(resource, params, NULL, bl);
+ int ret = get_resource(resource, params, nullptr, bl, in_data);
if (ret < 0) {
return ret;
}
@@ -127,6 +131,12 @@ int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params,
}
template<class T>
+int RGWRESTConn::get_json_resource(const string& resource, param_vec_t *params, T& t)
+{
+ return get_json_resource(resource, params, nullptr, t);
+}
+
+template<class T>
int RGWRESTConn::get_json_resource(const string& resource, const rgw_http_param_pair *pp, T& t)
{
param_vec_t params = make_param_list(pp);
diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc
index 7174a0a4e20..eef63a57ce9 100644
--- a/src/rgw/rgw_rest_s3.cc
+++ b/src/rgw/rgw_rest_s3.cc
@@ -29,6 +29,8 @@
#include "rgw_auth_keystone.h"
#include "rgw_auth_registry.h"
+#include "rgw_es_query.h"
+
#include <typeinfo> // for 'typeid'
#include "rgw_ldap.h"
@@ -2713,6 +2715,115 @@ void RGWGetObjLayout_ObjStore_S3::send_response()
rgw_flush_formatter(s, &f);
}
+int RGWConfigBucketMetaSearch_ObjStore_S3::get_params()
+{
+ auto iter = s->info.x_meta_map.find("x-amz-meta-search");
+ if (iter == s->info.x_meta_map.end()) {
+ s->err.message = "X-Rgw-Meta-Search header not provided";
+ ldout(s->cct, 5) << s->err.message << dendl;
+ return -EINVAL;
+ }
+
+ list<string> expressions;
+ get_str_list(iter->second, ",", expressions);
+
+ for (auto& expression : expressions) {
+ vector<string> args;
+ get_str_vec(expression, ";", args);
+
+ if (args.empty()) {
+ s->err.message = "invalid empty expression";
+ ldout(s->cct, 5) << s->err.message << dendl;
+ return -EINVAL;
+ }
+ if (args.size() > 2) {
+ s->err.message = string("invalid expression: ") + expression;
+ ldout(s->cct, 5) << s->err.message << dendl;
+ return -EINVAL;
+ }
+
+ string key = boost::algorithm::to_lower_copy(rgw_trim_whitespace(args[0]));
+ string val;
+ if (args.size() > 1) {
+ val = boost::algorithm::to_lower_copy(rgw_trim_whitespace(args[1]));
+ }
+
+ if (!boost::algorithm::starts_with(key, RGW_AMZ_META_PREFIX)) {
+ s->err.message = string("invalid expression, key must start with '" RGW_AMZ_META_PREFIX "' : ") + expression;
+ ldout(s->cct, 5) << s->err.message << dendl;
+ return -EINVAL;
+ }
+
+ key = key.substr(sizeof(RGW_AMZ_META_PREFIX) - 1);
+
+ ESEntityTypeMap::EntityType entity_type;
+
+ if (val.empty() || val == "str" || val == "string") {
+ entity_type = ESEntityTypeMap::ES_ENTITY_STR;
+ } else if (val == "int" || val == "integer") {
+ entity_type = ESEntityTypeMap::ES_ENTITY_INT;
+ } else if (val == "date" || val == "datetime") {
+ entity_type = ESEntityTypeMap::ES_ENTITY_DATE;
+ } else {
+ s->err.message = string("invalid entity type: ") + val;
+ ldout(s->cct, 5) << s->err.message << dendl;
+ return -EINVAL;
+ }
+
+ mdsearch_config[key] = entity_type;
+ }
+
+ return 0;
+}
+
+void RGWConfigBucketMetaSearch_ObjStore_S3::send_response()
+{
+ if (op_ret)
+ set_req_state_err(s, op_ret);
+ dump_errno(s);
+ end_header(s, this);
+}
+
+void RGWGetBucketMetaSearch_ObjStore_S3::send_response()
+{
+ if (op_ret)
+ set_req_state_err(s, op_ret);
+ dump_errno(s);
+ end_header(s, NULL, "application/xml");
+
+ Formatter *f = s->formatter;
+ f->open_array_section("GetBucketMetaSearchResult");
+ for (auto& e : s->bucket_info.mdsearch_config) {
+ f->open_object_section("Entry");
+ string k = string("x-amz-meta-") + e.first;
+ f->dump_string("Key", k.c_str());
+ const char *type;
+ switch (e.second) {
+ case ESEntityTypeMap::ES_ENTITY_INT:
+ type = "int";
+ break;
+ case ESEntityTypeMap::ES_ENTITY_DATE:
+ type = "date";
+ break;
+ default:
+ type = "str";
+ }
+ f->dump_string("Type", type);
+ f->close_section();
+ }
+ f->close_section();
+ rgw_flush_formatter(s, f);
+}
+
+void RGWDelBucketMetaSearch_ObjStore_S3::send_response()
+{
+ if (op_ret)
+ set_req_state_err(s, op_ret);
+ dump_errno(s);
+ end_header(s, this);
+}
+
+
RGWOp *RGWHandler_REST_Service_S3::op_get()
{
if (is_usage_op()) {
@@ -2756,10 +2867,11 @@ RGWOp *RGWHandler_REST_Service_S3::op_post()
RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
{
// Non-website mode
- if (get_data)
+ if (get_data) {
return new RGWListBucket_ObjStore_S3;
- else
+ } else {
return new RGWStatBucket_ObjStore_S3;
+ }
}
RGWOp *RGWHandler_REST_Bucket_S3::op_get()
@@ -2780,6 +2892,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get()
return new RGWGetBucketWebsite_ObjStore_S3;
}
+ if (s->info.args.exists("mdsearch")) {
+ return new RGWGetBucketMetaSearch_ObjStore_S3;
+ }
+
if (is_acl_op()) {
return new RGWGetACLs_ObjStore_S3;
} else if (is_cors_op()) {
@@ -2849,6 +2965,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
return new RGWDeleteBucketWebsite_ObjStore_S3;
}
+ if (s->info.args.exists("mdsearch")) {
+ return new RGWDelBucketMetaSearch_ObjStore_S3;
+ }
+
return new RGWDeleteBucket_ObjStore_S3;
}
@@ -2858,6 +2978,10 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_post()
return new RGWDeleteMultiObj_ObjStore_S3;
}
+ if (s->info.args.exists("mdsearch")) {
+ return new RGWConfigBucketMetaSearch_ObjStore_S3;
+ }
+
return new RGWPostObj_ObjStore_S3;
}
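
Taken together, the new handlers turn bucket metadata-search configuration into a ?mdsearch sub-resource: POST configures it from the X-Amz-Meta-Search header, GET reads it back, and DELETE removes it. A hedged sketch of the round trip using boto's low-level make_request(); the bucket name and attribute keys are made up, and the header value follows the "<key>[;<type>]" comma-separated syntax get_params() parses above:

    import boto.s3.connection

    conn = boto.connect_s3(
        aws_access_key_id='access', aws_secret_access_key='secret',
        host='localhost', port=8000, is_secure=False,
        calling_format=boto.s3.connection.OrdinaryCallingFormat())

    # configure: each key must start with x-amz-meta-
    headers = {'X-Amz-Meta-Search': 'x-amz-meta-color;string,x-amz-meta-size;int'}
    r = conn.make_request('POST', bucket='mybucket', query_args='mdsearch',
                          headers=headers)
    print r.status

    # read the configuration back
    r = conn.make_request('GET', bucket='mybucket', query_args='mdsearch')
    print r.read()

    # drop it
    r = conn.make_request('DELETE', bucket='mybucket', query_args='mdsearch')
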
diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h
index 8dcf242b283..d426b3d40ca 100644
--- a/src/rgw/rgw_rest_s3.h
+++ b/src/rgw/rgw_rest_s3.h
@@ -418,6 +418,30 @@ public:
void send_response();
};
+class RGWConfigBucketMetaSearch_ObjStore_S3 : public RGWConfigBucketMetaSearch {
+public:
+ RGWConfigBucketMetaSearch_ObjStore_S3() {}
+ ~RGWConfigBucketMetaSearch_ObjStore_S3() {}
+
+ int get_params() override;
+ void send_response() override;
+};
+
+class RGWGetBucketMetaSearch_ObjStore_S3 : public RGWGetBucketMetaSearch {
+public:
+ RGWGetBucketMetaSearch_ObjStore_S3() {}
+ ~RGWGetBucketMetaSearch_ObjStore_S3() {}
+
+ void send_response() override;
+};
+
+class RGWDelBucketMetaSearch_ObjStore_S3 : public RGWDelBucketMetaSearch {
+public:
+ RGWDelBucketMetaSearch_ObjStore_S3() {}
+ ~RGWDelBucketMetaSearch_ObjStore_S3() {}
+
+ void send_response() override;
+};
class RGW_Auth_S3 {
private:
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index 9d8d250132b..a9ef1ac784f 100644
--- a/src/rgw/rgw_sync.cc
+++ b/src/rgw/rgw_sync.cc
@@ -1508,10 +1508,12 @@ public:
collect_children();
} while ((int)entries.size() == max_entries && can_adjust_marker);
+ldout(cct, 0) << __FILE__ << ":" << __LINE__ << ":" << *this << ": num_spawned()=" << num_spawned() << dendl;
while (num_spawned() > 1) {
yield wait_for_child();
collect_children();
}
+ldout(cct, 0) << __FILE__ << ":" << __LINE__ << ":" << *this << ": num_spawned()=" << num_spawned() << dendl;
if (!lost_lock) {
/* update marker to reflect we're done with full sync */
diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h
index f6f070b1c36..278d1df3893 100644
--- a/src/rgw/rgw_sync_module.h
+++ b/src/rgw/rgw_sync_module.h
@@ -16,6 +16,12 @@ public:
RGWDataSyncModule() {}
virtual ~RGWDataSyncModule() {}
+ virtual void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {}
+
+ virtual RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) {
+ return nullptr;
+ }
+
virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
@@ -23,11 +29,16 @@ public:
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
};
+class RGWRESTMgr;
+
class RGWSyncModuleInstance {
public:
RGWSyncModuleInstance() {}
virtual ~RGWSyncModuleInstance() {}
virtual RGWDataSyncModule *get_data_handler() = 0;
+ virtual RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) {
+ return orig;
+ }
};
typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
@@ -39,7 +50,7 @@ public:
virtual ~RGWSyncModule() {}
virtual bool supports_data_export() = 0;
- virtual int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) = 0;
+ virtual int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) = 0;
};
typedef std::shared_ptr<RGWSyncModule> RGWSyncModuleRef;
@@ -80,7 +91,7 @@ public:
return module.get()->supports_data_export();
}
- int create_instance(CephContext *cct, const string& name, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+ int create_instance(CephContext *cct, const string& name, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
RGWSyncModuleRef module;
if (!get_module(name, &module)) {
return -ENOENT;
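
The two new hooks split module startup in two: init() runs synchronously when the sync environment is set up and receives a per-run instance id, while init_sync() may return a coroutine that runs once before data sync begins (the elasticsearch module uses it to create its index). A rough Python model of that lifecycle; the hook names mirror the C++ interface, and the driver loop is invented for illustration:

    class DataSyncModule(object):
        def init(self, sync_env, instance_id):
            """synchronous per-instance setup (e.g. compute an index path)"""
            pass
        def init_sync(self, sync_env):
            """optionally return a one-shot job to run before data sync"""
            return None
        def sync_object(self, sync_env, bucket, key, versioned_epoch):
            raise NotImplementedError

    def start_data_sync(module, sync_env, instance_id):
        module.init(sync_env, instance_id)
        job = module.init_sync(sync_env)
        if job is not None:
            job()    # e.g. create the elasticsearch index
        # ... then enter the regular per-object sync loop
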
diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc
index d500e9b937b..443057f8115 100644
--- a/src/rgw/rgw_sync_module_es.cc
+++ b/src/rgw/rgw_sync_module_es.cc
@@ -4,34 +4,258 @@
#include "rgw_data_sync.h"
#include "rgw_boost_asio_yield.h"
#include "rgw_sync_module_es.h"
+#include "rgw_sync_module_es_rest.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rest.h"
+#include "rgw_op.h"
+#include "rgw_es_query.h"
+
+#include "include/str_list.h"
#define dout_subsys ceph_subsys_rgw
+
+/*
+ * whitelist utility. Config string is a list of entries, where an entry is either an item,
+ * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
+ * a prefix would be a string ending with an asterisk, a suffix would be a string starting
+ * with an asterisk. For example:
+ *
+ * bucket1, bucket2, foo*, *bar
+ */
+class ItemList {
+ bool approve_all{false};
+
+ set<string> entries;
+ set<string> prefixes;
+ set<string> suffixes;
+
+ void parse(const string& str) {
+ list<string> l;
+
+ get_str_list(str, ",", l);
+
+ for (auto& entry : l) {
+ entry = rgw_trim_whitespace(entry);
+ if (entry.empty()) {
+ continue;
+ }
+
+ if (entry == "*") {
+ approve_all = true;
+ return;
+ }
+
+ if (entry[0] == '*') {
+ suffixes.insert(entry.substr(1));
+ continue;
+ }
+
+ if (entry.back() == '*') {
+ prefixes.insert(entry.substr(0, entry.size() - 1));
+ continue;
+ }
+
+ entries.insert(entry);
+ }
+ }
+
+public:
+ ItemList() {}
+ void init(const string& str, bool def_val) {
+ if (str.empty()) {
+ approve_all = def_val;
+ } else {
+ parse(str);
+ }
+ }
+
+ bool exists(const string& entry) {
+ if (approve_all) {
+ return true;
+ }
+
+ if (entries.find(entry) != entries.end()) {
+ return true;
+ }
+
+ auto i = prefixes.upper_bound(entry);
+ if (i != prefixes.begin()) {
+ --i;
+ if (boost::algorithm::starts_with(entry, *i)) {
+ return true;
+ }
+ }
+
+ for (i = suffixes.begin(); i != suffixes.end(); ++i) {
+ if (boost::algorithm::ends_with(entry, *i)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+};
+
+#define ES_NUM_SHARDS_MIN 5
+
+#define ES_NUM_SHARDS_DEFAULT 16
+#define ES_NUM_REPLICAS_DEFAULT 1
+
struct ElasticConfig {
+ uint64_t sync_instance{0};
string id;
- RGWRESTConn *conn{nullptr};
+ string index_path;
+ std::unique_ptr<RGWRESTConn> conn;
+ bool explicit_custom_meta{true};
+ string override_index_path;
+ ItemList index_buckets;
+ ItemList allow_owners;
+ uint32_t num_shards{0};
+ uint32_t num_replicas{0};
+
+ void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) {
+ string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
+ id = string("elastic:") + elastic_endpoint;
+ conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
+ explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
+ index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */
+ allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */
+ override_index_path = rgw_conf_get(config, "override_index_path", "");
+ num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT);
+ if (num_shards < ES_NUM_SHARDS_MIN) {
+ num_shards = ES_NUM_SHARDS_MIN;
+ }
+ num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT);
+ }
+
+ void init_instance(RGWRealm& realm, uint64_t instance_id) {
+ sync_instance = instance_id;
+
+ if (!override_index_path.empty()) {
+ index_path = override_index_path;
+ return;
+ }
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
+
+ index_path = "/rgw-" + realm.get_name() + buf;
+ }
+
+ string get_index_path() {
+ return index_path;
+ }
+
+ string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
+ return index_path + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance);
+ }
+
+ bool should_handle_operation(RGWBucketInfo& bucket_info) {
+ return index_buckets.exists(bucket_info.bucket.name) &&
+ allow_owners.exists(bucket_info.owner.to_str());
+ }
};
-static string es_get_obj_path(const RGWRealm& realm, const RGWBucketInfo& bucket_info, const rgw_obj_key& key)
-{
- string path = "/rgw-" + realm.get_name() + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance;
- return path;
-}
+using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
+
+struct es_dump_type {
+ const char *type;
+ const char *format;
+ bool analyzed;
+
+ es_dump_type(const char *t, const char *f = nullptr, bool a = false) : type(t), format(f), analyzed(a) {}
+
+ void dump(Formatter *f) const {
+ encode_json("type", type, f);
+ if (format) {
+ encode_json("format", format, f);
+ }
+ if (!analyzed && strcmp(type, "string") == 0) {
+ encode_json("index", "not_analyzed", f);
+ }
+ }
+};
+
+struct es_index_mappings {
+ void dump_custom(Formatter *f, const char *section, const char *type, const char *format) const {
+ f->open_object_section(section);
+ ::encode_json("type", "nested", f);
+ f->open_object_section("properties");
+ encode_json("name", es_dump_type("string"), f);
+ encode_json("value", es_dump_type(type, format), f);
+ f->close_section(); // entry
+ f->close_section(); // custom-string
+ }
+ void dump(Formatter *f) const {
+ f->open_object_section("object");
+ f->open_object_section("properties");
+ encode_json("bucket", es_dump_type("string"), f);
+ encode_json("name", es_dump_type("string"), f);
+ encode_json("instance", es_dump_type("string"), f);
+ encode_json("versioned_epoch", es_dump_type("long"), f);
+ f->open_object_section("meta");
+ f->open_object_section("properties");
+ encode_json("cache_control", es_dump_type("string"), f);
+ encode_json("content_disposition", es_dump_type("string"), f);
+ encode_json("content_encoding", es_dump_type("string"), f);
+ encode_json("content_language", es_dump_type("string"), f);
+ encode_json("content_type", es_dump_type("string"), f);
+ encode_json("etag", es_dump_type("string"), f);
+ encode_json("expires", es_dump_type("string"), f);
+ f->open_object_section("mtime");
+ ::encode_json("type", "date", f);
+ ::encode_json("format", "strict_date_optional_time||epoch_millis", f);
+ f->close_section(); // mtime
+ encode_json("size", es_dump_type("long"), f);
+ dump_custom(f, "custom-string", "string", nullptr);
+ dump_custom(f, "custom-int", "long", nullptr);
+ dump_custom(f, "custom-date", "date", "strict_date_optional_time||epoch_millis");
+ f->close_section(); // properties
+ f->close_section(); // meta
+ f->close_section(); // properties
+ f->close_section(); // object
+ }
+};
+
+struct es_index_settings {
+ uint32_t num_replicas;
+ uint32_t num_shards;
+
+ es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
+
+ void dump(Formatter *f) const {
+ encode_json("number_of_replicas", num_replicas, f);
+ encode_json("number_of_shards", num_shards, f);
+ }
+};
+
+struct es_index_config {
+ es_index_settings settings;
+ es_index_mappings mappings;
+
+ es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {}
+
+ void dump(Formatter *f) const {
+ encode_json("settings", settings, f);
+ encode_json("mappings", mappings, f);
+ }
+};
struct es_obj_metadata {
CephContext *cct;
+ ElasticConfigRef es_conf;
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
uint64_t size;
map<string, bufferlist> attrs;
+ uint64_t versioned_epoch;
- es_obj_metadata(CephContext *_cct, const RGWBucketInfo& _bucket_info,
+ es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
- map<string, bufferlist>& _attrs) : cct(_cct), bucket_info(_bucket_info), key(_key),
- mtime(_mtime), size(_size), attrs(std::move(_attrs)) {}
+ map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
+ mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
void dump(Formatter *f) const {
map<string, string> out_attrs;
@@ -88,6 +312,7 @@ struct es_obj_metadata {
::encode_json("bucket", bucket_info.bucket.name, f);
::encode_json("name", key.name, f);
::encode_json("instance", key.instance, f);
+ ::encode_json("versioned_epoch", versioned_epoch, f);
::encode_json("owner", policy.get_owner(), f);
::encode_json("permissions", permissions, f);
f->open_object_section("meta");
@@ -99,34 +324,133 @@ struct es_obj_metadata {
for (auto i : out_attrs) {
::encode_json(i.first.c_str(), i.second, f);
}
- if (!custom_meta.empty()) {
- f->open_object_section("custom");
- for (auto i : custom_meta) {
- ::encode_json(i.first.c_str(), i.second, f);
+ map<string, string> custom_str;
+ map<string, string> custom_int;
+ map<string, string> custom_date;
+
+ for (auto i : custom_meta) {
+ auto config = bucket_info.mdsearch_config.find(i.first);
+ if (config == bucket_info.mdsearch_config.end()) {
+ if (!es_conf->explicit_custom_meta) {
+ /* default custom meta is of type string */
+ custom_str[i.first] = i.second;
+ } else {
+ ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
+ }
+ continue;
+ }
+ switch (config->second) {
+ case ESEntityTypeMap::ES_ENTITY_DATE:
+ custom_date[i.first] = i.second;
+ break;
+ case ESEntityTypeMap::ES_ENTITY_INT:
+ custom_int[i.first] = i.second;
+ break;
+ default:
+ custom_str[i.first] = i.second;
+ }
+ }
+
+ if (!custom_str.empty()) {
+ f->open_array_section("custom-string");
+ for (auto i : custom_str) {
+ f->open_object_section("entity");
+ ::encode_json("name", i.first.c_str(), f);
+ ::encode_json("value", i.second, f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ if (!custom_int.empty()) {
+ f->open_array_section("custom-int");
+ for (auto i : custom_int) {
+ f->open_object_section("entity");
+ ::encode_json("name", i.first.c_str(), f);
+ ::encode_json("value", i.second, f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ if (!custom_date.empty()) {
+ f->open_array_section("custom-date");
+ for (auto i : custom_date) {
+ /*
+           * try to explicitly parse the date field, otherwise elasticsearch could reject
+           * the whole doc, which would make the sync fail
+ */
+ real_time t;
+ int r = parse_time(i.second.c_str(), &t);
+ if (r < 0) {
+ ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
+ continue;
+ }
+
+ string time_str;
+ rgw_to_iso8601(t, &time_str);
+
+ f->open_object_section("entity");
+ ::encode_json("name", i.first.c_str(), f);
+ ::encode_json("value", time_str.c_str(), f);
+ f->close_section();
}
f->close_section();
}
f->close_section();
}
+};
+
+class RGWElasticInitConfigCBCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ ElasticConfigRef conf;
+public:
+ RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ conf(_conf) {}
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
+ yield {
+ string path = conf->get_index_path();
+
+ es_index_settings settings(conf->num_replicas, conf->num_shards);
+ es_index_mappings mappings;
+
+ es_index_config index_conf(settings, mappings);
+
+ call(new RGWPutRESTResourceCR<es_index_config, int>(sync_env->cct, conf->conn.get(),
+ sync_env->http_manager,
+ path, nullptr /* params */,
+ index_conf, nullptr /* result */));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
};
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
+ uint64_t versioned_epoch;
public:
RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {}
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+ versioned_epoch(_versioned_epoch) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
- << " attrs=" << attrs << dendl;
+ ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " attrs=" << attrs << dendl;
yield {
- string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
- es_obj_metadata doc(sync_env->cct, bucket_info, key, mtime, size, attrs);
+ string path = conf->get_obj_path(bucket_info, key);
+ es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
- call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf.conn,
+ call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
path, nullptr /* params */,
doc, nullptr /* result */));
@@ -139,22 +463,22 @@ public:
}
return 0;
}
-
};
class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
+ uint64_t versioned_epoch;
public:
RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
- conf(_conf) {
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ conf(_conf), versioned_epoch(_versioned_epoch) {
}
~RGWElasticHandleRemoteObjCR() override {}
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf);
+ return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
}
};
@@ -163,21 +487,21 @@ class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
- const ElasticConfig& conf;
+ ElasticConfigRef conf;
public:
RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_info(_bucket_info), key(_key),
mtime(_mtime), conf(_conf) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
+ string path = conf->get_obj_path(bucket_info, key);
- call(new RGWDeleteRESTResourceCR(sync_env->cct, conf.conn,
+ call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
path, nullptr /* params */));
}
@@ -192,49 +516,88 @@ public:
};
class RGWElasticDataSyncModule : public RGWDataSyncModule {
- ElasticConfig conf;
+ ElasticConfigRef conf;
public:
- RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) {
- conf.id = string("elastic:") + elastic_endpoint;
- conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint });
+ RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) {
+ conf->init(cct, config);
}
- ~RGWElasticDataSyncModule() override {
- delete conf.conn;
+ ~RGWElasticDataSyncModule() override {}
+
+ void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
+ conf->init_instance(sync_env->store->get_realm(), instance_id);
}
+ RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
+ ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
+ return new RGWElasticInitConfigCBCR(sync_env, conf);
+ }
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf);
+ ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+ if (!conf->should_handle_operation(bucket_info)) {
+ ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+ return nullptr;
+ }
+ return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch);
}
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
- ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ if (!conf->should_handle_operation(bucket_info)) {
+ ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+ return nullptr;
+ }
return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
return NULL;
}
-};
+ RGWRESTConn *get_rest_conn() {
+ return conf->conn.get();
+ }
-class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
- RGWElasticDataSyncModule data_handler;
-public:
- RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) : data_handler(cct, endpoint) {}
- RGWDataSyncModule *get_data_handler() override {
- return &data_handler;
+ string get_index_path() {
+ return conf->get_index_path();
}
};
-int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
+{
+ data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
+}
+
+RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
+{
+ return data_handler.get();
+}
+
+RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
+{
+ return data_handler->get_rest_conn();
+}
+
+string RGWElasticSyncModuleInstance::get_index_path() {
+ return data_handler->get_index_path();
+}
+
+RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
+ if (dialect != RGW_REST_S3) {
+ return orig;
+ }
+ delete orig;
+ return new RGWRESTMgr_MDSearch_S3();
+}
+
+int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
string endpoint;
auto i = config.find("endpoint");
if (i != config.end()) {
endpoint = i->second;
}
- instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint));
+ instance->reset(new RGWElasticSyncModuleInstance(cct, config));
return 0;
}
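
ElasticConfig::init() above defines the recognized tier-config keys: endpoint, explicit_custom_meta, index_buckets_list, approved_owners_list, override_index_path, num_shards, and num_replicas. A hedged sketch of feeding them to a zone through radosgw-admin; the zone name and endpoint are hypothetical:

    import subprocess

    def set_es_tier_config(zone, endpoint, num_shards=16, num_replicas=1):
        tier_config = ','.join([
            'endpoint=%s' % endpoint,
            'num_shards=%d' % num_shards,
            'num_replicas=%d' % num_replicas,
            'explicit_custom_meta=true',  # index only configured x-amz-meta-* keys
        ])
        subprocess.check_call(['radosgw-admin', 'zone', 'modify',
                               '--rgw-zone', zone,
                               '--tier-type', 'elasticsearch',
                               '--tier-config', tier_config])

    # set_es_tier_config('us-east-es', 'http://localhost:9200')
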
diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h
index 73c8368571d..43e591e42fc 100644
--- a/src/rgw/rgw_sync_module_es.h
+++ b/src/rgw/rgw_sync_module_es.h
@@ -9,7 +9,20 @@ public:
bool supports_data_export() override {
return false;
}
- int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
+};
+
+class RGWElasticDataSyncModule;
+class RGWRESTConn;
+
+class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
+ std::unique_ptr<RGWElasticDataSyncModule> data_handler;
+public:
+ RGWElasticSyncModuleInstance(CephContext *cct, const std::map<std::string, std::string, ltstr_nocase>& config);
+ RGWDataSyncModule *get_data_handler() override;
+ RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
+ RGWRESTConn *get_rest_conn();
+ std::string get_index_path();
};
#endif
diff --git a/src/rgw/rgw_sync_module_es_rest.cc b/src/rgw/rgw_sync_module_es_rest.cc
new file mode 100644
index 00000000000..200335ffa63
--- /dev/null
+++ b/src/rgw/rgw_sync_module_es_rest.cc
@@ -0,0 +1,412 @@
+#include "rgw_sync_module_es.h"
+#include "rgw_sync_module_es_rest.h"
+#include "rgw_es_query.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+struct es_index_obj_response {
+ string bucket;
+ rgw_obj_key key;
+ uint64_t versioned_epoch{0};
+ ACLOwner owner;
+ set<string> read_permissions;
+
+ struct {
+ uint64_t size{0};
+ ceph::real_time mtime;
+ string etag;
+ string content_type;
+ map<string, string> custom_str;
+ map<string, int64_t> custom_int;
+ map<string, string> custom_date;
+
+ template <class T>
+ struct _custom_entry {
+ string name;
+ T value;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("name", name, obj);
+ JSONDecoder::decode_json("value", value, obj);
+ }
+ };
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("size", size, obj);
+ string mtime_str;
+ JSONDecoder::decode_json("mtime", mtime_str, obj);
+ parse_time(mtime_str.c_str(), &mtime);
+ JSONDecoder::decode_json("etag", etag, obj);
+ JSONDecoder::decode_json("content_type", content_type, obj);
+ list<_custom_entry<string> > str_entries;
+ JSONDecoder::decode_json("custom-string", str_entries, obj);
+ for (auto& e : str_entries) {
+ custom_str[e.name] = e.value;
+ }
+ list<_custom_entry<int64_t> > int_entries;
+ JSONDecoder::decode_json("custom-int", int_entries, obj);
+ for (auto& e : int_entries) {
+ custom_int[e.name] = e.value;
+ }
+ list<_custom_entry<string> > date_entries;
+ JSONDecoder::decode_json("custom-date", date_entries, obj);
+ for (auto& e : date_entries) {
+ custom_date[e.name] = e.value;
+ }
+ }
+ } meta;
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("bucket", bucket, obj);
+ JSONDecoder::decode_json("name", key.name, obj);
+ JSONDecoder::decode_json("instance", key.instance, obj);
+ JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj);
+ JSONDecoder::decode_json("permissions", read_permissions, obj);
+ JSONDecoder::decode_json("owner", owner, obj);
+ JSONDecoder::decode_json("meta", meta, obj);
+ }
+};
+
+struct es_search_response {
+ uint32_t took;
+ bool timed_out;
+ struct {
+ uint32_t total;
+ uint32_t successful;
+ uint32_t failed;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("total", total, obj);
+ JSONDecoder::decode_json("successful", successful, obj);
+ JSONDecoder::decode_json("failed", failed, obj);
+ }
+ } shards;
+ struct obj_hit {
+ string index;
+ string type;
+ string id;
+ // double score
+ es_index_obj_response source;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("_index", index, obj);
+ JSONDecoder::decode_json("_type", type, obj);
+ JSONDecoder::decode_json("_id", id, obj);
+ JSONDecoder::decode_json("_source", source, obj);
+ }
+ };
+ struct {
+ uint32_t total;
+ // double max_score;
+ list<obj_hit> hits;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("total", total, obj);
+ // JSONDecoder::decode_json("max_score", max_score, obj);
+ JSONDecoder::decode_json("hits", hits, obj);
+ }
+ } hits;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("took", took, obj);
+ JSONDecoder::decode_json("timed_out", timed_out, obj);
+ JSONDecoder::decode_json("_shards", shards, obj);
+ JSONDecoder::decode_json("hits", hits, obj);
+ }
+};
+
+class RGWMetadataSearchOp : public RGWOp {
+ RGWSyncModuleInstanceRef sync_module_ref;
+ RGWElasticSyncModuleInstance *es_module;
+protected:
+ string expression;
+ string custom_prefix;
+#define MAX_KEYS_DEFAULT 100
+ uint64_t max_keys{MAX_KEYS_DEFAULT};
+ string marker_str;
+ uint64_t marker{0};
+ string next_marker;
+ bool is_truncated{false};
+ string err;
+
+ es_search_response response;
+
+public:
+ RGWMetadataSearchOp(const RGWSyncModuleInstanceRef& sync_module) : sync_module_ref(sync_module) {
+ es_module = static_cast<RGWElasticSyncModuleInstance *>(sync_module_ref.get());
+ }
+
+ int verify_permission() {
+ return 0;
+ }
+ virtual int get_params() = 0;
+ void pre_exec();
+ void execute();
+
+ virtual void send_response() = 0;
+ virtual const string name() { return "metadata_search"; }
+ virtual RGWOpType get_type() { return RGW_OP_METADATA_SEARCH; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; }
+};
+
+void RGWMetadataSearchOp::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
+void RGWMetadataSearchOp::execute()
+{
+ op_ret = get_params();
+ if (op_ret < 0)
+ return;
+
+ list<pair<string, string> > conds;
+
+ if (!s->user->system) {
+ conds.push_back(make_pair("permissions", s->user->user_id.to_str()));
+ }
+
+ if (!s->bucket_name.empty()) {
+ conds.push_back(make_pair("bucket", s->bucket_name));
+ }
+
+ ESQueryCompiler es_query(expression, &conds, custom_prefix);
+
+ static map<string, string, ltstr_nocase> aliases = {
+ { "bucket", "bucket" }, /* forces lowercase */
+ { "name", "name" },
+ { "key", "name" },
+ { "instance", "instance" },
+ { "etag", "meta.etag" },
+ { "size", "meta.size" },
+ { "mtime", "meta.mtime" },
+ { "lastmodified", "meta.mtime" },
+ { "contenttype", "meta.contenttype" },
+ };
+ es_query.set_field_aliases(&aliases);
+
+ static map<string, ESEntityTypeMap::EntityType> generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR},
+ {"name", ESEntityTypeMap::ES_ENTITY_STR},
+ {"instance", ESEntityTypeMap::ES_ENTITY_STR},
+ {"permissions", ESEntityTypeMap::ES_ENTITY_STR},
+ {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR},
+ {"meta.contenttype", ESEntityTypeMap::ES_ENTITY_STR},
+ {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE},
+ {"meta.size", ESEntityTypeMap::ES_ENTITY_INT} };
+ ESEntityTypeMap gm(generic_map);
+ es_query.set_generic_type_map(&gm);
+
+ static set<string> restricted_fields = { {"permissions"} };
+ es_query.set_restricted_fields(&restricted_fields);
+
+ map<string, ESEntityTypeMap::EntityType> custom_map;
+ for (auto& i : s->bucket_info.mdsearch_config) {
+ custom_map[i.first] = (ESEntityTypeMap::EntityType)i.second;
+ }
+
+ ESEntityTypeMap em(custom_map);
+ es_query.set_custom_type_map(&em);
+
+ bool valid = es_query.compile(&err);
+ if (!valid) {
+ ldout(s->cct, 10) << "invalid query, failed generating request json" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ JSONFormatter f;
+ encode_json("root", es_query, &f);
+
+ RGWRESTConn *conn = es_module->get_rest_conn();
+
+ bufferlist in;
+ bufferlist out;
+
+ stringstream ss;
+
+ f.flush(ss);
+ in.append(ss.str());
+
+ string resource = es_module->get_index_path() + "/_search";
+ param_vec_t params;
+ static constexpr int BUFSIZE = 32;
+ char buf[BUFSIZE];
+ snprintf(buf, sizeof(buf), "%lld", (long long)max_keys);
+ params.push_back(param_pair_t("size", buf));
+ if (marker > 0) {
+ params.push_back(param_pair_t("from", marker_str.c_str()));
+ }
+ ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl;
+ op_ret = conn->get_resource(resource, &params, nullptr, out, &in);
+ if (op_ret < 0) {
+ ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl;
+ return;
+ }
+
+ ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl;
+
+ JSONParser jparser;
+ if (!jparser.parse(out.c_str(), out.length())) {
+ ldout(s->cct, 0) << "ERROR: failed to parse elasticsearch response" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ try {
+ decode_json_obj(response, &jparser);
+ } catch (JSONDecoder::err& e) {
+ ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+}
+
+class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp {
+public:
+ RGWMetadataSearch_ObjStore_S3(const RGWSyncModuleInstanceRef& _sync_module) : RGWMetadataSearchOp(_sync_module) {
+ custom_prefix = "x-amz-meta-";
+ }
+
+ int get_params() override {
+ expression = s->info.args.get("query");
+ bool exists;
+ string max_keys_str = s->info.args.get("max-keys", &exists);
+#define MAX_KEYS_MAX 10000
+ if (exists) {
+ string err;
+ max_keys = strict_strtoll(max_keys_str.c_str(), 10, &err);
+ if (!err.empty()) {
+ return -EINVAL;
+ }
+ if (max_keys > MAX_KEYS_MAX) {
+ max_keys = MAX_KEYS_MAX;
+ }
+ }
+ marker_str = s->info.args.get("marker", &exists);
+ if (exists) {
+ string err;
+ marker = strict_strtoll(marker_str.c_str(), 10, &err);
+ if (!err.empty()) {
+ return -EINVAL;
+ }
+ }
+ uint64_t nm = marker + max_keys;
+ static constexpr int BUFSIZE = 32;
+ char buf[BUFSIZE];
+ snprintf(buf, sizeof(buf), "%lld", (long long)nm);
+ next_marker = buf;
+ return 0;
+ }
+ void send_response() override {
+ if (op_ret) {
+ s->err.message = err;
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ is_truncated = (response.hits.hits.size() >= max_keys);
+
+ s->formatter->open_object_section("SearchMetadataResponse");
+ s->formatter->dump_string("Marker", marker_str);
+ s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false"));
+ if (is_truncated) {
+ s->formatter->dump_string("NextMarker", next_marker);
+ }
+ if (s->format == RGW_FORMAT_JSON) {
+ s->formatter->open_array_section("Objects");
+ }
+ for (auto& i : response.hits.hits) {
+ s->formatter->open_object_section("Contents");
+ es_index_obj_response& e = i.source;
+ s->formatter->dump_string("Bucket", e.bucket);
+ s->formatter->dump_string("Key", e.key.name);
+ string instance = (!e.key.instance.empty() ? e.key.instance : "null");
+ s->formatter->dump_string("Instance", instance.c_str());
+ s->formatter->dump_int("VersionedEpoch", e.versioned_epoch);
+ dump_time(s, "LastModified", &e.meta.mtime);
+ s->formatter->dump_int("Size", e.meta.size);
+ s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str());
+ s->formatter->dump_string("ContentType", e.meta.content_type.c_str());
+ dump_owner(s, e.owner.get_id(), e.owner.get_display_name());
+ s->formatter->open_array_section("CustomMetadata");
+ for (auto& m : e.meta.custom_str) {
+ s->formatter->open_object_section("Entry");
+ s->formatter->dump_string("Name", m.first.c_str());
+ s->formatter->dump_string("Value", m.second);
+ s->formatter->close_section();
+ }
+ for (auto& m : e.meta.custom_int) {
+ s->formatter->open_object_section("Entry");
+ s->formatter->dump_string("Name", m.first.c_str());
+ s->formatter->dump_int("Value", m.second);
+ s->formatter->close_section();
+ }
+ for (auto& m : e.meta.custom_date) {
+ s->formatter->open_object_section("Entry");
+ s->formatter->dump_string("Name", m.first.c_str());
+ s->formatter->dump_string("Value", m.second);
+ s->formatter->close_section();
+ }
+ s->formatter->close_section();
+ rgw_flush_formatter(s, s->formatter);
+ s->formatter->close_section();
+    }
+ if (s->format == RGW_FORMAT_JSON) {
+ s->formatter->close_section();
+ }
+ s->formatter->close_section();
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 {
+protected:
+ RGWOp *op_get() {
+ if (s->info.args.exists("query")) {
+ return new RGWMetadataSearch_ObjStore_S3(store->get_sync_module());
+ }
+ if (!s->init_state.url_bucket.empty() &&
+ s->info.args.exists("mdsearch")) {
+ return new RGWGetBucketMetaSearch_ObjStore_S3;
+ }
+ return nullptr;
+ }
+ RGWOp *op_head() {
+ return nullptr;
+ }
+ RGWOp *op_post() {
+ return nullptr;
+ }
+public:
+ RGWHandler_REST_MDSearch_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_MDSearch_S3() {}
+};
+
+
+RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(struct req_state* const s,
+ const rgw::auth::StrategyRegistry& auth_registry,
+ const std::string& frontend_prefix)
+{
+ int ret =
+ RGWHandler_REST_S3::init_from_header(s,
+ RGW_FORMAT_XML, true);
+ if (ret < 0) {
+ return nullptr;
+ }
+
+ if (!s->object.empty()) {
+ return nullptr;
+ }
+
+ RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3(auth_registry);
+
+ ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name()
+ << dendl;
+ return handler;
+}
+
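
With the REST filter installed, a metadata search is an ordinary GET ?query=... against a gateway of the elasticsearch zone, with max-keys and marker paging exactly as get_params() parses them above. A hedged boto sketch; the bucket and query expression are made up, and the expression syntax is whatever ESQueryCompiler accepts:

    import urllib
    import boto.s3.connection

    conn = boto.connect_s3(
        aws_access_key_id='access', aws_secret_access_key='secret',
        host='localhost', port=8002, is_secure=False,  # gateway of the ES zone
        calling_format=boto.s3.connection.OrdinaryCallingFormat())

    expr = urllib.quote('x-amz-meta-color == blue')    # compiled by ESQueryCompiler
    r = conn.make_request('GET', bucket='mybucket',
                          query_args='query=%s&max-keys=20&marker=0' % expr)
    print r.read()   # SearchMetadataResponse with Contents entries
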
diff --git a/src/rgw/rgw_sync_module_es_rest.h b/src/rgw/rgw_sync_module_es_rest.h
new file mode 100644
index 00000000000..4dd0698f235
--- /dev/null
+++ b/src/rgw/rgw_sync_module_es_rest.h
@@ -0,0 +1,17 @@
+#ifndef CEPH_RGW_SYNC_MODULE_ES_REST_H
+#define CEPH_RGW_SYNC_MODULE_ES_REST_H
+
+#include "rgw_rest.h"
+
+class RGWElasticSyncModuleInstance;
+
+class RGWRESTMgr_MDSearch_S3 : public RGWRESTMgr {
+public:
+ explicit RGWRESTMgr_MDSearch_S3() {}
+
+ RGWHandler_REST *get_handler(struct req_state* s,
+ const rgw::auth::StrategyRegistry& auth_registry,
+ const std::string& frontend_prefix) override;
+};
+
+#endif
diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc
index f253d707429..67212ec593a 100644
--- a/src/rgw/rgw_sync_module_log.cc
+++ b/src/rgw/rgw_sync_module_log.cc
@@ -64,7 +64,7 @@ public:
}
};
-int RGWLogSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
+int RGWLogSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
string prefix;
auto i = config.find("prefix");
if (i != config.end()) {
diff --git a/src/rgw/rgw_sync_module_log.h b/src/rgw/rgw_sync_module_log.h
index 5b7bae6552d..9afc9108e67 100644
--- a/src/rgw/rgw_sync_module_log.h
+++ b/src/rgw/rgw_sync_module_log.h
@@ -9,7 +9,7 @@ public:
bool supports_data_export() override {
return false;
}
- int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
};
#endif
diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py
new file mode 100644
index 00000000000..1099664df20
--- /dev/null
+++ b/src/test/rgw/rgw_multi/conn.py
@@ -0,0 +1,16 @@
+import boto
+import boto.s3.connection
+
+
+def get_gateway_connection(gateway, credentials):
+ """ connect to the given gateway """
+ if gateway.connection is None:
+ gateway.connection = boto.connect_s3(
+ aws_access_key_id = credentials.access_key,
+ aws_secret_access_key = credentials.secret,
+ host = gateway.host,
+ port = gateway.port,
+ is_secure = False,
+ calling_format = boto.s3.connection.OrdinaryCallingFormat())
+ return gateway.connection
+
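
The helper caches one boto connection per gateway object and reuses it on later calls. A small usage sketch with stand-in objects; _Obj substitutes for the multisite Gateway and Credentials classes, which only need the attributes used below:

    from rgw_multi.conn import get_gateway_connection

    class _Obj(object):
        def __init__(self, **kw):
            self.__dict__.update(kw)

    gw = _Obj(host='localhost', port=8000, connection=None)
    creds = _Obj(access_key='access', secret='secret')

    conn = get_gateway_connection(gw, creds)
    assert conn is gw.connection                        # cached on the gateway
    assert get_gateway_connection(gw, creds) is conn    # reused on later calls
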
diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py
index 2d392cf8893..58bd98224b1 100644
--- a/src/test/rgw/rgw_multi/multisite.py
+++ b/src/test/rgw/rgw_multi/multisite.py
@@ -2,6 +2,8 @@ from abc import ABCMeta, abstractmethod
from cStringIO import StringIO
import json
+from conn import get_gateway_connection
+
class Cluster:
""" interface to run commands against a distinct ceph cluster """
__metaclass__ = ABCMeta
@@ -154,6 +156,40 @@ class Zone(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemO
def realm(self):
return self.zonegroup.realm() if self.zonegroup else None
+ def is_read_only(self):
+ return False
+
+ def tier_type(self):
+ raise NotImplementedError
+
+ def has_buckets(self):
+ return True
+
+ def get_conn(self, credentials):
+        return ZoneConn(self, credentials) # base connection; bucket ops raise NotImplementedError
+
+class ZoneConn(object):
+    def __init__(self, zone, credentials):
+        """ connect to the zone's first gateway """
+        self.zone = zone
+        self.name = zone.name
+ if isinstance(credentials, list):
+ self.credentials = credentials[0]
+ else:
+ self.credentials = credentials
+
+ if self.zone.gateways is not None:
+ self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials)
+
+ def get_connection(self):
+ return self.conn
+
+ def get_bucket(self, bucket_name, credentials):
+ raise NotImplementedError
+
+ def check_bucket_eq(self, zone, bucket_name):
+ raise NotImplementedError
+
class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify):
def __init__(self, name, period = None, data = None, zonegroup_id = None, zones = None, master_zone = None):
self.name = name
@@ -161,6 +197,14 @@ class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, Sy
self.zones = zones or []
self.master_zone = master_zone
super(ZoneGroup, self).__init__(data, zonegroup_id)
+ self.rw_zones = []
+ self.ro_zones = []
+ self.zones_by_type = {}
+ for z in self.zones:
+ if z.is_read_only():
+ self.ro_zones.append(z)
+ else:
+ self.rw_zones.append(z)
def zonegroup_arg(self):
""" command-line argument to specify this zonegroup """
diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py
index 3633b22a336..fff682d37e4 100644
--- a/src/test/rgw/rgw_multi/tests.py
+++ b/src/test/rgw/rgw_multi/tests.py
@@ -4,6 +4,7 @@ import string
import sys
import time
import logging
+
try:
from itertools import izip_longest as zip_longest
except ImportError:
@@ -19,6 +20,8 @@ from nose.plugins.skip import SkipTest
from .multisite import Zone
+from .conn import get_gateway_connection
+
class Config:
""" test configuration """
def __init__(self, **kwargs):
@@ -41,6 +44,10 @@ def init_multi(_realm, _user, _config=None):
user = _user
global config
config = _config or Config()
+ realm_meta_checkpoint(realm)
+
+def get_realm():
+ return realm
log = logging.getLogger(__name__)
@@ -73,6 +80,15 @@ def mdlog_list(zone, period = None):
mdlog_json = mdlog_json.decode('utf-8')
return json.loads(mdlog_json)
+def meta_sync_status(zone):
+ while True:
+ cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
+ meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
+ if retcode == 0:
+ break
+ assert(retcode == 2) # ENOENT
+ time.sleep(5)
+
def mdlog_autotrim(zone):
zone.cluster.admin(['mdlog', 'autotrim'])
@@ -316,7 +332,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn
return True
-def zone_data_checkpoint(target_zone, source_zone):
+def zone_data_checkpoint(target_zone, source_zone_conn):
-    if target_zone == source_zone:
+    if target_zone == source_zone_conn.zone:
         return
@@ -375,22 +391,44 @@ def gen_bucket_name():
num_buckets += 1
return run_prefix + '-' + str(num_buckets)
-def check_all_buckets_exist(zone, buckets):
- conn = get_zone_connection(zone, user.credentials)
+class ZonegroupConns:
+ def __init__(self, zonegroup):
+ self.zonegroup = zonegroup
+ self.zones = []
+ self.ro_zones = []
+ self.rw_zones = []
+ self.master_zone = None
+ for z in zonegroup.zones:
+ zone_conn = z.get_conn(user.credentials)
+ self.zones.append(zone_conn)
+ if z.is_read_only():
+ self.ro_zones.append(zone_conn)
+ else:
+ self.rw_zones.append(zone_conn)
+
+ if z == zonegroup.master_zone:
+ self.master_zone = zone_conn
+
+def check_all_buckets_exist(zone_conn, buckets):
+ if not zone_conn.zone.has_buckets():
+ return True
+
for b in buckets:
try:
- conn.get_bucket(b)
+ zone_conn.get_bucket(b)
except:
-            log.critical('zone %s does not contain bucket %s', zone.name, b)
+            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
return False
return True
-def check_all_buckets_dont_exist(zone, buckets):
- conn = get_zone_connection(zone, user.credentials)
+def check_all_buckets_dont_exist(zone_conn, buckets):
+ if not zone_conn.zone.has_buckets():
+ return True
+
for b in buckets:
try:
- conn.get_bucket(b)
+ zone_conn.get_bucket(b)
except:
continue
@@ -399,78 +437,80 @@ def check_all_buckets_dont_exist(zone, buckets):
return True
-def create_bucket_per_zone(zonegroup):
+def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
buckets = []
- zone_bucket = {}
- for zone in zonegroup.zones:
- conn = get_zone_connection(zone, user.credentials)
- bucket_name = gen_bucket_name()
- log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
- bucket = conn.create_bucket(bucket_name)
- buckets.append(bucket_name)
- zone_bucket[zone] = bucket
+ zone_bucket = []
+ for zone in zonegroup_conns.rw_zones:
+ for i in xrange(buckets_per_zone):
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
+ bucket = zone.create_bucket(bucket_name)
+ buckets.append(bucket_name)
+ zone_bucket.append((zone, bucket))
return buckets, zone_bucket
def create_bucket_per_zone_in_realm():
buckets = []
- zone_bucket = {}
+ zone_bucket = []
for zonegroup in realm.current_period.zonegroups:
- b, z = create_bucket_per_zone(zonegroup)
+ zg_conn = ZonegroupConns(zonegroup)
+ b, z = create_bucket_per_zone(zg_conn)
buckets.extend(b)
- zone_bucket.update(z)
+ zone_bucket.extend(z)
return buckets, zone_bucket
def test_bucket_create():
zonegroup = realm.master_zonegroup()
- buckets, _ = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, _ = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
- for zone in zonegroup.zones:
+ for zone in zonegroup_conns.zones:
assert check_all_buckets_exist(zone, buckets)
def test_bucket_recreate():
zonegroup = realm.master_zonegroup()
- buckets, _ = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, _ = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
- for zone in zonegroup.zones:
+
+ for zone in zonegroup_conns.zones:
assert check_all_buckets_exist(zone, buckets)
# recreate buckets on all zones, make sure they weren't removed
- for zone in zonegroup.zones:
+ for zone in zonegroup_conns.rw_zones:
for bucket_name in buckets:
- conn = get_zone_connection(zone, user.credentials)
- bucket = conn.create_bucket(bucket_name)
+ bucket = zone.create_bucket(bucket_name)
- for zone in zonegroup.zones:
+ for zone in zonegroup_conns.zones:
assert check_all_buckets_exist(zone, buckets)
zonegroup_meta_checkpoint(zonegroup)
- for zone in zonegroup.zones:
+ for zone in zonegroup_conns.zones:
assert check_all_buckets_exist(zone, buckets)
def test_bucket_remove():
zonegroup = realm.master_zonegroup()
- buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
- for zone in zonegroup.zones:
+ for zone in zonegroup_conns.zones:
assert check_all_buckets_exist(zone, buckets)
- for zone, bucket_name in zone_bucket.items():
- conn = get_zone_connection(zone, user.credentials)
- conn.delete_bucket(bucket_name)
+ for zone, bucket_name in zone_bucket:
+ zone.conn.delete_bucket(bucket_name)
zonegroup_meta_checkpoint(zonegroup)
- for zone in zonegroup.zones:
+ for zone in zonegroup_conns.zones:
assert check_all_buckets_dont_exist(zone, buckets)
def get_bucket(zone, bucket_name):
- conn = get_zone_connection(zone, user.credentials)
- return conn.get_bucket(bucket_name)
+ return zone.conn.get_bucket(bucket_name)
def get_key(zone, bucket_name, obj_name):
b = get_bucket(zone, bucket_name)
@@ -480,115 +520,67 @@ def new_key(zone, bucket_name, obj_name):
b = get_bucket(zone, bucket_name)
return b.new_key(obj_name)
-def check_object_eq(k1, k2, check_extra = True):
- assert k1
- assert k2
- log.debug('comparing key name=%s', k1.name)
- eq(k1.name, k2.name)
- eq(k1.get_contents_as_string(), k2.get_contents_as_string())
- eq(k1.metadata, k2.metadata)
- eq(k1.cache_control, k2.cache_control)
- eq(k1.content_type, k2.content_type)
- eq(k1.content_encoding, k2.content_encoding)
- eq(k1.content_disposition, k2.content_disposition)
- eq(k1.content_language, k2.content_language)
- eq(k1.etag, k2.etag)
- eq(k1.last_modified, k2.last_modified)
- if check_extra:
- eq(k1.owner.id, k2.owner.id)
- eq(k1.owner.display_name, k2.owner.display_name)
- eq(k1.storage_class, k2.storage_class)
- eq(k1.size, k2.size)
- eq(k1.version_id, k2.version_id)
- eq(k1.encrypted, k2.encrypted)
-
-def check_bucket_eq(zone1, zone2, bucket_name):
- log.info('comparing bucket=%s zones={%s, %s}', bucket_name, zone1.name, zone2.name)
- b1 = get_bucket(zone1, bucket_name)
- b2 = get_bucket(zone2, bucket_name)
-
- log.debug('bucket1 objects:')
- for o in b1.get_all_versions():
- log.debug('o=%s', o.name)
- log.debug('bucket2 objects:')
- for o in b2.get_all_versions():
- log.debug('o=%s', o.name)
-
- for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
- if k1 is None:
- log.critical('key=%s is missing from zone=%s', k2.name, zone1.name)
- assert False
- if k2 is None:
- log.critical('key=%s is missing from zone=%s', k1.name, zone2.name)
- assert False
-
- check_object_eq(k1, k2)
-
- # now get the keys through a HEAD operation, verify that the available data is the same
- k1_head = b1.get_key(k1.name)
- k2_head = b2.get_key(k2.name)
-
- check_object_eq(k1_head, k2_head, False)
-
- log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, zone1.name, zone2.name)
-
+def check_bucket_eq(zone_conn1, zone_conn2, bucket):
+ return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
def test_object_sync():
zonegroup = realm.master_zonegroup()
- buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
objnames = [ 'myobj', '_myobj', ':', '&' ]
content = 'asdasd'
# don't wait for meta sync just yet
- for zone, bucket_name in zone_bucket.items():
+ for zone, bucket_name in zone_bucket:
for objname in objnames:
k = new_key(zone, bucket_name, objname)
k.set_contents_from_string(content)
zonegroup_meta_checkpoint(zonegroup)
- for source_zone, bucket in zone_bucket.items():
- for target_zone in zonegroup.zones:
- if source_zone == target_zone:
+ for source_conn, bucket in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
continue
- zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket)
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
def test_object_delete():
zonegroup = realm.master_zonegroup()
- buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
objname = 'myobj'
content = 'asdasd'
# don't wait for meta sync just yet
- for zone, bucket in zone_bucket.items():
+ for zone, bucket in zone_bucket:
k = new_key(zone, bucket, objname)
k.set_contents_from_string(content)
zonegroup_meta_checkpoint(zonegroup)
# check object exists
- for source_zone, bucket in zone_bucket.items():
- for target_zone in zonegroup.zones:
- if source_zone == target_zone:
+ for source_conn, bucket in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
continue
- zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket)
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
# check object removal
- for source_zone, bucket in zone_bucket.items():
- k = get_key(source_zone, bucket, objname)
+ for source_conn, bucket in zone_bucket:
+ k = get_key(source_conn, bucket, objname)
k.delete()
- for target_zone in zonegroup.zones:
- if source_zone == target_zone:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
continue
- zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket)
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
def get_latest_object_version(key):
for k in key.bucket.list_versions(key.name):
@@ -598,28 +590,29 @@ def get_latest_object_version(key):
def test_versioned_object_incremental_sync():
zonegroup = realm.master_zonegroup()
- buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
# enable versioning
- for zone, bucket in zone_bucket.items():
+ for _, bucket in zone_bucket:
bucket.configure_versioning(True)
zonegroup_meta_checkpoint(zonegroup)
# upload a dummy object to each bucket and wait for sync. this forces each
# bucket to finish a full sync and switch to incremental
- for source_zone, bucket in zone_bucket.items():
- new_key(source_zone, bucket, 'dummy').set_contents_from_string('')
- for target_zone in zonegroup.zones:
- if source_zone == target_zone:
+ for source_conn, bucket in zone_bucket:
+ new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
continue
- zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
- for _, bucket in zone_bucket.items():
+ for _, bucket in zone_bucket:
# create and delete multiple versions of an object from each zone
- for zone in zonegroup.zones:
- obj = 'obj-' + zone.name
- k = new_key(zone, bucket, obj)
+ for zone_conn in zonegroup_conns.rw_zones:
+ obj = 'obj-' + zone_conn.name
+ k = new_key(zone_conn, bucket, obj)
k.set_contents_from_string('version1')
v = get_latest_object_version(k)
@@ -639,16 +632,16 @@ def test_versioned_object_incremental_sync():
log.debug('version3 id=%s', v.version_id)
k.bucket.delete_key(obj, version_id=v.version_id)
- for source_zone, bucket in zone_bucket.items():
- for target_zone in zonegroup.zones:
- if source_zone == target_zone:
+ for source_conn, bucket in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
continue
- zone_bucket_checkpoint(target_zone, source_zone, bucket.name)
- check_bucket_eq(source_zone, target_zone, bucket)
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
def test_bucket_versioning():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for zone, bucket in zone_bucket.items():
+ for _, bucket in zone_bucket:
bucket.configure_versioning(True)
res = bucket.get_versioning_status()
key = 'Versioning'
@@ -656,19 +649,20 @@ def test_bucket_versioning():
def test_bucket_acl():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
- for zone, bucket in zone_bucket.items():
+ for _, bucket in zone_bucket:
assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
bucket.set_acl('public-read')
assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
def test_bucket_delete_notempty():
zonegroup = realm.master_zonegroup()
- buckets, zone_bucket = create_bucket_per_zone(zonegroup)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
- for zone, bucket_name in zone_bucket.items():
+ for zone_conn, bucket_name in zone_bucket:
# upload an object to each bucket on its own zone
- conn = get_zone_connection(zone, user.credentials)
+ conn = zone_conn.get_connection()
bucket = conn.get_bucket(bucket_name)
k = bucket.new_key('foo')
k.set_contents_from_string('bar')
@@ -681,8 +675,8 @@ def test_bucket_delete_notempty():
assert False # expected 409 BucketNotEmpty
# assert that each bucket still exists on the master
- c1 = get_zone_connection(zonegroup.master_zone, user.credentials)
- for _, bucket_name in zone_bucket.items():
+ c1 = zonegroup_conns.master_zone.conn
+ for _, bucket_name in zone_bucket:
assert c1.get_bucket(bucket_name)
def test_multi_period_incremental_sync():
@@ -694,13 +688,8 @@ def test_multi_period_incremental_sync():
mdlog_periods = [realm.current_period.id]
# create a bucket in each zone
- buckets = []
- for zone in zonegroup.zones:
- conn = get_zone_connection(zone, user.credentials)
- bucket_name = gen_bucket_name()
- log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
- bucket = conn.create_bucket(bucket_name)
- buckets.append(bucket_name)
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
zonegroup_meta_checkpoint(zonegroup)
@@ -714,14 +703,12 @@ def test_multi_period_incremental_sync():
set_master_zone(z2)
mdlog_periods += [realm.current_period.id]
- # create another bucket in each zone, except for z3
- for zone in zonegroup.zones:
- if zone == z3:
+ for zone_conn, _ in zone_bucket:
+ if zone_conn.zone == z3:
continue
- conn = get_zone_connection(zone, user.credentials)
bucket_name = gen_bucket_name()
- log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
- bucket = conn.create_bucket(bucket_name)
+ log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
+ bucket = zone_conn.conn.create_bucket(bucket_name)
buckets.append(bucket_name)
# wait for zone 1 to sync
@@ -731,24 +718,26 @@ def test_multi_period_incremental_sync():
set_master_zone(z1)
mdlog_periods += [realm.current_period.id]
- # create another bucket in each zone, except for z3
- for zone in zonegroup.zones:
- if zone == z3:
+    for zone_conn, _ in zone_bucket:
+ if zone_conn.zone == z3:
continue
- conn = get_zone_connection(zone, user.credentials)
bucket_name = gen_bucket_name()
- log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
- bucket = conn.create_bucket(bucket_name)
+ log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
+ bucket = zone_conn.conn.create_bucket(bucket_name)
buckets.append(bucket_name)
# restart zone 3 gateway and wait for sync
z3.start()
zonegroup_meta_checkpoint(zonegroup)
- # verify that we end up with the same buckets
+ # verify that we end up with the same objects
for bucket_name in buckets:
- for source_zone, target_zone in combinations(zonegroup.zones, 2):
- check_bucket_eq(source_zone, target_zone, bucket_name)
+ for source_conn, _ in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+
+ target_conn.check_bucket_eq(source_conn, bucket_name)
# verify that mdlogs are not empty and match for each period
for period in mdlog_periods:
@@ -777,6 +766,7 @@ def test_multi_period_incremental_sync():
def test_zonegroup_remove():
zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
if len(zonegroup.zones) < 2:
raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
diff --git a/src/test/rgw/rgw_multi/tests_es.py b/src/test/rgw/rgw_multi/tests_es.py
new file mode 100644
index 00000000000..bc9f67b73a0
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tests_es.py
@@ -0,0 +1,274 @@
+import json
+import logging
+
+import boto
+import boto.s3.connection
+
+import datetime
+import dateutil.parser
+import random
+import string
+
+from nose.tools import eq_ as eq
+
+from rgw_multi.multisite import *
+from rgw_multi.tests import *
+from rgw_multi.zone_es import *
+
+log = logging.getLogger(__name__)
+
+
+def check_es_configured():
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ es_zones = zonegroup.zones_by_type.get("elasticsearch")
+ if not es_zones:
+ raise SkipTest("Requires at least one ES zone")
+
+def is_es_zone(zone_conn):
+ if not zone_conn:
+ return False
+
+ return zone_conn.zone.tier_type() == "elasticsearch"
+
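+# recompute the expected matches locally by applying filter f to src_keys, then
+# compare them (in sorted order) against the server-side search results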
+def verify_search(bucket_name, src_keys, result_keys, f):
+ check_keys = []
+ for k in src_keys:
+ if bucket_name:
+ if bucket_name != k.bucket.name:
+ continue
+ if f(k):
+ check_keys.append(k)
+ check_keys.sort(key = lambda l: (l.bucket.name, l.name, l.version_id))
+
+ log.debug('check keys:' + dump_json(check_keys))
+ log.debug('result keys:' + dump_json(result_keys))
+
+ for k1, k2 in zip_longest(check_keys, result_keys):
+ assert k1
+ assert k2
+ check_object_eq(k1, k2)
+
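+# run a single metadata search request and verify its results against src_keys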
+def do_check_mdsearch(conn, bucket, src_keys, req_str, src_filter):
+ if bucket:
+ bucket_name = bucket.name
+ else:
+ bucket_name = ''
+ req = MDSearch(conn, bucket_name, req_str)
+ result_keys = req.search(sort_key = lambda k: (k.bucket.name, k.name, k.version_id))
+ verify_search(bucket_name, src_keys, result_keys, src_filter)
+
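+# common setup: create buckets and keys in every zone, wait for metadata and
+# bucket sync, then split the zone connections into ES targets and rados sources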
+def init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = None):
+ check_es_configured()
+
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns, buckets_per_zone = buckets_per_zone)
+
+ if bucket_init_cb:
+ for zone_conn, bucket in zone_bucket:
+ bucket_init_cb(zone_conn, bucket)
+
+ src_keys = []
+
+ owner = None
+
+ obj_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
+
+ # don't wait for meta sync just yet
+ for zone, bucket in zone_bucket:
+        for count in range(num_keys):
+ objname = obj_prefix + str(count)
+ k = new_key(zone, bucket.name, objname)
+ if not create_obj:
+ continue
+
+ create_obj(k, count)
+
+ if not owner:
+ for list_key in bucket.list_versions():
+ owner = list_key.owner
+ break
+
+ k = bucket.get_key(k.name, version_id = k.version_id)
+ k.owner = owner # owner is not set when doing get_key()
+
+ src_keys.append(k)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ sources = []
+ targets = []
+    for conn in zonegroup_conns.zones:
+        if not is_es_zone(conn):
+            sources.append(conn)
+            continue
+
+        targets.append(conn)
+
+ buckets = []
+ # make sure all targets are synced
+ for source_conn, bucket in zone_bucket:
+ buckets.append(bucket)
+ for target_conn in targets:
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+
+ return targets, sources, buckets, src_keys
+
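+# exercise the comparison operators (<, <=, ==, >=, >) on the built-in bucket,
+# name, etag and size fields against every ES target zone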
+def test_es_object_search():
+ min_size = 10
+ content = 'a' * min_size
+
+ def create_obj(k, i):
+ k.set_contents_from_string(content + 'x' * i)
+
+ targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 2)
+
+ for target_conn in targets:
+
+ # bucket checks
+ for bucket in buckets:
+ # check name
+ do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name)
+
+ # check on all buckets
+ for key in src_keys:
+ # limiting to checking specific key name, otherwise could get results from
+ # other runs / tests
+ do_check_mdsearch(target_conn.conn, None, src_keys , 'name == ' + key.name, lambda k: k.name == key.name)
+
+ # check on specific bucket
+ for bucket in buckets:
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name < ' + key.name, lambda k: k.name < key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name <= ' + key.name, lambda k: k.name <= key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + key.name, lambda k: k.name == key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name >= ' + key.name, lambda k: k.name >= key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name > ' + key.name, lambda k: k.name > key.name)
+
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + src_keys[0].name + ' or name >= ' + src_keys[2].name,
+ lambda k: k.name == src_keys[0].name or k.name >= src_keys[2].name)
+
+ # check etag
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag < ' + key.etag[1:-1], lambda k: k.etag < key.etag)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag == ' + key.etag[1:-1], lambda k: k.etag == key.etag)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag > ' + key.etag[1:-1], lambda k: k.etag > key.etag)
+
+ # check size
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size < ' + str(key.size), lambda k: k.size < key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size <= ' + str(key.size), lambda k: k.size <= key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size == ' + str(key.size), lambda k: k.size == key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size >= ' + str(key.size), lambda k: k.size >= key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size > ' + str(key.size), lambda k: k.size > key.size)
+
+def date_from_str(s):
+ return dateutil.parser.parse(s)
+
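+# exercise search on custom x-amz-meta-* fields typed as string, int and date,
+# including composite 'and'/'or' queries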
+def test_es_object_search_custom():
+ min_size = 10
+ content = 'a' * min_size
+
+ def bucket_init(zone_conn, bucket):
+ req = MDSearchConfig(zone_conn.conn, bucket.name)
+ req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date')
+
+ def create_obj(k, i):
+ date = datetime.datetime.now() + datetime.timedelta(seconds=1) * i
+ date_str = date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
+ k.set_contents_from_string(content + 'x' * i, headers = { 'X-Amz-Meta-Foo-Str': str(i * 5),
+ 'X-Amz-Meta-Foo-Int': str(i * 5),
+ 'X-Amz-Meta-Foo-Date': date_str})
+
+ targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init)
+
+ for target_conn in targets:
+
+ # bucket checks
+ for bucket in buckets:
+ str_vals = []
+ for key in src_keys:
+ # check string values
+ val = key.get_metadata('foo-str')
+ str_vals.append(val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str < ' + val, lambda k: k.get_metadata('foo-str') < val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + val, lambda k: k.get_metadata('foo-str') <= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str == ' + val, lambda k: k.get_metadata('foo-str') == val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + val, lambda k: k.get_metadata('foo-str') >= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str > ' + val, lambda k: k.get_metadata('foo-str') > val)
+
+ # check int values
+ sval = key.get_metadata('foo-int')
+ val = int(sval)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int < ' + sval, lambda k: int(k.get_metadata('foo-int')) < val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int <= ' + sval, lambda k: int(k.get_metadata('foo-int')) <= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int == ' + sval, lambda k: int(k.get_metadata('foo-int')) == val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int >= ' + sval, lambda k: int(k.get_metadata('foo-int')) >= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int > ' + sval, lambda k: int(k.get_metadata('foo-int')) > val)
+
+            # check date values
+ sval = key.get_metadata('foo-date')
+ val = date_from_str(sval)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date < ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) < val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date <= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) <= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date == ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) == val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date >= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) >= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date > ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) > val)
+
+ # 'or' query
+            for i in range(len(src_keys) // 2):
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + str_vals[i] + ' or x-amz-meta-foo-str >= ' + str_vals[-i],
+ lambda k: k.get_metadata('foo-str') <= str_vals[i] or k.get_metadata('foo-str') >= str_vals[-i] )
+
+ # 'and' query
+            for i in range(len(src_keys) // 2):
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + str_vals[i] + ' and x-amz-meta-foo-str <= ' + str_vals[i + 1],
+ lambda k: k.get_metadata('foo-str') >= str_vals[i] and k.get_metadata('foo-str') <= str_vals[i + 1] )
+ # more complicated query
+            for i in range(len(src_keys) // 2):
+ do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name + ' and x-amz-meta-foo-str >= ' + str_vals[i] +
+ ' and (x-amz-meta-foo-str <= ' + str_vals[i + 1] + ')',
+ lambda k: k.bucket.name == bucket.name and (k.get_metadata('foo-str') >= str_vals[i] and
+ k.get_metadata('foo-str') <= str_vals[i + 1]) )
+
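+# verify that a bucket's mdsearch configuration round-trips through GET after
+# POST and is gone after DELETE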
+def test_es_bucket_conf():
+ min_size = 0
+
+ def bucket_init(zone_conn, bucket):
+ req = MDSearchConfig(zone_conn.conn, bucket.name)
+ req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date')
+
+ targets, sources, buckets, _ = init_env(None, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init)
+
+ for source_conn in sources:
+ for bucket in buckets:
+ req = MDSearchConfig(source_conn.conn, bucket.name)
+ conf = req.get_config()
+
+ d = {}
+
+ for entry in conf:
+ d[entry['Key']] = entry['Type']
+
+ eq(len(d), 3)
+ eq(d['x-amz-meta-foo-str'], 'str')
+ eq(d['x-amz-meta-foo-int'], 'int')
+ eq(d['x-amz-meta-foo-date'], 'date')
+
+ req.del_config()
+
+ conf = req.get_config()
+
+ eq(len(conf), 0)
+
+ break # no need to iterate over all zones
diff --git a/src/test/rgw/rgw_multi/tools.py b/src/test/rgw/rgw_multi/tools.py
new file mode 100644
index 00000000000..da32516435e
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tools.py
@@ -0,0 +1,82 @@
+import json
+import boto
+
+def append_attr_value(d, attr, attrv):
+ if attrv and len(str(attrv)) > 0:
+ d[attr] = attrv
+
+def append_attr(d, k, attr):
+ try:
+ attrv = getattr(k, attr)
+    except AttributeError:
+ return
+ append_attr_value(d, attr, attrv)
+
+def get_attrs(k, attrs):
+ d = {}
+ for a in attrs:
+ append_attr(d, k, a)
+
+ return d
+
+def append_query_arg(s, n, v):
+ if not v:
+ return s
+ nv = '{n}={v}'.format(n=n, v=v)
+ if not s:
+ return nv
+ return '{s}&{nv}'.format(s=s, nv=nv)
+
+class KeyJSONEncoder(boto.s3.key.Key):
+ @staticmethod
+ def default(k, versioned=False):
+ attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control',
+ 'content_type', 'content_disposition', 'content_language',
+ 'owner', 'storage_class', 'md5', 'version_id', 'encrypted',
+ 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag']
+ d = get_attrs(k, attrs)
+ d['etag'] = k.etag[1:-1]
+ if versioned:
+ d['is_latest'] = k.is_latest
+ return d
+
+class DeleteMarkerJSONEncoder(boto.s3.key.Key):
+ @staticmethod
+ def default(k):
+ attrs = ['name', 'version_id', 'last_modified', 'owner']
+ d = get_attrs(k, attrs)
+ d['delete_marker'] = True
+ d['is_latest'] = k.is_latest
+ return d
+
+class UserJSONEncoder(boto.s3.user.User):
+ @staticmethod
+ def default(k):
+ attrs = ['id', 'display_name']
+ return get_attrs(k, attrs)
+
+class BucketJSONEncoder(boto.s3.bucket.Bucket):
+ @staticmethod
+ def default(k):
+ attrs = ['name', 'creation_date']
+ return get_attrs(k, attrs)
+
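+# JSON encoder that dispatches boto objects to the per-type encoders above,
+# used by dump_json() for debug output of listings and search results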
+class BotoJSONEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, boto.s3.key.Key):
+ return KeyJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.deletemarker.DeleteMarker):
+ return DeleteMarkerJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.user.User):
+ return UserJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.prefix.Prefix):
+ return (lambda x: {'prefix': x.name})(obj)
+ if isinstance(obj, boto.s3.bucket.Bucket):
+ return BucketJSONEncoder.default(obj)
+ return json.JSONEncoder.default(self, obj)
+
+
+def dump_json(o, cls=BotoJSONEncoder):
+ return json.dumps(o, cls=cls, indent=4)
diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py
new file mode 100644
index 00000000000..dccab4093d1
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_es.py
@@ -0,0 +1,256 @@
+import json
+import requests.compat
+import logging
+
+import boto
+import boto.s3.connection
+
+import dateutil.parser
+
+from nose.tools import eq_ as eq
+try:
+ from itertools import izip_longest as zip_longest
+except ImportError:
+ from itertools import zip_longest
+
+from .multisite import *
+from .tools import *
+
+log = logging.getLogger(__name__)
+
+def get_key_ver(k):
+ if not k.version_id:
+ return 'null'
+ return k.version_id
+
+def check_object_eq(k1, k2, check_extra = True):
+ assert k1
+ assert k2
+ log.debug('comparing key name=%s', k1.name)
+ eq(k1.name, k2.name)
+ eq(k1.metadata, k2.metadata)
+ # eq(k1.cache_control, k2.cache_control)
+ eq(k1.content_type, k2.content_type)
+ # eq(k1.content_encoding, k2.content_encoding)
+ # eq(k1.content_disposition, k2.content_disposition)
+ # eq(k1.content_language, k2.content_language)
+ eq(k1.etag, k2.etag)
+ mtime1 = dateutil.parser.parse(k1.last_modified)
+ mtime2 = dateutil.parser.parse(k2.last_modified)
+ assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
+ if check_extra:
+ eq(k1.owner.id, k2.owner.id)
+ eq(k1.owner.display_name, k2.owner.display_name)
+ # eq(k1.storage_class, k2.storage_class)
+ eq(k1.size, k2.size)
+ eq(get_key_ver(k1), get_key_ver(k2))
+ # eq(k1.encrypted, k2.encrypted)
+
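+# issue a raw authenticated request through boto and raise S3ResponseError on
+# any non-2xx status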
+def make_request(conn, method, bucket, key, query_args, headers):
+ result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
+    if result.status // 100 != 2:
+ raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
+ return result
+
+def append_query_arg(s, n, v):
+ if not v:
+ return s
+ nv = '{n}={v}'.format(n=n, v=v)
+ if not s:
+ return nv
+ return '{s}&{nv}'.format(s=s, nv=nv)
+
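+# client for RGW metadata search: sends a GET with the 'query' expression
+# (plus optional max-keys/marker paging args) and parses the JSON result
+# entries back into boto Key objects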
+class MDSearch:
+ def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
+ self.conn = conn
+ self.bucket_name = bucket_name or ''
+ if bucket_name:
+ self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+ else:
+ self.bucket = None
+ self.query = query
+ self.query_args = query_args
+ self.max_keys = None
+ self.marker = marker
+
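+    # one request/response round trip; search() drains successive pages by
+    # re-issuing raw_search() with the returned marker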
+ def raw_search(self):
+ q = self.query or ''
+ query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
+ if self.max_keys is not None:
+ query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
+ if self.marker:
+ query_args = append_query_arg(query_args, 'marker', self.marker)
+
+ query_args = append_query_arg(query_args, 'format', 'json')
+
+ headers = {}
+
+ result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
+
+ l = []
+
+ result_dict = json.loads(result.read())
+
+ for entry in result_dict['Objects']:
+ bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
+ k = boto.s3.key.Key(bucket, entry['Key'])
+
+ k.version_id = entry['Instance']
+ k.etag = entry['ETag']
+ k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
+ k.last_modified = entry['LastModified']
+ k.size = entry['Size']
+ k.content_type = entry['ContentType']
+ k.versioned_epoch = entry['VersionedEpoch']
+
+ k.metadata = {}
+ for e in entry['CustomMetadata']:
+ k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response
+
+ l.append(k)
+
+ return result_dict, l
+
+ def search(self, drain = True, sort = True, sort_key = None):
+ l = []
+
+ is_done = False
+
+ while not is_done:
+ result, result_keys = self.raw_search()
+
+ l = l + result_keys
+
+ is_done = not (drain and (result['IsTruncated'] == "true"))
+            self.marker = result['Marker']  # advance so the next raw_search() fetches the following page
+
+ if sort:
+ if not sort_key:
+ sort_key = lambda k: (k.name, -k.versioned_epoch)
+ l.sort(key = sort_key)
+
+ return l
+
+
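+# wrapper around the bucket 'mdsearch' subresource: POST sets the custom
+# metadata search configuration, GET reads it back, DELETE removes it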
+class MDSearchConfig:
+ def __init__(self, conn, bucket_name):
+ self.conn = conn
+ self.bucket_name = bucket_name or ''
+ if bucket_name:
+ self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+ else:
+ self.bucket = None
+
+ def send_request(self, conf, method):
+ query_args = 'mdsearch'
+ headers = None
+ if conf:
+ headers = { 'X-Amz-Meta-Search': conf }
+
+ query_args = append_query_arg(query_args, 'format', 'json')
+
+ return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
+
+ def get_config(self):
+ result = self.send_request(None, 'GET')
+ return json.loads(result.read())
+
+ def set_config(self, conf):
+ self.send_request(conf, 'POST')
+
+ def del_config(self):
+ self.send_request(None, 'DELETE')
+
+
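+# read-only stand-in for a bucket in an ES zone: object listing is implemented
+# as a metadata search for 'bucket == <name>' instead of a rados listing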
+class ESZoneBucket:
+ def __init__(self, zone_conn, name, conn):
+ self.zone_conn = zone_conn
+ self.name = name
+ self.conn = conn
+
+ self.bucket = boto.s3.bucket.Bucket(name=name)
+
+ def get_all_versions(self):
+        # MDSearch.search() drains all result pages itself, so no manual
+        # marker handling is needed here
+        req = MDSearch(self.conn, self.name, 'bucket == ' + self.name)
+
+ for k in req.search():
+ yield k
+
+
+class ESZone(Zone):
+ def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ self.es_endpoint = es_endpoint
+ super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+ def is_read_only(self):
+ return True
+
+ def tier_type(self):
+ return "elasticsearch"
+
+ def create(self, cluster, args = None, check_retcode = True):
+ """ create the object with the given arguments """
+
+ if args is None:
+            args = []  # must be a list, since CLI flags are appended below
+
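+        # endpoint points the sync module at the ES server; explicit_custom_meta=false
+        # lets custom metadata be indexed without explicit per-bucket mdsearch config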
+ tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ])
+
+ args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]
+
+ return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
+
+ def has_buckets(self):
+ return False
+
+ class Conn(ZoneConn):
+ def __init__(self, zone, credentials):
+ super(ESZone.Conn, self).__init__(zone, credentials)
+
+ def get_bucket(self, bucket_name):
+ return ESZoneBucket(self, bucket_name, self.conn)
+
+ def create_bucket(self, name):
+ # should not be here, a bug in the test suite
+ log.critical('Conn.create_bucket() should not be called in ES zone')
+ assert False
+
+ def check_bucket_eq(self, zone_conn, bucket_name):
+ assert(zone_conn.zone.tier_type() == "rados")
+
+            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+ b1 = self.get_bucket(bucket_name)
+ b2 = zone_conn.get_bucket(bucket_name)
+
+ log.debug('bucket1 objects:')
+ for o in b1.get_all_versions():
+ log.debug('o=%s', o.name)
+ log.debug('bucket2 objects:')
+ for o in b2.get_all_versions():
+ log.debug('o=%s', o.name)
+
+ for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+ if k1 is None:
+ log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+ assert False
+ if k2 is None:
+ log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+ assert False
+
+ check_object_eq(k1, k2)
+
+ log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+ return True
+
+ def get_conn(self, credentials):
+ return self.Conn(self, credentials)
diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py
new file mode 100644
index 00000000000..8fae22ea8a9
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_rados.py
@@ -0,0 +1,89 @@
+import logging
+
+try:
+ from itertools import izip_longest as zip_longest
+except ImportError:
+ from itertools import zip_longest
+
+from nose.tools import eq_ as eq
+
+from .multisite import *
+
+log = logging.getLogger(__name__)
+
+def check_object_eq(k1, k2, check_extra = True):
+ assert k1
+ assert k2
+ log.debug('comparing key name=%s', k1.name)
+ eq(k1.name, k2.name)
+ eq(k1.get_contents_as_string(), k2.get_contents_as_string())
+ eq(k1.metadata, k2.metadata)
+ eq(k1.cache_control, k2.cache_control)
+ eq(k1.content_type, k2.content_type)
+ eq(k1.content_encoding, k2.content_encoding)
+ eq(k1.content_disposition, k2.content_disposition)
+ eq(k1.content_language, k2.content_language)
+ eq(k1.etag, k2.etag)
+ eq(k1.last_modified, k2.last_modified)
+ if check_extra:
+ eq(k1.owner.id, k2.owner.id)
+ eq(k1.owner.display_name, k2.owner.display_name)
+ eq(k1.storage_class, k2.storage_class)
+ eq(k1.size, k2.size)
+ eq(k1.version_id, k2.version_id)
+ eq(k1.encrypted, k2.encrypted)
+
+
+class RadosZone(Zone):
+ def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ super(RadosZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+ def tier_type(self):
+ return "rados"
+
+
+ class Conn(ZoneConn):
+ def __init__(self, zone, credentials):
+ super(RadosZone.Conn, self).__init__(zone, credentials)
+
+ def get_bucket(self, name):
+ return self.conn.get_bucket(name)
+
+ def create_bucket(self, name):
+ return self.conn.create_bucket(name)
+
+ def check_bucket_eq(self, zone_conn, bucket_name):
+ log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+ b1 = self.get_bucket(bucket_name)
+ b2 = zone_conn.get_bucket(bucket_name)
+
+ log.debug('bucket1 objects:')
+ for o in b1.get_all_versions():
+ log.debug('o=%s', o.name)
+ log.debug('bucket2 objects:')
+ for o in b2.get_all_versions():
+ log.debug('o=%s', o.name)
+
+ for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+ if k1 is None:
+ log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+ assert False
+ if k2 is None:
+ log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+ assert False
+
+ check_object_eq(k1, k2)
+
+ # now get the keys through a HEAD operation, verify that the available data is the same
+ k1_head = b1.get_key(k1.name)
+ k2_head = b2.get_key(k2.name)
+
+ check_object_eq(k1_head, k2_head, False)
+
+ log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+ return True
+
+ def get_conn(self, credentials):
+ return self.Conn(self, credentials)
diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py
index e380acc9903..21154585ceb 100644
--- a/src/test/rgw/test_multi.py
+++ b/src/test/rgw/test_multi.py
@@ -13,8 +13,12 @@ except ImportError:
import nose.core
from rgw_multi import multisite
+from rgw_multi.zone_rados import RadosZone as RadosZone
+from rgw_multi.zone_es import ESZone as ESZone
+
# make tests from rgw_multi.tests available to nose
from rgw_multi.tests import *
+from rgw_multi.tests_es import *
mstart_path = os.getenv('MSTART_PATH')
if mstart_path is None:
@@ -146,6 +150,7 @@ def init(parse_args):
cfg = configparser.RawConfigParser({
'num_zonegroups': 1,
'num_zones': 3,
+ 'num_es_zones': 0,
'gateways_per_zone': 2,
'no_bootstrap': 'false',
'log_level': 20,
@@ -155,6 +160,7 @@ def init(parse_args):
'checkpoint_retries': 60,
'checkpoint_delay': 5,
'reconfigure_delay': 5,
+ 'es_endpoint': None,
})
try:
path = os.environ['RGW_MULTI_TEST_CONF']
@@ -175,6 +181,7 @@ def init(parse_args):
section = 'DEFAULT'
parser.add_argument('--num-zonegroups', type=int, default=cfg.getint(section, 'num_zonegroups'))
parser.add_argument('--num-zones', type=int, default=cfg.getint(section, 'num_zones'))
+ parser.add_argument('--num-es-zones', type=int, default=cfg.getint(section, 'num_es_zones'))
parser.add_argument('--gateways-per-zone', type=int, default=cfg.getint(section, 'gateways_per_zone'))
parser.add_argument('--no-bootstrap', action='store_true', default=cfg.getboolean(section, 'no_bootstrap'))
parser.add_argument('--log-level', type=int, default=cfg.getint(section, 'log_level'))
@@ -184,6 +191,7 @@ def init(parse_args):
parser.add_argument('--checkpoint-retries', type=int, default=cfg.getint(section, 'checkpoint_retries'))
parser.add_argument('--checkpoint-delay', type=int, default=cfg.getint(section, 'checkpoint_delay'))
parser.add_argument('--reconfigure-delay', type=int, default=cfg.getint(section, 'reconfigure_delay'))
+ parser.add_argument('--es-endpoint', type=str, default=cfg.get(section, 'es_endpoint'))
argv = []
@@ -193,6 +201,9 @@ def init(parse_args):
args = parser.parse_args(argv)
bootstrap = not args.no_bootstrap
+ # if num_es_zones is defined, need to have es_endpoint defined too
+ assert(args.num_es_zones == 0 or args.es_endpoint)
+
setup_logging(args.log_level, args.log_file, args.file_log_level)
# start first cluster
@@ -217,6 +228,8 @@ def init(parse_args):
period = multisite.Period(realm=realm)
realm.current_period = period
+ num_zones = args.num_zones + args.num_es_zones
+
for zg in range(0, args.num_zonegroups):
zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
period.zonegroups.append(zonegroup)
@@ -225,7 +238,7 @@ def init(parse_args):
if is_master_zg:
period.master_zonegroup = zonegroup
- for z in range(0, args.num_zones):
+ for z in range(0, num_zones):
is_master = z == 0
# start a cluster, or use c1 for first zone
cluster = None
@@ -253,8 +266,15 @@ def init(parse_args):
else:
zonegroup.get(cluster)
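+        # rados zones are allocated first; the trailing num_es_zones are ES zones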
+ es_zone = (z >= args.num_zones)
+
# create the zone in its zonegroup
zone = multisite.Zone(zone_name(zg, z), zonegroup, cluster)
+ if es_zone:
+ zone = ESZone(zone_name(zg, z), args.es_endpoint, zonegroup, cluster)
+ else:
+ zone = RadosZone(zone_name(zg, z), zonegroup, cluster)
+
if bootstrap:
arg = admin_creds.credential_args()
if is_master:
@@ -268,6 +288,13 @@ def init(parse_args):
if is_master:
zonegroup.master_zone = zone
+ zonegroup.zones_by_type.setdefault(zone.tier_type(), []).append(zone)
+
+ if zone.is_read_only():
+ zonegroup.ro_zones.append(zone)
+ else:
+ zonegroup.rw_zones.append(zone)
+
# update/commit the period
if bootstrap:
period.update(zone, commit=True)