diff options
author | Dongsheng Yang <dongsheng.yang@easystack.cn> | 2017-08-03 09:21:23 +0200 |
---|---|---|
committer | Dongsheng Yang <dongsheng.yang@easystack.cn> | 2017-11-08 03:34:17 +0100 |
commit | 8366ebceb54c138ff33523e467ae655d6c0fc194 (patch) | |
tree | 65026b820983a2dd5a2f31e693f9a586ecd9f495 /src/common/Throttle.h | |
parent | Merge pull request #18493 from liewegas/wip-unclean-degraded (diff) | |
download | ceph-8366ebceb54c138ff33523e467ae655d6c0fc194.tar.xz ceph-8366ebceb54c138ff33523e467ae655d6c0fc194.zip |
Throttle: add a new TokenBucketThrottle
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
Diffstat (limited to 'src/common/Throttle.h')
-rw-r--r-- | src/common/Throttle.h | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/src/common/Throttle.h b/src/common/Throttle.h index 162e597a655..9081c006b77 100644 --- a/src/common/Throttle.h +++ b/src/common/Throttle.h @@ -13,6 +13,7 @@ #include <mutex> #include "include/Context.h" +#include "common/Timer.h" #include "common/convenience.h" #include "common/perf_counters.h" @@ -330,4 +331,76 @@ private: uint32_t waiters = 0; }; + +class TokenBucketThrottle { + + struct Bucket { + CephContext *cct; + const std::string name; + std::atomic<uint64_t> remain = { 0 }, max = { 0 }; + + Bucket(CephContext *cct, const std::string& n, uint64_t m) + : cct(cct), name(n), + remain(m), max(m) + { + } + + uint64_t get(uint64_t c); + uint64_t put(uint64_t c); + void set_max(uint64_t m); + }; + + struct Blocker { + uint64_t tokens_requested; + Context *ctx; + + Blocker(uint64_t _tokens_requested, Context* _ctx) + : tokens_requested(_tokens_requested), ctx(_ctx) {} + }; + + CephContext *m_cct; + Bucket m_throttle; + uint64_t m_avg = 0; + SafeTimer *m_timer; + Mutex *m_timer_lock; + FunctionContext *m_token_ctx = nullptr; + list<Blocker> m_blockers; + Mutex m_lock; + +public: + TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg, + SafeTimer *timer, Mutex *timer_lock); + + ~TokenBucketThrottle(); + + template <typename T, typename I, void(T::*MF)(int, I*)> + bool get(uint64_t c, T *handler, I *item) { + if (0 == m_throttle.max) + return false; + + bool waited = false; + + Mutex::Locker lock(m_lock); + uint64_t got = m_throttle.get(c); + if (got < c) { + // Not enough tokens, add a blocker for it. + Context *ctx = new FunctionContext([this, handler, item](int r) { + (handler->*MF)(r, item); + }); + m_blockers.emplace_back(c - got, ctx); + waited = true; + } + return waited; + } + + + void set_max(uint64_t m); + void set_average(uint64_t avg); + +private: + void add_tokens(); + void schedule_timer(); + void cancel_timer(); +}; + #endif |