diff options
author | Jason Dillaman <dillaman@redhat.com> | 2020-12-10 15:07:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-10 15:07:16 +0100 |
commit | 6f4e3b0dd1f4df7165353b821aefeb13411bef48 (patch) | |
tree | d8a2b1e3c5944b2490da34c78988ff78c0fb902a | |
parent | Merge pull request #38435 from votdev/issue_48449_test_standby (diff) | |
parent | tools: add throttle mechanism to immutable object cache (diff) | |
download | ceph-6f4e3b0dd1f4df7165353b821aefeb13411bef48.tar.xz ceph-6f4e3b0dd1f4df7165353b821aefeb13411bef48.zip |
Merge pull request #36551 from CongMinYin/immutable_object_cache_throttle
tools: add throttle mechanism to immutable object cache
Reviewed-by: Jason Dillaman <dillaman@redhat.com>
18 files changed, 298 insertions, 51 deletions
diff --git a/qa/tasks/mgr/dashboard/test_cluster_configuration.py b/qa/tasks/mgr/dashboard/test_cluster_configuration.py index dc96bced02a..9c8245d238d 100644 --- a/qa/tasks/mgr/dashboard/test_cluster_configuration.py +++ b/qa/tasks/mgr/dashboard/test_cluster_configuration.py @@ -369,7 +369,7 @@ class ClusterConfigurationTest(DashboardTestCase): self.assertIn('type', data) self.assertIn('desc', data) self.assertIn(data['type'], ['str', 'bool', 'float', 'int', 'size', 'uint', 'addr', - 'addrvec', 'uuid', 'secs']) + 'addrvec', 'uuid', 'secs', 'millisecs']) if 'value' in data: self.assertIn('source', data) diff --git a/src/common/options.cc b/src/common/options.cc index 367e52ee0db..0f495673302 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -49,6 +49,9 @@ public: void operator()(const std::chrono::seconds v) const { out << v.count(); } + void operator()(const std::chrono::milliseconds v) const { + out << v.count(); + } }; } @@ -203,6 +206,13 @@ int Option::parse_value( *error_message = e.what(); return -EINVAL; } + } else if (type == Option::TYPE_MILLISECS) { + try { + *out = boost::lexical_cast<uint64_t>(val); + } catch (const boost::bad_lexical_cast& e) { + *error_message = e.what(); + return -EINVAL; + } } else { ceph_abort(); } @@ -7894,6 +7904,37 @@ static std::vector<Option> get_immutable_object_cache_options() { Option("immutable_object_cache_watermark", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED) .set_default(0.1) .set_description("immutable object cache water mark"), + + Option("immutable_object_cache_qos_schedule_tick_min", Option::TYPE_MILLISECS, Option::LEVEL_ADVANCED) + .set_default(50) + .set_min(1) + .set_description("minimum schedule tick for immutable object cache"), + + Option("immutable_object_cache_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED) + .set_default(0) + .set_description("the desired immutable object cache IO operations limit per second"), + + Option("immutable_object_cache_qos_iops_burst", Option::TYPE_UINT, Option::LEVEL_ADVANCED) + .set_default(0) + .set_description("the desired burst limit of immutable object cache IO operations"), + + Option("immutable_object_cache_qos_iops_burst_seconds", Option::TYPE_SECS, Option::LEVEL_ADVANCED) + .set_default(1) + .set_min(1) + .set_description("the desired burst duration in seconds of immutable object cache IO operations"), + + Option("immutable_object_cache_qos_bps_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED) + .set_default(0) + .set_description("the desired immutable object cache IO bytes limit per second"), + + Option("immutable_object_cache_qos_bps_burst", Option::TYPE_UINT, Option::LEVEL_ADVANCED) + .set_default(0) + .set_description("the desired burst limit of immutable object cache IO bytes"), + + Option("immutable_object_cache_qos_bps_burst_seconds", Option::TYPE_SECS, Option::LEVEL_ADVANCED) + .set_default(1) + .set_min(1) + .set_description("the desired burst duration in seconds of immutable object cache IO bytes"), }); } diff --git a/src/common/options.h b/src/common/options.h index 75812c23450..15a0bf50465 100644 --- a/src/common/options.h +++ b/src/common/options.h @@ -23,6 +23,7 @@ struct Option { TYPE_UUID = 7, TYPE_SIZE = 8, TYPE_SECS = 9, + TYPE_MILLISECS = 10, }; static const char *type_to_c_type_str(type_t t) { @@ -37,6 +38,7 @@ struct Option { case TYPE_UUID: return "uuid_d"; case TYPE_SIZE: return "size_t"; case TYPE_SECS: return "secs"; + case TYPE_MILLISECS: return "millisecs"; default: return "unknown"; } } @@ -52,6 +54,7 @@ struct Option { case TYPE_UUID: return "uuid"; case TYPE_SIZE: return "size"; case TYPE_SECS: return "secs"; + case TYPE_MILLISECS: return "millisecs"; default: return "unknown"; } } @@ -86,6 +89,9 @@ struct Option { if (s == "secs") { return TYPE_SECS; } + if (s == "millisecs") { + return TYPE_MILLISECS; + } return -1; } @@ -140,6 +146,7 @@ struct Option { entity_addr_t, entity_addrvec_t, std::chrono::seconds, + std::chrono::milliseconds, size_t, uuid_d>; const std::string name; @@ -215,6 +222,8 @@ struct Option { value = size_t{0}; break; case TYPE_SECS: value = std::chrono::seconds{0}; break; + case TYPE_MILLISECS: + value = std::chrono::milliseconds{0}; break; default: ceph_abort(); } @@ -265,6 +274,8 @@ struct Option { v = size_t{static_cast<std::size_t>(new_value)}; break; case TYPE_SECS: v = std::chrono::seconds{new_value}; break; + case TYPE_MILLISECS: + v = std::chrono::milliseconds{new_value}; break; default: std::cerr << "Bad type in set_value: " << name << ": " << typeid(T).name() << std::endl; @@ -377,10 +388,11 @@ struct Option { { return (has_flag(FLAG_RUNTIME) - || (!has_flag(FLAG_MGR) - && (type == TYPE_BOOL || type == TYPE_INT - || type == TYPE_UINT || type == TYPE_FLOAT - || type == TYPE_SIZE || type == TYPE_SECS))) + || (!has_flag(FLAG_MGR) + && (type == TYPE_BOOL || type == TYPE_INT + || type == TYPE_UINT || type == TYPE_FLOAT + || type == TYPE_SIZE || type == TYPE_SECS + || type == TYPE_MILLISECS))) && !has_flag(FLAG_STARTUP) && !has_flag(FLAG_CLUSTER_CREATE) && !has_flag(FLAG_CREATE); diff --git a/src/librbd/cache/ParentCacheObjectDispatch.cc b/src/librbd/cache/ParentCacheObjectDispatch.cc index 843bc845fcd..762b18101f6 100644 --- a/src/librbd/cache/ParentCacheObjectDispatch.cc +++ b/src/librbd/cache/ParentCacheObjectDispatch.cc @@ -102,6 +102,7 @@ bool ParentCacheObjectDispatch<I>::read( m_cache_client->lookup_object(m_image_ctx->data_ctx.get_namespace(), m_image_ctx->data_ctx.get_id(), io_context->read_snap().value_or(CEPH_NOSNAP), + m_image_ctx->layout.object_size, oid, std::move(ctx)); return true; } diff --git a/src/mgr/PyUtil.cc b/src/mgr/PyUtil.cc index bfecdb35423..a8efc2f2800 100644 --- a/src/mgr/PyUtil.cc +++ b/src/mgr/PyUtil.cc @@ -15,6 +15,7 @@ PyObject *get_python_typed_option_value( case Option::TYPE_SIZE: return PyLong_FromString((char *)value.c_str(), nullptr, 0); case Option::TYPE_SECS: + case Option::TYPE_MILLISECS: case Option::TYPE_FLOAT: { PyObject *s = PyUnicode_FromString(value.c_str()); diff --git a/src/test/immutable_object_cache/MockCacheDaemon.h b/src/test/immutable_object_cache/MockCacheDaemon.h index 3773e87ea9a..02e86acb2a7 100644 --- a/src/test/immutable_object_cache/MockCacheDaemon.h +++ b/src/test/immutable_object_cache/MockCacheDaemon.h @@ -24,8 +24,8 @@ class MockCacheClient { MOCK_METHOD0(stop, void()); MOCK_METHOD0(connect, int()); MOCK_METHOD1(connect, void(Context*)); - MOCK_METHOD5(lookup_object, void(std::string, uint64_t, uint64_t, std::string, - CacheGenContextURef)); + MOCK_METHOD6(lookup_object, void(std::string, uint64_t, uint64_t, uint64_t, + std::string, CacheGenContextURef)); MOCK_METHOD1(register_client, int(Context*)); }; diff --git a/src/test/immutable_object_cache/test_DomainSocket.cc b/src/test/immutable_object_cache/test_DomainSocket.cc index 3a538a3191c..31d1b9adc20 100644 --- a/src/test/immutable_object_cache/test_DomainSocket.cc +++ b/src/test/immutable_object_cache/test_DomainSocket.cc @@ -122,7 +122,7 @@ public: usleep(1); } - m_cache_client->lookup_object("pool_nspace", 1, 2, "object_name", std::move(ctx)); + m_cache_client->lookup_object("pool_nspace", 1, 2, 3, "object_name", std::move(ctx)); m_send_request_index++; } m_wait_event.wait(); @@ -135,7 +135,7 @@ public: hit = ack->type == RBDSC_READ_REPLY; m_wait_event.signal(); }); - m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, std::move(ctx)); + m_cache_client->lookup_object(pool_nspace, 1, 2, 3, object_id, std::move(ctx)); m_wait_event.wait(); return hit; } diff --git a/src/test/immutable_object_cache/test_message.cc b/src/test/immutable_object_cache/test_message.cc index b03fa35313d..bbd6ad165b9 100644 --- a/src/test/immutable_object_cache/test_message.cc +++ b/src/test/immutable_object_cache/test_message.cc @@ -16,10 +16,11 @@ TEST(test_for_message, test_1) uint64_t read_len = 333333UL; uint64_t pool_id = 444444UL; uint64_t snap_id = 555555UL; + uint64_t object_size = 666666UL; // ObjectRequest --> bufferlist ObjectCacheRequest* req = new ObjectCacheReadData(type, seq, read_offset, read_len, - pool_id, snap_id, oid_name, pool_nspace); + pool_id, snap_id, object_size, oid_name, pool_nspace); req->encode(); auto payload_bl = req->get_payload_bufferlist(); @@ -40,8 +41,9 @@ TEST(test_for_message, test_1) ASSERT_EQ(((ObjectCacheReadData*)req_decode)->read_len, 333333UL); ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_id, 444444UL); ASSERT_EQ(((ObjectCacheReadData*)req_decode)->snap_id, 555555UL); - ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_namespace, pool_nspace); ASSERT_EQ(((ObjectCacheReadData*)req_decode)->oid, oid_name); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_namespace, pool_nspace); + ASSERT_EQ(((ObjectCacheReadData*)req_decode)->object_size, 666666UL); delete req; delete req_decode; diff --git a/src/test/immutable_object_cache/test_multi_session.cc b/src/test/immutable_object_cache/test_multi_session.cc index e3a73bc373c..c0c629ab036 100644 --- a/src/test/immutable_object_cache/test_multi_session.cc +++ b/src/test/immutable_object_cache/test_multi_session.cc @@ -126,7 +126,7 @@ public: }); m_send_request_index++; // here just for concurrently testing register + lookup, so fix object id. - m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, "1234", std::move(ctx)); + m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx)); } if (is_last) { diff --git a/src/test/immutable_object_cache/test_object_store.cc b/src/test/immutable_object_cache/test_object_store.cc index 6d2875a7d3e..e100ed44456 100644 --- a/src/test/immutable_object_cache/test_object_store.cc +++ b/src/test/immutable_object_cache/test_object_store.cc @@ -62,7 +62,8 @@ public: m_object_cache_store = new ObjectCacheStore(m_ceph_context); } - void init_object_cache_store(std::string pool_name, std::string vol_name, uint64_t vol_size, bool reset) { + void init_object_cache_store(std::string pool_name, std::string vol_name, + uint64_t vol_size, bool reset) { ASSERT_EQ(0, m_object_cache_store->init(reset)); ASSERT_EQ(0, m_object_cache_store->init_cache()); } @@ -71,10 +72,11 @@ public: ASSERT_EQ(0, m_object_cache_store->shutdown()); } - void lookup_object_cache_store(std::string pool_name, std::string vol_name, std::string obj_name, int& ret) { + void lookup_object_cache_store(std::string pool_name, std::string vol_name, + std::string obj_name, int& ret) { std::string cache_path; - ret = m_object_cache_store->lookup_object(pool_name, 1, 2, obj_name, true, - cache_path); + ret = m_object_cache_store->lookup_object(pool_name, 1, 2, 3, + obj_name, true, cache_path); } void TearDown() override { diff --git a/src/test/librbd/cache/test_mock_ParentCacheObjectDispatch.cc b/src/test/librbd/cache/test_mock_ParentCacheObjectDispatch.cc index 6609f1f02e4..2b262825127 100644 --- a/src/test/librbd/cache/test_mock_ParentCacheObjectDispatch.cc +++ b/src/test/librbd/cache/test_mock_ParentCacheObjectDispatch.cc @@ -105,8 +105,8 @@ public : void expect_cache_lookup_object(MockParentImageCache& mparent_image_cache, const std::string &cache_path) { EXPECT_CALL(*(mparent_image_cache.get_cache_client()), - lookup_object(_, _, _, _, _)) - .WillOnce(WithArg<4>(Invoke([cache_path](CacheGenContextURef on_finish) { + lookup_object(_, _, _, _, _, _)) + .WillOnce(WithArg<5>(Invoke([cache_path](CacheGenContextURef on_finish) { auto ack = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, 0, cache_path); on_finish.release()->complete(ack); }))); diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index d378992719c..e4ed6cb0ed0 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -113,12 +113,13 @@ namespace immutable_obj_cache { } void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, - uint64_t snap_id, std::string oid, + uint64_t snap_id, uint64_t object_size, + std::string oid, CacheGenContextURef&& on_finish) { ldout(m_cct, 20) << dendl; ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, - ++m_sequence_id, 0, 0, - pool_id, snap_id, oid, pool_nspace); + ++m_sequence_id, 0, 0, pool_id, + snap_id, object_size, oid, pool_nspace); req->process_msg = std::move(on_finish); req->encode(); diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h index e1e9a65927d..b2f74963125 100644 --- a/src/tools/immutable_object_cache/CacheClient.h +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -32,7 +32,7 @@ class CacheClient { int connect(); void connect(Context* on_finish); void lookup_object(std::string pool_nspace, uint64_t pool_id, - uint64_t snap_id, std::string oid, + uint64_t snap_id, uint64_t object_size, std::string oid, CacheGenContextURef&& on_finish); int register_client(Context* on_finish); diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index 1fade8f84b4..ae16368392e 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -25,7 +25,6 @@ CacheController::~CacheController() { int CacheController::init() { ldout(m_cct, 20) << dendl; - m_object_cache_store = new ObjectCacheStore(m_cct); // TODO(dehao): make this configurable int r = m_object_cache_store->init(true); @@ -118,8 +117,8 @@ void CacheController::handle_request(CacheSession* session, bool return_dne_path = session->client_version().empty(); int ret = m_object_cache_store->lookup_object( req_read_data->pool_namespace, req_read_data->pool_id, - req_read_data->snap_id, req_read_data->oid, return_dne_path, - cache_path); + req_read_data->snap_id, req_read_data->object_size, + req_read_data->oid, return_dne_path, cache_path); ObjectCacheRequest* reply = nullptr; if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) { reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc index cee1ca2b65d..18f64250ff7 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.cc +++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc @@ -21,6 +21,30 @@ namespace fs = std::experimental::filesystem; namespace ceph { namespace immutable_obj_cache { +namespace { + +class SafeTimerSingleton : public SafeTimer { +public: + ceph::mutex lock = ceph::make_mutex + ("ceph::immutable_object_cache::SafeTimerSingleton::lock"); + + explicit SafeTimerSingleton(CephContext *cct) + : SafeTimer(cct, lock, true) { + init(); + } + ~SafeTimerSingleton() { + std::lock_guard locker{lock}; + shutdown(); + } +}; + +} // anonymous namespace + +enum ThrottleTargetCode { + ROC_QOS_IOPS_THROTTLE = 1, + ROC_QOS_BPS_THROTTLE = 2 +}; + ObjectCacheStore::ObjectCacheStore(CephContext *cct) : m_cct(cct), m_rados(new librados::Rados()) { @@ -40,12 +64,44 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct) uint64_t max_inflight_ops = m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops"); + uint64_t limit = 0; + if ((limit = m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_iops_limit")) != 0) { + apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE, + m_cct->_conf.get_val<std::chrono::milliseconds> + ("immutable_object_cache_qos_schedule_tick_min"), + limit, + m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_iops_burst"), + m_cct->_conf.get_val<std::chrono::seconds> + ("immutable_object_cache_qos_iops_burst_seconds")); + } + if ((limit = m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_bps_limit")) != 0) { + apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE, + m_cct->_conf.get_val<std::chrono::milliseconds> + ("immutable_object_cache_qos_schedule_tick_min"), + limit, + m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_bps_burst"), + m_cct->_conf.get_val<std::chrono::seconds> + ("immutable_object_cache_qos_bps_burst_seconds")); + } + m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops, cache_watermark); } ObjectCacheStore::~ObjectCacheStore() { delete m_policy; + if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) { + ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr); + delete m_throttles[ROC_QOS_IOPS_THROTTLE]; + } + if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) { + ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr); + delete m_throttles[ROC_QOS_BPS_THROTTLE]; + } } int ObjectCacheStore::init(bool reset) { @@ -97,9 +153,8 @@ int ObjectCacheStore::init_cache() { return 0; } -int ObjectCacheStore::do_promote(std::string pool_nspace, - uint64_t pool_id, uint64_t snap_id, - std::string object_name) { +int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string object_name) { ldout(m_cct, 20) << "to promote object: " << object_name << " from pool id: " << pool_id << " namespace: " << pool_nspace @@ -183,8 +238,8 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, return ret; } -int ObjectCacheStore::lookup_object(std::string pool_nspace, - uint64_t pool_id, uint64_t snap_id, +int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, uint64_t object_size, std::string object_name, bool return_dne_path, std::string& target_cache_file_path) { @@ -198,9 +253,13 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace, switch (ret) { case OBJ_CACHE_NONE: { - pret = do_promote(pool_nspace, pool_id, snap_id, object_name); - if (pret < 0) { - lderr(m_cct) << "fail to start promote" << dendl; + if (take_token_from_throttle(object_size, 1)) { + pret = do_promote(pool_nspace, pool_id, snap_id, object_name); + if (pret < 0) { + lderr(m_cct) << "fail to start promote" << dendl; + } + } else { + m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); } return ret; } @@ -307,5 +366,95 @@ std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name, return m_cache_root_dir + cache_file_dir + cache_file_name; } +void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) { + m_io_throttled = false; + std::lock_guard lock(m_throttle_lock); + if (type & ROC_QOS_IOPS_THROTTLE){ + m_iops_tokens += tokens; + } else if (type & ROC_QOS_BPS_THROTTLE){ + m_bps_tokens += tokens; + } else { + lderr(m_cct) << "unknow throttle type." << dendl; + } +} + +bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size, + uint64_t object_num) { + if (m_io_throttled == true) { + return false; + } + + int flag = 0; + bool wait = false; + if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) { + std::lock_guard lock(m_throttle_lock); + if (object_num > m_iops_tokens) { + wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this, + &ObjectCacheStore::handle_throttle_ready, object_num, + ROC_QOS_IOPS_THROTTLE); + } else { + m_iops_tokens -= object_num; + flag = 1; + } + } + if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) { + std::lock_guard lock(m_throttle_lock); + if (object_size > m_bps_tokens) { + wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this, + &ObjectCacheStore::handle_throttle_ready, object_size, + ROC_QOS_BPS_THROTTLE); + } else { + m_bps_tokens -= object_size; + } + } + + if (wait) { + m_io_throttled = true; + // when passing iops throttle, but limit in bps throttle, recovery + if (flag == 1) { + std::lock_guard lock(m_throttle_lock); + m_iops_tokens += object_num; + } + } + + return !wait; +} + +static const std::map<uint64_t, std::string> THROTTLE_FLAGS = { + { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" }, + { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" } +}; + +void ObjectCacheStore::apply_qos_tick_and_limit( + const uint64_t flag, + std::chrono::milliseconds min_tick, + uint64_t limit, + uint64_t burst, + std::chrono::seconds burst_seconds) { + SafeTimerSingleton* safe_timer_singleton = nullptr; + TokenBucketThrottle* throttle = nullptr; + safe_timer_singleton = + &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>( + "tools::immutable_object_cache", false, m_cct); + SafeTimer* timer = safe_timer_singleton; + ceph::mutex* timer_lock = &safe_timer_singleton->lock; + m_qos_enabled_flag |= flag; + auto throttle_flags_it = THROTTLE_FLAGS.find(flag); + ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end()); + throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second, + 0, 0, timer, timer_lock); + throttle->set_schedule_tick_min(min_tick.count()); + int ret = throttle->set_limit(limit, burst, burst_seconds.count()); + if (ret < 0) { + lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: " + << "burst(" << burst << ") is less than " + << "limit(" << limit << ")" << dendl; + throttle->set_limit(limit, 0, 1); + } + + ceph_assert(m_throttles.find(flag) == m_throttles.end()); + m_throttles.insert({flag, throttle}); +} + } // namespace immutable_obj_cache } // namespace ceph diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h index 270a93be452..607921320ae 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.h +++ b/src/tools/immutable_object_cache/ObjectCacheStore.h @@ -6,6 +6,8 @@ #include "common/ceph_context.h" #include "common/ceph_mutex.h" +#include "common/Throttle.h" +#include "common/Cond.h" #include "include/rados/librados.hpp" #include "SimplePolicy.h" @@ -30,11 +32,16 @@ class ObjectCacheStore { int init_cache(); int lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, + uint64_t object_size, std::string object_name, bool return_dne_path, std::string& target_cache_file_path); - private: + enum ThrottleTypeCode { + THROTTLE_CODE_BYTE, + THROTTLE_CODE_OBJECT + }; + std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, std::string oid); std::string get_cache_file_path(std::string cache_file_name, @@ -48,6 +55,13 @@ class ObjectCacheStore { int handle_promote_callback(int, bufferlist*, std::string); int do_evict(std::string cache_file); + bool take_token_from_throttle(uint64_t object_size, uint64_t object_num); + void handle_throttle_ready(uint64_t tokens, uint64_t type); + void apply_qos_tick_and_limit(const uint64_t flag, + std::chrono::milliseconds min_tick, + uint64_t limit, uint64_t burst, + std::chrono::seconds burst_seconds); + CephContext *m_cct; RadosRef m_rados; std::map<uint64_t, librados::IoCtx> m_ioctx_map; @@ -55,6 +69,14 @@ class ObjectCacheStore { ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock"); Policy* m_policy; std::string m_cache_root_dir; + // throttle mechanism + uint64_t m_qos_enabled_flag{0}; + std::map<uint64_t, TokenBucketThrottle*> m_throttles; + bool m_io_throttled{false}; + ceph::mutex m_throttle_lock = + ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");; + uint64_t m_iops_tokens{0}; + uint64_t m_bps_tokens{0}; }; } // namespace immutable_obj_cache diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index e65e94bee1e..860017d6aae 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -17,7 +17,7 @@ ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s) ObjectCacheRequest::~ObjectCacheRequest() {} void ObjectCacheRequest::encode() { - ENCODE_START(1, 1, payload); + ENCODE_START(2, 1, payload); ceph::encode(type, payload); ceph::encode(seq, payload); if (!payload_empty()) { @@ -28,11 +28,11 @@ void ObjectCacheRequest::encode() { void ObjectCacheRequest::decode(bufferlist& bl) { auto i = bl.cbegin(); - DECODE_START(1, i); + DECODE_START(2, i); ceph::decode(type, i); ceph::decode(seq, i); if (!payload_empty()) { - decode_payload(i); + decode_payload(i, struct_v); } DECODE_FINISH(i); } @@ -52,7 +52,8 @@ void ObjectCacheRegData::encode_payload() { ceph::encode(version, payload); } -void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) { +void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { if (i.end()) { return; } @@ -67,17 +68,19 @@ ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {} void ObjectCacheRegReplyData::encode_payload() {} -void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {} +void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) {} ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, uint64_t snap_id, + uint64_t object_size, std::string oid, std::string pool_namespace) : ObjectCacheRequest(t, s), read_offset(read_offset), read_len(read_len), pool_id(pool_id), snap_id(snap_id), - oid(oid), pool_namespace(pool_namespace) + object_size(object_size), oid(oid), pool_namespace(pool_namespace) {} ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s) @@ -92,15 +95,20 @@ void ObjectCacheReadData::encode_payload() { ceph::encode(snap_id, payload); ceph::encode(oid, payload); ceph::encode(pool_namespace, payload); + ceph::encode(object_size, payload); } -void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) { +void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { ceph::decode(read_offset, i); ceph::decode(read_len, i); ceph::decode(pool_id, i); ceph::decode(snap_id, i); ceph::decode(oid, i); ceph::decode(pool_namespace, i); + if (encode_version >= 2) { + ceph::decode(object_size, i); + } } ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, @@ -115,7 +123,8 @@ void ObjectCacheReadReplyData::encode_payload() { ceph::encode(cache_path, payload); } -void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) { +void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { ceph::decode(cache_path, i); } @@ -127,7 +136,8 @@ ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {} void ObjectCacheReadRadosData::encode_payload() {} -void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {} +void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) {} ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) { ObjectCacheRequest* req = nullptr; diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index 5fab1ec4897..05394d84307 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -50,7 +50,8 @@ class ObjectCacheRequest { bufferlist get_payload_bufferlist() { return payload; } virtual void encode_payload() = 0; - virtual void decode_payload(bufferlist::const_iterator bl_it) = 0; + virtual void decode_payload(bufferlist::const_iterator bl_it, + __u8 encode_version) = 0; virtual uint16_t get_request_type() = 0; virtual bool payload_empty() = 0; }; @@ -63,7 +64,8 @@ class ObjectCacheRegData : public ObjectCacheRequest { ObjectCacheRegData(uint16_t t, uint64_t s); ~ObjectCacheRegData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_REGISTER; } bool payload_empty() override { return false; } }; @@ -74,7 +76,8 @@ class ObjectCacheRegReplyData : public ObjectCacheRequest { ObjectCacheRegReplyData(uint16_t t, uint64_t s); ~ObjectCacheRegReplyData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator iter) override; + void decode_payload(bufferlist::const_iterator iter, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; } bool payload_empty() override { return true; } }; @@ -85,16 +88,18 @@ class ObjectCacheReadData : public ObjectCacheRequest { uint64_t read_len; uint64_t pool_id; uint64_t snap_id; + uint64_t object_size = 0; std::string oid; std::string pool_namespace; ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, - uint64_t snap_id, std::string oid, - std::string pool_namespace); + uint64_t snap_id, uint64_t object_size, + std::string oid, std::string pool_namespace); ObjectCacheReadData(uint16_t t, uint64_t s); ~ObjectCacheReadData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_READ; } bool payload_empty() override { return false; } }; @@ -106,7 +111,8 @@ class ObjectCacheReadReplyData : public ObjectCacheRequest { ObjectCacheReadReplyData(uint16_t t, uint64_t s); ~ObjectCacheReadReplyData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_READ_REPLY; } bool payload_empty() override { return false; } }; @@ -117,7 +123,8 @@ class ObjectCacheReadRadosData : public ObjectCacheRequest { ObjectCacheReadRadosData(uint16_t t, uint64_t s); ~ObjectCacheReadRadosData() override; void encode_payload() override; - void decode_payload(bufferlist::const_iterator bl) override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; uint16_t get_request_type() override { return RBDSC_READ_RADOS; } bool payload_empty() override { return true; } }; |