SUNRPC: Ensure our task is notified when an rpcbind call is done
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Mon, 7 Jul 2008 16:18:53 +0000 (12:18 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Wed, 9 Jul 2008 16:09:45 +0000 (12:09 -0400)
If another task is busy in rpcb_getport_async number, it is more efficient
to have it wake us up when it has finished instead of arbitrarily sleeping
for 5 seconds.

Also ensure that rpcb_wake_rpcbind_waiters() is called regardless of
whether or not rpcb_getport_done() gets called.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
net/sunrpc/clnt.c
net/sunrpc/rpcb_clnt.c

index 09631f6..76739e9 100644 (file)
@@ -942,11 +942,9 @@ call_bind_status(struct rpc_task *task)
        }
 
        switch (task->tk_status) {
-       case -EAGAIN:
-               dprintk("RPC: %5u rpcbind waiting for another request "
-                               "to finish\n", task->tk_pid);
-               /* avoid busy-waiting here -- could be a network outage. */
-               rpc_delay(task, 5*HZ);
+       case -ENOMEM:
+               dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
+               rpc_delay(task, HZ >> 2);
                goto retry_timeout;
        case -EACCES:
                dprintk("RPC: %5u remote rpcbind: RPC program/version "
index c62e446..24e93e0 100644 (file)
@@ -68,6 +68,7 @@ enum {
 #define RPCB_MAXOWNERLEN       sizeof(RPCB_OWNER_STRING)
 
 static void                    rpcb_getport_done(struct rpc_task *, void *);
+static void                    rpcb_map_release(void *data);
 static struct rpc_program      rpcb_program;
 
 struct rpcbind_args {
@@ -80,6 +81,8 @@ struct rpcbind_args {
        const char *            r_netid;
        const char *            r_addr;
        const char *            r_owner;
+
+       int                     r_status;
 };
 
 static struct rpc_procinfo rpcb_procedures2[];
@@ -93,14 +96,6 @@ struct rpcb_info {
 static struct rpcb_info rpcb_next_version[];
 static struct rpcb_info rpcb_next_version6[];
 
-static void rpcb_map_release(void *data)
-{
-       struct rpcbind_args *map = data;
-
-       xprt_put(map->r_xprt);
-       kfree(map);
-}
-
 static const struct rpc_call_ops rpcb_getport_ops = {
        .rpc_call_done          = rpcb_getport_done,
        .rpc_release            = rpcb_map_release,
@@ -112,6 +107,15 @@ static void rpcb_wake_rpcbind_waiters(struct rpc_xprt *xprt, int status)
        rpc_wake_up_status(&xprt->binding, status);
 }
 
+static void rpcb_map_release(void *data)
+{
+       struct rpcbind_args *map = data;
+
+       rpcb_wake_rpcbind_waiters(map->r_xprt, map->r_status);
+       xprt_put(map->r_xprt);
+       kfree(map);
+}
+
 static struct rpc_clnt *rpcb_create(char *hostname, struct sockaddr *srvaddr,
                                    size_t salen, int proto, u32 version,
                                    int privileged)
@@ -293,17 +297,16 @@ void rpcb_getport_async(struct rpc_task *task)
        /* Autobind on cloned rpc clients is discouraged */
        BUG_ON(clnt->cl_parent != clnt);
 
+       /* Put self on the wait queue to ensure we get notified if
+        * some other task is already attempting to bind the port */
+       rpc_sleep_on(&xprt->binding, task, NULL);
+
        if (xprt_test_and_set_binding(xprt)) {
-               status = -EAGAIN;       /* tell caller to check again */
                dprintk("RPC: %5u %s: waiting for another binder\n",
                        task->tk_pid, __func__);
-               goto bailout_nowake;
+               return;
        }
 
-       /* Put self on queue before sending rpcbind request, in case
-        * rpcb_getport_done completes before we return from rpc_run_task */
-       rpc_sleep_on(&xprt->binding, task, NULL);
-
        /* Someone else may have bound if we slept */
        if (xprt_bound(xprt)) {
                status = 0;
@@ -365,15 +368,15 @@ void rpcb_getport_async(struct rpc_task *task)
        map->r_netid = rpc_peeraddr2str(clnt, RPC_DISPLAY_NETID);
        map->r_addr = rpc_peeraddr2str(rpcb_clnt, RPC_DISPLAY_UNIVERSAL_ADDR);
        map->r_owner = RPCB_OWNER_STRING;       /* ignored for GETADDR */
+       map->r_status = -EIO;
 
        child = rpcb_call_async(rpcb_clnt, map, proc);
        rpc_release_client(rpcb_clnt);
        if (IS_ERR(child)) {
-               status = -EIO;
                /* rpcb_map_release() has freed the arguments */
                dprintk("RPC: %5u %s: rpc_run_task failed\n",
                        task->tk_pid, __func__);
-               goto bailout_nofree;
+               return;
        }
        rpc_put_task(child);
 
@@ -382,7 +385,6 @@ void rpcb_getport_async(struct rpc_task *task)
 
 bailout_nofree:
        rpcb_wake_rpcbind_waiters(xprt, status);
-bailout_nowake:
        task->tk_status = status;
 }
 EXPORT_SYMBOL_GPL(rpcb_getport_async);
@@ -421,7 +423,7 @@ static void rpcb_getport_done(struct rpc_task *child, void *data)
        dprintk("RPC: %5u rpcb_getport_done(status %d, port %u)\n",
                        child->tk_pid, status, map->r_port);
 
-       rpcb_wake_rpcbind_waiters(xprt, status);
+       map->r_status = status;
 }
 
 static int rpcb_encode_mapping(struct rpc_rqst *req, __be32 *p,