diff options
Diffstat (limited to 'src/rgw/rgw_cr_rest.cc')
-rw-r--r-- | src/rgw/rgw_cr_rest.cc | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc new file mode 100644 index 00000000000..fa5d99ed8d2 --- /dev/null +++ b/src/rgw/rgw_cr_rest.cc @@ -0,0 +1,235 @@ +#include "rgw_cr_rest.h" + +#include "rgw_coroutine.h" + +// re-include our assert to clobber the system one; fix dout: +#include "include/assert.h" + +#include <boost/asio/yield.hpp> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +class RGWCRHTTPGetDataCB : public RGWGetDataCB { + Mutex lock; + RGWCoroutinesEnv *env; + RGWCoroutine *cr; + bufferlist data; +public: + RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr) {} + + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + { + Mutex::Locker l(lock); + if (bl_len == bl.length()) { + data.claim_append(bl); + } else { + bl.splice(0, bl_len, &data); + } + } + + env->manager->io_complete(cr); + return 0; + } + + void claim_data(bufferlist *dest, uint64_t max) { + Mutex::Locker l(lock); + + if (data.length() == 0) { + return; + } + + if (data.length() < max) { + max = data.length(); + } + + data.splice(0, max, dest); + } + + bool has_data() { + return (data.length() > 0); + } +}; + + +RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF() +{ + delete in_cb; +} + +int RGWStreamRWHTTPResourceCRF::init() +{ + in_cb = new RGWCRHTTPGetDataCB(env, caller); + + req->set_user_info(env->stack); + req->set_in_cb(in_cb); + + int r = http_manager->add_request(req); + if (r < 0) { + return r; + } + + return 0; +} + +int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) +{ + reenter(&read_state) { + while (!req->is_done()) { + *io_pending = true; + if (!in_cb->has_data()) { + yield caller->io_block(); + } + *io_pending = false; + in_cb->claim_data(out, max_size); + if (!req->is_done()) { + yield; + } + } + } + return 0; +} + +int RGWStreamRWHTTPResourceCRF::write(bufferlist& data) +{ +#warning write need to throttle and block + reenter(&write_state) { + while (!req->is_done()) { + yield req->add_send_data(data); + } + } + return 0; +} + +TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + req(_req) {} +TestCR::~TestCR() { + delete crf; +} + +int TestCR::operate() { + reenter(this) { + crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req); + + { + int ret = crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } + } + + do { + + bl.clear(); + + do { + yield { + ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } + } + } while (need_retry); + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + dout(0) << "read " << bl.length() << " bytes" << dendl; + + if (bl.length() == 0) { + break; + } + + yield { + ret = crf->write(bl); + if (ret < 0) { + return set_cr_error(ret); + } + } + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + dout(0) << "wrote " << bl.length() << " bytes" << dendl; + } while (true); + + return set_cr_done(); + } + return 0; +} + +TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWHTTPStreamRWRequest *_in_req, + RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + in_req(_in_req), out_req(_out_req) {} +TestSpliceCR::~TestSpliceCR() { + delete in_crf; + delete out_crf; +} + +int TestSpliceCR::operate() { + reenter(this) { + in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req); + out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req); + + { + int ret = in_crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } + } + + { + int ret = out_crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } + } + + do { + + bl.clear(); + + do { + yield { + ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } + } + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + } while (need_retry); + + dout(0) << "read " << bl.length() << " bytes" << dendl; + + if (bl.length() == 0) { + break; + } + + yield { + ret = out_crf->write(bl); + if (ret < 0) { + return set_cr_error(ret); + } + } + + if (retcode < 0) { + dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl; + return set_cr_error(ret); + } + + dout(0) << "wrote " << bl.length() << " bytes" << dendl; + } while (true); + + return set_cr_done(); + } + return 0; +} |