summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdam C. Emerson <aemerson@redhat.com>2020-03-07 10:29:23 +0100
committerAdam C. Emerson <aemerson@redhat.com>2020-03-07 10:29:23 +0100
commited3ec4c01d1715f874f773617601e925c94e8320 (patch)
tree1feb74eabc12d9144b98754cb6caa31e2d5d5f2c
parentosd: Build target 'common' without using namespace in headers (diff)
downloadceph-ed3ec4c01d1715f874f773617601e925c94e8320.tar.xz
ceph-ed3ec4c01d1715f874f773617601e925c94e8320.zip
msg: Build target 'common' without using namespace in headers
Part of a changeset to allow building all of 'common' without relying on 'using namespace std' or 'using namespace ceph' at toplevel in headers. Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
-rw-r--r--src/msg/DispatchQueue.cc8
-rw-r--r--src/msg/DispatchQueue.h46
-rw-r--r--src/msg/Message.cc22
-rw-r--r--src/msg/Messenger.cc6
-rw-r--r--src/msg/Messenger.h12
-rw-r--r--src/msg/QueueStrategy.cc6
-rw-r--r--src/msg/async/AsyncConnection.cc8
-rw-r--r--src/msg/async/AsyncConnection.h8
-rw-r--r--src/msg/async/AsyncMessenger.cc14
-rw-r--r--src/msg/async/AsyncMessenger.h16
-rw-r--r--src/msg/async/Event.cc9
-rw-r--r--src/msg/async/Event.h14
-rw-r--r--src/msg/async/EventEpoll.cc2
-rw-r--r--src/msg/async/EventEpoll.h2
-rw-r--r--src/msg/async/EventSelect.cc2
-rw-r--r--src/msg/async/EventSelect.h2
-rw-r--r--src/msg/async/PosixStack.cc15
-rw-r--r--src/msg/async/PosixStack.h6
-rw-r--r--src/msg/async/Protocol.h2
-rw-r--r--src/msg/async/ProtocolV1.cc86
-rw-r--r--src/msg/async/ProtocolV1.h30
-rw-r--r--src/msg/async/ProtocolV2.cc35
-rw-r--r--src/msg/async/ProtocolV2.h6
-rw-r--r--src/msg/async/Stack.cc7
-rw-r--r--src/msg/async/Stack.h12
-rw-r--r--src/msg/async/frames_v2.h4
-rw-r--r--src/msg/async/rdma/Infiniband.cc4
-rw-r--r--src/msg/async/rdma/Infiniband.h2
-rw-r--r--src/msg/async/rdma/RDMAConnectedSocketImpl.cc12
-rw-r--r--src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc4
-rw-r--r--src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc4
-rw-r--r--src/msg/async/rdma/RDMAServerSocketImpl.cc4
-rw-r--r--src/msg/async/rdma/RDMAStack.cc8
-rw-r--r--src/msg/async/rdma/RDMAStack.h51
-rw-r--r--src/msg/msg_types.cc33
-rw-r--r--src/msg/msg_types.h4
36 files changed, 259 insertions, 247 deletions
diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc
index 5a081591a59..b8ed6f7efe8 100644
--- a/src/msg/DispatchQueue.cc
+++ b/src/msg/DispatchQueue.cc
@@ -20,6 +20,8 @@
#define dout_subsys ceph_subsys_ms
#include "common/debug.h"
+using ceph::cref_t;
+using ceph::ref_t;
/*******************
* DispatchQueue
@@ -213,11 +215,9 @@ void DispatchQueue::entry()
void DispatchQueue::discard_queue(uint64_t id) {
std::lock_guard l{lock};
- list<QueueItem> removed;
+ std::list<QueueItem> removed;
mqueue.remove_by_class(id, &removed);
- for (list<QueueItem>::iterator i = removed.begin();
- i != removed.end();
- ++i) {
+ for (auto i = removed.begin(); i != removed.end(); ++i) {
ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
const ref_t<Message>& m = i->get_message();
remove_arrival(m);
diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h
index 243de2cba02..de0cb7d1a08 100644
--- a/src/msg/DispatchQueue.h
+++ b/src/msg/DispatchQueue.h
@@ -41,9 +41,9 @@ class DispatchQueue {
class QueueItem {
int type;
ConnectionRef con;
- ref_t<Message> m;
+ ceph::ref_t<Message> m;
public:
- explicit QueueItem(const ref_t<Message>& m) : type(-1), con(0), m(m) {}
+ explicit QueueItem(const ceph::ref_t<Message>& m) : type(-1), con(0), m(m) {}
QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
bool is_code() const {
return type != -1;
@@ -52,7 +52,7 @@ class DispatchQueue {
ceph_assert(is_code());
return type;
}
- const ref_t<Message>& get_message() {
+ const ceph::ref_t<Message>& get_message() {
ceph_assert(!is_code());
return m;
}
@@ -61,7 +61,7 @@ class DispatchQueue {
return con.get();
}
};
-
+
CephContext *cct;
Messenger *msgr;
mutable ceph::mutex lock;
@@ -69,17 +69,17 @@ class DispatchQueue {
PrioritizedQueue<QueueItem, uint64_t> mqueue;
- std::set<pair<double, ref_t<Message>>> marrival;
- map<ref_t<Message>, decltype(marrival)::iterator> marrival_map;
- void add_arrival(const ref_t<Message>& m) {
+ std::set<std::pair<double, ceph::ref_t<Message>>> marrival;
+ std::map<ceph::ref_t<Message>, decltype(marrival)::iterator> marrival_map;
+ void add_arrival(const ceph::ref_t<Message>& m) {
marrival_map.insert(
make_pair(
m,
- marrival.insert(make_pair(m->get_recv_stamp(), m)).first
+ marrival.insert(std::make_pair(m->get_recv_stamp(), m)).first
)
);
}
- void remove_arrival(const ref_t<Message>& m) {
+ void remove_arrival(const ceph::ref_t<Message>& m) {
auto it = marrival_map.find(m);
ceph_assert(it != marrival_map.end());
marrival.erase(it->second);
@@ -87,7 +87,7 @@ class DispatchQueue {
}
std::atomic<uint64_t> next_id;
-
+
enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
/**
@@ -106,7 +106,7 @@ class DispatchQueue {
ceph::mutex local_delivery_lock;
ceph::condition_variable local_delivery_cond;
bool stop_local_delivery;
- std::queue<pair<ref_t<Message>, int>> local_messages;
+ std::queue<std::pair<ceph::ref_t<Message>, int>> local_messages;
class LocalDeliveryThread : public Thread {
DispatchQueue *dq;
public:
@@ -117,8 +117,8 @@ class DispatchQueue {
}
} local_delivery_thread;
- uint64_t pre_dispatch(const ref_t<Message>& m);
- void post_dispatch(const ref_t<Message>& m, uint64_t msize);
+ uint64_t pre_dispatch(const ceph::ref_t<Message>& m);
+ void post_dispatch(const ceph::ref_t<Message>& m, uint64_t msize);
public:
@@ -126,9 +126,9 @@ class DispatchQueue {
Throttle dispatch_throttler;
bool stop;
- void local_delivery(const ref_t<Message>& m, int priority);
+ void local_delivery(const ceph::ref_t<Message>& m, int priority);
void local_delivery(Message* m, int priority) {
- return local_delivery(ref_t<Message>(m, false), priority); /* consume ref */
+ return local_delivery(ceph::ref_t<Message>(m, false), priority); /* consume ref */
}
void run_local_delivery();
@@ -197,15 +197,15 @@ class DispatchQueue {
cond.notify_all();
}
- bool can_fast_dispatch(const cref_t<Message> &m) const;
- void fast_dispatch(const ref_t<Message>& m);
+ bool can_fast_dispatch(const ceph::cref_t<Message> &m) const;
+ void fast_dispatch(const ceph::ref_t<Message>& m);
void fast_dispatch(Message* m) {
- return fast_dispatch(ref_t<Message>(m, false)); /* consume ref */
+ return fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
}
- void fast_preprocess(const ref_t<Message>& m);
- void enqueue(const ref_t<Message>& m, int priority, uint64_t id);
+ void fast_preprocess(const ceph::ref_t<Message>& m);
+ void enqueue(const ceph::ref_t<Message>& m, int priority, uint64_t id);
void enqueue(Message* m, int priority, uint64_t id) {
- return enqueue(ref_t<Message>(m, false), priority, id); /* consume ref */
+ return enqueue(ceph::ref_t<Message>(m, false), priority, id); /* consume ref */
}
void discard_queue(uint64_t id);
void discard_local();
@@ -218,7 +218,7 @@ class DispatchQueue {
void shutdown();
bool is_started() const {return dispatch_thread.is_started();}
- DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
+ DispatchQueue(CephContext *cct, Messenger *msgr, std::string &name)
: cct(cct), msgr(msgr),
lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)),
mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
@@ -228,7 +228,7 @@ class DispatchQueue {
local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)),
stop_local_delivery(false),
local_delivery_thread(this),
- dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
+ dispatch_throttler(cct, std::string("msgr_dispatch_throttler-") + name,
cct->_conf->ms_dispatch_throttle_bytes),
stop(false)
{}
diff --git a/src/msg/Message.cc b/src/msg/Message.cc
index c57bc3bc1bd..27ef95d9db0 100644
--- a/src/msg/Message.cc
+++ b/src/msg/Message.cc
@@ -288,9 +288,9 @@ void Message::encode(uint64_t features, int crcflags, bool skip_header_crc)
}
}
-void Message::dump(Formatter *f) const
+void Message::dump(ceph::Formatter *f) const
{
- stringstream ss;
+ std::stringstream ss;
print(ss);
f->dump_string("summary", ss.str());
}
@@ -347,12 +347,14 @@ Message *decode_message(CephContext *cct,
}
// make message
- ref_t<Message> m;
+ ceph::ref_t<Message> m;
int type = header.type;
switch (type) {
// -- with payload --
+ using ceph::make_message;
+
case MSG_PGSTATS:
m = make_message<MPGStats>();
break;
@@ -929,7 +931,7 @@ Message *decode_message(CephContext *cct,
try {
m->decode_payload();
}
- catch (const buffer::error &e) {
+ catch (const ceph::buffer::error &e) {
if (cct) {
lderr(cct) << "failed to decode message of type " << type
<< " v" << header.version
@@ -948,7 +950,7 @@ Message *decode_message(CephContext *cct,
return m.detach();
}
-void Message::encode_trace(bufferlist &bl, uint64_t features) const
+void Message::encode_trace(ceph::bufferlist &bl, uint64_t features) const
{
using ceph::encode;
auto p = trace.get_info();
@@ -959,7 +961,7 @@ void Message::encode_trace(bufferlist &bl, uint64_t features) const
encode(*p, bl);
}
-void Message::decode_trace(bufferlist::const_iterator &p, bool create)
+void Message::decode_trace(ceph::bufferlist::const_iterator &p, bool create)
{
blkin_trace_info info = {};
decode(info, p);
@@ -992,7 +994,7 @@ void Message::decode_trace(bufferlist::const_iterator &p, bool create)
// problems, we currently always encode and decode using the old footer format that doesn't
// allow for message authentication. Eventually we should fix that. PLR
-void encode_message(Message *msg, uint64_t features, bufferlist& payload)
+void encode_message(Message *msg, uint64_t features, ceph::bufferlist& payload)
{
ceph_msg_footer_old old_footer;
msg->encode(features, MSG_CRC_ALL);
@@ -1006,6 +1008,7 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload)
old_footer.flags = footer.flags;
encode(old_footer, payload);
+ using ceph::encode;
encode(msg->get_payload(), payload);
encode(msg->get_middle(), payload);
encode(msg->get_data(), payload);
@@ -1016,12 +1019,12 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload)
// We've slipped in a 0 signature at this point, so any signature checking after this will
// fail. PLR
-Message *decode_message(CephContext *cct, int crcflags, bufferlist::const_iterator& p)
+Message *decode_message(CephContext *cct, int crcflags, ceph::bufferlist::const_iterator& p)
{
ceph_msg_header h;
ceph_msg_footer_old fo;
ceph_msg_footer f;
- bufferlist fr, mi, da;
+ ceph::bufferlist fr, mi, da;
decode(h, p);
decode(fo, p);
f.front_crc = fo.front_crc;
@@ -1029,6 +1032,7 @@ Message *decode_message(CephContext *cct, int crcflags, bufferlist::const_iterat
f.data_crc = fo.data_crc;
f.flags = fo.flags;
f.sig = 0;
+ using ceph::decode;
decode(fr, p);
decode(mi, p);
decode(da, p);
diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc
index 4e2ea96ebe2..464ba394666 100644
--- a/src/msg/Messenger.cc
+++ b/src/msg/Messenger.cc
@@ -10,7 +10,7 @@
#include "msg/async/AsyncMessenger.h"
-Messenger *Messenger::create_client_messenger(CephContext *cct, string lname)
+Messenger *Messenger::create_client_messenger(CephContext *cct, std::string lname)
{
std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
auto nonce = get_random_nonce();
@@ -33,8 +33,8 @@ uint64_t Messenger::get_random_nonce()
return ceph::util::generate_random_number<uint64_t>();
}
-Messenger *Messenger::create(CephContext *cct, const string &type,
- entity_name_t name, string lname,
+Messenger *Messenger::create(CephContext *cct, const std::string &type,
+ entity_name_t name, std::string lname,
uint64_t nonce, uint64_t cflags)
{
int r = -1;
diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h
index c00374c114d..5ed44407c17 100644
--- a/src/msg/Messenger.h
+++ b/src/msg/Messenger.h
@@ -648,7 +648,7 @@ public:
*
* @param m The Message we are testing.
*/
- bool ms_can_fast_dispatch(const cref_t<Message>& m) {
+ bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
for (const auto &dispatcher : fast_dispatchers) {
if (dispatcher->ms_can_fast_dispatch2(m))
return true;
@@ -662,7 +662,7 @@ public:
* @param m The Message we are fast dispatching.
* If none of our Dispatchers can handle it, ceph_abort().
*/
- void ms_fast_dispatch(const ref_t<Message> &m) {
+ void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : fast_dispatchers) {
if (dispatcher->ms_can_fast_dispatch2(m)) {
@@ -673,12 +673,12 @@ public:
ceph_abort();
}
void ms_fast_dispatch(Message *m) {
- return ms_fast_dispatch(ref_t<Message>(m, false)); /* consume ref */
+ return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
}
/**
*
*/
- void ms_fast_preprocess(const ref_t<Message> &m) {
+ void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
for (const auto &dispatcher : fast_dispatchers) {
dispatcher->ms_fast_preprocess2(m);
}
@@ -690,7 +690,7 @@ public:
*
* @param m The Message to deliver.
*/
- void ms_deliver_dispatch(const ref_t<Message> &m) {
+ void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : dispatchers) {
if (dispatcher->ms_dispatch2(m))
@@ -701,7 +701,7 @@ public:
ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
}
void ms_deliver_dispatch(Message *m) {
- return ms_deliver_dispatch(ref_t<Message>(m, false)); /* consume ref */
+ return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
}
/**
* Notify each Dispatcher of a new Connection. Call
diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc
index 85b0a11e602..342494c5a7a 100644
--- a/src/msg/QueueStrategy.cc
+++ b/src/msg/QueueStrategy.cc
@@ -44,11 +44,11 @@ void QueueStrategy::ds_dispatch(Message *m) {
void QueueStrategy::entry(QSThread *thrd)
{
for (;;) {
- ref_t<Message> m;
+ ceph::ref_t<Message> m;
std::unique_lock l{lock};
for (;;) {
if (! mqueue.empty()) {
- m = ref_t<Message>(&mqueue.front(), false);
+ m = ceph::ref_t<Message>(&mqueue.front(), false);
mqueue.pop_front();
break;
}
@@ -98,7 +98,7 @@ void QueueStrategy::start()
std::lock_guard l{lock};
threads.reserve(n_threads);
for (int ix = 0; ix < n_threads; ++ix) {
- string thread_name = "ms_qs_";
+ std::string thread_name = "ms_qs_";
thread_name.append(std::to_string(ix));
auto thrd = std::make_unique<QSThread>(this);
thrd->create(thread_name.c_str());
diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc
index 82ef1389734..b0adff91946 100644
--- a/src/msg/async/AsyncConnection.cc
+++ b/src/msg/async/AsyncConnection.cc
@@ -35,7 +35,7 @@
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
-ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
+std::ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
<< *peer_addrs << " conn(" << this
<< (msgr2 ? " msgr2=" : " legacy=")
@@ -168,8 +168,8 @@ void AsyncConnection::maybe_start_delay_thread()
if (!delay_state) {
async_msgr->cct->_conf.with_val<std::string>(
"ms_inject_delay_type",
- [this](const string& s) {
- if (s.find(ceph_entity_type_name(peer_type)) != string::npos) {
+ [this](const std::string& s) {
+ if (s.find(ceph_entity_type_name(peer_type)) != std::string::npos) {
ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
<< dendl;
delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
@@ -300,7 +300,7 @@ ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
return nread;
}
-ssize_t AsyncConnection::write(bufferlist &bl,
+ssize_t AsyncConnection::write(ceph::buffer::list &bl,
std::function<void(ssize_t)> callback,
bool more) {
diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h
index 20d095dfafb..122a6c4089e 100644
--- a/src/msg/async/AsyncConnection.h
+++ b/src/msg/async/AsyncConnection.h
@@ -56,7 +56,7 @@ class AsyncConnection : public Connection {
ssize_t read_until(unsigned needed, char *p);
ssize_t read_bulk(char *buf, unsigned len);
- ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback,
+ ssize_t write(ceph::buffer::list &bl, std::function<void(ssize_t)> callback,
bool more=false);
ssize_t _try_send(bool more=false);
@@ -114,7 +114,7 @@ private:
public:
void maybe_start_delay_thread();
- ostream& _conn_prefix(std::ostream *_dout);
+ std::ostream& _conn_prefix(std::ostream *_dout);
bool is_connected() override;
@@ -182,7 +182,7 @@ private:
DispatchQueue *dispatch_queue;
// lockfree, only used in own thread
- bufferlist outgoing_bl;
+ ceph::buffer::list outgoing_bl;
bool open_write = false;
std::mutex write_lock;
@@ -197,7 +197,7 @@ private:
uint32_t recv_max_prefetch;
uint32_t recv_start;
uint32_t recv_end;
- set<uint64_t> register_time_events; // need to delete it if stop
+ std::set<uint64_t> register_time_events; // need to delete it if stop
ceph::coarse_mono_clock::time_point last_connect_started;
ceph::coarse_mono_clock::time_point last_active;
ceph::mono_clock::time_point recv_start_time;
diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc
index 4f1224284f2..6e989bca507 100644
--- a/src/msg/async/AsyncMessenger.cc
+++ b/src/msg/async/AsyncMessenger.cc
@@ -32,11 +32,11 @@
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
-static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
+static std::ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
return *_dout << "-- " << m->get_myaddrs() << " ";
}
-static ostream& _prefix(std::ostream *_dout, Processor *p) {
+static std::ostream& _prefix(std::ostream *_dout, Processor *p) {
return *_dout << " Processor -- ";
}
@@ -60,7 +60,7 @@ Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
listen_handler(new C_processor_accept(this)) {}
int Processor::bind(const entity_addrvec_t &bind_addrs,
- const set<int>& avoid_ports,
+ const std::set<int>& avoid_ports,
entity_addrvec_t* bound_addrs)
{
const auto& conf = msgr->cct->_conf;
@@ -278,7 +278,7 @@ class C_handle_reap : public EventCallback {
*/
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
- const std::string &type, string mname, uint64_t _nonce)
+ const std::string &type, std::string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name),
dispatch_queue(cct, this, mname),
nonce(_nonce)
@@ -397,7 +397,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
lock.unlock();
// bind to a socket
- set<int> avoid_ports;
+ std::set<int> avoid_ports;
entity_addrvec_t bound_addrs;
unsigned i = 0;
for (auto &&p : processors) {
@@ -421,7 +421,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
return 0;
}
-int AsyncMessenger::rebind(const set<int>& avoid_ports)
+int AsyncMessenger::rebind(const std::set<int>& avoid_ports)
{
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
ceph_assert(did_bind);
@@ -437,7 +437,7 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
entity_addrvec_t bound_addrs;
entity_addrvec_t bind_addrs = get_myaddrs();
- set<int> new_avoid(avoid_ports);
+ std::set<int> new_avoid(avoid_ports);
for (auto& a : bind_addrs.v) {
new_avoid.insert(a.get_port());
a.set_port(0);
diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h
index 6e18281f1b9..83a9c5a9b77 100644
--- a/src/msg/async/AsyncMessenger.h
+++ b/src/msg/async/AsyncMessenger.h
@@ -44,9 +44,9 @@ class AsyncMessenger;
*/
class Processor {
AsyncMessenger *msgr;
- NetHandler net;
+ ceph::NetHandler net;
Worker *worker;
- vector<ServerSocket> listen_sockets;
+ std::vector<ServerSocket> listen_sockets;
EventCallbackRef listen_handler;
class C_processor_accept;
@@ -57,7 +57,7 @@ class Processor {
void stop();
int bind(const entity_addrvec_t &bind_addrs,
- const set<int>& avoid_ports,
+ const std::set<int>& avoid_ports,
entity_addrvec_t* bound_addrs);
void start();
void accept();
@@ -82,7 +82,7 @@ public:
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
- string mname, uint64_t _nonce);
+ std::string mname, uint64_t _nonce);
/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
@@ -115,7 +115,7 @@ public:
}
int bind(const entity_addr_t& bind_addr) override;
- int rebind(const set<int>& avoid_ports) override;
+ int rebind(const std::set<int>& avoid_ports) override;
int client_bind(const entity_addr_t& bind_addr) override;
int bindv(const entity_addrvec_t& bind_addrs) override;
@@ -268,10 +268,10 @@ private:
*
* These are not yet in the conns map.
*/
- set<AsyncConnectionRef> accepting_conns;
+ std::set<AsyncConnectionRef> accepting_conns;
/// anonymous outgoing connections
- set<AsyncConnectionRef> anon_conns;
+ std::set<AsyncConnectionRef> anon_conns;
/**
* list of connection are closed which need to be clean up
@@ -285,7 +285,7 @@ private:
* AsyncConnection in this set.
*/
ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock");
- set<AsyncConnectionRef> deleted_conns;
+ std::set<AsyncConnectionRef> deleted_conns;
EventCallbackRef reap_handler;
diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc
index 4d2fb339416..b30a03e8a4a 100644
--- a/src/msg/async/Event.cc
+++ b/src/msg/async/Event.cc
@@ -69,7 +69,7 @@ class C_handle_notify : public EventCallback {
* about the poller. The name of the superclass is probably sufficient
* for most cases.
*/
-EventCenter::Poller::Poller(EventCenter* center, const string& name)
+EventCenter::Poller::Poller(EventCenter* center, const std::string& name)
: owner(center), poller_name(name), slot(owner->pollers.size())
{
owner->pollers.push_back(this);
@@ -94,7 +94,7 @@ EventCenter::Poller::~Poller()
slot = -1;
}
-ostream& EventCenter::_event_prefix(std::ostream *_dout)
+std::ostream& EventCenter::_event_prefix(std::ostream *_dout)
{
return *_dout << "Event(" << this << " nevent=" << nevent
<< " time_id=" << time_event_next_id << ").";
@@ -340,6 +340,7 @@ int EventCenter::process_time_events()
{
int processed = 0;
clock_type::time_point now = clock_type::now();
+ using ceph::operator <<;
ldout(cct, 30) << __func__ << " cur time is " << now << dendl;
while (!time_events.empty()) {
@@ -388,7 +389,7 @@ int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *
tv.tv_usec = timeout_microseconds % 1000000;
ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
- vector<FiredFileEvent> fired_events;
+ std::vector<FiredFileEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
auto working_start = ceph::mono_clock::now();
for (int event_id = 0; event_id < numevents; event_id++) {
@@ -422,7 +423,7 @@ int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *
if (external_num_events.load()) {
external_lock.lock();
- deque<EventCallbackRef> cur_process;
+ std::deque<EventCallbackRef> cur_process;
cur_process.swap(external_events);
external_num_events.store(0);
external_lock.unlock();
diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h
index d7dba443956..bb954357174 100644
--- a/src/msg/async/Event.h
+++ b/src/msg/async/Event.h
@@ -76,7 +76,7 @@ class EventDriver {
virtual int init(EventCenter *center, int nevent) = 0;
virtual int add_event(int fd, int cur_mask, int mask) = 0;
virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
- virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
+ virtual int event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
virtual int resize_events(int newsize) = 0;
virtual bool need_wakeup() { return true; }
};
@@ -121,7 +121,7 @@ class EventCenter {
*/
class Poller {
public:
- explicit Poller(EventCenter* center, const string& pollerName);
+ explicit Poller(EventCenter* center, const std::string& pollerName);
virtual ~Poller();
/**
@@ -142,7 +142,7 @@ class EventCenter {
/// Human-readable string name given to the poller to make it
/// easy to identify for debugging. For most pollers just passing
/// in the subclass name probably makes sense.
- string poller_name;
+ std::string poller_name;
/// Index of this Poller in EventCenter::pollers. Allows deletion
/// without having to scan all the entries in pollers. -1 means
@@ -159,8 +159,8 @@ class EventCenter {
pthread_t owner = 0;
std::mutex external_lock;
std::atomic_ulong external_num_events;
- deque<EventCallbackRef> external_events;
- vector<FileEvent> file_events;
+ std::deque<EventCallbackRef> external_events;
+ std::vector<FileEvent> file_events;
EventDriver *driver;
std::multimap<clock_type::time_point, TimeEvent> time_events;
// Keeps track of all of the pollers currently defined. We don't
@@ -171,7 +171,7 @@ class EventCenter {
uint64_t time_event_next_id;
int notify_receive_fd;
int notify_send_fd;
- NetHandler net;
+ ceph::NetHandler net;
EventCallbackRef notify_handler;
unsigned center_id;
AssociatedCenters *global_centers = nullptr;
@@ -190,7 +190,7 @@ class EventCenter {
notify_receive_fd(-1), notify_send_fd(-1), net(c),
notify_handler(NULL), center_id(0) { }
~EventCenter();
- ostream& _event_prefix(std::ostream *_dout);
+ std::ostream& _event_prefix(std::ostream *_dout);
int init(int nevent, unsigned center_id, const std::string &type);
void set_owner();
diff --git a/src/msg/async/EventEpoll.cc b/src/msg/async/EventEpoll.cc
index 000aaf4fcbc..7ed5321dcda 100644
--- a/src/msg/async/EventEpoll.cc
+++ b/src/msg/async/EventEpoll.cc
@@ -116,7 +116,7 @@ int EpollDriver::resize_events(int newsize)
return 0;
}
-int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
+int EpollDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp)
{
int retval, numevents = 0;
diff --git a/src/msg/async/EventEpoll.h b/src/msg/async/EventEpoll.h
index 0221f90d34c..454ecbc34ff 100644
--- a/src/msg/async/EventEpoll.h
+++ b/src/msg/async/EventEpoll.h
@@ -42,7 +42,7 @@ class EpollDriver : public EventDriver {
int add_event(int fd, int cur_mask, int add_mask) override;
int del_event(int fd, int cur_mask, int del_mask) override;
int resize_events(int newsize) override;
- int event_wait(vector<FiredFileEvent> &fired_events,
+ int event_wait(std::vector<FiredFileEvent> &fired_events,
struct timeval *tp) override;
};
diff --git a/src/msg/async/EventSelect.cc b/src/msg/async/EventSelect.cc
index fdee6ebc3c8..8957792bb93 100644
--- a/src/msg/async/EventSelect.cc
+++ b/src/msg/async/EventSelect.cc
@@ -67,7 +67,7 @@ int SelectDriver::resize_events(int newsize)
return 0;
}
-int SelectDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
+int SelectDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp)
{
int retval, numevents = 0;
diff --git a/src/msg/async/EventSelect.h b/src/msg/async/EventSelect.h
index 1b75da0b1b0..08af57bcfd2 100644
--- a/src/msg/async/EventSelect.h
+++ b/src/msg/async/EventSelect.h
@@ -35,7 +35,7 @@ class SelectDriver : public EventDriver {
int add_event(int fd, int cur_mask, int add_mask) override;
int del_event(int fd, int cur_mask, int del_mask) override;
int resize_events(int newsize) override;
- int event_wait(vector<FiredFileEvent> &fired_events,
+ int event_wait(std::vector<FiredFileEvent> &fired_events,
struct timeval *tp) override;
};
diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc
index 0fc344c2ff4..a7a7fc41dd0 100644
--- a/src/msg/async/PosixStack.cc
+++ b/src/msg/async/PosixStack.cc
@@ -38,13 +38,14 @@
#define dout_prefix *_dout << "PosixStack "
class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
- NetHandler &handler;
+ ceph::NetHandler &handler;
int _fd;
entity_addr_t sa;
bool connected;
public:
- explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
+ explicit PosixConnectedSocketImpl(ceph::NetHandler &h, const entity_addr_t &sa,
+ int f, bool connected)
: handler(h), _fd(f), sa(sa), connected(connected) {}
int is_connected() override {
@@ -106,7 +107,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
return (ssize_t)sent;
}
- ssize_t send(bufferlist &bl, bool more) override {
+ ssize_t send(ceph::buffer::list &bl, bool more) override {
size_t sent_bytes = 0;
auto pb = std::cbegin(bl.buffers());
uint64_t left_pbrs = bl.get_num_buffers();
@@ -138,7 +139,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
}
if (sent_bytes) {
- bufferlist swapped;
+ ceph::buffer::list swapped;
if (sent_bytes < bl.length()) {
bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
bl.swap(swapped);
@@ -163,11 +164,11 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
};
class PosixServerSocketImpl : public ServerSocketImpl {
- NetHandler &handler;
+ ceph::NetHandler &handler;
int _fd;
public:
- explicit PosixServerSocketImpl(NetHandler &h, int f,
+ explicit PosixServerSocketImpl(ceph::NetHandler &h, int f,
const entity_addr_t& listen_addr, unsigned slot)
: ServerSocketImpl(listen_addr.get_type(), slot),
handler(h), _fd(f) {}
@@ -281,7 +282,7 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C
return 0;
}
-PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
+PosixNetworkStack::PosixNetworkStack(CephContext *c, const std::string &t)
: NetworkStack(c, t)
{
}
diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h
index f1aaccd4b82..4aed9dd6444 100644
--- a/src/msg/async/PosixStack.h
+++ b/src/msg/async/PosixStack.h
@@ -25,7 +25,7 @@
#include "Stack.h"
class PosixWorker : public Worker {
- NetHandler net;
+ ceph::NetHandler net;
void initialize() override;
public:
PosixWorker(CephContext *c, unsigned i)
@@ -38,10 +38,10 @@ class PosixWorker : public Worker {
};
class PosixNetworkStack : public NetworkStack {
- vector<std::thread> threads;
+ std::vector<std::thread> threads;
public:
- explicit PosixNetworkStack(CephContext *c, const string &t);
+ explicit PosixNetworkStack(CephContext *c, const std::string &t);
void spawn_worker(unsigned i, std::function<void ()> &&func) override {
threads.resize(i+1);
diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h
index cccba183567..10436307ebf 100644
--- a/src/msg/async/Protocol.h
+++ b/src/msg/async/Protocol.h
@@ -47,7 +47,7 @@ public:
};
using rx_buffer_t =
- std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>;
+ std::unique_ptr<ceph::buffer::ptr_node, ceph::buffer::ptr_node::disposer>;
template <class C>
class CtRxNode : public Ct<C> {
diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc
index cae647edc93..75b0b8ed67b 100644
--- a/src/msg/async/ProtocolV1.cc
+++ b/src/msg/async/ProtocolV1.cc
@@ -15,7 +15,7 @@
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
-ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
+std::ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
<< *connection->peer_addrs
<< " conn("
@@ -39,7 +39,7 @@ const int ASYNC_COALESCE_THRESHOLD = 256;
using namespace std;
-static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
+static void alloc_aligned_buffer(ceph::buffer::list &data, unsigned len, unsigned off) {
// create a buffer to read into that matches the data alignment
unsigned alloc_len = 0;
unsigned left = len;
@@ -51,7 +51,7 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
left -= head;
}
alloc_len += left;
- bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
+ ceph::bufferptr ptr(ceph::buffer::create_small_page_aligned(alloc_len));
if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
data.push_back(std::move(ptr));
}
@@ -211,7 +211,7 @@ void ProtocolV1::fault() {
}
void ProtocolV1::send_message(Message *m) {
- bufferlist bl;
+ ceph::buffer::list bl;
uint64_t f = connection->get_features();
// TODO: Currently not all messages supports reencode like MOSDMap, so here
@@ -249,7 +249,7 @@ void ProtocolV1::send_message(Message *m) {
}
void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
- bufferlist &bl) {
+ ceph::buffer::list &bl) {
ldout(cct, 20) << __func__ << " m " << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
@@ -315,7 +315,7 @@ void ProtocolV1::write_event() {
auto start = ceph::mono_clock::now();
bool more;
do {
- bufferlist data;
+ ceph::buffer::list data;
Message *m = _get_next_outgoing(&data);
if (!m) {
break;
@@ -436,7 +436,7 @@ CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
}
CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
- bufferlist &buffer) {
+ ceph::buffer::list &buffer) {
ssize_t r = connection->write(buffer, [&next, this](int r) {
next.setParams(r);
CONTINUATION_RUN(next);
@@ -749,7 +749,7 @@ CtPtr ProtocolV1::read_message_front() {
unsigned front_len = current_header.front_len;
if (front_len) {
if (!front.length()) {
- front.push_back(buffer::create(front_len));
+ front.push_back(ceph::buffer::create(front_len));
}
return READB(front_len, front.c_str(), handle_message_front);
}
@@ -774,7 +774,7 @@ CtPtr ProtocolV1::read_message_middle() {
if (current_header.middle_len) {
if (!middle.length()) {
- middle.push_back(buffer::create(current_header.middle_len));
+ middle.push_back(ceph::buffer::create(current_header.middle_len));
}
return READB(current_header.middle_len, middle.c_str(),
handle_message_middle);
@@ -807,7 +807,7 @@ CtPtr ProtocolV1::read_message_data_prepare() {
#if 0
// rx_buffers is broken by design... see
// http://tracker.ceph.com/issues/22480
- map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
+ map<ceph_tid_t, pair<ceph::buffer::list, int> >::iterator p =
connection->rx_buffers.find(current_header.tid);
if (p != connection->rx_buffers.end()) {
ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
@@ -841,7 +841,7 @@ CtPtr ProtocolV1::read_message_data() {
ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
if (msg_left > 0) {
- bufferptr bp = data_blp.get_current_ptr();
+ auto bp = data_blp.get_current_ptr();
unsigned read_len = std::min(bp.length(), msg_left);
return READB(read_len, bp.c_str(), handle_message_data);
@@ -858,7 +858,7 @@ CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
return _fault();
}
- bufferptr bp = data_blp.get_current_ptr();
+ auto bp = data_blp.get_current_ptr();
unsigned read_len = std::min(bp.length(), msg_left);
ceph_assert(read_len <
static_cast<unsigned>(std::numeric_limits<int>::max()));
@@ -1096,7 +1096,7 @@ void ProtocolV1::randomize_out_seq() {
}
}
-ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
+ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) {
FUNCTRACE(cct);
ceph_assert(connection->center->in_thread());
m->set_seq(++out_seq);
@@ -1192,7 +1192,7 @@ void ProtocolV1::requeue_sent() {
return;
}
- list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
out_seq -= sent.size();
while (!sent.empty()) {
Message *m = sent.back();
@@ -1200,7 +1200,7 @@ void ProtocolV1::requeue_sent() {
ldout(cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
m->clear_payload();
- rq.push_front(make_pair(bufferlist(), m));
+ rq.push_front(make_pair(ceph::buffer::list(), m));
}
}
@@ -1210,10 +1210,10 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
return seq;
}
- list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
uint64_t count = out_seq;
while (!rq.empty()) {
- pair<bufferlist, Message *> p = rq.front();
+ pair<ceph::buffer::list, Message *> p = rq.front();
if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
<< p.second->get_seq() << " <= " << seq << ", discarding"
@@ -1238,10 +1238,10 @@ void ProtocolV1::discard_out_queue() {
(*p)->put();
}
sent.clear();
- for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
+ for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p =
out_q.begin();
p != out_q.end(); ++p) {
- for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
+ for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin();
r != p->second.end(); ++r) {
ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
r->second->put();
@@ -1304,13 +1304,13 @@ void ProtocolV1::reset_recv_state()
}
}
-Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) {
+Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) {
Message *m = 0;
if (!out_q.empty()) {
- map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
+ map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it =
out_q.rbegin();
ceph_assert(!it->second.empty());
- list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
+ list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin();
m = p->second;
if (p->first.length() && bl) {
assert(bl->length() == 0);
@@ -1330,7 +1330,7 @@ CtPtr ProtocolV1::send_client_banner() {
ldout(cct, 20) << __func__ << dendl;
state = CONNECTING;
- bufferlist bl;
+ ceph::buffer::list bl;
bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
return WRITE(bl, handle_client_banner_write);
}
@@ -1353,7 +1353,7 @@ CtPtr ProtocolV1::wait_server_banner() {
ldout(cct, 20) << __func__ << dendl;
- bufferlist myaddrbl;
+ ceph::buffer::list myaddrbl;
unsigned banner_len = strlen(CEPH_BANNER);
unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
return READ(need_len, handle_server_banner_and_identify);
@@ -1375,7 +1375,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
return _fault();
}
- bufferlist bl;
+ ceph::buffer::list bl;
entity_addr_t paddr, peer_addr_for_me;
bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
@@ -1383,7 +1383,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
try {
decode(paddr, p);
decode(peer_addr_for_me, p);
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " decode peer addr failed " << dendl;
return _fault();
}
@@ -1446,7 +1446,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
}
}
- bufferlist myaddrbl;
+ ceph::buffer::list myaddrbl;
encode(messenger->get_myaddr_legacy(), myaddrbl, 0); // legacy
return WRITE(myaddrbl, handle_my_addr_write);
}
@@ -1472,7 +1472,7 @@ CtPtr ProtocolV1::send_connect_message()
ldout(cct, 20) << __func__ << dendl;
ceph_assert(messenger->auth_client);
- bufferlist auth_bl;
+ ceph::buffer::list auth_bl;
vector<uint32_t> preferred_modes;
if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
@@ -1523,7 +1523,7 @@ CtPtr ProtocolV1::send_connect_message()
CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
}
- bufferlist bl;
+ ceph::buffer::list bl;
bl.append((char *)&connect, sizeof(connect));
if (auth_bl.length()) {
bl.append(auth_bl.c_str(), auth_bl.length());
@@ -1605,14 +1605,14 @@ CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
return _fault();
}
- bufferlist authorizer_reply;
+ ceph::buffer::list authorizer_reply;
authorizer_reply.append(buffer, connect_reply.authorizer_len);
if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
auto am = auth_meta;
bool more = (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER);
- bufferlist auth_retry_bl;
+ ceph::buffer::list auth_retry_bl;
int r;
connection->lock.unlock();
if (more) {
@@ -1750,7 +1750,7 @@ CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
<< " vs out_seq " << out_seq << dendl;
out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
- bufferlist bl;
+ ceph::buffer::list bl;
uint64_t s = in_seq;
bl.append((char *)&s, sizeof(s));
@@ -1821,7 +1821,7 @@ CtPtr ProtocolV1::send_server_banner() {
ldout(cct, 20) << __func__ << dendl;
state = ACCEPTING;
- bufferlist bl;
+ ceph::buffer::list bl;
bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
@@ -1874,14 +1874,14 @@ CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
return _fault();
}
- bufferlist addr_bl;
+ ceph::buffer::list addr_bl;
entity_addr_t peer_addr;
addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
try {
auto ti = addr_bl.cbegin();
decode(peer_addr, ti);
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
return _fault();
}
@@ -1932,7 +1932,7 @@ CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
CtPtr ProtocolV1::wait_connect_message_auth() {
ldout(cct, 20) << __func__ << dendl;
authorizer_buf.clear();
- authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
+ authorizer_buf.push_back(ceph::buffer::create(connect_msg.authorizer_len));
return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
handle_connect_message_auth);
}
@@ -1968,7 +1968,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
<< dendl;
ceph_msg_connect_reply reply;
- bufferlist authorizer_reply;
+ ceph::buffer::list authorizer_reply;
// FIPS zeroization audit 20191115: this memset is not security related.
memset(&reply, 0, sizeof(reply));
@@ -2017,7 +2017,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
authorizer_reply);
}
- bufferlist auth_bl_copy = authorizer_buf;
+ ceph::buffer::list auth_bl_copy = authorizer_buf;
auto am = auth_meta;
am->auth_method = connect_msg.authorizer_protocol;
connection->lock.unlock();
@@ -2265,9 +2265,9 @@ CtPtr ProtocolV1::handle_connect_message_2() {
CtPtr ProtocolV1::send_connect_message_reply(char tag,
ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply) {
+ ceph::buffer::list &authorizer_reply) {
ldout(cct, 20) << __func__ << dendl;
- bufferlist reply_bl;
+ ceph::buffer::list reply_bl;
reply.tag = tag;
reply.features =
((uint64_t)connect_msg.features & connection->policy.features_supported) |
@@ -2305,7 +2305,7 @@ CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply) {
+ ceph::buffer::list &authorizer_reply) {
ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
connection->inject_delay();
@@ -2438,7 +2438,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
}
CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply) {
+ ceph::buffer::list &authorizer_reply) {
ldout(cct, 20) << __func__ << dendl;
connect_seq = connect_msg.connect_seq + 1;
@@ -2482,7 +2482,7 @@ CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
auth_meta->session_key,
connection->get_features()));
- bufferlist reply_bl;
+ ceph::buffer::list reply_bl;
reply_bl.append((char *)&reply, sizeof(reply));
if (reply.authorizer_len) {
diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h
index 72b707fe2a6..b56a966dbd7 100644
--- a/src/msg/async/ProtocolV1.h
+++ b/src/msg/async/ProtocolV1.h
@@ -104,9 +104,9 @@ protected:
enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
std::atomic<WriteStatus> can_write;
- std::list<Message *> sent; // the first bufferlist need to inject seq
+ std::list<Message *> sent; // the first ceph::buffer::list need to inject seq
// priority queue for outbound msgs
- std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
+ std::map<int, std::list<std::pair<ceph::buffer::list, Message *>>> out_q;
bool keepalive;
bool write_in_progress = false;
@@ -120,8 +120,8 @@ protected:
// Open state
ceph_msg_connect connect_msg;
ceph_msg_connect_reply connect_reply;
- bufferlist authorizer_buf; // auth(orizer) payload read off the wire
- bufferlist authorizer_more; // connect-side auth retry (we added challenge)
+ ceph::buffer::list authorizer_buf; // auth(orizer) payload read off the wire
+ ceph::buffer::list authorizer_more; // connect-side auth retry (we added challenge)
utime_t backoff; // backoff time
utime_t recv_stamp;
@@ -129,9 +129,9 @@ protected:
unsigned msg_left;
uint64_t cur_msg_size;
ceph_msg_header current_header;
- bufferlist data_buf;
- bufferlist::iterator data_blp;
- bufferlist front, middle, data;
+ ceph::buffer::list data_buf;
+ ceph::buffer::list::iterator data_blp;
+ ceph::buffer::list front, middle, data;
bool replacing; // when replacing process happened, we will reply connect
// side with RETRY tag and accept side will clear replaced
@@ -147,7 +147,7 @@ protected:
void run_continuation(CtPtr pcontinuation);
CtPtr read(CONTINUATION_RX_TYPE<ProtocolV1> &next, int len,
char *buffer = nullptr);
- CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,bufferlist &bl);
+ CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,ceph::buffer::list &bl);
inline CtPtr _fault() { // helper fault method that stops continuation
fault();
return nullptr;
@@ -194,10 +194,10 @@ protected:
void session_reset();
void randomize_out_seq();
- Message *_get_next_outgoing(bufferlist *bl);
+ Message *_get_next_outgoing(ceph::buffer::list *bl);
- void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
- ssize_t write_message(Message *m, bufferlist &bl, bool more);
+ void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
+ ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
@@ -205,7 +205,7 @@ protected:
void reset_recv_state();
- ostream &_conn_prefix(std::ostream *_dout);
+ std::ostream& _conn_prefix(std::ostream *_dout);
public:
ProtocolV1(AsyncConnection *connection);
@@ -281,11 +281,11 @@ protected:
CtPtr handle_connect_message_auth(char *buffer, int r);
CtPtr handle_connect_message_2();
CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply);
+ ceph::buffer::list &authorizer_reply);
CtPtr handle_connect_message_reply_write(int r);
CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply,
- bufferlist &authorizer_reply);
- CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
+ ceph::buffer::list &authorizer_reply);
+ CtPtr open(ceph_msg_connect_reply &reply, ceph::buffer::list &authorizer_reply);
CtPtr handle_ready_connect_message_reply_write(int r);
CtPtr wait_seq();
CtPtr handle_seq(char *buffer, int r);
diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc
index d9a5a72aecc..6164034445f 100644
--- a/src/msg/async/ProtocolV2.cc
+++ b/src/msg/async/ProtocolV2.cc
@@ -16,7 +16,7 @@
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
-ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
+std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
<< *connection->peer_addrs << " conn(" << connection << " "
<< this
@@ -43,7 +43,7 @@ void ProtocolV2::run_continuation(CtPtr pcontinuation) {
void ProtocolV2::run_continuation(CtRef continuation) {
try {
CONTINUATION_RUN(continuation)
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " failed decoding of frame header: " << e
<< dendl;
_fault();
@@ -57,7 +57,7 @@ void ProtocolV2::run_continuation(CtRef continuation) {
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)
-#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))
+#define READ(L, C) read(CONTINUATION(C), ceph::buffer::ptr_node::create(ceph::buffer::create(L)))
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
@@ -119,7 +119,7 @@ bool ProtocolV2::is_connected() { return can_write; }
void ProtocolV2::discard_out_queue() {
ldout(cct, 10) << __func__ << " started" << dendl;
- for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
+ for (auto p = sent.begin(); p != sent.end(); ++p) {
ldout(cct, 20) << __func__ << " discard " << *p << dendl;
(*p)->put();
}
@@ -766,7 +766,7 @@ CtPtr ProtocolV2::write(const std::string &desc,
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_TYPE<ProtocolV2> &next,
- bufferlist &buffer) {
+ ceph::bufferlist &buffer) {
if (unlikely(pre_auth.enabled)) {
pre_auth.txbuf.append(buffer);
ceph_assert(!cct->_conf->ms_die_on_bug ||
@@ -800,11 +800,12 @@ CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
ldout(cct, 20) << __func__ << dendl;
bannerExchangeCallback = &callback;
- bufferlist banner_payload;
+ ceph::bufferlist banner_payload;
+ using ceph::encode;
encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
- bufferlist bl;
+ ceph::bufferlist bl;
bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
@@ -841,14 +842,15 @@ CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
}
uint16_t payload_len;
- bufferlist bl;
+ ceph::bufferlist bl;
buffer->set_offset(banner_prefix_len);
buffer->set_length(sizeof(ceph_le16));
bl.push_back(std::move(buffer));
auto ti = bl.cbegin();
+ using ceph::decode;
try {
decode(payload_len, ti);
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
return _fault();
}
@@ -870,13 +872,14 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
uint64_t peer_supported_features;
uint64_t peer_required_features;
- bufferlist bl;
+ ceph::bufferlist bl;
+ using ceph::decode;
bl.push_back(std::move(buffer));
auto ti = bl.cbegin();
try {
decode(peer_supported_features, ti);
decode(peer_required_features, ti);
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " decode banner payload failed " << dendl;
return _fault();
}
@@ -1169,7 +1172,7 @@ CtPtr ProtocolV2::read_frame_segment() {
const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
rx_buffer_t rx_buffer;
try {
- rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
+ rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
} catch (std::bad_alloc&) {
// Catching because of potential issues with satisfying alignment.
@@ -1736,8 +1739,8 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
<< " auth_client " << messenger->auth_client << dendl;
- bufferlist bl;
- vector<uint32_t> preferred_modes;
+ ceph::bufferlist bl;
+ std::vector<uint32_t> preferred_modes;
auto am = auth_meta;
connection->lock.unlock();
int r = messenger->auth_client->get_auth_request(
@@ -2194,12 +2197,12 @@ CtPtr ProtocolV2::_auth_bad_method(int r)
return WRITE(bad_method, "bad auth method", read_frame);
}
-CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+CtPtr ProtocolV2::_handle_auth_request(ceph::bufferlist& auth_payload, bool more)
{
if (!messenger->auth_server) {
return _fault();
}
- bufferlist reply;
+ ceph::bufferlist reply;
auto am = auth_meta;
connection->lock.unlock();
int r = messenger->auth_server->handle_auth_request(
diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h
index e5544f98746..b747f401316 100644
--- a/src/msg/async/ProtocolV2.h
+++ b/src/msg/async/ProtocolV2.h
@@ -115,7 +115,7 @@ private:
bool keepalive;
bool write_in_progress = false;
- ostream &_conn_prefix(std::ostream *_dout);
+ std::ostream& _conn_prefix(std::ostream *_dout);
void run_continuation(Ct<ProtocolV2> *pcontinuation);
void run_continuation(Ct<ProtocolV2> &continuation);
@@ -127,7 +127,7 @@ private:
F &frame);
Ct<ProtocolV2> *write(const std::string &desc,
CONTINUATION_TYPE<ProtocolV2> &next,
- bufferlist &buffer);
+ ceph::bufferlist &buffer);
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
@@ -238,7 +238,7 @@ private:
Ct<ProtocolV2> *post_server_banner_exchange();
Ct<ProtocolV2> *handle_auth_request(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_auth_request_more(ceph::bufferlist &payload);
- Ct<ProtocolV2> *_handle_auth_request(bufferlist& auth_payload, bool more);
+ Ct<ProtocolV2> *_handle_auth_request(ceph::bufferlist& auth_payload, bool more);
Ct<ProtocolV2> *_auth_bad_method(int r);
Ct<ProtocolV2> *handle_client_ident(ceph::bufferlist &payload);
Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc
index 6b18d1de9cb..74d07829be9 100644
--- a/src/msg/async/Stack.cc
+++ b/src/msg/async/Stack.cc
@@ -63,7 +63,8 @@ std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
};
}
-std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
+std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
+ const std::string &t)
{
if (t == "posix")
return std::make_shared<PosixNetworkStack>(c, t);
@@ -82,7 +83,7 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string
return nullptr;
}
-Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id)
+Worker* NetworkStack::create_worker(CephContext *c, const std::string &type, unsigned worker_id)
{
if (type == "posix")
return new PosixWorker(c, worker_id);
@@ -101,7 +102,7 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned
return nullptr;
}
-NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
+NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), started(false), cct(c)
{
ceph_assert(cct->_conf->ms_async_op_threads > 0);
diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h
index 37a33163d6b..7b8b62f36fb 100644
--- a/src/msg/async/Stack.h
+++ b/src/msg/async/Stack.h
@@ -28,7 +28,7 @@ class ConnectedSocketImpl {
virtual ~ConnectedSocketImpl() {}
virtual int is_connected() = 0;
virtual ssize_t read(char*, size_t) = 0;
- virtual ssize_t send(bufferlist &bl, bool more) = 0;
+ virtual ssize_t send(ceph::buffer::list &bl, bool more) = 0;
virtual void shutdown() = 0;
virtual void close() = 0;
virtual int fd() const = 0;
@@ -96,7 +96,7 @@ class ConnectedSocket {
/// Gets the output stream.
///
/// Gets an object that sends data to the remote endpoint.
- ssize_t send(bufferlist &bl, bool more) {
+ ssize_t send(ceph::buffer::list &bl, bool more) {
return _csi->send(bl, more);
}
/// Disables output to the socket.
@@ -302,9 +302,9 @@ class NetworkStack {
protected:
CephContext *cct;
- vector<Worker*> workers;
+ std::vector<Worker*> workers;
- explicit NetworkStack(CephContext *c, const string &t);
+ explicit NetworkStack(CephContext *c, const std::string &t);
public:
NetworkStack(const NetworkStack &) = delete;
NetworkStack& operator=(const NetworkStack &) = delete;
@@ -314,10 +314,10 @@ class NetworkStack {
}
static std::shared_ptr<NetworkStack> create(
- CephContext *c, const string &type);
+ CephContext *c, const std::string &type);
static Worker* create_worker(
- CephContext *c, const string &t, unsigned i);
+ CephContext *c, const std::string &t, unsigned i);
// backend need to override this method if backend doesn't support shared
// listen table.
// For example, posix backend has in kernel global listen table. If one
diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h
index ddc42a489cf..2f187ceb769 100644
--- a/src/msg/async/frames_v2.h
+++ b/src/msg/async/frames_v2.h
@@ -403,14 +403,14 @@ protected:
struct AuthRequestFrame : public ControlFrame<AuthRequestFrame,
uint32_t, // auth method
- vector<uint32_t>, // preferred modes
+ std::vector<uint32_t>, // preferred modes
bufferlist> { // auth payload
static const Tag tag = Tag::AUTH_REQUEST;
using ControlFrame::Encode;
using ControlFrame::Decode;
inline uint32_t &method() { return get_val<0>(); }
- inline vector<uint32_t> &preferred_modes() { return get_val<1>(); }
+ inline std::vector<uint32_t> &preferred_modes() { return get_val<1>(); }
inline bufferlist &auth_payload() { return get_val<2>(); }
protected:
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc
index e2c06229f41..52323f948c6 100644
--- a/src/msg/async/rdma/Infiniband.cc
+++ b/src/msg/async/rdma/Infiniband.cc
@@ -579,7 +579,7 @@ int Infiniband::CompletionChannel::init()
<< cpp_strerror(errno) << dendl;
return -1;
}
- int rc = NetHandler(cct).set_nonblock(channel->fd);
+ int rc = ceph::NetHandler(cct).set_nonblock(channel->fd);
if (rc < 0) {
ibv_destroy_comp_channel(channel);
return -1;
@@ -1062,7 +1062,7 @@ void Infiniband::init()
device->binding_port(cct, port_num);
ib_physical_port = device->active_port->get_port_num();
pd = new ProtectionDomain(cct, device);
- ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+ ceph_assert(ceph::NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
support_srq = cct->_conf->ms_async_rdma_support_srq;
if (support_srq) {
diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h
index 3af89f304fe..03e59e7ac17 100644
--- a/src/msg/async/rdma/Infiniband.h
+++ b/src/msg/async/rdma/Infiniband.h
@@ -549,7 +549,7 @@ class Infiniband {
uint32_t max_recv_wr;
uint32_t q_key;
bool dead;
- vector<Chunk*> recv_queue;
+ std::vector<Chunk*> recv_queue;
ceph::mutex lock = ceph::make_mutex("queue_pair_lock");
};
diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
index c897f94f4d5..5e75e961d18 100644
--- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -47,8 +47,8 @@ class C_handle_connection_read : public EventCallback {
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher,
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband> &ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w)
: cct(cct), connected(0), error(0), ib(ib),
dispatcher(rdma_dispatcher), worker(w),
@@ -129,7 +129,7 @@ int RDMAConnectedSocketImpl::activate()
int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
<< opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
- NetHandler net(cct);
+ ceph::NetHandler net(cct);
// we construct a socket to transport ib sync message
// but we shouldn't block in tcp connecting
@@ -337,7 +337,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
return read_size;
}
-ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
+ssize_t RDMAConnectedSocketImpl::send(ceph::buffer::list &bl, bool more)
{
if (error) {
if (!active)
@@ -440,7 +440,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
if (total_copied == 0)
return -EAGAIN;
ceph_assert(total_copied <= pending_bl.length());
- bufferlist swapped;
+ ceph::buffer::list swapped;
if (total_copied < pending_bl.length()) {
worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
@@ -463,7 +463,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
{
ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
- vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
+ auto current_buffer = tx_buffers.begin();
ibv_sge isge[tx_buffers.size()];
uint32_t current_sge = 0;
ibv_send_wr iswr[tx_buffers.size()];
diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
index d55ced3c53f..606dbd2817c 100644
--- a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
@@ -7,8 +7,8 @@
#define TIMEOUT_MS 3000
#define RETRY_COUNT 7
-RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher,
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, RDMACMInfo *info)
: RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this))
{
diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
index e4a170ee8be..0500b4420f9 100644
--- a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
@@ -8,8 +8,8 @@
#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
- CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w,
+ CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w,
entity_addr_t& a, unsigned addr_slot)
: RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot)
{
diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc
index cc85832eddd..99d12131ffb 100644
--- a/src/msg/async/rdma/RDMAServerSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc
@@ -25,8 +25,8 @@
#define dout_prefix *_dout << " RDMAServerSocketImpl "
RDMAServerSocketImpl::RDMAServerSocketImpl(
- CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, entity_addr_t& a, unsigned slot)
: ServerSocketImpl(a.get_type(), slot),
cct(cct), net(cct), server_setup_socket(-1), ib(ib),
diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc
index b68aeb1a8ef..5e88e7d01dd 100644
--- a/src/msg/async/rdma/RDMAStack.cc
+++ b/src/msg/async/rdma/RDMAStack.cc
@@ -40,7 +40,7 @@ RDMADispatcher::~RDMADispatcher()
ceph_assert(dead_queue_pairs.empty());
}
-RDMADispatcher::RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib)
+RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib)
: cct(c), ib(ib)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -772,9 +772,9 @@ void RDMAWorker::handle_pending_message()
dispatcher->notify_pending_workers();
}
-RDMAStack::RDMAStack(CephContext *cct, const string &t)
- : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
- rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
+RDMAStack::RDMAStack(CephContext *cct, const std::string &t)
+ : NetworkStack(cct, t), ib(std::make_shared<Infiniband>(cct)),
+ rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib))
{
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
index 45a043d2e23..c9772904ac2 100644
--- a/src/msg/async/rdma/RDMAStack.h
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -40,7 +40,7 @@ class RDMADispatcher {
std::thread t;
CephContext *cct;
- shared_ptr<Infiniband> ib;
+ std::shared_ptr<Infiniband> ib;
Infiniband::CompletionQueue* tx_cq = nullptr;
Infiniband::CompletionQueue* rx_cq = nullptr;
Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
@@ -81,7 +81,7 @@ class RDMADispatcher {
public:
PerfCounters *perf_logger;
- explicit RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib);
+ explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib);
virtual ~RDMADispatcher();
void handle_async_event();
@@ -120,10 +120,10 @@ class RDMAWorker : public Worker {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::MemoryManager MemoryManager;
typedef std::vector<Chunk*>::iterator ChunkIter;
- shared_ptr<Infiniband> ib;
+ std::shared_ptr<Infiniband> ib;
EventCallbackRef tx_handler;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
- shared_ptr<RDMADispatcher> dispatcher;
+ std::shared_ptr<RDMADispatcher> dispatcher;
ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
class C_handle_cq_tx : public EventCallback {
@@ -150,8 +150,8 @@ class RDMAWorker : public Worker {
pending_sent_conns.remove(o);
}
void handle_pending_message();
- void set_dispatcher(shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
- void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;}
+ void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
+ void set_ib(std::shared_ptr<Infiniband> &ib) {this->ib = ib;}
void notify_worker() {
center.dispatch_event_external(tx_handler);
}
@@ -178,12 +178,12 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
uint32_t local_qpn = 0;
int connected;
int error;
- shared_ptr<Infiniband> ib;
- shared_ptr<RDMADispatcher> dispatcher;
+ std::shared_ptr<Infiniband> ib;
+ std::shared_ptr<RDMADispatcher> dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
- bufferlist pending_bl;
+ ceph::buffer::list pending_bl;
ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
std::vector<ibv_wc> wc;
@@ -204,8 +204,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
const decltype(std::cbegin(pending_bl.buffers()))& end);
public:
- RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
+ RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
void pass_wc(std::vector<ibv_wc> &&v);
@@ -213,7 +213,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
virtual int is_connected() override { return connected; }
virtual ssize_t read(char* buf, size_t len) override;
- virtual ssize_t send(bufferlist &bl, bool more) override;
+ virtual ssize_t send(ceph::buffer::list &bl, bool more) override;
virtual void shutdown() override;
virtual void close() override;
virtual int fd() const override { return notify_fd; }
@@ -249,8 +249,9 @@ enum RDMA_CM_STATUS {
class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
public:
- RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr);
+ RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, RDMACMInfo *info = nullptr);
~RDMAIWARPConnectedSocketImpl();
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
virtual void close() override;
@@ -283,16 +284,16 @@ class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
class RDMAServerSocketImpl : public ServerSocketImpl {
protected:
CephContext *cct;
- NetHandler net;
+ ceph::NetHandler net;
int server_setup_socket;
- shared_ptr<Infiniband> ib;
- shared_ptr<RDMADispatcher> dispatcher;
+ std::shared_ptr<Infiniband> ib;
+ std::shared_ptr<RDMADispatcher> dispatcher;
RDMAWorker *worker;
entity_addr_t sa;
public:
- RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, entity_addr_t& a, unsigned slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
@@ -304,8 +305,8 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
public:
RDMAIWARPServerSocketImpl(
- CephContext *cct, shared_ptr<Infiniband>& ib,
- shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
@@ -316,15 +317,15 @@ class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
};
class RDMAStack : public NetworkStack {
- vector<std::thread> threads;
+ std::vector<std::thread> threads;
PerfCounters *perf_counter;
- shared_ptr<Infiniband> ib;
- shared_ptr<RDMADispatcher> rdma_dispatcher;
+ std::shared_ptr<Infiniband> ib;
+ std::shared_ptr<RDMADispatcher> rdma_dispatcher;
std::atomic<bool> fork_finished = {false};
public:
- explicit RDMAStack(CephContext *cct, const string &t);
+ explicit RDMAStack(CephContext *cct, const std::string &t);
virtual ~RDMAStack();
virtual bool nonblock_connect_need_writable_event() const override { return false; }
diff --git a/src/msg/msg_types.cc b/src/msg/msg_types.cc
index 7d29713538b..1f0c8242a8c 100644
--- a/src/msg/msg_types.cc
+++ b/src/msg/msg_types.cc
@@ -1,3 +1,5 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
#include "msg_types.h"
@@ -8,26 +10,26 @@
#include "common/Formatter.h"
-void entity_name_t::dump(Formatter *f) const
+void entity_name_t::dump(ceph::Formatter *f) const
{
f->dump_string("type", type_str());
f->dump_unsigned("num", num());
}
-void entity_addr_t::dump(Formatter *f) const
+void entity_addr_t::dump(ceph::Formatter *f) const
{
f->dump_string("type", get_type_name(type));
f->dump_stream("addr") << get_sockaddr();
f->dump_unsigned("nonce", nonce);
}
-void entity_inst_t::dump(Formatter *f) const
+void entity_inst_t::dump(ceph::Formatter *f) const
{
f->dump_object("name", name);
f->dump_object("addr", addr);
}
-void entity_name_t::generate_test_instances(list<entity_name_t*>& o)
+void entity_name_t::generate_test_instances(std::list<entity_name_t*>& o)
{
o.push_back(new entity_name_t(entity_name_t::MON()));
o.push_back(new entity_name_t(entity_name_t::MON(1)));
@@ -35,7 +37,7 @@ void entity_name_t::generate_test_instances(list<entity_name_t*>& o)
o.push_back(new entity_name_t(entity_name_t::CLIENT(1)));
}
-void entity_addr_t::generate_test_instances(list<entity_addr_t*>& o)
+void entity_addr_t::generate_test_instances(std::list<entity_addr_t*>& o)
{
o.push_back(new entity_addr_t());
entity_addr_t *a = new entity_addr_t();
@@ -53,7 +55,7 @@ void entity_addr_t::generate_test_instances(list<entity_addr_t*>& o)
o.push_back(b);
}
-void entity_inst_t::generate_test_instances(list<entity_inst_t*>& o)
+void entity_inst_t::generate_test_instances(std::list<entity_inst_t*>& o)
{
o.push_back(new entity_inst_t());
entity_name_t name;
@@ -175,7 +177,7 @@ bool entity_addr_t::parse(const char *s, const char **end, int default_type)
return true;
}
-ostream& operator<<(ostream& out, const entity_addr_t &addr)
+std::ostream& operator<<(std::ostream& out, const entity_addr_t &addr)
{
if (addr.type == entity_addr_t::TYPE_NONE) {
return out << "-";
@@ -187,7 +189,7 @@ ostream& operator<<(ostream& out, const entity_addr_t &addr)
return out;
}
-ostream& operator<<(ostream& out, const sockaddr *psa)
+std::ostream& operator<<(std::ostream& out, const sockaddr *psa)
{
char buf[NI_MAXHOST] = { 0 };
@@ -211,7 +213,7 @@ ostream& operator<<(ostream& out, const sockaddr *psa)
}
}
-ostream& operator<<(ostream& out, const sockaddr_storage &ss)
+std::ostream& operator<<(std::ostream& out, const sockaddr_storage &ss)
{
return out << (const sockaddr*)&ss;
}
@@ -274,7 +276,7 @@ bool entity_addrvec_t::parse(const char *s, const char **end)
return !v.empty();
}
-void entity_addrvec_t::encode(bufferlist& bl, uint64_t features) const
+void entity_addrvec_t::encode(ceph::buffer::list& bl, uint64_t features) const
{
using ceph::encode;
if ((features & CEPH_FEATURE_MSG_ADDR2) == 0) {
@@ -286,7 +288,7 @@ void entity_addrvec_t::encode(bufferlist& bl, uint64_t features) const
encode(v, bl, features);
}
-void entity_addrvec_t::decode(bufferlist::const_iterator& bl)
+void entity_addrvec_t::decode(ceph::buffer::list::const_iterator& bl)
{
using ceph::decode;
__u8 marker;
@@ -315,21 +317,20 @@ void entity_addrvec_t::decode(bufferlist::const_iterator& bl)
return;
}
if (marker > 2)
- throw buffer::malformed_input("entity_addrvec_marker > 2");
+ throw ceph::buffer::malformed_input("entity_addrvec_marker > 2");
decode(v, bl);
}
-void entity_addrvec_t::dump(Formatter *f) const
+void entity_addrvec_t::dump(ceph::Formatter *f) const
{
f->open_array_section("addrvec");
- for (vector<entity_addr_t>::const_iterator p = v.begin();
- p != v.end(); ++p) {
+ for (auto p = v.begin(); p != v.end(); ++p) {
f->dump_object("addr", *p);
}
f->close_section();
}
-void entity_addrvec_t::generate_test_instances(list<entity_addrvec_t*>& ls)
+void entity_addrvec_t::generate_test_instances(std::list<entity_addrvec_t*>& ls)
{
ls.push_back(new entity_addrvec_t());
ls.push_back(new entity_addrvec_t());
diff --git a/src/msg/msg_types.h b/src/msg/msg_types.h
index 3016d2c53fc..3caf83f24e5 100644
--- a/src/msg/msg_types.h
+++ b/src/msg/msg_types.h
@@ -523,13 +523,13 @@ struct entity_addr_t {
#endif
uint16_t ss_family;
if (elen < sizeof(ss_family)) {
- throw buffer::malformed_input("elen smaller than family len");
+ throw ceph::buffer::malformed_input("elen smaller than family len");
}
decode(ss_family, bl);
u.sa.sa_family = ss_family;
elen -= sizeof(ss_family);
if (elen > get_sockaddr_len() - sizeof(u.sa.sa_family)) {
- throw buffer::malformed_input("elen exceeds sockaddr len");
+ throw ceph::buffer::malformed_input("elen exceeds sockaddr len");
}
bl.copy(elen, u.sa.sa_data);
}