From 95503d295ad6af20f09efff193e085481a962fd2 Mon Sep 17 00:00:00 2001 From: "J. Bruce Fields" Date: Fri, 11 Jan 2019 15:36:40 -0500 Subject: [PATCH] svcrpc: fix unlikely races preventing queueing of sockets In the rpc server, When something happens that might be reason to wake up a thread to do something, what we do is - modify xpt_flags, sk_sock->flags, xpt_reserved, or xpt_nr_rqsts to indicate the new situation - call svc_xprt_enqueue() to decide whether to wake up a thread. svc_xprt_enqueue may require multiple conditions to be true before queueing up a thread to handle the xprt. In the SMP case, one of the other CPU's may have set another required condition, and in that case, although both CPUs run svc_xprt_enqueue(), it's possible that neither call sees the writes done by the other CPU in time, and neither one recognizes that all the required conditions have been set. A socket could therefore be ignored indefinitely. Add memory barries to ensure that any svc_xprt_enqueue() call will always see the conditions changed by other CPUs before deciding to ignore a socket. I've never seen this race reported. In the unlikely event it happens, another event will usually come along and the problem will fix itself. So I don't think this is worth backporting to stable. Chuck tried this patch and said "I don't see any performance regressions, but my server has only a single last-level CPU cache." Tested-by: Chuck Lever Signed-off-by: J. Bruce Fields --- net/sunrpc/svc_xprt.c | 12 +++++++++++- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 3 ++- net/sunrpc/xprtrdma/svc_rdma_rw.c | 3 ++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index a2435d3811a9..61530b1b7754 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -357,6 +357,7 @@ static void svc_xprt_release_slot(struct svc_rqst *rqstp) struct svc_xprt *xprt = rqstp->rq_xprt; if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) { atomic_dec(&xprt->xpt_nr_rqsts); + smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */ svc_xprt_enqueue(xprt); } } @@ -365,6 +366,15 @@ static bool svc_xprt_ready(struct svc_xprt *xprt) { unsigned long xpt_flags; + /* + * If another cpu has recently updated xpt_flags, + * sk_sock->flags, xpt_reserved, or xpt_nr_rqsts, we need to + * know about it; otherwise it's possible that both that cpu and + * this one could call svc_xprt_enqueue() without either + * svc_xprt_enqueue() recognizing that the conditions below + * are satisfied, and we could stall indefinitely: + */ + smp_rmb(); xpt_flags = READ_ONCE(xprt->xpt_flags); if (xpt_flags & (BIT(XPT_CONN) | BIT(XPT_CLOSE))) @@ -479,7 +489,7 @@ void svc_reserve(struct svc_rqst *rqstp, int space) if (xprt && space < rqstp->rq_reserved) { atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved); rqstp->rq_reserved = space; - + smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */ svc_xprt_enqueue(xprt); } } diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 828b149eaaef..3ebb158c2279 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -314,8 +314,9 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) spin_lock(&rdma->sc_rq_dto_lock); list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q); - spin_unlock(&rdma->sc_rq_dto_lock); + /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); + spin_unlock(&rdma->sc_rq_dto_lock); if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags)) svc_xprt_enqueue(&rdma->sc_xprt); goto out; diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index dc1951759a8e..c35753691960 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -287,9 +287,10 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) spin_lock(&rdma->sc_rq_dto_lock); list_add_tail(&info->ri_readctxt->rc_list, &rdma->sc_read_complete_q); + /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */ + set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); spin_unlock(&rdma->sc_rq_dto_lock); - set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); svc_xprt_enqueue(&rdma->sc_xprt); }