diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2017-10-12 02:03:44 +0200 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2018-04-10 17:05:38 +0200 |
commit | 4d49ec953e8cfb62044e6c7c1c9cd86bcee80556 (patch) | |
tree | 51574a21ce5fc63b801b225dcc8eda7eb90a09c8 | |
parent | rgw: aws sync, in_crf init abstraction (diff) | |
download | ceph-4d49ec953e8cfb62044e6c7c1c9cd86bcee80556.tar.xz ceph-4d49ec953e8cfb62044e6c7c1c9cd86bcee80556.zip |
rgw: aws sync, add hooks for decoding/encoding rest obj
Object sync is now functional. Create abstraction that will later help
with different cloud providers.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r-- | src/rgw/rgw_cr_rest.cc | 59 | ||||
-rw-r--r-- | src/rgw/rgw_cr_rest.h | 55 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.cc | 40 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.h | 7 | ||||
-rw-r--r-- | src/rgw/rgw_rest_conn.cc | 18 | ||||
-rw-r--r-- | src/rgw/rgw_sync_module_aws.cc | 35 |
6 files changed, 161 insertions, 53 deletions
diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 65ad60b9aab..dfe22fa451a 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -10,6 +10,21 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw +class TestSpliceCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + RGWHTTPStreamRWRequest *in_req{nullptr}; + RGWHTTPStreamRWRequest *out_req{nullptr}; + std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; + std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; +public: + TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWHTTPStreamRWRequest *_in_req, + RGWHTTPStreamRWRequest *_out_req); + + int operate(); +}; + class RGWCRHTTPGetDataCB : public RGWGetDataCB { Mutex lock; RGWCoroutinesEnv *env; @@ -93,7 +108,7 @@ int RGWStreamReadHTTPResourceCRF::init() return 0; } -int RGWStreamWriteHTTPResourceCRF::init() +int RGWStreamWriteHTTPResourceCRF::send() { env->stack->init_new_io(req); @@ -116,6 +131,17 @@ void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs) *attrs = req->get_out_headers(); } +int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) { + /* basic generic implementation */ + for (auto header : headers) { + const string& val = header.second; + + rest_obj.attrs[header.first] = val; + } + + return 0; +} + int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) { reenter(&read_state) { @@ -130,10 +156,22 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool continue; } extra_data.claim_append(in_cb->get_extra_data()); + map<string, string> attrs = req->get_out_headers(); + int ret = decode_rest_obj(attrs, extra_data, &rest_obj); + if (ret < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl; + return ret; + } got_extra_data = true; } *io_pending = false; in_cb->claim_data(out, max_size); + if (out->length() == 0) { + /* this may happen if we just read the prepended extra_data and didn't have any data + * after. In that case, retry reading, so that caller doesn't assume it's EOF. + */ + continue; + } if (!req->is_done()) { yield; } @@ -142,14 +180,11 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool return 0; } -void RGWStreamWriteHTTPResourceCRF::send_ready(const map<string, string>& attrs) +void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj) { - for (auto h : attrs) { - if (h.first == "CONTENT_LENGTH") { - req->set_send_length(atoi(h.second.c_str())); - } else { - req->append_header(h.first, h.second); - } + req->set_send_length(rest_obj.content_len); + for (auto h : rest_obj.attrs) { + req->append_header(h.first, h.second); } } @@ -225,13 +260,15 @@ int RGWStreamSpliceCR::operate() { } if (!sent_attrs) { - map<string, string> attrs; - in_crf->get_attrs(&attrs); - out_crf->send_ready(attrs); int ret = out_crf->init(); if (ret < 0) { return set_cr_error(ret); } + out_crf->send_ready(in_crf->get_rest_obj()); + ret = out_crf->send(); + if (ret < 0) { + return set_cr_error(ret); + } sent_attrs = true; } diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 1338682549e..512082b5e09 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -7,7 +7,16 @@ #include "rgw_coroutine.h" #include "rgw_rest_conn.h" -class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine{ + +struct rgw_rest_obj { + rgw_obj_key key; + uint64_t content_len; + std::map<string, string> attrs; + std::map<string, string> custom_attrs; + RGWAccessControlPolicy acls; +}; + +class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine { bufferlist *result; protected: RGWRESTConn *conn; @@ -315,6 +324,7 @@ protected: public: virtual int init() = 0; virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ + virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) = 0; virtual bool has_attrs() = 0; virtual void get_attrs(std::map<string, string> *attrs) = 0; }; @@ -326,12 +336,14 @@ protected: public: virtual int init() = 0; - virtual void send_ready(const std::map<string, string>& attrs) = 0; + virtual void send_ready(const rgw_rest_obj& rest_obj) = 0; + virtual int send() = 0; virtual int write(bufferlist& data) = 0; /* reentrant */ virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ }; class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { + CephContext *cct; RGWCoroutinesEnv *env; RGWCoroutine *caller; RGWHTTPManager *http_manager; @@ -345,24 +357,33 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { bool got_attrs{false}; bool got_extra_data{false}; +protected: + rgw_rest_obj rest_obj; + public: RGWStreamReadHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, - RGWHTTPManager *_http_manager) : env(_env), - caller(_caller), - http_manager(_http_manager) {} + RGWHTTPManager *_http_manager) : cct(_cct), + env(_env), + caller(_caller), + http_manager(_http_manager) {} virtual ~RGWStreamReadHTTPResourceCRF(); int init() override; int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ + int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info); bool has_attrs() override; - void get_attrs(std::map<string, string> *pattrs) override; + void get_attrs(std::map<string, string> *attrs); virtual bool need_extra_data() { return false; } void set_req(RGWHTTPStreamRWRequest *r) { req = r; } + + rgw_rest_obj& get_rest_obj() { + return rest_obj; + } }; class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { @@ -382,8 +403,11 @@ public: http_manager(_http_manager) {} virtual ~RGWStreamWriteHTTPResourceCRF() {} - int init() override; - void send_ready(const std::map<string, string>& attrs) override; + int init() override { + return 0; + } + void send_ready(const rgw_rest_obj& rest_obj) override; + int send() override; int write(bufferlist& data) override; /* reentrant */ int drain_writes(bool *need_retry) override; /* reentrant */ @@ -412,19 +436,4 @@ public: int operate(); }; -class TestSpliceCR : public RGWCoroutine { - CephContext *cct; - RGWHTTPManager *http_manager; - RGWHTTPStreamRWRequest *in_req{nullptr}; - RGWHTTPStreamRWRequest *out_req{nullptr}; - std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; - std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; -public: - TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, - RGWHTTPStreamRWRequest *_in_req, - RGWHTTPStreamRWRequest *_out_req); - - int operate(); -}; - #endif diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index eefcb438061..749fe437195 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -541,18 +541,32 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time * return 0; } -int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr) +static void send_prepare_convert(const rgw_obj& obj, string *resource) { string urlsafe_bucket, urlsafe_object; url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket); url_encode(obj.key.name, urlsafe_object); - string resource = urlsafe_bucket + "/" + urlsafe_object; + *resource = urlsafe_bucket + "/" + urlsafe_object; +} + +int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr) +{ + string resource; + send_prepare_convert(obj, &resource); return send_request(&key, extra_headers, resource, mgr); } -int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, - RGWHTTPManager *mgr, bufferlist *send_data) +int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj) +{ + string resource; + send_prepare_convert(obj, &resource); + + return send_prepare(&key, extra_headers, resource); +} + +int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, + bufferlist *send_data) { string new_url = url; if (new_url[new_url.size() - 1] != '/') @@ -609,7 +623,6 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& headers.emplace_back(kv); } - bool send_data_hint = false; if (send_data) { set_outbl(*send_data); send_data_hint = true; @@ -618,6 +631,23 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& method = new_info.method; url = new_url; + return 0; +} + +int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, + RGWHTTPManager *mgr, bufferlist *send_data) +{ + int ret = send_prepare(key, extra_headers, resource, send_data); + if (ret < 0) { + return ret; + } + + return send(mgr); +} + + +int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr) +{ if (!mgr) { return RGWHTTP::send(this); } diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 585af1d0c99..e990715c84e 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -110,13 +110,20 @@ public: }; class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest { + bool send_data_hint{false}; public: RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) { } virtual ~RGWRESTStreamRWRequest() override {} + + int send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */); + int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj); + int send(RGWHTTPManager *mgr); + int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr); int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */); + int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs); }; diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 7ac889ed670..70ee7f0bccf 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -199,7 +199,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw param_vec_t params; populate_params(params, &uid, self_zone_group); if (prepend_metadata) { - params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", self_zone_group)); + params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true")); } if (rgwx_stat) { params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true")); @@ -244,18 +244,24 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER"); } + int r = (*req)->send_prepare(key, extra_headers, obj); + if (r < 0) { + goto done_err; + } + if (!send) { return 0; } - int r = (*req)->send_request(key, extra_headers, obj, nullptr); + r = (*req)->send(nullptr); if (r < 0) { - delete *req; - *req = nullptr; - return r; + goto done_err; } - return 0; +done_err: + delete *req; + *req = nullptr; + return r; } int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req, string& etag, real_time *mtime, diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index d9723a1a144..07631aa1f6f 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -52,12 +52,12 @@ public: sync_env(_sync_env), conn(_conn), src_obj(_src_obj) { } - int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) { + int init() override { /* init input connection */ RGWRESTStreamRWRequest *in_req; int ret = conn->get_obj(rgw_user(), nullptr, src_obj, nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */, - true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */, + true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */, false /* sync_manifest */, true /* skip_descrypt */, false /* send */, nullptr /* cb */, &in_req); if (ret < 0) { @@ -67,7 +67,23 @@ public: set_req(in_req); + return RGWStreamReadHTTPResourceCRF::init(); + } + + int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override { + for (auto header : headers) { + const string& val = header.second; + if (header.first == "RGWX_OBJECT_SIZE") { + rest_obj.content_len = atoi(val.c_str()); + } else { + rest_obj.attrs[header.first] = val; + } + } + + ldout(sync_env->cct, 20) << __func << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; + return 0; + } bool need_extra_data() override { @@ -98,22 +114,25 @@ public: set_req(out_req); - return 0; + return RGWStreamWriteHTTPResourceCRF::init(); } - void send_ready(const std::map<string, string>& attrs) override { + void send_ready(const rgw_rest_obj& rest_obj) override { RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req; + /* here we need to convert rest_obj.attrs to cloud specific representation */ + map<string, bufferlist> new_attrs; - for (auto attr : attrs) { - const string& val = attr.second; - new_attrs[attr.first].append(bufferptr(val.c_str(), val.size() - 1)); + for (auto attr : rest_obj.attrs) { + new_attrs[attr.first].append(attr.second); } RGWAccessControlPolicy policy; ::encode(policy, new_attrs[RGW_ATTR_ACL]); + r->set_send_length(rest_obj.content_len); + r->send_ready(conn->get_key(), new_attrs, false); } }; @@ -127,7 +146,7 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { string target_bucket_name; std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; - + rgw_rest_obj rest_obj; string obj_path; int ret{0}; |