diff options
author | Yehuda Sadeh <yehuda@redhat.com> | 2017-11-02 01:25:02 +0100 |
---|---|---|
committer | Yehuda Sadeh <yehuda@redhat.com> | 2018-04-10 17:05:39 +0200 |
commit | 174d268e70a85987da17c4319ede075858972845 (patch) | |
tree | f815f610807919515f6c6761f07f8f930d26e7cc | |
parent | rgw: aws sync: configurable multipart threshold, part size (diff) | |
download | ceph-174d268e70a85987da17c4319ede075858972845.tar.xz ceph-174d268e70a85987da17c4319ede075858972845.zip |
rgw: cr: introduce io channels
ios can have multiple channels, so that we can differentiate between
waiting on read/write/control events. We can then block on a specific,
any, or multiple events.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Diffstat (limited to '')
-rw-r--r-- | src/rgw/rgw_coroutine.cc | 50 | ||||
-rw-r--r-- | src/rgw/rgw_coroutine.h | 52 | ||||
-rw-r--r-- | src/rgw/rgw_cr_rest.cc | 11 | ||||
-rw-r--r-- | src/rgw/rgw_cr_rest.h | 2 | ||||
-rw-r--r-- | src/rgw/rgw_http_client.cc | 13 | ||||
-rw-r--r-- | src/rgw/rgw_http_client.h | 45 | ||||
-rw-r--r-- | src/rgw/rgw_rest_conn.h | 16 |
7 files changed, 124 insertions, 65 deletions
diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 852fee41f38..ecc30fad808 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -34,7 +34,7 @@ RGWCompletionManager::~RGWCompletionManager() timer.shutdown(); } -void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info) +void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info) { Mutex::Locker l(lock); _complete(cn, io_id, user_info); @@ -56,7 +56,7 @@ void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifi } } -void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info) +void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info) { if (cn) { cns.erase(cn); @@ -121,7 +121,7 @@ void RGWCompletionManager::_wakeup(void *opaque) if (iter != waiters.end()) { void *user_id = iter->second; waiters.erase(iter); - _complete(NULL, -1 /* no IO id */, user_id); + _complete(NULL, rgw_io_id() /* no IO id */, user_id); } } @@ -145,6 +145,10 @@ void RGWCoroutine::set_sleeping(bool flag) { } int RGWCoroutine::io_block(int ret, int64_t io_id) { + return io_block(ret, rgw_io_id{io_id, -1}); +} + +int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) { if (stack->consume_io_finish(io_id)) { return 0; } @@ -153,7 +157,7 @@ int RGWCoroutine::io_block(int ret, int64_t io_id) { return ret; } -void RGWCoroutine::io_complete(int64_t io_id) { +void RGWCoroutine::io_complete(const rgw_io_id& io_id) { stack->io_complete(io_id); } @@ -311,7 +315,7 @@ void RGWCoroutinesStack::wakeup() completion_mgr->wakeup((void *)this); } -void RGWCoroutinesStack::io_complete(int64_t io_id) +void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id) { RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); completion_mgr->complete(nullptr, io_id, (void *)this); @@ -414,7 +418,7 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) ((RGWAioCompletionNotifier *)arg)->cb(); } -RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data) : completion_mgr(_mgr), +RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr), io_id(_io_id), user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) { c = librados::Rados::aio_create_completion((void *)this, NULL, @@ -472,27 +476,40 @@ void RGWCoroutinesStack::dump(Formatter *f) const { void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider) { io_provider->set_io_user_info((void *)this); - io_provider->set_io_id(env->manager->get_next_io_id()); + io_provider->assign_io(env->manager->get_io_id_provider()); } -bool RGWCoroutinesStack::try_io_unblock(int64_t io_id) +bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id) { if (!can_io_unblock(io_id)) { - io_finish_ids.insert(io_id); +#warning io_finish_ids needs to be cleaned up when owning stack finishes + auto p = io_finish_ids.emplace(io_id.id, io_id); + auto& iter = p.first; + bool inserted = p.second; + if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */ + iter->second.channels |= io_id.channels; + } return false; } return true; } -bool RGWCoroutinesStack::consume_io_finish(int64_t io_id) +bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id) { - auto iter = io_finish_ids.find(io_id); + auto iter = io_finish_ids.find(io_id.id); if (iter == io_finish_ids.end()) { return false; } - io_finish_ids.erase(iter); - return true; + int finish_mask = iter->second.channels; + bool found = (finish_mask & io_id.channels) != 0; + + finish_mask &= ~(finish_mask & io_id.channels); + + if (finish_mask == 0) { + io_finish_ids.erase(iter); + } + return found; } @@ -546,7 +563,7 @@ void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag) cr->set_sleeping(flag); } -void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, int64_t io_id) +void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id) { RWLock::WLocker wl(lock); cr->io_complete(io_id); @@ -576,6 +593,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks) env.scheduled_stacks = &scheduled_stacks; for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) { + RGWCompletionManager::io_completion io; RGWCoroutinesStack *stack = *iter; ++iter; scheduled_stacks.pop_front(); @@ -647,7 +665,6 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks) stack->run_count = 0; } - RGWCompletionManager::io_completion io; while (completion_mgr->try_get_next(&io)) { handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count); } @@ -741,7 +758,8 @@ int RGWCoroutinesManager::run(RGWCoroutine *op) RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack) { - RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, get_next_io_id(), (void *)stack); + rgw_io_id io_id{get_next_io_id(), -1}; + RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack); completion_mgr->register_completion_notifier(cn); return cn; } diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 25dc55660ea..a7b2797264c 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -39,7 +39,7 @@ class RGWCompletionManager : public RefCountedObject { CephContext *cct; struct io_completion { - int64_t io_id; + rgw_io_id io_id; void *user_info; }; list<io_completion> complete_reqs; @@ -59,12 +59,12 @@ class RGWCompletionManager : public RefCountedObject { protected: void _wakeup(void *opaque); - void _complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info); + void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info); public: RGWCompletionManager(CephContext *_cct); ~RGWCompletionManager() override; - void complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info); + void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info); int get_next(io_completion *io); bool try_get_next(io_completion *io); @@ -84,13 +84,13 @@ public: class RGWAioCompletionNotifier : public RefCountedObject { librados::AioCompletion *c; RGWCompletionManager *completion_mgr; - int64_t io_id; + rgw_io_id io_id; void *user_data; Mutex lock; bool registered; public: - RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data); + RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data); ~RGWAioCompletionNotifier() override { c->release(); lock.Lock(); @@ -293,10 +293,17 @@ public: void dump(Formatter *f) const; - void init_new_io(RGWIOProvider *io_provider); + void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */ - int io_block(int ret = 0, int64_t io_id = -1); - void io_complete(int64_t io_id = -1); + int io_block(int ret = 0) { + return io_block(ret, -1); + } + int io_block(int ret, int64_t io_id); + int io_block(int ret, const rgw_io_id& io_id); + void io_complete() { + io_complete(rgw_io_id{}); + } + void io_complete(const rgw_io_id& io_id); }; ostream& operator<<(ostream& out, const RGWCoroutine& cr); @@ -367,8 +374,8 @@ class RGWCoroutinesStack : public RefCountedObject { set<RGWCoroutinesStack *> blocked_by_stack; set<RGWCoroutinesStack *> blocking_stacks; - set<int64_t> io_finish_ids; - int64_t io_blocked_id{-1}; + map<int64_t, rgw_io_id> io_finish_ids; + rgw_io_id io_blocked_id; bool done_flag; bool error_flag; @@ -409,18 +416,18 @@ public: void set_io_blocked(bool flag) { blocked_flag = flag; } - void set_io_blocked_id(int64_t io_id) { + void set_io_blocked_id(const rgw_io_id& io_id) { io_blocked_id = io_id; } bool is_io_blocked() { return blocked_flag && !done_flag; } - bool can_io_unblock(int64_t io_id) { - return (io_blocked_id == io_id) || - (io_blocked_id < 0); + bool can_io_unblock(const rgw_io_id& io_id) { + return ((io_blocked_id.id < 0) || + io_blocked_id.intersects(io_id)); } - bool try_io_unblock(int64_t io_id); - bool consume_io_finish(int64_t io_id); + bool try_io_unblock(const rgw_io_id& io_id); + bool consume_io_finish(const rgw_io_id& io_id); void set_interval_wait(bool flag) { interval_wait_flag = flag; } @@ -461,7 +468,10 @@ public: int wait(const utime_t& interval); void wakeup(); - void io_complete(int64_t io_id = -1); + void io_complete() { + io_complete(rgw_io_id{}); + } + void io_complete(const rgw_io_id& io_id); bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ @@ -542,6 +552,8 @@ class RGWCoroutinesManager { RWLock lock; + RGWIOIDProvider io_id_provider; + void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCompletionManager::io_completion& io, int *waiting_count); protected: @@ -590,10 +602,14 @@ public: int64_t get_next_io_id(); void set_sleeping(RGWCoroutine *cr, bool flag); - void io_complete(RGWCoroutine *cr, int64_t io_id = -1); + void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id); virtual string get_id(); void dump(Formatter *f) const; + + RGWIOIDProvider& get_io_id_provider() { + return io_id_provider; + } }; class RGWSimpleCoroutine : public RGWCoroutine { diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 3c0b597fb97..f489c239ad2 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -14,12 +14,12 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB { Mutex lock; RGWCoroutinesEnv *env; RGWCoroutine *cr; - int64_t io_id; + rgw_io_id io_id; bufferlist data; bufferlist extra_data; bool got_all_extra_data{false}; public: - RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {} + RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {} int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { { @@ -86,7 +86,7 @@ int RGWStreamReadHTTPResourceCRF::init() { env->stack->init_new_io(req); - in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id()); + in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ)); req->set_in_cb(in_cb); @@ -134,11 +134,12 @@ int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) { reenter(&read_state) { + io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL); while (!req->is_done() || in_cb->has_data()) { *io_pending = true; if (!in_cb->has_data()) { - yield caller->io_block(0, req->get_io_id()); + yield caller->io_block(0, io_read_mask); } got_attrs = true; if (need_extra_data() && !got_extra_data) { @@ -202,7 +203,7 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry) yield req->finish_write(); *need_retry = !req->is_done(); while (!req->is_done()) { - yield caller->io_block(0, req->get_io_id()); + yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); *need_retry = !req->is_done(); } diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 005ed10cbac..c8bf4763ae5 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -368,6 +368,8 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { bool got_attrs{false}; bool got_extra_data{false}; + rgw_io_id io_read_mask; + protected: rgw_rest_obj rest_obj; diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 4fdf635e515..757c6a5c7bf 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -34,7 +34,7 @@ struct rgw_http_req_data : public RefCountedObject { int ret{0}; std::atomic<bool> done = { false }; RGWHTTPClient *client{nullptr}; - int64_t io_id{-1}; + rgw_io_id control_io_id; void *user_info{nullptr}; bool registered{false}; RGWHTTPManager *mgr{nullptr}; @@ -268,6 +268,13 @@ void rgw_release_all_curl_handles() delete handles; } +void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type) +{ + if (id == 0) { + id = io_id_provider.get_next(); + } +} + /* * the simple set of callbacks will be called on RGWHTTPClient::process() */ @@ -828,7 +835,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) req_data->mgr = nullptr; } if (completion_mgr) { - completion_mgr->complete(NULL, req_data->io_id, req_data->user_info); + completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info); } req_data->put(); @@ -949,7 +956,7 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, bool send_data_hint) req_data->mgr = this; req_data->client = client; - req_data->io_id = client->get_io_id(); + req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL); req_data->user_info = client->get_io_user_info(); register_request(req_data); diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index b5a8e32143b..ad1301531fa 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -20,14 +20,42 @@ void rgw_http_client_cleanup(); struct rgw_http_req_data; class RGWHTTPManager; +class RGWIOIDProvider +{ + std::atomic<int64_t> max = {0}; + +public: + RGWIOIDProvider() {} + int64_t get_next() { + return ++max; + } +}; + +struct rgw_io_id { + int64_t id{0}; + int channels{0}; + + rgw_io_id() {} + rgw_io_id(int64_t _id, int _channels) : id(_id), channels(_channels) {} + + bool intersects(const rgw_io_id& rhs) { + return (id == rhs.id && ((channels | rhs.channels) != 0)); + } +}; + class RGWIOProvider { + int64_t id{-1}; + public: RGWIOProvider() {} - virtual void set_io_id(int64_t _io_id) = 0; + void assign_io(RGWIOIDProvider& io_id_provider, int io_type = -1); + rgw_io_id get_io_id(int io_type) { + return rgw_io_id{id, io_type}; + } + virtual void set_io_user_info(void *_user_info) = 0; - virtual int64_t get_io_id() = 0; virtual void *get_io_user_info() = 0; }; @@ -41,7 +69,6 @@ class RGWHTTPClient : public RGWIOProvider bool has_send_len; long http_status; - int64_t io_id{-1}; void *user_info{nullptr}; rgw_http_req_data *req_data; @@ -113,6 +140,10 @@ public: static const long HTTP_STATUS_UNAUTHORIZED = 401; static const long HTTP_STATUS_NOTFOUND = 404; + static constexpr int HTTPCLIENT_IO_READ = 0x1; + static constexpr int HTTPCLIENT_IO_WRITE = 0x2; + static constexpr int HTTPCLIENT_IO_CONTROL = 0x4; + virtual ~RGWHTTPClient(); explicit RGWHTTPClient(CephContext *cct, const string& _method, @@ -164,18 +195,10 @@ public: method = _method; } - void set_io_id(int64_t _io_id) override { - io_id = _io_id; - } - void set_io_user_info(void *_user_info) override { user_info = _user_info; } - int64_t get_io_id() override { - return io_id; - } - void *get_io_user_info() override { return user_info; } diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index aa91c65faef..537077bb83f 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -217,12 +217,8 @@ public: param_vec_t *extra_headers, RGWHTTPManager *_mgr); - void set_io_id(int64_t _io_id) override { - req.set_io_id(_io_id); - } - - int64_t get_io_id() override { - return req.get_io_id(); + rgw_io_id get_io_id(int io_type) { + return req.get_io_id(io_type); } void set_io_user_info(void *user_info) override { @@ -343,12 +339,8 @@ public: param_vec_t *extra_headers, RGWHTTPManager *_mgr); - void set_io_id(int64_t _io_id) override { - req.set_io_id(_io_id); - } - - int64_t get_io_id() override { - return req.get_io_id(); + rgw_io_id get_io_id(int io_type) { + return req.get_io_id(io_type); } void set_io_user_info(void *user_info) override { |