summaryrefslogtreecommitdiffstats
path: root/src/tools/immutable_object_cache
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2020-12-10 15:07:16 +0100
committerGitHub <noreply@github.com>2020-12-10 15:07:16 +0100
commit6f4e3b0dd1f4df7165353b821aefeb13411bef48 (patch)
treed8a2b1e3c5944b2490da34c78988ff78c0fb902a /src/tools/immutable_object_cache
parentMerge pull request #38435 from votdev/issue_48449_test_standby (diff)
parenttools: add throttle mechanism to immutable object cache (diff)
downloadceph-6f4e3b0dd1f4df7165353b821aefeb13411bef48.tar.xz
ceph-6f4e3b0dd1f4df7165353b821aefeb13411bef48.zip
Merge pull request #36551 from CongMinYin/immutable_object_cache_throttle
tools: add throttle mechanism to immutable object cache Reviewed-by: Jason Dillaman <dillaman@redhat.com>
Diffstat (limited to 'src/tools/immutable_object_cache')
-rw-r--r--src/tools/immutable_object_cache/CacheClient.cc7
-rw-r--r--src/tools/immutable_object_cache/CacheClient.h2
-rw-r--r--src/tools/immutable_object_cache/CacheController.cc5
-rw-r--r--src/tools/immutable_object_cache/ObjectCacheStore.cc165
-rw-r--r--src/tools/immutable_object_cache/ObjectCacheStore.h24
-rw-r--r--src/tools/immutable_object_cache/Types.cc28
-rw-r--r--src/tools/immutable_object_cache/Types.h23
7 files changed, 221 insertions, 33 deletions
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc
index d378992719c..e4ed6cb0ed0 100644
--- a/src/tools/immutable_object_cache/CacheClient.cc
+++ b/src/tools/immutable_object_cache/CacheClient.cc
@@ -113,12 +113,13 @@ namespace immutable_obj_cache {
}
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
- uint64_t snap_id, std::string oid,
+ uint64_t snap_id, uint64_t object_size,
+ std::string oid,
CacheGenContextURef&& on_finish) {
ldout(m_cct, 20) << dendl;
ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
- ++m_sequence_id, 0, 0,
- pool_id, snap_id, oid, pool_nspace);
+ ++m_sequence_id, 0, 0, pool_id,
+ snap_id, object_size, oid, pool_nspace);
req->process_msg = std::move(on_finish);
req->encode();
diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h
index e1e9a65927d..b2f74963125 100644
--- a/src/tools/immutable_object_cache/CacheClient.h
+++ b/src/tools/immutable_object_cache/CacheClient.h
@@ -32,7 +32,7 @@ class CacheClient {
int connect();
void connect(Context* on_finish);
void lookup_object(std::string pool_nspace, uint64_t pool_id,
- uint64_t snap_id, std::string oid,
+ uint64_t snap_id, uint64_t object_size, std::string oid,
CacheGenContextURef&& on_finish);
int register_client(Context* on_finish);
diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc
index 1fade8f84b4..ae16368392e 100644
--- a/src/tools/immutable_object_cache/CacheController.cc
+++ b/src/tools/immutable_object_cache/CacheController.cc
@@ -25,7 +25,6 @@ CacheController::~CacheController() {
int CacheController::init() {
ldout(m_cct, 20) << dendl;
-
m_object_cache_store = new ObjectCacheStore(m_cct);
// TODO(dehao): make this configurable
int r = m_object_cache_store->init(true);
@@ -118,8 +117,8 @@ void CacheController::handle_request(CacheSession* session,
bool return_dne_path = session->client_version().empty();
int ret = m_object_cache_store->lookup_object(
req_read_data->pool_namespace, req_read_data->pool_id,
- req_read_data->snap_id, req_read_data->oid, return_dne_path,
- cache_path);
+ req_read_data->snap_id, req_read_data->object_size,
+ req_read_data->oid, return_dne_path, cache_path);
ObjectCacheRequest* reply = nullptr;
if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) {
reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc
index cee1ca2b65d..18f64250ff7 100644
--- a/src/tools/immutable_object_cache/ObjectCacheStore.cc
+++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc
@@ -21,6 +21,30 @@ namespace fs = std::experimental::filesystem;
namespace ceph {
namespace immutable_obj_cache {
+namespace {
+
+class SafeTimerSingleton : public SafeTimer {
+public:
+ ceph::mutex lock = ceph::make_mutex
+ ("ceph::immutable_object_cache::SafeTimerSingleton::lock");
+
+ explicit SafeTimerSingleton(CephContext *cct)
+ : SafeTimer(cct, lock, true) {
+ init();
+ }
+ ~SafeTimerSingleton() {
+ std::lock_guard locker{lock};
+ shutdown();
+ }
+};
+
+} // anonymous namespace
+
+enum ThrottleTargetCode {
+ ROC_QOS_IOPS_THROTTLE = 1,
+ ROC_QOS_BPS_THROTTLE = 2
+};
+
ObjectCacheStore::ObjectCacheStore(CephContext *cct)
: m_cct(cct), m_rados(new librados::Rados()) {
@@ -40,12 +64,44 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct)
uint64_t max_inflight_ops =
m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
+ uint64_t limit = 0;
+ if ((limit = m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_iops_limit")) != 0) {
+ apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE,
+ m_cct->_conf.get_val<std::chrono::milliseconds>
+ ("immutable_object_cache_qos_schedule_tick_min"),
+ limit,
+ m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_iops_burst"),
+ m_cct->_conf.get_val<std::chrono::seconds>
+ ("immutable_object_cache_qos_iops_burst_seconds"));
+ }
+ if ((limit = m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_bps_limit")) != 0) {
+ apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE,
+ m_cct->_conf.get_val<std::chrono::milliseconds>
+ ("immutable_object_cache_qos_schedule_tick_min"),
+ limit,
+ m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_bps_burst"),
+ m_cct->_conf.get_val<std::chrono::seconds>
+ ("immutable_object_cache_qos_bps_burst_seconds"));
+ }
+
m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops,
cache_watermark);
}
ObjectCacheStore::~ObjectCacheStore() {
delete m_policy;
+ if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) {
+ ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr);
+ delete m_throttles[ROC_QOS_IOPS_THROTTLE];
+ }
+ if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) {
+ ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr);
+ delete m_throttles[ROC_QOS_BPS_THROTTLE];
+ }
}
int ObjectCacheStore::init(bool reset) {
@@ -97,9 +153,8 @@ int ObjectCacheStore::init_cache() {
return 0;
}
-int ObjectCacheStore::do_promote(std::string pool_nspace,
- uint64_t pool_id, uint64_t snap_id,
- std::string object_name) {
+int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string object_name) {
ldout(m_cct, 20) << "to promote object: " << object_name
<< " from pool id: " << pool_id
<< " namespace: " << pool_nspace
@@ -183,8 +238,8 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
return ret;
}
-int ObjectCacheStore::lookup_object(std::string pool_nspace,
- uint64_t pool_id, uint64_t snap_id,
+int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, uint64_t object_size,
std::string object_name,
bool return_dne_path,
std::string& target_cache_file_path) {
@@ -198,9 +253,13 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace,
switch (ret) {
case OBJ_CACHE_NONE: {
- pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
- if (pret < 0) {
- lderr(m_cct) << "fail to start promote" << dendl;
+ if (take_token_from_throttle(object_size, 1)) {
+ pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
+ if (pret < 0) {
+ lderr(m_cct) << "fail to start promote" << dendl;
+ }
+ } else {
+ m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
}
return ret;
}
@@ -307,5 +366,95 @@ std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
return m_cache_root_dir + cache_file_dir + cache_file_name;
}
+void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) {
+ m_io_throttled = false;
+ std::lock_guard lock(m_throttle_lock);
+ if (type & ROC_QOS_IOPS_THROTTLE){
+ m_iops_tokens += tokens;
+ } else if (type & ROC_QOS_BPS_THROTTLE){
+ m_bps_tokens += tokens;
+ } else {
+ lderr(m_cct) << "unknow throttle type." << dendl;
+ }
+}
+
+bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size,
+ uint64_t object_num) {
+ if (m_io_throttled == true) {
+ return false;
+ }
+
+ int flag = 0;
+ bool wait = false;
+ if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) {
+ std::lock_guard lock(m_throttle_lock);
+ if (object_num > m_iops_tokens) {
+ wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this,
+ &ObjectCacheStore::handle_throttle_ready, object_num,
+ ROC_QOS_IOPS_THROTTLE);
+ } else {
+ m_iops_tokens -= object_num;
+ flag = 1;
+ }
+ }
+ if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) {
+ std::lock_guard lock(m_throttle_lock);
+ if (object_size > m_bps_tokens) {
+ wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this,
+ &ObjectCacheStore::handle_throttle_ready, object_size,
+ ROC_QOS_BPS_THROTTLE);
+ } else {
+ m_bps_tokens -= object_size;
+ }
+ }
+
+ if (wait) {
+ m_io_throttled = true;
+ // when passing iops throttle, but limit in bps throttle, recovery
+ if (flag == 1) {
+ std::lock_guard lock(m_throttle_lock);
+ m_iops_tokens += object_num;
+ }
+ }
+
+ return !wait;
+}
+
+static const std::map<uint64_t, std::string> THROTTLE_FLAGS = {
+ { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" },
+ { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" }
+};
+
+void ObjectCacheStore::apply_qos_tick_and_limit(
+ const uint64_t flag,
+ std::chrono::milliseconds min_tick,
+ uint64_t limit,
+ uint64_t burst,
+ std::chrono::seconds burst_seconds) {
+ SafeTimerSingleton* safe_timer_singleton = nullptr;
+ TokenBucketThrottle* throttle = nullptr;
+ safe_timer_singleton =
+ &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+ "tools::immutable_object_cache", false, m_cct);
+ SafeTimer* timer = safe_timer_singleton;
+ ceph::mutex* timer_lock = &safe_timer_singleton->lock;
+ m_qos_enabled_flag |= flag;
+ auto throttle_flags_it = THROTTLE_FLAGS.find(flag);
+ ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end());
+ throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second,
+ 0, 0, timer, timer_lock);
+ throttle->set_schedule_tick_min(min_tick.count());
+ int ret = throttle->set_limit(limit, burst, burst_seconds.count());
+ if (ret < 0) {
+ lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: "
+ << "burst(" << burst << ") is less than "
+ << "limit(" << limit << ")" << dendl;
+ throttle->set_limit(limit, 0, 1);
+ }
+
+ ceph_assert(m_throttles.find(flag) == m_throttles.end());
+ m_throttles.insert({flag, throttle});
+}
+
} // namespace immutable_obj_cache
} // namespace ceph
diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h
index 270a93be452..607921320ae 100644
--- a/src/tools/immutable_object_cache/ObjectCacheStore.h
+++ b/src/tools/immutable_object_cache/ObjectCacheStore.h
@@ -6,6 +6,8 @@
#include "common/ceph_context.h"
#include "common/ceph_mutex.h"
+#include "common/Throttle.h"
+#include "common/Cond.h"
#include "include/rados/librados.hpp"
#include "SimplePolicy.h"
@@ -30,11 +32,16 @@ class ObjectCacheStore {
int init_cache();
int lookup_object(std::string pool_nspace,
uint64_t pool_id, uint64_t snap_id,
+ uint64_t object_size,
std::string object_name,
bool return_dne_path,
std::string& target_cache_file_path);
-
private:
+ enum ThrottleTypeCode {
+ THROTTLE_CODE_BYTE,
+ THROTTLE_CODE_OBJECT
+ };
+
std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
uint64_t snap_id, std::string oid);
std::string get_cache_file_path(std::string cache_file_name,
@@ -48,6 +55,13 @@ class ObjectCacheStore {
int handle_promote_callback(int, bufferlist*, std::string);
int do_evict(std::string cache_file);
+ bool take_token_from_throttle(uint64_t object_size, uint64_t object_num);
+ void handle_throttle_ready(uint64_t tokens, uint64_t type);
+ void apply_qos_tick_and_limit(const uint64_t flag,
+ std::chrono::milliseconds min_tick,
+ uint64_t limit, uint64_t burst,
+ std::chrono::seconds burst_seconds);
+
CephContext *m_cct;
RadosRef m_rados;
std::map<uint64_t, librados::IoCtx> m_ioctx_map;
@@ -55,6 +69,14 @@ class ObjectCacheStore {
ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock");
Policy* m_policy;
std::string m_cache_root_dir;
+ // throttle mechanism
+ uint64_t m_qos_enabled_flag{0};
+ std::map<uint64_t, TokenBucketThrottle*> m_throttles;
+ bool m_io_throttled{false};
+ ceph::mutex m_throttle_lock =
+ ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");;
+ uint64_t m_iops_tokens{0};
+ uint64_t m_bps_tokens{0};
};
} // namespace immutable_obj_cache
diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc
index e65e94bee1e..860017d6aae 100644
--- a/src/tools/immutable_object_cache/Types.cc
+++ b/src/tools/immutable_object_cache/Types.cc
@@ -17,7 +17,7 @@ ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
ObjectCacheRequest::~ObjectCacheRequest() {}
void ObjectCacheRequest::encode() {
- ENCODE_START(1, 1, payload);
+ ENCODE_START(2, 1, payload);
ceph::encode(type, payload);
ceph::encode(seq, payload);
if (!payload_empty()) {
@@ -28,11 +28,11 @@ void ObjectCacheRequest::encode() {
void ObjectCacheRequest::decode(bufferlist& bl) {
auto i = bl.cbegin();
- DECODE_START(1, i);
+ DECODE_START(2, i);
ceph::decode(type, i);
ceph::decode(seq, i);
if (!payload_empty()) {
- decode_payload(i);
+ decode_payload(i, struct_v);
}
DECODE_FINISH(i);
}
@@ -52,7 +52,8 @@ void ObjectCacheRegData::encode_payload() {
ceph::encode(version, payload);
}
-void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {
+void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {
if (i.end()) {
return;
}
@@ -67,17 +68,19 @@ ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {}
void ObjectCacheRegReplyData::encode_payload() {}
-void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
+void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) {}
ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
uint64_t read_offset,
uint64_t read_len,
uint64_t pool_id, uint64_t snap_id,
+ uint64_t object_size,
std::string oid,
std::string pool_namespace)
: ObjectCacheRequest(t, s), read_offset(read_offset),
read_len(read_len), pool_id(pool_id), snap_id(snap_id),
- oid(oid), pool_namespace(pool_namespace)
+ object_size(object_size), oid(oid), pool_namespace(pool_namespace)
{}
ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s)
@@ -92,15 +95,20 @@ void ObjectCacheReadData::encode_payload() {
ceph::encode(snap_id, payload);
ceph::encode(oid, payload);
ceph::encode(pool_namespace, payload);
+ ceph::encode(object_size, payload);
}
-void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
+void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {
ceph::decode(read_offset, i);
ceph::decode(read_len, i);
ceph::decode(pool_id, i);
ceph::decode(snap_id, i);
ceph::decode(oid, i);
ceph::decode(pool_namespace, i);
+ if (encode_version >= 2) {
+ ceph::decode(object_size, i);
+ }
}
ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
@@ -115,7 +123,8 @@ void ObjectCacheReadReplyData::encode_payload() {
ceph::encode(cache_path, payload);
}
-void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) {
+void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {
ceph::decode(cache_path, i);
}
@@ -127,7 +136,8 @@ ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {}
void ObjectCacheReadRadosData::encode_payload() {}
-void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
+void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {}
ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
ObjectCacheRequest* req = nullptr;
diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h
index 5fab1ec4897..05394d84307 100644
--- a/src/tools/immutable_object_cache/Types.h
+++ b/src/tools/immutable_object_cache/Types.h
@@ -50,7 +50,8 @@ class ObjectCacheRequest {
bufferlist get_payload_bufferlist() { return payload; }
virtual void encode_payload() = 0;
- virtual void decode_payload(bufferlist::const_iterator bl_it) = 0;
+ virtual void decode_payload(bufferlist::const_iterator bl_it,
+ __u8 encode_version) = 0;
virtual uint16_t get_request_type() = 0;
virtual bool payload_empty() = 0;
};
@@ -63,7 +64,8 @@ class ObjectCacheRegData : public ObjectCacheRequest {
ObjectCacheRegData(uint16_t t, uint64_t s);
~ObjectCacheRegData() override;
void encode_payload() override;
- void decode_payload(bufferlist::const_iterator bl) override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
uint16_t get_request_type() override { return RBDSC_REGISTER; }
bool payload_empty() override { return false; }
};
@@ -74,7 +76,8 @@ class ObjectCacheRegReplyData : public ObjectCacheRequest {
ObjectCacheRegReplyData(uint16_t t, uint64_t s);
~ObjectCacheRegReplyData() override;
void encode_payload() override;
- void decode_payload(bufferlist::const_iterator iter) override;
+ void decode_payload(bufferlist::const_iterator iter,
+ __u8 encode_version) override;
uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; }
bool payload_empty() override { return true; }
};
@@ -85,16 +88,18 @@ class ObjectCacheReadData : public ObjectCacheRequest {
uint64_t read_len;
uint64_t pool_id;
uint64_t snap_id;
+ uint64_t object_size = 0;
std::string oid;
std::string pool_namespace;
ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
uint64_t read_len, uint64_t pool_id,
- uint64_t snap_id, std::string oid,
- std::string pool_namespace);
+ uint64_t snap_id, uint64_t object_size,
+ std::string oid, std::string pool_namespace);
ObjectCacheReadData(uint16_t t, uint64_t s);
~ObjectCacheReadData() override;
void encode_payload() override;
- void decode_payload(bufferlist::const_iterator bl) override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
uint16_t get_request_type() override { return RBDSC_READ; }
bool payload_empty() override { return false; }
};
@@ -106,7 +111,8 @@ class ObjectCacheReadReplyData : public ObjectCacheRequest {
ObjectCacheReadReplyData(uint16_t t, uint64_t s);
~ObjectCacheReadReplyData() override;
void encode_payload() override;
- void decode_payload(bufferlist::const_iterator bl) override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
uint16_t get_request_type() override { return RBDSC_READ_REPLY; }
bool payload_empty() override { return false; }
};
@@ -117,7 +123,8 @@ class ObjectCacheReadRadosData : public ObjectCacheRequest {
ObjectCacheReadRadosData(uint16_t t, uint64_t s);
~ObjectCacheReadRadosData() override;
void encode_payload() override;
- void decode_payload(bufferlist::const_iterator bl) override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
uint16_t get_request_type() override { return RBDSC_READ_RADOS; }
bool payload_empty() override { return true; }
};