diff options
author | Jason Dillaman <dillaman@redhat.com> | 2020-12-10 15:07:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-10 15:07:16 +0100 |
commit | 6f4e3b0dd1f4df7165353b821aefeb13411bef48 (patch) | |
tree | d8a2b1e3c5944b2490da34c78988ff78c0fb902a /src/tools/immutable_object_cache | |
parent | Merge pull request #38435 from votdev/issue_48449_test_standby (diff) | |
parent | tools: add throttle mechanism to immutable object cache (diff) | |
download | ceph-6f4e3b0dd1f4df7165353b821aefeb13411bef48.tar.xz ceph-6f4e3b0dd1f4df7165353b821aefeb13411bef48.zip |
Merge pull request #36551 from CongMinYin/immutable_object_cache_throttle
tools: add throttle mechanism to immutable object cache
Reviewed-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/tools/immutable_object_cache')
-rw-r--r-- | src/tools/immutable_object_cache/CacheClient.cc | 7 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheClient.h | 2 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheController.cc | 5 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/ObjectCacheStore.cc | 165 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/ObjectCacheStore.h | 24 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/Types.cc | 28 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/Types.h | 23 |
7 files changed, 221 insertions, 33 deletions
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index d378992719c..e4ed6cb0ed0 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -113,12 +113,13 @@ namespace immutable_obj_cache { } void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, - uint64_t snap_id, std::string oid, + uint64_t snap_id, uint64_t object_size, + std::string oid, CacheGenContextURef&& on_finish) { ldout(m_cct, 20) << dendl; ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, - ++m_sequence_id, 0, 0, - pool_id, snap_id, oid, pool_nspace); + ++m_sequence_id, 0, 0, pool_id, + snap_id, object_size, oid, pool_nspace); req->process_msg = std::move(on_finish); req->encode(); diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h index e1e9a65927d..b2f74963125 100644 --- a/src/tools/immutable_object_cache/CacheClient.h +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -32,7 +32,7 @@ class CacheClient { int connect(); void connect(Context* on_finish); void lookup_object(std::string pool_nspace, uint64_t pool_id, - uint64_t snap_id, std::string oid, + uint64_t snap_id, uint64_t object_size, std::string oid, CacheGenContextURef&& on_finish); int register_client(Context* on_finish); diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index 1fade8f84b4..ae16368392e 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -25,7 +25,6 @@ CacheController::~CacheController() { int CacheController::init() { ldout(m_cct, 20) << dendl; - m_object_cache_store = new ObjectCacheStore(m_cct); // TODO(dehao): make this configurable int r = m_object_cache_store->init(true); @@ -118,8 +117,8 @@ void CacheController::handle_request(CacheSession* session, bool return_dne_path = session->client_version().empty(); int ret = m_object_cache_store->lookup_object( req_read_data->pool_namespace, req_read_data->pool_id, - req_read_data->snap_id, req_read_data->oid, return_dne_path, - cache_path); + req_read_data->snap_id, req_read_data->object_size, + req_read_data->oid, return_dne_path, cache_path); ObjectCacheRequest* reply = nullptr; if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) { reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc index cee1ca2b65d..18f64250ff7 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.cc +++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc @@ -21,6 +21,30 @@ namespace fs = std::experimental::filesystem; namespace ceph { namespace immutable_obj_cache { +namespace { + +class SafeTimerSingleton : public SafeTimer { +public: + ceph::mutex lock = ceph::make_mutex + ("ceph::immutable_object_cache::SafeTimerSingleton::lock"); + + explicit SafeTimerSingleton(CephContext *cct) + : SafeTimer(cct, lock, true) { + init(); + } + ~SafeTimerSingleton() { + std::lock_guard locker{lock}; + shutdown(); + } +}; + +} // anonymous namespace + +enum ThrottleTargetCode { + ROC_QOS_IOPS_THROTTLE = 1, + ROC_QOS_BPS_THROTTLE = 2 +}; + ObjectCacheStore::ObjectCacheStore(CephContext *cct) : m_cct(cct), m_rados(new librados::Rados()) { @@ -40,12 +64,44 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct) uint64_t max_inflight_ops = m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops"); + uint64_t limit = 0; + if ((limit = m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_iops_limit")) != 0) { + apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE, + m_cct->_conf.get_val<std::chrono::milliseconds> + ("immutable_object_cache_qos_schedule_tick_min"), + limit, + m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_iops_burst"), + m_cct->_conf.get_val<std::chrono::seconds> + ("immutable_object_cache_qos_iops_burst_seconds")); + } + if ((limit = m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_bps_limit")) != 0) { + apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE, + m_cct->_conf.get_val<std::chrono::milliseconds> + ("immutable_object_cache_qos_schedule_tick_min"), + limit, + m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_bps_burst"), + m_cct->_conf.get_val<std::chrono::seconds> + ("immutable_object_cache_qos_bps_burst_seconds")); + } + m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops, cache_watermark); } ObjectCacheStore::~ObjectCacheStore() { delete m_policy; + if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) { + ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr); + delete m_throttles[ROC_QOS_IOPS_THROTTLE]; + } + if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) { + ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr); + delete m_throttles[ROC_QOS_BPS_THROTTLE]; + } } int ObjectCacheStore::init(bool reset) { @@ -97,9 +153,8 @@ int ObjectCacheStore::init_cache() { return 0; } -int ObjectCacheStore::do_promote(std::string pool_nspace, - uint64_t pool_id, uint64_t snap_id, - std::string object_name) { +int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string object_name) { ldout(m_cct, 20) << "to promote object: " << object_name << " from pool id: " << pool_id << " namespace: " << pool_nspace @@ -183,8 +238,8 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, return ret; } -int ObjectCacheStore::lookup_object(std::string pool_nspace, - uint64_t pool_id, uint64_t snap_id, +int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, uint64_t object_size, std::string object_name, bool return_dne_path, std::string& target_cache_file_path) { @@ -198,9 +253,13 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace, switch (ret) { case OBJ_CACHE_NONE: { - pret = do_promote(pool_nspace, pool_id, snap_id, object_name); - if (pret < 0) { - lderr(m_cct) << "fail to start promote" << dendl; + if (take_token_from_throttle(object_size, 1)) { + pret = do_promote(pool_nspace, pool_id, snap_id, object_name); + if (pret < 0) { + lderr(m_cct) << "fail to start promote" << dendl; + } + } else { + m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); } return ret; } @@ -307,5 +366,95 @@ std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name, return m_cache_root_dir + cache_file_dir + cache_file_name; } +void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) { + m_io_throttled = false; + std::lock_guard lock(m_throttle_lock); + if (type & ROC_QOS_IOPS_THROTTLE){ + m_iops_tokens += tokens; + } else if (type & ROC_QOS_BPS_THROTTLE){ + m_bps_tokens += tokens; + } else { + lderr(m_cct) << "unknow throttle type." << dendl; + } +} + +bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size, + uint64_t object_num) { + if (m_io_throttled == true) { + return false; + } + + int flag = 0; + bool wait = false; + if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) { + std::lock_guard lock(m_throttle_lock); + if (object_num > m_iops_tokens) { + wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this, + &ObjectCacheStore::handle_throttle_ready, object_num, + ROC_QOS_IOPS_THROTTLE); + } else { + m_iops_tokens -= object_num; + flag = 1; + } + } + if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) { + std::lock_guard lock(m_throttle_lock); + if (object_size > m_bps_tokens) { + wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this, + &ObjectCacheStore::handle_throttle_ready, object_size, + ROC_QOS_BPS_THROTTLE); + } else { + m_bps_tokens -= object_size; + } + } + + if (wait) { + m_io_throttled = true; + // when passing iops throttle, but limit in bps throttle, recovery + if (flag == 1) { + std::lock_guard lock(m_throttle_lock); + m_iops_tokens += object_num; + } + } + + return !wait; +} + +static const std::map<uint64_t, std::string> THROTTLE_FLAGS = { + { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" }, + { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" } +}; + +void ObjectCacheStore::apply_qos_tick_and_limit( + const uint64_t flag, + std::chrono::milliseconds min_tick, + uint64_t limit, + uint64_t burst, + std::chrono::seconds burst_seconds) { + SafeTimerSingleton* safe_timer_singleton = nullptr; + TokenBucketThrottle* throttle = nullptr; + safe_timer_singleton = + &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>( + "tools::immutable_object_cache", false, m_cct); + SafeTimer* timer = safe_timer_singleton; + ceph::mutex* timer_lock = &safe_timer_singleton->lock; + m_qos_enabled_flag |= flag; + auto throttle_flags_it = THROTTLE_FLAGS.find(flag); + ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end()); + throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second, + 0, 0, timer, timer_lock); + throttle->set_schedule_tick_min(min_tick.count()); + int ret = throttle->set_limit(limit, burst, burst_seconds.count()); + if (ret < 0) { + lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: " + << "burst(" << burst << ") is less than " + << "limit(" << limit << ")" << dendl; + throttle->set_limit(limit, 0, 1); + } + + ceph_assert(m_throttles.find(flag) == m_throttles.end()); + m_throttles.insert({flag, throttle}); +} + } // namespace immutable_obj_cache } // namespace ceph diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h index 270a93be452..607921320ae 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.h +++ b/src/tools/immutable_object_cache/ObjectCacheStore.h @@ -6,6 +6,8 @@ #include "common/ceph_context.h" #include "common/ceph_mutex.h" +#include "common/Throttle.h" +#include "common/Cond.h" #include "include/rados/librados.hpp" #include "SimplePolicy.h" @@ -30,11 +32,16 @@ class ObjectCacheStore { int init_cache(); int lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, + uint64_t object_size, std::string object_name, bool return_dne_path, std::string& target_cache_file_path); - private: + enum ThrottleTypeCode { + THROTTLE_CODE_BYTE, + THROTTLE_CODE_OBJECT + }; + std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, std::string oid); std::string get_cache_file_path(std::string cache_file_name, @@ -48,6 +55,13 @@ class ObjectCacheStore { int handle_promote_callback(int, bufferlist*, std::string); int do_evict(std::string cache_file); + bool take_token_from_throttle(uint64_t object_size, uint64_t object_num); + void handle_throttle_ready(uint64_t tokens, uint64_t type); + void apply_qos_tick_and_limit(const uint64_t flag, + std::chrono::milliseconds min_tick, + uint64_t limit, uint64_t burst, + std::chrono::seconds burst_seconds); + CephContext *m_cct; RadosRef m_rados; std::map<uint64_t, librados::IoCtx> m_ioctx_map; @@ -55,6 +69,14 @@ class ObjectCacheStore { ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock"); Policy* m_policy; std::string m_cache_root_dir; + // throttle mechanism + uint64_t m_qos_enabled_flag{0}; + std::map<uint64_t, TokenBucketThrottle*> m_throttles; + bool m_io_throttled{false}; + ceph::mutex m_throttle_lock = + ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");; + uint64_t m_iops_tokens{0}; + uint64_t m_bps_tokens{0}; }; } // namespace immutable_obj_cache diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index e65e94bee1e..860017d6aae 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -17,7 +17,7 @@ ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s) ObjectCacheRequest::~ObjectCacheRequest() {} void ObjectCacheRequest::encode() { - ENCODE_START(1, 1, payload); + ENCODE_START(2, 1, payload); ceph::encode(type, payload); ceph::encode(seq, payload); if (!payload_empty()) { @@ -28,11 +28,11 @@ void ObjectCacheRequest::encode() { void ObjectCacheRequest::decode(bufferlist& bl) { auto i = bl.cbegin(); - DECODE_START(1, i); + DECODE_START(2, i); ceph::decode(type, i); ceph::decode(seq, i); if (!payload_empty()) { - decode_payload(i); + decode_payload(i, struct_v); } DECODE_FINISH(i); } @@ -52,7 +52,8 @@ void ObjectCacheRegData::encode_payload() { ceph::encode(version, payload); } -void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) { +void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { if (i.end()) { return; } @@ -67,17 +68,19 @@ ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {} void ObjectCacheRegReplyData::encode_payload() {} -void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {} +void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) {} ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, uint64_t snap_id, + uint64_t object_size, std::string oid, std::string pool_namespace) : ObjectCacheRequest(t, s), read_offset(read_offset), read_len(read_len), pool_id(pool_id), snap_id(snap_id), - oid(oid), pool_namespace(pool_namespace) + object_size(object_size), oid(oid), pool_namespace(pool_namespace) {} ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s) @@ -92,15 +95,20 @@ void ObjectCacheReadData::encode_payload() { ceph::encode(snap_id, payload); ceph::encode(oid, payload); ceph::encode(pool_namespace, payload); + ceph::encode(object_size, payload); } -void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) { +void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { ceph::decode(read_offset, i); ceph::decode(read_len, i); ceph::decode(pool_id, i); ceph::decode(snap_id, i); ceph::decode(oid, i); ceph::decode(pool_namespace, i); + if (encode_version >= 2) { + ceph::decode(object_size, i); + } } ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, @@ -115,7 +123,8 @@ void ObjectCacheReadReplyData::encode_payload() { ceph::encode(cache_path, payload); } -void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) { +void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { ceph::decode(cache_path, i); } @@ -127,7 +136,8 @@ ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {} void ObjectCacheReadRadosData::encode_payload() {} -void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {} +void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) {} ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) { ObjectCacheRequest* req = nullptr; diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index 5fab1ec4897..05394d84307 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -50,7 +50,8 @@ class ObjectCacheRequest { bufferlist get_payload_bufferlist() { return payload; } virtual void encode_payload() = 0; - virtual void decode_payload(bufferlist::const_iterator bl_it) = 0; + virtual void decode_payload(bufferlist::const_iterator bl_it, + __u8 encode_version) = 0; virtual uint16_t get_request_type() = 0; virtual bool payload_empty() = 0; }; @@ -63,7 +64,8 @@ class ObjectCacheRegData : public ObjectCacheRequest { ObjectCacheRegData(uint16_t t, uint64_t s); ~ObjectCacheRegData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_REGISTER; } bool payload_empty() override { return false; } }; @@ -74,7 +76,8 @@ class ObjectCacheRegReplyData : public ObjectCacheRequest { ObjectCacheRegReplyData(uint16_t t, uint64_t s); ~ObjectCacheRegReplyData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator iter) override; + void decode_payload(bufferlist::const_iterator iter, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; } bool payload_empty() override { return true; } }; @@ -85,16 +88,18 @@ class ObjectCacheReadData : public ObjectCacheRequest { uint64_t read_len; uint64_t pool_id; uint64_t snap_id; + uint64_t object_size = 0; std::string oid; std::string pool_namespace; ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, - uint64_t snap_id, std::string oid, - std::string pool_namespace); + uint64_t snap_id, uint64_t object_size, + std::string oid, std::string pool_namespace); ObjectCacheReadData(uint16_t t, uint64_t s); ~ObjectCacheReadData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_READ; } bool payload_empty() override { return false; } }; @@ -106,7 +111,8 @@ class ObjectCacheReadReplyData : public ObjectCacheRequest { ObjectCacheReadReplyData(uint16_t t, uint64_t s); ~ObjectCacheReadReplyData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_READ_REPLY; } bool payload_empty() override { return false; } }; @@ -117,7 +123,8 @@ class ObjectCacheReadRadosData : public ObjectCacheRequest { ObjectCacheReadRadosData(uint16_t t, uint64_t s); ~ObjectCacheReadRadosData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_READ_RADOS; } bool payload_empty() override { return true; } }; |