summaryrefslogtreecommitdiffstats
path: root/src/tools/immutable_object_cache/CacheServer.cc
blob: d6fecb079d4e306f5d61ecdc9bb28e09bd846539 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/bind.hpp>
#include "common/debug.h"
#include "common/ceph_context.h"
#include "CacheServer.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_immutable_obj_cache
#undef dout_prefix
#define dout_prefix *_dout << "ceph::cache::CacheServer: " << this << " " \
                           << __func__ << ": "


namespace ceph {
namespace immutable_obj_cache {

// Build a server that is not yet listening.
//
// cct        - context used for logging by the server and handed to every
//              session created in accept().
// file       - used to initialise the endpoint m_local_path that
//              start_accept() opens and binds.
// processmsg - callback stored in m_server_process_msg and passed to every
//              session created in accept().
//
// The acceptor is only attached to m_io_service here; the open/bind/listen
// calls are made in start_accept().
CacheServer::CacheServer(CephContext* cct, const std::string& file,
                         ProcessMsg processmsg)
  : cct(cct),
    m_server_process_msg(processmsg),
    m_local_path(file),
    m_acceptor(m_io_service) {
}

// Stop the io_service on destruction; the status returned by stop() is
// ignored.
CacheServer::~CacheServer() {
  this->stop();
}

// Start accepting connections and then run the io_service event loop on the
// calling thread.
//
// Returns the non-zero result of start_accept() if the acceptor cannot be set
// up, -1 if m_io_service.run() reports an error, and 0 otherwise.
int CacheServer::run() {
  ldout(cct, 20) << dendl;

  const int accept_ret = start_accept();
  if (accept_ret != 0) {
    return accept_ret;
  }

  // run() returns the number of handlers executed (a std::size_t). That count
  // is not needed, so it is deliberately not stored; only the error code
  // decides the outcome.
  boost::system::error_code ec;
  m_io_service.run(ec);
  if (ec) {
    ldout(cct, 1) << "m_io_service run fails: " << ec.message() << dendl;
    return -1;
  }
  return 0;
}

int CacheServer::stop() {
  m_io_service.stop();
  return 0;
}

int CacheServer::start_accept() {
  ldout(cct, 20) << dendl;

  boost::system::error_code ec;
  m_acceptor.open(m_local_path.protocol(), ec);
  if (ec) {
    ldout(cct, 1) << "m_acceptor open fails: " << ec.message() << dendl;
    return -1;
  }

  m_acceptor.bind(m_local_path, ec);
  if (ec) {
    ldout(cct, 1) << "m_acceptor bind fails: " << ec.message() << dendl;
    return -1;
  }

  m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
  if (ec) {
    ldout(cct, 1) << "m_acceptor listen fails: " << ec.message() << dendl;
    return -1;
  }

  accept();
  return 0;
}

void CacheServer::accept() {
  CacheSessionPtr new_session = nullptr;

  new_session.reset(new CacheSession(m_session_id, m_io_service,
                    m_server_process_msg, cct));

  m_acceptor.async_accept(new_session->socket(),
      boost::bind(&CacheServer::handle_accept, this, new_session,
        boost::asio::placeholders::error));
}

// Completion handler for the async accept queued by accept().
//
// new_session - the session whose socket was the target of the accept.
// error       - result of the accept operation.
//
// On success the session is recorded in m_session_map, started, and the next
// accept is queued. On failure the error is logged and no further accept is
// queued.
void CacheServer::handle_accept(CacheSessionPtr new_session,
                                const boost::system::error_code& error) {
  ldout(cct, 20) << dendl;

  if (!error) {
    // m_session_id is bumped only after the session has been recorded and
    // started, so the map key is the value accept() passed to the session.
    m_session_map.emplace(m_session_id, new_session);
    // TODO(dehao) : session setting
    new_session->start();
    ++m_session_id;

    // launch next accept
    accept();
    return;
  }

  // e.g. operation_aborted
  lderr(cct) << "async accept fails : " << error.message() << dendl;
}

}  // namespace immutable_obj_cache
}  // namespace ceph