Currently, XPT_BUSY is not cleared until xpo_recvfrom returns.
That effectively blocks the receipt and handling of the next RPC
message until the current one has been taken off the transport.
This strict ordering is a requirement for socket transports.
For our kernel RPC/RDMA transport implementation, however, dequeuing
an ingress message is nothing more than a list_del(). The transport
can safely be marked un-busy as soon as that is done.
To keep the changes simpler, this patch just moves the
svc_xprt_received() call site from svc_handle_xprt() into the
transports, so that the actual optimization can be done in a
subsequent patch.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
newxpt->xpt_cred = get_cred(xprt->xpt_cred);
svc_add_new_temp_xprt(serv, newxpt);
trace_svc_xprt_accept(newxpt, serv->sv_name);
- } else
+ } else {
module_put(xprt->xpt_class->xcl_owner);
+ }
+ svc_xprt_received(xprt);
} else if (svc_xprt_reserve_slot(rqstp, xprt)) {
/* XPT_DATA|XPT_DEFERRED case: */
dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
rqstp->rq_reserved = serv->sv_max_mesg;
atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
}
- /* clear XPT_BUSY: */
- svc_xprt_received(xprt);
out:
trace_svc_handle_xprt(xprt, len);
return len;
rqstp->rq_xprt_hlen = dr->xprt_hlen;
rqstp->rq_daddr = dr->daddr;
rqstp->rq_respages = rqstp->rq_pages;
+ svc_xprt_received(rqstp->rq_xprt);
return (dr->argslen<<2) - dr->xprt_hlen;
}
if (serv->sv_stats)
serv->sv_stats->netudpcnt++;
+ svc_xprt_received(rqstp->rq_xprt);
return len;
out_recv_err:
set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
}
trace_svcsock_udp_recv_err(&svsk->sk_xprt, err);
- return 0;
+ goto out_clear_busy;
out_cmsg_err:
net_warn_ratelimited("svc: received unknown control message %d/%d; dropping RPC reply datagram\n",
cmh->cmsg_level, cmh->cmsg_type);
local_bh_enable();
out_free:
kfree_skb(skb);
+out_clear_busy:
+ svc_xprt_received(rqstp->rq_xprt);
return 0;
}
if (serv->sv_stats)
serv->sv_stats->nettcpcnt++;
+ svc_xprt_received(rqstp->rq_xprt);
return rqstp->rq_arg.len;
err_incomplete:
if (len != -EAGAIN)
goto err_delete;
trace_svcsock_tcp_recv_eagain(&svsk->sk_xprt, 0);
- return 0;
+ goto err_noclose;
err_nuts:
svsk->sk_datalen = 0;
err_delete:
trace_svcsock_tcp_recv_err(&svsk->sk_xprt, len);
svc_xprt_deferred_close(&svsk->sk_xprt);
err_noclose:
+ svc_xprt_received(rqstp->rq_xprt);
return 0; /* record not complete */
}
/* No new incoming requests, terminate the loop */
clear_bit(XPT_DATA, &xprt->xpt_flags);
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
+ svc_xprt_received(xprt);
return 0;
}
list_del(&ctxt->rc_list);
rqstp->rq_xprt_ctxt = ctxt;
rqstp->rq_prot = IPPROTO_MAX;
svc_xprt_copy_addrs(rqstp, xprt);
+ svc_xprt_received(xprt);
return rqstp->rq_arg.len;
out_readlist:
ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
if (ret < 0)
goto out_readfail;
+ svc_xprt_received(xprt);
return 0;
out_err:
svc_rdma_send_error(rdma_xprt, ctxt, ret);
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
+ svc_xprt_received(xprt);
return 0;
out_readfail:
if (ret == -EINVAL)
svc_rdma_send_error(rdma_xprt, ctxt, ret);
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
+ svc_xprt_received(xprt);
return ret;
out_backchannel:
svc_rdma_handle_bc_reply(rqstp, ctxt);
out_drop:
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
+ svc_xprt_received(xprt);
return 0;
}