summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChuck Lever <chuck.lever@oracle.com>2020-12-08 19:14:15 +0100
committerChuck Lever <chuck.lever@oracle.com>2021-01-25 15:36:28 +0100
commit43042b90cae11cc2d9827c91df6d6b5fe498d5ce (patch)
tree4d75270b9c14d57db51832ac9d5ff0a8f2959185
parentsvcrdma: Deprecate stat variables that are no longer used (diff)
downloadlinux-43042b90cae11cc2d9827c91df6d6b5fe498d5ce.tar.xz
linux-43042b90cae11cc2d9827c91df6d6b5fe498d5ce.zip
svcrdma: Reduce Receive doorbell rate
This is similar to commit e340c2d6ef2a ("xprtrdma: Reduce the doorbell rate (Receive)") which added Receive batching to the client. Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
-rw-r--r--include/linux/sunrpc/svc_rdma.h1
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c82
2 files changed, 44 insertions, 39 deletions
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 1e76ed688044..7c693b31965e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -104,6 +104,7 @@ struct svcxprt_rdma {
wait_queue_head_t sc_send_wait; /* SQ exhaustion waitlist */
unsigned long sc_flags;
+ u32 sc_pending_recvs;
struct list_head sc_read_complete_q;
struct work_struct sc_work;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 7d14a74df716..ab0b7e9777bc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -266,33 +266,46 @@ void svc_rdma_release_rqst(struct svc_rqst *rqstp)
svc_rdma_recv_ctxt_put(rdma, ctxt);
}
-static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma,
- struct svc_rdma_recv_ctxt *ctxt)
+static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
+ unsigned int wanted, bool temp)
{
+ const struct ib_recv_wr *bad_wr = NULL;
+ struct svc_rdma_recv_ctxt *ctxt;
+ struct ib_recv_wr *recv_chain;
int ret;
- trace_svcrdma_post_recv(ctxt);
- ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL);
+ recv_chain = NULL;
+ while (wanted--) {
+ ctxt = svc_rdma_recv_ctxt_get(rdma);
+ if (!ctxt)
+ break;
+
+ trace_svcrdma_post_recv(ctxt);
+ ctxt->rc_temp = temp;
+ ctxt->rc_recv_wr.next = recv_chain;
+ recv_chain = &ctxt->rc_recv_wr;
+ rdma->sc_pending_recvs++;
+ }
+ if (!recv_chain)
+ return false;
+
+ ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
if (ret)
goto err_post;
- return 0;
+ return true;
err_post:
- trace_svcrdma_rq_post_err(rdma, ret);
- svc_rdma_recv_ctxt_put(rdma, ctxt);
- return ret;
-}
-
-static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
-{
- struct svc_rdma_recv_ctxt *ctxt;
+ while (bad_wr) {
+ ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
+ rc_recv_wr);
+ bad_wr = bad_wr->next;
+ svc_rdma_recv_ctxt_put(rdma, ctxt);
+ }
- if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
- return 0;
- ctxt = svc_rdma_recv_ctxt_get(rdma);
- if (!ctxt)
- return -ENOMEM;
- return __svc_rdma_post_recv(rdma, ctxt);
+ trace_svcrdma_rq_post_err(rdma, ret);
+ /* Since we're destroying the xprt, no need to reset
+ * sc_pending_recvs. */
+ return false;
}
/**
@@ -303,20 +316,7 @@ static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
*/
bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{
- struct svc_rdma_recv_ctxt *ctxt;
- unsigned int i;
- int ret;
-
- for (i = 0; i < rdma->sc_max_requests; i++) {
- ctxt = svc_rdma_recv_ctxt_get(rdma);
- if (!ctxt)
- return false;
- ctxt->rc_temp = true;
- ret = __svc_rdma_post_recv(rdma, ctxt);
- if (ret)
- return false;
- }
- return true;
+ return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests, true);
}
/**
@@ -324,8 +324,6 @@ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
* @cq: Completion Queue context
* @wc: Work Completion object
*
- * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
- * the Receive completion handler could be running.
*/
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
@@ -333,6 +331,8 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_recv_ctxt *ctxt;
+ rdma->sc_pending_recvs--;
+
/* WARNING: Only wc->wr_cqe and wc->status are reliable */
ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
@@ -340,9 +340,6 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
if (wc->status != IB_WC_SUCCESS)
goto flushed;
- if (svc_rdma_post_recv(rdma))
- goto post_err;
-
/* All wc fields are now known to be valid */
ctxt->rc_byte_len = wc->byte_len;
ib_dma_sync_single_for_cpu(rdma->sc_pd->device,
@@ -356,11 +353,18 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
spin_unlock(&rdma->sc_rq_dto_lock);
if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
svc_xprt_enqueue(&rdma->sc_xprt);
+
+ if (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) &&
+ rdma->sc_pending_recvs < rdma->sc_max_requests)
+ if (!svc_rdma_refresh_recvs(rdma, RPCRDMA_MAX_RECV_BATCH,
+ false))
+ goto post_err;
+
return;
flushed:
-post_err:
svc_rdma_recv_ctxt_put(rdma, ctxt);
+post_err:
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt);
}