summaryrefslogtreecommitdiffstats
path: root/src/msg
diff options
context:
space:
mode:
authorSage Weil <sage@redhat.com>2019-03-10 00:38:27 +0100
committerSage Weil <sage@redhat.com>2019-03-10 00:38:27 +0100
commit37f5b21b1de11cd904c802d6d9590f440f53575c (patch)
treed27261cc56a07b2fb888be2b2c7833253a49d82e /src/msg
parentMerge PR #26869 into master (diff)
parentRevert "msg/async: msgr2: expected tags validation" (diff)
downloadceph-37f5b21b1de11cd904c802d6d9590f440f53575c.tar.xz
ceph-37f5b21b1de11cd904c802d6d9590f440f53575c.zip
Merge PR #26849 into master
* refs/pull/26849/head: Revert "msg/async: msgr2: expected tags validation" msg/async/ProtocolV2: state machine verifications Reviewed-by: Sage Weil <sage@redhat.com>
Diffstat (limited to 'src/msg')
-rw-r--r--src/msg/async/ProtocolV2.cc255
-rw-r--r--src/msg/async/ProtocolV2.h30
2 files changed, 169 insertions, 116 deletions
diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc
index d9b142c3520..b57ba66e183 100644
--- a/src/msg/async/ProtocolV2.cc
+++ b/src/msg/async/ProtocolV2.cc
@@ -54,9 +54,8 @@ void ProtocolV2::run_continuation(CtPtr continuation) {
#define READB(L, B, C) read(CONTINUATION(C), L, B)
-#define TAG_MASK(T) (1 << ((uint64_t)(T)-1))
-
#ifdef UNIT_TESTS_BUILT
+
#define INTERCEPT(S) { \
if(connection->interceptor) { \
auto a = connection->interceptor->intercept(connection, (S)); \
@@ -88,7 +87,6 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
can_write(false),
bannerExchangeCallback(nullptr),
next_payload_len(0),
- sent_tag(static_cast<Tag>(0)),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
temp_buffer = new char[4096];
@@ -218,7 +216,8 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
}
void ProtocolV2::reset_recv_state() {
- if (state == CONNECTING || state == READY) {
+ if ((state >= AUTH_CONNECTING && state <= SESSION_RECONNECTING) ||
+ state == READY) {
auth_meta.reset(new AuthConnectionMeta);
session_stream_handlers.tx.reset(nullptr);
session_stream_handlers.rx.reset(nullptr);
@@ -228,7 +227,6 @@ void ProtocolV2::reset_recv_state() {
connection->pendingReadLen.reset();
connection->writeCallback.reset();
- sent_tag = static_cast<Tag>(0);
next_tag = static_cast<Tag>(0);
reset_throttle();
@@ -281,8 +279,8 @@ CtPtr ProtocolV2::_fault() {
return nullptr;
}
- if (connection->policy.lossy && state != START_CONNECT &&
- state != CONNECTING) {
+ if (connection->policy.lossy &&
+ !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
@@ -296,7 +294,7 @@ CtPtr ProtocolV2::_fault() {
requeue_sent();
if (out_queue.empty() && state >= START_ACCEPT &&
- state <= ACCEPTING_SESSION && !replacing) {
+ state <= SESSION_ACCEPTING && !replacing) {
ldout(cct, 2) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
connection->write_lock.unlock();
@@ -328,10 +326,9 @@ CtPtr ProtocolV2::_fault() {
connection->write_lock.unlock();
- if (state != START_CONNECT &&
- state != CONNECTING &&
+ if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
state != WAIT &&
- state != ACCEPTING_SESSION /* due to connection race */) {
+ state != SESSION_ACCEPTING /* due to connection race */) {
// policy maybe empty when state is in accept
if (connection->policy.server) {
ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
@@ -510,8 +507,6 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
- sent_tag = message.tag;
-
m->trace.event("async writing message");
ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
<< " src=" << entity_name_t(messenger->get_myname())
@@ -627,7 +622,6 @@ void ProtocolV2::write_event() {
s = in_seq;
auto ack = AckFrame::Encode(session_stream_handlers, in_seq);
connection->outcoming_bl.claim_append(ack.get_buffer());
- sent_tag = ack.tag;
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
ack_left -= left;
@@ -719,7 +713,6 @@ template <class F>
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_PARAM(next, ProtocolV2),
F &frame) {
- sent_tag = frame.tag;
return write(desc, CONTINUATION(next), frame.get_buffer());
}
@@ -762,7 +755,7 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) {
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
- INTERCEPT(state == CONNECTING ? 3 : 4);
+ INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
return WRITE(bl, "banner", _wait_for_peer_banner);
}
@@ -810,7 +803,7 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) {
next_payload_len = payload_len;
- INTERCEPT(state == CONNECTING ? 5 : 6);
+ INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
return READ(next_payload_len, _handle_peer_banner_payload);
}
@@ -870,10 +863,21 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
this->connection_features = msgr2_required;
}
+ // at this point we can change how the client protocol behaves based on
+ // this->peer_required_features
+
+ if (state == BANNER_CONNECTING) {
+ state = HELLO_CONNECTING;
+ }
+ else {
+ ceph_assert(state == BANNER_ACCEPTING);
+ state = HELLO_ACCEPTING;
+ }
+
auto hello = HelloFrame::Encode(messenger->get_mytype(),
connection->target_addr);
- INTERCEPT(state == CONNECTING ? 7 : 8);
+ INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
return WRITE(hello, "hello frame", read_frame);
}
@@ -883,6 +887,11 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
+ lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
+ return _fault();
+ }
+
auto hello = HelloFrame::Decode(payload);
ldout(cct, 5) << __func__ << " received hello:"
@@ -892,7 +901,7 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
if (connection->get_peer_type() == -1) {
connection->set_peer_type(hello.entity_type());
- ceph_assert(state == ACCEPTING);
+ ceph_assert(state == HELLO_ACCEPTING);
connection->policy = messenger->get_policy(hello.entity_type());
ldout(cct, 10) << __func__ << " accept of host_type "
<< (int)hello.entity_type()
@@ -902,6 +911,7 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
<< " policy.resetcheck=" << connection->policy.resetcheck
<< dendl;
} else {
+ ceph_assert(state == HELLO_CONNECTING);
if (connection->get_peer_type() != hello.entity_type()) {
ldout(cct, 1) << __func__ << " connection peer type does not match what"
<< " peer advertises " << connection->get_peer_type()
@@ -919,65 +929,6 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
return callback;
}
-uint64_t ProtocolV2::expected_tags(Tag sent_tag, Tag received_tag) {
- switch(sent_tag) {
- case Tag::HELLO:
- if (received_tag == Tag::HELLO) {
- ceph_assert(state == ACCEPTING);
- return TAG_MASK(Tag::AUTH_REQUEST);
- } else {
- return TAG_MASK(Tag::HELLO);
- }
- case Tag::AUTH_REQUEST:
- case Tag::AUTH_REQUEST_MORE:
- return TAG_MASK(Tag::AUTH_REPLY_MORE) | TAG_MASK(Tag::AUTH_DONE) |
- TAG_MASK(Tag::AUTH_BAD_METHOD);
- case Tag::AUTH_BAD_METHOD:
- return TAG_MASK(Tag::AUTH_REQUEST);
- case Tag::AUTH_REPLY_MORE:
- return TAG_MASK(Tag::AUTH_REQUEST_MORE);
- case Tag::AUTH_DONE:
- return TAG_MASK(Tag::CLIENT_IDENT) | TAG_MASK(Tag::SESSION_RECONNECT);
- case Tag::CLIENT_IDENT:
- if (state == READY) {
- return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
- TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
- } else {
- ceph_assert(state == CONNECTING);
- return TAG_MASK(Tag::SERVER_IDENT) |
- TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT);
- }
- case Tag::SESSION_RECONNECT:
- if (state == READY) {
- return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
- TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
- } else {
- ceph_assert(state == CONNECTING);
- return TAG_MASK(Tag::SESSION_RECONNECT_OK) |
- TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) |
- TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT);
- }
- case Tag::SESSION_RESET:
- return TAG_MASK(Tag::CLIENT_IDENT);
- case Tag::SESSION_RETRY:
- case Tag::SESSION_RETRY_GLOBAL:
- return TAG_MASK(Tag::SESSION_RECONNECT);
- case Tag::SERVER_IDENT:
- case Tag::SESSION_RECONNECT_OK:
- case Tag::KEEPALIVE2:
- case Tag::KEEPALIVE2_ACK:
- case Tag::ACK:
- case Tag::MESSAGE:
- ceph_assert(state == READY);
- return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) |
- TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK);
- case Tag::IDENT_MISSING_FEATURES:
- case Tag::WAIT:
- return 0; // the peer should reset when receiving these tags
- }
- return 0;
-}
-
CtPtr ProtocolV2::read_frame() {
if (state == CLOSED) {
return nullptr;
@@ -1043,17 +994,8 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
return _fault();
}
- Tag received_tag = next_tag;
next_tag = static_cast<Tag>(main_preamble.tag);
- uint64_t expected_tag_mask = expected_tags(sent_tag, received_tag);
- if (!(TAG_MASK(next_tag) & expected_tag_mask)) {
- lderr(cct) << __func__ << " received unexpected tag: expected=0x"
- << std::hex << expected_tag_mask << " got=0x"
- << TAG_MASK(next_tag) << std::dec << dendl;
- return _fault();
- }
-
rx_segments_desc.clear();
rx_segments_data.clear();
@@ -1074,6 +1016,10 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) {
// does it need throttle?
if (next_tag == Tag::MESSAGE) {
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
state = THROTTLE_MESSAGE;
return CONTINUE(throttle_message);
} else {
@@ -1626,6 +1572,11 @@ CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
+
auto keepalive_frame = KeepAliveFrame::Decode(session_stream_handlers,
payload);
@@ -1651,6 +1602,11 @@ CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
+
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(session_stream_handlers,
payload);
connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
@@ -1664,6 +1620,11 @@ CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != READY) {
+ lderr(cct) << __func__ << " not in ready state!" << dendl;
+ return _fault();
+ }
+
auto ack = AckFrame::Decode(session_stream_handlers, payload);
handle_message_ack(ack.seq());
return CONTINUE(read_frame);
@@ -1676,7 +1637,7 @@ CtPtr ProtocolV2::start_client_banner_exchange() {
INTERCEPT(1);
- state = CONNECTING;
+ state = BANNER_CONNECTING;
global_seq = messenger->get_global_seq();
@@ -1686,8 +1647,7 @@ CtPtr ProtocolV2::start_client_banner_exchange() {
CtPtr ProtocolV2::post_client_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;
- // at this point we can change how the client protocol behaves based on
- // this->peer_required_features
+ state = AUTH_CONNECTING;
return send_auth_request();
}
@@ -1705,7 +1665,8 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
connection, am.get(),
&am->auth_method, &preferred_modes, &bl);
connection->lock.lock();
- if (state != State::CONNECTING) {
+ if (state != AUTH_CONNECTING) {
+ ldout(cct, 1) << __func__ << " state changed!" << dendl;
return _fault();
}
if (r < 0) {
@@ -1727,6 +1688,11 @@ CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != AUTH_CONNECTING) {
+ lderr(cct) << __func__ << " not in auth connect state!" << dendl;
+ return _fault();
+ }
+
auto bad_method = AuthBadMethodFrame::Decode(payload);
ldout(cct, 1) << __func__ << " method=" << bad_method.method()
<< " result " << cpp_strerror(bad_method.result())
@@ -1743,7 +1709,7 @@ CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
bad_method.allowed_methods(),
bad_method.allowed_modes());
connection->lock.lock();
- if (state != State::CONNECTING || r < 0) {
+ if (state != AUTH_CONNECTING || r < 0) {
return _fault();
}
return send_auth_request(bad_method.allowed_methods());
@@ -1754,6 +1720,11 @@ CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != AUTH_CONNECTING) {
+ lderr(cct) << __func__ << " not in auth connect state!" << dendl;
+ return _fault();
+ }
+
auto auth_more = AuthReplyMoreFrame::Decode(payload);
ldout(cct, 5) << __func__
<< " auth reply more len=" << auth_more.auth_payload().length()
@@ -1765,7 +1736,8 @@ CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
int r = messenger->auth_client->handle_auth_reply_more(
connection, am.get(), auth_more.auth_payload(), &reply);
connection->lock.lock();
- if (state != State::CONNECTING) {
+ if (state != AUTH_CONNECTING) {
+ ldout(cct, 1) << __func__ << " state changed!" << dendl;
return _fault();
}
if (r < 0) {
@@ -1782,6 +1754,11 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != AUTH_CONNECTING) {
+ lderr(cct) << __func__ << " not in auth connect state!" << dendl;
+ return _fault();
+ }
+
auto auth_done = AuthDoneFrame::Decode(payload);
ceph_assert(messenger->auth_client);
@@ -1796,7 +1773,8 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
&am->session_key,
&am->connection_secret);
connection->lock.lock();
- if (state != State::CONNECTING) {
+ if (state != AUTH_CONNECTING) {
+ ldout(cct, 1) << __func__ << " state changed!" << dendl;
return _fault();
}
if (r < 0) {
@@ -1808,8 +1786,10 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
if (!server_cookie) {
ceph_assert(connect_seq == 0);
+ state = SESSION_CONNECTING;
return send_client_ident();
} else { // reconnecting to previous session
+ state = SESSION_RECONNECTING;
ceph_assert(connect_seq > 0);
return send_reconnect();
}
@@ -1851,7 +1831,7 @@ CtPtr ProtocolV2::send_client_ident() {
}
}
connection->lock.lock();
- if (state != CONNECTING) {
+ if (state != SESSION_CONNECTING) {
ldout(cct, 1) << __func__
<< " state changed while learned_addr, mark_down or "
<< " replacing must be happened just now" << dendl;
@@ -1868,7 +1848,7 @@ CtPtr ProtocolV2::send_client_ident() {
ldout(cct, 5) << __func__ << " sending identification: "
<< "addrs=" << messenger->get_myaddrs()
- << " target=" << connection->target_addr
+ << " target=" << connection->target_addr
<< " gid=" << messenger->get_myname().num()
<< " global_seq=" << global_seq
<< " features_supported=" << std::hex
@@ -1910,6 +1890,11 @@ CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_CONNECTING) {
+ lderr(cct) << __func__ << " not in session connect state!" << dendl;
+ return _fault();
+ }
+
auto ident_missing =
IdentMissingFeaturesFrame::Decode(session_stream_handlers, payload);
lderr(cct) << __func__
@@ -1924,6 +1909,11 @@ CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto reset = ResetFrame::Decode(session_stream_handlers, payload);
ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
@@ -1936,6 +1926,7 @@ CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
in_seq = 0;
}
+ state = SESSION_CONNECTING;
return send_client_ident();
}
@@ -1944,6 +1935,11 @@ CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto retry = RetryFrame::Decode(session_stream_handlers, payload);
connect_seq = retry.connect_seq() + 1;
@@ -1959,6 +1955,11 @@ CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto retry = RetryGlobalFrame::Decode(session_stream_handlers, payload);
global_seq = messenger->get_global_seq(retry.global_seq());
@@ -1975,6 +1976,11 @@ CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
<< " payload.length()=" << payload.length()
<< dendl;
+ if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
+ return _fault();
+ }
+
state = WAIT;
WaitFrame::Decode(session_stream_handlers, payload);
return _fault();
@@ -1985,6 +1991,11 @@ CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_RECONNECTING) {
+ lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
+ return _fault();
+ }
+
auto reconnect_ok = ReconnectOkFrame::Decode(session_stream_handlers,
payload);
ldout(cct, 5) << __func__
@@ -2013,6 +2024,11 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_CONNECTING) {
+ lderr(cct) << __func__ << " not in session connect state!" << dendl;
+ return _fault();
+ }
+
auto server_ident = ServerIdentFrame::Decode(session_stream_handlers,
payload);
ldout(cct, 5) << __func__ << " received server identification:"
@@ -2066,7 +2082,7 @@ CtPtr ProtocolV2::start_server_banner_exchange() {
INTERCEPT(2);
- state = ACCEPTING;
+ state = BANNER_ACCEPTING;
return _banner_exchange(CONTINUATION(post_server_banner_exchange));
}
@@ -2074,13 +2090,20 @@ CtPtr ProtocolV2::start_server_banner_exchange() {
CtPtr ProtocolV2::post_server_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;
- // at this point we can change how the server protocol behaves based on
- // this->peer_required_features
+ state = AUTH_ACCEPTING;
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
+ ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
+ << dendl;
+
+ if (state != AUTH_ACCEPTING) {
+ lderr(cct) << __func__ << " not in auth accept state!" << dendl;
+ return _fault();
+ }
+
auto request = AuthRequestFrame::Decode(payload);
ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
<< ", preferred_modes=" << request.preferred_modes()
@@ -2126,7 +2149,7 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
more, am->auth_method, auth_payload,
&reply);
connection->lock.lock();
- if (state != ACCEPTING) {
+ if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
ldout(cct, 1) << __func__
<< " state changed while accept, it must be mark_down"
<< dendl;
@@ -2139,11 +2162,16 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
ceph_assert(auth_meta);
session_stream_handlers = \
ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
+
+ state = SESSION_ACCEPTING;
+
auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
auth_meta->con_mode,
reply);
return WRITE(auth_done, "auth done", read_frame);
} else if (r == 0) {
+ state = AUTH_ACCEPTING_MORE;
+
auto more = AuthReplyMoreFrame::Encode(reply);
return WRITE(more, "auth reply more", read_frame);
} else if (r == -EBUSY) {
@@ -2158,6 +2186,12 @@ CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
{
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+
+ if (state != AUTH_ACCEPTING_MORE) {
+ lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
+ return _fault();
+ }
+
auto auth_more = AuthRequestMoreFrame::Decode(payload);
return _handle_auth_request(auth_more.auth_payload(), true);
}
@@ -2167,6 +2201,11 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " not in session accept state!" << dendl;
+ return _fault();
+ }
+
auto client_ident = ClientIdentFrame::Decode(session_stream_handlers,
payload);
@@ -2217,7 +2256,6 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
connection_features =
client_ident.supported_features() & connection->policy.features_supported;
- state = ACCEPTING_SESSION;
peer_global_seq = client_ident.global_seq();
// Looks good so far, let's check if there is already an existing connection
@@ -2238,7 +2276,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
connection->inject_delay();
connection->lock.lock();
- if (state != ACCEPTING_SESSION) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept, it must be mark_down"
<< dendl;
@@ -2259,6 +2297,11 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
ldout(cct, 20) << __func__
<< " payload.length()=" << payload.length() << dendl;
+ if (state != SESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " not in session accept state!" << dendl;
+ return _fault();
+ }
+
auto reconnect = ReconnectFrame::Decode(session_stream_handlers, payload);
ldout(cct, 5) << __func__
@@ -2291,7 +2334,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
connection->inject_delay();
connection->lock.lock();
- if (state != ACCEPTING) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept, it must be mark_down"
<< dendl;
@@ -2606,7 +2649,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
if (exproto->state == CLOSED) return;
ceph_assert(exproto->state == NONE);
- exproto->state = ACCEPTING_SESSION;
+ exproto->state = SESSION_ACCEPTING;
existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
existing->read_handler);
@@ -2682,7 +2725,7 @@ CtPtr ProtocolV2::send_server_ident() {
connection->inject_delay();
return _fault();
}
- if (state != ACCEPTING_SESSION) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept_conn, it must be mark_down"
<< dendl;
@@ -2739,7 +2782,7 @@ CtPtr ProtocolV2::send_reconnect_ok() {
connection->inject_delay();
return _fault();
}
- if (state != ACCEPTING_SESSION) {
+ if (state != SESSION_ACCEPTING) {
ldout(cct, 1) << __func__
<< " state changed while accept_conn, it must be mark_down"
<< dendl;
diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h
index d49b3c9a2af..2d4c6d24354 100644
--- a/src/msg/async/ProtocolV2.h
+++ b/src/msg/async/ProtocolV2.h
@@ -15,10 +15,17 @@ private:
enum State {
NONE,
START_CONNECT,
- CONNECTING, // banner + authentication + ident
+ BANNER_CONNECTING,
+ HELLO_CONNECTING,
+ AUTH_CONNECTING,
+ SESSION_CONNECTING,
+ SESSION_RECONNECTING,
START_ACCEPT,
- ACCEPTING, // banner + authentication + ident
- ACCEPTING_SESSION,
+ BANNER_ACCEPTING,
+ HELLO_ACCEPTING,
+ AUTH_ACCEPTING,
+ AUTH_ACCEPTING_MORE,
+ SESSION_ACCEPTING,
READY,
THROTTLE_MESSAGE,
THROTTLE_BYTES,
@@ -33,10 +40,17 @@ private:
static const char *get_state_name(int state) {
const char *const statenames[] = {"NONE",
"START_CONNECT",
- "CONNECTING",
+ "BANNER_CONNECTING",
+ "HELLO_CONNECTING",
+ "AUTH_CONNECTING",
+ "SESSION_CONNECTING",
+ "SESSION_RECONNECTING",
"START_ACCEPT",
- "ACCEPTING",
- "ACCEPTING_SESSION",
+ "BANNER_ACCEPTING",
+ "HELLO_ACCEPTING",
+ "AUTH_ACCEPTING",
+ "AUTH_ACCEPTING_MORE",
+ "SESSION_ACCEPTING",
"READY",
"THROTTLE_MESSAGE",
"THROTTLE_BYTES",
@@ -92,7 +106,6 @@ public:
ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data;
private:
- ceph::msgr::v2::Tag sent_tag;
ceph::msgr::v2::Tag next_tag;
utime_t backoff; // backoff time
utime_t recv_stamp;
@@ -112,9 +125,6 @@ private:
CONTINUATION_PARAM(next, ProtocolV2),
bufferlist &buffer);
- uint64_t expected_tags(ceph::msgr::v2::Tag sent_tag,
- ceph::msgr::v2::Tag received_tag);
-
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
void reset_recv_state();