diff options
author | Adam C. Emerson <aemerson@redhat.com> | 2020-03-07 10:29:23 +0100 |
---|---|---|
committer | Adam C. Emerson <aemerson@redhat.com> | 2020-03-07 10:29:23 +0100 |
commit | ed3ec4c01d1715f874f773617601e925c94e8320 (patch) | |
tree | 1feb74eabc12d9144b98754cb6caa31e2d5d5f2c | |
parent | osd: Build target 'common' without using namespace in headers (diff) | |
download | ceph-ed3ec4c01d1715f874f773617601e925c94e8320.tar.xz ceph-ed3ec4c01d1715f874f773617601e925c94e8320.zip |
msg: Build target 'common' without using namespace in headers
Part of a changeset to allow building all of 'common' without relying
on 'using namespace std' or 'using namespace ceph' at toplevel in
headers.
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
36 files changed, 259 insertions, 247 deletions
diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 5a081591a59..b8ed6f7efe8 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -20,6 +20,8 @@ #define dout_subsys ceph_subsys_ms #include "common/debug.h" +using ceph::cref_t; +using ceph::ref_t; /******************* * DispatchQueue @@ -213,11 +215,9 @@ void DispatchQueue::entry() void DispatchQueue::discard_queue(uint64_t id) { std::lock_guard l{lock}; - list<QueueItem> removed; + std::list<QueueItem> removed; mqueue.remove_by_class(id, &removed); - for (list<QueueItem>::iterator i = removed.begin(); - i != removed.end(); - ++i) { + for (auto i = removed.begin(); i != removed.end(); ++i) { ceph_assert(!(i->is_code())); // We don't discard id 0, ever! const ref_t<Message>& m = i->get_message(); remove_arrival(m); diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 243de2cba02..de0cb7d1a08 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -41,9 +41,9 @@ class DispatchQueue { class QueueItem { int type; ConnectionRef con; - ref_t<Message> m; + ceph::ref_t<Message> m; public: - explicit QueueItem(const ref_t<Message>& m) : type(-1), con(0), m(m) {} + explicit QueueItem(const ceph::ref_t<Message>& m) : type(-1), con(0), m(m) {} QueueItem(int type, Connection *con) : type(type), con(con), m(0) {} bool is_code() const { return type != -1; @@ -52,7 +52,7 @@ class DispatchQueue { ceph_assert(is_code()); return type; } - const ref_t<Message>& get_message() { + const ceph::ref_t<Message>& get_message() { ceph_assert(!is_code()); return m; } @@ -61,7 +61,7 @@ class DispatchQueue { return con.get(); } }; - + CephContext *cct; Messenger *msgr; mutable ceph::mutex lock; @@ -69,17 +69,17 @@ class DispatchQueue { PrioritizedQueue<QueueItem, uint64_t> mqueue; - std::set<pair<double, ref_t<Message>>> marrival; - map<ref_t<Message>, decltype(marrival)::iterator> marrival_map; - void add_arrival(const ref_t<Message>& m) { + std::set<std::pair<double, ceph::ref_t<Message>>> marrival; + std::map<ceph::ref_t<Message>, decltype(marrival)::iterator> marrival_map; + void add_arrival(const ceph::ref_t<Message>& m) { marrival_map.insert( make_pair( m, - marrival.insert(make_pair(m->get_recv_stamp(), m)).first + marrival.insert(std::make_pair(m->get_recv_stamp(), m)).first ) ); } - void remove_arrival(const ref_t<Message>& m) { + void remove_arrival(const ceph::ref_t<Message>& m) { auto it = marrival_map.find(m); ceph_assert(it != marrival_map.end()); marrival.erase(it->second); @@ -87,7 +87,7 @@ class DispatchQueue { } std::atomic<uint64_t> next_id; - + enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES }; /** @@ -106,7 +106,7 @@ class DispatchQueue { ceph::mutex local_delivery_lock; ceph::condition_variable local_delivery_cond; bool stop_local_delivery; - std::queue<pair<ref_t<Message>, int>> local_messages; + std::queue<std::pair<ceph::ref_t<Message>, int>> local_messages; class LocalDeliveryThread : public Thread { DispatchQueue *dq; public: @@ -117,8 +117,8 @@ class DispatchQueue { } } local_delivery_thread; - uint64_t pre_dispatch(const ref_t<Message>& m); - void post_dispatch(const ref_t<Message>& m, uint64_t msize); + uint64_t pre_dispatch(const ceph::ref_t<Message>& m); + void post_dispatch(const ceph::ref_t<Message>& m, uint64_t msize); public: @@ -126,9 +126,9 @@ class DispatchQueue { Throttle dispatch_throttler; bool stop; - void local_delivery(const ref_t<Message>& m, int priority); + void local_delivery(const ceph::ref_t<Message>& m, int priority); void local_delivery(Message* m, int priority) { - return local_delivery(ref_t<Message>(m, false), priority); /* consume ref */ + return local_delivery(ceph::ref_t<Message>(m, false), priority); /* consume ref */ } void run_local_delivery(); @@ -197,15 +197,15 @@ class DispatchQueue { cond.notify_all(); } - bool can_fast_dispatch(const cref_t<Message> &m) const; - void fast_dispatch(const ref_t<Message>& m); + bool can_fast_dispatch(const ceph::cref_t<Message> &m) const; + void fast_dispatch(const ceph::ref_t<Message>& m); void fast_dispatch(Message* m) { - return fast_dispatch(ref_t<Message>(m, false)); /* consume ref */ + return fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ } - void fast_preprocess(const ref_t<Message>& m); - void enqueue(const ref_t<Message>& m, int priority, uint64_t id); + void fast_preprocess(const ceph::ref_t<Message>& m); + void enqueue(const ceph::ref_t<Message>& m, int priority, uint64_t id); void enqueue(Message* m, int priority, uint64_t id) { - return enqueue(ref_t<Message>(m, false), priority, id); /* consume ref */ + return enqueue(ceph::ref_t<Message>(m, false), priority, id); /* consume ref */ } void discard_queue(uint64_t id); void discard_local(); @@ -218,7 +218,7 @@ class DispatchQueue { void shutdown(); bool is_started() const {return dispatch_thread.is_started();} - DispatchQueue(CephContext *cct, Messenger *msgr, string &name) + DispatchQueue(CephContext *cct, Messenger *msgr, std::string &name) : cct(cct), msgr(msgr), lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)), mqueue(cct->_conf->ms_pq_max_tokens_per_priority, @@ -228,7 +228,7 @@ class DispatchQueue { local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)), stop_local_delivery(false), local_delivery_thread(this), - dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name, + dispatch_throttler(cct, std::string("msgr_dispatch_throttler-") + name, cct->_conf->ms_dispatch_throttle_bytes), stop(false) {} diff --git a/src/msg/Message.cc b/src/msg/Message.cc index c57bc3bc1bd..27ef95d9db0 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -288,9 +288,9 @@ void Message::encode(uint64_t features, int crcflags, bool skip_header_crc) } } -void Message::dump(Formatter *f) const +void Message::dump(ceph::Formatter *f) const { - stringstream ss; + std::stringstream ss; print(ss); f->dump_string("summary", ss.str()); } @@ -347,12 +347,14 @@ Message *decode_message(CephContext *cct, } // make message - ref_t<Message> m; + ceph::ref_t<Message> m; int type = header.type; switch (type) { // -- with payload -- + using ceph::make_message; + case MSG_PGSTATS: m = make_message<MPGStats>(); break; @@ -929,7 +931,7 @@ Message *decode_message(CephContext *cct, try { m->decode_payload(); } - catch (const buffer::error &e) { + catch (const ceph::buffer::error &e) { if (cct) { lderr(cct) << "failed to decode message of type " << type << " v" << header.version @@ -948,7 +950,7 @@ Message *decode_message(CephContext *cct, return m.detach(); } -void Message::encode_trace(bufferlist &bl, uint64_t features) const +void Message::encode_trace(ceph::bufferlist &bl, uint64_t features) const { using ceph::encode; auto p = trace.get_info(); @@ -959,7 +961,7 @@ void Message::encode_trace(bufferlist &bl, uint64_t features) const encode(*p, bl); } -void Message::decode_trace(bufferlist::const_iterator &p, bool create) +void Message::decode_trace(ceph::bufferlist::const_iterator &p, bool create) { blkin_trace_info info = {}; decode(info, p); @@ -992,7 +994,7 @@ void Message::decode_trace(bufferlist::const_iterator &p, bool create) // problems, we currently always encode and decode using the old footer format that doesn't // allow for message authentication. Eventually we should fix that. PLR -void encode_message(Message *msg, uint64_t features, bufferlist& payload) +void encode_message(Message *msg, uint64_t features, ceph::bufferlist& payload) { ceph_msg_footer_old old_footer; msg->encode(features, MSG_CRC_ALL); @@ -1006,6 +1008,7 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload) old_footer.flags = footer.flags; encode(old_footer, payload); + using ceph::encode; encode(msg->get_payload(), payload); encode(msg->get_middle(), payload); encode(msg->get_data(), payload); @@ -1016,12 +1019,12 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload) // We've slipped in a 0 signature at this point, so any signature checking after this will // fail. PLR -Message *decode_message(CephContext *cct, int crcflags, bufferlist::const_iterator& p) +Message *decode_message(CephContext *cct, int crcflags, ceph::bufferlist::const_iterator& p) { ceph_msg_header h; ceph_msg_footer_old fo; ceph_msg_footer f; - bufferlist fr, mi, da; + ceph::bufferlist fr, mi, da; decode(h, p); decode(fo, p); f.front_crc = fo.front_crc; @@ -1029,6 +1032,7 @@ Message *decode_message(CephContext *cct, int crcflags, bufferlist::const_iterat f.data_crc = fo.data_crc; f.flags = fo.flags; f.sig = 0; + using ceph::decode; decode(fr, p); decode(mi, p); decode(da, p); diff --git a/src/msg/Messenger.cc b/src/msg/Messenger.cc index 4e2ea96ebe2..464ba394666 100644 --- a/src/msg/Messenger.cc +++ b/src/msg/Messenger.cc @@ -10,7 +10,7 @@ #include "msg/async/AsyncMessenger.h" -Messenger *Messenger::create_client_messenger(CephContext *cct, string lname) +Messenger *Messenger::create_client_messenger(CephContext *cct, std::string lname) { std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type; auto nonce = get_random_nonce(); @@ -33,8 +33,8 @@ uint64_t Messenger::get_random_nonce() return ceph::util::generate_random_number<uint64_t>(); } -Messenger *Messenger::create(CephContext *cct, const string &type, - entity_name_t name, string lname, +Messenger *Messenger::create(CephContext *cct, const std::string &type, + entity_name_t name, std::string lname, uint64_t nonce, uint64_t cflags) { int r = -1; diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index c00374c114d..5ed44407c17 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -648,7 +648,7 @@ public: * * @param m The Message we are testing. */ - bool ms_can_fast_dispatch(const cref_t<Message>& m) { + bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) { for (const auto &dispatcher : fast_dispatchers) { if (dispatcher->ms_can_fast_dispatch2(m)) return true; @@ -662,7 +662,7 @@ public: * @param m The Message we are fast dispatching. * If none of our Dispatchers can handle it, ceph_abort(). */ - void ms_fast_dispatch(const ref_t<Message> &m) { + void ms_fast_dispatch(const ceph::ref_t<Message> &m) { m->set_dispatch_stamp(ceph_clock_now()); for (const auto &dispatcher : fast_dispatchers) { if (dispatcher->ms_can_fast_dispatch2(m)) { @@ -673,12 +673,12 @@ public: ceph_abort(); } void ms_fast_dispatch(Message *m) { - return ms_fast_dispatch(ref_t<Message>(m, false)); /* consume ref */ + return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ } /** * */ - void ms_fast_preprocess(const ref_t<Message> &m) { + void ms_fast_preprocess(const ceph::ref_t<Message> &m) { for (const auto &dispatcher : fast_dispatchers) { dispatcher->ms_fast_preprocess2(m); } @@ -690,7 +690,7 @@ public: * * @param m The Message to deliver. */ - void ms_deliver_dispatch(const ref_t<Message> &m) { + void ms_deliver_dispatch(const ceph::ref_t<Message> &m) { m->set_dispatch_stamp(ceph_clock_now()); for (const auto &dispatcher : dispatchers) { if (dispatcher->ms_dispatch2(m)) @@ -701,7 +701,7 @@ public: ceph_assert(!cct->_conf->ms_die_on_unhandled_msg); } void ms_deliver_dispatch(Message *m) { - return ms_deliver_dispatch(ref_t<Message>(m, false)); /* consume ref */ + return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ } /** * Notify each Dispatcher of a new Connection. Call diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc index 85b0a11e602..342494c5a7a 100644 --- a/src/msg/QueueStrategy.cc +++ b/src/msg/QueueStrategy.cc @@ -44,11 +44,11 @@ void QueueStrategy::ds_dispatch(Message *m) { void QueueStrategy::entry(QSThread *thrd) { for (;;) { - ref_t<Message> m; + ceph::ref_t<Message> m; std::unique_lock l{lock}; for (;;) { if (! mqueue.empty()) { - m = ref_t<Message>(&mqueue.front(), false); + m = ceph::ref_t<Message>(&mqueue.front(), false); mqueue.pop_front(); break; } @@ -98,7 +98,7 @@ void QueueStrategy::start() std::lock_guard l{lock}; threads.reserve(n_threads); for (int ix = 0; ix < n_threads; ++ix) { - string thread_name = "ms_qs_"; + std::string thread_name = "ms_qs_"; thread_name.append(std::to_string(ix)); auto thrd = std::make_unique<QSThread>(this); thrd->create(thread_name.c_str()); diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 82ef1389734..b0adff91946 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -35,7 +35,7 @@ #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix _conn_prefix(_dout) -ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { +std::ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { return *_dout << "-- " << async_msgr->get_myaddrs() << " >> " << *peer_addrs << " conn(" << this << (msgr2 ? " msgr2=" : " legacy=") @@ -168,8 +168,8 @@ void AsyncConnection::maybe_start_delay_thread() if (!delay_state) { async_msgr->cct->_conf.with_val<std::string>( "ms_inject_delay_type", - [this](const string& s) { - if (s.find(ceph_entity_type_name(peer_type)) != string::npos) { + [this](const std::string& s) { + if (s.find(ceph_entity_type_name(peer_type)) != std::string::npos) { ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl; delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, @@ -300,7 +300,7 @@ ssize_t AsyncConnection::read_bulk(char *buf, unsigned len) return nread; } -ssize_t AsyncConnection::write(bufferlist &bl, +ssize_t AsyncConnection::write(ceph::buffer::list &bl, std::function<void(ssize_t)> callback, bool more) { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 20d095dfafb..122a6c4089e 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -56,7 +56,7 @@ class AsyncConnection : public Connection { ssize_t read_until(unsigned needed, char *p); ssize_t read_bulk(char *buf, unsigned len); - ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback, + ssize_t write(ceph::buffer::list &bl, std::function<void(ssize_t)> callback, bool more=false); ssize_t _try_send(bool more=false); @@ -114,7 +114,7 @@ private: public: void maybe_start_delay_thread(); - ostream& _conn_prefix(std::ostream *_dout); + std::ostream& _conn_prefix(std::ostream *_dout); bool is_connected() override; @@ -182,7 +182,7 @@ private: DispatchQueue *dispatch_queue; // lockfree, only used in own thread - bufferlist outgoing_bl; + ceph::buffer::list outgoing_bl; bool open_write = false; std::mutex write_lock; @@ -197,7 +197,7 @@ private: uint32_t recv_max_prefetch; uint32_t recv_start; uint32_t recv_end; - set<uint64_t> register_time_events; // need to delete it if stop + std::set<uint64_t> register_time_events; // need to delete it if stop ceph::coarse_mono_clock::time_point last_connect_started; ceph::coarse_mono_clock::time_point last_active; ceph::mono_clock::time_point recv_start_time; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 4f1224284f2..6e989bca507 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -32,11 +32,11 @@ #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix _prefix(_dout, this) -static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { +static std::ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { return *_dout << "-- " << m->get_myaddrs() << " "; } -static ostream& _prefix(std::ostream *_dout, Processor *p) { +static std::ostream& _prefix(std::ostream *_dout, Processor *p) { return *_dout << " Processor -- "; } @@ -60,7 +60,7 @@ Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c) listen_handler(new C_processor_accept(this)) {} int Processor::bind(const entity_addrvec_t &bind_addrs, - const set<int>& avoid_ports, + const std::set<int>& avoid_ports, entity_addrvec_t* bound_addrs) { const auto& conf = msgr->cct->_conf; @@ -278,7 +278,7 @@ class C_handle_reap : public EventCallback { */ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, - const std::string &type, string mname, uint64_t _nonce) + const std::string &type, std::string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name), dispatch_queue(cct, this, mname), nonce(_nonce) @@ -397,7 +397,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) lock.unlock(); // bind to a socket - set<int> avoid_ports; + std::set<int> avoid_ports; entity_addrvec_t bound_addrs; unsigned i = 0; for (auto &&p : processors) { @@ -421,7 +421,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) return 0; } -int AsyncMessenger::rebind(const set<int>& avoid_ports) +int AsyncMessenger::rebind(const std::set<int>& avoid_ports) { ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; ceph_assert(did_bind); @@ -437,7 +437,7 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports) entity_addrvec_t bound_addrs; entity_addrvec_t bind_addrs = get_myaddrs(); - set<int> new_avoid(avoid_ports); + std::set<int> new_avoid(avoid_ports); for (auto& a : bind_addrs.v) { new_avoid.insert(a.get_port()); a.set_port(0); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 6e18281f1b9..83a9c5a9b77 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -44,9 +44,9 @@ class AsyncMessenger; */ class Processor { AsyncMessenger *msgr; - NetHandler net; + ceph::NetHandler net; Worker *worker; - vector<ServerSocket> listen_sockets; + std::vector<ServerSocket> listen_sockets; EventCallbackRef listen_handler; class C_processor_accept; @@ -57,7 +57,7 @@ class Processor { void stop(); int bind(const entity_addrvec_t &bind_addrs, - const set<int>& avoid_ports, + const std::set<int>& avoid_ports, entity_addrvec_t* bound_addrs); void start(); void accept(); @@ -82,7 +82,7 @@ public: * be a value that will be repeated if the daemon restarts. */ AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, - string mname, uint64_t _nonce); + std::string mname, uint64_t _nonce); /** * Destroy the AsyncMessenger. Pretty simple since all the work is done @@ -115,7 +115,7 @@ public: } int bind(const entity_addr_t& bind_addr) override; - int rebind(const set<int>& avoid_ports) override; + int rebind(const std::set<int>& avoid_ports) override; int client_bind(const entity_addr_t& bind_addr) override; int bindv(const entity_addrvec_t& bind_addrs) override; @@ -268,10 +268,10 @@ private: * * These are not yet in the conns map. */ - set<AsyncConnectionRef> accepting_conns; + std::set<AsyncConnectionRef> accepting_conns; /// anonymous outgoing connections - set<AsyncConnectionRef> anon_conns; + std::set<AsyncConnectionRef> anon_conns; /** * list of connection are closed which need to be clean up @@ -285,7 +285,7 @@ private: * AsyncConnection in this set. */ ceph::mutex deleted_lock = ceph::make_mutex("AsyncMessenger::deleted_lock"); - set<AsyncConnectionRef> deleted_conns; + std::set<AsyncConnectionRef> deleted_conns; EventCallbackRef reap_handler; diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 4d2fb339416..b30a03e8a4a 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -69,7 +69,7 @@ class C_handle_notify : public EventCallback { * about the poller. The name of the superclass is probably sufficient * for most cases. */ -EventCenter::Poller::Poller(EventCenter* center, const string& name) +EventCenter::Poller::Poller(EventCenter* center, const std::string& name) : owner(center), poller_name(name), slot(owner->pollers.size()) { owner->pollers.push_back(this); @@ -94,7 +94,7 @@ EventCenter::Poller::~Poller() slot = -1; } -ostream& EventCenter::_event_prefix(std::ostream *_dout) +std::ostream& EventCenter::_event_prefix(std::ostream *_dout) { return *_dout << "Event(" << this << " nevent=" << nevent << " time_id=" << time_event_next_id << ")."; @@ -340,6 +340,7 @@ int EventCenter::process_time_events() { int processed = 0; clock_type::time_point now = clock_type::now(); + using ceph::operator <<; ldout(cct, 30) << __func__ << " cur time is " << now << dendl; while (!time_events.empty()) { @@ -388,7 +389,7 @@ int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan * tv.tv_usec = timeout_microseconds % 1000000; ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; - vector<FiredFileEvent> fired_events; + std::vector<FiredFileEvent> fired_events; numevents = driver->event_wait(fired_events, &tv); auto working_start = ceph::mono_clock::now(); for (int event_id = 0; event_id < numevents; event_id++) { @@ -422,7 +423,7 @@ int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan * if (external_num_events.load()) { external_lock.lock(); - deque<EventCallbackRef> cur_process; + std::deque<EventCallbackRef> cur_process; cur_process.swap(external_events); external_num_events.store(0); external_lock.unlock(); diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index d7dba443956..bb954357174 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -76,7 +76,7 @@ class EventDriver { virtual int init(EventCenter *center, int nevent) = 0; virtual int add_event(int fd, int cur_mask, int mask) = 0; virtual int del_event(int fd, int cur_mask, int del_mask) = 0; - virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0; + virtual int event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0; virtual int resize_events(int newsize) = 0; virtual bool need_wakeup() { return true; } }; @@ -121,7 +121,7 @@ class EventCenter { */ class Poller { public: - explicit Poller(EventCenter* center, const string& pollerName); + explicit Poller(EventCenter* center, const std::string& pollerName); virtual ~Poller(); /** @@ -142,7 +142,7 @@ class EventCenter { /// Human-readable string name given to the poller to make it /// easy to identify for debugging. For most pollers just passing /// in the subclass name probably makes sense. - string poller_name; + std::string poller_name; /// Index of this Poller in EventCenter::pollers. Allows deletion /// without having to scan all the entries in pollers. -1 means @@ -159,8 +159,8 @@ class EventCenter { pthread_t owner = 0; std::mutex external_lock; std::atomic_ulong external_num_events; - deque<EventCallbackRef> external_events; - vector<FileEvent> file_events; + std::deque<EventCallbackRef> external_events; + std::vector<FileEvent> file_events; EventDriver *driver; std::multimap<clock_type::time_point, TimeEvent> time_events; // Keeps track of all of the pollers currently defined. We don't @@ -171,7 +171,7 @@ class EventCenter { uint64_t time_event_next_id; int notify_receive_fd; int notify_send_fd; - NetHandler net; + ceph::NetHandler net; EventCallbackRef notify_handler; unsigned center_id; AssociatedCenters *global_centers = nullptr; @@ -190,7 +190,7 @@ class EventCenter { notify_receive_fd(-1), notify_send_fd(-1), net(c), notify_handler(NULL), center_id(0) { } ~EventCenter(); - ostream& _event_prefix(std::ostream *_dout); + std::ostream& _event_prefix(std::ostream *_dout); int init(int nevent, unsigned center_id, const std::string &type); void set_owner(); diff --git a/src/msg/async/EventEpoll.cc b/src/msg/async/EventEpoll.cc index 000aaf4fcbc..7ed5321dcda 100644 --- a/src/msg/async/EventEpoll.cc +++ b/src/msg/async/EventEpoll.cc @@ -116,7 +116,7 @@ int EpollDriver::resize_events(int newsize) return 0; } -int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp) +int EpollDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp) { int retval, numevents = 0; diff --git a/src/msg/async/EventEpoll.h b/src/msg/async/EventEpoll.h index 0221f90d34c..454ecbc34ff 100644 --- a/src/msg/async/EventEpoll.h +++ b/src/msg/async/EventEpoll.h @@ -42,7 +42,7 @@ class EpollDriver : public EventDriver { int add_event(int fd, int cur_mask, int add_mask) override; int del_event(int fd, int cur_mask, int del_mask) override; int resize_events(int newsize) override; - int event_wait(vector<FiredFileEvent> &fired_events, + int event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tp) override; }; diff --git a/src/msg/async/EventSelect.cc b/src/msg/async/EventSelect.cc index fdee6ebc3c8..8957792bb93 100644 --- a/src/msg/async/EventSelect.cc +++ b/src/msg/async/EventSelect.cc @@ -67,7 +67,7 @@ int SelectDriver::resize_events(int newsize) return 0; } -int SelectDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp) +int SelectDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp) { int retval, numevents = 0; diff --git a/src/msg/async/EventSelect.h b/src/msg/async/EventSelect.h index 1b75da0b1b0..08af57bcfd2 100644 --- a/src/msg/async/EventSelect.h +++ b/src/msg/async/EventSelect.h @@ -35,7 +35,7 @@ class SelectDriver : public EventDriver { int add_event(int fd, int cur_mask, int add_mask) override; int del_event(int fd, int cur_mask, int del_mask) override; int resize_events(int newsize) override; - int event_wait(vector<FiredFileEvent> &fired_events, + int event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tp) override; }; diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 0fc344c2ff4..a7a7fc41dd0 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -38,13 +38,14 @@ #define dout_prefix *_dout << "PosixStack " class PosixConnectedSocketImpl final : public ConnectedSocketImpl { - NetHandler &handler; + ceph::NetHandler &handler; int _fd; entity_addr_t sa; bool connected; public: - explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected) + explicit PosixConnectedSocketImpl(ceph::NetHandler &h, const entity_addr_t &sa, + int f, bool connected) : handler(h), _fd(f), sa(sa), connected(connected) {} int is_connected() override { @@ -106,7 +107,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { return (ssize_t)sent; } - ssize_t send(bufferlist &bl, bool more) override { + ssize_t send(ceph::buffer::list &bl, bool more) override { size_t sent_bytes = 0; auto pb = std::cbegin(bl.buffers()); uint64_t left_pbrs = bl.get_num_buffers(); @@ -138,7 +139,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { } if (sent_bytes) { - bufferlist swapped; + ceph::buffer::list swapped; if (sent_bytes < bl.length()) { bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); bl.swap(swapped); @@ -163,11 +164,11 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { }; class PosixServerSocketImpl : public ServerSocketImpl { - NetHandler &handler; + ceph::NetHandler &handler; int _fd; public: - explicit PosixServerSocketImpl(NetHandler &h, int f, + explicit PosixServerSocketImpl(ceph::NetHandler &h, int f, const entity_addr_t& listen_addr, unsigned slot) : ServerSocketImpl(listen_addr.get_type(), slot), handler(h), _fd(f) {} @@ -281,7 +282,7 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C return 0; } -PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t) +PosixNetworkStack::PosixNetworkStack(CephContext *c, const std::string &t) : NetworkStack(c, t) { } diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h index f1aaccd4b82..4aed9dd6444 100644 --- a/src/msg/async/PosixStack.h +++ b/src/msg/async/PosixStack.h @@ -25,7 +25,7 @@ #include "Stack.h" class PosixWorker : public Worker { - NetHandler net; + ceph::NetHandler net; void initialize() override; public: PosixWorker(CephContext *c, unsigned i) @@ -38,10 +38,10 @@ class PosixWorker : public Worker { }; class PosixNetworkStack : public NetworkStack { - vector<std::thread> threads; + std::vector<std::thread> threads; public: - explicit PosixNetworkStack(CephContext *c, const string &t); + explicit PosixNetworkStack(CephContext *c, const std::string &t); void spawn_worker(unsigned i, std::function<void ()> &&func) override { threads.resize(i+1); diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h index cccba183567..10436307ebf 100644 --- a/src/msg/async/Protocol.h +++ b/src/msg/async/Protocol.h @@ -47,7 +47,7 @@ public: }; using rx_buffer_t = - std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>; + std::unique_ptr<ceph::buffer::ptr_node, ceph::buffer::ptr_node::disposer>; template <class C> class CtRxNode : public Ct<C> { diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index cae647edc93..75b0b8ed67b 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -15,7 +15,7 @@ #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix _conn_prefix(_dout) -ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) { +std::ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) { return *_dout << "--1- " << messenger->get_myaddrs() << " >> " << *connection->peer_addrs << " conn(" @@ -39,7 +39,7 @@ const int ASYNC_COALESCE_THRESHOLD = 256; using namespace std; -static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { +static void alloc_aligned_buffer(ceph::buffer::list &data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment unsigned alloc_len = 0; unsigned left = len; @@ -51,7 +51,7 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { left -= head; } alloc_len += left; - bufferptr ptr(buffer::create_small_page_aligned(alloc_len)); + ceph::bufferptr ptr(ceph::buffer::create_small_page_aligned(alloc_len)); if (head) ptr.set_offset(CEPH_PAGE_SIZE - head); data.push_back(std::move(ptr)); } @@ -211,7 +211,7 @@ void ProtocolV1::fault() { } void ProtocolV1::send_message(Message *m) { - bufferlist bl; + ceph::buffer::list bl; uint64_t f = connection->get_features(); // TODO: Currently not all messages supports reencode like MOSDMap, so here @@ -249,7 +249,7 @@ void ProtocolV1::send_message(Message *m) { } void ProtocolV1::prepare_send_message(uint64_t features, Message *m, - bufferlist &bl) { + ceph::buffer::list &bl) { ldout(cct, 20) << __func__ << " m " << *m << dendl; // associate message with Connection (for benefit of encode_payload) @@ -315,7 +315,7 @@ void ProtocolV1::write_event() { auto start = ceph::mono_clock::now(); bool more; do { - bufferlist data; + ceph::buffer::list data; Message *m = _get_next_outgoing(&data); if (!m) { break; @@ -436,7 +436,7 @@ CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next, } CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next, - bufferlist &buffer) { + ceph::buffer::list &buffer) { ssize_t r = connection->write(buffer, [&next, this](int r) { next.setParams(r); CONTINUATION_RUN(next); @@ -749,7 +749,7 @@ CtPtr ProtocolV1::read_message_front() { unsigned front_len = current_header.front_len; if (front_len) { if (!front.length()) { - front.push_back(buffer::create(front_len)); + front.push_back(ceph::buffer::create(front_len)); } return READB(front_len, front.c_str(), handle_message_front); } @@ -774,7 +774,7 @@ CtPtr ProtocolV1::read_message_middle() { if (current_header.middle_len) { if (!middle.length()) { - middle.push_back(buffer::create(current_header.middle_len)); + middle.push_back(ceph::buffer::create(current_header.middle_len)); } return READB(current_header.middle_len, middle.c_str(), handle_message_middle); @@ -807,7 +807,7 @@ CtPtr ProtocolV1::read_message_data_prepare() { #if 0 // rx_buffers is broken by design... see // http://tracker.ceph.com/issues/22480 - map<ceph_tid_t, pair<bufferlist, int> >::iterator p = + map<ceph_tid_t, pair<ceph::buffer::list, int> >::iterator p = connection->rx_buffers.find(current_header.tid); if (p != connection->rx_buffers.end()) { ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second @@ -841,7 +841,7 @@ CtPtr ProtocolV1::read_message_data() { ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl; if (msg_left > 0) { - bufferptr bp = data_blp.get_current_ptr(); + auto bp = data_blp.get_current_ptr(); unsigned read_len = std::min(bp.length(), msg_left); return READB(read_len, bp.c_str(), handle_message_data); @@ -858,7 +858,7 @@ CtPtr ProtocolV1::handle_message_data(char *buffer, int r) { return _fault(); } - bufferptr bp = data_blp.get_current_ptr(); + auto bp = data_blp.get_current_ptr(); unsigned read_len = std::min(bp.length(), msg_left); ceph_assert(read_len < static_cast<unsigned>(std::numeric_limits<int>::max())); @@ -1096,7 +1096,7 @@ void ProtocolV1::randomize_out_seq() { } } -ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { +ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) { FUNCTRACE(cct); ceph_assert(connection->center->in_thread()); m->set_seq(++out_seq); @@ -1192,7 +1192,7 @@ void ProtocolV1::requeue_sent() { return; } - list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; out_seq -= sent.size(); while (!sent.empty()) { Message *m = sent.back(); @@ -1200,7 +1200,7 @@ void ProtocolV1::requeue_sent() { ldout(cct, 10) << __func__ << " " << *m << " for resend " << " (" << m->get_seq() << ")" << dendl; m->clear_payload(); - rq.push_front(make_pair(bufferlist(), m)); + rq.push_front(make_pair(ceph::buffer::list(), m)); } } @@ -1210,10 +1210,10 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) { return seq; } - list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; uint64_t count = out_seq; while (!rq.empty()) { - pair<bufferlist, Message *> p = rq.front(); + pair<ceph::buffer::list, Message *> p = rq.front(); if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break; ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq() << " <= " << seq << ", discarding" @@ -1238,10 +1238,10 @@ void ProtocolV1::discard_out_queue() { (*p)->put(); } sent.clear(); - for (map<int, list<pair<bufferlist, Message *> > >::iterator p = + for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p = out_q.begin(); p != out_q.end(); ++p) { - for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin(); + for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin(); r != p->second.end(); ++r) { ldout(cct, 20) << __func__ << " discard " << r->second << dendl; r->second->put(); @@ -1304,13 +1304,13 @@ void ProtocolV1::reset_recv_state() } } -Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) { +Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) { Message *m = 0; if (!out_q.empty()) { - map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it = + map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it = out_q.rbegin(); ceph_assert(!it->second.empty()); - list<pair<bufferlist, Message *> >::iterator p = it->second.begin(); + list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin(); m = p->second; if (p->first.length() && bl) { assert(bl->length() == 0); @@ -1330,7 +1330,7 @@ CtPtr ProtocolV1::send_client_banner() { ldout(cct, 20) << __func__ << dendl; state = CONNECTING; - bufferlist bl; + ceph::buffer::list bl; bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); return WRITE(bl, handle_client_banner_write); } @@ -1353,7 +1353,7 @@ CtPtr ProtocolV1::wait_server_banner() { ldout(cct, 20) << __func__ << dendl; - bufferlist myaddrbl; + ceph::buffer::list myaddrbl; unsigned banner_len = strlen(CEPH_BANNER); unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2; return READ(need_len, handle_server_banner_and_identify); @@ -1375,7 +1375,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) { return _fault(); } - bufferlist bl; + ceph::buffer::list bl; entity_addr_t paddr, peer_addr_for_me; bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2); @@ -1383,7 +1383,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) { try { decode(paddr, p); decode(peer_addr_for_me, p); - } catch (const buffer::error &e) { + } catch (const ceph::buffer::error &e) { lderr(cct) << __func__ << " decode peer addr failed " << dendl; return _fault(); } @@ -1446,7 +1446,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) { } } - bufferlist myaddrbl; + ceph::buffer::list myaddrbl; encode(messenger->get_myaddr_legacy(), myaddrbl, 0); // legacy return WRITE(myaddrbl, handle_my_addr_write); } @@ -1472,7 +1472,7 @@ CtPtr ProtocolV1::send_connect_message() ldout(cct, 20) << __func__ << dendl; ceph_assert(messenger->auth_client); - bufferlist auth_bl; + ceph::buffer::list auth_bl; vector<uint32_t> preferred_modes; if (connection->peer_type != CEPH_ENTITY_TYPE_MON || @@ -1523,7 +1523,7 @@ CtPtr ProtocolV1::send_connect_message() CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! } - bufferlist bl; + ceph::buffer::list bl; bl.append((char *)&connect, sizeof(connect)); if (auth_bl.length()) { bl.append(auth_bl.c_str(), auth_bl.length()); @@ -1605,14 +1605,14 @@ CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) { return _fault(); } - bufferlist authorizer_reply; + ceph::buffer::list authorizer_reply; authorizer_reply.append(buffer, connect_reply.authorizer_len); if (connection->peer_type != CEPH_ENTITY_TYPE_MON || messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) { auto am = auth_meta; bool more = (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER); - bufferlist auth_retry_bl; + ceph::buffer::list auth_retry_bl; int r; connection->lock.unlock(); if (more) { @@ -1750,7 +1750,7 @@ CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) { << " vs out_seq " << out_seq << dendl; out_seq = discard_requeued_up_to(out_seq, newly_acked_seq); - bufferlist bl; + ceph::buffer::list bl; uint64_t s = in_seq; bl.append((char *)&s, sizeof(s)); @@ -1821,7 +1821,7 @@ CtPtr ProtocolV1::send_server_banner() { ldout(cct, 20) << __func__ << dendl; state = ACCEPTING; - bufferlist bl; + ceph::buffer::list bl; bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); @@ -1874,14 +1874,14 @@ CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) { return _fault(); } - bufferlist addr_bl; + ceph::buffer::list addr_bl; entity_addr_t peer_addr; addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr)); try { auto ti = addr_bl.cbegin(); decode(peer_addr, ti); - } catch (const buffer::error &e) { + } catch (const ceph::buffer::error &e) { lderr(cct) << __func__ << " decode peer_addr failed " << dendl; return _fault(); } @@ -1932,7 +1932,7 @@ CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) { CtPtr ProtocolV1::wait_connect_message_auth() { ldout(cct, 20) << __func__ << dendl; authorizer_buf.clear(); - authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len)); + authorizer_buf.push_back(ceph::buffer::create(connect_msg.authorizer_len)); return READB(connect_msg.authorizer_len, authorizer_buf.c_str(), handle_connect_message_auth); } @@ -1968,7 +1968,7 @@ CtPtr ProtocolV1::handle_connect_message_2() { << dendl; ceph_msg_connect_reply reply; - bufferlist authorizer_reply; + ceph::buffer::list authorizer_reply; // FIPS zeroization audit 20191115: this memset is not security related. memset(&reply, 0, sizeof(reply)); @@ -2017,7 +2017,7 @@ CtPtr ProtocolV1::handle_connect_message_2() { authorizer_reply); } - bufferlist auth_bl_copy = authorizer_buf; + ceph::buffer::list auth_bl_copy = authorizer_buf; auto am = auth_meta; am->auth_method = connect_msg.authorizer_protocol; connection->lock.unlock(); @@ -2265,9 +2265,9 @@ CtPtr ProtocolV1::handle_connect_message_2() { CtPtr ProtocolV1::send_connect_message_reply(char tag, ceph_msg_connect_reply &reply, - bufferlist &authorizer_reply) { + ceph::buffer::list &authorizer_reply) { ldout(cct, 20) << __func__ << dendl; - bufferlist reply_bl; + ceph::buffer::list reply_bl; reply.tag = tag; reply.features = ((uint64_t)connect_msg.features & connection->policy.features_supported) | @@ -2305,7 +2305,7 @@ CtPtr ProtocolV1::handle_connect_message_reply_write(int r) { CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply, - bufferlist &authorizer_reply) { + ceph::buffer::list &authorizer_reply) { ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl; connection->inject_delay(); @@ -2438,7 +2438,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing, } CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply, - bufferlist &authorizer_reply) { + ceph::buffer::list &authorizer_reply) { ldout(cct, 20) << __func__ << dendl; connect_seq = connect_msg.connect_seq + 1; @@ -2482,7 +2482,7 @@ CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply, auth_meta->session_key, connection->get_features())); - bufferlist reply_bl; + ceph::buffer::list reply_bl; reply_bl.append((char *)&reply, sizeof(reply)); if (reply.authorizer_len) { diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h index 72b707fe2a6..b56a966dbd7 100644 --- a/src/msg/async/ProtocolV1.h +++ b/src/msg/async/ProtocolV1.h @@ -104,9 +104,9 @@ protected: enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED }; std::atomic<WriteStatus> can_write; - std::list<Message *> sent; // the first bufferlist need to inject seq + std::list<Message *> sent; // the first ceph::buffer::list need to inject seq // priority queue for outbound msgs - std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q; + std::map<int, std::list<std::pair<ceph::buffer::list, Message *>>> out_q; bool keepalive; bool write_in_progress = false; @@ -120,8 +120,8 @@ protected: // Open state ceph_msg_connect connect_msg; ceph_msg_connect_reply connect_reply; - bufferlist authorizer_buf; // auth(orizer) payload read off the wire - bufferlist authorizer_more; // connect-side auth retry (we added challenge) + ceph::buffer::list authorizer_buf; // auth(orizer) payload read off the wire + ceph::buffer::list authorizer_more; // connect-side auth retry (we added challenge) utime_t backoff; // backoff time utime_t recv_stamp; @@ -129,9 +129,9 @@ protected: unsigned msg_left; uint64_t cur_msg_size; ceph_msg_header current_header; - bufferlist data_buf; - bufferlist::iterator data_blp; - bufferlist front, middle, data; + ceph::buffer::list data_buf; + ceph::buffer::list::iterator data_blp; + ceph::buffer::list front, middle, data; bool replacing; // when replacing process happened, we will reply connect // side with RETRY tag and accept side will clear replaced @@ -147,7 +147,7 @@ protected: void run_continuation(CtPtr pcontinuation); CtPtr read(CONTINUATION_RX_TYPE<ProtocolV1> &next, int len, char *buffer = nullptr); - CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,bufferlist &bl); + CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,ceph::buffer::list &bl); inline CtPtr _fault() { // helper fault method that stops continuation fault(); return nullptr; @@ -194,10 +194,10 @@ protected: void session_reset(); void randomize_out_seq(); - Message *_get_next_outgoing(bufferlist *bl); + Message *_get_next_outgoing(ceph::buffer::list *bl); - void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); - ssize_t write_message(Message *m, bufferlist &bl, bool more); + void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl); + ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more); void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); @@ -205,7 +205,7 @@ protected: void reset_recv_state(); - ostream &_conn_prefix(std::ostream *_dout); + std::ostream& _conn_prefix(std::ostream *_dout); public: ProtocolV1(AsyncConnection *connection); @@ -281,11 +281,11 @@ protected: CtPtr handle_connect_message_auth(char *buffer, int r); CtPtr handle_connect_message_2(); CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply, - bufferlist &authorizer_reply); + ceph::buffer::list &authorizer_reply); CtPtr handle_connect_message_reply_write(int r); CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply, - bufferlist &authorizer_reply); - CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply); + ceph::buffer::list &authorizer_reply); + CtPtr open(ceph_msg_connect_reply &reply, ceph::buffer::list &authorizer_reply); CtPtr handle_ready_connect_message_reply_write(int r); CtPtr wait_seq(); CtPtr handle_seq(char *buffer, int r); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index d9a5a72aecc..6164034445f 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -16,7 +16,7 @@ #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix _conn_prefix(_dout) -ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { +std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { return *_dout << "--2- " << messenger->get_myaddrs() << " >> " << *connection->peer_addrs << " conn(" << connection << " " << this @@ -43,7 +43,7 @@ void ProtocolV2::run_continuation(CtPtr pcontinuation) { void ProtocolV2::run_continuation(CtRef continuation) { try { CONTINUATION_RUN(continuation) - } catch (const buffer::error &e) { + } catch (const ceph::buffer::error &e) { lderr(cct) << __func__ << " failed decoding of frame header: " << e << dendl; _fault(); @@ -57,7 +57,7 @@ void ProtocolV2::run_continuation(CtRef continuation) { #define WRITE(B, D, C) write(D, CONTINUATION(C), B) -#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L))) +#define READ(L, C) read(CONTINUATION(C), ceph::buffer::ptr_node::create(ceph::buffer::create(L))) #define READ_RXBUF(B, C) read(CONTINUATION(C), B) @@ -119,7 +119,7 @@ bool ProtocolV2::is_connected() { return can_write; } void ProtocolV2::discard_out_queue() { ldout(cct, 10) << __func__ << " started" << dendl; - for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) { + for (auto p = sent.begin(); p != sent.end(); ++p) { ldout(cct, 20) << __func__ << " discard " << *p << dendl; (*p)->put(); } @@ -766,7 +766,7 @@ CtPtr ProtocolV2::write(const std::string &desc, CtPtr ProtocolV2::write(const std::string &desc, CONTINUATION_TYPE<ProtocolV2> &next, - bufferlist &buffer) { + ceph::bufferlist &buffer) { if (unlikely(pre_auth.enabled)) { pre_auth.txbuf.append(buffer); ceph_assert(!cct->_conf->ms_die_on_bug || @@ -800,11 +800,12 @@ CtPtr ProtocolV2::_banner_exchange(CtRef callback) { ldout(cct, 20) << __func__ << dendl; bannerExchangeCallback = &callback; - bufferlist banner_payload; + ceph::bufferlist banner_payload; + using ceph::encode; encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); - bufferlist bl; + ceph::bufferlist bl; bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); encode((uint16_t)banner_payload.length(), bl, 0); bl.claim_append(banner_payload); @@ -841,14 +842,15 @@ CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) { } uint16_t payload_len; - bufferlist bl; + ceph::bufferlist bl; buffer->set_offset(banner_prefix_len); buffer->set_length(sizeof(ceph_le16)); bl.push_back(std::move(buffer)); auto ti = bl.cbegin(); + using ceph::decode; try { decode(payload_len, ti); - } catch (const buffer::error &e) { + } catch (const ceph::buffer::error &e) { lderr(cct) << __func__ << " decode banner payload len failed " << dendl; return _fault(); } @@ -870,13 +872,14 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) { uint64_t peer_supported_features; uint64_t peer_required_features; - bufferlist bl; + ceph::bufferlist bl; + using ceph::decode; bl.push_back(std::move(buffer)); auto ti = bl.cbegin(); try { decode(peer_supported_features, ti); decode(peer_required_features, ti); - } catch (const buffer::error &e) { + } catch (const ceph::buffer::error &e) { lderr(cct) << __func__ << " decode banner payload failed " << dendl; return _fault(); } @@ -1169,7 +1172,7 @@ CtPtr ProtocolV2::read_frame_segment() { const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size()); rx_buffer_t rx_buffer; try { - rx_buffer = buffer::ptr_node::create(buffer::create_aligned( + rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned( get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment)); } catch (std::bad_alloc&) { // Catching because of potential issues with satisfying alignment. @@ -1736,8 +1739,8 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) { ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type << " auth_client " << messenger->auth_client << dendl; - bufferlist bl; - vector<uint32_t> preferred_modes; + ceph::bufferlist bl; + std::vector<uint32_t> preferred_modes; auto am = auth_meta; connection->lock.unlock(); int r = messenger->auth_client->get_auth_request( @@ -2194,12 +2197,12 @@ CtPtr ProtocolV2::_auth_bad_method(int r) return WRITE(bad_method, "bad auth method", read_frame); } -CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) +CtPtr ProtocolV2::_handle_auth_request(ceph::bufferlist& auth_payload, bool more) { if (!messenger->auth_server) { return _fault(); } - bufferlist reply; + ceph::bufferlist reply; auto am = auth_meta; connection->lock.unlock(); int r = messenger->auth_server->handle_auth_request( diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index e5544f98746..b747f401316 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -115,7 +115,7 @@ private: bool keepalive; bool write_in_progress = false; - ostream &_conn_prefix(std::ostream *_dout); + std::ostream& _conn_prefix(std::ostream *_dout); void run_continuation(Ct<ProtocolV2> *pcontinuation); void run_continuation(Ct<ProtocolV2> &continuation); @@ -127,7 +127,7 @@ private: F &frame); Ct<ProtocolV2> *write(const std::string &desc, CONTINUATION_TYPE<ProtocolV2> &next, - bufferlist &buffer); + ceph::bufferlist &buffer); void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); @@ -238,7 +238,7 @@ private: Ct<ProtocolV2> *post_server_banner_exchange(); Ct<ProtocolV2> *handle_auth_request(ceph::bufferlist &payload); Ct<ProtocolV2> *handle_auth_request_more(ceph::bufferlist &payload); - Ct<ProtocolV2> *_handle_auth_request(bufferlist& auth_payload, bool more); + Ct<ProtocolV2> *_handle_auth_request(ceph::bufferlist& auth_payload, bool more); Ct<ProtocolV2> *_auth_bad_method(int r); Ct<ProtocolV2> *handle_client_ident(ceph::bufferlist &payload); Ct<ProtocolV2> *handle_ident_missing_features_write(int r); diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 6b18d1de9cb..74d07829be9 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -63,7 +63,8 @@ std::function<void ()> NetworkStack::add_thread(unsigned worker_id) }; } -std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t) +std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, + const std::string &t) { if (t == "posix") return std::make_shared<PosixNetworkStack>(c, t); @@ -82,7 +83,7 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string return nullptr; } -Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id) +Worker* NetworkStack::create_worker(CephContext *c, const std::string &type, unsigned worker_id) { if (type == "posix") return new PosixWorker(c, worker_id); @@ -101,7 +102,7 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned return nullptr; } -NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) +NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), started(false), cct(c) { ceph_assert(cct->_conf->ms_async_op_threads > 0); diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 37a33163d6b..7b8b62f36fb 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -28,7 +28,7 @@ class ConnectedSocketImpl { virtual ~ConnectedSocketImpl() {} virtual int is_connected() = 0; virtual ssize_t read(char*, size_t) = 0; - virtual ssize_t send(bufferlist &bl, bool more) = 0; + virtual ssize_t send(ceph::buffer::list &bl, bool more) = 0; virtual void shutdown() = 0; virtual void close() = 0; virtual int fd() const = 0; @@ -96,7 +96,7 @@ class ConnectedSocket { /// Gets the output stream. /// /// Gets an object that sends data to the remote endpoint. - ssize_t send(bufferlist &bl, bool more) { + ssize_t send(ceph::buffer::list &bl, bool more) { return _csi->send(bl, more); } /// Disables output to the socket. @@ -302,9 +302,9 @@ class NetworkStack { protected: CephContext *cct; - vector<Worker*> workers; + std::vector<Worker*> workers; - explicit NetworkStack(CephContext *c, const string &t); + explicit NetworkStack(CephContext *c, const std::string &t); public: NetworkStack(const NetworkStack &) = delete; NetworkStack& operator=(const NetworkStack &) = delete; @@ -314,10 +314,10 @@ class NetworkStack { } static std::shared_ptr<NetworkStack> create( - CephContext *c, const string &type); + CephContext *c, const std::string &type); static Worker* create_worker( - CephContext *c, const string &t, unsigned i); + CephContext *c, const std::string &t, unsigned i); // backend need to override this method if backend doesn't support shared // listen table. // For example, posix backend has in kernel global listen table. If one diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index ddc42a489cf..2f187ceb769 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -403,14 +403,14 @@ protected: struct AuthRequestFrame : public ControlFrame<AuthRequestFrame, uint32_t, // auth method - vector<uint32_t>, // preferred modes + std::vector<uint32_t>, // preferred modes bufferlist> { // auth payload static const Tag tag = Tag::AUTH_REQUEST; using ControlFrame::Encode; using ControlFrame::Decode; inline uint32_t &method() { return get_val<0>(); } - inline vector<uint32_t> &preferred_modes() { return get_val<1>(); } + inline std::vector<uint32_t> &preferred_modes() { return get_val<1>(); } inline bufferlist &auth_payload() { return get_val<2>(); } protected: diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index e2c06229f41..52323f948c6 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -579,7 +579,7 @@ int Infiniband::CompletionChannel::init() << cpp_strerror(errno) << dendl; return -1; } - int rc = NetHandler(cct).set_nonblock(channel->fd); + int rc = ceph::NetHandler(cct).set_nonblock(channel->fd); if (rc < 0) { ibv_destroy_comp_channel(channel); return -1; @@ -1062,7 +1062,7 @@ void Infiniband::init() device->binding_port(cct, port_num); ib_physical_port = device->active_port->get_port_num(); pd = new ProtectionDomain(cct, device); - ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); + ceph_assert(ceph::NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); support_srq = cct->_conf->ms_async_rdma_support_srq; if (support_srq) { diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 3af89f304fe..03e59e7ac17 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -549,7 +549,7 @@ class Infiniband { uint32_t max_recv_wr; uint32_t q_key; bool dead; - vector<Chunk*> recv_queue; + std::vector<Chunk*> recv_queue; ceph::mutex lock = ceph::make_mutex("queue_pair_lock"); }; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index c897f94f4d5..5e75e961d18 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -47,8 +47,8 @@ class C_handle_connection_read : public EventCallback { #undef dout_prefix #define dout_prefix *_dout << " RDMAConnectedSocketImpl " -RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, +RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband> &ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w) : cct(cct), connected(0), error(0), ib(ib), dispatcher(rdma_dispatcher), worker(w), @@ -129,7 +129,7 @@ int RDMAConnectedSocketImpl::activate() int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; - NetHandler net(cct); + ceph::NetHandler net(cct); // we construct a socket to transport ib sync message // but we shouldn't block in tcp connecting @@ -337,7 +337,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) return read_size; } -ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) +ssize_t RDMAConnectedSocketImpl::send(ceph::buffer::list &bl, bool more) { if (error) { if (!active) @@ -440,7 +440,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) if (total_copied == 0) return -EAGAIN; ceph_assert(total_copied <= pending_bl.length()); - bufferlist swapped; + ceph::buffer::list swapped; if (total_copied < pending_bl.length()) { worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped); @@ -463,7 +463,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers) { ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl; - vector<Chunk*>::iterator current_buffer = tx_buffers.begin(); + auto current_buffer = tx_buffers.begin(); ibv_sge isge[tx_buffers.size()]; uint32_t current_sge = 0; ibv_send_wr iswr[tx_buffers.size()]; diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc index d55ced3c53f..606dbd2817c 100644 --- a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc @@ -7,8 +7,8 @@ #define TIMEOUT_MS 3000 #define RETRY_COUNT 7 -RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, +RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info) : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this)) { diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc index e4a170ee8be..0500b4420f9 100644 --- a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc @@ -8,8 +8,8 @@ #define dout_prefix *_dout << " RDMAIWARPServerSocketImpl " RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl( - CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, + CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot) : RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot) { diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index cc85832eddd..99d12131ffb 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -25,8 +25,8 @@ #define dout_prefix *_dout << " RDMAServerSocketImpl " RDMAServerSocketImpl::RDMAServerSocketImpl( - CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, + CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, entity_addr_t& a, unsigned slot) : ServerSocketImpl(a.get_type(), slot), cct(cct), net(cct), server_setup_socket(-1), ib(ib), diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index b68aeb1a8ef..5e88e7d01dd 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -40,7 +40,7 @@ RDMADispatcher::~RDMADispatcher() ceph_assert(dead_queue_pairs.empty()); } -RDMADispatcher::RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib) +RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib) : cct(c), ib(ib) { PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); @@ -772,9 +772,9 @@ void RDMAWorker::handle_pending_message() dispatcher->notify_pending_workers(); } -RDMAStack::RDMAStack(CephContext *cct, const string &t) - : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)), - rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib)) +RDMAStack::RDMAStack(CephContext *cct, const std::string &t) + : NetworkStack(cct, t), ib(std::make_shared<Infiniband>(cct)), + rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib)) { ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 45a043d2e23..c9772904ac2 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -40,7 +40,7 @@ class RDMADispatcher { std::thread t; CephContext *cct; - shared_ptr<Infiniband> ib; + std::shared_ptr<Infiniband> ib; Infiniband::CompletionQueue* tx_cq = nullptr; Infiniband::CompletionQueue* rx_cq = nullptr; Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr; @@ -81,7 +81,7 @@ class RDMADispatcher { public: PerfCounters *perf_logger; - explicit RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib); + explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib); virtual ~RDMADispatcher(); void handle_async_event(); @@ -120,10 +120,10 @@ class RDMAWorker : public Worker { typedef Infiniband::MemoryManager::Chunk Chunk; typedef Infiniband::MemoryManager MemoryManager; typedef std::vector<Chunk*>::iterator ChunkIter; - shared_ptr<Infiniband> ib; + std::shared_ptr<Infiniband> ib; EventCallbackRef tx_handler; std::list<RDMAConnectedSocketImpl*> pending_sent_conns; - shared_ptr<RDMADispatcher> dispatcher; + std::shared_ptr<RDMADispatcher> dispatcher; ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock"); class C_handle_cq_tx : public EventCallback { @@ -150,8 +150,8 @@ class RDMAWorker : public Worker { pending_sent_conns.remove(o); } void handle_pending_message(); - void set_dispatcher(shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; } - void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;} + void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; } + void set_ib(std::shared_ptr<Infiniband> &ib) {this->ib = ib;} void notify_worker() { center.dispatch_event_external(tx_handler); } @@ -178,12 +178,12 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { uint32_t local_qpn = 0; int connected; int error; - shared_ptr<Infiniband> ib; - shared_ptr<RDMADispatcher> dispatcher; + std::shared_ptr<Infiniband> ib; + std::shared_ptr<RDMADispatcher> dispatcher; RDMAWorker* worker; std::vector<Chunk*> buffers; int notify_fd = -1; - bufferlist pending_bl; + ceph::buffer::list pending_bl; ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock"); std::vector<ibv_wc> wc; @@ -204,8 +204,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { const decltype(std::cbegin(pending_bl.buffers()))& end); public: - RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w); + RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w); virtual ~RDMAConnectedSocketImpl(); void pass_wc(std::vector<ibv_wc> &&v); @@ -213,7 +213,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { virtual int is_connected() override { return connected; } virtual ssize_t read(char* buf, size_t len) override; - virtual ssize_t send(bufferlist &bl, bool more) override; + virtual ssize_t send(ceph::buffer::list &bl, bool more) override; virtual void shutdown() override; virtual void close() override; virtual int fd() const override { return notify_fd; } @@ -249,8 +249,9 @@ enum RDMA_CM_STATUS { class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl { public: - RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr); + RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker *w, RDMACMInfo *info = nullptr); ~RDMAIWARPConnectedSocketImpl(); virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override; virtual void close() override; @@ -283,16 +284,16 @@ class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl { class RDMAServerSocketImpl : public ServerSocketImpl { protected: CephContext *cct; - NetHandler net; + ceph::NetHandler net; int server_setup_socket; - shared_ptr<Infiniband> ib; - shared_ptr<RDMADispatcher> dispatcher; + std::shared_ptr<Infiniband> ib; + std::shared_ptr<RDMADispatcher> dispatcher; RDMAWorker *worker; entity_addr_t sa; public: - RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, entity_addr_t& a, unsigned slot); virtual int listen(entity_addr_t &sa, const SocketOptions &opt); @@ -304,8 +305,8 @@ class RDMAServerSocketImpl : public ServerSocketImpl { class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl { public: RDMAIWARPServerSocketImpl( - CephContext *cct, shared_ptr<Infiniband>& ib, - shared_ptr<RDMADispatcher>& rdma_dispatcher, + CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot); virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override; virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; @@ -316,15 +317,15 @@ class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl { }; class RDMAStack : public NetworkStack { - vector<std::thread> threads; + std::vector<std::thread> threads; PerfCounters *perf_counter; - shared_ptr<Infiniband> ib; - shared_ptr<RDMADispatcher> rdma_dispatcher; + std::shared_ptr<Infiniband> ib; + std::shared_ptr<RDMADispatcher> rdma_dispatcher; std::atomic<bool> fork_finished = {false}; public: - explicit RDMAStack(CephContext *cct, const string &t); + explicit RDMAStack(CephContext *cct, const std::string &t); virtual ~RDMAStack(); virtual bool nonblock_connect_need_writable_event() const override { return false; } diff --git a/src/msg/msg_types.cc b/src/msg/msg_types.cc index 7d29713538b..1f0c8242a8c 100644 --- a/src/msg/msg_types.cc +++ b/src/msg/msg_types.cc @@ -1,3 +1,5 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab #include "msg_types.h" @@ -8,26 +10,26 @@ #include "common/Formatter.h" -void entity_name_t::dump(Formatter *f) const +void entity_name_t::dump(ceph::Formatter *f) const { f->dump_string("type", type_str()); f->dump_unsigned("num", num()); } -void entity_addr_t::dump(Formatter *f) const +void entity_addr_t::dump(ceph::Formatter *f) const { f->dump_string("type", get_type_name(type)); f->dump_stream("addr") << get_sockaddr(); f->dump_unsigned("nonce", nonce); } -void entity_inst_t::dump(Formatter *f) const +void entity_inst_t::dump(ceph::Formatter *f) const { f->dump_object("name", name); f->dump_object("addr", addr); } -void entity_name_t::generate_test_instances(list<entity_name_t*>& o) +void entity_name_t::generate_test_instances(std::list<entity_name_t*>& o) { o.push_back(new entity_name_t(entity_name_t::MON())); o.push_back(new entity_name_t(entity_name_t::MON(1))); @@ -35,7 +37,7 @@ void entity_name_t::generate_test_instances(list<entity_name_t*>& o) o.push_back(new entity_name_t(entity_name_t::CLIENT(1))); } -void entity_addr_t::generate_test_instances(list<entity_addr_t*>& o) +void entity_addr_t::generate_test_instances(std::list<entity_addr_t*>& o) { o.push_back(new entity_addr_t()); entity_addr_t *a = new entity_addr_t(); @@ -53,7 +55,7 @@ void entity_addr_t::generate_test_instances(list<entity_addr_t*>& o) o.push_back(b); } -void entity_inst_t::generate_test_instances(list<entity_inst_t*>& o) +void entity_inst_t::generate_test_instances(std::list<entity_inst_t*>& o) { o.push_back(new entity_inst_t()); entity_name_t name; @@ -175,7 +177,7 @@ bool entity_addr_t::parse(const char *s, const char **end, int default_type) return true; } -ostream& operator<<(ostream& out, const entity_addr_t &addr) +std::ostream& operator<<(std::ostream& out, const entity_addr_t &addr) { if (addr.type == entity_addr_t::TYPE_NONE) { return out << "-"; @@ -187,7 +189,7 @@ ostream& operator<<(ostream& out, const entity_addr_t &addr) return out; } -ostream& operator<<(ostream& out, const sockaddr *psa) +std::ostream& operator<<(std::ostream& out, const sockaddr *psa) { char buf[NI_MAXHOST] = { 0 }; @@ -211,7 +213,7 @@ ostream& operator<<(ostream& out, const sockaddr *psa) } } -ostream& operator<<(ostream& out, const sockaddr_storage &ss) +std::ostream& operator<<(std::ostream& out, const sockaddr_storage &ss) { return out << (const sockaddr*)&ss; } @@ -274,7 +276,7 @@ bool entity_addrvec_t::parse(const char *s, const char **end) return !v.empty(); } -void entity_addrvec_t::encode(bufferlist& bl, uint64_t features) const +void entity_addrvec_t::encode(ceph::buffer::list& bl, uint64_t features) const { using ceph::encode; if ((features & CEPH_FEATURE_MSG_ADDR2) == 0) { @@ -286,7 +288,7 @@ void entity_addrvec_t::encode(bufferlist& bl, uint64_t features) const encode(v, bl, features); } -void entity_addrvec_t::decode(bufferlist::const_iterator& bl) +void entity_addrvec_t::decode(ceph::buffer::list::const_iterator& bl) { using ceph::decode; __u8 marker; @@ -315,21 +317,20 @@ void entity_addrvec_t::decode(bufferlist::const_iterator& bl) return; } if (marker > 2) - throw buffer::malformed_input("entity_addrvec_marker > 2"); + throw ceph::buffer::malformed_input("entity_addrvec_marker > 2"); decode(v, bl); } -void entity_addrvec_t::dump(Formatter *f) const +void entity_addrvec_t::dump(ceph::Formatter *f) const { f->open_array_section("addrvec"); - for (vector<entity_addr_t>::const_iterator p = v.begin(); - p != v.end(); ++p) { + for (auto p = v.begin(); p != v.end(); ++p) { f->dump_object("addr", *p); } f->close_section(); } -void entity_addrvec_t::generate_test_instances(list<entity_addrvec_t*>& ls) +void entity_addrvec_t::generate_test_instances(std::list<entity_addrvec_t*>& ls) { ls.push_back(new entity_addrvec_t()); ls.push_back(new entity_addrvec_t()); diff --git a/src/msg/msg_types.h b/src/msg/msg_types.h index 3016d2c53fc..3caf83f24e5 100644 --- a/src/msg/msg_types.h +++ b/src/msg/msg_types.h @@ -523,13 +523,13 @@ struct entity_addr_t { #endif uint16_t ss_family; if (elen < sizeof(ss_family)) { - throw buffer::malformed_input("elen smaller than family len"); + throw ceph::buffer::malformed_input("elen smaller than family len"); } decode(ss_family, bl); u.sa.sa_family = ss_family; elen -= sizeof(ss_family); if (elen > get_sockaddr_len() - sizeof(u.sa.sa_family)) { - throw buffer::malformed_input("elen exceeds sockaddr len"); + throw ceph::buffer::malformed_input("elen exceeds sockaddr len"); } bl.copy(elen, u.sa.sa_data); } |