diff options
author | shangdehao1 <dehao.shang@intel.com> | 2019-03-06 23:24:16 +0100 |
---|---|---|
committer | Yuan Zhou <yuan.zhou@intel.com> | 2019-03-21 17:16:30 +0100 |
commit | 9cfd3594bc7632d8e693037add083270b489a835 (patch) | |
tree | dee0fcc96647df7c0e9149bb1d0525f93d5dff9d /src/tools/immutable_object_cache | |
parent | tools: clean up data structure of immutable obj cache daemon (diff) | |
download | ceph-9cfd3594bc7632d8e693037add083270b489a835.tar.xz ceph-9cfd3594bc7632d8e693037add083270b489a835.zip |
tools: adjust code style of RO
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Diffstat (limited to 'src/tools/immutable_object_cache')
18 files changed, 225 insertions, 212 deletions
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index ba226dd34cf..58083bbb9ae 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -17,9 +17,8 @@ namespace immutable_obj_cache { m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)), m_io_thread(nullptr), m_session_work(false), m_writing(false), m_reading(false), m_sequence_id(0), - m_lock("ceph::cache::cacheclient::m_lock") - { - // TODO : configure it. + m_lock("ceph::cache::cacheclient::m_lock") { + // TODO(dehao) : configure it. m_use_dedicated_worker = true; m_worker_thread_num = 2; if (m_use_dedicated_worker) { @@ -31,14 +30,13 @@ namespace immutable_obj_cache { } } m_bp_header = buffer::create(get_header_size()); - } CacheClient::~CacheClient() { stop(); } - void CacheClient::run(){ + void CacheClient::run() { m_io_thread.reset(new std::thread([this](){m_io_service.run(); })); } @@ -85,11 +83,12 @@ namespace immutable_obj_cache { return 0; } - void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, - std::string oid, GenContext<ObjectCacheRequest*>* on_finish) { - - ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0, - pool_id, snap_id, oid, pool_nspace); + void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string oid, + GenContext<ObjectCacheRequest*>* on_finish) { + ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, + ++m_sequence_id, 0, 0, + pool_id, snap_id, oid, pool_nspace); req->process_msg = on_finish; req->encode(); @@ -160,7 +159,6 @@ namespace immutable_obj_cache { } void CacheClient::read_reply_header() { - /* create new head buffer for every reply */ bufferptr bp_head(buffer::create(get_header_size())); auto raw_ptr = bp_head.c_str(); @@ -175,8 +173,8 @@ namespace immutable_obj_cache { } void CacheClient::handle_reply_header(bufferptr bp_head, - const boost::system::error_code& ec, - size_t bytes_transferred) { + const boost::system::error_code& ec, + size_t bytes_transferred) { if (ec || bytes_transferred != get_header_size()) { fault(ASIO_ERROR_READ, ec); return; @@ -190,9 +188,9 @@ namespace immutable_obj_cache { read_reply_data(std::move(bp_head), std::move(bp_data), data_len); } - void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, + void CacheClient::read_reply_data(bufferptr&& bp_head, + bufferptr&& bp_data, const uint64_t data_len) { - auto raw_ptr = bp_data.c_str(); boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len), boost::asio::transfer_exactly(data_len), @@ -200,10 +198,10 @@ namespace immutable_obj_cache { this, std::move(bp_head), std::move(bp_data), data_len, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); - } - void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data, + void CacheClient::handle_reply_data(bufferptr bp_head, + bufferptr bp_data, const uint64_t data_len, const boost::system::error_code& ec, size_t bytes_transferred) { @@ -233,7 +231,6 @@ namespace immutable_obj_cache { if (is_session_work()) { receive_message(); } - } void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) { @@ -266,7 +263,8 @@ namespace immutable_obj_cache { } // if there is one request fails, just execute fault, then shutdown RO. - void CacheClient::fault(const int err_type, const boost::system::error_code& ec) { + void CacheClient::fault(const int err_type, + const boost::system::error_code& ec) { ldout(cct, 20) << "fault." << ec.message() << dendl; if (err_type == ASIO_ERROR_CONNECT) { @@ -276,7 +274,8 @@ namespace immutable_obj_cache { << ". Immutable-object-cache daemon is down ? " << "Data will be read from ceph cluster " << dendl; } else { - ldout(cct, 20) << "Connecting RO daemon fails : " << ec.message() << dendl; + ldout(cct, 20) << "Connecting RO daemon fails : " + << ec.message() << dendl; } if (m_dm_socket.is_open()) { @@ -296,11 +295,11 @@ namespace immutable_obj_cache { return; } - // when current session don't work, ASIO will don't receive any new request from hook. - // On the other hand, for pending request of ASIO, cancle these request, then call their callback. - // these request which are cancled by this method, will be re-dispatched to RADOS layer. - // - // make sure just have one thread to modify execute below code. + /* when current session don't work, ASIO will don't receive any new request from hook. + * On the other hand, for pending request of ASIO, cancle these request, + * then call their callback. these request which are cancled by this method, + * will be re-dispatched to RADOS layer. + * make sure just have one thread to modify execute below code. */ m_session_work.store(false); if (err_type == ASIO_ERROR_MSG_INCOMPLETE) { @@ -321,7 +320,8 @@ namespace immutable_obj_cache { // currently, for any asio error, just shutdown RO. close(); - // all pending request, which have entered into ASIO, will be re-dispatched to RADOS. + /* all pending request, which have entered into ASIO, + * will be re-dispatched to RADOS.*/ { Mutex::Locker locker(m_lock); for (auto it : m_seq_to_req) { @@ -337,7 +337,8 @@ namespace immutable_obj_cache { } int CacheClient::register_client(Context* on_finish) { - ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++); + ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, + m_sequence_id++); reg_req->encode(); bufferlist bl; @@ -365,7 +366,8 @@ namespace immutable_obj_cache { uint64_t data_len = get_data_len(m_bp_header.c_str()); bufferptr bp_data(buffer::create(data_len)); - ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec); + ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), + data_len), ec); if (ec || ret != data_len) { fault(ASIO_ERROR_READ, ec); return -1; @@ -387,5 +389,5 @@ namespace immutable_obj_cache { return 0; } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h index 8917062286c..47a39cb1432 100644 --- a/src/tools/immutable_object_cache/CacheClient.h +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -9,6 +9,7 @@ #include <boost/bind.hpp> #include <boost/asio/error.hpp> #include <boost/algorithm/string.hpp> + #include "include/ceph_assert.h" #include "include/Context.h" #include "common/Mutex.h" @@ -22,8 +23,7 @@ namespace ceph { namespace immutable_obj_cache { class CacheClient { -public: - + public: CacheClient(const std::string& file, CephContext* ceph_ctx); ~CacheClient(); void run(); @@ -31,12 +31,12 @@ public: void close(); int stop(); int connect(); - void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, - std::string oid, GenContext<ObjectCacheRequest*>* on_finish); + void lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string oid, + GenContext<ObjectCacheRequest*>* on_finish); int register_client(Context* on_finish); -private: - + private: void send_message(); void try_send(); void fault(const int err_type, const boost::system::error_code& err); @@ -45,14 +45,16 @@ private: void process(ObjectCacheRequest* reply, uint64_t seq_id); void read_reply_header(); void handle_reply_header(bufferptr bp_head, - const boost::system::error_code& ec, size_t bytes_transferred); + const boost::system::error_code& ec, + size_t bytes_transferred); void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, const uint64_t data_len); void handle_reply_data(bufferptr bp_head, bufferptr bp_data, const uint64_t data_len, - const boost::system::error_code& ec, size_t bytes_transferred); -private: + const boost::system::error_code& ec, + size_t bytes_transferred); + private: CephContext* cct; boost::asio::io_service m_io_service; boost::asio::io_service::work m_io_service_work; @@ -76,6 +78,6 @@ private: bufferptr m_bp_header; }; -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index 7648a3a6d05..4fc2dce910c 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -12,7 +12,8 @@ namespace ceph { namespace immutable_obj_cache { -CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): +CacheController::CacheController(CephContext *cct, + const std::vector<const char*> &args): m_args(args), m_cct(cct) { ldout(m_cct, 20) << dendl; } @@ -26,7 +27,7 @@ int CacheController::init() { ldout(m_cct, 20) << dendl; m_object_cache_store = new ObjectCacheStore(m_cct); - //TODO(): make this configurable + // TODO(dehao): make this configurable int r = m_object_cache_store->init(true); if (r < 0) { lderr(m_cct) << "init error\n" << dendl; @@ -65,7 +66,8 @@ void CacheController::handle_signal(int signum) { void CacheController::run() { try { - std::string controller_path = m_cct->_conf.get_val<std::string>("immutable_object_cache_sock"); + std::string controller_path = + m_cct->_conf.get_val<std::string>("immutable_object_cache_sock"); std::remove(controller_path.c_str()); m_cache_server = new CacheServer(m_cct, controller_path, @@ -80,14 +82,16 @@ void CacheController::run() { } } -void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){ +void CacheController::handle_request(uint64_t session_id, + ObjectCacheRequest* req) { ldout(m_cct, 20) << dendl; switch (req->get_request_type()) { case RBDSC_REGISTER: { - // TODO(): skip register and allow clients to lookup directly + // TODO(dehao): skip register and allow clients to lookup directly - ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq); + ObjectCacheRequest* reply = new ObjectCacheRegReplyData( + RBDSC_REGISTER_REPLY, req->seq); m_cache_server->send(session_id, reply); break; } @@ -95,16 +99,15 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re // lookup object in local cache store std::string cache_path; ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req; - int ret = m_object_cache_store->lookup_object(req_read_data->pool_namespace, - req_read_data->pool_id, - req_read_data->snap_id, - req_read_data->oid, - cache_path); + int ret = m_object_cache_store->lookup_object( + req_read_data->pool_namespace, req_read_data->pool_id, + req_read_data->snap_id, req_read_data->oid, cache_path); ObjectCacheRequest* reply = nullptr; if (ret != OBJ_CACHE_PROMOTED) { reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); } else { - reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path); + reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, + req->seq, cache_path); } m_cache_server->send(session_id, reply); break; @@ -115,5 +118,5 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re } } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheController.h b/src/tools/immutable_object_cache/CacheController.h index 79b3b9525de..ef8495f767e 100644 --- a/src/tools/immutable_object_cache/CacheController.h +++ b/src/tools/immutable_object_cache/CacheController.h @@ -34,7 +34,7 @@ class CacheController { ObjectCacheStore *m_object_cache_store; }; -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/CacheServer.cc b/src/tools/immutable_object_cache/CacheServer.cc index e10db19c44a..0df58d55f50 100644 --- a/src/tools/immutable_object_cache/CacheServer.cc +++ b/src/tools/immutable_object_cache/CacheServer.cc @@ -76,7 +76,8 @@ int CacheServer::start_accept() { void CacheServer::accept() { CacheSessionPtr new_session = nullptr; - new_session.reset(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct)); + new_session.reset(new CacheSession(m_session_id, m_io_service, + m_server_process_msg, cct)); m_acceptor.async_accept(new_session->socket(), boost::bind(&CacheServer::handle_accept, this, new_session, @@ -93,7 +94,7 @@ void CacheServer::handle_accept(CacheSessionPtr new_session, } m_session_map.emplace(m_session_id, new_session); - // TODO : session setting + // TODO(dehao) : session setting new_session->start(); m_session_id++; @@ -113,5 +114,5 @@ void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) { } } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheServer.h b/src/tools/immutable_object_cache/CacheServer.h index 4646a15952b..84cdd89e5e1 100644 --- a/src/tools/immutable_object_cache/CacheServer.h +++ b/src/tools/immutable_object_cache/CacheServer.h @@ -29,7 +29,8 @@ class CacheServer { private: void accept(); - void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error); + void handle_accept(CacheSessionPtr new_session, + const boost::system::error_code& error); private: CephContext* cct; @@ -38,11 +39,11 @@ class CacheServer { stream_protocol::endpoint m_local_path; stream_protocol::acceptor m_acceptor; uint64_t m_session_id = 1; - // TODO : need to lock it. + // TODO(dehao) : need to lock it. std::map<uint64_t, CacheSessionPtr> m_session_map; }; -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc index e4f76a7c806..6069f811259 100644 --- a/src/tools/immutable_object_cache/CacheSession.cc +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -47,12 +47,11 @@ void CacheSession::start() { void CacheSession::read_request_header() { ldout(cct, 20) << dendl; boost::asio::async_read(m_dm_socket, - boost::asio::buffer(m_bp_header.c_str(), get_header_size()), - boost::asio::transfer_exactly(get_header_size()), - boost::bind(&CacheSession::handle_request_header, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + boost::asio::buffer(m_bp_header.c_str(), get_header_size()), + boost::asio::transfer_exactly(get_header_size()), + boost::bind(&CacheSession::handle_request_header, + shared_from_this(), boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); } void CacheSession::handle_request_header(const boost::system::error_code& err, @@ -70,12 +69,12 @@ void CacheSession::read_request_data(uint64_t data_len) { ldout(cct, 20) << dendl; bufferptr bp_data(buffer::create(data_len)); boost::asio::async_read(m_dm_socket, - boost::asio::buffer(bp_data.c_str(), bp_data.length()), - boost::asio::transfer_exactly(data_len), - boost::bind(&CacheSession::handle_request_data, - shared_from_this(), bp_data, data_len, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + boost::asio::buffer(bp_data.c_str(), bp_data.length()), + boost::asio::transfer_exactly(data_len), + boost::bind(&CacheSession::handle_request_data, + shared_from_this(), bp_data, data_len, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); } void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, @@ -112,7 +111,8 @@ void CacheSession::send(ObjectCacheRequest* reply) { boost::asio::async_write(m_dm_socket, boost::asio::buffer(bl.c_str(), bl.length()), boost::asio::transfer_exactly(bl.length()), - [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) { + [this, bl, reply](const boost::system::error_code& err, + size_t bytes_transferred) { if (err || bytes_transferred != bl.length()) { fault(); return; @@ -123,8 +123,8 @@ void CacheSession::send(ObjectCacheRequest* reply) { void CacheSession::fault() { ldout(cct, 20) << dendl; - // TODO + // TODO(dehao) } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheSession.h b/src/tools/immutable_object_cache/CacheSession.h index 5f5cd1be9bd..36bd5dbaad9 100644 --- a/src/tools/immutable_object_cache/CacheSession.h +++ b/src/tools/immutable_object_cache/CacheSession.h @@ -18,22 +18,25 @@ namespace ceph { namespace immutable_obj_cache { class CacheSession : public std::enable_shared_from_this<CacheSession> { -public: - CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg process_msg, CephContext* ctx); + public: + CacheSession(uint64_t session_id, io_service& io_service, + ProcessMsg process_msg, CephContext* ctx); ~CacheSession(); stream_protocol::socket& socket(); void close(); void start(); void read_request_header(); - void handle_request_header(const boost::system::error_code& err, size_t bytes_transferred); + void handle_request_header(const boost::system::error_code& err, + size_t bytes_transferred); void read_request_data(uint64_t data_len); void handle_request_data(bufferptr bp, uint64_t data_len, - const boost::system::error_code& err, size_t bytes_transferred); + const boost::system::error_code& err, + size_t bytes_transferred); void process(ObjectCacheRequest* req); void fault(); void send(ObjectCacheRequest* msg); -private: + private: uint64_t m_session_id; stream_protocol::socket m_dm_socket; ProcessMsg m_server_process_msg; @@ -44,7 +47,7 @@ private: typedef std::shared_ptr<CacheSession> CacheSessionPtr; -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc index cafe4a731e8..f0b6eb1a5ab 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.cc +++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc @@ -19,7 +19,6 @@ namespace immutable_obj_cache { ObjectCacheStore::ObjectCacheStore(CephContext *cct) : m_cct(cct), m_rados(new librados::Rados()), m_ioctx_map_lock("ceph::cache::ObjectCacheStore::m_ioctx_map_lock") { - object_cache_max_size = m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size"); @@ -30,7 +29,7 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct) m_cache_root_dir += "/"; } - //TODO(): allow to set cache level + // TODO(dehao): allow to set cache level m_policy = new SimplePolicy(m_cct, object_cache_max_size, 0.1); } @@ -48,22 +47,23 @@ int ObjectCacheStore::init(bool reset) { } ret = m_rados->connect(); - if (ret < 0 ) { + if (ret < 0) { lderr(m_cct) << "fail to connect to cluster" << dendl; return ret; } - //TODO(): fsck and reuse existing cache objects + // TODO(dehao): fsck and reuse existing cache objects if (reset) { std::error_code ec; if (efs::exists(m_cache_root_dir)) { int dir = m_dir_num - 1; while (dir >= 0) { - if (!efs::remove_all(m_cache_root_dir + "/" + std::to_string(dir), ec)) { + if (!efs::remove_all( + m_cache_root_dir + "/" + std::to_string(dir), ec)) { lderr(m_cct) << "fail to remove old cache store: " << ec << dendl; - return ec.value(); + return ec.value(); } - dir --; + dir--; } } else { if (!efs::create_directories(m_cache_root_dir, ec)) { @@ -89,7 +89,7 @@ int ObjectCacheStore::init_cache() { int dir = m_dir_num - 1; while (dir >= 0) { efs::create_directories(cache_dir + "/" + std::to_string(dir)); - dir --; + dir--; } return 0; } @@ -126,10 +126,10 @@ int ObjectCacheStore::do_promote(std::string pool_nspace, librados::bufferlist* read_buf = new librados::bufferlist(); auto ctx = new FunctionContext([this, read_buf, cache_file_name](int ret) { - handle_promote_callback(ret, read_buf, cache_file_name); - }); + handle_promote_callback(ret, read_buf, cache_file_name); + }); - return promote_object(&ioctx, object_name, read_buf, ctx); + return promote_object(&ioctx, object_name, read_buf, ctx); } int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, @@ -186,7 +186,7 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace, cache_status_t ret = m_policy->lookup_object(cache_file_name); - switch(ret) { + switch (ret) { case OBJ_CACHE_NONE: { pret = do_promote(pool_nspace, pool_id, snap_id, object_name); if (pret < 0) { @@ -227,7 +227,7 @@ int ObjectCacheStore::evict_objects() { std::list<std::string> obj_list; m_policy->get_evict_list(&obj_list); - for (auto& obj: obj_list) { + for (auto& obj : obj_list) { do_evict(obj); } } @@ -243,9 +243,9 @@ int ObjectCacheStore::do_evict(std::string cache_file) { ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl; - // TODO(): possible race on read? + // TODO(dehao): possible race on read? int ret = std::remove(cache_file_path.c_str()); - // evict metadata + // evict metadata if (ret == 0) { m_policy->update_status(cache_file, OBJ_CACHE_SKIP); m_policy->evict_entry(cache_file); @@ -263,16 +263,16 @@ std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace, } std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name) { - std::string cache_file_dir = ""; if (m_dir_num > 0) { uint32_t crc = 0; - crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), cache_file_name.length()); + crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), + cache_file_name.length()); cache_file_dir = std::to_string(crc % m_dir_num); } return m_cache_root_dir + cache_file_dir + "/" + cache_file_name; } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h index 935e2970bab..fa86aca4032 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.h +++ b/src/tools/immutable_object_cache/ObjectCacheStore.h @@ -21,43 +21,42 @@ namespace immutable_obj_cache { typedef shared_ptr<librados::Rados> RadosRef; typedef shared_ptr<librados::IoCtx> IoCtxRef; -class ObjectCacheStore -{ - public: - ObjectCacheStore(CephContext *cct); - ~ObjectCacheStore(); - int init(bool reset); - int shutdown(); - int init_cache(); - int lookup_object(std::string pool_nspace, - uint64_t pool_id, uint64_t snap_id, - std::string object_name, - std::string& target_cache_file_path); - - private: - std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id, - uint64_t snap_id, std::string oid); - std::string get_cache_file_path(std::string cache_file_name); - int evict_objects(); - int do_promote(std::string pool_nspace, uint64_t pool_id, - uint64_t snap_id, std::string object_name); - int promote_object(librados::IoCtx*, std::string object_name, - librados::bufferlist* read_buf, - Context* on_finish); - int handle_promote_callback(int, bufferlist*, std::string); - int do_evict(std::string cache_file); - - CephContext *m_cct; - RadosRef m_rados; - std::map<uint64_t, librados::IoCtx> m_ioctx_map; - Mutex m_ioctx_map_lock; - Policy* m_policy; - //TODO(): make this configurable - int m_dir_num = 10; - uint64_t object_cache_max_size; - std::string m_cache_root_dir; +class ObjectCacheStore { + public: + ObjectCacheStore(CephContext *cct); + ~ObjectCacheStore(); + int init(bool reset); + int shutdown(); + int init_cache(); + int lookup_object(std::string pool_nspace, + uint64_t pool_id, uint64_t snap_id, + std::string object_name, + std::string& target_cache_file_path); + + private: + std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string oid); + std::string get_cache_file_path(std::string cache_file_name); + int evict_objects(); + int do_promote(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string object_name); + int promote_object(librados::IoCtx*, std::string object_name, + librados::bufferlist* read_buf, + Context* on_finish); + int handle_promote_callback(int, bufferlist*, std::string); + int do_evict(std::string cache_file); + + CephContext *m_cct; + RadosRef m_rados; + std::map<uint64_t, librados::IoCtx> m_ioctx_map; + Mutex m_ioctx_map_lock; + Policy* m_policy; + // TODO(dehao): make this configurable + int m_dir_num = 10; + uint64_t object_cache_max_size; + std::string m_cache_root_dir; }; -} // namespace ceph -} // namespace immutable_obj_cache +} // namespace ceph +} // namespace immutable_obj_cache #endif diff --git a/src/tools/immutable_object_cache/Policy.h b/src/tools/immutable_object_cache/Policy.h index cda8e69c917..3e3a0ef8787 100644 --- a/src/tools/immutable_object_cache/Policy.h +++ b/src/tools/immutable_object_cache/Policy.h @@ -16,18 +16,18 @@ typedef enum { OBJ_CACHE_SKIP, } cache_status_t; - class Policy { -public: - Policy(){} - virtual ~Policy(){}; + public: + Policy() {} + virtual ~Policy() {} virtual cache_status_t lookup_object(std::string) = 0; virtual int evict_entry(std::string) = 0; - virtual void update_status(std::string, cache_status_t, uint64_t size=0) = 0; + virtual void update_status(std::string, cache_status_t, + uint64_t size = 0) = 0; virtual cache_status_t get_status(std::string) = 0; virtual void get_evict_list(std::list<std::string>* obj_list) = 0; }; -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/SimplePolicy.cc b/src/tools/immutable_object_cache/SimplePolicy.cc index 8fe70b4e00d..bb76f95c321 100644 --- a/src/tools/immutable_object_cache/SimplePolicy.cc +++ b/src/tools/immutable_object_cache/SimplePolicy.cc @@ -18,19 +18,18 @@ SimplePolicy::SimplePolicy(CephContext *cct, uint64_t cache_size, : cct(cct), m_watermark(watermark), m_max_cache_size(cache_size), m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock") { ldout(cct, 20) << dendl; - m_max_inflight_ops = cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops"); + m_max_inflight_ops = cct->_conf.get_val<uint64_t>( + "immutable_object_cache_max_inflight_ops"); m_cache_size = 0; } SimplePolicy::~SimplePolicy() { ldout(cct, 20) << dendl; - for (auto it: m_cache_map) { + for (auto it : m_cache_map) { Entry* entry = (it.second); delete entry; } - - } cache_status_t SimplePolicy::alloc_entry(std::string file_name) { @@ -44,13 +43,14 @@ cache_status_t SimplePolicy::alloc_entry(std::string file_name) { return OBJ_CACHE_SKIP; } - if ((m_cache_size < m_max_cache_size) && (inflight_ops < m_max_inflight_ops)) { + if ((m_cache_size < m_max_cache_size) && + (inflight_ops < m_max_inflight_ops)) { Entry* entry = new Entry(); ceph_assert(entry != nullptr); m_cache_map[file_name] = entry; wlocker.unlock(); update_status(file_name, OBJ_CACHE_SKIP); - return OBJ_CACHE_NONE; // start promotion request + return OBJ_CACHE_NONE; // start promotion request } // if there's no free entry, return skip to read from rados @@ -136,7 +136,6 @@ void SimplePolicy::update_status(std::string file_name, m_cache_size -= size; return; } - } int SimplePolicy::evict_entry(std::string file_name) { @@ -165,7 +164,8 @@ void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) { RWLock::WLocker locker(m_cache_map_lock); // check free ratio, pop entries from LRU if ((double)m_cache_size / m_max_cache_size > (1 - m_watermark)) { - int evict_num = m_cache_map.size() * 0.1; //TODO(): make this configurable + // TODO(dehao): make this configurable + int evict_num = m_cache_map.size() * 0.1; for (int i = 0; i < evict_num; i++) { Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); if (entry == nullptr) { @@ -173,7 +173,6 @@ void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) { } std::string file_name = entry->file_name; obj_list->push_back(file_name); - } } } @@ -206,5 +205,5 @@ std::string SimplePolicy::get_evict_entry() { return entry->file_name; } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/SimplePolicy.h b/src/tools/immutable_object_cache/SimplePolicy.h index 7d97bdc9a15..9ff00b26a52 100644 --- a/src/tools/immutable_object_cache/SimplePolicy.h +++ b/src/tools/immutable_object_cache/SimplePolicy.h @@ -17,15 +17,16 @@ namespace ceph { namespace immutable_obj_cache { class SimplePolicy : public Policy { -public: + public: SimplePolicy(CephContext *cct, uint64_t block_num, float watermark); - ~SimplePolicy() ; + ~SimplePolicy(); cache_status_t lookup_object(std::string file_name); cache_status_t get_status(std::string file_name); void update_status(std::string file_name, - cache_status_t new_status, uint64_t size=0); + cache_status_t new_status, + uint64_t size = 0); int evict_entry(std::string file_name); @@ -36,15 +37,15 @@ public: uint64_t get_promoted_entry_num(); std::string get_evict_entry(); -private: + private: cache_status_t alloc_entry(std::string file_name); class Entry : public LRUObject { - public: - cache_status_t status; - Entry() : status(OBJ_CACHE_NONE){} - std::string file_name; - uint64_t size; + public: + cache_status_t status; + Entry() : status(OBJ_CACHE_NONE) {} + std::string file_name; + uint64_t size; }; CephContext* cct; @@ -57,7 +58,6 @@ private: std::unordered_map<std::string, Entry*> m_cache_map; RWLock m_cache_map_lock; - std::deque<Entry*> m_free_list; std::atomic<uint64_t> m_cache_size; @@ -65,6 +65,6 @@ private: LRU m_promoted_lru; }; -} // namespace immutable_obj_cache -} // namespace ceph -#endif // CEPH_CACHE_SIMPLE_POLICY_H +} // namespace immutable_obj_cache +} // namespace ceph +#endif // CEPH_CACHE_SIMPLE_POLICY_H diff --git a/src/tools/immutable_object_cache/SocketCommon.h b/src/tools/immutable_object_cache/SocketCommon.h index db78e427912..9cd108ca06d 100644 --- a/src/tools/immutable_object_cache/SocketCommon.h +++ b/src/tools/immutable_object_cache/SocketCommon.h @@ -23,6 +23,6 @@ class ObjectCacheRequest; typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg; -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index beb63c3e4ef..a0a4c6352aa 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -11,10 +11,10 @@ namespace ceph { namespace immutable_obj_cache { -ObjectCacheRequest::ObjectCacheRequest(){} +ObjectCacheRequest::ObjectCacheRequest() {} ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s) : type(t), seq(s) {} -ObjectCacheRequest::~ObjectCacheRequest(){} +ObjectCacheRequest::~ObjectCacheRequest() {} void ObjectCacheRequest::encode() { ENCODE_START(1, 1, payload); @@ -58,9 +58,11 @@ void ObjectCacheRegReplyData::encode_payload() {} void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {} ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s, - uint64_t read_offset, uint64_t read_len, + uint64_t read_offset, + uint64_t read_len, uint64_t pool_id, uint64_t snap_id, - std::string oid, std::string pool_namespace) + std::string oid, + std::string pool_namespace) : ObjectCacheRequest(t, s), read_offset(read_offset), read_len(read_len), pool_id(pool_id), snap_id(snap_id), oid(oid), pool_namespace(pool_namespace) @@ -89,7 +91,8 @@ void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) { ceph::decode(pool_namespace, i); } -ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path) +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, + string cache_path) : ObjectCacheRequest(t, s), cache_path(cache_path) {} ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s) : ObjectCacheRequest(t, s) {} @@ -114,8 +117,7 @@ void ObjectCacheReadRadosData::encode_payload() {} void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {} -ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) -{ +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) { ObjectCacheRequest* req = nullptr; uint16_t type; @@ -126,7 +128,7 @@ ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) ceph::decode(seq, i); DECODE_FINISH(i); - switch(type) { + switch (type) { case RBDSC_REGISTER: { req = new ObjectCacheRegData(type, seq); break; @@ -156,5 +158,5 @@ ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) return req; } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index 6518d13ea4d..470f764e0b5 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -26,10 +26,10 @@ inline uint32_t get_data_len(char* buf) { HeaderHelper* header = (HeaderHelper*)buf; return header->len; } -} +} // namespace class ObjectCacheRequest { -public: + public: uint16_t type; uint64_t seq; @@ -43,8 +43,8 @@ public: // encode consists of two steps // step 1 : directly encode common bits using encode method of base classs. - // step 2 : according to payload_empty, determine whether addtional bits need to - // be encoded which be implements by child class. + // step 2 : according to payload_empty, determine whether addtional bits + // need to be encoded which be implements by child class. void encode(); void decode(bufferlist& bl); bufferlist get_payload_bufferlist() { return payload; } @@ -56,7 +56,7 @@ public: }; class ObjectCacheRegData : public ObjectCacheRequest { -public: + public: ObjectCacheRegData(); ObjectCacheRegData(uint16_t t, uint64_t s); ~ObjectCacheRegData() override; @@ -67,7 +67,7 @@ public: }; class ObjectCacheRegReplyData : public ObjectCacheRequest { -public: + public: ObjectCacheRegReplyData(); ObjectCacheRegReplyData(uint16_t t, uint64_t s); ~ObjectCacheRegReplyData() override; @@ -78,15 +78,17 @@ public: }; class ObjectCacheReadData : public ObjectCacheRequest { -public: + public: uint64_t read_offset; uint64_t read_len; uint64_t pool_id; uint64_t snap_id; std::string oid; std::string pool_namespace; - ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, - uint64_t snap_id, std::string oid, std::string pool_namespace ); + ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, + uint64_t read_len, uint64_t pool_id, + uint64_t snap_id, std::string oid, + std::string pool_namespace); ObjectCacheReadData(uint16_t t, uint64_t s); ~ObjectCacheReadData() override; void encode_payload() override; @@ -96,7 +98,7 @@ public: }; class ObjectCacheReadReplyData : public ObjectCacheRequest { -public: + public: std::string cache_path; ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path); ObjectCacheReadReplyData(uint16_t t, uint64_t s); @@ -108,7 +110,7 @@ public: }; class ObjectCacheReadRadosData : public ObjectCacheRequest { -public: + public: ObjectCacheReadRadosData(); ObjectCacheReadRadosData(uint16_t t, uint64_t s); ~ObjectCacheReadRadosData() override; @@ -120,6 +122,6 @@ public: ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer); -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/Utils.h b/src/tools/immutable_object_cache/Utils.h index daa2bffda8b..71e716bfc22 100644 --- a/src/tools/immutable_object_cache/Utils.h +++ b/src/tools/immutable_object_cache/Utils.h @@ -7,7 +7,6 @@ #include "include/rados/librados.hpp" #include "include/Context.h" - namespace ceph { namespace immutable_obj_cache { namespace detail { @@ -19,7 +18,7 @@ void rados_callback(rados_completion_t c, void *arg) { (obj->*MF)(r); } -} // namespace detail +} // namespace detail template <typename T, void(T::*MF)(int)=&T::complete> librados::AioCompletion *create_rados_callback(T *obj) { @@ -27,6 +26,6 @@ librados::AioCompletion *create_rados_callback(T *obj) { obj, &detail::rados_callback<T, MF>, nullptr); } -} // namespace immutable_obj_cache -} // namespace ceph +} // namespace immutable_obj_cache +} // namespace ceph #endif diff --git a/src/tools/immutable_object_cache/main.cc b/src/tools/immutable_object_cache/main.cc index dffae87acc9..bfe49df6323 100644 --- a/src/tools/immutable_object_cache/main.cc +++ b/src/tools/immutable_object_cache/main.cc @@ -17,20 +17,20 @@ void usage() { std::cout << "usage: cache controller [options...]" << std::endl; std::cout << "options:\n"; std::cout << " -m monaddress[:port] connect to specified monitor\n"; - std::cout << " --keyring=<path> path to keyring for local cluster\n"; + std::cout << " --keyring=<path> path to keyring for local " + << "cluster\n"; std::cout << " --log-file=<logfile> file to log debug output\n"; - std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> set debug level\n"; + std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> " + << "set debug level\n"; generic_server_usage(); } -static void handle_signal(int signum) -{ +static void handle_signal(int signum) { if (cachectl) cachectl->handle_signal(signum); } -int main(int argc, const char **argv) -{ +int main(int argc, const char **argv) { std::vector<const char*> args; env_to_vec(args); argv_to_vec(argc, argv, args); @@ -41,8 +41,8 @@ int main(int argc, const char **argv) } auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, - CODE_ENVIRONMENT_DAEMON, - CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); if (g_conf()->daemonize) { global_init_daemonize(g_ceph_context); |