summaryrefslogtreecommitdiffstats
path: root/src/tools/immutable_object_cache
diff options
context:
space:
mode:
authorshangdehao1 <dehao.shang@intel.com>2019-05-31 03:47:31 +0200
committerJason Dillaman <dillaman@redhat.com>2019-06-24 23:35:40 +0200
commit9b89c24a610708c15f93e520ec3228abe966227e (patch)
treeee472a2cbd86c4ecd6387f310fcae1df6504b024 /src/tools/immutable_object_cache
parenttools: modify connect implement of RO cache (diff)
downloadceph-9b89c24a610708c15f93e520ec3228abe966227e.tar.xz
ceph-9b89c24a610708c15f93e520ec3228abe966227e.zip
librbd: add re-connect and infligh enable feature
- reconnect : when session don't work or RO daemon crash, parent cache will try to re-connect daemon. - in-fligh enable RO daemon. Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Diffstat (limited to 'src/tools/immutable_object_cache')
-rw-r--r--src/tools/immutable_object_cache/CacheClient.cc22
-rw-r--r--src/tools/immutable_object_cache/CacheSession.cc12
-rw-r--r--src/tools/immutable_object_cache/CacheSession.h2
3 files changed, 24 insertions, 12 deletions
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc
index bba3820f488..b50491ce8dd 100644
--- a/src/tools/immutable_object_cache/CacheClient.cc
+++ b/src/tools/immutable_object_cache/CacheClient.cc
@@ -98,7 +98,8 @@ namespace immutable_obj_cache {
void CacheClient::handle_connect(Context* on_finish,
const boost::system::error_code& err) {
if (err) {
- ldout(m_cct, 20) << "fails to connect to cache server." << dendl;
+ ldout(m_cct, 20) << "fails to connect to cache server. error : "
+ << err.message() << dendl;
fault(ASIO_ERROR_CONNECT, err);
on_finish->complete(-1);
return;
@@ -111,6 +112,7 @@ namespace immutable_obj_cache {
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
uint64_t snap_id, std::string oid,
CacheGenContextURef&& on_finish) {
+ ldout(m_cct, 20) << dendl;
ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
++m_sequence_id, 0, 0,
pool_id, snap_id, oid, pool_nspace);
@@ -132,6 +134,7 @@ namespace immutable_obj_cache {
}
void CacheClient::try_send() {
+ ldout(m_cct, 20) << dendl;
if (!m_writing.load()) {
m_writing.store(true);
send_message();
@@ -139,6 +142,7 @@ namespace immutable_obj_cache {
}
void CacheClient::send_message() {
+ ldout(m_cct, 20) << dendl;
bufferlist bl;
{
Mutex::Locker locker(m_lock);
@@ -155,6 +159,7 @@ namespace immutable_obj_cache {
fault(ASIO_ERROR_WRITE, err);
return;
}
+
ceph_assert(cb == bl.length());
{
@@ -172,6 +177,7 @@ namespace immutable_obj_cache {
}
void CacheClient::try_receive() {
+ ldout(m_cct, 20) << dendl;
if (!m_reading.load()) {
m_reading.store(true);
receive_message();
@@ -179,11 +185,13 @@ namespace immutable_obj_cache {
}
void CacheClient::receive_message() {
+ ldout(m_cct, 20) << dendl;
ceph_assert(m_reading.load());
read_reply_header();
}
void CacheClient::read_reply_header() {
+ ldout(m_cct, 20) << dendl;
/* create new head buffer for every reply */
bufferptr bp_head(buffer::create(get_header_size()));
auto raw_ptr = bp_head.c_str();
@@ -200,6 +208,7 @@ namespace immutable_obj_cache {
void CacheClient::handle_reply_header(bufferptr bp_head,
const boost::system::error_code& ec,
size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
if (ec || bytes_transferred != get_header_size()) {
fault(ASIO_ERROR_READ, ec);
return;
@@ -216,6 +225,7 @@ namespace immutable_obj_cache {
void CacheClient::read_reply_data(bufferptr&& bp_head,
bufferptr&& bp_data,
const uint64_t data_len) {
+ ldout(m_cct, 20) << dendl;
auto raw_ptr = bp_data.c_str();
boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
boost::asio::transfer_exactly(data_len),
@@ -230,6 +240,7 @@ namespace immutable_obj_cache {
const uint64_t data_len,
const boost::system::error_code& ec,
size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
if (ec || bytes_transferred != data_len) {
fault(ASIO_ERROR_WRITE, ec);
return;
@@ -259,6 +270,7 @@ namespace immutable_obj_cache {
}
void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+ ldout(m_cct, 20) << dendl;
ObjectCacheRequest* current_request = nullptr;
{
Mutex::Locker locker(m_lock);
@@ -361,6 +373,7 @@ namespace immutable_obj_cache {
<< ec.message() << dendl;
}
+ // TODO : re-implement this method
int CacheClient::register_client(Context* on_finish) {
ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
m_sequence_id++);
@@ -403,14 +416,13 @@ namespace immutable_obj_cache {
data_buffer.append(std::move(bp_data));
ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
if (req->type == RBDSC_REGISTER_REPLY) {
- on_finish->complete(true);
+ m_session_work.store(true);
+ on_finish->complete(0);
} else {
- on_finish->complete(false);
+ on_finish->complete(-1);
}
delete req;
- m_session_work.store(true);
-
return 0;
}
diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc
index c5ec0342878..5bf730354ab 100644
--- a/src/tools/immutable_object_cache/CacheSession.cc
+++ b/src/tools/immutable_object_cache/CacheSession.cc
@@ -59,7 +59,7 @@ void CacheSession::handle_request_header(const boost::system::error_code& err,
size_t bytes_transferred) {
ldout(m_cct, 20) << dendl;
if (err || bytes_transferred != get_header_size()) {
- fault();
+ fault(err);
return;
}
@@ -83,7 +83,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
size_t bytes_transferred) {
ldout(m_cct, 20) << dendl;
if (err || bytes_transferred != data_len) {
- fault();
+ fault(err);
return;
}
@@ -93,6 +93,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
bl_data.append(std::move(bp));
ObjectCacheRequest* req = decode_object_cache_request(bl_data);
+
process(req);
delete req;
read_request_header();
@@ -116,15 +117,14 @@ void CacheSession::send(ObjectCacheRequest* reply) {
size_t bytes_transferred) {
delete reply;
if (err || bytes_transferred != bl.length()) {
- fault();
+ fault(err);
return;
}
});
}
-void CacheSession::fault() {
- ldout(m_cct, 20) << dendl;
- // TODO(dehao)
+void CacheSession::fault(const boost::system::error_code& ec) {
+ ldout(m_cct, 20) << "session fault : " << ec.message() << dendl;
}
} // namespace immutable_obj_cache
diff --git a/src/tools/immutable_object_cache/CacheSession.h b/src/tools/immutable_object_cache/CacheSession.h
index 575dcf9221d..d530b529c8c 100644
--- a/src/tools/immutable_object_cache/CacheSession.h
+++ b/src/tools/immutable_object_cache/CacheSession.h
@@ -33,7 +33,7 @@ class CacheSession : public std::enable_shared_from_this<CacheSession> {
const boost::system::error_code& err,
size_t bytes_transferred);
void process(ObjectCacheRequest* req);
- void fault();
+ void fault(const boost::system::error_code& ec);
void send(ObjectCacheRequest* msg);
private: