author     shangdehao1 <dehao.shang@intel.com>  2019-03-06 23:24:16 +0100
committer  Yuan Zhou <yuan.zhou@intel.com>      2019-03-21 17:16:30 +0100
commit     9cfd3594bc7632d8e693037add083270b489a835 (patch)
tree       dee0fcc96647df7c0e9149bb1d0525f93d5dff9d /src/tools/immutable_object_cache
parent     tools: clean up data structure of immutable obj cache daemon (diff)
tools: adjust code style of RO
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Diffstat (limited to 'src/tools/immutable_object_cache')
-rw-r--r--  src/tools/immutable_object_cache/CacheClient.cc       62
-rw-r--r--  src/tools/immutable_object_cache/CacheClient.h        24
-rw-r--r--  src/tools/immutable_object_cache/CacheController.cc   31
-rw-r--r--  src/tools/immutable_object_cache/CacheController.h     4
-rw-r--r--  src/tools/immutable_object_cache/CacheServer.cc        9
-rw-r--r--  src/tools/immutable_object_cache/CacheServer.h         9
-rw-r--r--  src/tools/immutable_object_cache/CacheSession.cc      32
-rw-r--r--  src/tools/immutable_object_cache/CacheSession.h       17
-rw-r--r--  src/tools/immutable_object_cache/ObjectCacheStore.cc  38
-rw-r--r--  src/tools/immutable_object_cache/ObjectCacheStore.h   73
-rw-r--r--  src/tools/immutable_object_cache/Policy.h             14
-rw-r--r--  src/tools/immutable_object_cache/SimplePolicy.cc      21
-rw-r--r--  src/tools/immutable_object_cache/SimplePolicy.h       26
-rw-r--r--  src/tools/immutable_object_cache/SocketCommon.h        4
-rw-r--r--  src/tools/immutable_object_cache/Types.cc             22
-rw-r--r--  src/tools/immutable_object_cache/Types.h              28
-rw-r--r--  src/tools/immutable_object_cache/Utils.h               7
-rw-r--r--  src/tools/immutable_object_cache/main.cc              16
18 files changed, 225 insertions(+), 212 deletions(-)
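The patch below is a pure style cleanup: TODO markers gain an owner tag, access specifiers are indented by one space, long parameter lists are wrapped to fit 80 columns, spacing is normalized (switch (x), size = 0), and namespace-closing braces are tagged. A minimal sketch of the target conventions, which match cpplint/Google C++ style (illustrative only, not code from the patch; cache_status_t stands in for the enum the patch touches in Policy.h):

  #include <cstdint>
  #include <string>

  // Illustrative sketch only -- not code from this patch.
  enum cache_status_t { OBJ_CACHE_NONE, OBJ_CACHE_SKIP };

  namespace ceph {
  namespace immutable_obj_cache {

  class StyleExample {
   public:                                  // access specifier indented one space
    // TODO(dehao): owner tag on every TODO
    void update_status(std::string file_name,
                       cache_status_t new_status,
                       uint64_t size = 0);  // spaces around '=' in default args

   private:
    void dispatch(uint16_t type) {
      switch (type) {                       // space between 'switch' and '('
        default:
          break;
      }
    }
  };

  }  // namespace immutable_obj_cache
  }  // namespace ceph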
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc
index ba226dd34cf..58083bbb9ae 100644
--- a/src/tools/immutable_object_cache/CacheClient.cc
+++ b/src/tools/immutable_object_cache/CacheClient.cc
@@ -17,9 +17,8 @@ namespace immutable_obj_cache {
m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
m_io_thread(nullptr), m_session_work(false), m_writing(false),
m_reading(false), m_sequence_id(0),
- m_lock("ceph::cache::cacheclient::m_lock")
- {
- // TODO : configure it.
+ m_lock("ceph::cache::cacheclient::m_lock") {
+ // TODO(dehao) : configure it.
m_use_dedicated_worker = true;
m_worker_thread_num = 2;
if (m_use_dedicated_worker) {
@@ -31,14 +30,13 @@ namespace immutable_obj_cache {
}
}
m_bp_header = buffer::create(get_header_size());
-
}
CacheClient::~CacheClient() {
stop();
}
- void CacheClient::run(){
+ void CacheClient::run() {
m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
}
@@ -85,11 +83,12 @@ namespace immutable_obj_cache {
return 0;
}
- void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
- std::string oid, GenContext<ObjectCacheRequest*>* on_finish) {
-
- ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0,
- pool_id, snap_id, oid, pool_nspace);
+ void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid,
+ GenContext<ObjectCacheRequest*>* on_finish) {
+ ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
+ ++m_sequence_id, 0, 0,
+ pool_id, snap_id, oid, pool_nspace);
req->process_msg = on_finish;
req->encode();
@@ -160,7 +159,6 @@ namespace immutable_obj_cache {
}
void CacheClient::read_reply_header() {
-
/* create new head buffer for every reply */
bufferptr bp_head(buffer::create(get_header_size()));
auto raw_ptr = bp_head.c_str();
@@ -175,8 +173,8 @@ namespace immutable_obj_cache {
}
void CacheClient::handle_reply_header(bufferptr bp_head,
- const boost::system::error_code& ec,
- size_t bytes_transferred) {
+ const boost::system::error_code& ec,
+ size_t bytes_transferred) {
if (ec || bytes_transferred != get_header_size()) {
fault(ASIO_ERROR_READ, ec);
return;
@@ -190,9 +188,9 @@ namespace immutable_obj_cache {
read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
}
- void CacheClient::read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+ void CacheClient::read_reply_data(bufferptr&& bp_head,
+ bufferptr&& bp_data,
const uint64_t data_len) {
-
auto raw_ptr = bp_data.c_str();
boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
boost::asio::transfer_exactly(data_len),
@@ -200,10 +198,10 @@ namespace immutable_obj_cache {
this, std::move(bp_head), std::move(bp_data), data_len,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
-
}
- void CacheClient::handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+ void CacheClient::handle_reply_data(bufferptr bp_head,
+ bufferptr bp_data,
const uint64_t data_len,
const boost::system::error_code& ec,
size_t bytes_transferred) {
@@ -233,7 +231,6 @@ namespace immutable_obj_cache {
if (is_session_work()) {
receive_message();
}
-
}
void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
@@ -266,7 +263,8 @@ namespace immutable_obj_cache {
}
// if any request fails, just execute fault, then shut down RO.
- void CacheClient::fault(const int err_type, const boost::system::error_code& ec) {
+ void CacheClient::fault(const int err_type,
+ const boost::system::error_code& ec) {
ldout(cct, 20) << "fault." << ec.message() << dendl;
if (err_type == ASIO_ERROR_CONNECT) {
@@ -276,7 +274,8 @@ namespace immutable_obj_cache {
<< ". Immutable-object-cache daemon is down ? "
<< "Data will be read from ceph cluster " << dendl;
} else {
- ldout(cct, 20) << "Connecting RO daemon fails : " << ec.message() << dendl;
+ ldout(cct, 20) << "Connecting RO daemon fails : "
+ << ec.message() << dendl;
}
if (m_dm_socket.is_open()) {
@@ -296,11 +295,11 @@ namespace immutable_obj_cache {
return;
}
- // when current session don't work, ASIO will don't receive any new request from hook.
- // On the other hand, for pending request of ASIO, cancle these request, then call their callback.
- // these request which are cancled by this method, will be re-dispatched to RADOS layer.
- //
- // make sure just have one thread to modify execute below code.
/* When the current session doesn't work, ASIO won't receive any new
 * request from the hook. For pending ASIO requests, cancel them and
 * then call their callbacks; requests cancelled by this method will
 * be re-dispatched to the RADOS layer.
 * Make sure only one thread modifies or executes the code below. */
m_session_work.store(false);
if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
@@ -321,7 +320,8 @@ namespace immutable_obj_cache {
// currently, for any asio error, just shutdown RO.
close();
- // all pending request, which have entered into ASIO, will be re-dispatched to RADOS.
+ /* all pending requests, which have entered ASIO,
+ * will be re-dispatched to RADOS. */
{
Mutex::Locker locker(m_lock);
for (auto it : m_seq_to_req) {
@@ -337,7 +337,8 @@ namespace immutable_obj_cache {
}
int CacheClient::register_client(Context* on_finish) {
- ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++);
+ ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
+ m_sequence_id++);
reg_req->encode();
bufferlist bl;
@@ -365,7 +366,8 @@ namespace immutable_obj_cache {
uint64_t data_len = get_data_len(m_bp_header.c_str());
bufferptr bp_data(buffer::create(data_len));
- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec);
+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
+ data_len), ec);
if (ec || ret != data_len) {
fault(ASIO_ERROR_READ, ec);
return -1;
@@ -387,5 +389,5 @@ namespace immutable_obj_cache {
return 0;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
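The largest hunk above is CacheClient::fault(): on any ASIO error the client stops the session, closes the domain socket, and completes every queued request so each caller re-dispatches its read to the RADOS cluster. A self-contained sketch of that fail-over pattern, with std:: stand-ins for the Ceph types (illustrative, not the patch's code):

  #include <atomic>
  #include <cstdint>
  #include <functional>
  #include <map>
  #include <mutex>

  // Hypothetical stand-ins for ObjectCacheRequest and its GenContext.
  struct Request { std::function<void(Request*)> on_finish; };

  struct ClientSketch {
    std::atomic<bool> session_work{true};
    std::mutex lock;
    std::map<uint64_t, Request*> seq_to_req;

    // On any socket error: stop the session, then complete every pending
    // request so its callback re-dispatches the read to the RADOS cluster.
    void fault() {
      session_work.store(false);                 // reject new lookups
      std::map<uint64_t, Request*> pending;
      {
        std::lock_guard<std::mutex> locker(lock);  // one thread runs fail-over
        pending.swap(seq_to_req);
      }
      for (auto& it : pending) {
        it.second->on_finish(it.second);           // caller falls back to RADOS
      }
    }
  };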
diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h
index 8917062286c..47a39cb1432 100644
--- a/src/tools/immutable_object_cache/CacheClient.h
+++ b/src/tools/immutable_object_cache/CacheClient.h
@@ -9,6 +9,7 @@
#include <boost/bind.hpp>
#include <boost/asio/error.hpp>
#include <boost/algorithm/string.hpp>
+
#include "include/ceph_assert.h"
#include "include/Context.h"
#include "common/Mutex.h"
@@ -22,8 +23,7 @@ namespace ceph {
namespace immutable_obj_cache {
class CacheClient {
-public:
-
+ public:
CacheClient(const std::string& file, CephContext* ceph_ctx);
~CacheClient();
void run();
@@ -31,12 +31,12 @@ public:
void close();
int stop();
int connect();
- void lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id,
- std::string oid, GenContext<ObjectCacheRequest*>* on_finish);
+ void lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid,
+ GenContext<ObjectCacheRequest*>* on_finish);
int register_client(Context* on_finish);
-private:
-
+ private:
void send_message();
void try_send();
void fault(const int err_type, const boost::system::error_code& err);
@@ -45,14 +45,16 @@ private:
void process(ObjectCacheRequest* reply, uint64_t seq_id);
void read_reply_header();
void handle_reply_header(bufferptr bp_head,
- const boost::system::error_code& ec, size_t bytes_transferred);
+ const boost::system::error_code& ec,
+ size_t bytes_transferred);
void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
const uint64_t data_len);
void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
const uint64_t data_len,
- const boost::system::error_code& ec, size_t bytes_transferred);
-private:
+ const boost::system::error_code& ec,
+ size_t bytes_transferred);
+ private:
CephContext* cct;
boost::asio::io_service m_io_service;
boost::asio::io_service::work m_io_service_work;
@@ -76,6 +78,6 @@ private:
bufferptr m_bp_header;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc
index 7648a3a6d05..4fc2dce910c 100644
--- a/src/tools/immutable_object_cache/CacheController.cc
+++ b/src/tools/immutable_object_cache/CacheController.cc
@@ -12,7 +12,8 @@
namespace ceph {
namespace immutable_obj_cache {
-CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
+CacheController::CacheController(CephContext *cct,
+ const std::vector<const char*> &args):
m_args(args), m_cct(cct) {
ldout(m_cct, 20) << dendl;
}
@@ -26,7 +27,7 @@ int CacheController::init() {
ldout(m_cct, 20) << dendl;
m_object_cache_store = new ObjectCacheStore(m_cct);
- //TODO(): make this configurable
+ // TODO(dehao): make this configurable
int r = m_object_cache_store->init(true);
if (r < 0) {
lderr(m_cct) << "init error\n" << dendl;
@@ -65,7 +66,8 @@ void CacheController::handle_signal(int signum) {
void CacheController::run() {
try {
- std::string controller_path = m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
+ std::string controller_path =
+ m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
std::remove(controller_path.c_str());
m_cache_server = new CacheServer(m_cct, controller_path,
@@ -80,14 +82,16 @@ void CacheController::run() {
}
}
-void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){
+void CacheController::handle_request(uint64_t session_id,
+ ObjectCacheRequest* req) {
ldout(m_cct, 20) << dendl;
switch (req->get_request_type()) {
case RBDSC_REGISTER: {
- // TODO(): skip register and allow clients to lookup directly
+ // TODO(dehao): skip register and allow clients to lookup directly
- ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
+ ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
+ RBDSC_REGISTER_REPLY, req->seq);
m_cache_server->send(session_id, reply);
break;
}
@@ -95,16 +99,15 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re
// lookup object in local cache store
std::string cache_path;
ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req;
- int ret = m_object_cache_store->lookup_object(req_read_data->pool_namespace,
- req_read_data->pool_id,
- req_read_data->snap_id,
- req_read_data->oid,
- cache_path);
+ int ret = m_object_cache_store->lookup_object(
+ req_read_data->pool_namespace, req_read_data->pool_id,
+ req_read_data->snap_id, req_read_data->oid, cache_path);
ObjectCacheRequest* reply = nullptr;
if (ret != OBJ_CACHE_PROMOTED) {
reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
} else {
- reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path);
+ reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
+ req->seq, cache_path);
}
m_cache_server->send(session_id, reply);
break;
@@ -115,5 +118,5 @@ void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* re
}
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
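handle_request() above is the daemon's dispatch point: a register request gets an RBDSC_REGISTER_REPLY, and a read request is looked up in the local store, answering RBDSC_READ_RADOS (the client reads from the cluster) unless the object is already promoted, in which case RBDSC_READ_REPLY carries the local cache file path. A sketch of the read branch with stand-in types and hypothetical numeric codes (the real RBDSC_* and OBJ_CACHE_* values are defined elsewhere in the tree):

  #include <cstdint>
  #include <string>

  constexpr uint16_t kReadRados = 1;  // hypothetical stand-in for RBDSC_READ_RADOS
  constexpr uint16_t kReadReply = 2;  // hypothetical stand-in for RBDSC_READ_REPLY
  constexpr int kPromoted = 0;        // hypothetical stand-in for OBJ_CACHE_PROMOTED

  struct Reply {
    uint16_t type;
    uint64_t seq;
    std::string cache_path;  // filled only on a cache hit
  };

  Reply make_read_reply(int lookup_ret, uint64_t seq,
                        const std::string& cache_path) {
    if (lookup_ret != kPromoted) {
      return {kReadRados, seq, ""};        // miss: client reads from RADOS
    }
    return {kReadReply, seq, cache_path};  // hit: serve the local file path
  }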
diff --git a/src/tools/immutable_object_cache/CacheController.h b/src/tools/immutable_object_cache/CacheController.h
index 79b3b9525de..ef8495f767e 100644
--- a/src/tools/immutable_object_cache/CacheController.h
+++ b/src/tools/immutable_object_cache/CacheController.h
@@ -34,7 +34,7 @@ class CacheController {
ObjectCacheStore *m_object_cache_store;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
diff --git a/src/tools/immutable_object_cache/CacheServer.cc b/src/tools/immutable_object_cache/CacheServer.cc
index e10db19c44a..0df58d55f50 100644
--- a/src/tools/immutable_object_cache/CacheServer.cc
+++ b/src/tools/immutable_object_cache/CacheServer.cc
@@ -76,7 +76,8 @@ int CacheServer::start_accept() {
void CacheServer::accept() {
CacheSessionPtr new_session = nullptr;
- new_session.reset(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
+ new_session.reset(new CacheSession(m_session_id, m_io_service,
+ m_server_process_msg, cct));
m_acceptor.async_accept(new_session->socket(),
boost::bind(&CacheServer::handle_accept, this, new_session,
@@ -93,7 +94,7 @@ void CacheServer::handle_accept(CacheSessionPtr new_session,
}
m_session_map.emplace(m_session_id, new_session);
- // TODO : session setting
+ // TODO(dehao) : session setting
new_session->start();
m_session_id++;
@@ -113,5 +114,5 @@ void CacheServer::send(uint64_t session_id, ObjectCacheRequest* msg) {
}
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/CacheServer.h b/src/tools/immutable_object_cache/CacheServer.h
index 4646a15952b..84cdd89e5e1 100644
--- a/src/tools/immutable_object_cache/CacheServer.h
+++ b/src/tools/immutable_object_cache/CacheServer.h
@@ -29,7 +29,8 @@ class CacheServer {
private:
void accept();
- void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
+ void handle_accept(CacheSessionPtr new_session,
+ const boost::system::error_code& error);
private:
CephContext* cct;
@@ -38,11 +39,11 @@ class CacheServer {
stream_protocol::endpoint m_local_path;
stream_protocol::acceptor m_acceptor;
uint64_t m_session_id = 1;
- // TODO : need to lock it.
+ // TODO(dehao) : need to lock it.
std::map<uint64_t, CacheSessionPtr> m_session_map;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc
index e4f76a7c806..6069f811259 100644
--- a/src/tools/immutable_object_cache/CacheSession.cc
+++ b/src/tools/immutable_object_cache/CacheSession.cc
@@ -47,12 +47,11 @@ void CacheSession::start() {
void CacheSession::read_request_header() {
ldout(cct, 20) << dendl;
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
- boost::asio::transfer_exactly(get_header_size()),
- boost::bind(&CacheSession::handle_request_header,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
+ boost::asio::transfer_exactly(get_header_size()),
+ boost::bind(&CacheSession::handle_request_header,
+ shared_from_this(), boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
void CacheSession::handle_request_header(const boost::system::error_code& err,
@@ -70,12 +69,12 @@ void CacheSession::read_request_data(uint64_t data_len) {
ldout(cct, 20) << dendl;
bufferptr bp_data(buffer::create(data_len));
boost::asio::async_read(m_dm_socket,
- boost::asio::buffer(bp_data.c_str(), bp_data.length()),
- boost::asio::transfer_exactly(data_len),
- boost::bind(&CacheSession::handle_request_data,
- shared_from_this(), bp_data, data_len,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::asio::buffer(bp_data.c_str(), bp_data.length()),
+ boost::asio::transfer_exactly(data_len),
+ boost::bind(&CacheSession::handle_request_data,
+ shared_from_this(), bp_data, data_len,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
@@ -112,7 +111,8 @@ void CacheSession::send(ObjectCacheRequest* reply) {
boost::asio::async_write(m_dm_socket,
boost::asio::buffer(bl.c_str(), bl.length()),
boost::asio::transfer_exactly(bl.length()),
- [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) {
+ [this, bl, reply](const boost::system::error_code& err,
+ size_t bytes_transferred) {
if (err || bytes_transferred != bl.length()) {
fault();
return;
@@ -123,8 +123,8 @@ void CacheSession::send(ObjectCacheRequest* reply) {
void CacheSession::fault() {
ldout(cct, 20) << dendl;
- // TODO
+ // TODO(dehao)
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/CacheSession.h b/src/tools/immutable_object_cache/CacheSession.h
index 5f5cd1be9bd..36bd5dbaad9 100644
--- a/src/tools/immutable_object_cache/CacheSession.h
+++ b/src/tools/immutable_object_cache/CacheSession.h
@@ -18,22 +18,25 @@ namespace ceph {
namespace immutable_obj_cache {
class CacheSession : public std::enable_shared_from_this<CacheSession> {
-public:
- CacheSession(uint64_t session_id, io_service& io_service, ProcessMsg process_msg, CephContext* ctx);
+ public:
+ CacheSession(uint64_t session_id, io_service& io_service,
+ ProcessMsg process_msg, CephContext* ctx);
~CacheSession();
stream_protocol::socket& socket();
void close();
void start();
void read_request_header();
- void handle_request_header(const boost::system::error_code& err, size_t bytes_transferred);
+ void handle_request_header(const boost::system::error_code& err,
+ size_t bytes_transferred);
void read_request_data(uint64_t data_len);
void handle_request_data(bufferptr bp, uint64_t data_len,
- const boost::system::error_code& err, size_t bytes_transferred);
+ const boost::system::error_code& err,
+ size_t bytes_transferred);
void process(ObjectCacheRequest* req);
void fault();
void send(ObjectCacheRequest* msg);
-private:
+ private:
uint64_t m_session_id;
stream_protocol::socket m_dm_socket;
ProcessMsg m_server_process_msg;
@@ -44,7 +47,7 @@ private:
typedef std::shared_ptr<CacheSession> CacheSessionPtr;
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc
index cafe4a731e8..f0b6eb1a5ab 100644
--- a/src/tools/immutable_object_cache/ObjectCacheStore.cc
+++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc
@@ -19,7 +19,6 @@ namespace immutable_obj_cache {
ObjectCacheStore::ObjectCacheStore(CephContext *cct)
: m_cct(cct), m_rados(new librados::Rados()),
m_ioctx_map_lock("ceph::cache::ObjectCacheStore::m_ioctx_map_lock") {
-
object_cache_max_size =
m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size");
@@ -30,7 +29,7 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct)
m_cache_root_dir += "/";
}
- //TODO(): allow to set cache level
+ // TODO(dehao): allow to set cache level
m_policy = new SimplePolicy(m_cct, object_cache_max_size, 0.1);
}
@@ -48,22 +47,23 @@ int ObjectCacheStore::init(bool reset) {
}
ret = m_rados->connect();
- if (ret < 0 ) {
+ if (ret < 0) {
lderr(m_cct) << "fail to connect to cluster" << dendl;
return ret;
}
- //TODO(): fsck and reuse existing cache objects
+ // TODO(dehao): fsck and reuse existing cache objects
if (reset) {
std::error_code ec;
if (efs::exists(m_cache_root_dir)) {
int dir = m_dir_num - 1;
while (dir >= 0) {
- if (!efs::remove_all(m_cache_root_dir + "/" + std::to_string(dir), ec)) {
+ if (!efs::remove_all(
+ m_cache_root_dir + "/" + std::to_string(dir), ec)) {
lderr(m_cct) << "fail to remove old cache store: " << ec << dendl;
- return ec.value();
+ return ec.value();
}
- dir --;
+ dir--;
}
} else {
if (!efs::create_directories(m_cache_root_dir, ec)) {
@@ -89,7 +89,7 @@ int ObjectCacheStore::init_cache() {
int dir = m_dir_num - 1;
while (dir >= 0) {
efs::create_directories(cache_dir + "/" + std::to_string(dir));
- dir --;
+ dir--;
}
return 0;
}
@@ -126,10 +126,10 @@ int ObjectCacheStore::do_promote(std::string pool_nspace,
librados::bufferlist* read_buf = new librados::bufferlist();
auto ctx = new FunctionContext([this, read_buf, cache_file_name](int ret) {
- handle_promote_callback(ret, read_buf, cache_file_name);
- });
+ handle_promote_callback(ret, read_buf, cache_file_name);
+ });
- return promote_object(&ioctx, object_name, read_buf, ctx);
+ return promote_object(&ioctx, object_name, read_buf, ctx);
}
int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
@@ -186,7 +186,7 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace,
cache_status_t ret = m_policy->lookup_object(cache_file_name);
- switch(ret) {
+ switch (ret) {
case OBJ_CACHE_NONE: {
pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
if (pret < 0) {
@@ -227,7 +227,7 @@ int ObjectCacheStore::evict_objects() {
std::list<std::string> obj_list;
m_policy->get_evict_list(&obj_list);
- for (auto& obj: obj_list) {
+ for (auto& obj : obj_list) {
do_evict(obj);
}
}
@@ -243,9 +243,9 @@ int ObjectCacheStore::do_evict(std::string cache_file) {
ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;
- // TODO(): possible race on read?
+ // TODO(dehao): possible race on read?
int ret = std::remove(cache_file_path.c_str());
- // evict metadata
+ // evict metadata
if (ret == 0) {
m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
m_policy->evict_entry(cache_file);
@@ -263,16 +263,16 @@ std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace,
}
std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name) {
-
std::string cache_file_dir = "";
if (m_dir_num > 0) {
uint32_t crc = 0;
- crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(), cache_file_name.length());
+ crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
+ cache_file_name.length());
cache_file_dir = std::to_string(crc % m_dir_num);
}
return m_cache_root_dir + cache_file_dir + "/" + cache_file_name;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
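get_cache_file_path() above spreads cache files over m_dir_num subdirectories by hashing the file name, so no single directory accumulates every cached object. A self-contained sketch of the scheme; the real code uses ceph_crc32c(), and std::hash below is only a stand-in to keep the sketch compilable:

  #include <cstdint>
  #include <functional>
  #include <string>

  // Shard cache files across dir_num subdirectories by hashing the name.
  std::string cache_file_path(const std::string& root, int dir_num,
                              const std::string& cache_file_name) {
    std::string dir;
    if (dir_num > 0) {
      uint32_t h =
          static_cast<uint32_t>(std::hash<std::string>{}(cache_file_name));
      dir = std::to_string(h % dir_num);   // real code: ceph_crc32c % m_dir_num
    }
    return root + dir + "/" + cache_file_name;
  }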
diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h
index 935e2970bab..fa86aca4032 100644
--- a/src/tools/immutable_object_cache/ObjectCacheStore.h
+++ b/src/tools/immutable_object_cache/ObjectCacheStore.h
@@ -21,43 +21,42 @@ namespace immutable_obj_cache {
typedef shared_ptr<librados::Rados> RadosRef;
typedef shared_ptr<librados::IoCtx> IoCtxRef;
-class ObjectCacheStore
-{
- public:
- ObjectCacheStore(CephContext *cct);
- ~ObjectCacheStore();
- int init(bool reset);
- int shutdown();
- int init_cache();
- int lookup_object(std::string pool_nspace,
- uint64_t pool_id, uint64_t snap_id,
- std::string object_name,
- std::string& target_cache_file_path);
-
- private:
- std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
- uint64_t snap_id, std::string oid);
- std::string get_cache_file_path(std::string cache_file_name);
- int evict_objects();
- int do_promote(std::string pool_nspace, uint64_t pool_id,
- uint64_t snap_id, std::string object_name);
- int promote_object(librados::IoCtx*, std::string object_name,
- librados::bufferlist* read_buf,
- Context* on_finish);
- int handle_promote_callback(int, bufferlist*, std::string);
- int do_evict(std::string cache_file);
-
- CephContext *m_cct;
- RadosRef m_rados;
- std::map<uint64_t, librados::IoCtx> m_ioctx_map;
- Mutex m_ioctx_map_lock;
- Policy* m_policy;
- //TODO(): make this configurable
- int m_dir_num = 10;
- uint64_t object_cache_max_size;
- std::string m_cache_root_dir;
+class ObjectCacheStore {
+ public:
+ ObjectCacheStore(CephContext *cct);
+ ~ObjectCacheStore();
+ int init(bool reset);
+ int shutdown();
+ int init_cache();
+ int lookup_object(std::string pool_nspace,
+ uint64_t pool_id, uint64_t snap_id,
+ std::string object_name,
+ std::string& target_cache_file_path);
+
+ private:
+ std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid);
+ std::string get_cache_file_path(std::string cache_file_name);
+ int evict_objects();
+ int do_promote(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string object_name);
+ int promote_object(librados::IoCtx*, std::string object_name,
+ librados::bufferlist* read_buf,
+ Context* on_finish);
+ int handle_promote_callback(int, bufferlist*, std::string);
+ int do_evict(std::string cache_file);
+
+ CephContext *m_cct;
+ RadosRef m_rados;
+ std::map<uint64_t, librados::IoCtx> m_ioctx_map;
+ Mutex m_ioctx_map_lock;
+ Policy* m_policy;
+ // TODO(dehao): make this configurable
+ int m_dir_num = 10;
+ uint64_t object_cache_max_size;
+ std::string m_cache_root_dir;
};
-} // namespace ceph
-} // namespace immutable_obj_cache
+} // namespace ceph
+} // namespace immutable_obj_cache
#endif
diff --git a/src/tools/immutable_object_cache/Policy.h b/src/tools/immutable_object_cache/Policy.h
index cda8e69c917..3e3a0ef8787 100644
--- a/src/tools/immutable_object_cache/Policy.h
+++ b/src/tools/immutable_object_cache/Policy.h
@@ -16,18 +16,18 @@ typedef enum {
OBJ_CACHE_SKIP,
} cache_status_t;
-
class Policy {
-public:
- Policy(){}
- virtual ~Policy(){};
+ public:
+ Policy() {}
+ virtual ~Policy() {}
virtual cache_status_t lookup_object(std::string) = 0;
virtual int evict_entry(std::string) = 0;
- virtual void update_status(std::string, cache_status_t, uint64_t size=0) = 0;
+ virtual void update_status(std::string, cache_status_t,
+ uint64_t size = 0) = 0;
virtual cache_status_t get_status(std::string) = 0;
virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
diff --git a/src/tools/immutable_object_cache/SimplePolicy.cc b/src/tools/immutable_object_cache/SimplePolicy.cc
index 8fe70b4e00d..bb76f95c321 100644
--- a/src/tools/immutable_object_cache/SimplePolicy.cc
+++ b/src/tools/immutable_object_cache/SimplePolicy.cc
@@ -18,19 +18,18 @@ SimplePolicy::SimplePolicy(CephContext *cct, uint64_t cache_size,
: cct(cct), m_watermark(watermark), m_max_cache_size(cache_size),
m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock") {
ldout(cct, 20) << dendl;
- m_max_inflight_ops = cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
+ m_max_inflight_ops = cct->_conf.get_val<uint64_t>(
+ "immutable_object_cache_max_inflight_ops");
m_cache_size = 0;
}
SimplePolicy::~SimplePolicy() {
ldout(cct, 20) << dendl;
- for (auto it: m_cache_map) {
+ for (auto it : m_cache_map) {
Entry* entry = (it.second);
delete entry;
}
-
-
}
cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
@@ -44,13 +43,14 @@ cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
return OBJ_CACHE_SKIP;
}
- if ((m_cache_size < m_max_cache_size) && (inflight_ops < m_max_inflight_ops)) {
+ if ((m_cache_size < m_max_cache_size) &&
+ (inflight_ops < m_max_inflight_ops)) {
Entry* entry = new Entry();
ceph_assert(entry != nullptr);
m_cache_map[file_name] = entry;
wlocker.unlock();
update_status(file_name, OBJ_CACHE_SKIP);
- return OBJ_CACHE_NONE; // start promotion request
+ return OBJ_CACHE_NONE; // start promotion request
}
// if there's no free entry, return skip to read from rados
@@ -136,7 +136,6 @@ void SimplePolicy::update_status(std::string file_name,
m_cache_size -= size;
return;
}
-
}
int SimplePolicy::evict_entry(std::string file_name) {
@@ -165,7 +164,8 @@ void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) {
RWLock::WLocker locker(m_cache_map_lock);
// check free ratio, pop entries from LRU
if ((double)m_cache_size / m_max_cache_size > (1 - m_watermark)) {
- int evict_num = m_cache_map.size() * 0.1; //TODO(): make this configurable
+ // TODO(dehao): make this configurable
+ int evict_num = m_cache_map.size() * 0.1;
for (int i = 0; i < evict_num; i++) {
Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
if (entry == nullptr) {
@@ -173,7 +173,6 @@ void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) {
}
std::string file_name = entry->file_name;
obj_list->push_back(file_name);
-
}
}
}
@@ -206,5 +205,5 @@ std::string SimplePolicy::get_evict_entry() {
return entry->file_name;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
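get_evict_list() above triggers eviction once used capacity crosses the (1 - watermark) threshold, then expires roughly 10% of the promoted LRU entries (the hard-coded 0.1 that the TODO wants configurable). A sketch of that policy, with a plain std::list standing in for the Ceph LRU:

  #include <cstdint>
  #include <list>
  #include <string>

  // Collect eviction candidates once the cache is past its watermark.
  void build_evict_list(uint64_t cache_size, uint64_t max_cache_size,
                        double watermark, std::list<std::string>& lru,
                        std::list<std::string>* obj_list) {
    if (static_cast<double>(cache_size) / max_cache_size > (1 - watermark)) {
      size_t evict_num = lru.size() / 10;   // expire ~10% of promoted entries
      for (size_t i = 0; i < evict_num && !lru.empty(); i++) {
        obj_list->push_back(lru.back());    // oldest entry first
        lru.pop_back();
      }
    }
  }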
diff --git a/src/tools/immutable_object_cache/SimplePolicy.h b/src/tools/immutable_object_cache/SimplePolicy.h
index 7d97bdc9a15..9ff00b26a52 100644
--- a/src/tools/immutable_object_cache/SimplePolicy.h
+++ b/src/tools/immutable_object_cache/SimplePolicy.h
@@ -17,15 +17,16 @@ namespace ceph {
namespace immutable_obj_cache {
class SimplePolicy : public Policy {
-public:
+ public:
SimplePolicy(CephContext *cct, uint64_t block_num, float watermark);
- ~SimplePolicy() ;
+ ~SimplePolicy();
cache_status_t lookup_object(std::string file_name);
cache_status_t get_status(std::string file_name);
void update_status(std::string file_name,
- cache_status_t new_status, uint64_t size=0);
+ cache_status_t new_status,
+ uint64_t size = 0);
int evict_entry(std::string file_name);
@@ -36,15 +37,15 @@ public:
uint64_t get_promoted_entry_num();
std::string get_evict_entry();
-private:
+ private:
cache_status_t alloc_entry(std::string file_name);
class Entry : public LRUObject {
- public:
- cache_status_t status;
- Entry() : status(OBJ_CACHE_NONE){}
- std::string file_name;
- uint64_t size;
+ public:
+ cache_status_t status;
+ Entry() : status(OBJ_CACHE_NONE) {}
+ std::string file_name;
+ uint64_t size;
};
CephContext* cct;
@@ -57,7 +58,6 @@ private:
std::unordered_map<std::string, Entry*> m_cache_map;
RWLock m_cache_map_lock;
-
std::deque<Entry*> m_free_list;
std::atomic<uint64_t> m_cache_size;
@@ -65,6 +65,6 @@ private:
LRU m_promoted_lru;
};
-} // namespace immutable_obj_cache
-} // namespace ceph
-#endif // CEPH_CACHE_SIMPLE_POLICY_H
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_SIMPLE_POLICY_H
diff --git a/src/tools/immutable_object_cache/SocketCommon.h b/src/tools/immutable_object_cache/SocketCommon.h
index db78e427912..9cd108ca06d 100644
--- a/src/tools/immutable_object_cache/SocketCommon.h
+++ b/src/tools/immutable_object_cache/SocketCommon.h
@@ -23,6 +23,6 @@ class ObjectCacheRequest;
typedef std::function<void(uint64_t, ObjectCacheRequest*)> ProcessMsg;
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc
index beb63c3e4ef..a0a4c6352aa 100644
--- a/src/tools/immutable_object_cache/Types.cc
+++ b/src/tools/immutable_object_cache/Types.cc
@@ -11,10 +11,10 @@
namespace ceph {
namespace immutable_obj_cache {
-ObjectCacheRequest::ObjectCacheRequest(){}
+ObjectCacheRequest::ObjectCacheRequest() {}
ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
: type(t), seq(s) {}
-ObjectCacheRequest::~ObjectCacheRequest(){}
+ObjectCacheRequest::~ObjectCacheRequest() {}
void ObjectCacheRequest::encode() {
ENCODE_START(1, 1, payload);
@@ -58,9 +58,11 @@ void ObjectCacheRegReplyData::encode_payload() {}
void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
- uint64_t read_offset, uint64_t read_len,
+ uint64_t read_offset,
+ uint64_t read_len,
uint64_t pool_id, uint64_t snap_id,
- std::string oid, std::string pool_namespace)
+ std::string oid,
+ std::string pool_namespace)
: ObjectCacheRequest(t, s), read_offset(read_offset),
read_len(read_len), pool_id(pool_id), snap_id(snap_id),
oid(oid), pool_namespace(pool_namespace)
@@ -89,7 +91,8 @@ void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
ceph::decode(pool_namespace, i);
}
-ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path)
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
+ string cache_path)
: ObjectCacheRequest(t, s), cache_path(cache_path) {}
ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s)
: ObjectCacheRequest(t, s) {}
@@ -114,8 +117,7 @@ void ObjectCacheReadRadosData::encode_payload() {}
void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
-ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
-{
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
ObjectCacheRequest* req = nullptr;
uint16_t type;
@@ -126,7 +128,7 @@ ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
ceph::decode(seq, i);
DECODE_FINISH(i);
- switch(type) {
+ switch (type) {
case RBDSC_REGISTER: {
req = new ObjectCacheRegData(type, seq);
break;
@@ -156,5 +158,5 @@ ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer)
return req;
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h
index 6518d13ea4d..470f764e0b5 100644
--- a/src/tools/immutable_object_cache/Types.h
+++ b/src/tools/immutable_object_cache/Types.h
@@ -26,10 +26,10 @@ inline uint32_t get_data_len(char* buf) {
HeaderHelper* header = (HeaderHelper*)buf;
return header->len;
}
-}
+} // namespace
class ObjectCacheRequest {
-public:
+ public:
uint16_t type;
uint64_t seq;
@@ -43,8 +43,8 @@ public:
// encode consists of two steps
// step 1 : directly encode common bits using encode method of base class.
- // step 2 : according to payload_empty, determine whether addtional bits need to
- // be encoded which be implements by child class.
+ // step 2 : according to payload_empty, determine whether additional bits
+ // need to be encoded; this is implemented by the child class.
void encode();
void decode(bufferlist& bl);
bufferlist get_payload_bufferlist() { return payload; }
@@ -56,7 +56,7 @@ public:
};
class ObjectCacheRegData : public ObjectCacheRequest {
-public:
+ public:
ObjectCacheRegData();
ObjectCacheRegData(uint16_t t, uint64_t s);
~ObjectCacheRegData() override;
@@ -67,7 +67,7 @@ public:
};
class ObjectCacheRegReplyData : public ObjectCacheRequest {
-public:
+ public:
ObjectCacheRegReplyData();
ObjectCacheRegReplyData(uint16_t t, uint64_t s);
~ObjectCacheRegReplyData() override;
@@ -78,15 +78,17 @@ public:
};
class ObjectCacheReadData : public ObjectCacheRequest {
-public:
+ public:
uint64_t read_offset;
uint64_t read_len;
uint64_t pool_id;
uint64_t snap_id;
std::string oid;
std::string pool_namespace;
- ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id,
- uint64_t snap_id, std::string oid, std::string pool_namespace );
+ ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
+ uint64_t read_len, uint64_t pool_id,
+ uint64_t snap_id, std::string oid,
+ std::string pool_namespace);
ObjectCacheReadData(uint16_t t, uint64_t s);
~ObjectCacheReadData() override;
void encode_payload() override;
@@ -96,7 +98,7 @@ public:
};
class ObjectCacheReadReplyData : public ObjectCacheRequest {
-public:
+ public:
std::string cache_path;
ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path);
ObjectCacheReadReplyData(uint16_t t, uint64_t s);
@@ -108,7 +110,7 @@ public:
};
class ObjectCacheReadRadosData : public ObjectCacheRequest {
-public:
+ public:
ObjectCacheReadRadosData();
ObjectCacheReadRadosData(uint16_t t, uint64_t s);
~ObjectCacheReadRadosData() override;
@@ -120,6 +122,6 @@ public:
ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer);
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
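The comment reflowed above describes the serialization contract: ObjectCacheRequest::encode() writes the bits common to every message (type, seq), then calls a virtual encode_payload() so each child appends its own fields. A sketch of that two-step pattern, with std::vector standing in for ceph::bufferlist and the ENCODE macros (illustrative, not the real wire format):

  #include <cstdint>
  #include <string>
  #include <vector>

  struct RequestSketch {
    uint16_t type;
    uint64_t seq;
    std::vector<uint8_t> payload;

    void encode() {
      append(type);        // step 1: common header bits from the base class
      append(seq);
      encode_payload();    // step 2: child-specific bits, if any
    }
    virtual void encode_payload() {}
    virtual ~RequestSketch() = default;

   private:
    template <typename T>
    void append(T v) {
      auto* p = reinterpret_cast<uint8_t*>(&v);
      payload.insert(payload.end(), p, p + sizeof(T));
    }
  };

  struct ReadReplySketch : RequestSketch {
    std::string cache_path;
    void encode_payload() override {
      payload.insert(payload.end(), cache_path.begin(), cache_path.end());
    }
  };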
diff --git a/src/tools/immutable_object_cache/Utils.h b/src/tools/immutable_object_cache/Utils.h
index daa2bffda8b..71e716bfc22 100644
--- a/src/tools/immutable_object_cache/Utils.h
+++ b/src/tools/immutable_object_cache/Utils.h
@@ -7,7 +7,6 @@
#include "include/rados/librados.hpp"
#include "include/Context.h"
-
namespace ceph {
namespace immutable_obj_cache {
namespace detail {
@@ -19,7 +18,7 @@ void rados_callback(rados_completion_t c, void *arg) {
(obj->*MF)(r);
}
-} // namespace detail
+} // namespace detail
template <typename T, void(T::*MF)(int)=&T::complete>
librados::AioCompletion *create_rados_callback(T *obj) {
@@ -27,6 +26,6 @@ librados::AioCompletion *create_rados_callback(T *obj) {
obj, &detail::rados_callback<T, MF>, nullptr);
}
-} // namespace immutable_obj_cache
-} // namespace ceph
+} // namespace immutable_obj_cache
+} // namespace ceph
#endif
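Utils.h's create_rados_callback() adapts a librados AIO completion to a member function: detail::rados_callback fetches the op's return value and forwards it to obj->*MF, which defaults to &T::complete. A hedged usage sketch, assuming the stock librados C++ API (AioCompletion, IoCtx::aio_read) and hypothetical include paths and caller type:

  #include <string>
  #include "include/rados/librados.hpp"              // assumed include path
  #include "tools/immutable_object_cache/Utils.h"    // assumed include path

  // Hypothetical caller; any type with complete(int) works.
  struct PromoteCtx {
    void complete(int r) { /* r is the aio op's return value */ }
  };

  void read_async(librados::IoCtx& ioctx, const std::string& oid,
                  librados::bufferlist* bl, PromoteCtx* ctx) {
    // The completion invokes ctx->complete(r) when the read finishes.
    librados::AioCompletion* comp =
        ceph::immutable_obj_cache::create_rados_callback(ctx);
    ioctx.aio_read(oid, comp, bl, 4096, 0);
    comp->release();  // drop our ref; librados holds its own until done
  }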
diff --git a/src/tools/immutable_object_cache/main.cc b/src/tools/immutable_object_cache/main.cc
index dffae87acc9..bfe49df6323 100644
--- a/src/tools/immutable_object_cache/main.cc
+++ b/src/tools/immutable_object_cache/main.cc
@@ -17,20 +17,20 @@ void usage() {
std::cout << "usage: cache controller [options...]" << std::endl;
std::cout << "options:\n";
std::cout << " -m monaddress[:port] connect to specified monitor\n";
- std::cout << " --keyring=<path> path to keyring for local cluster\n";
+ std::cout << " --keyring=<path> path to keyring for local "
+ << "cluster\n";
std::cout << " --log-file=<logfile> file to log debug output\n";
- std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> set debug level\n";
+ std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> "
+ << "set debug level\n";
generic_server_usage();
}
-static void handle_signal(int signum)
-{
+static void handle_signal(int signum) {
if (cachectl)
cachectl->handle_signal(signum);
}
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
std::vector<const char*> args;
env_to_vec(args);
argv_to_vec(argc, argv, args);
@@ -41,8 +41,8 @@ int main(int argc, const char **argv)
}
auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
- CODE_ENVIRONMENT_DAEMON,
- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
if (g_conf()->daemonize) {
global_init_daemonize(g_ceph_context);