summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2017-10-12 02:03:44 +0200
committerYehuda Sadeh <yehuda@redhat.com>2018-04-10 17:05:38 +0200
commit4d49ec953e8cfb62044e6c7c1c9cd86bcee80556 (patch)
tree51574a21ce5fc63b801b225dcc8eda7eb90a09c8
parentrgw: aws sync, in_crf init abstraction (diff)
downloadceph-4d49ec953e8cfb62044e6c7c1c9cd86bcee80556.tar.xz
ceph-4d49ec953e8cfb62044e6c7c1c9cd86bcee80556.zip
rgw: aws sync, add hooks for decoding/encoding rest obj
Object sync is now functional. Create abstraction that will later help with different cloud providers. Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r--src/rgw/rgw_cr_rest.cc59
-rw-r--r--src/rgw/rgw_cr_rest.h55
-rw-r--r--src/rgw/rgw_rest_client.cc40
-rw-r--r--src/rgw/rgw_rest_client.h7
-rw-r--r--src/rgw/rgw_rest_conn.cc18
-rw-r--r--src/rgw/rgw_sync_module_aws.cc35
6 files changed, 161 insertions, 53 deletions
diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc
index 65ad60b9aab..dfe22fa451a 100644
--- a/src/rgw/rgw_cr_rest.cc
+++ b/src/rgw/rgw_cr_rest.cc
@@ -10,6 +10,21 @@
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+class TestSpliceCR : public RGWCoroutine {
+ CephContext *cct;
+ RGWHTTPManager *http_manager;
+ RGWHTTPStreamRWRequest *in_req{nullptr};
+ RGWHTTPStreamRWRequest *out_req{nullptr};
+ std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+ std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+public:
+ TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWHTTPStreamRWRequest *_in_req,
+ RGWHTTPStreamRWRequest *_out_req);
+
+ int operate();
+};
+
class RGWCRHTTPGetDataCB : public RGWGetDataCB {
Mutex lock;
RGWCoroutinesEnv *env;
@@ -93,7 +108,7 @@ int RGWStreamReadHTTPResourceCRF::init()
return 0;
}
-int RGWStreamWriteHTTPResourceCRF::init()
+int RGWStreamWriteHTTPResourceCRF::send()
{
env->stack->init_new_io(req);
@@ -116,6 +131,17 @@ void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs)
*attrs = req->get_out_headers();
}
+int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) {
+ /* basic generic implementation */
+ for (auto header : headers) {
+ const string& val = header.second;
+
+ rest_obj.attrs[header.first] = val;
+ }
+
+ return 0;
+}
+
int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
@@ -130,10 +156,22 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
continue;
}
extra_data.claim_append(in_cb->get_extra_data());
+ map<string, string> attrs = req->get_out_headers();
+ int ret = decode_rest_obj(attrs, extra_data, &rest_obj);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
got_extra_data = true;
}
*io_pending = false;
in_cb->claim_data(out, max_size);
+ if (out->length() == 0) {
+ /* this may happen if we just read the prepended extra_data and didn't have any data
+ * after. In that case, retry reading, so that caller doesn't assume it's EOF.
+ */
+ continue;
+ }
if (!req->is_done()) {
yield;
}
@@ -142,14 +180,11 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
return 0;
}
-void RGWStreamWriteHTTPResourceCRF::send_ready(const map<string, string>& attrs)
+void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj)
{
- for (auto h : attrs) {
- if (h.first == "CONTENT_LENGTH") {
- req->set_send_length(atoi(h.second.c_str()));
- } else {
- req->append_header(h.first, h.second);
- }
+ req->set_send_length(rest_obj.content_len);
+ for (auto h : rest_obj.attrs) {
+ req->append_header(h.first, h.second);
}
}
@@ -225,13 +260,15 @@ int RGWStreamSpliceCR::operate() {
}
if (!sent_attrs) {
- map<string, string> attrs;
- in_crf->get_attrs(&attrs);
- out_crf->send_ready(attrs);
int ret = out_crf->init();
if (ret < 0) {
return set_cr_error(ret);
}
+ out_crf->send_ready(in_crf->get_rest_obj());
+ ret = out_crf->send();
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
sent_attrs = true;
}
diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h
index 1338682549e..512082b5e09 100644
--- a/src/rgw/rgw_cr_rest.h
+++ b/src/rgw/rgw_cr_rest.h
@@ -7,7 +7,16 @@
#include "rgw_coroutine.h"
#include "rgw_rest_conn.h"
-class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine{
+
+struct rgw_rest_obj {
+ rgw_obj_key key;
+ uint64_t content_len;
+ std::map<string, string> attrs;
+ std::map<string, string> custom_attrs;
+ RGWAccessControlPolicy acls;
+};
+
+class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
bufferlist *result;
protected:
RGWRESTConn *conn;
@@ -315,6 +324,7 @@ protected:
public:
virtual int init() = 0;
virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+ virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) = 0;
virtual bool has_attrs() = 0;
virtual void get_attrs(std::map<string, string> *attrs) = 0;
};
@@ -326,12 +336,14 @@ protected:
public:
virtual int init() = 0;
- virtual void send_ready(const std::map<string, string>& attrs) = 0;
+ virtual void send_ready(const rgw_rest_obj& rest_obj) = 0;
+ virtual int send() = 0;
virtual int write(bufferlist& data) = 0; /* reentrant */
virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
};
class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
+ CephContext *cct;
RGWCoroutinesEnv *env;
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
@@ -345,24 +357,33 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
bool got_attrs{false};
bool got_extra_data{false};
+protected:
+ rgw_rest_obj rest_obj;
+
public:
RGWStreamReadHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager) : env(_env),
- caller(_caller),
- http_manager(_http_manager) {}
+ RGWHTTPManager *_http_manager) : cct(_cct),
+ env(_env),
+ caller(_caller),
+ http_manager(_http_manager) {}
virtual ~RGWStreamReadHTTPResourceCRF();
int init() override;
int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info);
bool has_attrs() override;
- void get_attrs(std::map<string, string> *pattrs) override;
+ void get_attrs(std::map<string, string> *attrs);
virtual bool need_extra_data() { return false; }
void set_req(RGWHTTPStreamRWRequest *r) {
req = r;
}
+
+ rgw_rest_obj& get_rest_obj() {
+ return rest_obj;
+ }
};
class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
@@ -382,8 +403,11 @@ public:
http_manager(_http_manager) {}
virtual ~RGWStreamWriteHTTPResourceCRF() {}
- int init() override;
- void send_ready(const std::map<string, string>& attrs) override;
+ int init() override {
+ return 0;
+ }
+ void send_ready(const rgw_rest_obj& rest_obj) override;
+ int send() override;
int write(bufferlist& data) override; /* reentrant */
int drain_writes(bool *need_retry) override; /* reentrant */
@@ -412,19 +436,4 @@ public:
int operate();
};
-class TestSpliceCR : public RGWCoroutine {
- CephContext *cct;
- RGWHTTPManager *http_manager;
- RGWHTTPStreamRWRequest *in_req{nullptr};
- RGWHTTPStreamRWRequest *out_req{nullptr};
- std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
- std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-public:
- TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
- RGWHTTPStreamRWRequest *_in_req,
- RGWHTTPStreamRWRequest *_out_req);
-
- int operate();
-};
-
#endif
diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc
index eefcb438061..749fe437195 100644
--- a/src/rgw/rgw_rest_client.cc
+++ b/src/rgw/rgw_rest_client.cc
@@ -541,18 +541,32 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *
return 0;
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+static void send_prepare_convert(const rgw_obj& obj, string *resource)
{
string urlsafe_bucket, urlsafe_object;
url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
url_encode(obj.key.name, urlsafe_object);
- string resource = urlsafe_bucket + "/" + urlsafe_object;
+ *resource = urlsafe_bucket + "/" + urlsafe_object;
+}
+
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+{
+ string resource;
+ send_prepare_convert(obj, &resource);
return send_request(&key, extra_headers, resource, mgr);
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
- RGWHTTPManager *mgr, bufferlist *send_data)
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
+{
+ string resource;
+ send_prepare_convert(obj, &resource);
+
+ return send_prepare(&key, extra_headers, resource);
+}
+
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ bufferlist *send_data)
{
string new_url = url;
if (new_url[new_url.size() - 1] != '/')
@@ -609,7 +623,6 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
headers.emplace_back(kv);
}
- bool send_data_hint = false;
if (send_data) {
set_outbl(*send_data);
send_data_hint = true;
@@ -618,6 +631,23 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
method = new_info.method;
url = new_url;
+ return 0;
+}
+
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ RGWHTTPManager *mgr, bufferlist *send_data)
+{
+ int ret = send_prepare(key, extra_headers, resource, send_data);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return send(mgr);
+}
+
+
+int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
+{
if (!mgr) {
return RGWHTTP::send(this);
}
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h
index 585af1d0c99..e990715c84e 100644
--- a/src/rgw/rgw_rest_client.h
+++ b/src/rgw/rgw_rest_client.h
@@ -110,13 +110,20 @@ public:
};
class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
+ bool send_data_hint{false};
public:
RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
}
virtual ~RGWRESTStreamRWRequest() override {}
+
+ int send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */);
+ int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
+ int send(RGWHTTPManager *mgr);
+
int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
+
int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
};
diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc
index 7ac889ed670..70ee7f0bccf 100644
--- a/src/rgw/rgw_rest_conn.cc
+++ b/src/rgw/rgw_rest_conn.cc
@@ -199,7 +199,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
param_vec_t params;
populate_params(params, &uid, self_zone_group);
if (prepend_metadata) {
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", self_zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true"));
}
if (rgwx_stat) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true"));
@@ -244,18 +244,24 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
}
+ int r = (*req)->send_prepare(key, extra_headers, obj);
+ if (r < 0) {
+ goto done_err;
+ }
+
if (!send) {
return 0;
}
- int r = (*req)->send_request(key, extra_headers, obj, nullptr);
+ r = (*req)->send(nullptr);
if (r < 0) {
- delete *req;
- *req = nullptr;
- return r;
+ goto done_err;
}
-
return 0;
+done_err:
+ delete *req;
+ *req = nullptr;
+ return r;
}
int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req, string& etag, real_time *mtime,
diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc
index d9723a1a144..07631aa1f6f 100644
--- a/src/rgw/rgw_sync_module_aws.cc
+++ b/src/rgw/rgw_sync_module_aws.cc
@@ -52,12 +52,12 @@ public:
sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
}
- int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) {
+ int init() override {
/* init input connection */
RGWRESTStreamRWRequest *in_req;
int ret = conn->get_obj(rgw_user(), nullptr, src_obj,
nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
- true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
+ true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */,
false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
nullptr /* cb */, &in_req);
if (ret < 0) {
@@ -67,7 +67,23 @@ public:
set_req(in_req);
+ return RGWStreamReadHTTPResourceCRF::init();
+ }
+
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override {
+ for (auto header : headers) {
+ const string& val = header.second;
+ if (header.first == "RGWX_OBJECT_SIZE") {
+ rest_obj.content_len = atoi(val.c_str());
+ } else {
+ rest_obj.attrs[header.first] = val;
+ }
+ }
+
+ ldout(sync_env->cct, 20) << __func << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+
return 0;
+
}
bool need_extra_data() override {
@@ -98,22 +114,25 @@ public:
set_req(out_req);
- return 0;
+ return RGWStreamWriteHTTPResourceCRF::init();
}
- void send_ready(const std::map<string, string>& attrs) override {
+ void send_ready(const rgw_rest_obj& rest_obj) override {
RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
+ /* here we need to convert rest_obj.attrs to cloud specific representation */
+
map<string, bufferlist> new_attrs;
- for (auto attr : attrs) {
- const string& val = attr.second;
- new_attrs[attr.first].append(bufferptr(val.c_str(), val.size() - 1));
+ for (auto attr : rest_obj.attrs) {
+ new_attrs[attr.first].append(attr.second);
}
RGWAccessControlPolicy policy;
::encode(policy, new_attrs[RGW_ATTR_ACL]);
+ r->set_send_length(rest_obj.content_len);
+
r->send_ready(conn->get_key(), new_attrs, false);
}
};
@@ -127,7 +146,7 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
string target_bucket_name;
std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-
+ rgw_rest_obj rest_obj;
string obj_path;
int ret{0};