diff options
Diffstat (limited to 'src/msg/async/ProtocolV2.cc')
-rw-r--r-- | src/msg/async/ProtocolV2.cc | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 6d44d6c783f..ed6f93cdd48 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -127,9 +127,9 @@ bool ProtocolV2::is_connected() { return can_write; } void ProtocolV2::discard_out_queue() { ldout(cct, 10) << __func__ << " started" << dendl; - for (auto p = sent.begin(); p != sent.end(); ++p) { - ldout(cct, 20) << __func__ << " discard " << *p << dendl; - (*p)->put(); + for (Message *msg : sent) { + ldout(cct, 20) << __func__ << " discard " << msg << dendl; + msg->put(); } sent.clear(); for (auto& [ prio, entries ] : out_queue) { @@ -211,10 +211,11 @@ void ProtocolV2::requeue_sent() { uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { ldout(cct, 10) << __func__ << " " << seq << dendl; std::lock_guard<std::mutex> l(connection->write_lock); - if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) { + const auto it = out_queue.find(CEPH_MSG_PRIO_HIGHEST); + if (it == out_queue.end()) { return seq; } - auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + auto& rq = it->second; uint64_t count = out_seq; while (!rq.empty()) { Message* const m = rq.front().m; @@ -226,7 +227,7 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { rq.pop_front(); count++; } - if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST); + if (rq.empty()) out_queue.erase(it); return count; } @@ -265,7 +266,7 @@ void ProtocolV2::reset_recv_state() { // clean read and write callbacks connection->pendingReadLen.reset(); - connection->writeCallback.reset(); + connection->writeCallback = {}; next_tag = static_cast<Tag>(0); @@ -507,14 +508,13 @@ void ProtocolV2::read_event() { ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() { out_queue_entry_t out_entry; - if (!out_queue.empty()) { - auto it = out_queue.rbegin(); + if (const auto it = out_queue.begin(); it != out_queue.end()) { auto& entries = it->second; ceph_assert(!entries.empty()); out_entry = entries.front(); entries.pop_front(); if (entries.empty()) { - out_queue.erase(it->first); + out_queue.erase(it); } } return out_entry; @@ -796,7 +796,7 @@ CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next, } template <class F> -CtPtr ProtocolV2::write(const std::string &desc, +CtPtr ProtocolV2::write(std::string_view desc, CONTINUATION_TYPE<ProtocolV2> &next, F &frame) { ceph::bufferlist bl; @@ -812,7 +812,7 @@ CtPtr ProtocolV2::write(const std::string &desc, return write(desc, next, bl); } -CtPtr ProtocolV2::write(const std::string &desc, +CtPtr ProtocolV2::write(std::string_view desc, CONTINUATION_TYPE<ProtocolV2> &next, ceph::bufferlist &buffer) { if (unlikely(pre_auth.enabled)) { |