diff options
author | shangdehao1 <dehao.shang@intel.com> | 2019-02-26 18:18:37 +0100 |
---|---|---|
committer | Yuan Zhou <yuan.zhou@intel.com> | 2019-03-21 17:16:30 +0100 |
commit | 3f521074c59e34d3a96e6eb710b5d2a03b2388ec (patch) | |
tree | 2e0ef45fe6fa02fa53160474f1255eb3a92406ad /src/tools/immutable_object_cache | |
parent | tools: return real cache status on lookup in immutable obj cache (diff) | |
download | ceph-3f521074c59e34d3a96e6eb710b5d2a03b2388ec.tar.xz ceph-3f521074c59e34d3a96e6eb710b5d2a03b2388ec.zip |
tools: refactor ObjectCacheRequest of RO
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Diffstat (limited to 'src/tools/immutable_object_cache')
-rw-r--r-- | src/tools/immutable_object_cache/CacheClient.cc | 32 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheController.cc | 33 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheSession.cc | 4 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/Types.cc | 233 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/Types.h | 123 |
5 files changed, 196 insertions, 229 deletions
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 20d5426b37f..e2ab8e6c039 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -25,7 +25,7 @@ namespace immutable_obj_cache { if (m_use_dedicated_worker) { m_worker = new boost::asio::io_service(); m_worker_io_service_work = new boost::asio::io_service::work(*m_worker); - for(uint64_t i = 0; i < m_worker_thread_num; i++) { + for (uint64_t i = 0; i < m_worker_thread_num; i++) { std::thread* thd = new std::thread([this](){m_worker->run();}); m_worker_threads.push_back(thd); } @@ -55,7 +55,7 @@ namespace immutable_obj_cache { } if (m_use_dedicated_worker) { m_worker->stop(); - for(auto thd : m_worker_threads) { + for (auto thd : m_worker_threads) { thd->join(); delete thd; } @@ -88,22 +88,16 @@ namespace immutable_obj_cache { void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, std::string oid, GenContext<ObjectCacheRequest*>* on_finish) { - ObjectCacheReadData data; - data.type = RBDSC_READ; - data.seq = ++m_sequence_id; - - data.m_pool_id = pool_id; - data.m_snap_id = snap_id; - data.m_pool_namespace = pool_nspace; - data.m_oid = oid; - ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ); + ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, ++m_sequence_id, 0, 0, + pool_id, snap_id, oid, pool_nspace); req->m_process_msg = on_finish; + req->encode(); { Mutex::Locker locker(m_lock); - m_outcoming_bl.append(req->get_data_buffer()); - ceph_assert(m_seq_to_req.find(data.seq) == m_seq_to_req.end()); - m_seq_to_req[data.seq] = req; + m_outcoming_bl.append(req->get_payload_bufferlist()); + ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end()); + m_seq_to_req[req->seq] = req; } // try to send message to server. @@ -330,7 +324,7 @@ namespace immutable_obj_cache { // all pending request, which have entered into ASIO, will be re-dispatched to RADOS. { Mutex::Locker locker(m_lock); - for(auto it : m_seq_to_req) { + for (auto it : m_seq_to_req) { it.second->type = RBDSC_READ_RADOS; it.second->m_process_msg->complete(it.second); } @@ -343,13 +337,11 @@ namespace immutable_obj_cache { } int CacheClient::register_client(Context* on_finish) { - ObjectCacheRegData data; - data.seq = m_sequence_id++; - data.type = RBDSC_REGISTER; - ObjectCacheRequest* reg_req = encode_object_cache_request(&data, RBDSC_REGISTER); + ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, m_sequence_id++); + reg_req->encode(); bufferlist bl; - bl.append(reg_req->get_data_buffer()); + bl.append(reg_req->get_payload_bufferlist()); uint64_t ret; boost::system::error_code ec; diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index 213b2d36b50..319631d3463 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -83,39 +83,30 @@ void CacheController::run() { void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){ ldout(m_cct, 20) << dendl; - switch (req->type) { + switch (req->get_request_type()) { case RBDSC_REGISTER: { // TODO(): skip register and allow clients to lookup directly - ObjectCacheRegReplyData data; - data.type = RBDSC_REGISTER_REPLY; - data.seq = req->seq; - req = encode_object_cache_request(&data, RBDSC_REGISTER_REPLY); - m_cache_server->send(session_id, req); + ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq); + m_cache_server->send(session_id, reply); break; } case RBDSC_READ: { // lookup object in local cache store - ObjectCacheReadData* data = (ObjectCacheReadData*)(req->m_data); std::string cache_path; - int ret = m_object_cache_store->lookup_object(data->m_pool_namespace, - data->m_pool_id, - data->m_snap_id, - data->m_oid, + ObjectCacheReadData* req_read_data = (ObjectCacheReadData*)req; + int ret = m_object_cache_store->lookup_object(req_read_data->m_pool_namespace, + req_read_data->m_pool_id, + req_read_data->m_snap_id, + req_read_data->m_oid, cache_path); + ObjectCacheRequest* reply = nullptr; if (ret != OBJ_CACHE_PROMOTED) { - ObjectCacheReadRadosData reply_data; - reply_data.type = RBDSC_READ_RADOS; - reply_data.seq = req->seq; - req = encode_object_cache_request(&reply_data, RBDSC_READ_RADOS); + reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); } else { - ObjectCacheReadReplyData reply_data; - reply_data.m_cache_path = cache_path; - reply_data.type = RBDSC_READ_REPLY; - reply_data.seq = req->seq; - req = encode_object_cache_request(&reply_data, RBDSC_READ_REPLY); + reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, cache_path); } - m_cache_server->send(session_id, req); + m_cache_server->send(session_id, reply); break; } default: diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc index c72305944fc..e4f76a7c806 100644 --- a/src/tools/immutable_object_cache/CacheSession.cc +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -94,6 +94,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, ObjectCacheRequest* req = decode_object_cache_request(bl_data); process(req); + delete req; read_request_header(); } @@ -105,7 +106,8 @@ void CacheSession::process(ObjectCacheRequest* req) { void CacheSession::send(ObjectCacheRequest* reply) { ldout(cct, 20) << dendl; bufferlist bl; - bl.append(reply->get_data_buffer()); + reply->encode(); + bl.append(reply->get_payload_bufferlist()); boost::asio::async_write(m_dm_socket, boost::asio::buffer(bl.c_str(), bl.length()), diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index b1a00a5a21d..2e547b11b5e 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -11,207 +11,148 @@ namespace ceph { namespace immutable_obj_cache { -void ObjectCacheRegData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ENCODE_FINISH(bl); +ObjectCacheRequest::ObjectCacheRequest(){} +ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s) + : type(t), seq(s) {} +ObjectCacheRequest::~ObjectCacheRequest(){} + +void ObjectCacheRequest::encode() { + ENCODE_START(1, 1, m_payload); + ceph::encode(type, m_payload); + ceph::encode(seq, m_payload); + if (!payload_empty()) { + encode_payload(); + } + ENCODE_FINISH(m_payload); } -void ObjectCacheRegData::decode(bufferlist& bl) { +void ObjectCacheRequest::decode(bufferlist& bl) { auto i = bl.cbegin(); DECODE_START(1, i); ceph::decode(type, i); ceph::decode(seq, i); + if (!payload_empty()) { + decode_payload(i); + } DECODE_FINISH(i); } -void ObjectCacheRegReplyData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ENCODE_FINISH(bl); -} +ObjectCacheRegData::ObjectCacheRegData() {} +ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} -void ObjectCacheRegReplyData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); - DECODE_FINISH(i); -} +ObjectCacheRegData::~ObjectCacheRegData() {} + +void ObjectCacheRegData::encode_payload() {} + +void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {} + +ObjectCacheRegReplyData::ObjectCacheRegReplyData() {} +ObjectCacheRegReplyData::ObjectCacheRegReplyData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {} + +void ObjectCacheRegReplyData::encode_payload() {} + +void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {} + +ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s, + uint64_t read_offset, uint64_t read_len, + uint64_t pool_id, uint64_t snap_id, + std::string oid, std::string pool_namespace) + : ObjectCacheRequest(t, s), m_read_offset(read_offset), + m_read_len(read_len), m_pool_id(pool_id), m_snap_id(snap_id), + m_oid(oid), m_pool_namespace(pool_namespace) +{} + +ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadData::~ObjectCacheReadData() {} -void ObjectCacheReadData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ceph::encode(m_read_offset, bl); - ceph::encode(m_read_len, bl); - ceph::encode(m_pool_id, bl); - ceph::encode(m_snap_id, bl); - ceph::encode(m_oid, bl); - ceph::encode(m_pool_namespace, bl); - ENCODE_FINISH(bl); +void ObjectCacheReadData::encode_payload() { + ceph::encode(m_read_offset, m_payload); + ceph::encode(m_read_len, m_payload); + ceph::encode(m_pool_id, m_payload); + ceph::encode(m_snap_id, m_payload); + ceph::encode(m_oid, m_payload); + ceph::encode(m_pool_namespace, m_payload); } -void ObjectCacheReadData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); +void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) { ceph::decode(m_read_offset, i); ceph::decode(m_read_len, i); ceph::decode(m_pool_id, i); ceph::decode(m_snap_id, i); ceph::decode(m_oid, i); ceph::decode(m_pool_namespace, i); - DECODE_FINISH(i); } -void ObjectCacheReadReplyData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ceph::encode(m_cache_path, bl); - ENCODE_FINISH(bl); +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, string cache_path) + : ObjectCacheRequest(t, s), m_cache_path(cache_path) {} +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadReplyData::~ObjectCacheReadReplyData() {} + +void ObjectCacheReadReplyData::encode_payload() { + ceph::encode(m_cache_path, m_payload); } -void ObjectCacheReadReplyData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); +void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) { ceph::decode(m_cache_path, i); - DECODE_FINISH(i); } -void ObjectCacheReadRadosData::encode(bufferlist& bl) { - ENCODE_START(1, 1, bl); - ceph::encode(type, bl); - ceph::encode(seq, bl); - ENCODE_FINISH(bl); -} +ObjectCacheReadRadosData::ObjectCacheReadRadosData() {} +ObjectCacheReadRadosData::ObjectCacheReadRadosData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} -void ObjectCacheReadRadosData::decode(bufferlist& bl) { - auto i = bl.cbegin(); - DECODE_START(1, i); - ceph::decode(type, i); - ceph::decode(seq, i); - DECODE_FINISH(i); -} +ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {} -uint8_t get_header_size() { - return 6; //uint8_t + uint8_t + uint32_t -} +void ObjectCacheReadRadosData::encode_payload() {} -struct encode_header{ - uint8_t v; - uint8_t c_v; - uint32_t len; -}__attribute__((packed)); +void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {} -uint32_t get_data_len(char* buf) { - encode_header* header = (encode_header*)buf; - return header->len; -} +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) +{ + ObjectCacheRequest* req = nullptr; -uint16_t get_data_type(bufferlist buf) { uint16_t type; - auto i = buf.cbegin(); + uint64_t seq; + auto i = payload_buffer.cbegin(); DECODE_START(1, i); - decode(type, i); + ceph::decode(type, i); + ceph::decode(seq, i); DECODE_FINISH(i); - return type; -} - -bufferlist ObjectCacheRequest::get_data_buffer() { - return m_data_buffer; -} - -ObjectCacheRequest* encode_object_cache_request(void* m_data, uint16_t type) { - ObjectCacheRequest* req = new ObjectCacheRequest(); - - switch(type) { - case RBDSC_REGISTER: { - ObjectCacheRegData* data = (ObjectCacheRegData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_REGISTER_REPLY: { - ObjectCacheRegReplyData* data = (ObjectCacheRegReplyData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_READ: { - ObjectCacheReadData* data = (ObjectCacheReadData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_READ_RADOS: { - ObjectCacheReadRadosData* data = (ObjectCacheReadRadosData*)m_data; - data->encode(req->m_data_buffer); - break; - } - case RBDSC_READ_REPLY: { - ObjectCacheReadReplyData* data = (ObjectCacheReadReplyData*)m_data; - data->encode(req->m_data_buffer); - break; - } - default: - ceph_assert(0); - } - - req->type = type; - return req; -} - -ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) { - ObjectCacheRequest* req = new ObjectCacheRequest(); - uint16_t type = get_data_type(data_buffer); - uint64_t seq; switch(type) { case RBDSC_REGISTER: { - ObjectCacheRegData* data = new ObjectCacheRegData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheRegData(type, seq); break; } case RBDSC_READ: { - ObjectCacheReadData* data = new ObjectCacheReadData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheReadData(type, seq); break; } case RBDSC_REGISTER_REPLY: { - ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheRegReplyData(type, seq); break; } case RBDSC_READ_REPLY: { - ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheReadReplyData(type, seq); break; } case RBDSC_READ_RADOS: { - ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData(); - data->decode(data_buffer); - seq = data->seq; - req->m_data = data; + req = new ObjectCacheReadRadosData(type, seq); break; } default: ceph_assert(0); } - req->type = type; - req->seq = seq; + req->decode(payload_buffer); + return req; } diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index 4d03dfa1993..99b7b8ae97f 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -6,78 +6,119 @@ #include "include/encoding.h" #include "include/Context.h" +#include "SocketCommon.h" namespace ceph { namespace immutable_obj_cache { -class ObjectCacheRegData { +namespace { +struct HeaderHelper { + uint8_t v; + uint8_t c_v; + uint32_t len; +}__attribute__((packed)); + +inline uint8_t get_header_size() { + return sizeof(HeaderHelper); +} + +inline uint32_t get_data_len(char* buf) { + HeaderHelper* header = (HeaderHelper*)buf; + return header->len; +} +} + +class ObjectCacheRequest { public: uint16_t type; uint64_t seq; - void encode(bufferlist& bl); + bufferlist m_payload; + + GenContext<ObjectCacheRequest*>* m_process_msg; + + ObjectCacheRequest(); + ObjectCacheRequest(uint16_t type, uint64_t seq); + virtual ~ObjectCacheRequest(); + + // encode consists of two steps + // step 1 : directly encode common bits using encode method of base classs. + // step 2 : according to payload_empty, determine whether addtional bits need to + // be encoded which be implements by child class. + void encode(); void decode(bufferlist& bl); + bufferlist get_payload_bufferlist() { return m_payload; } + + virtual void encode_payload() = 0; + virtual void decode_payload(bufferlist::const_iterator bl_it) = 0; + virtual uint16_t get_request_type() = 0; + virtual bool payload_empty() = 0; }; -class ObjectCacheRegReplyData { +class ObjectCacheRegData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; + ObjectCacheRegData(); + ObjectCacheRegData(uint16_t t, uint64_t s); + ~ObjectCacheRegData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_REGISTER; } + bool payload_empty() override { return true; } +}; - void encode(bufferlist& bl); - void decode(bufferlist& bl); +class ObjectCacheRegReplyData : public ObjectCacheRequest { +public: + ObjectCacheRegReplyData(); + ObjectCacheRegReplyData(uint16_t t, uint64_t s); + ~ObjectCacheRegReplyData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator iter) override; + uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; } + bool payload_empty() override { return true; } }; -class ObjectCacheReadData { +class ObjectCacheReadData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; uint64_t m_read_offset; uint64_t m_read_len; uint64_t m_pool_id; uint64_t m_snap_id; std::string m_oid; std::string m_pool_namespace; - - void encode(bufferlist& bl); - void decode(bufferlist& bl); + ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, uint64_t read_len, uint64_t pool_id, + uint64_t snap_id, std::string oid, std::string pool_namespace ); + ObjectCacheReadData(uint16_t t, uint64_t s); + ~ObjectCacheReadData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_READ; } + bool payload_empty() override { return false; } }; -class ObjectCacheReadReplyData { +class ObjectCacheReadReplyData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; std::string m_cache_path; - - void encode(bufferlist& bl); - void decode(bufferlist& bl); + ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path); + ObjectCacheReadReplyData(uint16_t t, uint64_t s); + ~ObjectCacheReadReplyData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_READ_REPLY; } + bool payload_empty() override { return false; } }; -class ObjectCacheReadRadosData { +class ObjectCacheReadRadosData : public ObjectCacheRequest { public: - uint16_t type; - uint64_t seq; - - void encode(bufferlist& bl); - void decode(bufferlist& bl); + ObjectCacheReadRadosData(); + ObjectCacheReadRadosData(uint16_t t, uint64_t s); + ~ObjectCacheReadRadosData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl) override; + uint16_t get_request_type() override { return RBDSC_READ_RADOS; } + bool payload_empty() override { return true; } }; -class ObjectCacheRequest { -public: - uint64_t seq; - uint16_t type; - void* m_data; - bufferlist m_data_buffer; - GenContext<ObjectCacheRequest*>* m_process_msg; - - bufferlist get_data_buffer(); -}; - -uint8_t get_header_size(); -uint32_t get_data_len(char* buf); - -ObjectCacheRequest* encode_object_cache_request(void* data, uint16_t type); -ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer); +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer); } // namespace immutable_obj_cache } // namespace ceph |