diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2017-11-03 23:57:56 +0100 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2018-04-10 17:05:39 +0200 |
commit | b2143cded0e971361cdb089db19a6f69ce5b74dd (patch) | |
tree | 0babe999188a1083a84db4375eab1e061ee96c4c /src/rgw/rgw_rest_client.h | |
parent | rgw: cr rest splice, work towards write throttling (diff) | |
download | ceph-b2143cded0e971361cdb089db19a6f69ce5b74dd.tar.xz ceph-b2143cded0e971361cdb089db19a6f69ce5b74dd.zip |
rgw: rest_client: work towards throttling of http read requests
Adjust the interfaces to provide the ability for the read callback
to pause the reads. While doing that, define a new class interface
for this instead of RGWGetDataCB. This had a butterfly effect that
required modifications to the obj read filters, but the end result
is a bit cleaner.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Diffstat (limited to 'src/rgw/rgw_rest_client.h')
-rw-r--r-- | src/rgw/rgw_rest_client.h | 33 |
1 files changed, 25 insertions, 8 deletions
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 964aa0dff7c..816c87474ed 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -49,7 +49,7 @@ public: } int receive_header(void *ptr, size_t len) override; - int receive_data(void *ptr, size_t len) override; + int receive_data(void *ptr, size_t len, bool *pause) override; int send_data(void *ptr, size_t len) override; bufferlist& get_response() { return response; } @@ -78,15 +78,20 @@ public: class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { +public: + class ReceiveCB; + +private: Mutex lock; Mutex write_lock; - RGWGetDataCB *cb{nullptr}; + ReceiveCB *cb{nullptr}; RGWWriteDrainCB *write_drain_cb{nullptr}; bufferlist outbl; bufferlist in_data; size_t chunk_ofs{0}; size_t ofs{0}; uint64_t write_ofs{0}; + bool read_paused{false}; bool send_paused{false}; bool stream_writes{false}; bool write_stream_complete{false}; @@ -94,13 +99,25 @@ protected: int handle_header(const string& name, const string& val) override; public: int send_data(void *ptr, size_t len, bool *pause) override; - int receive_data(void *ptr, size_t len) override; + int receive_data(void *ptr, size_t len, bool *pause) override; + + class ReceiveCB { + protected: + uint64_t extra_data_len{0}; + public: + ReceiveCB() = default; + virtual ~ReceiveCB() = default; + virtual int handle_data(bufferlist& bl, bool *pause = nullptr) = 0; + virtual void set_extra_data_len(uint64_t len) { + extra_data_len = len; + } + }; RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") { } - RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, + RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) { } @@ -110,7 +127,7 @@ public: outbl.swap(_outbl); } - void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } + void set_in_cb(ReceiveCB *_cb) { cb = _cb; } void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; } void add_send_data(bufferlist& bl); @@ -126,7 +143,7 @@ public: class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest { bool send_data_hint{false}; public: - RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, + RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWHTTPStreamRWRequest::ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) { } virtual ~RGWRESTStreamRWRequest() override {} @@ -145,13 +162,13 @@ public: class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest { public: - RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, + RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "GET", _url, _cb, _headers, _params) {} }; class RGWRESTStreamHeadRequest : public RGWRESTStreamRWRequest { public: - RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, + RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {} }; |