diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2017-09-04 14:15:11 +0200 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2018-04-10 17:05:38 +0200 |
commit | bd0ca81cdec8c648a3ebfac949ccbbd623ff3589 (patch) | |
tree | 21efa1aa7e352e1268f73941e1d20d2739e5babe | |
parent | rgw: amend http client manager interface (diff) | |
download | ceph-bd0ca81cdec8c648a3ebfac949ccbbd623ff3589.tar.xz ceph-bd0ca81cdec8c648a3ebfac949ccbbd623ff3589.zip |
rgw: initial work for integrating streaming read/write with cr
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r-- | src/rgw/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/rgw/rgw_coroutine.cc | 11 | ||||
-rw-r--r-- | src/rgw/rgw_coroutine.h | 6 | ||||
-rw-r--r-- | src/rgw/rgw_cr_rest.cc | 235 | ||||
-rw-r--r-- | src/rgw/rgw_cr_rest.h | 67 | ||||
-rw-r--r-- | src/rgw/rgw_http_client.cc | 12 | ||||
-rw-r--r-- | src/rgw/rgw_http_client.h | 2 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.h | 6 |
8 files changed, 337 insertions, 3 deletions
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 772b72c8d57..5ac5ea8bff8 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -89,6 +89,7 @@ set(rgw_a_srcs rgw_reshard.cc rgw_coroutine.cc rgw_cr_rados.cc + rgw_cr_rest.cc rgw_object_expirer_core.cc rgw_op.cc rgw_os_lib.cc diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 0a8b4c270a1..88e4c607131 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -452,6 +452,12 @@ void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *s context_stacks.insert(stack); } +void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag) +{ + RWLock::WLocker wl(lock); + cr->set_sleeping(flag); +} + int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks) { int ret = 0; @@ -811,6 +817,11 @@ void RGWCoroutine::wakeup() stack->wakeup(); } +RGWCoroutinesEnv *RGWCoroutine::get_env() const +{ + return stack->get_env(); +} + void RGWCoroutine::dump(Formatter *f) const { if (!description.str().empty()) { encode_json("description", description.str(), f); diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 9ccba4b15c3..d3006b59a14 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -282,6 +282,8 @@ public: return stack; } + RGWCoroutinesEnv *get_env() const; + void dump(Formatter *f) const; }; @@ -463,7 +465,7 @@ public: bool unblock_stack(RGWCoroutinesStack **s); - RGWCoroutinesEnv *get_env() { return env; } + RGWCoroutinesEnv *get_env() const { return env; } void dump(Formatter *f) const; }; @@ -561,6 +563,8 @@ public: void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); RGWCoroutinesStack *allocate_stack(); + void set_sleeping(RGWCoroutine *cr, bool flag); + virtual string get_id(); void dump(Formatter *f) const; }; diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc new file mode 100644 index 00000000000..fa5d99ed8d2 --- /dev/null +++ b/src/rgw/rgw_cr_rest.cc @@ -0,0 +1,235 @@ +#include "rgw_cr_rest.h" + +#include "rgw_coroutine.h" + +// re-include our assert to clobber the system one; fix dout: +#include "include/assert.h" + +#include <boost/asio/yield.hpp> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +class RGWCRHTTPGetDataCB : public RGWGetDataCB { + Mutex lock; + RGWCoroutinesEnv *env; + RGWCoroutine *cr; + bufferlist data; +public: + RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr) {} + + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + { + Mutex::Locker l(lock); + if (bl_len == bl.length()) { + data.claim_append(bl); + } else { + bl.splice(0, bl_len, &data); + } + } + + env->manager->io_complete(cr); + return 0; + } + + void claim_data(bufferlist *dest, uint64_t max) { + Mutex::Locker l(lock); + + if (data.length() == 0) { + return; + } + + if (data.length() < max) { + max = data.length(); + } + + data.splice(0, max, dest); + } + + bool has_data() { + return (data.length() > 0); + } +}; + + +RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF() +{ + delete in_cb; +} + +int RGWStreamRWHTTPResourceCRF::init() +{ + in_cb = new RGWCRHTTPGetDataCB(env, caller); + + req->set_user_info(env->stack); + req->set_in_cb(in_cb); + + int r = http_manager->add_request(req); + if (r < 0) { + return r; + } + + return 0; +} + +int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) +{ + reenter(&read_state) { + while (!req->is_done()) { + *io_pending = true; + if (!in_cb->has_data()) { + yield caller->io_block(); + } + *io_pending = false; + in_cb->claim_data(out, max_size); + if (!req->is_done()) { + yield; + } + } + } + return 0; +} + +int RGWStreamRWHTTPResourceCRF::write(bufferlist& data) +{ +#warning write need to throttle and block + reenter(&write_state) { + while (!req->is_done()) { + yield req->add_send_data(data); + } + } + return 0; +} + +TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + req(_req) {} +TestCR::~TestCR() { + delete crf; +} + +int TestCR::operate() { + reenter(this) { + crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req); + + { + int ret = crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } + } + + do { + + bl.clear(); + + do { + yield { + ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } + } + } while (need_retry); + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + dout(0) << "read " << bl.length() << " bytes" << dendl; + + if (bl.length() == 0) { + break; + } + + yield { + ret = crf->write(bl); + if (ret < 0) { + return set_cr_error(ret); + } + } + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + dout(0) << "wrote " << bl.length() << " bytes" << dendl; + } while (true); + + return set_cr_done(); + } + return 0; +} + +TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWHTTPStreamRWRequest *_in_req, + RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + in_req(_in_req), out_req(_out_req) {} +TestSpliceCR::~TestSpliceCR() { + delete in_crf; + delete out_crf; +} + +int TestSpliceCR::operate() { + reenter(this) { + in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req); + out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req); + + { + int ret = in_crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } + } + + { + int ret = out_crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } + } + + do { + + bl.clear(); + + do { + yield { + ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } + } + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + } while (need_retry); + + dout(0) << "read " << bl.length() << " bytes" << dendl; + + if (bl.length() == 0) { + break; + } + + yield { + ret = out_crf->write(bl); + if (ret < 0) { + return set_cr_error(ret); + } + } + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + dout(0) << "wrote " << bl.length() << " bytes" << dendl; + } while (true); + + return set_cr_done(); + } + return 0; +} diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index b47d7da1553..8d35b327a18 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -306,4 +306,71 @@ public: } }; +class RGWCRHTTPGetDataCB; + +class RGWStreamRWHTTPResourceCRF { + RGWCoroutinesEnv *env; + RGWCoroutine *caller; + RGWHTTPManager *http_manager; + + RGWHTTPStreamRWRequest *req; + + RGWCRHTTPGetDataCB *in_cb{nullptr}; + + boost::asio::coroutine read_state; + boost::asio::coroutine write_state; + + +public: + RGWStreamRWHTTPResourceCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWHTTPManager *_http_manager, + RGWHTTPStreamRWRequest *_req) : env(_env), + caller(_caller), + http_manager(_http_manager), + req(_req) {} + ~RGWStreamRWHTTPResourceCRF(); + + int init(); + int read(bufferlist *data, uint64_t max, bool *need_retry); /* reentrant */ + int write(bufferlist& data); /* reentrant */ +}; + +class TestCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + string url; + RGWHTTPStreamRWRequest *req{nullptr}; + RGWStreamRWHTTPResourceCRF *crf{nullptr}; + bufferlist bl; + bool need_retry{false}; + int ret{0}; +public: + TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req); + ~TestCR(); + + int operate(); +}; + +class TestSpliceCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + string url; + RGWHTTPStreamRWRequest *in_req{nullptr}; + RGWHTTPStreamRWRequest *out_req{nullptr}; + RGWStreamRWHTTPResourceCRF *in_crf{nullptr}; + RGWStreamRWHTTPResourceCRF *out_crf{nullptr}; + bufferlist bl; + bool need_retry{false}; + int ret{0}; +public: + TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWHTTPStreamRWRequest *_in_req, + RGWHTTPStreamRWRequest *_out_req); + ~TestSpliceCR(); + + int operate(); +}; + #endif diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index ccea463d2ec..9867bcdf312 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -90,7 +90,12 @@ struct rgw_http_req_data : public RefCountedObject { cond.Signal(); } + bool _is_done() { + return done; + } + bool is_done() { + Mutex::Locker l(lock); return done; } @@ -526,6 +531,11 @@ int RGWHTTPClient::init_request(rgw_http_req_data *_req_data, bool send_data_hin return 0; } +bool RGWHTTPClient::is_done() +{ + return req_data->is_done(); +} + /* * wait for async request to complete */ @@ -862,7 +872,7 @@ void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) if (req_data->curl_handle) { curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle()); } - if (!req_data->is_done()) { + if (!req_data->_is_done()) { _finish_request(req_data, -ECANCELED); } } diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index dcc78d19904..c43b018e27d 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -145,6 +145,8 @@ public: int process(); int wait(); + bool is_done(); + rgw_http_req_data *get_req_data() { return req_data; } string to_str(); diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 51a7ec4310b..990ca976912 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -70,7 +70,7 @@ public: class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { Mutex lock; Mutex write_lock; - RGWGetDataCB *cb; + RGWGetDataCB *cb{nullptr}; bufferlist outbl; bufferlist in_data; size_t chunk_ofs{0}; @@ -85,6 +85,10 @@ public: int send_data(void *ptr, size_t len, bool *pause) override; int receive_data(void *ptr, size_t len) override; + RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, + param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), + lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") { + } RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) { |