diff options
author | Yuan Zhou <yuan.zhou@intel.com> | 2019-02-13 15:24:58 +0100 |
---|---|---|
committer | Yuan Zhou <yuan.zhou@intel.com> | 2019-03-21 17:16:29 +0100 |
commit | ec48641f9e02b5ce24c1a88f65236a69ad81953a (patch) | |
tree | cc8ede003888fae70bf44961b46a6dab89d92d89 /src | |
parent | tools: cleanup IPC message for immutable obj cache daemon (diff) | |
download | ceph-ec48641f9e02b5ce24c1a88f65236a69ad81953a.tar.xz ceph-ec48641f9e02b5ce24c1a88f65236a69ad81953a.zip |
tools: use specific message for different ops in immutable obj cache daemon
use different types of message for different ops to be more efficient
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/test/immutable_object_cache/test_DomainSocket.cc | 21 | ||||
-rw-r--r-- | src/test/immutable_object_cache/test_message.cc | 46 | ||||
-rw-r--r-- | src/test/immutable_object_cache/test_multi_session.cc | 11 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheClient.cc | 39 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheController.cc | 33 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/CacheSession.cc | 3 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/Types.cc | 164 | ||||
-rw-r--r-- | src/tools/immutable_object_cache/Types.h | 49 |
8 files changed, 290 insertions, 76 deletions
diff --git a/src/test/immutable_object_cache/test_DomainSocket.cc b/src/test/immutable_object_cache/test_DomainSocket.cc index 0ed5b160304..17a442d13ff 100644 --- a/src/test/immutable_object_cache/test_DomainSocket.cc +++ b/src/test/immutable_object_cache/test_DomainSocket.cc @@ -79,17 +79,26 @@ public: void handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (req->m_data.type) { + switch (req->type) { case RBDSC_REGISTER: { - req->m_data.type = RBDSC_REGISTER_REPLY; + ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); + data->type = RBDSC_REGISTER_REPLY; + req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY); m_cache_server->send(session_id, req); break; } case RBDSC_READ: { - if (m_hit_entry_set.find(req->m_data.m_oid) == m_hit_entry_set.end()) { - req->m_data.type = RBDSC_READ_RADOS; + ObjectCacheReadData* req_data = (ObjectCacheReadData*)req->m_data; + if (m_hit_entry_set.find(req_data->m_oid) == m_hit_entry_set.end()) { + ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData(); + data->type = RBDSC_READ_RADOS; + data->seq = req_data->seq; + req = encode_object_cache_request(data, RBDSC_READ_RADOS); } else { - req->m_data.type = RBDSC_READ_REPLY; + ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); + data->type = RBDSC_READ_REPLY; + data->seq = req_data->seq; + req = encode_object_cache_request(data, RBDSC_READ_REPLY); } m_cache_server->send(session_id, req); break; @@ -130,7 +139,7 @@ public: bool hit; auto ctx = new LambdaGenContext<std::function<void(ObjectCacheRequest*)>, ObjectCacheRequest*>([this, &hit](ObjectCacheRequest* ack){ - hit = ack->m_data.type == RBDSC_READ_REPLY; + hit = ack->type == RBDSC_READ_REPLY; m_wait_event.signal(); }); m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, ctx); diff --git a/src/test/immutable_object_cache/test_message.cc b/src/test/immutable_object_cache/test_message.cc index 2d17f394025..fe27cec0efc 100644 --- a/src/test/immutable_object_cache/test_message.cc +++ b/src/test/immutable_object_cache/test_message.cc @@ -1,5 +1,6 @@ #include "gtest/gtest.h" #include "tools/immutable_object_cache/Types.h" +#include "tools/immutable_object_cache/SocketCommon.h" using namespace ceph::immutable_obj_cache; @@ -9,35 +10,40 @@ TEST(test_for_message, test_1) std::string oid_name("this is a oid name"); std::string cache_file_path("/temp/ceph_immutable_object_cache"); - ObjectCacheRequest req; + ObjectCacheReadData data; - req.m_data.seq = 1; - req.m_data.type = 2; - req.m_data.m_read_offset = 222222; - req.m_data.m_read_len = 333333; - req.m_data.m_pool_id = 444444; - req.m_data.m_snap_id = 555555; - req.m_data.m_oid = oid_name; - req.m_data.m_pool_namespace = pool_nspace; - req.m_data.m_cache_path = cache_file_path; + data.seq = 1UL; + data.type = RBDSC_READ; + data.m_read_offset = 222222; + data.m_read_len = 333333; + data.m_pool_id = 444444; + data.m_snap_id = 555555; + data.m_oid = oid_name; + data.m_pool_namespace = pool_nspace; // ObjectRequest --> bufferlist - req.encode(); + ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ); - - // bufferlist --> ObjectCacheRequest - auto data_bl = req.get_data_buffer(); + auto data_bl = req->get_data_buffer(); uint32_t data_len = get_data_len(data_bl.c_str()); ASSERT_EQ(data_bl.length(), data_len + get_header_size()); + ASSERT_TRUE(data_bl.c_str() != nullptr); + // bufferlist --> ObjectCacheRequest ObjectCacheRequest* req_decode = decode_object_cache_request(data_bl); + ObjectCacheReadData* reply_data = (ObjectCacheReadData*)(req_decode->m_data); + + ASSERT_EQ(req_decode->type, RBDSC_READ); - ASSERT_EQ(req_decode->m_data.seq, 1); - ASSERT_EQ(req_decode->m_data.m_read_offset, 222222); - ASSERT_EQ(req_decode->m_data.m_read_len, 333333); - ASSERT_EQ(req_decode->m_data.m_pool_namespace, pool_nspace); - ASSERT_EQ(req_decode->m_data.m_cache_path, cache_file_path); - ASSERT_EQ(req_decode->m_data.m_oid, oid_name); + ASSERT_EQ(reply_data->seq, 1UL); + ASSERT_EQ(reply_data->type, RBDSC_READ); + ASSERT_EQ(reply_data->m_read_offset, 222222UL); + ASSERT_EQ(reply_data->m_read_len, 333333UL); + ASSERT_EQ(reply_data->m_pool_id, 444444UL); + ASSERT_EQ(reply_data->m_snap_id, 555555UL); + ASSERT_EQ(reply_data->m_pool_namespace, pool_nspace); + ASSERT_EQ(reply_data->m_oid, oid_name); + delete req; delete req_decode; } diff --git a/src/test/immutable_object_cache/test_multi_session.cc b/src/test/immutable_object_cache/test_multi_session.cc index e0f6d953591..008a3ba6d05 100644 --- a/src/test/immutable_object_cache/test_multi_session.cc +++ b/src/test/immutable_object_cache/test_multi_session.cc @@ -87,14 +87,19 @@ public: void server_handle_request(uint64_t session_id, ObjectCacheRequest* req) { - switch (req->m_data.type) { + switch (req->type) { case RBDSC_REGISTER: { - req->m_data.type = RBDSC_REGISTER_REPLY; + ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); + data->type = RBDSC_REGISTER_REPLY; + req = encode_object_cache_request(data, RBDSC_REGISTER_REPLY); m_cache_server->send(session_id, req); break; } case RBDSC_READ: { - req->m_data.type = RBDSC_READ_REPLY; + ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); + data->type = RBDSC_READ_REPLY; + data->seq = req->seq; + req = encode_object_cache_request(data, RBDSC_READ_REPLY); m_cache_server->send(session_id, req); break; } diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 67ce958f232..20d5426b37f 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -88,22 +88,22 @@ namespace immutable_obj_cache { void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, uint64_t snap_id, std::string oid, GenContext<ObjectCacheRequest*>* on_finish) { - ObjectCacheRequest* req = new ObjectCacheRequest(); - req->m_data.type = RBDSC_READ; - req->m_data.seq = ++m_sequence_id; - - req->m_data.m_pool_id = pool_id; - req->m_data.m_snap_id = snap_id; - req->m_data.m_pool_namespace = pool_nspace; - req->m_data.m_oid = oid; + ObjectCacheReadData data; + data.type = RBDSC_READ; + data.seq = ++m_sequence_id; + + data.m_pool_id = pool_id; + data.m_snap_id = snap_id; + data.m_pool_namespace = pool_nspace; + data.m_oid = oid; + ObjectCacheRequest* req = encode_object_cache_request(&data, RBDSC_READ); req->m_process_msg = on_finish; - req->encode(); { Mutex::Locker locker(m_lock); m_outcoming_bl.append(req->get_data_buffer()); - ceph_assert(m_seq_to_req.find(req->m_data.seq) == m_seq_to_req.end()); - m_seq_to_req[req->m_data.seq] = req; + ceph_assert(m_seq_to_req.find(data.seq) == m_seq_to_req.end()); + m_seq_to_req[data.seq] = req; } // try to send message to server. @@ -227,7 +227,7 @@ namespace immutable_obj_cache { data_buffer.clear(); ceph_assert(data_buffer.length() == 0); - process(reply, reply->m_data.seq); + process(reply, reply->seq); { Mutex::Locker locker(m_lock); @@ -331,7 +331,7 @@ namespace immutable_obj_cache { { Mutex::Locker locker(m_lock); for(auto it : m_seq_to_req) { - it.second->m_data.type = RBDSC_READ_RADOS; + it.second->type = RBDSC_READ_RADOS; it.second->m_process_msg->complete(it.second); } m_seq_to_req.clear(); @@ -343,13 +343,13 @@ namespace immutable_obj_cache { } int CacheClient::register_client(Context* on_finish) { - ObjectCacheRequest* message = new ObjectCacheRequest(); - message->m_data.seq = m_sequence_id++; - message->m_data.type = RBDSC_REGISTER; - message->encode(); + ObjectCacheRegData data; + data.seq = m_sequence_id++; + data.type = RBDSC_REGISTER; + ObjectCacheRequest* reg_req = encode_object_cache_request(&data, RBDSC_REGISTER); bufferlist bl; - bl.append(message->get_data_buffer()); + bl.append(reg_req->get_data_buffer()); uint64_t ret; boost::system::error_code ec; @@ -361,6 +361,7 @@ namespace immutable_obj_cache { fault(ASIO_ERROR_WRITE, ec); return -1; } + delete reg_req; ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec); @@ -382,7 +383,7 @@ namespace immutable_obj_cache { data_buffer.append(m_bp_header); data_buffer.append(std::move(bp_data)); ObjectCacheRequest* req = decode_object_cache_request(data_buffer); - if (req->m_data.type == RBDSC_REGISTER_REPLY) { + if (req->type == RBDSC_REGISTER_REPLY) { on_finish->complete(true); } else { on_finish->complete(false); diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc index e7b06c4d9f3..dd4ef3f3ed1 100644 --- a/src/tools/immutable_object_cache/CacheController.cc +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -83,28 +83,39 @@ void CacheController::run() { void CacheController::handle_request(uint64_t session_id, ObjectCacheRequest* req){ ldout(m_cct, 20) << dendl; - switch (req->m_data.type) { + switch (req->type) { case RBDSC_REGISTER: { // TODO(): skip register and allow clients to lookup directly - req->m_data.type = RBDSC_REGISTER_REPLY; - m_cache_server->send(session_id, req); + ObjectCacheRegReplyData data; + data.type = RBDSC_REGISTER_REPLY; + data.seq = req->seq; + req = encode_object_cache_request(&data, RBDSC_REGISTER_REPLY); + m_cache_server->send(session_id, req); break; } case RBDSC_READ: { // lookup object in local cache store - int ret = m_object_cache_store->lookup_object(req->m_data.m_pool_namespace, - req->m_data.m_pool_id, - req->m_data.m_snap_id, - req->m_data.m_oid, - req->m_data.m_cache_path); + ObjectCacheReadData* data = (ObjectCacheReadData*)(req->m_data); + std::string cache_path; + int ret = m_object_cache_store->lookup_object(data->m_pool_namespace, + data->m_pool_id, + data->m_snap_id, + data->m_oid, + cache_path); if (ret < 0) { - req->m_data.type = RBDSC_READ_RADOS; + ObjectCacheReadRadosData reply_data; + reply_data.type = RBDSC_READ_RADOS; + reply_data.seq = req->seq; + req = encode_object_cache_request(&reply_data, RBDSC_READ_RADOS); } else { - req->m_data.type = RBDSC_READ_REPLY; + ObjectCacheReadReplyData reply_data; + reply_data.m_cache_path = cache_path; + reply_data.type = RBDSC_READ_REPLY; + reply_data.seq = req->seq; + req = encode_object_cache_request(&reply_data, RBDSC_READ_REPLY); } m_cache_server->send(session_id, req); - break; } default: diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc index c188e8e0b61..c72305944fc 100644 --- a/src/tools/immutable_object_cache/CacheSession.cc +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -91,6 +91,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, bl_data.append(m_bp_header); bl_data.append(std::move(bp)); + ObjectCacheRequest* req = decode_object_cache_request(bl_data); process(req); read_request_header(); @@ -103,8 +104,6 @@ void CacheSession::process(ObjectCacheRequest* req) { void CacheSession::send(ObjectCacheRequest* reply) { ldout(cct, 20) << dendl; - reply->m_data_buffer.clear(); - reply->encode(); bufferlist bl; bl.append(reply->get_data_buffer()); diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc index bd904449113..b1a00a5a21d 100644 --- a/src/tools/immutable_object_cache/Types.cc +++ b/src/tools/immutable_object_cache/Types.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "Types.h" +#include "SocketCommon.h" #define dout_subsys ceph_subsys_immutable_obj_cache #undef dout_prefix @@ -10,42 +11,97 @@ namespace ceph { namespace immutable_obj_cache { -void ObjectCacheMsgData::encode(bufferlist& bl) { +void ObjectCacheRegData::encode(bufferlist& bl) { ENCODE_START(1, 1, bl); + ceph::encode(type, bl); ceph::encode(seq, bl); + ENCODE_FINISH(bl); +} + +void ObjectCacheRegData::decode(bufferlist& bl) { + auto i = bl.cbegin(); + DECODE_START(1, i); + ceph::decode(type, i); + ceph::decode(seq, i); + DECODE_FINISH(i); +} + +void ObjectCacheRegReplyData::encode(bufferlist& bl) { + ENCODE_START(1, 1, bl); ceph::encode(type, bl); + ceph::encode(seq, bl); + ENCODE_FINISH(bl); +} + +void ObjectCacheRegReplyData::decode(bufferlist& bl) { + auto i = bl.cbegin(); + DECODE_START(1, i); + ceph::decode(type, i); + ceph::decode(seq, i); + DECODE_FINISH(i); +} + +void ObjectCacheReadData::encode(bufferlist& bl) { + ENCODE_START(1, 1, bl); + ceph::encode(type, bl); + ceph::encode(seq, bl); ceph::encode(m_read_offset, bl); ceph::encode(m_read_len, bl); ceph::encode(m_pool_id, bl); ceph::encode(m_snap_id, bl); ceph::encode(m_oid, bl); ceph::encode(m_pool_namespace, bl); - ceph::encode(m_cache_path, bl); ENCODE_FINISH(bl); } -void ObjectCacheMsgData::decode(bufferlist& bl) { +void ObjectCacheReadData::decode(bufferlist& bl) { auto i = bl.cbegin(); DECODE_START(1, i); - ceph::decode(seq, i); ceph::decode(type, i); + ceph::decode(seq, i); ceph::decode(m_read_offset, i); ceph::decode(m_read_len, i); ceph::decode(m_pool_id, i); ceph::decode(m_snap_id, i); ceph::decode(m_oid, i); ceph::decode(m_pool_namespace, i); + DECODE_FINISH(i); +} + +void ObjectCacheReadReplyData::encode(bufferlist& bl) { + ENCODE_START(1, 1, bl); + ceph::encode(type, bl); + ceph::encode(seq, bl); + ceph::encode(m_cache_path, bl); + ENCODE_FINISH(bl); +} + +void ObjectCacheReadReplyData::decode(bufferlist& bl) { + auto i = bl.cbegin(); + DECODE_START(1, i); + ceph::decode(type, i); + ceph::decode(seq, i); ceph::decode(m_cache_path, i); DECODE_FINISH(i); } -void ObjectCacheRequest::encode() { - m_data.encode(m_data_buffer); +void ObjectCacheReadRadosData::encode(bufferlist& bl) { + ENCODE_START(1, 1, bl); + ceph::encode(type, bl); + ceph::encode(seq, bl); + ENCODE_FINISH(bl); +} + +void ObjectCacheReadRadosData::decode(bufferlist& bl) { + auto i = bl.cbegin(); + DECODE_START(1, i); + ceph::decode(type, i); + ceph::decode(seq, i); + DECODE_FINISH(i); } uint8_t get_header_size() { - //return sizeof(ObjectCacheMsgHeader); - return 6; + return 6; //uint8_t + uint8_t + uint32_t } struct encode_header{ @@ -59,13 +115,103 @@ uint32_t get_data_len(char* buf) { return header->len; } +uint16_t get_data_type(bufferlist buf) { + uint16_t type; + auto i = buf.cbegin(); + DECODE_START(1, i); + decode(type, i); + DECODE_FINISH(i); + return type; +} + bufferlist ObjectCacheRequest::get_data_buffer() { return m_data_buffer; } +ObjectCacheRequest* encode_object_cache_request(void* m_data, uint16_t type) { + ObjectCacheRequest* req = new ObjectCacheRequest(); + + switch(type) { + case RBDSC_REGISTER: { + ObjectCacheRegData* data = (ObjectCacheRegData*)m_data; + data->encode(req->m_data_buffer); + break; + } + case RBDSC_REGISTER_REPLY: { + ObjectCacheRegReplyData* data = (ObjectCacheRegReplyData*)m_data; + data->encode(req->m_data_buffer); + break; + } + case RBDSC_READ: { + ObjectCacheReadData* data = (ObjectCacheReadData*)m_data; + data->encode(req->m_data_buffer); + break; + } + case RBDSC_READ_RADOS: { + ObjectCacheReadRadosData* data = (ObjectCacheReadRadosData*)m_data; + data->encode(req->m_data_buffer); + break; + } + case RBDSC_READ_REPLY: { + ObjectCacheReadReplyData* data = (ObjectCacheReadReplyData*)m_data; + data->encode(req->m_data_buffer); + break; + } + default: + ceph_assert(0); + } + + req->type = type; + return req; +} + ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer) { ObjectCacheRequest* req = new ObjectCacheRequest(); - req->m_data.decode(data_buffer); + uint16_t type = get_data_type(data_buffer); + uint64_t seq; + + switch(type) { + case RBDSC_REGISTER: { + ObjectCacheRegData* data = new ObjectCacheRegData(); + data->decode(data_buffer); + seq = data->seq; + req->m_data = data; + break; + } + case RBDSC_READ: { + ObjectCacheReadData* data = new ObjectCacheReadData(); + data->decode(data_buffer); + seq = data->seq; + req->m_data = data; + break; + } + case RBDSC_REGISTER_REPLY: { + ObjectCacheRegReplyData* data = new ObjectCacheRegReplyData(); + data->decode(data_buffer); + seq = data->seq; + req->m_data = data; + break; + } + case RBDSC_READ_REPLY: { + ObjectCacheReadReplyData* data = new ObjectCacheReadReplyData(); + data->decode(data_buffer); + seq = data->seq; + req->m_data = data; + break; + } + case RBDSC_READ_RADOS: { + ObjectCacheReadRadosData* data = new ObjectCacheReadRadosData(); + data->decode(data_buffer); + seq = data->seq; + req->m_data = data; + break; + } + default: + ceph_assert(0); + } + + req->type = type; + req->seq = seq; return req; } diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h index ef487e4f215..4d03dfa1993 100644 --- a/src/tools/immutable_object_cache/Types.h +++ b/src/tools/immutable_object_cache/Types.h @@ -10,36 +10,73 @@ namespace ceph { namespace immutable_obj_cache { -class ObjectCacheMsgData { +class ObjectCacheRegData { public: - uint64_t seq; /* sequence id */ - uint16_t type; /* msg type */ + uint16_t type; + uint64_t seq; + + void encode(bufferlist& bl); + void decode(bufferlist& bl); +}; + +class ObjectCacheRegReplyData { +public: + uint16_t type; + uint64_t seq; + + void encode(bufferlist& bl); + void decode(bufferlist& bl); +}; + +class ObjectCacheReadData { +public: + uint16_t type; + uint64_t seq; uint64_t m_read_offset; uint64_t m_read_len; uint64_t m_pool_id; uint64_t m_snap_id; std::string m_oid; std::string m_pool_namespace; + + void encode(bufferlist& bl); + void decode(bufferlist& bl); +}; + +class ObjectCacheReadReplyData { +public: + uint16_t type; + uint64_t seq; std::string m_cache_path; void encode(bufferlist& bl); void decode(bufferlist& bl); }; +class ObjectCacheReadRadosData { +public: + uint16_t type; + uint64_t seq; + + void encode(bufferlist& bl); + void decode(bufferlist& bl); +}; + class ObjectCacheRequest { public: - ObjectCacheMsgData m_data; + uint64_t seq; + uint16_t type; + void* m_data; bufferlist m_data_buffer; GenContext<ObjectCacheRequest*>* m_process_msg; - void encode(); bufferlist get_data_buffer(); - }; uint8_t get_header_size(); uint32_t get_data_len(char* buf); +ObjectCacheRequest* encode_object_cache_request(void* data, uint16_t type); ObjectCacheRequest* decode_object_cache_request(bufferlist data_buffer); } // namespace immutable_obj_cache |