summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@redhat.com>2017-11-02 01:25:02 +0100
committerYehuda Sadeh <yehuda@redhat.com>2018-04-10 17:05:39 +0200
commit174d268e70a85987da17c4319ede075858972845 (patch)
treef815f610807919515f6c6761f07f8f930d26e7cc
parentrgw: aws sync: configurable multipart threshold, part size (diff)
downloadceph-174d268e70a85987da17c4319ede075858972845.tar.xz
ceph-174d268e70a85987da17c4319ede075858972845.zip
rgw: cr: introduce io channels
ios can have multiple channels, so that we can differentiate between waiting on read/write/control events. We can then block on a specific, any, or multiple events. Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Diffstat (limited to '')
-rw-r--r--src/rgw/rgw_coroutine.cc50
-rw-r--r--src/rgw/rgw_coroutine.h52
-rw-r--r--src/rgw/rgw_cr_rest.cc11
-rw-r--r--src/rgw/rgw_cr_rest.h2
-rw-r--r--src/rgw/rgw_http_client.cc13
-rw-r--r--src/rgw/rgw_http_client.h45
-rw-r--r--src/rgw/rgw_rest_conn.h16
7 files changed, 124 insertions, 65 deletions
diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc
index 852fee41f38..ecc30fad808 100644
--- a/src/rgw/rgw_coroutine.cc
+++ b/src/rgw/rgw_coroutine.cc
@@ -34,7 +34,7 @@ RGWCompletionManager::~RGWCompletionManager()
timer.shutdown();
}
-void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info)
+void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
Mutex::Locker l(lock);
_complete(cn, io_id, user_info);
@@ -56,7 +56,7 @@ void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifi
}
}
-void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info)
+void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
if (cn) {
cns.erase(cn);
@@ -121,7 +121,7 @@ void RGWCompletionManager::_wakeup(void *opaque)
if (iter != waiters.end()) {
void *user_id = iter->second;
waiters.erase(iter);
- _complete(NULL, -1 /* no IO id */, user_id);
+ _complete(NULL, rgw_io_id() /* no IO id */, user_id);
}
}
@@ -145,6 +145,10 @@ void RGWCoroutine::set_sleeping(bool flag) {
}
int RGWCoroutine::io_block(int ret, int64_t io_id) {
+ return io_block(ret, rgw_io_id{io_id, -1});
+}
+
+int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
if (stack->consume_io_finish(io_id)) {
return 0;
}
@@ -153,7 +157,7 @@ int RGWCoroutine::io_block(int ret, int64_t io_id) {
return ret;
}
-void RGWCoroutine::io_complete(int64_t io_id) {
+void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
stack->io_complete(io_id);
}
@@ -311,7 +315,7 @@ void RGWCoroutinesStack::wakeup()
completion_mgr->wakeup((void *)this);
}
-void RGWCoroutinesStack::io_complete(int64_t io_id)
+void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
{
RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
completion_mgr->complete(nullptr, io_id, (void *)this);
@@ -414,7 +418,7 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
((RGWAioCompletionNotifier *)arg)->cb();
}
-RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data) : completion_mgr(_mgr),
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
io_id(_io_id),
user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
c = librados::Rados::aio_create_completion((void *)this, NULL,
@@ -472,27 +476,40 @@ void RGWCoroutinesStack::dump(Formatter *f) const {
void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
{
io_provider->set_io_user_info((void *)this);
- io_provider->set_io_id(env->manager->get_next_io_id());
+ io_provider->assign_io(env->manager->get_io_id_provider());
}
-bool RGWCoroutinesStack::try_io_unblock(int64_t io_id)
+bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
{
if (!can_io_unblock(io_id)) {
- io_finish_ids.insert(io_id);
+#warning io_finish_ids needs to be cleaned up when owning stack finishes
+ auto p = io_finish_ids.emplace(io_id.id, io_id);
+ auto& iter = p.first;
+ bool inserted = p.second;
+ if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
+ iter->second.channels |= io_id.channels;
+ }
return false;
}
return true;
}
-bool RGWCoroutinesStack::consume_io_finish(int64_t io_id)
+bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
{
- auto iter = io_finish_ids.find(io_id);
+ auto iter = io_finish_ids.find(io_id.id);
if (iter == io_finish_ids.end()) {
return false;
}
- io_finish_ids.erase(iter);
- return true;
+ int finish_mask = iter->second.channels;
+ bool found = (finish_mask & io_id.channels) != 0;
+
+ finish_mask &= ~(finish_mask & io_id.channels);
+
+ if (finish_mask == 0) {
+ io_finish_ids.erase(iter);
+ }
+ return found;
}
@@ -546,7 +563,7 @@ void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
cr->set_sleeping(flag);
}
-void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, int64_t io_id)
+void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
{
RWLock::WLocker wl(lock);
cr->io_complete(io_id);
@@ -576,6 +593,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
env.scheduled_stacks = &scheduled_stacks;
for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
+ RGWCompletionManager::io_completion io;
RGWCoroutinesStack *stack = *iter;
++iter;
scheduled_stacks.pop_front();
@@ -647,7 +665,6 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
stack->run_count = 0;
}
- RGWCompletionManager::io_completion io;
while (completion_mgr->try_get_next(&io)) {
handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
}
@@ -741,7 +758,8 @@ int RGWCoroutinesManager::run(RGWCoroutine *op)
RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
{
- RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, get_next_io_id(), (void *)stack);
+ rgw_io_id io_id{get_next_io_id(), -1};
+ RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
completion_mgr->register_completion_notifier(cn);
return cn;
}
diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h
index 25dc55660ea..a7b2797264c 100644
--- a/src/rgw/rgw_coroutine.h
+++ b/src/rgw/rgw_coroutine.h
@@ -39,7 +39,7 @@ class RGWCompletionManager : public RefCountedObject {
CephContext *cct;
struct io_completion {
- int64_t io_id;
+ rgw_io_id io_id;
void *user_info;
};
list<io_completion> complete_reqs;
@@ -59,12 +59,12 @@ class RGWCompletionManager : public RefCountedObject {
protected:
void _wakeup(void *opaque);
- void _complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info);
+ void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
public:
RGWCompletionManager(CephContext *_cct);
~RGWCompletionManager() override;
- void complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info);
+ void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
int get_next(io_completion *io);
bool try_get_next(io_completion *io);
@@ -84,13 +84,13 @@ public:
class RGWAioCompletionNotifier : public RefCountedObject {
librados::AioCompletion *c;
RGWCompletionManager *completion_mgr;
- int64_t io_id;
+ rgw_io_id io_id;
void *user_data;
Mutex lock;
bool registered;
public:
- RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data);
+ RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
~RGWAioCompletionNotifier() override {
c->release();
lock.Lock();
@@ -293,10 +293,17 @@ public:
void dump(Formatter *f) const;
- void init_new_io(RGWIOProvider *io_provider);
+ void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
- int io_block(int ret = 0, int64_t io_id = -1);
- void io_complete(int64_t io_id = -1);
+ int io_block(int ret = 0) {
+ return io_block(ret, -1);
+ }
+ int io_block(int ret, int64_t io_id);
+ int io_block(int ret, const rgw_io_id& io_id);
+ void io_complete() {
+ io_complete(rgw_io_id{});
+ }
+ void io_complete(const rgw_io_id& io_id);
};
ostream& operator<<(ostream& out, const RGWCoroutine& cr);
@@ -367,8 +374,8 @@ class RGWCoroutinesStack : public RefCountedObject {
set<RGWCoroutinesStack *> blocked_by_stack;
set<RGWCoroutinesStack *> blocking_stacks;
- set<int64_t> io_finish_ids;
- int64_t io_blocked_id{-1};
+ map<int64_t, rgw_io_id> io_finish_ids;
+ rgw_io_id io_blocked_id;
bool done_flag;
bool error_flag;
@@ -409,18 +416,18 @@ public:
void set_io_blocked(bool flag) {
blocked_flag = flag;
}
- void set_io_blocked_id(int64_t io_id) {
+ void set_io_blocked_id(const rgw_io_id& io_id) {
io_blocked_id = io_id;
}
bool is_io_blocked() {
return blocked_flag && !done_flag;
}
- bool can_io_unblock(int64_t io_id) {
- return (io_blocked_id == io_id) ||
- (io_blocked_id < 0);
+ bool can_io_unblock(const rgw_io_id& io_id) {
+ return ((io_blocked_id.id < 0) ||
+ io_blocked_id.intersects(io_id));
}
- bool try_io_unblock(int64_t io_id);
- bool consume_io_finish(int64_t io_id);
+ bool try_io_unblock(const rgw_io_id& io_id);
+ bool consume_io_finish(const rgw_io_id& io_id);
void set_interval_wait(bool flag) {
interval_wait_flag = flag;
}
@@ -461,7 +468,10 @@ public:
int wait(const utime_t& interval);
void wakeup();
- void io_complete(int64_t io_id = -1);
+ void io_complete() {
+ io_complete(rgw_io_id{});
+ }
+ void io_complete(const rgw_io_id& io_id);
bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
@@ -542,6 +552,8 @@ class RGWCoroutinesManager {
RWLock lock;
+ RGWIOIDProvider io_id_provider;
+
void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
RGWCompletionManager::io_completion& io, int *waiting_count);
protected:
@@ -590,10 +602,14 @@ public:
int64_t get_next_io_id();
void set_sleeping(RGWCoroutine *cr, bool flag);
- void io_complete(RGWCoroutine *cr, int64_t io_id = -1);
+ void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
virtual string get_id();
void dump(Formatter *f) const;
+
+ RGWIOIDProvider& get_io_id_provider() {
+ return io_id_provider;
+ }
};
class RGWSimpleCoroutine : public RGWCoroutine {
diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc
index 3c0b597fb97..f489c239ad2 100644
--- a/src/rgw/rgw_cr_rest.cc
+++ b/src/rgw/rgw_cr_rest.cc
@@ -14,12 +14,12 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB {
Mutex lock;
RGWCoroutinesEnv *env;
RGWCoroutine *cr;
- int64_t io_id;
+ rgw_io_id io_id;
bufferlist data;
bufferlist extra_data;
bool got_all_extra_data{false};
public:
- RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
+ RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
{
@@ -86,7 +86,7 @@ int RGWStreamReadHTTPResourceCRF::init()
{
env->stack->init_new_io(req);
- in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id());
+ in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ));
req->set_in_cb(in_cb);
@@ -134,11 +134,12 @@ int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers,
int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
+ io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
while (!req->is_done() ||
in_cb->has_data()) {
*io_pending = true;
if (!in_cb->has_data()) {
- yield caller->io_block(0, req->get_io_id());
+ yield caller->io_block(0, io_read_mask);
}
got_attrs = true;
if (need_extra_data() && !got_extra_data) {
@@ -202,7 +203,7 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
yield req->finish_write();
*need_retry = !req->is_done();
while (!req->is_done()) {
- yield caller->io_block(0, req->get_io_id());
+ yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
*need_retry = !req->is_done();
}
diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h
index 005ed10cbac..c8bf4763ae5 100644
--- a/src/rgw/rgw_cr_rest.h
+++ b/src/rgw/rgw_cr_rest.h
@@ -368,6 +368,8 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
bool got_attrs{false};
bool got_extra_data{false};
+ rgw_io_id io_read_mask;
+
protected:
rgw_rest_obj rest_obj;
diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc
index 4fdf635e515..757c6a5c7bf 100644
--- a/src/rgw/rgw_http_client.cc
+++ b/src/rgw/rgw_http_client.cc
@@ -34,7 +34,7 @@ struct rgw_http_req_data : public RefCountedObject {
int ret{0};
std::atomic<bool> done = { false };
RGWHTTPClient *client{nullptr};
- int64_t io_id{-1};
+ rgw_io_id control_io_id;
void *user_info{nullptr};
bool registered{false};
RGWHTTPManager *mgr{nullptr};
@@ -268,6 +268,13 @@ void rgw_release_all_curl_handles()
delete handles;
}
+void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
+{
+ if (id == 0) {
+ id = io_id_provider.get_next();
+ }
+}
+
/*
* the simple set of callbacks will be called on RGWHTTPClient::process()
*/
@@ -828,7 +835,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
req_data->mgr = nullptr;
}
if (completion_mgr) {
- completion_mgr->complete(NULL, req_data->io_id, req_data->user_info);
+ completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
}
req_data->put();
@@ -949,7 +956,7 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, bool send_data_hint)
req_data->mgr = this;
req_data->client = client;
- req_data->io_id = client->get_io_id();
+ req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
req_data->user_info = client->get_io_user_info();
register_request(req_data);
diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h
index b5a8e32143b..ad1301531fa 100644
--- a/src/rgw/rgw_http_client.h
+++ b/src/rgw/rgw_http_client.h
@@ -20,14 +20,42 @@ void rgw_http_client_cleanup();
struct rgw_http_req_data;
class RGWHTTPManager;
+class RGWIOIDProvider
+{
+ std::atomic<int64_t> max = {0};
+
+public:
+ RGWIOIDProvider() {}
+ int64_t get_next() {
+ return ++max;
+ }
+};
+
+struct rgw_io_id {
+ int64_t id{0};
+ int channels{0};
+
+ rgw_io_id() {}
+ rgw_io_id(int64_t _id, int _channels) : id(_id), channels(_channels) {}
+
+ bool intersects(const rgw_io_id& rhs) {
+ return (id == rhs.id && ((channels | rhs.channels) != 0));
+ }
+};
+
class RGWIOProvider
{
+ int64_t id{-1};
+
public:
RGWIOProvider() {}
- virtual void set_io_id(int64_t _io_id) = 0;
+ void assign_io(RGWIOIDProvider& io_id_provider, int io_type = -1);
+ rgw_io_id get_io_id(int io_type) {
+ return rgw_io_id{id, io_type};
+ }
+
virtual void set_io_user_info(void *_user_info) = 0;
- virtual int64_t get_io_id() = 0;
virtual void *get_io_user_info() = 0;
};
@@ -41,7 +69,6 @@ class RGWHTTPClient : public RGWIOProvider
bool has_send_len;
long http_status;
- int64_t io_id{-1};
void *user_info{nullptr};
rgw_http_req_data *req_data;
@@ -113,6 +140,10 @@ public:
static const long HTTP_STATUS_UNAUTHORIZED = 401;
static const long HTTP_STATUS_NOTFOUND = 404;
+ static constexpr int HTTPCLIENT_IO_READ = 0x1;
+ static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
+ static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
+
virtual ~RGWHTTPClient();
explicit RGWHTTPClient(CephContext *cct,
const string& _method,
@@ -164,18 +195,10 @@ public:
method = _method;
}
- void set_io_id(int64_t _io_id) override {
- io_id = _io_id;
- }
-
void set_io_user_info(void *_user_info) override {
user_info = _user_info;
}
- int64_t get_io_id() override {
- return io_id;
- }
-
void *get_io_user_info() override {
return user_info;
}
diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h
index aa91c65faef..537077bb83f 100644
--- a/src/rgw/rgw_rest_conn.h
+++ b/src/rgw/rgw_rest_conn.h
@@ -217,12 +217,8 @@ public:
param_vec_t *extra_headers,
RGWHTTPManager *_mgr);
- void set_io_id(int64_t _io_id) override {
- req.set_io_id(_io_id);
- }
-
- int64_t get_io_id() override {
- return req.get_io_id();
+ rgw_io_id get_io_id(int io_type) {
+ return req.get_io_id(io_type);
}
void set_io_user_info(void *user_info) override {
@@ -343,12 +339,8 @@ public:
param_vec_t *extra_headers,
RGWHTTPManager *_mgr);
- void set_io_id(int64_t _io_id) override {
- req.set_io_id(_io_id);
- }
-
- int64_t get_io_id() override {
- return req.get_io_id();
+ rgw_io_id get_io_id(int io_type) {
+ return req.get_io_id(io_type);
}
void set_io_user_info(void *user_info) override {