summaryrefslogtreecommitdiffstats
path: root/src/common/Throttle.h
diff options
context:
space:
mode:
authorDongsheng Yang <dongsheng.yang@easystack.cn>2017-08-03 09:21:23 +0200
committerDongsheng Yang <dongsheng.yang@easystack.cn>2017-11-08 03:34:17 +0100
commit8366ebceb54c138ff33523e467ae655d6c0fc194 (patch)
tree65026b820983a2dd5a2f31e693f9a586ecd9f495 /src/common/Throttle.h
parentMerge pull request #18493 from liewegas/wip-unclean-degraded (diff)
downloadceph-8366ebceb54c138ff33523e467ae655d6c0fc194.tar.xz
ceph-8366ebceb54c138ff33523e467ae655d6c0fc194.zip
Throttle: add a new TokenBucketThrottle
Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
Diffstat (limited to 'src/common/Throttle.h')
-rw-r--r--src/common/Throttle.h73
1 files changed, 73 insertions, 0 deletions
diff --git a/src/common/Throttle.h b/src/common/Throttle.h
index 162e597a655..9081c006b77 100644
--- a/src/common/Throttle.h
+++ b/src/common/Throttle.h
@@ -13,6 +13,7 @@
#include <mutex>
#include "include/Context.h"
+#include "common/Timer.h"
#include "common/convenience.h"
#include "common/perf_counters.h"
@@ -330,4 +331,76 @@ private:
uint32_t waiters = 0;
};
+
+class TokenBucketThrottle {
+
+ struct Bucket {
+ CephContext *cct;
+ const std::string name;
+ std::atomic<uint64_t> remain = { 0 }, max = { 0 };
+
+ Bucket(CephContext *cct, const std::string& n, uint64_t m)
+ : cct(cct), name(n),
+ remain(m), max(m)
+ {
+ }
+
+ uint64_t get(uint64_t c);
+ uint64_t put(uint64_t c);
+ void set_max(uint64_t m);
+ };
+
+ struct Blocker {
+ uint64_t tokens_requested;
+ Context *ctx;
+
+ Blocker(uint64_t _tokens_requested, Context* _ctx)
+ : tokens_requested(_tokens_requested), ctx(_ctx) {}
+ };
+
+ CephContext *m_cct;
+ Bucket m_throttle;
+ uint64_t m_avg = 0;
+ SafeTimer *m_timer;
+ Mutex *m_timer_lock;
+ FunctionContext *m_token_ctx = nullptr;
+ list<Blocker> m_blockers;
+ Mutex m_lock;
+
+public:
+ TokenBucketThrottle(CephContext *cct, uint64_t capacity, uint64_t avg,
+ SafeTimer *timer, Mutex *timer_lock);
+
+ ~TokenBucketThrottle();
+
+ template <typename T, typename I, void(T::*MF)(int, I*)>
+ bool get(uint64_t c, T *handler, I *item) {
+ if (0 == m_throttle.max)
+ return false;
+
+ bool waited = false;
+
+ Mutex::Locker lock(m_lock);
+ uint64_t got = m_throttle.get(c);
+ if (got < c) {
+ // Not enough tokens, add a blocker for it.
+ Context *ctx = new FunctionContext([this, handler, item](int r) {
+ (handler->*MF)(r, item);
+ });
+ m_blockers.emplace_back(c - got, ctx);
+ waited = true;
+ }
+ return waited;
+ }
+
+
+ void set_max(uint64_t m);
+ void set_average(uint64_t avg);
+
+private:
+ void add_tokens();
+ void schedule_timer();
+ void cancel_timer();
+};
+
#endif