summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_rest_client.h
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2017-11-03 23:57:56 +0100
committerYehuda Sadeh <yehuda@redhat.com>2018-04-10 17:05:39 +0200
commitb2143cded0e971361cdb089db19a6f69ce5b74dd (patch)
tree0babe999188a1083a84db4375eab1e061ee96c4c /src/rgw/rgw_rest_client.h
parentrgw: cr rest splice, work towards write throttling (diff)
downloadceph-b2143cded0e971361cdb089db19a6f69ce5b74dd.tar.xz
ceph-b2143cded0e971361cdb089db19a6f69ce5b74dd.zip
rgw: rest_client: work towards throttling of http read requests
Adjust the interfaces to provide the ability for the read callback to pause the reads. While doing that, define a new class interface for this instead of RGWGetDataCB. This had a butterfly effect that required modifications to the obj read filters, but the end result is a bit cleaner. Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Diffstat (limited to 'src/rgw/rgw_rest_client.h')
-rw-r--r--src/rgw/rgw_rest_client.h33
1 files changed, 25 insertions, 8 deletions
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h
index 964aa0dff7c..816c87474ed 100644
--- a/src/rgw/rgw_rest_client.h
+++ b/src/rgw/rgw_rest_client.h
@@ -49,7 +49,7 @@ public:
}
int receive_header(void *ptr, size_t len) override;
- int receive_data(void *ptr, size_t len) override;
+ int receive_data(void *ptr, size_t len, bool *pause) override;
int send_data(void *ptr, size_t len) override;
bufferlist& get_response() { return response; }
@@ -78,15 +78,20 @@ public:
class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
+public:
+ class ReceiveCB;
+
+private:
Mutex lock;
Mutex write_lock;
- RGWGetDataCB *cb{nullptr};
+ ReceiveCB *cb{nullptr};
RGWWriteDrainCB *write_drain_cb{nullptr};
bufferlist outbl;
bufferlist in_data;
size_t chunk_ofs{0};
size_t ofs{0};
uint64_t write_ofs{0};
+ bool read_paused{false};
bool send_paused{false};
bool stream_writes{false};
bool write_stream_complete{false};
@@ -94,13 +99,25 @@ protected:
int handle_header(const string& name, const string& val) override;
public:
int send_data(void *ptr, size_t len, bool *pause) override;
- int receive_data(void *ptr, size_t len) override;
+ int receive_data(void *ptr, size_t len, bool *pause) override;
+
+ class ReceiveCB {
+ protected:
+ uint64_t extra_data_len{0};
+ public:
+ ReceiveCB() = default;
+ virtual ~ReceiveCB() = default;
+ virtual int handle_data(bufferlist& bl, bool *pause = nullptr) = 0;
+ virtual void set_extra_data_len(uint64_t len) {
+ extra_data_len = len;
+ }
+ };
RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") {
}
- RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
+ RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) {
}
@@ -110,7 +127,7 @@ public:
outbl.swap(_outbl);
}
- void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+ void set_in_cb(ReceiveCB *_cb) { cb = _cb; }
void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
void add_send_data(bufferlist& bl);
@@ -126,7 +143,7 @@ public:
class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
bool send_data_hint{false};
public:
- RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
+ RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWHTTPStreamRWRequest::ReceiveCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
}
virtual ~RGWRESTStreamRWRequest() override {}
@@ -145,13 +162,13 @@ public:
class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {
public:
- RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers,
+ RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers,
param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "GET", _url, _cb, _headers, _params) {}
};
class RGWRESTStreamHeadRequest : public RGWRESTStreamRWRequest {
public:
- RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers,
+ RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers,
param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {}
};