diff options
author | David Howells <dhowells@redhat.com> | 2022-08-27 15:27:56 +0200 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2022-11-08 17:42:28 +0100 |
commit | 5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb (patch) | |
tree | bb10e432f005b9988c542d079a1c6f0200b4fc64 /net/rxrpc/input.c | |
parent | rxrpc: Clone received jumbo subpackets and queue separately (diff) | |
download | linux-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.tar.xz linux-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.zip |
rxrpc: Get rid of the Rx ring
Get rid of the Rx ring and replace it with a pair of queues instead. One
queue gets the packets that are in-sequence and are ready for processing by
recvmsg(); the other queue gets the out-of-sequence packets for addition to
the first queue as the holes get filled.
The annotation ring is removed and replaced with a SACK table. The SACK
table has the bits set that correspond exactly to the sequence number of
the packet being acked. The SACK ring is copied when an ACK packet is
being assembled and rotated so that the first ACK is in byte 0.
Flow control handling is altered so that packets that are moved to the
in-sequence queue are hard-ACK'd even before they're consumed - and then
the Rx window size in the ACK packet (rsize) is shrunk down to compensate
(even going to 0 if the window is full).
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/input.c')
-rw-r--r-- | net/rxrpc/input.c | 213 |
1 files changed, 131 insertions, 82 deletions
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 0e7545ed0128..947e7196e2be 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -312,18 +312,43 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call) return rxrpc_end_tx_phase(call, true, "ETD"); } +static void rxrpc_input_update_ack_window(struct rxrpc_call *call, + rxrpc_seq_t window, rxrpc_seq_t wtop) +{ + atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window); +} + /* - * Process a DATA packet, adding the packet to the Rx ring. The caller's - * packet ref must be passed on or discarded. + * Push a DATA packet onto the Rx queue. + */ +static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb, + rxrpc_seq_t window, rxrpc_seq_t wtop, + enum rxrpc_receive_trace why) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + bool last = sp->hdr.flags & RXRPC_LAST_PACKET; + + __skb_queue_tail(&call->recvmsg_queue, skb); + rxrpc_input_update_ack_window(call, window, wtop); + + trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq); +} + +/* + * Process a DATA packet. */ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + struct sk_buff *oos; rxrpc_serial_t serial = sp->hdr.serial; - rxrpc_seq_t seq = sp->hdr.seq, hard_ack; - unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK; + u64 win = atomic64_read(&call->ackr_window); + rxrpc_seq_t window = lower_32_bits(win); + rxrpc_seq_t wtop = upper_32_bits(win); + rxrpc_seq_t wlimit = window + call->rx_winsize - 1; + rxrpc_seq_t seq = sp->hdr.seq; bool last = sp->hdr.flags & RXRPC_LAST_PACKET; - bool acked = false; + int ack_reason = -1; rxrpc_inc_stat(call->rxnet, stat_rx_data); if (sp->hdr.flags & RXRPC_REQUEST_ACK) @@ -331,112 +356,135 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb) if (sp->hdr.flags & RXRPC_JUMBO_PACKET) rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo); - hard_ack = READ_ONCE(call->rx_hard_ack); - - _proto("Rx DATA %%%u { #%x l=%u }", serial, seq, last); - if (last) { - if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && - seq != call->rx_top) { + if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) && + seq + 1 != wtop) { rxrpc_proto_abort("LSN", call, seq); - goto out; + goto err_free; } } else { if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && - after_eq(seq, call->rx_top)) { + after_eq(seq, wtop)) { + pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n", + call->debug_id, seq, window, wtop, wlimit); rxrpc_proto_abort("LSA", call, seq); - goto out; + goto err_free; } } + if (after(seq, call->rx_highest_seq)) + call->rx_highest_seq = seq; + trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags); - if (before_eq(seq, hard_ack)) { - rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial, - rxrpc_propose_ack_input_data); - goto out; + if (before(seq, window)) { + ack_reason = RXRPC_ACK_DUPLICATE; + goto send_ack; } - - if (call->rxtx_buffer[ix]) { - rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial, - rxrpc_propose_ack_input_data); - goto out; + if (after(seq, wlimit)) { + ack_reason = RXRPC_ACK_EXCEEDS_WINDOW; + goto send_ack; } - if (after(seq, hard_ack + call->rx_winsize)) { - rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial, - rxrpc_propose_ack_input_data); - goto out; - } + /* Queue the packet. */ + if (seq == window) { + rxrpc_seq_t reset_from; + bool reset_sack = false; - if (sp->hdr.flags & RXRPC_REQUEST_ACK) { - rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial, - rxrpc_propose_ack_input_data); - acked = true; - } + if (sp->hdr.flags & RXRPC_REQUEST_ACK) + ack_reason = RXRPC_ACK_REQUESTED; + /* Send an immediate ACK if we fill in a hole */ + else if (!skb_queue_empty(&call->rx_oos_queue)) + ack_reason = RXRPC_ACK_DELAY; - if (after(seq, call->ackr_highest_seq)) - call->ackr_highest_seq = seq; + window++; + if (after(window, wtop)) + wtop = window; - /* Queue the packet. We use a couple of memory barriers here as need - * to make sure that rx_top is perceived to be set after the buffer - * pointer and that the buffer pointer is set after the annotation and - * the skb data. - * - * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window() - * and also rxrpc_fill_out_ack(). - */ - call->rxtx_annotations[ix] = 1; - smp_wmb(); - call->rxtx_buffer[ix] = skb; - if (after(seq, call->rx_top)) { - smp_store_release(&call->rx_top, seq); - } else if (before(seq, call->rx_top)) { - /* Send an immediate ACK if we fill in a hole */ - if (!acked) { - rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial, - rxrpc_propose_ack_input_data_hole); - acked = true; + spin_lock(&call->recvmsg_queue.lock); + rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue); + skb = NULL; + + while ((oos = skb_peek(&call->rx_oos_queue))) { + struct rxrpc_skb_priv *osp = rxrpc_skb(oos); + + if (after(osp->hdr.seq, window)) + break; + + __skb_unlink(oos, &call->rx_oos_queue); + last = osp->hdr.flags & RXRPC_LAST_PACKET; + seq = osp->hdr.seq; + if (!reset_sack) { + reset_from = seq; + reset_sack = true; + } + + window++; + rxrpc_input_queue_data(call, oos, window, wtop, + rxrpc_receive_queue_oos); } - } - /* From this point on, we're not allowed to touch the packet any longer - * as its ref now belongs to the Rx ring. - */ - skb = NULL; - sp = NULL; + spin_unlock(&call->recvmsg_queue.lock); - if (last) { - set_bit(RXRPC_CALL_RX_LAST, &call->flags); - trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq); + if (reset_sack) { + do { + call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0; + } while (reset_from++, before(reset_from, window)); + } } else { - trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq); - } + bool keep = false; + + ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE; + + if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) { + call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1; + keep = 1; + } + + if (after(seq + 1, wtop)) { + wtop = seq + 1; + rxrpc_input_update_ack_window(call, window, wtop); + } + + if (!keep) { + ack_reason = RXRPC_ACK_DUPLICATE; + goto send_ack; + } + + skb_queue_walk(&call->rx_oos_queue, oos) { + struct rxrpc_skb_priv *osp = rxrpc_skb(oos); - if (after_eq(seq, call->rx_expect_next)) { - if (after(seq, call->rx_expect_next)) { - _net("OOS %u > %u", seq, call->rx_expect_next); - rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial, - rxrpc_propose_ack_input_data); - acked = true; + if (after(osp->hdr.seq, seq)) { + __skb_queue_before(&call->rx_oos_queue, oos, skb); + goto oos_queued; + } } - call->rx_expect_next = seq + 1; + + __skb_queue_tail(&call->rx_oos_queue, skb); + oos_queued: + trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos, + sp->hdr.serial, sp->hdr.seq); + skb = NULL; } -out: - if (!acked && - atomic_inc_return(&call->ackr_nr_unacked) > 2) - rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial, +send_ack: + if (ack_reason < 0 && + atomic_inc_return(&call->ackr_nr_unacked) > 2 && + test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) { + ack_reason = RXRPC_ACK_IDLE; + } else if (ack_reason >= 0) { + set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags); + } + + if (ack_reason >= 0) + rxrpc_send_ACK(call, ack_reason, serial, rxrpc_propose_ack_input_data); else rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_input_data); - trace_rxrpc_notify_socket(call->debug_id, serial); - rxrpc_notify_socket(call); - +err_free: rxrpc_free_skb(skb, rxrpc_skb_freed); - _leave(" [queued]"); } /* @@ -498,8 +546,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) rxrpc_serial_t serial = sp->hdr.serial; rxrpc_seq_t seq0 = sp->hdr.seq; - _enter("{%u,%u},{%u,%u}", - call->rx_hard_ack, call->rx_top, skb->len, seq0); + _enter("{%llx,%x},{%u,%x}", + atomic64_read(&call->ackr_window), call->rx_highest_seq, + skb->len, seq0); _proto("Rx DATA %%%u { #%u f=%02x }", sp->hdr.serial, seq0, sp->hdr.flags); |