diff options
author | Sage Weil <sage@redhat.com> | 2019-03-10 00:38:27 +0100 |
---|---|---|
committer | Sage Weil <sage@redhat.com> | 2019-03-10 00:38:27 +0100 |
commit | 37f5b21b1de11cd904c802d6d9590f440f53575c (patch) | |
tree | d27261cc56a07b2fb888be2b2c7833253a49d82e /src/msg | |
parent | Merge PR #26869 into master (diff) | |
parent | Revert "msg/async: msgr2: expected tags validation" (diff) | |
download | ceph-37f5b21b1de11cd904c802d6d9590f440f53575c.tar.xz ceph-37f5b21b1de11cd904c802d6d9590f440f53575c.zip |
Merge PR #26849 into master
* refs/pull/26849/head:
Revert "msg/async: msgr2: expected tags validation"
msg/async/ProtocolV2: state machine verifications
Reviewed-by: Sage Weil <sage@redhat.com>
Diffstat (limited to 'src/msg')
-rw-r--r-- | src/msg/async/ProtocolV2.cc | 255 | ||||
-rw-r--r-- | src/msg/async/ProtocolV2.h | 30 |
2 files changed, 169 insertions, 116 deletions
diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index d9b142c3520..b57ba66e183 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -54,9 +54,8 @@ void ProtocolV2::run_continuation(CtPtr continuation) { #define READB(L, B, C) read(CONTINUATION(C), L, B) -#define TAG_MASK(T) (1 << ((uint64_t)(T)-1)) - #ifdef UNIT_TESTS_BUILT + #define INTERCEPT(S) { \ if(connection->interceptor) { \ auto a = connection->interceptor->intercept(connection, (S)); \ @@ -88,7 +87,6 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) can_write(false), bannerExchangeCallback(nullptr), next_payload_len(0), - sent_tag(static_cast<Tag>(0)), next_tag(static_cast<Tag>(0)), keepalive(false) { temp_buffer = new char[4096]; @@ -218,7 +216,8 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { } void ProtocolV2::reset_recv_state() { - if (state == CONNECTING || state == READY) { + if ((state >= AUTH_CONNECTING && state <= SESSION_RECONNECTING) || + state == READY) { auth_meta.reset(new AuthConnectionMeta); session_stream_handlers.tx.reset(nullptr); session_stream_handlers.rx.reset(nullptr); @@ -228,7 +227,6 @@ void ProtocolV2::reset_recv_state() { connection->pendingReadLen.reset(); connection->writeCallback.reset(); - sent_tag = static_cast<Tag>(0); next_tag = static_cast<Tag>(0); reset_throttle(); @@ -281,8 +279,8 @@ CtPtr ProtocolV2::_fault() { return nullptr; } - if (connection->policy.lossy && state != START_CONNECT && - state != CONNECTING) { + if (connection->policy.lossy && + !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) { ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl; stop(); connection->dispatch_queue->queue_reset(connection); @@ -296,7 +294,7 @@ CtPtr ProtocolV2::_fault() { requeue_sent(); if (out_queue.empty() && state >= START_ACCEPT && - state <= ACCEPTING_SESSION && !replacing) { + state <= SESSION_ACCEPTING && !replacing) { ldout(cct, 2) << __func__ << " with nothing to send and in the half " << " accept state just closed" << dendl; connection->write_lock.unlock(); @@ -328,10 +326,9 @@ CtPtr ProtocolV2::_fault() { connection->write_lock.unlock(); - if (state != START_CONNECT && - state != CONNECTING && + if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) && state != WAIT && - state != ACCEPTING_SESSION /* due to connection race */) { + state != SESSION_ACCEPTING /* due to connection race */) { // policy maybe empty when state is in accept if (connection->policy.server) { ldout(cct, 1) << __func__ << " server, going to standby" << dendl; @@ -510,8 +507,6 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; - sent_tag = message.tag; - m->trace.event("async writing message"); ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq() << " src=" << entity_name_t(messenger->get_myname()) @@ -627,7 +622,6 @@ void ProtocolV2::write_event() { s = in_seq; auto ack = AckFrame::Encode(session_stream_handlers, in_seq); connection->outcoming_bl.claim_append(ack.get_buffer()); - sent_tag = ack.tag; ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left -= left; @@ -719,7 +713,6 @@ template <class F> CtPtr ProtocolV2::write(const std::string &desc, CONTINUATION_PARAM(next, ProtocolV2), F &frame) { - sent_tag = frame.tag; return write(desc, CONTINUATION(next), frame.get_buffer()); } @@ -762,7 +755,7 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) { encode((uint16_t)banner_payload.length(), bl, 0); bl.claim_append(banner_payload); - INTERCEPT(state == CONNECTING ? 3 : 4); + INTERCEPT(state == BANNER_CONNECTING ? 3 : 4); return WRITE(bl, "banner", _wait_for_peer_banner); } @@ -810,7 +803,7 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { next_payload_len = payload_len; - INTERCEPT(state == CONNECTING ? 5 : 6); + INTERCEPT(state == BANNER_CONNECTING ? 5 : 6); return READ(next_payload_len, _handle_peer_banner_payload); } @@ -870,10 +863,21 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) { this->connection_features = msgr2_required; } + // at this point we can change how the client protocol behaves based on + // this->peer_required_features + + if (state == BANNER_CONNECTING) { + state = HELLO_CONNECTING; + } + else { + ceph_assert(state == BANNER_ACCEPTING); + state = HELLO_ACCEPTING; + } + auto hello = HelloFrame::Encode(messenger->get_mytype(), connection->target_addr); - INTERCEPT(state == CONNECTING ? 7 : 8); + INTERCEPT(state == HELLO_CONNECTING ? 7 : 8); return WRITE(hello, "hello frame", read_frame); } @@ -883,6 +887,11 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) { + lderr(cct) << __func__ << " not in hello exchange state!" << dendl; + return _fault(); + } + auto hello = HelloFrame::Decode(payload); ldout(cct, 5) << __func__ << " received hello:" @@ -892,7 +901,7 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload) if (connection->get_peer_type() == -1) { connection->set_peer_type(hello.entity_type()); - ceph_assert(state == ACCEPTING); + ceph_assert(state == HELLO_ACCEPTING); connection->policy = messenger->get_policy(hello.entity_type()); ldout(cct, 10) << __func__ << " accept of host_type " << (int)hello.entity_type() @@ -902,6 +911,7 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload) << " policy.resetcheck=" << connection->policy.resetcheck << dendl; } else { + ceph_assert(state == HELLO_CONNECTING); if (connection->get_peer_type() != hello.entity_type()) { ldout(cct, 1) << __func__ << " connection peer type does not match what" << " peer advertises " << connection->get_peer_type() @@ -919,65 +929,6 @@ CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload) return callback; } -uint64_t ProtocolV2::expected_tags(Tag sent_tag, Tag received_tag) { - switch(sent_tag) { - case Tag::HELLO: - if (received_tag == Tag::HELLO) { - ceph_assert(state == ACCEPTING); - return TAG_MASK(Tag::AUTH_REQUEST); - } else { - return TAG_MASK(Tag::HELLO); - } - case Tag::AUTH_REQUEST: - case Tag::AUTH_REQUEST_MORE: - return TAG_MASK(Tag::AUTH_REPLY_MORE) | TAG_MASK(Tag::AUTH_DONE) | - TAG_MASK(Tag::AUTH_BAD_METHOD); - case Tag::AUTH_BAD_METHOD: - return TAG_MASK(Tag::AUTH_REQUEST); - case Tag::AUTH_REPLY_MORE: - return TAG_MASK(Tag::AUTH_REQUEST_MORE); - case Tag::AUTH_DONE: - return TAG_MASK(Tag::CLIENT_IDENT) | TAG_MASK(Tag::SESSION_RECONNECT); - case Tag::CLIENT_IDENT: - if (state == READY) { - return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) | - TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK); - } else { - ceph_assert(state == CONNECTING); - return TAG_MASK(Tag::SERVER_IDENT) | - TAG_MASK(Tag::IDENT_MISSING_FEATURES) | TAG_MASK(Tag::WAIT); - } - case Tag::SESSION_RECONNECT: - if (state == READY) { - return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) | - TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK); - } else { - ceph_assert(state == CONNECTING); - return TAG_MASK(Tag::SESSION_RECONNECT_OK) | - TAG_MASK(Tag::SESSION_RESET) | TAG_MASK(Tag::SESSION_RETRY) | - TAG_MASK(Tag::SESSION_RETRY_GLOBAL) | TAG_MASK(Tag::WAIT); - } - case Tag::SESSION_RESET: - return TAG_MASK(Tag::CLIENT_IDENT); - case Tag::SESSION_RETRY: - case Tag::SESSION_RETRY_GLOBAL: - return TAG_MASK(Tag::SESSION_RECONNECT); - case Tag::SERVER_IDENT: - case Tag::SESSION_RECONNECT_OK: - case Tag::KEEPALIVE2: - case Tag::KEEPALIVE2_ACK: - case Tag::ACK: - case Tag::MESSAGE: - ceph_assert(state == READY); - return TAG_MASK(Tag::MESSAGE) | TAG_MASK(Tag::KEEPALIVE2) | - TAG_MASK(Tag::KEEPALIVE2_ACK) | TAG_MASK(Tag::ACK); - case Tag::IDENT_MISSING_FEATURES: - case Tag::WAIT: - return 0; // the peer should reset when receiving these tags - } - return 0; -} - CtPtr ProtocolV2::read_frame() { if (state == CLOSED) { return nullptr; @@ -1043,17 +994,8 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { return _fault(); } - Tag received_tag = next_tag; next_tag = static_cast<Tag>(main_preamble.tag); - uint64_t expected_tag_mask = expected_tags(sent_tag, received_tag); - if (!(TAG_MASK(next_tag) & expected_tag_mask)) { - lderr(cct) << __func__ << " received unexpected tag: expected=0x" - << std::hex << expected_tag_mask << " got=0x" - << TAG_MASK(next_tag) << std::dec << dendl; - return _fault(); - } - rx_segments_desc.clear(); rx_segments_data.clear(); @@ -1074,6 +1016,10 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { // does it need throttle? if (next_tag == Tag::MESSAGE) { + if (state != READY) { + lderr(cct) << __func__ << " not in ready state!" << dendl; + return _fault(); + } state = THROTTLE_MESSAGE; return CONTINUE(throttle_message); } else { @@ -1626,6 +1572,11 @@ CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != READY) { + lderr(cct) << __func__ << " not in ready state!" << dendl; + return _fault(); + } + auto keepalive_frame = KeepAliveFrame::Decode(session_stream_handlers, payload); @@ -1651,6 +1602,11 @@ CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != READY) { + lderr(cct) << __func__ << " not in ready state!" << dendl; + return _fault(); + } + auto keepalive_ack_frame = KeepAliveFrameAck::Decode(session_stream_handlers, payload); connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp()); @@ -1664,6 +1620,11 @@ CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != READY) { + lderr(cct) << __func__ << " not in ready state!" << dendl; + return _fault(); + } + auto ack = AckFrame::Decode(session_stream_handlers, payload); handle_message_ack(ack.seq()); return CONTINUE(read_frame); @@ -1676,7 +1637,7 @@ CtPtr ProtocolV2::start_client_banner_exchange() { INTERCEPT(1); - state = CONNECTING; + state = BANNER_CONNECTING; global_seq = messenger->get_global_seq(); @@ -1686,8 +1647,7 @@ CtPtr ProtocolV2::start_client_banner_exchange() { CtPtr ProtocolV2::post_client_banner_exchange() { ldout(cct, 20) << __func__ << dendl; - // at this point we can change how the client protocol behaves based on - // this->peer_required_features + state = AUTH_CONNECTING; return send_auth_request(); } @@ -1705,7 +1665,8 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) { connection, am.get(), &am->auth_method, &preferred_modes, &bl); connection->lock.lock(); - if (state != State::CONNECTING) { + if (state != AUTH_CONNECTING) { + ldout(cct, 1) << __func__ << " state changed!" << dendl; return _fault(); } if (r < 0) { @@ -1727,6 +1688,11 @@ CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) { ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != AUTH_CONNECTING) { + lderr(cct) << __func__ << " not in auth connect state!" << dendl; + return _fault(); + } + auto bad_method = AuthBadMethodFrame::Decode(payload); ldout(cct, 1) << __func__ << " method=" << bad_method.method() << " result " << cpp_strerror(bad_method.result()) @@ -1743,7 +1709,7 @@ CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) { bad_method.allowed_methods(), bad_method.allowed_modes()); connection->lock.lock(); - if (state != State::CONNECTING || r < 0) { + if (state != AUTH_CONNECTING || r < 0) { return _fault(); } return send_auth_request(bad_method.allowed_methods()); @@ -1754,6 +1720,11 @@ CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != AUTH_CONNECTING) { + lderr(cct) << __func__ << " not in auth connect state!" << dendl; + return _fault(); + } + auto auth_more = AuthReplyMoreFrame::Decode(payload); ldout(cct, 5) << __func__ << " auth reply more len=" << auth_more.auth_payload().length() @@ -1765,7 +1736,8 @@ CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload) int r = messenger->auth_client->handle_auth_reply_more( connection, am.get(), auth_more.auth_payload(), &reply); connection->lock.lock(); - if (state != State::CONNECTING) { + if (state != AUTH_CONNECTING) { + ldout(cct, 1) << __func__ << " state changed!" << dendl; return _fault(); } if (r < 0) { @@ -1782,6 +1754,11 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != AUTH_CONNECTING) { + lderr(cct) << __func__ << " not in auth connect state!" << dendl; + return _fault(); + } + auto auth_done = AuthDoneFrame::Decode(payload); ceph_assert(messenger->auth_client); @@ -1796,7 +1773,8 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload) &am->session_key, &am->connection_secret); connection->lock.lock(); - if (state != State::CONNECTING) { + if (state != AUTH_CONNECTING) { + ldout(cct, 1) << __func__ << " state changed!" << dendl; return _fault(); } if (r < 0) { @@ -1808,8 +1786,10 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload) if (!server_cookie) { ceph_assert(connect_seq == 0); + state = SESSION_CONNECTING; return send_client_ident(); } else { // reconnecting to previous session + state = SESSION_RECONNECTING; ceph_assert(connect_seq > 0); return send_reconnect(); } @@ -1851,7 +1831,7 @@ CtPtr ProtocolV2::send_client_ident() { } } connection->lock.lock(); - if (state != CONNECTING) { + if (state != SESSION_CONNECTING) { ldout(cct, 1) << __func__ << " state changed while learned_addr, mark_down or " << " replacing must be happened just now" << dendl; @@ -1868,7 +1848,7 @@ CtPtr ProtocolV2::send_client_ident() { ldout(cct, 5) << __func__ << " sending identification: " << "addrs=" << messenger->get_myaddrs() - << " target=" << connection->target_addr + << " target=" << connection->target_addr << " gid=" << messenger->get_myname().num() << " global_seq=" << global_seq << " features_supported=" << std::hex @@ -1910,6 +1890,11 @@ CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_CONNECTING) { + lderr(cct) << __func__ << " not in session connect state!" << dendl; + return _fault(); + } + auto ident_missing = IdentMissingFeaturesFrame::Decode(session_stream_handlers, payload); lderr(cct) << __func__ @@ -1924,6 +1909,11 @@ CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_RECONNECTING) { + lderr(cct) << __func__ << " not in session reconnect state!" << dendl; + return _fault(); + } + auto reset = ResetFrame::Decode(session_stream_handlers, payload); ldout(cct, 1) << __func__ << " received session reset full=" << reset.full() @@ -1936,6 +1926,7 @@ CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload) in_seq = 0; } + state = SESSION_CONNECTING; return send_client_ident(); } @@ -1944,6 +1935,11 @@ CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_RECONNECTING) { + lderr(cct) << __func__ << " not in session reconnect state!" << dendl; + return _fault(); + } + auto retry = RetryFrame::Decode(session_stream_handlers, payload); connect_seq = retry.connect_seq() + 1; @@ -1959,6 +1955,11 @@ CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_RECONNECTING) { + lderr(cct) << __func__ << " not in session reconnect state!" << dendl; + return _fault(); + } + auto retry = RetryGlobalFrame::Decode(session_stream_handlers, payload); global_seq = messenger->get_global_seq(retry.global_seq()); @@ -1975,6 +1976,11 @@ CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) { << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) { + lderr(cct) << __func__ << " not in session (re)connect state!" << dendl; + return _fault(); + } + state = WAIT; WaitFrame::Decode(session_stream_handlers, payload); return _fault(); @@ -1985,6 +1991,11 @@ CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_RECONNECTING) { + lderr(cct) << __func__ << " not in session reconnect state!" << dendl; + return _fault(); + } + auto reconnect_ok = ReconnectOkFrame::Decode(session_stream_handlers, payload); ldout(cct, 5) << __func__ @@ -2013,6 +2024,11 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_CONNECTING) { + lderr(cct) << __func__ << " not in session connect state!" << dendl; + return _fault(); + } + auto server_ident = ServerIdentFrame::Decode(session_stream_handlers, payload); ldout(cct, 5) << __func__ << " received server identification:" @@ -2066,7 +2082,7 @@ CtPtr ProtocolV2::start_server_banner_exchange() { INTERCEPT(2); - state = ACCEPTING; + state = BANNER_ACCEPTING; return _banner_exchange(CONTINUATION(post_server_banner_exchange)); } @@ -2074,13 +2090,20 @@ CtPtr ProtocolV2::start_server_banner_exchange() { CtPtr ProtocolV2::post_server_banner_exchange() { ldout(cct, 20) << __func__ << dendl; - // at this point we can change how the server protocol behaves based on - // this->peer_required_features + state = AUTH_ACCEPTING; return CONTINUE(read_frame); } CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) { + ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() + << dendl; + + if (state != AUTH_ACCEPTING) { + lderr(cct) << __func__ << " not in auth accept state!" << dendl; + return _fault(); + } + auto request = AuthRequestFrame::Decode(payload); ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method() << ", preferred_modes=" << request.preferred_modes() @@ -2126,7 +2149,7 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) more, am->auth_method, auth_payload, &reply); connection->lock.lock(); - if (state != ACCEPTING) { + if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) { ldout(cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; @@ -2139,11 +2162,16 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) ceph_assert(auth_meta); session_stream_handlers = \ ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true); + + state = SESSION_ACCEPTING; + auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id, auth_meta->con_mode, reply); return WRITE(auth_done, "auth done", read_frame); } else if (r == 0) { + state = AUTH_ACCEPTING_MORE; + auto more = AuthReplyMoreFrame::Encode(reply); return WRITE(more, "auth reply more", read_frame); } else if (r == -EBUSY) { @@ -2158,6 +2186,12 @@ CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload) { ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + + if (state != AUTH_ACCEPTING_MORE) { + lderr(cct) << __func__ << " not in auth accept more state!" << dendl; + return _fault(); + } + auto auth_more = AuthRequestMoreFrame::Decode(payload); return _handle_auth_request(auth_more.auth_payload(), true); } @@ -2167,6 +2201,11 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_ACCEPTING) { + lderr(cct) << __func__ << " not in session accept state!" << dendl; + return _fault(); + } + auto client_ident = ClientIdentFrame::Decode(session_stream_handlers, payload); @@ -2217,7 +2256,6 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) connection_features = client_ident.supported_features() & connection->policy.features_supported; - state = ACCEPTING_SESSION; peer_global_seq = client_ident.global_seq(); // Looks good so far, let's check if there is already an existing connection @@ -2238,7 +2276,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) connection->inject_delay(); connection->lock.lock(); - if (state != ACCEPTING_SESSION) { + if (state != SESSION_ACCEPTING) { ldout(cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; @@ -2259,6 +2297,11 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) ldout(cct, 20) << __func__ << " payload.length()=" << payload.length() << dendl; + if (state != SESSION_ACCEPTING) { + lderr(cct) << __func__ << " not in session accept state!" << dendl; + return _fault(); + } + auto reconnect = ReconnectFrame::Decode(session_stream_handlers, payload); ldout(cct, 5) << __func__ @@ -2291,7 +2334,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) connection->inject_delay(); connection->lock.lock(); - if (state != ACCEPTING) { + if (state != SESSION_ACCEPTING) { ldout(cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; @@ -2606,7 +2649,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, if (exproto->state == CLOSED) return; ceph_assert(exproto->state == NONE); - exproto->state = ACCEPTING_SESSION; + exproto->state = SESSION_ACCEPTING; existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED; existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); @@ -2682,7 +2725,7 @@ CtPtr ProtocolV2::send_server_ident() { connection->inject_delay(); return _fault(); } - if (state != ACCEPTING_SESSION) { + if (state != SESSION_ACCEPTING) { ldout(cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl; @@ -2739,7 +2782,7 @@ CtPtr ProtocolV2::send_reconnect_ok() { connection->inject_delay(); return _fault(); } - if (state != ACCEPTING_SESSION) { + if (state != SESSION_ACCEPTING) { ldout(cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index d49b3c9a2af..2d4c6d24354 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -15,10 +15,17 @@ private: enum State { NONE, START_CONNECT, - CONNECTING, // banner + authentication + ident + BANNER_CONNECTING, + HELLO_CONNECTING, + AUTH_CONNECTING, + SESSION_CONNECTING, + SESSION_RECONNECTING, START_ACCEPT, - ACCEPTING, // banner + authentication + ident - ACCEPTING_SESSION, + BANNER_ACCEPTING, + HELLO_ACCEPTING, + AUTH_ACCEPTING, + AUTH_ACCEPTING_MORE, + SESSION_ACCEPTING, READY, THROTTLE_MESSAGE, THROTTLE_BYTES, @@ -33,10 +40,17 @@ private: static const char *get_state_name(int state) { const char *const statenames[] = {"NONE", "START_CONNECT", - "CONNECTING", + "BANNER_CONNECTING", + "HELLO_CONNECTING", + "AUTH_CONNECTING", + "SESSION_CONNECTING", + "SESSION_RECONNECTING", "START_ACCEPT", - "ACCEPTING", - "ACCEPTING_SESSION", + "BANNER_ACCEPTING", + "HELLO_ACCEPTING", + "AUTH_ACCEPTING", + "AUTH_ACCEPTING_MORE", + "SESSION_ACCEPTING", "READY", "THROTTLE_MESSAGE", "THROTTLE_BYTES", @@ -92,7 +106,6 @@ public: ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data; private: - ceph::msgr::v2::Tag sent_tag; ceph::msgr::v2::Tag next_tag; utime_t backoff; // backoff time utime_t recv_stamp; @@ -112,9 +125,6 @@ private: CONTINUATION_PARAM(next, ProtocolV2), bufferlist &buffer); - uint64_t expected_tags(ceph::msgr::v2::Tag sent_tag, - ceph::msgr::v2::Tag received_tag); - void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); void reset_recv_state(); |