summaryrefslogtreecommitdiffstats
path: root/net/rxrpc/peer_event.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2018-10-08 16:46:25 +0200
committerDavid Howells <dhowells@redhat.com>2018-10-08 23:42:04 +0200
commitc1e15b4944c9fa7fbbb74f7a5920a1e31b4b965a (patch)
treeedb403ae1626e2149adbfac364530b4ced20cc2e /net/rxrpc/peer_event.c
parentrxrpc: Fix the rxrpc_tx_packet trace line (diff)
downloadlinux-c1e15b4944c9fa7fbbb74f7a5920a1e31b4b965a.tar.xz
linux-c1e15b4944c9fa7fbbb74f7a5920a1e31b4b965a.zip
rxrpc: Fix the packet reception routine
The rxrpc_input_packet() function and its call tree was built around the assumption that data_ready() handler called from UDP to inform a kernel service that there is data to be had was non-reentrant. This means that certain locking could be dispensed with. This, however, turns out not to be the case with a multi-queue network card that can deliver packets to multiple cpus simultaneously. Each of those cpus can be in the rxrpc_input_packet() function at the same time. Fix by adding or changing some structure members: (1) Add peer->rtt_input_lock to serialise access to the RTT buffer. (2) Make conn->service_id into a 32-bit variable so that it can be cmpxchg'd on all arches. (3) Add call->input_lock to serialise access to the Rx/Tx state. Note that although the Rx and Tx states are (almost) entirely separate, there's no point completing the separation and having separate locks since it's a bi-phasal RPC protocol rather than a bi-direction streaming protocol. Data transmission and data reception do not take place simultaneously on any particular call. and making the following functional changes: (1) In rxrpc_input_data(), hold call->input_lock around the core to prevent simultaneous producing of packets into the Rx ring and updating of tracking state for a particular call. (2) In rxrpc_input_ping_response(), only read call->ping_serial once, and check it before checking RXRPC_CALL_PINGING as that's a cheaper test. The bit test and bit clear can then be combined. No further locking is needed here. (3) In rxrpc_input_ack(), take call->input_lock after we've parsed much of the ACK packet. The superseded ACK check is then done both before and after the lock is taken. The handing of ackinfo data is split, parsing before the lock is taken and processing with it held. This is keyed on rxMTU being non-zero. Congestion management is also done within the locked section. (4) In rxrpc_input_ackall(), take call->input_lock around the Tx window rotation. The ACKALL packet carries no information and is only really useful after all packets have been transmitted since it's imprecise. (5) In rxrpc_input_implicit_end_call(), we use rx->incoming_lock to prevent calls being simultaneously implicitly ended on two cpus and also to prevent any races with incoming call setup. (6) In rxrpc_input_packet(), use cmpxchg() to effect the service upgrade on a connection. It is only permitted to happen once for a connection. (7) In rxrpc_new_incoming_call(), we have to recheck the routing inside rx->incoming_lock to see if someone else set up the call, connection or peer whilst we were getting there. We can't trust the values from the earlier routing check unless we pin refs on them - which we want to avoid. Further, we need to allow for an incoming call to have its state changed on another CPU between us making it live and us adjusting it because the conn is now in the RXRPC_CONN_SERVICE state. (8) In rxrpc_peer_add_rtt(), take peer->rtt_input_lock around the access to the RTT buffer. Don't need to lock around setting peer->rtt. For reference, the inventory of state-accessing or state-altering functions used by the packet input procedure is: > rxrpc_input_packet() * PACKET CHECKING * ROUTING > rxrpc_post_packet_to_local() > rxrpc_find_connection_rcu() - uses RCU > rxrpc_lookup_peer_rcu() - uses RCU > rxrpc_find_service_conn_rcu() - uses RCU > idr_find() - uses RCU * CONNECTION-LEVEL PROCESSING - Service upgrade - Can only happen once per conn ! Changed to use cmpxchg > rxrpc_post_packet_to_conn() - Setting conn->hi_serial - Probably safe not using locks - Maybe use cmpxchg * CALL-LEVEL PROCESSING > Old-call checking > rxrpc_input_implicit_end_call() > rxrpc_call_completed() > rxrpc_queue_call() ! Need to take rx->incoming_lock > __rxrpc_disconnect_call() > rxrpc_notify_socket() > rxrpc_new_incoming_call() - Uses rx->incoming_lock for the entire process - Might be able to drop this earlier in favour of the call lock > rxrpc_incoming_call() ! Conflicts with rxrpc_input_implicit_end_call() > rxrpc_send_ping() - Don't need locks to check rtt state > rxrpc_propose_ACK * PACKET DISTRIBUTION > rxrpc_input_call_packet() > rxrpc_input_data() * QUEUE DATA PACKET ON CALL > rxrpc_reduce_call_timer() - Uses timer_reduce() ! Needs call->input_lock() > rxrpc_receiving_reply() ! Needs locking around ack state > rxrpc_rotate_tx_window() > rxrpc_end_tx_phase() > rxrpc_proto_abort() > rxrpc_input_dup_data() - Fills the Rx buffer - rxrpc_propose_ACK() - rxrpc_notify_socket() > rxrpc_input_ack() * APPLY ACK PACKET TO CALL AND DISCARD PACKET > rxrpc_input_ping_response() - Probably doesn't need any extra locking ! Need READ_ONCE() on call->ping_serial > rxrpc_input_check_for_lost_ack() - Takes call->lock to consult Tx buffer > rxrpc_peer_add_rtt() ! Needs to take a lock (peer->rtt_input_lock) ! Could perhaps manage with cmpxchg() and xadd() instead > rxrpc_input_requested_ack - Consults Tx buffer ! Probably needs a lock > rxrpc_peer_add_rtt() > rxrpc_propose_ack() > rxrpc_input_ackinfo() - Changes call->tx_winsize ! Use cmpxchg to handle change ! Should perhaps track serial number - Uses peer->lock to record MTU specification changes > rxrpc_proto_abort() ! Need to take call->input_lock > rxrpc_rotate_tx_window() > rxrpc_end_tx_phase() > rxrpc_input_soft_acks() - Consults the Tx buffer > rxrpc_congestion_management() - Modifies the Tx annotations ! Needs call->input_lock() > rxrpc_queue_call() > rxrpc_input_abort() * APPLY ABORT PACKET TO CALL AND DISCARD PACKET > rxrpc_set_call_completion() > rxrpc_notify_socket() > rxrpc_input_ackall() * APPLY ACKALL PACKET TO CALL AND DISCARD PACKET ! Need to take call->input_lock > rxrpc_rotate_tx_window() > rxrpc_end_tx_phase() > rxrpc_reject_packet() There are some functions used by the above that queue the packet, after which the procedure is terminated: - rxrpc_post_packet_to_local() - local->event_queue is an sk_buff_head - local->processor is a work_struct - rxrpc_post_packet_to_conn() - conn->rx_queue is an sk_buff_head - conn->processor is a work_struct - rxrpc_reject_packet() - local->reject_queue is an sk_buff_head - local->processor is a work_struct And some that offload processing to process context: - rxrpc_notify_socket() - Uses RCU lock - Uses call->notify_lock to call call->notify_rx - Uses call->recvmsg_lock to queue recvmsg side - rxrpc_queue_call() - call->processor is a work_struct - rxrpc_propose_ACK() - Uses call->lock to wrap __rxrpc_propose_ACK() And a bunch that complete a call, all of which use call->state_lock to protect the call state: - rxrpc_call_completed() - rxrpc_set_call_completion() - rxrpc_abort_call() - rxrpc_proto_abort() - Also uses rxrpc_queue_call() Fixes: 17926a79320a ("[AF_RXRPC]: Provide secure RxRPC sockets for use by userspace and kernel both") Signed-off-by: David Howells <dhowells@redhat.com>
Diffstat (limited to 'net/rxrpc/peer_event.c')
-rw-r--r--net/rxrpc/peer_event.c5
1 files changed, 5 insertions, 0 deletions
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index f3e6fc670da2..05b51bdbdd41 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -301,6 +301,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
if (rtt < 0)
return;
+ spin_lock(&peer->rtt_input_lock);
+
/* Replace the oldest datum in the RTT buffer */
sum -= peer->rtt_cache[cursor];
sum += rtt;
@@ -312,6 +314,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
peer->rtt_usage = usage;
}
+ spin_unlock(&peer->rtt_input_lock);
+
/* Now recalculate the average */
if (usage == RXRPC_RTT_CACHE_SIZE) {
avg = sum / RXRPC_RTT_CACHE_SIZE;
@@ -320,6 +324,7 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
do_div(avg, usage);
}
+ /* Don't need to update this under lock */
peer->rtt = avg;
trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial, rtt,
usage, avg);