summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2017-09-04 14:15:11 +0200
committerYehuda Sadeh <yehuda@redhat.com>2018-04-10 17:05:38 +0200
commitbd0ca81cdec8c648a3ebfac949ccbbd623ff3589 (patch)
tree21efa1aa7e352e1268f73941e1d20d2739e5babe
parentrgw: amend http client manager interface (diff)
downloadceph-bd0ca81cdec8c648a3ebfac949ccbbd623ff3589.tar.xz
ceph-bd0ca81cdec8c648a3ebfac949ccbbd623ff3589.zip
rgw: initial work for integrating streaming read/write with cr
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
-rw-r--r--src/rgw/CMakeLists.txt1
-rw-r--r--src/rgw/rgw_coroutine.cc11
-rw-r--r--src/rgw/rgw_coroutine.h6
-rw-r--r--src/rgw/rgw_cr_rest.cc235
-rw-r--r--src/rgw/rgw_cr_rest.h67
-rw-r--r--src/rgw/rgw_http_client.cc12
-rw-r--r--src/rgw/rgw_http_client.h2
-rw-r--r--src/rgw/rgw_rest_client.h6
8 files changed, 337 insertions, 3 deletions
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 772b72c8d57..5ac5ea8bff8 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -89,6 +89,7 @@ set(rgw_a_srcs
rgw_reshard.cc
rgw_coroutine.cc
rgw_cr_rados.cc
+ rgw_cr_rest.cc
rgw_object_expirer_core.cc
rgw_op.cc
rgw_os_lib.cc
diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc
index 0a8b4c270a1..88e4c607131 100644
--- a/src/rgw/rgw_coroutine.cc
+++ b/src/rgw/rgw_coroutine.cc
@@ -452,6 +452,12 @@ void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *s
context_stacks.insert(stack);
}
+void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
+{
+ RWLock::WLocker wl(lock);
+ cr->set_sleeping(flag);
+}
+
int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
{
int ret = 0;
@@ -811,6 +817,11 @@ void RGWCoroutine::wakeup()
stack->wakeup();
}
+RGWCoroutinesEnv *RGWCoroutine::get_env() const
+{
+ return stack->get_env();
+}
+
void RGWCoroutine::dump(Formatter *f) const {
if (!description.str().empty()) {
encode_json("description", description.str(), f);
diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h
index 9ccba4b15c3..d3006b59a14 100644
--- a/src/rgw/rgw_coroutine.h
+++ b/src/rgw/rgw_coroutine.h
@@ -282,6 +282,8 @@ public:
return stack;
}
+ RGWCoroutinesEnv *get_env() const;
+
void dump(Formatter *f) const;
};
@@ -463,7 +465,7 @@ public:
bool unblock_stack(RGWCoroutinesStack **s);
- RGWCoroutinesEnv *get_env() { return env; }
+ RGWCoroutinesEnv *get_env() const { return env; }
void dump(Formatter *f) const;
};
@@ -561,6 +563,8 @@ public:
void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
RGWCoroutinesStack *allocate_stack();
+ void set_sleeping(RGWCoroutine *cr, bool flag);
+
virtual string get_id();
void dump(Formatter *f) const;
};
diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc
new file mode 100644
index 00000000000..fa5d99ed8d2
--- /dev/null
+++ b/src/rgw/rgw_cr_rest.cc
@@ -0,0 +1,235 @@
+#include "rgw_cr_rest.h"
+
+#include "rgw_coroutine.h"
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+class RGWCRHTTPGetDataCB : public RGWGetDataCB {
+ Mutex lock;
+ RGWCoroutinesEnv *env;
+ RGWCoroutine *cr;
+ bufferlist data;
+public:
+ RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr) {}
+
+ int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+ {
+ Mutex::Locker l(lock);
+ if (bl_len == bl.length()) {
+ data.claim_append(bl);
+ } else {
+ bl.splice(0, bl_len, &data);
+ }
+ }
+
+ env->manager->io_complete(cr);
+ return 0;
+ }
+
+ void claim_data(bufferlist *dest, uint64_t max) {
+ Mutex::Locker l(lock);
+
+ if (data.length() == 0) {
+ return;
+ }
+
+ if (data.length() < max) {
+ max = data.length();
+ }
+
+ data.splice(0, max, dest);
+ }
+
+ bool has_data() {
+ return (data.length() > 0);
+ }
+};
+
+
+RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF()
+{
+ delete in_cb;
+}
+
+int RGWStreamRWHTTPResourceCRF::init()
+{
+ in_cb = new RGWCRHTTPGetDataCB(env, caller);
+
+ req->set_user_info(env->stack);
+ req->set_in_cb(in_cb);
+
+ int r = http_manager->add_request(req);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
+{
+ reenter(&read_state) {
+ while (!req->is_done()) {
+ *io_pending = true;
+ if (!in_cb->has_data()) {
+ yield caller->io_block();
+ }
+ *io_pending = false;
+ in_cb->claim_data(out, max_size);
+ if (!req->is_done()) {
+ yield;
+ }
+ }
+ }
+ return 0;
+}
+
+int RGWStreamRWHTTPResourceCRF::write(bufferlist& data)
+{
+#warning write need to throttle and block
+ reenter(&write_state) {
+ while (!req->is_done()) {
+ yield req->add_send_data(data);
+ }
+ }
+ return 0;
+}
+
+TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+ req(_req) {}
+TestCR::~TestCR() {
+ delete crf;
+}
+
+int TestCR::operate() {
+ reenter(this) {
+ crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req);
+
+ {
+ int ret = crf->init();
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+
+ do {
+
+ bl.clear();
+
+ do {
+ yield {
+ ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry);
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+ } while (need_retry);
+
+ if (retcode < 0) {
+ dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+ return set_cr_error(ret);
+ }
+
+ dout(0) << "read " << bl.length() << " bytes" << dendl;
+
+ if (bl.length() == 0) {
+ break;
+ }
+
+ yield {
+ ret = crf->write(bl);
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+
+ if (retcode < 0) {
+ dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+ return set_cr_error(ret);
+ }
+
+ dout(0) << "wrote " << bl.length() << " bytes" << dendl;
+ } while (true);
+
+ return set_cr_done();
+ }
+ return 0;
+}
+
+TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWHTTPStreamRWRequest *_in_req,
+ RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+ in_req(_in_req), out_req(_out_req) {}
+TestSpliceCR::~TestSpliceCR() {
+ delete in_crf;
+ delete out_crf;
+}
+
+int TestSpliceCR::operate() {
+ reenter(this) {
+ in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
+ out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
+
+ {
+ int ret = in_crf->init();
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+
+ {
+ int ret = out_crf->init();
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+
+ do {
+
+ bl.clear();
+
+ do {
+ yield {
+ ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry);
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+
+ if (retcode < 0) {
+ dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+ return set_cr_error(ret);
+ }
+ } while (need_retry);
+
+ dout(0) << "read " << bl.length() << " bytes" << dendl;
+
+ if (bl.length() == 0) {
+ break;
+ }
+
+ yield {
+ ret = out_crf->write(bl);
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ }
+
+ if (retcode < 0) {
+ dout(0) << __FILE__ << ":" << __LINE__ << " retcode=" << retcode << dendl;
+ return set_cr_error(ret);
+ }
+
+ dout(0) << "wrote " << bl.length() << " bytes" << dendl;
+ } while (true);
+
+ return set_cr_done();
+ }
+ return 0;
+}
diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h
index b47d7da1553..8d35b327a18 100644
--- a/src/rgw/rgw_cr_rest.h
+++ b/src/rgw/rgw_cr_rest.h
@@ -306,4 +306,71 @@ public:
}
};
+class RGWCRHTTPGetDataCB;
+
+class RGWStreamRWHTTPResourceCRF {
+ RGWCoroutinesEnv *env;
+ RGWCoroutine *caller;
+ RGWHTTPManager *http_manager;
+
+ RGWHTTPStreamRWRequest *req;
+
+ RGWCRHTTPGetDataCB *in_cb{nullptr};
+
+ boost::asio::coroutine read_state;
+ boost::asio::coroutine write_state;
+
+
+public:
+ RGWStreamRWHTTPResourceCRF(CephContext *_cct,
+ RGWCoroutinesEnv *_env,
+ RGWCoroutine *_caller,
+ RGWHTTPManager *_http_manager,
+ RGWHTTPStreamRWRequest *_req) : env(_env),
+ caller(_caller),
+ http_manager(_http_manager),
+ req(_req) {}
+ ~RGWStreamRWHTTPResourceCRF();
+
+ int init();
+ int read(bufferlist *data, uint64_t max, bool *need_retry); /* reentrant */
+ int write(bufferlist& data); /* reentrant */
+};
+
+class TestCR : public RGWCoroutine {
+ CephContext *cct;
+ RGWHTTPManager *http_manager;
+ string url;
+ RGWHTTPStreamRWRequest *req{nullptr};
+ RGWStreamRWHTTPResourceCRF *crf{nullptr};
+ bufferlist bl;
+ bool need_retry{false};
+ int ret{0};
+public:
+ TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req);
+ ~TestCR();
+
+ int operate();
+};
+
+class TestSpliceCR : public RGWCoroutine {
+ CephContext *cct;
+ RGWHTTPManager *http_manager;
+ string url;
+ RGWHTTPStreamRWRequest *in_req{nullptr};
+ RGWHTTPStreamRWRequest *out_req{nullptr};
+ RGWStreamRWHTTPResourceCRF *in_crf{nullptr};
+ RGWStreamRWHTTPResourceCRF *out_crf{nullptr};
+ bufferlist bl;
+ bool need_retry{false};
+ int ret{0};
+public:
+ TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWHTTPStreamRWRequest *_in_req,
+ RGWHTTPStreamRWRequest *_out_req);
+ ~TestSpliceCR();
+
+ int operate();
+};
+
#endif
diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc
index ccea463d2ec..9867bcdf312 100644
--- a/src/rgw/rgw_http_client.cc
+++ b/src/rgw/rgw_http_client.cc
@@ -90,7 +90,12 @@ struct rgw_http_req_data : public RefCountedObject {
cond.Signal();
}
+ bool _is_done() {
+ return done;
+ }
+
bool is_done() {
+ Mutex::Locker l(lock);
return done;
}
@@ -526,6 +531,11 @@ int RGWHTTPClient::init_request(rgw_http_req_data *_req_data, bool send_data_hin
return 0;
}
+bool RGWHTTPClient::is_done()
+{
+ return req_data->is_done();
+}
+
/*
* wait for async request to complete
*/
@@ -862,7 +872,7 @@ void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
if (req_data->curl_handle) {
curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
}
- if (!req_data->is_done()) {
+ if (!req_data->_is_done()) {
_finish_request(req_data, -ECANCELED);
}
}
diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h
index dcc78d19904..c43b018e27d 100644
--- a/src/rgw/rgw_http_client.h
+++ b/src/rgw/rgw_http_client.h
@@ -145,6 +145,8 @@ public:
int process();
int wait();
+ bool is_done();
+
rgw_http_req_data *get_req_data() { return req_data; }
string to_str();
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h
index 51a7ec4310b..990ca976912 100644
--- a/src/rgw/rgw_rest_client.h
+++ b/src/rgw/rgw_rest_client.h
@@ -70,7 +70,7 @@ public:
class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
Mutex lock;
Mutex write_lock;
- RGWGetDataCB *cb;
+ RGWGetDataCB *cb{nullptr};
bufferlist outbl;
bufferlist in_data;
size_t chunk_ofs{0};
@@ -85,6 +85,10 @@ public:
int send_data(void *ptr, size_t len, bool *pause) override;
int receive_data(void *ptr, size_t len) override;
+ RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url,
+ param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
+ lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") {
+ }
RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) {