SUNRPC: Add RPC client support for the RPC_AUTH_TLS auth flavor
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
51 /*
52  * All RPC clients are linked into the per-net sn->all_clients list
53  */
54
55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56
57
58 static void     call_start(struct rpc_task *task);
59 static void     call_reserve(struct rpc_task *task);
60 static void     call_reserveresult(struct rpc_task *task);
61 static void     call_allocate(struct rpc_task *task);
62 static void     call_encode(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 static void     call_status(struct rpc_task *task);
68 static void     call_transmit_status(struct rpc_task *task);
69 static void     call_refresh(struct rpc_task *task);
70 static void     call_refreshresult(struct rpc_task *task);
71 static void     call_connect(struct rpc_task *task);
72 static void     call_connect_status(struct rpc_task *task);
73
74 static int      rpc_encode_header(struct rpc_task *task,
75                                   struct xdr_stream *xdr);
76 static int      rpc_decode_header(struct rpc_task *task,
77                                   struct xdr_stream *xdr);
78 static int      rpc_ping(struct rpc_clnt *clnt);
79 static int      rpc_ping_noreply(struct rpc_clnt *clnt);
80 static void     rpc_check_timeout(struct rpc_task *task);
81
82 static void rpc_register_client(struct rpc_clnt *clnt)
83 {
84         struct net *net = rpc_net_ns(clnt);
85         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
86
87         spin_lock(&sn->rpc_client_lock);
88         list_add(&clnt->cl_clients, &sn->all_clients);
89         spin_unlock(&sn->rpc_client_lock);
90 }
91
92 static void rpc_unregister_client(struct rpc_clnt *clnt)
93 {
94         struct net *net = rpc_net_ns(clnt);
95         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
96
97         spin_lock(&sn->rpc_client_lock);
98         list_del(&clnt->cl_clients);
99         spin_unlock(&sn->rpc_client_lock);
100 }
101
102 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103 {
104         rpc_remove_client_dir(clnt);
105 }
106
107 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
108 {
109         struct net *net = rpc_net_ns(clnt);
110         struct super_block *pipefs_sb;
111
112         pipefs_sb = rpc_get_sb_net(net);
113         if (pipefs_sb) {
114                 __rpc_clnt_remove_pipedir(clnt);
115                 rpc_put_sb_net(net);
116         }
117 }
118
119 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
120                                     struct rpc_clnt *clnt)
121 {
122         static uint32_t clntid;
123         const char *dir_name = clnt->cl_program->pipe_dir_name;
124         char name[15];
125         struct dentry *dir, *dentry;
126
127         dir = rpc_d_lookup_sb(sb, dir_name);
128         if (dir == NULL) {
129                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
130                 return dir;
131         }
132         for (;;) {
133                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
134                 name[sizeof(name) - 1] = '\0';
135                 dentry = rpc_create_client_dir(dir, name, clnt);
136                 if (!IS_ERR(dentry))
137                         break;
138                 if (dentry == ERR_PTR(-EEXIST))
139                         continue;
140                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
141                                 " %s/%s, error %ld\n",
142                                 dir_name, name, PTR_ERR(dentry));
143                 break;
144         }
145         dput(dir);
146         return dentry;
147 }
148
149 static int
150 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
151 {
152         struct dentry *dentry;
153
154         if (clnt->cl_program->pipe_dir_name != NULL) {
155                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
156                 if (IS_ERR(dentry))
157                         return PTR_ERR(dentry);
158         }
159         return 0;
160 }
161
162 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
163 {
164         if (clnt->cl_program->pipe_dir_name == NULL)
165                 return 1;
166
167         switch (event) {
168         case RPC_PIPEFS_MOUNT:
169                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
170                         return 1;
171                 if (refcount_read(&clnt->cl_count) == 0)
172                         return 1;
173                 break;
174         case RPC_PIPEFS_UMOUNT:
175                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
176                         return 1;
177                 break;
178         }
179         return 0;
180 }
181
182 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
183                                    struct super_block *sb)
184 {
185         struct dentry *dentry;
186
187         switch (event) {
188         case RPC_PIPEFS_MOUNT:
189                 dentry = rpc_setup_pipedir_sb(sb, clnt);
190                 if (!dentry)
191                         return -ENOENT;
192                 if (IS_ERR(dentry))
193                         return PTR_ERR(dentry);
194                 break;
195         case RPC_PIPEFS_UMOUNT:
196                 __rpc_clnt_remove_pipedir(clnt);
197                 break;
198         default:
199                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
200                 return -ENOTSUPP;
201         }
202         return 0;
203 }
204
205 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
206                                 struct super_block *sb)
207 {
208         int error = 0;
209
210         for (;; clnt = clnt->cl_parent) {
211                 if (!rpc_clnt_skip_event(clnt, event))
212                         error = __rpc_clnt_handle_event(clnt, event, sb);
213                 if (error || clnt == clnt->cl_parent)
214                         break;
215         }
216         return error;
217 }
218
219 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
220 {
221         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
222         struct rpc_clnt *clnt;
223
224         spin_lock(&sn->rpc_client_lock);
225         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
226                 if (rpc_clnt_skip_event(clnt, event))
227                         continue;
228                 spin_unlock(&sn->rpc_client_lock);
229                 return clnt;
230         }
231         spin_unlock(&sn->rpc_client_lock);
232         return NULL;
233 }
234
235 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
236                             void *ptr)
237 {
238         struct super_block *sb = ptr;
239         struct rpc_clnt *clnt;
240         int error = 0;
241
242         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
243                 error = __rpc_pipefs_event(clnt, event, sb);
244                 if (error)
245                         break;
246         }
247         return error;
248 }
249
250 static struct notifier_block rpc_clients_block = {
251         .notifier_call  = rpc_pipefs_event,
252         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
253 };
254
255 int rpc_clients_notifier_register(void)
256 {
257         return rpc_pipefs_notifier_register(&rpc_clients_block);
258 }
259
260 void rpc_clients_notifier_unregister(void)
261 {
262         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
263 }
264
265 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
266                 struct rpc_xprt *xprt,
267                 const struct rpc_timeout *timeout)
268 {
269         struct rpc_xprt *old;
270
271         spin_lock(&clnt->cl_lock);
272         old = rcu_dereference_protected(clnt->cl_xprt,
273                         lockdep_is_held(&clnt->cl_lock));
274
275         if (!xprt_bound(xprt))
276                 clnt->cl_autobind = 1;
277
278         clnt->cl_timeout = timeout;
279         rcu_assign_pointer(clnt->cl_xprt, xprt);
280         spin_unlock(&clnt->cl_lock);
281
282         return old;
283 }
284
285 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
286 {
287         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
288                         nodename, sizeof(clnt->cl_nodename));
289 }
290
291 static int rpc_client_register(struct rpc_clnt *clnt,
292                                rpc_authflavor_t pseudoflavor,
293                                const char *client_name)
294 {
295         struct rpc_auth_create_args auth_args = {
296                 .pseudoflavor = pseudoflavor,
297                 .target_name = client_name,
298         };
299         struct rpc_auth *auth;
300         struct net *net = rpc_net_ns(clnt);
301         struct super_block *pipefs_sb;
302         int err;
303
304         rpc_clnt_debugfs_register(clnt);
305
306         pipefs_sb = rpc_get_sb_net(net);
307         if (pipefs_sb) {
308                 err = rpc_setup_pipedir(pipefs_sb, clnt);
309                 if (err)
310                         goto out;
311         }
312
313         rpc_register_client(clnt);
314         if (pipefs_sb)
315                 rpc_put_sb_net(net);
316
317         auth = rpcauth_create(&auth_args, clnt);
318         if (IS_ERR(auth)) {
319                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
320                                 pseudoflavor);
321                 err = PTR_ERR(auth);
322                 goto err_auth;
323         }
324         return 0;
325 err_auth:
326         pipefs_sb = rpc_get_sb_net(net);
327         rpc_unregister_client(clnt);
328         __rpc_clnt_remove_pipedir(clnt);
329 out:
330         if (pipefs_sb)
331                 rpc_put_sb_net(net);
332         rpc_sysfs_client_destroy(clnt);
333         rpc_clnt_debugfs_unregister(clnt);
334         return err;
335 }
336
337 static DEFINE_IDA(rpc_clids);
338
339 void rpc_cleanup_clids(void)
340 {
341         ida_destroy(&rpc_clids);
342 }
343
344 static int rpc_alloc_clid(struct rpc_clnt *clnt)
345 {
346         int clid;
347
348         clid = ida_alloc(&rpc_clids, GFP_KERNEL);
349         if (clid < 0)
350                 return clid;
351         clnt->cl_clid = clid;
352         return 0;
353 }
354
355 static void rpc_free_clid(struct rpc_clnt *clnt)
356 {
357         ida_free(&rpc_clids, clnt->cl_clid);
358 }
359
360 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
361                 struct rpc_xprt_switch *xps,
362                 struct rpc_xprt *xprt,
363                 struct rpc_clnt *parent)
364 {
365         const struct rpc_program *program = args->program;
366         const struct rpc_version *version;
367         struct rpc_clnt *clnt = NULL;
368         const struct rpc_timeout *timeout;
369         const char *nodename = args->nodename;
370         int err;
371
372         err = rpciod_up();
373         if (err)
374                 goto out_no_rpciod;
375
376         err = -EINVAL;
377         if (args->version >= program->nrvers)
378                 goto out_err;
379         version = program->version[args->version];
380         if (version == NULL)
381                 goto out_err;
382
383         err = -ENOMEM;
384         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
385         if (!clnt)
386                 goto out_err;
387         clnt->cl_parent = parent ? : clnt;
388         clnt->cl_xprtsec = args->xprtsec;
389
390         err = rpc_alloc_clid(clnt);
391         if (err)
392                 goto out_no_clid;
393
394         clnt->cl_cred     = get_cred(args->cred);
395         clnt->cl_procinfo = version->procs;
396         clnt->cl_maxproc  = version->nrprocs;
397         clnt->cl_prog     = args->prognumber ? : program->number;
398         clnt->cl_vers     = version->number;
399         clnt->cl_stats    = program->stats;
400         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
401         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
402         err = -ENOMEM;
403         if (clnt->cl_metrics == NULL)
404                 goto out_no_stats;
405         clnt->cl_program  = program;
406         INIT_LIST_HEAD(&clnt->cl_tasks);
407         spin_lock_init(&clnt->cl_lock);
408
409         timeout = xprt->timeout;
410         if (args->timeout != NULL) {
411                 memcpy(&clnt->cl_timeout_default, args->timeout,
412                                 sizeof(clnt->cl_timeout_default));
413                 timeout = &clnt->cl_timeout_default;
414         }
415
416         rpc_clnt_set_transport(clnt, xprt, timeout);
417         xprt->main = true;
418         xprt_iter_init(&clnt->cl_xpi, xps);
419         xprt_switch_put(xps);
420
421         clnt->cl_rtt = &clnt->cl_rtt_default;
422         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
423
424         refcount_set(&clnt->cl_count, 1);
425
426         if (nodename == NULL)
427                 nodename = utsname()->nodename;
428         /* save the nodename */
429         rpc_clnt_set_nodename(clnt, nodename);
430
431         rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
432         err = rpc_client_register(clnt, args->authflavor, args->client_name);
433         if (err)
434                 goto out_no_path;
435         if (parent)
436                 refcount_inc(&parent->cl_count);
437
438         trace_rpc_clnt_new(clnt, xprt, args);
439         return clnt;
440
441 out_no_path:
442         rpc_free_iostats(clnt->cl_metrics);
443 out_no_stats:
444         put_cred(clnt->cl_cred);
445         rpc_free_clid(clnt);
446 out_no_clid:
447         kfree(clnt);
448 out_err:
449         rpciod_down();
450 out_no_rpciod:
451         xprt_switch_put(xps);
452         xprt_put(xprt);
453         trace_rpc_clnt_new_err(program->name, args->servername, err);
454         return ERR_PTR(err);
455 }
456
457 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
458                                         struct rpc_xprt *xprt)
459 {
460         struct rpc_clnt *clnt = NULL;
461         struct rpc_xprt_switch *xps;
462
463         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
464                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
465                 xps = args->bc_xprt->xpt_bc_xps;
466                 xprt_switch_get(xps);
467         } else {
468                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
469                 if (xps == NULL) {
470                         xprt_put(xprt);
471                         return ERR_PTR(-ENOMEM);
472                 }
473                 if (xprt->bc_xprt) {
474                         xprt_switch_get(xps);
475                         xprt->bc_xprt->xpt_bc_xps = xps;
476                 }
477         }
478         clnt = rpc_new_client(args, xps, xprt, NULL);
479         if (IS_ERR(clnt))
480                 return clnt;
481
482         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
483                 int err = rpc_ping(clnt);
484                 if (err != 0) {
485                         rpc_shutdown_client(clnt);
486                         return ERR_PTR(err);
487                 }
488         } else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
489                 int err = rpc_ping_noreply(clnt);
490                 if (err != 0) {
491                         rpc_shutdown_client(clnt);
492                         return ERR_PTR(err);
493                 }
494         }
495
496         clnt->cl_softrtry = 1;
497         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
498                 clnt->cl_softrtry = 0;
499                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
500                         clnt->cl_softerr = 1;
501         }
502
503         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
504                 clnt->cl_autobind = 1;
505         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
506                 clnt->cl_noretranstimeo = 1;
507         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
508                 clnt->cl_discrtry = 1;
509         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
510                 clnt->cl_chatty = 1;
511
512         return clnt;
513 }
514
515 /**
516  * rpc_create - create an RPC client and transport with one call
517  * @args: rpc_clnt create argument structure
518  *
519  * Creates and initializes an RPC transport and an RPC client.
520  *
521  * It can ping the server in order to determine if it is up, and to see if
522  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
523  * this behavior so asynchronous tasks can also use rpc_create.
524  */
525 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
526 {
527         struct rpc_xprt *xprt;
528         struct xprt_create xprtargs = {
529                 .net = args->net,
530                 .ident = args->protocol,
531                 .srcaddr = args->saddress,
532                 .dstaddr = args->address,
533                 .addrlen = args->addrsize,
534                 .servername = args->servername,
535                 .bc_xprt = args->bc_xprt,
536                 .xprtsec = args->xprtsec,
537         };
538         char servername[48];
539         struct rpc_clnt *clnt;
540         int i;
541
542         if (args->bc_xprt) {
543                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
544                 xprt = args->bc_xprt->xpt_bc_xprt;
545                 if (xprt) {
546                         xprt_get(xprt);
547                         return rpc_create_xprt(args, xprt);
548                 }
549         }
550
551         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
552                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
553         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
554                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
555         /*
556          * If the caller chooses not to specify a hostname, whip
557          * up a string representation of the passed-in address.
558          */
559         if (xprtargs.servername == NULL) {
560                 struct sockaddr_un *sun =
561                                 (struct sockaddr_un *)args->address;
562                 struct sockaddr_in *sin =
563                                 (struct sockaddr_in *)args->address;
564                 struct sockaddr_in6 *sin6 =
565                                 (struct sockaddr_in6 *)args->address;
566
567                 servername[0] = '\0';
568                 switch (args->address->sa_family) {
569                 case AF_LOCAL:
570                         if (sun->sun_path[0])
571                                 snprintf(servername, sizeof(servername), "%s",
572                                          sun->sun_path);
573                         else
574                                 snprintf(servername, sizeof(servername), "@%s",
575                                          sun->sun_path+1);
576                         break;
577                 case AF_INET:
578                         snprintf(servername, sizeof(servername), "%pI4",
579                                  &sin->sin_addr.s_addr);
580                         break;
581                 case AF_INET6:
582                         snprintf(servername, sizeof(servername), "%pI6",
583                                  &sin6->sin6_addr);
584                         break;
585                 default:
586                         /* caller wants default server name, but
587                          * address family isn't recognized. */
588                         return ERR_PTR(-EINVAL);
589                 }
590                 xprtargs.servername = servername;
591         }
592
593         xprt = xprt_create_transport(&xprtargs);
594         if (IS_ERR(xprt))
595                 return (struct rpc_clnt *)xprt;
596
597         /*
598          * By default, kernel RPC client connects from a reserved port.
599          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
600          * but it is always enabled for rpciod, which handles the connect
601          * operation.
602          */
603         xprt->resvport = 1;
604         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
605                 xprt->resvport = 0;
606         xprt->reuseport = 0;
607         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
608                 xprt->reuseport = 1;
609
610         clnt = rpc_create_xprt(args, xprt);
611         if (IS_ERR(clnt) || args->nconnect <= 1)
612                 return clnt;
613
614         for (i = 0; i < args->nconnect - 1; i++) {
615                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
616                         break;
617         }
618         return clnt;
619 }
620 EXPORT_SYMBOL_GPL(rpc_create);
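
/*
 * Usage sketch (hedged): one way a caller might fill in rpc_create_args.
 * "my_program", @srv and the version index are caller-supplied
 * placeholders, not symbols defined in this file; only fields that
 * rpc_create() itself consumes above are set.
 */
static struct rpc_clnt *example_create_client(struct net *net,
                                              const struct rpc_program *my_program,
                                              struct sockaddr *srv,
                                              size_t srvlen)
{
        struct rpc_create_args args = {
                .net            = net,
                .protocol       = XPRT_TRANSPORT_TCP,
                .address        = srv,
                .addrsize       = srvlen,
                .program        = my_program,
                .version        = 0,    /* index into my_program->version[] */
                .authflavor     = RPC_AUTH_UNIX,
                .flags          = RPC_CLNT_CREATE_NOPING,
        };

        /* Returns a new client, or an ERR_PTR() on failure. */
        return rpc_create(&args);
}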
621
622 /*
623  * This function clones the RPC client structure. It allows us to share the
624  * same transport while varying parameters such as the authentication
625  * flavour.
626  */
627 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
628                                            struct rpc_clnt *clnt)
629 {
630         struct rpc_xprt_switch *xps;
631         struct rpc_xprt *xprt;
632         struct rpc_clnt *new;
633         int err;
634
635         err = -ENOMEM;
636         rcu_read_lock();
637         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
638         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
639         rcu_read_unlock();
640         if (xprt == NULL || xps == NULL) {
641                 xprt_put(xprt);
642                 xprt_switch_put(xps);
643                 goto out_err;
644         }
645         args->servername = xprt->servername;
646         args->nodename = clnt->cl_nodename;
647
648         new = rpc_new_client(args, xps, xprt, clnt);
649         if (IS_ERR(new))
650                 return new;
651
652         /* Turn off autobind on clones */
653         new->cl_autobind = 0;
654         new->cl_softrtry = clnt->cl_softrtry;
655         new->cl_softerr = clnt->cl_softerr;
656         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
657         new->cl_discrtry = clnt->cl_discrtry;
658         new->cl_chatty = clnt->cl_chatty;
659         new->cl_principal = clnt->cl_principal;
660         new->cl_max_connect = clnt->cl_max_connect;
661         return new;
662
663 out_err:
664         trace_rpc_clnt_clone_err(clnt, err);
665         return ERR_PTR(err);
666 }
667
668 /**
669  * rpc_clone_client - Clone an RPC client structure
670  *
671  * @clnt: RPC client whose parameters are copied
672  *
673  * Returns a fresh RPC client or an ERR_PTR.
674  */
675 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
676 {
677         struct rpc_create_args args = {
678                 .program        = clnt->cl_program,
679                 .prognumber     = clnt->cl_prog,
680                 .version        = clnt->cl_vers,
681                 .authflavor     = clnt->cl_auth->au_flavor,
682                 .cred           = clnt->cl_cred,
683         };
684         return __rpc_clone_client(&args, clnt);
685 }
686 EXPORT_SYMBOL_GPL(rpc_clone_client);
687
688 /**
689  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
690  *
691  * @clnt: RPC client whose parameters are copied
692  * @flavor: security flavor for new client
693  *
694  * Returns a fresh RPC client or an ERR_PTR.
695  */
696 struct rpc_clnt *
697 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
698 {
699         struct rpc_create_args args = {
700                 .program        = clnt->cl_program,
701                 .prognumber     = clnt->cl_prog,
702                 .version        = clnt->cl_vers,
703                 .authflavor     = flavor,
704                 .cred           = clnt->cl_cred,
705         };
706         return __rpc_clone_client(&args, clnt);
707 }
708 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
709
710 /**
711  * rpc_switch_client_transport: switch the RPC transport on the fly
712  * @clnt: pointer to a struct rpc_clnt
713  * @args: pointer to the new transport arguments
714  * @timeout: pointer to the new timeout parameters
715  *
716  * This function allows the caller to switch the RPC transport for the
717  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
718  * server, for instance.  It assumes that the caller has ensured that
719  * there are no active RPC tasks by using some form of locking.
720  *
721  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
722  * negative errno is returned, and "clnt" continues to use the old
723  * xprt.
724  */
725 int rpc_switch_client_transport(struct rpc_clnt *clnt,
726                 struct xprt_create *args,
727                 const struct rpc_timeout *timeout)
728 {
729         const struct rpc_timeout *old_timeo;
730         rpc_authflavor_t pseudoflavor;
731         struct rpc_xprt_switch *xps, *oldxps;
732         struct rpc_xprt *xprt, *old;
733         struct rpc_clnt *parent;
734         int err;
735
736         args->xprtsec = clnt->cl_xprtsec;
737         xprt = xprt_create_transport(args);
738         if (IS_ERR(xprt))
739                 return PTR_ERR(xprt);
740
741         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
742         if (xps == NULL) {
743                 xprt_put(xprt);
744                 return -ENOMEM;
745         }
746
747         pseudoflavor = clnt->cl_auth->au_flavor;
748
749         old_timeo = clnt->cl_timeout;
750         old = rpc_clnt_set_transport(clnt, xprt, timeout);
751         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
752
753         rpc_unregister_client(clnt);
754         __rpc_clnt_remove_pipedir(clnt);
755         rpc_sysfs_client_destroy(clnt);
756         rpc_clnt_debugfs_unregister(clnt);
757
758         /*
759          * A new transport was created.  "clnt" therefore
760          * becomes the root of a new cl_parent tree.  clnt's
761          * children, if it has any, still point to the old xprt.
762          */
763         parent = clnt->cl_parent;
764         clnt->cl_parent = clnt;
765
766         /*
767          * The old rpc_auth cache cannot be re-used.  GSS
768          * contexts in particular are between a single
769          * client and server.
770          */
771         err = rpc_client_register(clnt, pseudoflavor, NULL);
772         if (err)
773                 goto out_revert;
774
775         synchronize_rcu();
776         if (parent != clnt)
777                 rpc_release_client(parent);
778         xprt_switch_put(oldxps);
779         xprt_put(old);
780         trace_rpc_clnt_replace_xprt(clnt);
781         return 0;
782
783 out_revert:
784         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
785         rpc_clnt_set_transport(clnt, old, old_timeo);
786         clnt->cl_parent = parent;
787         rpc_client_register(clnt, pseudoflavor, NULL);
788         xprt_switch_put(xps);
789         xprt_put(xprt);
790         trace_rpc_clnt_replace_xprt_err(clnt);
791         return err;
792 }
793 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
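
/*
 * Usage sketch (hedged): migrating an otherwise idle client to a new
 * server address.  @newaddr, @newname and @timeo are caller-supplied;
 * the caller must already have quiesced all RPC tasks as noted above.
 */
static int example_switch_transport(struct rpc_clnt *clnt,
                                    struct sockaddr *newaddr, size_t newlen,
                                    const char *newname,
                                    const struct rpc_timeout *timeo)
{
        struct xprt_create xargs = {
                .net            = rpc_net_ns(clnt),
                .ident          = XPRT_TRANSPORT_TCP,
                .dstaddr        = newaddr,
                .addrlen        = newlen,
                .servername     = newname,
        };

        return rpc_switch_client_transport(clnt, &xargs, timeo);
}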
794
795 static
796 int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
797                              void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
798 {
799         struct rpc_xprt_switch *xps;
800
801         rcu_read_lock();
802         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
803         rcu_read_unlock();
804         if (xps == NULL)
805                 return -EAGAIN;
806         func(xpi, xps);
807         xprt_switch_put(xps);
808         return 0;
809 }
810
811 static
812 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
813 {
814         return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listall);
815 }
816
817 static
818 int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
819                                     struct rpc_xprt_iter *xpi)
820 {
821         return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listoffline);
822 }
823
824 /**
825  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
826  * @clnt: pointer to client
827  * @fn: function to apply
828  * @data: void pointer to function data
829  *
830  * Iterates through the list of RPC transports currently attached to the
831  * client and applies the function fn(clnt, xprt, data).
832  *
833  * On error, the iteration stops, and the function returns the error value.
834  */
835 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
836                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
837                 void *data)
838 {
839         struct rpc_xprt_iter xpi;
840         int ret;
841
842         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
843         if (ret)
844                 return ret;
845         for (;;) {
846                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
847
848                 if (!xprt)
849                         break;
850                 ret = fn(clnt, xprt, data);
851                 xprt_put(xprt);
852                 if (ret < 0)
853                         break;
854         }
855         xprt_iter_destroy(&xpi);
856         return ret;
857 }
858 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
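
/*
 * Usage sketch (hedged): a trivial callback for the iterator above that
 * counts the transports attached to @clnt, called as
 *      rpc_clnt_iterate_for_each_xprt(clnt, example_count_xprts, &n);
 * rpc_clnt_disconnect_xprt() below is the in-file user of the same pattern.
 */
static int example_count_xprts(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
                               void *data)
{
        unsigned int *count = data;

        (*count)++;
        return 0;       /* a negative return would stop the iteration */
}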
859
860 /*
861  * Kill all tasks for the given client.
862  * XXX: kill their descendants as well?
863  */
864 void rpc_killall_tasks(struct rpc_clnt *clnt)
865 {
866         struct rpc_task *rovr;
867
868
869         if (list_empty(&clnt->cl_tasks))
870                 return;
871
872         /*
873          * Spin lock all_tasks to prevent changes...
874          */
875         trace_rpc_clnt_killall(clnt);
876         spin_lock(&clnt->cl_lock);
877         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
878                 rpc_signal_task(rovr);
879         spin_unlock(&clnt->cl_lock);
880 }
881 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
882
883 /**
884  * rpc_cancel_tasks - try to cancel a set of RPC tasks
885  * @clnt: Pointer to RPC client
886  * @error: RPC task error value to set
887  * @fnmatch: Pointer to selector function
888  * @data: User data
889  *
890  * Uses @fnmatch to define a set of RPC tasks that are to be cancelled.
891  * The argument @error must be a negative error value.
892  */
893 unsigned long rpc_cancel_tasks(struct rpc_clnt *clnt, int error,
894                                bool (*fnmatch)(const struct rpc_task *,
895                                                const void *),
896                                const void *data)
897 {
898         struct rpc_task *task;
899         unsigned long count = 0;
900
901         if (list_empty(&clnt->cl_tasks))
902                 return 0;
903         /*
904          * Spin lock all_tasks to prevent changes...
905          */
906         spin_lock(&clnt->cl_lock);
907         list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
908                 if (!RPC_IS_ACTIVATED(task))
909                         continue;
910                 if (!fnmatch(task, data))
911                         continue;
912                 rpc_task_try_cancel(task, error);
913                 count++;
914         }
915         spin_unlock(&clnt->cl_lock);
916         return count;
917 }
918 EXPORT_SYMBOL_GPL(rpc_cancel_tasks);
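
/*
 * Usage sketch (hedged): an @fnmatch predicate selecting tasks that were
 * queued with a particular credential, passed via @data, e.g.
 *      rpc_cancel_tasks(clnt, -EIO, example_match_cred, cred);
 */
static bool example_match_cred(const struct rpc_task *task, const void *data)
{
        return task->tk_msg.rpc_cred == data;
}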
919
920 static int rpc_clnt_disconnect_xprt(struct rpc_clnt *clnt,
921                                     struct rpc_xprt *xprt, void *dummy)
922 {
923         if (xprt_connected(xprt))
924                 xprt_force_disconnect(xprt);
925         return 0;
926 }
927
928 void rpc_clnt_disconnect(struct rpc_clnt *clnt)
929 {
930         rpc_clnt_iterate_for_each_xprt(clnt, rpc_clnt_disconnect_xprt, NULL);
931 }
932 EXPORT_SYMBOL_GPL(rpc_clnt_disconnect);
933
934 /*
935  * Properly shut down an RPC client, terminating all outstanding
936  * requests.
937  */
938 void rpc_shutdown_client(struct rpc_clnt *clnt)
939 {
940         might_sleep();
941
942         trace_rpc_clnt_shutdown(clnt);
943
944         while (!list_empty(&clnt->cl_tasks)) {
945                 rpc_killall_tasks(clnt);
946                 wait_event_timeout(destroy_wait,
947                         list_empty(&clnt->cl_tasks), 1*HZ);
948         }
949
950         rpc_release_client(clnt);
951 }
952 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
953
954 /*
955  * Free an RPC client
956  */
957 static void rpc_free_client_work(struct work_struct *work)
958 {
959         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
960
961         trace_rpc_clnt_free(clnt);
962
963         /* These might block on processes that might allocate memory,
964          * so they cannot be called in rpciod; handle them separately
965          * here.
966          */
967         rpc_sysfs_client_destroy(clnt);
968         rpc_clnt_debugfs_unregister(clnt);
969         rpc_free_clid(clnt);
970         rpc_clnt_remove_pipedir(clnt);
971         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
972
973         kfree(clnt);
974         rpciod_down();
975 }
976 static struct rpc_clnt *
977 rpc_free_client(struct rpc_clnt *clnt)
978 {
979         struct rpc_clnt *parent = NULL;
980
981         trace_rpc_clnt_release(clnt);
982         if (clnt->cl_parent != clnt)
983                 parent = clnt->cl_parent;
984         rpc_unregister_client(clnt);
985         rpc_free_iostats(clnt->cl_metrics);
986         clnt->cl_metrics = NULL;
987         xprt_iter_destroy(&clnt->cl_xpi);
988         put_cred(clnt->cl_cred);
989
990         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
991         schedule_work(&clnt->cl_work);
992         return parent;
993 }
994
995 /*
996  * Free an RPC client
997  */
998 static struct rpc_clnt *
999 rpc_free_auth(struct rpc_clnt *clnt)
1000 {
1001         /*
1002          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
1003          *       release remaining GSS contexts. This mechanism ensures
1004          *       that it can do so safely.
1005          */
1006         if (clnt->cl_auth != NULL) {
1007                 rpcauth_release(clnt->cl_auth);
1008                 clnt->cl_auth = NULL;
1009         }
1010         if (refcount_dec_and_test(&clnt->cl_count))
1011                 return rpc_free_client(clnt);
1012         return NULL;
1013 }
1014
1015 /*
1016  * Release reference to the RPC client
1017  */
1018 void
1019 rpc_release_client(struct rpc_clnt *clnt)
1020 {
1021         do {
1022                 if (list_empty(&clnt->cl_tasks))
1023                         wake_up(&destroy_wait);
1024                 if (refcount_dec_not_one(&clnt->cl_count))
1025                         break;
1026                 clnt = rpc_free_auth(clnt);
1027         } while (clnt != NULL);
1028 }
1029 EXPORT_SYMBOL_GPL(rpc_release_client);
1030
1031 /**
1032  * rpc_bind_new_program - bind a new RPC program to an existing client
1033  * @old: old rpc_client
1034  * @program: rpc program to set
1035  * @vers: rpc program version
1036  *
1037  * Clones the rpc client and sets up a new RPC program. This is mainly
1038  * of use for enabling different RPC programs to share the same transport.
1039  * The Sun NFSv2/v3 ACL protocol is one example.
1040  */
1041 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
1042                                       const struct rpc_program *program,
1043                                       u32 vers)
1044 {
1045         struct rpc_create_args args = {
1046                 .program        = program,
1047                 .prognumber     = program->number,
1048                 .version        = vers,
1049                 .authflavor     = old->cl_auth->au_flavor,
1050                 .cred           = old->cl_cred,
1051         };
1052         struct rpc_clnt *clnt;
1053         int err;
1054
1055         clnt = __rpc_clone_client(&args, old);
1056         if (IS_ERR(clnt))
1057                 goto out;
1058         err = rpc_ping(clnt);
1059         if (err != 0) {
1060                 rpc_shutdown_client(clnt);
1061                 clnt = ERR_PTR(err);
1062         }
1063 out:
1064         return clnt;
1065 }
1066 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
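
/*
 * Usage sketch (hedged): "acl_program" stands in for a second
 * struct rpc_program defined by the caller (the NFSv3 ACL protocol is
 * the in-tree example); version 3 is likewise illustrative.
 */
static struct rpc_clnt *example_bind_acl(struct rpc_clnt *nfs_client,
                                         const struct rpc_program *acl_program)
{
        /* Shares nfs_client's transport and pings the new program once. */
        return rpc_bind_new_program(nfs_client, acl_program, 3);
}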
1067
1068 struct rpc_xprt *
1069 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1070 {
1071         struct rpc_xprt_switch *xps;
1072
1073         if (!xprt)
1074                 return NULL;
1075         rcu_read_lock();
1076         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1077         atomic_long_inc(&xps->xps_queuelen);
1078         rcu_read_unlock();
1079         atomic_long_inc(&xprt->queuelen);
1080
1081         return xprt;
1082 }
1083
1084 static void
1085 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1086 {
1087         struct rpc_xprt_switch *xps;
1088
1089         atomic_long_dec(&xprt->queuelen);
1090         rcu_read_lock();
1091         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1092         atomic_long_dec(&xps->xps_queuelen);
1093         rcu_read_unlock();
1094
1095         xprt_put(xprt);
1096 }
1097
1098 void rpc_task_release_transport(struct rpc_task *task)
1099 {
1100         struct rpc_xprt *xprt = task->tk_xprt;
1101
1102         if (xprt) {
1103                 task->tk_xprt = NULL;
1104                 if (task->tk_client)
1105                         rpc_task_release_xprt(task->tk_client, xprt);
1106                 else
1107                         xprt_put(xprt);
1108         }
1109 }
1110 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1111
1112 void rpc_task_release_client(struct rpc_task *task)
1113 {
1114         struct rpc_clnt *clnt = task->tk_client;
1115
1116         rpc_task_release_transport(task);
1117         if (clnt != NULL) {
1118                 /* Remove from client task list */
1119                 spin_lock(&clnt->cl_lock);
1120                 list_del(&task->tk_task);
1121                 spin_unlock(&clnt->cl_lock);
1122                 task->tk_client = NULL;
1123
1124                 rpc_release_client(clnt);
1125         }
1126 }
1127
1128 static struct rpc_xprt *
1129 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1130 {
1131         struct rpc_xprt *xprt;
1132
1133         rcu_read_lock();
1134         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1135         rcu_read_unlock();
1136         return rpc_task_get_xprt(clnt, xprt);
1137 }
1138
1139 static struct rpc_xprt *
1140 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1141 {
1142         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1143 }
1144
1145 static
1146 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1147 {
1148         if (task->tk_xprt) {
1149                 if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1150                       (task->tk_flags & RPC_TASK_MOVEABLE)))
1151                         return;
1152                 xprt_release(task);
1153                 xprt_put(task->tk_xprt);
1154         }
1155         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1156                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1157         else
1158                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1159 }
1160
1161 static
1162 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1163 {
1164         rpc_task_set_transport(task, clnt);
1165         task->tk_client = clnt;
1166         refcount_inc(&clnt->cl_count);
1167         if (clnt->cl_softrtry)
1168                 task->tk_flags |= RPC_TASK_SOFT;
1169         if (clnt->cl_softerr)
1170                 task->tk_flags |= RPC_TASK_TIMEOUT;
1171         if (clnt->cl_noretranstimeo)
1172                 task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1173         /* Add to the client's list of all tasks */
1174         spin_lock(&clnt->cl_lock);
1175         list_add_tail(&task->tk_task, &clnt->cl_tasks);
1176         spin_unlock(&clnt->cl_lock);
1177 }
1178
1179 static void
1180 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1181 {
1182         if (msg != NULL) {
1183                 task->tk_msg.rpc_proc = msg->rpc_proc;
1184                 task->tk_msg.rpc_argp = msg->rpc_argp;
1185                 task->tk_msg.rpc_resp = msg->rpc_resp;
1186                 task->tk_msg.rpc_cred = msg->rpc_cred;
1187                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1188                         get_cred(task->tk_msg.rpc_cred);
1189         }
1190 }
1191
1192 /*
1193  * Default callback for async RPC calls
1194  */
1195 static void
1196 rpc_default_callback(struct rpc_task *task, void *data)
1197 {
1198 }
1199
1200 static const struct rpc_call_ops rpc_default_ops = {
1201         .rpc_call_done = rpc_default_callback,
1202 };
1203
1204 /**
1205  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1206  * @task_setup_data: pointer to task initialisation data
1207  */
1208 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1209 {
1210         struct rpc_task *task;
1211
1212         task = rpc_new_task(task_setup_data);
1213         if (IS_ERR(task))
1214                 return task;
1215
1216         if (!RPC_IS_ASYNC(task))
1217                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1218
1219         rpc_task_set_client(task, task_setup_data->rpc_client);
1220         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1221
1222         if (task->tk_action == NULL)
1223                 rpc_call_start(task);
1224
1225         atomic_inc(&task->tk_count);
1226         rpc_execute(task);
1227         return task;
1228 }
1229 EXPORT_SYMBOL_GPL(rpc_run_task);
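
/*
 * Usage sketch (hedged): driving rpc_run_task() directly for an
 * asynchronous call with private completion ops.  example_done() and
 * @msg (including its procedure) are caller-supplied placeholders.
 */
static void example_done(struct rpc_task *task, void *calldata)
{
        if (task->tk_status < 0)
                pr_debug("example RPC failed: %d\n", task->tk_status);
}

static const struct rpc_call_ops example_call_ops = {
        .rpc_call_done  = example_done,
};

static int example_run_async(struct rpc_clnt *clnt,
                             const struct rpc_message *msg)
{
        struct rpc_task_setup setup = {
                .rpc_client     = clnt,
                .rpc_message    = msg,
                .callback_ops   = &example_call_ops,
                .flags          = RPC_TASK_ASYNC,
        };
        struct rpc_task *task;

        task = rpc_run_task(&setup);
        if (IS_ERR(task))
                return PTR_ERR(task);
        rpc_put_task(task);     /* drop the extra reference rpc_run_task() took */
        return 0;
}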
1230
1231 /**
1232  * rpc_call_sync - Perform a synchronous RPC call
1233  * @clnt: pointer to RPC client
1234  * @msg: RPC call parameters
1235  * @flags: RPC call flags
1236  */
1237 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1238 {
1239         struct rpc_task *task;
1240         struct rpc_task_setup task_setup_data = {
1241                 .rpc_client = clnt,
1242                 .rpc_message = msg,
1243                 .callback_ops = &rpc_default_ops,
1244                 .flags = flags,
1245         };
1246         int status;
1247
1248         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1249         if (flags & RPC_TASK_ASYNC) {
1250                 rpc_release_calldata(task_setup_data.callback_ops,
1251                         task_setup_data.callback_data);
1252                 return -EINVAL;
1253         }
1254
1255         task = rpc_run_task(&task_setup_data);
1256         if (IS_ERR(task))
1257                 return PTR_ERR(task);
1258         status = task->tk_status;
1259         rpc_put_task(task);
1260         return status;
1261 }
1262 EXPORT_SYMBOL_GPL(rpc_call_sync);
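
/*
 * Usage sketch (hedged): @proc is a hypothetical entry in the caller's
 * rpc_procinfo table; @argp and @resp point at its XDR argument and
 * result structures.
 */
static int example_call_sync(struct rpc_clnt *clnt,
                             const struct rpc_procinfo *proc,
                             void *argp, void *resp)
{
        struct rpc_message msg = {
                .rpc_proc       = proc,
                .rpc_argp       = argp,
                .rpc_resp       = resp,
        };

        /* Flags such as RPC_TASK_SOFT may be ORed into the last argument;
         * the return value is the task's final tk_status. */
        return rpc_call_sync(clnt, &msg, 0);
}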
1263
1264 /**
1265  * rpc_call_async - Perform an asynchronous RPC call
1266  * @clnt: pointer to RPC client
1267  * @msg: RPC call parameters
1268  * @flags: RPC call flags
1269  * @tk_ops: RPC call ops
1270  * @data: user call data
1271  */
1272 int
1273 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1274                const struct rpc_call_ops *tk_ops, void *data)
1275 {
1276         struct rpc_task *task;
1277         struct rpc_task_setup task_setup_data = {
1278                 .rpc_client = clnt,
1279                 .rpc_message = msg,
1280                 .callback_ops = tk_ops,
1281                 .callback_data = data,
1282                 .flags = flags|RPC_TASK_ASYNC,
1283         };
1284
1285         task = rpc_run_task(&task_setup_data);
1286         if (IS_ERR(task))
1287                 return PTR_ERR(task);
1288         rpc_put_task(task);
1289         return 0;
1290 }
1291 EXPORT_SYMBOL_GPL(rpc_call_async);
1292
1293 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1294 static void call_bc_encode(struct rpc_task *task);
1295
1296 /**
1297  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1298  * rpc_execute against it
1299  * @req: RPC request
1300  */
1301 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1302 {
1303         struct rpc_task *task;
1304         struct rpc_task_setup task_setup_data = {
1305                 .callback_ops = &rpc_default_ops,
1306                 .flags = RPC_TASK_SOFTCONN |
1307                         RPC_TASK_NO_RETRANS_TIMEOUT,
1308         };
1309
1310         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1311         /*
1312          * Create an rpc_task to send the data
1313          */
1314         task = rpc_new_task(&task_setup_data);
1315         if (IS_ERR(task)) {
1316                 xprt_free_bc_request(req);
1317                 return task;
1318         }
1319
1320         xprt_init_bc_request(req, task);
1321
1322         task->tk_action = call_bc_encode;
1323         atomic_inc(&task->tk_count);
1324         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1325         rpc_execute(task);
1326
1327         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1328         return task;
1329 }
1330 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1331
1332 /**
1333  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1334  * @req: RPC request to prepare
1335  * @pages: vector of struct page pointers
1336  * @base: offset in first page where receive should start, in bytes
1337  * @len: expected size of the upper layer data payload, in bytes
1338  * @hdrsize: expected size of upper layer reply header, in XDR words
1339  *
1340  */
1341 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1342                              unsigned int base, unsigned int len,
1343                              unsigned int hdrsize)
1344 {
1345         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1346
1347         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1348         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1349 }
1350 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
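
/*
 * Usage sketch (hedged): a decode-side caller mapping @count payload
 * bytes into @pages; the "2" stands for an upper-layer reply header of
 * two XDR words and is purely illustrative.
 */
static void example_prepare_reply(struct rpc_rqst *req, struct page **pages,
                                  unsigned int count)
{
        rpc_prepare_reply_pages(req, pages, 0, count, 2);
}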
1351
1352 void
1353 rpc_call_start(struct rpc_task *task)
1354 {
1355         task->tk_action = call_start;
1356 }
1357 EXPORT_SYMBOL_GPL(rpc_call_start);
1358
1359 /**
1360  * rpc_peeraddr - extract remote peer address from clnt's xprt
1361  * @clnt: RPC client structure
1362  * @buf: target buffer
1363  * @bufsize: length of target buffer
1364  *
1365  * Returns the number of bytes that are actually in the stored address.
1366  */
1367 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1368 {
1369         size_t bytes;
1370         struct rpc_xprt *xprt;
1371
1372         rcu_read_lock();
1373         xprt = rcu_dereference(clnt->cl_xprt);
1374
1375         bytes = xprt->addrlen;
1376         if (bytes > bufsize)
1377                 bytes = bufsize;
1378         memcpy(buf, &xprt->addr, bytes);
1379         rcu_read_unlock();
1380
1381         return bytes;
1382 }
1383 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1384
1385 /**
1386  * rpc_peeraddr2str - return remote peer address in printable format
1387  * @clnt: RPC client structure
1388  * @format: address format
1389  *
1390  * NB: the lifetime of the memory referenced by the returned pointer is
1391  * the same as the rpc_xprt itself.  As long as the caller uses this
1392  * pointer, it must hold the RCU read lock.
1393  */
1394 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1395                              enum rpc_display_format_t format)
1396 {
1397         struct rpc_xprt *xprt;
1398
1399         xprt = rcu_dereference(clnt->cl_xprt);
1400
1401         if (xprt->address_strings[format] != NULL)
1402                 return xprt->address_strings[format];
1403         else
1404                 return "unprintable";
1405 }
1406 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
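
/*
 * Usage sketch (hedged): the returned string belongs to the rpc_xprt,
 * so a caller consumes or copies it under rcu_read_lock().
 */
static void example_log_peer(struct rpc_clnt *clnt)
{
        rcu_read_lock();
        pr_debug("RPC peer is %s\n",
                 rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
        rcu_read_unlock();
}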
1407
1408 static const struct sockaddr_in rpc_inaddr_loopback = {
1409         .sin_family             = AF_INET,
1410         .sin_addr.s_addr        = htonl(INADDR_ANY),
1411 };
1412
1413 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1414         .sin6_family            = AF_INET6,
1415         .sin6_addr              = IN6ADDR_ANY_INIT,
1416 };
1417
1418 /*
1419  * Try a getsockname() on a connected datagram socket.  Using a
1420  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1421  * This conserves the ephemeral port number space.
1422  *
1423  * Returns zero and fills in "buf" if successful; otherwise, a
1424  * negative errno is returned.
1425  */
1426 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1427                         struct sockaddr *buf)
1428 {
1429         struct socket *sock;
1430         int err;
1431
1432         err = __sock_create(net, sap->sa_family,
1433                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1434         if (err < 0) {
1435                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1436                 goto out;
1437         }
1438
1439         switch (sap->sa_family) {
1440         case AF_INET:
1441                 err = kernel_bind(sock,
1442                                 (struct sockaddr *)&rpc_inaddr_loopback,
1443                                 sizeof(rpc_inaddr_loopback));
1444                 break;
1445         case AF_INET6:
1446                 err = kernel_bind(sock,
1447                                 (struct sockaddr *)&rpc_in6addr_loopback,
1448                                 sizeof(rpc_in6addr_loopback));
1449                 break;
1450         default:
1451                 err = -EAFNOSUPPORT;
1452                 goto out_release;
1453         }
1454         if (err < 0) {
1455                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1456                 goto out_release;
1457         }
1458
1459         err = kernel_connect(sock, sap, salen, 0);
1460         if (err < 0) {
1461                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1462                 goto out_release;
1463         }
1464
1465         err = kernel_getsockname(sock, buf);
1466         if (err < 0) {
1467                 dprintk("RPC:       getsockname failed (%d)\n", err);
1468                 goto out_release;
1469         }
1470
1471         err = 0;
1472         if (buf->sa_family == AF_INET6) {
1473                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1474                 sin6->sin6_scope_id = 0;
1475         }
1476         dprintk("RPC:       %s succeeded\n", __func__);
1477
1478 out_release:
1479         sock_release(sock);
1480 out:
1481         return err;
1482 }
1483
1484 /*
1485  * Scraping a connected socket failed, so we don't have a usable
1486  * local address.  Fallback: generate an address that will prevent
1487  * the server from calling us back.
1488  *
1489  * Returns zero and fills in "buf" if successful; otherwise, a
1490  * negative errno is returned.
1491  */
1492 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1493 {
1494         switch (family) {
1495         case AF_INET:
1496                 if (buflen < sizeof(rpc_inaddr_loopback))
1497                         return -EINVAL;
1498                 memcpy(buf, &rpc_inaddr_loopback,
1499                                 sizeof(rpc_inaddr_loopback));
1500                 break;
1501         case AF_INET6:
1502                 if (buflen < sizeof(rpc_in6addr_loopback))
1503                         return -EINVAL;
1504                 memcpy(buf, &rpc_in6addr_loopback,
1505                                 sizeof(rpc_in6addr_loopback));
1506                 break;
1507         default:
1508                 dprintk("RPC:       %s: address family not supported\n",
1509                         __func__);
1510                 return -EAFNOSUPPORT;
1511         }
1512         dprintk("RPC:       %s: succeeded\n", __func__);
1513         return 0;
1514 }
1515
1516 /**
1517  * rpc_localaddr - discover local endpoint address for an RPC client
1518  * @clnt: RPC client structure
1519  * @buf: target buffer
1520  * @buflen: size of target buffer, in bytes
1521  *
1522  * Returns zero and fills in "buf" if successful;
1523  * otherwise, a negative errno is returned.
1524  *
1525  * This works even if the underlying transport is not currently connected,
1526  * or if the upper layer never previously provided a source address.
1527  *
1528  * The result of this function call is transient: multiple calls in
1529  * succession may give different results, depending on how local
1530  * networking configuration changes over time.
1531  */
1532 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1533 {
1534         struct sockaddr_storage address;
1535         struct sockaddr *sap = (struct sockaddr *)&address;
1536         struct rpc_xprt *xprt;
1537         struct net *net;
1538         size_t salen;
1539         int err;
1540
1541         rcu_read_lock();
1542         xprt = rcu_dereference(clnt->cl_xprt);
1543         salen = xprt->addrlen;
1544         memcpy(sap, &xprt->addr, salen);
1545         net = get_net(xprt->xprt_net);
1546         rcu_read_unlock();
1547
1548         rpc_set_port(sap, 0);
1549         err = rpc_sockname(net, sap, salen, buf);
1550         put_net(net);
1551         if (err != 0)
1552                 /* Couldn't discover local address, return ANYADDR */
1553                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1554         return 0;
1555 }
1556 EXPORT_SYMBOL_GPL(rpc_localaddr);
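
/*
 * Usage sketch (hedged): sockaddr_storage is always large enough for
 * whichever address family the transport uses.
 */
static int example_local_addr(struct rpc_clnt *clnt,
                              struct sockaddr_storage *ss)
{
        return rpc_localaddr(clnt, (struct sockaddr *)ss, sizeof(*ss));
}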
1557
1558 void
1559 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1560 {
1561         struct rpc_xprt *xprt;
1562
1563         rcu_read_lock();
1564         xprt = rcu_dereference(clnt->cl_xprt);
1565         if (xprt->ops->set_buffer_size)
1566                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1567         rcu_read_unlock();
1568 }
1569 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1570
1571 /**
1572  * rpc_net_ns - Get the network namespace for this RPC client
1573  * @clnt: RPC client to query
1574  *
1575  */
1576 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1577 {
1578         struct net *ret;
1579
1580         rcu_read_lock();
1581         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1582         rcu_read_unlock();
1583         return ret;
1584 }
1585 EXPORT_SYMBOL_GPL(rpc_net_ns);
1586
1587 /**
1588  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1589  * @clnt: RPC client to query
1590  *
1591  * For stream transports, this is one RPC record fragment (see RFC
1592  * 1831), as we don't support multi-record requests yet.  For datagram
1593  * transports, this is the size of an IP packet minus the IP, UDP, and
1594  * RPC header sizes.
1595  */
1596 size_t rpc_max_payload(struct rpc_clnt *clnt)
1597 {
1598         size_t ret;
1599
1600         rcu_read_lock();
1601         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1602         rcu_read_unlock();
1603         return ret;
1604 }
1605 EXPORT_SYMBOL_GPL(rpc_max_payload);
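
/*
 * Illustrative usage sketch (not part of this file): an upper layer might
 * clamp its preferred I/O size to what the transport can carry.  The
 * "preferred_size" variable is hypothetical.
 *
 *	size_t max = rpc_max_payload(clnt);
 *	size_t io_size = min_t(size_t, preferred_size, max);
 */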
1606
1607 /**
1608  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1609  * @clnt: RPC client to query
1610  */
1611 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1612 {
1613         struct rpc_xprt *xprt;
1614         size_t ret;
1615
1616         rcu_read_lock();
1617         xprt = rcu_dereference(clnt->cl_xprt);
1618         ret = xprt->ops->bc_maxpayload(xprt);
1619         rcu_read_unlock();
1620         return ret;
1621 }
1622 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1623
1624 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1625 {
1626         struct rpc_xprt *xprt;
1627         unsigned int ret;
1628
1629         rcu_read_lock();
1630         xprt = rcu_dereference(clnt->cl_xprt);
1631         ret = xprt->ops->bc_num_slots(xprt);
1632         rcu_read_unlock();
1633         return ret;
1634 }
1635 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1636
1637 /**
1638  * rpc_force_rebind - force transport to check that remote port is unchanged
1639  * @clnt: client to rebind
1640  *
1641  */
1642 void rpc_force_rebind(struct rpc_clnt *clnt)
1643 {
1644         if (clnt->cl_autobind) {
1645                 rcu_read_lock();
1646                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1647                 rcu_read_unlock();
1648         }
1649 }
1650 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1651
1652 static int
1653 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1654 {
1655         task->tk_status = 0;
1656         task->tk_rpc_status = 0;
1657         task->tk_action = action;
1658         return 1;
1659 }
1660
1661 /*
1662  * Restart an (async) RPC call. Usually called from within the
1663  * exit handler.
1664  */
1665 int
1666 rpc_restart_call(struct rpc_task *task)
1667 {
1668         return __rpc_restart_call(task, call_start);
1669 }
1670 EXPORT_SYMBOL_GPL(rpc_restart_call);
1671
1672 /*
1673  * Restart an (async) RPC call from the call_prepare state.
1674  * Usually called from within the exit handler.
1675  */
1676 int
1677 rpc_restart_call_prepare(struct rpc_task *task)
1678 {
1679         if (task->tk_ops->rpc_call_prepare != NULL)
1680                 return __rpc_restart_call(task, rpc_prepare_task);
1681         return rpc_restart_call(task);
1682 }
1683 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
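
/*
 * Illustrative usage sketch (not part of this file): rpc_restart_call() is
 * normally invoked from an rpc_call_done callback when the caller decides
 * the reply warrants another round trip.  The callback below and its retry
 * condition are hypothetical.
 *
 *	static void example_call_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status == -EAGAIN)
 *			rpc_restart_call(task);
 *	}
 */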
1684
1685 const char
1686 *rpc_proc_name(const struct rpc_task *task)
1687 {
1688         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1689
1690         if (proc) {
1691                 if (proc->p_name)
1692                         return proc->p_name;
1693                 else
1694                         return "NULL";
1695         } else
1696                 return "no proc";
1697 }
1698
1699 static void
1700 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1701 {
1702         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1703         rpc_task_set_rpc_status(task, rpc_status);
1704         rpc_exit(task, tk_status);
1705 }
1706
1707 static void
1708 rpc_call_rpcerror(struct rpc_task *task, int status)
1709 {
1710         __rpc_call_rpcerror(task, status, status);
1711 }
1712
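/*
 * Rough map of the call states implemented below; the handlers themselves
 * are authoritative, and most error paths loop back to an earlier state
 * after rpc_check_timeout() or an rpc_delay():
 *
 *	call_start
 *	  -> call_reserve  -> call_reserveresult
 *	  -> call_refresh  -> call_refreshresult
 *	  -> call_allocate
 *	  -> call_encode
 *	  -> call_bind     -> call_bind_status      (if not yet bound)
 *	  -> call_connect  -> call_connect_status   (if not yet connected)
 *	  -> call_transmit -> call_transmit_status
 *	  -> call_status
 *	  -> call_decode   -> rpc_exit_task
 */
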
1713 /*
1714  * 0.  Initial state
1715  *
1716  *     Other FSM states can be visited zero or more times, but
1717  *     this state is visited exactly once for each RPC.
1718  */
1719 static void
1720 call_start(struct rpc_task *task)
1721 {
1722         struct rpc_clnt *clnt = task->tk_client;
1723         int idx = task->tk_msg.rpc_proc->p_statidx;
1724
1725         trace_rpc_request(task);
1726
1727         /* Increment call count (version might not be valid for ping) */
1728         if (clnt->cl_program->version[clnt->cl_vers])
1729                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1730         clnt->cl_stats->rpccnt++;
1731         task->tk_action = call_reserve;
1732         rpc_task_set_transport(task, clnt);
1733 }
1734
1735 /*
1736  * 1.   Reserve an RPC call slot
1737  */
1738 static void
1739 call_reserve(struct rpc_task *task)
1740 {
1741         task->tk_status  = 0;
1742         task->tk_action  = call_reserveresult;
1743         xprt_reserve(task);
1744 }
1745
1746 static void call_retry_reserve(struct rpc_task *task);
1747
1748 /*
1749  * 1b.  Grok the result of xprt_reserve()
1750  */
1751 static void
1752 call_reserveresult(struct rpc_task *task)
1753 {
1754         int status = task->tk_status;
1755
1756         /*
1757          * After a call to xprt_reserve(), we must have either
1758          * a request slot or else an error status.
1759          */
1760         task->tk_status = 0;
1761         if (status >= 0) {
1762                 if (task->tk_rqstp) {
1763                         task->tk_action = call_refresh;
1764                         return;
1765                 }
1766
1767                 rpc_call_rpcerror(task, -EIO);
1768                 return;
1769         }
1770
1771         switch (status) {
1772         case -ENOMEM:
1773                 rpc_delay(task, HZ >> 2);
1774                 fallthrough;
1775         case -EAGAIN:   /* woken up; retry */
1776                 task->tk_action = call_retry_reserve;
1777                 return;
1778         default:
1779                 rpc_call_rpcerror(task, status);
1780         }
1781 }
1782
1783 /*
1784  * 1c.  Retry reserving an RPC call slot
1785  */
1786 static void
1787 call_retry_reserve(struct rpc_task *task)
1788 {
1789         task->tk_status  = 0;
1790         task->tk_action  = call_reserveresult;
1791         xprt_retry_reserve(task);
1792 }
1793
1794 /*
1795  * 2.   Bind and/or refresh the credentials
1796  */
1797 static void
1798 call_refresh(struct rpc_task *task)
1799 {
1800         task->tk_action = call_refreshresult;
1801         task->tk_status = 0;
1802         task->tk_client->cl_stats->rpcauthrefresh++;
1803         rpcauth_refreshcred(task);
1804 }
1805
1806 /*
1807  * 2a.  Process the results of a credential refresh
1808  */
1809 static void
1810 call_refreshresult(struct rpc_task *task)
1811 {
1812         int status = task->tk_status;
1813
1814         task->tk_status = 0;
1815         task->tk_action = call_refresh;
1816         switch (status) {
1817         case 0:
1818                 if (rpcauth_uptodatecred(task)) {
1819                         task->tk_action = call_allocate;
1820                         return;
1821                 }
1822                 /* Use rate-limiting and a max number of retries if refresh
1823                  * had status 0 but failed to update the cred.
1824                  */
1825                 fallthrough;
1826         case -ETIMEDOUT:
1827                 rpc_delay(task, 3*HZ);
1828                 fallthrough;
1829         case -EAGAIN:
1830                 status = -EACCES;
1831                 fallthrough;
1832         case -EKEYEXPIRED:
1833                 if (!task->tk_cred_retry)
1834                         break;
1835                 task->tk_cred_retry--;
1836                 trace_rpc_retry_refresh_status(task);
1837                 return;
1838         case -ENOMEM:
1839                 rpc_delay(task, HZ >> 4);
1840                 return;
1841         }
1842         trace_rpc_refresh_status(task);
1843         rpc_call_rpcerror(task, status);
1844 }
1845
1846 /*
1847  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1848  *      (Note: buffer memory is freed in xprt_release).
1849  */
1850 static void
1851 call_allocate(struct rpc_task *task)
1852 {
1853         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1854         struct rpc_rqst *req = task->tk_rqstp;
1855         struct rpc_xprt *xprt = req->rq_xprt;
1856         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1857         int status;
1858
1859         task->tk_status = 0;
1860         task->tk_action = call_encode;
1861
1862         if (req->rq_buffer)
1863                 return;
1864
1865         if (proc->p_proc != 0) {
1866                 BUG_ON(proc->p_arglen == 0);
1867                 if (proc->p_decode != NULL)
1868                         BUG_ON(proc->p_replen == 0);
1869         }
1870
1871         /*
1872          * Calculate the size (in quads) of the RPC call
1873          * and reply headers, and convert both values
1874          * to byte sizes.
1875          */
1876         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1877                            proc->p_arglen;
1878         req->rq_callsize <<= 2;
1879         /*
1880          * Note: the reply buffer must at minimum allocate enough space
1881          * for the 'struct accepted_reply' from RFC5531.
1882          */
1883         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1884                         max_t(size_t, proc->p_replen, 2);
1885         req->rq_rcvsize <<= 2;
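	/*
	 * Worked example with purely illustrative numbers: if
	 * RPC_CALLHDRSIZE is 6 quads, au_cslack is 7 quads and p_arglen is
	 * 10 quads, rq_callsize becomes 6 + 2*7 + 10 = 30 quads, i.e. 120
	 * bytes once converted from quads to bytes.
	 */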
1886
1887         status = xprt->ops->buf_alloc(task);
1888         trace_rpc_buf_alloc(task, status);
1889         if (status == 0)
1890                 return;
1891         if (status != -ENOMEM) {
1892                 rpc_call_rpcerror(task, status);
1893                 return;
1894         }
1895
1896         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1897                 task->tk_action = call_allocate;
1898                 rpc_delay(task, HZ>>4);
1899                 return;
1900         }
1901
1902         rpc_call_rpcerror(task, -ERESTARTSYS);
1903 }
1904
1905 static int
1906 rpc_task_need_encode(struct rpc_task *task)
1907 {
1908         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1909                 (!(task->tk_flags & RPC_TASK_SENT) ||
1910                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1911                  xprt_request_need_retransmit(task));
1912 }
1913
1914 static void
1915 rpc_xdr_encode(struct rpc_task *task)
1916 {
1917         struct rpc_rqst *req = task->tk_rqstp;
1918         struct xdr_stream xdr;
1919
1920         xdr_buf_init(&req->rq_snd_buf,
1921                      req->rq_buffer,
1922                      req->rq_callsize);
1923         xdr_buf_init(&req->rq_rcv_buf,
1924                      req->rq_rbuffer,
1925                      req->rq_rcvsize);
1926
1927         req->rq_reply_bytes_recvd = 0;
1928         req->rq_snd_buf.head[0].iov_len = 0;
1929         xdr_init_encode(&xdr, &req->rq_snd_buf,
1930                         req->rq_snd_buf.head[0].iov_base, req);
1931         if (rpc_encode_header(task, &xdr))
1932                 return;
1933
1934         task->tk_status = rpcauth_wrap_req(task, &xdr);
1935 }
1936
1937 /*
1938  * 3.   Encode arguments of an RPC call
1939  */
1940 static void
1941 call_encode(struct rpc_task *task)
1942 {
1943         if (!rpc_task_need_encode(task))
1944                 goto out;
1945
1946         /* Dequeue task from the receive queue while we're encoding */
1947         xprt_request_dequeue_xprt(task);
1948         /* Encode here so that rpcsec_gss can use correct sequence number. */
1949         rpc_xdr_encode(task);
1950         /* Add task to reply queue before transmission to avoid races */
1951         if (task->tk_status == 0 && rpc_reply_expected(task))
1952                 task->tk_status = xprt_request_enqueue_receive(task);
1953         /* Did the encode result in an error condition? */
1954         if (task->tk_status != 0) {
1955                 /* Was the error nonfatal? */
1956                 switch (task->tk_status) {
1957                 case -EAGAIN:
1958                 case -ENOMEM:
1959                         rpc_delay(task, HZ >> 4);
1960                         break;
1961                 case -EKEYEXPIRED:
1962                         if (!task->tk_cred_retry) {
1963                                 rpc_call_rpcerror(task, task->tk_status);
1964                         } else {
1965                                 task->tk_action = call_refresh;
1966                                 task->tk_cred_retry--;
1967                                 trace_rpc_retry_refresh_status(task);
1968                         }
1969                         break;
1970                 default:
1971                         rpc_call_rpcerror(task, task->tk_status);
1972                 }
1973                 return;
1974         }
1975
1976         xprt_request_enqueue_transmit(task);
1977 out:
1978         task->tk_action = call_transmit;
1979         /* Check that the connection is OK */
1980         if (!xprt_bound(task->tk_xprt))
1981                 task->tk_action = call_bind;
1982         else if (!xprt_connected(task->tk_xprt))
1983                 task->tk_action = call_connect;
1984 }
1985
1986 /*
1987  * Helpers to check if the task was already transmitted, and
1988  * to take action when that is the case.
1989  */
1990 static bool
1991 rpc_task_transmitted(struct rpc_task *task)
1992 {
1993         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1994 }
1995
1996 static void
1997 rpc_task_handle_transmitted(struct rpc_task *task)
1998 {
1999         xprt_end_transmit(task);
2000         task->tk_action = call_transmit_status;
2001 }
2002
2003 /*
2004  * 4.   Get the server port number if not yet set
2005  */
2006 static void
2007 call_bind(struct rpc_task *task)
2008 {
2009         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2010
2011         if (rpc_task_transmitted(task)) {
2012                 rpc_task_handle_transmitted(task);
2013                 return;
2014         }
2015
2016         if (xprt_bound(xprt)) {
2017                 task->tk_action = call_connect;
2018                 return;
2019         }
2020
2021         task->tk_action = call_bind_status;
2022         if (!xprt_prepare_transmit(task))
2023                 return;
2024
2025         xprt->ops->rpcbind(task);
2026 }
2027
2028 /*
2029  * 4a.  Sort out bind result
2030  */
2031 static void
2032 call_bind_status(struct rpc_task *task)
2033 {
2034         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2035         int status = -EIO;
2036
2037         if (rpc_task_transmitted(task)) {
2038                 rpc_task_handle_transmitted(task);
2039                 return;
2040         }
2041
2042         if (task->tk_status >= 0)
2043                 goto out_next;
2044         if (xprt_bound(xprt)) {
2045                 task->tk_status = 0;
2046                 goto out_next;
2047         }
2048
2049         switch (task->tk_status) {
2050         case -ENOMEM:
2051                 rpc_delay(task, HZ >> 2);
2052                 goto retry_timeout;
2053         case -EACCES:
2054                 trace_rpcb_prog_unavail_err(task);
2055                 /* fail immediately if this is an RPC ping */
2056                 if (task->tk_msg.rpc_proc->p_proc == 0) {
2057                         status = -EOPNOTSUPP;
2058                         break;
2059                 }
2060                 rpc_delay(task, 3*HZ);
2061                 goto retry_timeout;
2062         case -ENOBUFS:
2063                 rpc_delay(task, HZ >> 2);
2064                 goto retry_timeout;
2065         case -EAGAIN:
2066                 goto retry_timeout;
2067         case -ETIMEDOUT:
2068                 trace_rpcb_timeout_err(task);
2069                 goto retry_timeout;
2070         case -EPFNOSUPPORT:
2071                 /* server doesn't support any rpcbind version we know of */
2072                 trace_rpcb_bind_version_err(task);
2073                 break;
2074         case -EPROTONOSUPPORT:
2075                 trace_rpcb_bind_version_err(task);
2076                 goto retry_timeout;
2077         case -ECONNREFUSED:             /* connection problems */
2078         case -ECONNRESET:
2079         case -ECONNABORTED:
2080         case -ENOTCONN:
2081         case -EHOSTDOWN:
2082         case -ENETDOWN:
2083         case -EHOSTUNREACH:
2084         case -ENETUNREACH:
2085         case -EPIPE:
2086                 trace_rpcb_unreachable_err(task);
2087                 if (!RPC_IS_SOFTCONN(task)) {
2088                         rpc_delay(task, 5*HZ);
2089                         goto retry_timeout;
2090                 }
2091                 status = task->tk_status;
2092                 break;
2093         default:
2094                 trace_rpcb_unrecognized_err(task);
2095         }
2096
2097         rpc_call_rpcerror(task, status);
2098         return;
2099 out_next:
2100         task->tk_action = call_connect;
2101         return;
2102 retry_timeout:
2103         task->tk_status = 0;
2104         task->tk_action = call_bind;
2105         rpc_check_timeout(task);
2106 }
2107
2108 /*
2109  * 4b.  Connect to the RPC server
2110  */
2111 static void
2112 call_connect(struct rpc_task *task)
2113 {
2114         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2115
2116         if (rpc_task_transmitted(task)) {
2117                 rpc_task_handle_transmitted(task);
2118                 return;
2119         }
2120
2121         if (xprt_connected(xprt)) {
2122                 task->tk_action = call_transmit;
2123                 return;
2124         }
2125
2126         task->tk_action = call_connect_status;
2127         if (task->tk_status < 0)
2128                 return;
2129         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2130                 rpc_call_rpcerror(task, -ENOTCONN);
2131                 return;
2132         }
2133         if (!xprt_prepare_transmit(task))
2134                 return;
2135         xprt_connect(task);
2136 }
2137
2138 /*
2139  * 4c.  Sort out connect result
2140  */
2141 static void
2142 call_connect_status(struct rpc_task *task)
2143 {
2144         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2145         struct rpc_clnt *clnt = task->tk_client;
2146         int status = task->tk_status;
2147
2148         if (rpc_task_transmitted(task)) {
2149                 rpc_task_handle_transmitted(task);
2150                 return;
2151         }
2152
2153         trace_rpc_connect_status(task);
2154
2155         if (task->tk_status == 0) {
2156                 clnt->cl_stats->netreconn++;
2157                 goto out_next;
2158         }
2159         if (xprt_connected(xprt)) {
2160                 task->tk_status = 0;
2161                 goto out_next;
2162         }
2163
2164         task->tk_status = 0;
2165         switch (status) {
2166         case -ECONNREFUSED:
2167                 /* A positive refusal suggests a rebind is needed. */
2168                 if (RPC_IS_SOFTCONN(task))
2169                         break;
2170                 if (clnt->cl_autobind) {
2171                         rpc_force_rebind(clnt);
2172                         goto out_retry;
2173                 }
2174                 fallthrough;
2175         case -ECONNRESET:
2176         case -ECONNABORTED:
2177         case -ENETDOWN:
2178         case -ENETUNREACH:
2179         case -EHOSTUNREACH:
2180         case -EPIPE:
2181         case -EPROTO:
2182                 xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2183                                             task->tk_rqstp->rq_connect_cookie);
2184                 if (RPC_IS_SOFTCONN(task))
2185                         break;
2186                 /* retry with existing socket, after a delay */
2187                 rpc_delay(task, 3*HZ);
2188                 fallthrough;
2189         case -EADDRINUSE:
2190         case -ENOTCONN:
2191         case -EAGAIN:
2192         case -ETIMEDOUT:
2193                 if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2194                     (task->tk_flags & RPC_TASK_MOVEABLE) &&
2195                     test_bit(XPRT_REMOVE, &xprt->state)) {
2196                         struct rpc_xprt *saved = task->tk_xprt;
2197                         struct rpc_xprt_switch *xps;
2198
2199                         rcu_read_lock();
2200                         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2201                         rcu_read_unlock();
2202                         if (xps->xps_nxprts > 1) {
2203                                 long value;
2204
2205                                 xprt_release(task);
2206                                 value = atomic_long_dec_return(&xprt->queuelen);
2207                                 if (value == 0)
2208                                         rpc_xprt_switch_remove_xprt(xps, saved,
2209                                                                     true);
2210                                 xprt_put(saved);
2211                                 task->tk_xprt = NULL;
2212                                 task->tk_action = call_start;
2213                         }
2214                         xprt_switch_put(xps);
2215                         if (!task->tk_xprt)
2216                                 return;
2217                 }
2218                 goto out_retry;
2219         case -ENOBUFS:
2220                 rpc_delay(task, HZ >> 2);
2221                 goto out_retry;
2222         }
2223         rpc_call_rpcerror(task, status);
2224         return;
2225 out_next:
2226         task->tk_action = call_transmit;
2227         return;
2228 out_retry:
2229         /* Check for timeouts before looping back to call_bind */
2230         task->tk_action = call_bind;
2231         rpc_check_timeout(task);
2232 }
2233
2234 /*
2235  * 5.   Transmit the RPC request, and wait for reply
2236  */
2237 static void
2238 call_transmit(struct rpc_task *task)
2239 {
2240         if (rpc_task_transmitted(task)) {
2241                 rpc_task_handle_transmitted(task);
2242                 return;
2243         }
2244
2245         task->tk_action = call_transmit_status;
2246         if (!xprt_prepare_transmit(task))
2247                 return;
2248         task->tk_status = 0;
2249         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2250                 if (!xprt_connected(task->tk_xprt)) {
2251                         task->tk_status = -ENOTCONN;
2252                         return;
2253                 }
2254                 xprt_transmit(task);
2255         }
2256         xprt_end_transmit(task);
2257 }
2258
2259 /*
2260  * 5a.  Handle cleanup after a transmission
2261  */
2262 static void
2263 call_transmit_status(struct rpc_task *task)
2264 {
2265         task->tk_action = call_status;
2266
2267         /*
2268          * Common case: success.  Force the compiler to put this
2269          * test first.
2270          */
2271         if (rpc_task_transmitted(task)) {
2272                 task->tk_status = 0;
2273                 xprt_request_wait_receive(task);
2274                 return;
2275         }
2276
2277         switch (task->tk_status) {
2278         default:
2279                 break;
2280         case -EBADMSG:
2281                 task->tk_status = 0;
2282                 task->tk_action = call_encode;
2283                 break;
2284                 /*
2285                  * Special cases: if we've been waiting on the
2286                  * socket's write_space() callback, or if the
2287                  * socket just returned a connection error,
2288                  * then hold onto the transport lock.
2289                  */
2290         case -ENOMEM:
2291         case -ENOBUFS:
2292                 rpc_delay(task, HZ>>2);
2293                 fallthrough;
2294         case -EBADSLT:
2295         case -EAGAIN:
2296                 task->tk_action = call_transmit;
2297                 task->tk_status = 0;
2298                 break;
2299         case -ECONNREFUSED:
2300         case -EHOSTDOWN:
2301         case -ENETDOWN:
2302         case -EHOSTUNREACH:
2303         case -ENETUNREACH:
2304         case -EPERM:
2305                 if (RPC_IS_SOFTCONN(task)) {
2306                         if (!task->tk_msg.rpc_proc->p_proc)
2307                                 trace_xprt_ping(task->tk_xprt,
2308                                                 task->tk_status);
2309                         rpc_call_rpcerror(task, task->tk_status);
2310                         return;
2311                 }
2312                 fallthrough;
2313         case -ECONNRESET:
2314         case -ECONNABORTED:
2315         case -EADDRINUSE:
2316         case -ENOTCONN:
2317         case -EPIPE:
2318                 task->tk_action = call_bind;
2319                 task->tk_status = 0;
2320                 break;
2321         }
2322         rpc_check_timeout(task);
2323 }
2324
2325 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2326 static void call_bc_transmit(struct rpc_task *task);
2327 static void call_bc_transmit_status(struct rpc_task *task);
2328
2329 static void
2330 call_bc_encode(struct rpc_task *task)
2331 {
2332         xprt_request_enqueue_transmit(task);
2333         task->tk_action = call_bc_transmit;
2334 }
2335
2336 /*
2337  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2338  * addition, disconnect on connectivity errors.
2339  */
2340 static void
2341 call_bc_transmit(struct rpc_task *task)
2342 {
2343         task->tk_action = call_bc_transmit_status;
2344         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2345                 if (!xprt_prepare_transmit(task))
2346                         return;
2347                 task->tk_status = 0;
2348                 xprt_transmit(task);
2349         }
2350         xprt_end_transmit(task);
2351 }
2352
2353 static void
2354 call_bc_transmit_status(struct rpc_task *task)
2355 {
2356         struct rpc_rqst *req = task->tk_rqstp;
2357
2358         if (rpc_task_transmitted(task))
2359                 task->tk_status = 0;
2360
2361         switch (task->tk_status) {
2362         case 0:
2363                 /* Success */
2364         case -ENETDOWN:
2365         case -EHOSTDOWN:
2366         case -EHOSTUNREACH:
2367         case -ENETUNREACH:
2368         case -ECONNRESET:
2369         case -ECONNREFUSED:
2370         case -EADDRINUSE:
2371         case -ENOTCONN:
2372         case -EPIPE:
2373                 break;
2374         case -ENOMEM:
2375         case -ENOBUFS:
2376                 rpc_delay(task, HZ>>2);
2377                 fallthrough;
2378         case -EBADSLT:
2379         case -EAGAIN:
2380                 task->tk_status = 0;
2381                 task->tk_action = call_bc_transmit;
2382                 return;
2383         case -ETIMEDOUT:
2384                 /*
2385                  * Problem reaching the server.  Disconnect and let the
2386                  * forechannel reestablish the connection.  The server will
2387                  * have to retransmit the backchannel request and we'll
2388                  * reprocess it.  Since these ops are idempotent, there's no
2389                  * need to cache our reply at this time.
2390                  */
2391                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2392                         "error: %d\n", task->tk_status);
2393                 xprt_conditional_disconnect(req->rq_xprt,
2394                         req->rq_connect_cookie);
2395                 break;
2396         default:
2397                 /*
2398                  * We were unable to reply and will have to drop the
2399                  * request.  The server should reconnect and retransmit.
2400                  */
2401                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2402                         "error: %d\n", task->tk_status);
2403                 break;
2404         }
2405         task->tk_action = rpc_exit_task;
2406 }
2407 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2408
2409 /*
2410  * 6.   Sort out the RPC call status
2411  */
2412 static void
2413 call_status(struct rpc_task *task)
2414 {
2415         struct rpc_clnt *clnt = task->tk_client;
2416         int             status;
2417
2418         if (!task->tk_msg.rpc_proc->p_proc)
2419                 trace_xprt_ping(task->tk_xprt, task->tk_status);
2420
2421         status = task->tk_status;
2422         if (status >= 0) {
2423                 task->tk_action = call_decode;
2424                 return;
2425         }
2426
2427         trace_rpc_call_status(task);
2428         task->tk_status = 0;
2429         switch(status) {
2430         case -EHOSTDOWN:
2431         case -ENETDOWN:
2432         case -EHOSTUNREACH:
2433         case -ENETUNREACH:
2434         case -EPERM:
2435                 if (RPC_IS_SOFTCONN(task))
2436                         goto out_exit;
2437                 /*
2438                  * Delay any retries for 3 seconds, then handle as if it
2439                  * were a timeout.
2440                  */
2441                 rpc_delay(task, 3*HZ);
2442                 fallthrough;
2443         case -ETIMEDOUT:
2444                 break;
2445         case -ECONNREFUSED:
2446         case -ECONNRESET:
2447         case -ECONNABORTED:
2448         case -ENOTCONN:
2449                 rpc_force_rebind(clnt);
2450                 break;
2451         case -EADDRINUSE:
2452                 rpc_delay(task, 3*HZ);
2453                 fallthrough;
2454         case -EPIPE:
2455         case -EAGAIN:
2456                 break;
2457         case -ENFILE:
2458         case -ENOBUFS:
2459         case -ENOMEM:
2460                 rpc_delay(task, HZ>>2);
2461                 break;
2462         case -EIO:
2463                 /* shutdown or soft timeout */
2464                 goto out_exit;
2465         default:
2466                 if (clnt->cl_chatty)
2467                         printk("%s: RPC call returned error %d\n",
2468                                clnt->cl_program->name, -status);
2469                 goto out_exit;
2470         }
2471         task->tk_action = call_encode;
2472         if (status != -ECONNRESET && status != -ECONNABORTED)
2473                 rpc_check_timeout(task);
2474         return;
2475 out_exit:
2476         rpc_call_rpcerror(task, status);
2477 }
2478
2479 static bool
2480 rpc_check_connected(const struct rpc_rqst *req)
2481 {
2482         /* No allocated request or transport? return true */
2483         if (!req || !req->rq_xprt)
2484                 return true;
2485         return xprt_connected(req->rq_xprt);
2486 }
2487
2488 static void
2489 rpc_check_timeout(struct rpc_task *task)
2490 {
2491         struct rpc_clnt *clnt = task->tk_client;
2492
2493         if (RPC_SIGNALLED(task))
2494                 return;
2495
2496         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2497                 return;
2498
2499         trace_rpc_timeout_status(task);
2500         task->tk_timeouts++;
2501
2502         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2503                 rpc_call_rpcerror(task, -ETIMEDOUT);
2504                 return;
2505         }
2506
2507         if (RPC_IS_SOFT(task)) {
2508                 /*
2509          * Once a "no retrans timeout" soft task (a.k.a. NFSv4) has
2510                  * been sent, it should time out only if the transport
2511                  * connection gets terminally broken.
2512                  */
2513                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2514                     rpc_check_connected(task->tk_rqstp))
2515                         return;
2516
2517                 if (clnt->cl_chatty) {
2518                         pr_notice_ratelimited(
2519                                 "%s: server %s not responding, timed out\n",
2520                                 clnt->cl_program->name,
2521                                 task->tk_xprt->servername);
2522                 }
2523                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2524                         rpc_call_rpcerror(task, -ETIMEDOUT);
2525                 else
2526                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2527                 return;
2528         }
2529
2530         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2531                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2532                 if (clnt->cl_chatty) {
2533                         pr_notice_ratelimited(
2534                                 "%s: server %s not responding, still trying\n",
2535                                 clnt->cl_program->name,
2536                                 task->tk_xprt->servername);
2537                 }
2538         }
2539         rpc_force_rebind(clnt);
2540         /*
2541          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2542          * event? RFC2203 requires the server to drop all such requests.
2543          */
2544         rpcauth_invalcred(task);
2545 }
2546
2547 /*
2548  * 7.   Decode the RPC reply
2549  */
2550 static void
2551 call_decode(struct rpc_task *task)
2552 {
2553         struct rpc_clnt *clnt = task->tk_client;
2554         struct rpc_rqst *req = task->tk_rqstp;
2555         struct xdr_stream xdr;
2556         int err;
2557
2558         if (!task->tk_msg.rpc_proc->p_decode) {
2559                 task->tk_action = rpc_exit_task;
2560                 return;
2561         }
2562
2563         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2564                 if (clnt->cl_chatty) {
2565                         pr_notice_ratelimited("%s: server %s OK\n",
2566                                 clnt->cl_program->name,
2567                                 task->tk_xprt->servername);
2568                 }
2569                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2570         }
2571
2572         /*
2573          * Did we ever call xprt_complete_rqst()? If not, we should assume
2574          * the message is incomplete.
2575          */
2576         err = -EAGAIN;
2577         if (!req->rq_reply_bytes_recvd)
2578                 goto out;
2579
2580         /* Ensure that we see all writes made by xprt_complete_rqst()
2581          * before it changed req->rq_reply_bytes_recvd.
2582          */
2583         smp_rmb();
2584
2585         req->rq_rcv_buf.len = req->rq_private_buf.len;
2586         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2587
2588         /* Check that the softirq receive buffer is valid */
2589         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2590                                 sizeof(req->rq_rcv_buf)) != 0);
2591
2592         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2593                         req->rq_rcv_buf.head[0].iov_base, req);
2594         err = rpc_decode_header(task, &xdr);
2595 out:
2596         switch (err) {
2597         case 0:
2598                 task->tk_action = rpc_exit_task;
2599                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2600                 return;
2601         case -EAGAIN:
2602                 task->tk_status = 0;
2603                 if (task->tk_client->cl_discrtry)
2604                         xprt_conditional_disconnect(req->rq_xprt,
2605                                                     req->rq_connect_cookie);
2606                 task->tk_action = call_encode;
2607                 rpc_check_timeout(task);
2608                 break;
2609         case -EKEYREJECTED:
2610                 task->tk_action = call_reserve;
2611                 rpc_check_timeout(task);
2612                 rpcauth_invalcred(task);
2613                 /* Ensure we obtain a new XID if we retry! */
2614                 xprt_release(task);
2615         }
2616 }
2617
2618 static int
2619 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2620 {
2621         struct rpc_clnt *clnt = task->tk_client;
2622         struct rpc_rqst *req = task->tk_rqstp;
2623         __be32 *p;
2624         int error;
2625
2626         error = -EMSGSIZE;
2627         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2628         if (!p)
2629                 goto out_fail;
2630         *p++ = req->rq_xid;
2631         *p++ = rpc_call;
2632         *p++ = cpu_to_be32(RPC_VERSION);
2633         *p++ = cpu_to_be32(clnt->cl_prog);
2634         *p++ = cpu_to_be32(clnt->cl_vers);
2635         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2636
2637         error = rpcauth_marshcred(task, xdr);
2638         if (error < 0)
2639                 goto out_fail;
2640         return 0;
2641 out_fail:
2642         trace_rpc_bad_callhdr(task);
2643         rpc_call_rpcerror(task, error);
2644         return error;
2645 }
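
/*
 * For reference, the fixed part of the call header emitted above is six
 * 32-bit XDR words (see RFC 5531):
 *
 *	word 0: xid
 *	word 1: msg_type = CALL (0)
 *	word 2: rpcvers  = 2
 *	word 3: prog     = clnt->cl_prog
 *	word 4: vers     = clnt->cl_vers
 *	word 5: proc     = rpc_proc->p_proc
 *
 * followed by the credential and verifier written by rpcauth_marshcred().
 */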
2646
2647 static noinline int
2648 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2649 {
2650         struct rpc_clnt *clnt = task->tk_client;
2651         int error;
2652         __be32 *p;
2653
2654         /* RFC-1014 says that the representation of XDR data must be a
2655          * multiple of four bytes
2656          * - if it isn't, pointer subtraction in the NFS client may give
2657          *   undefined results
2658          */
2659         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2660                 goto out_unparsable;
2661
2662         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2663         if (!p)
2664                 goto out_unparsable;
2665         p++;    /* skip XID */
2666         if (*p++ != rpc_reply)
2667                 goto out_unparsable;
2668         if (*p++ != rpc_msg_accepted)
2669                 goto out_msg_denied;
2670
2671         error = rpcauth_checkverf(task, xdr);
2672         if (error)
2673                 goto out_verifier;
2674
2675         p = xdr_inline_decode(xdr, sizeof(*p));
2676         if (!p)
2677                 goto out_unparsable;
2678         switch (*p) {
2679         case rpc_success:
2680                 return 0;
2681         case rpc_prog_unavail:
2682                 trace_rpc__prog_unavail(task);
2683                 error = -EPFNOSUPPORT;
2684                 goto out_err;
2685         case rpc_prog_mismatch:
2686                 trace_rpc__prog_mismatch(task);
2687                 error = -EPROTONOSUPPORT;
2688                 goto out_err;
2689         case rpc_proc_unavail:
2690                 trace_rpc__proc_unavail(task);
2691                 error = -EOPNOTSUPP;
2692                 goto out_err;
2693         case rpc_garbage_args:
2694         case rpc_system_err:
2695                 trace_rpc__garbage_args(task);
2696                 error = -EIO;
2697                 break;
2698         default:
2699                 goto out_unparsable;
2700         }
2701
2702 out_garbage:
2703         clnt->cl_stats->rpcgarbage++;
2704         if (task->tk_garb_retry) {
2705                 task->tk_garb_retry--;
2706                 task->tk_action = call_encode;
2707                 return -EAGAIN;
2708         }
2709 out_err:
2710         rpc_call_rpcerror(task, error);
2711         return error;
2712
2713 out_unparsable:
2714         trace_rpc__unparsable(task);
2715         error = -EIO;
2716         goto out_garbage;
2717
2718 out_verifier:
2719         trace_rpc_bad_verifier(task);
2720         goto out_err;
2721
2722 out_msg_denied:
2723         error = -EACCES;
2724         p = xdr_inline_decode(xdr, sizeof(*p));
2725         if (!p)
2726                 goto out_unparsable;
2727         switch (*p++) {
2728         case rpc_auth_error:
2729                 break;
2730         case rpc_mismatch:
2731                 trace_rpc__mismatch(task);
2732                 error = -EPROTONOSUPPORT;
2733                 goto out_err;
2734         default:
2735                 goto out_unparsable;
2736         }
2737
2738         p = xdr_inline_decode(xdr, sizeof(*p));
2739         if (!p)
2740                 goto out_unparsable;
2741         switch (*p++) {
2742         case rpc_autherr_rejectedcred:
2743         case rpc_autherr_rejectedverf:
2744         case rpcsec_gsserr_credproblem:
2745         case rpcsec_gsserr_ctxproblem:
2746                 if (!task->tk_cred_retry)
2747                         break;
2748                 task->tk_cred_retry--;
2749                 trace_rpc__stale_creds(task);
2750                 return -EKEYREJECTED;
2751         case rpc_autherr_badcred:
2752         case rpc_autherr_badverf:
2753                 /* possibly garbled cred/verf? */
2754                 if (!task->tk_garb_retry)
2755                         break;
2756                 task->tk_garb_retry--;
2757                 trace_rpc__bad_creds(task);
2758                 task->tk_action = call_encode;
2759                 return -EAGAIN;
2760         case rpc_autherr_tooweak:
2761                 trace_rpc__auth_tooweak(task);
2762                 pr_warn("RPC: server %s requires stronger authentication.\n",
2763                         task->tk_xprt->servername);
2764                 break;
2765         default:
2766                 goto out_unparsable;
2767         }
2768         goto out_err;
2769 }
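
/*
 * For reference, the reply header parsed above follows RFC 5531: xid,
 * msg_type = REPLY (1), then reply_stat.  MSG_ACCEPTED carries a verifier
 * and an accept_stat (the first switch); MSG_DENIED carries the rejection
 * details handled under out_msg_denied.
 */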
2770
2771 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2772                 const void *obj)
2773 {
2774 }
2775
2776 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2777                 void *obj)
2778 {
2779         return 0;
2780 }
2781
2782 static const struct rpc_procinfo rpcproc_null = {
2783         .p_encode = rpcproc_encode_null,
2784         .p_decode = rpcproc_decode_null,
2785 };
2786
2787 static const struct rpc_procinfo rpcproc_null_noreply = {
2788         .p_encode = rpcproc_encode_null,
2789 };
2790
2791 static void
2792 rpc_null_call_prepare(struct rpc_task *task, void *data)
2793 {
2794         task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2795         rpc_call_start(task);
2796 }
2797
2798 static const struct rpc_call_ops rpc_null_ops = {
2799         .rpc_call_prepare = rpc_null_call_prepare,
2800         .rpc_call_done = rpc_default_callback,
2801 };
2802
2803 static
2804 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2805                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2806                 const struct rpc_call_ops *ops, void *data)
2807 {
2808         struct rpc_message msg = {
2809                 .rpc_proc = &rpcproc_null,
2810         };
2811         struct rpc_task_setup task_setup_data = {
2812                 .rpc_client = clnt,
2813                 .rpc_xprt = xprt,
2814                 .rpc_message = &msg,
2815                 .rpc_op_cred = cred,
2816                 .callback_ops = ops ?: &rpc_null_ops,
2817                 .callback_data = data,
2818                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2819                          RPC_TASK_NULLCREDS,
2820         };
2821
2822         return rpc_run_task(&task_setup_data);
2823 }
2824
2825 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2826 {
2827         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2828 }
2829 EXPORT_SYMBOL_GPL(rpc_call_null);
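
/*
 * Illustrative usage sketch (not part of this file): a caller can use
 * rpc_call_null() as a synchronous NULL-procedure ping, mirroring what
 * rpc_ping() below does via rpc_call_null_helper().
 *
 *	struct rpc_task *task;
 *	int status;
 *
 *	task = rpc_call_null(clnt, NULL, 0);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	status = task->tk_status;
 *	rpc_put_task(task);
 */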
2830
2831 static int rpc_ping(struct rpc_clnt *clnt)
2832 {
2833         struct rpc_task *task;
2834         int status;
2835
2836         if (clnt->cl_auth->au_ops->ping)
2837                 return clnt->cl_auth->au_ops->ping(clnt);
2838
2839         task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2840         if (IS_ERR(task))
2841                 return PTR_ERR(task);
2842         status = task->tk_status;
2843         rpc_put_task(task);
2844         return status;
2845 }
2846
2847 static int rpc_ping_noreply(struct rpc_clnt *clnt)
2848 {
2849         struct rpc_message msg = {
2850                 .rpc_proc = &rpcproc_null_noreply,
2851         };
2852         struct rpc_task_setup task_setup_data = {
2853                 .rpc_client = clnt,
2854                 .rpc_message = &msg,
2855                 .callback_ops = &rpc_null_ops,
2856                 .flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2857         };
2858         struct rpc_task *task;
2859         int status;
2860
2861         task = rpc_run_task(&task_setup_data);
2862         if (IS_ERR(task))
2863                 return PTR_ERR(task);
2864         status = task->tk_status;
2865         rpc_put_task(task);
2866         return status;
2867 }
2868
2869 struct rpc_cb_add_xprt_calldata {
2870         struct rpc_xprt_switch *xps;
2871         struct rpc_xprt *xprt;
2872 };
2873
2874 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2875 {
2876         struct rpc_cb_add_xprt_calldata *data = calldata;
2877
2878         if (task->tk_status == 0)
2879                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2880 }
2881
2882 static void rpc_cb_add_xprt_release(void *calldata)
2883 {
2884         struct rpc_cb_add_xprt_calldata *data = calldata;
2885
2886         xprt_put(data->xprt);
2887         xprt_switch_put(data->xps);
2888         kfree(data);
2889 }
2890
2891 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2892         .rpc_call_prepare = rpc_null_call_prepare,
2893         .rpc_call_done = rpc_cb_add_xprt_done,
2894         .rpc_release = rpc_cb_add_xprt_release,
2895 };
2896
2897 /**
2898  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2899  * @clnt: pointer to struct rpc_clnt
2900  * @xps: pointer to struct rpc_xprt_switch
2901  * @xprt: pointer to struct rpc_xprt
2902  * @dummy: unused
2903  */
2904 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2905                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2906                 void *dummy)
2907 {
2908         struct rpc_cb_add_xprt_calldata *data;
2909         struct rpc_task *task;
2910
2911         if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
2912                 rcu_read_lock();
2913                 pr_warn("SUNRPC: reached max allowed number (%d), did not add "
2914                         "transport to server: %s\n", clnt->cl_max_connect,
2915                         rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2916                 rcu_read_unlock();
2917                 return -EINVAL;
2918         }
2919
2920         data = kmalloc(sizeof(*data), GFP_KERNEL);
2921         if (!data)
2922                 return -ENOMEM;
2923         data->xps = xprt_switch_get(xps);
2924         data->xprt = xprt_get(xprt);
2925         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2926                 rpc_cb_add_xprt_release(data);
2927                 goto success;
2928         }
2929
2930         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2931                         &rpc_cb_add_xprt_call_ops, data);
2932         if (IS_ERR(task))
2933                 return PTR_ERR(task);
2934
2935         data->xps->xps_nunique_destaddr_xprts++;
2936         rpc_put_task(task);
2937 success:
2938         return 1;
2939 }
2940 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2941
2942 static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
2943                                     struct rpc_xprt *xprt,
2944                                     struct rpc_add_xprt_test *data)
2945 {
2946         struct rpc_task *task;
2947         int status = -EADDRINUSE;
2948
2949         /* Test the connection */
2950         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2951         if (IS_ERR(task))
2952                 return PTR_ERR(task);
2953
2954         status = task->tk_status;
2955         rpc_put_task(task);
2956
2957         if (status < 0)
2958                 return status;
2959
2960         /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2961         data->add_xprt_test(clnt, xprt, data->data);
2962
2963         return 0;
2964 }
2965
2966 /**
2967  * rpc_clnt_setup_test_and_add_xprt()
2968  *
2969  * This is an rpc_clnt_add_xprt setup() function which returns 1 so that:
2970  *   1) caller of the test function must dereference the rpc_xprt_switch
2971  *   and the rpc_xprt.
2972  *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2973  *   the rpc_call_done routine.
2974  *
2975  * Upon success (return of 1), the test function adds the new
2976  * transport to the rpc_clnt xprt switch
2977  *
2978  * @clnt: struct rpc_clnt to get the new transport
2979  * @xps:  the rpc_xprt_switch to hold the new transport
2980  * @xprt: the rpc_xprt to test
2981  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2982  *        and test function call data
2983  */
2984 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2985                                      struct rpc_xprt_switch *xps,
2986                                      struct rpc_xprt *xprt,
2987                                      void *data)
2988 {
2989         int status = -EADDRINUSE;
2990
2991         xprt = xprt_get(xprt);
2992         xprt_switch_get(xps);
2993
2994         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2995                 goto out_err;
2996
2997         status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
2998         if (status < 0)
2999                 goto out_err;
3000
3001         status = 1;
3002 out_err:
3003         xprt_put(xprt);
3004         xprt_switch_put(xps);
3005         if (status < 0)
3006                 pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not "
3007                         "added\n", status,
3008                         xprt->address_strings[RPC_DISPLAY_ADDR]);
3009         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
3010         return status;
3011 }
3012 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
3013
3014 /**
3015  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
3016  * @clnt: pointer to struct rpc_clnt
3017  * @xprtargs: pointer to struct xprt_create
3018  * @setup: callback to test and/or set up the connection
3019  * @data: pointer to setup function data
3020  *
3021  * Creates a new transport using the parameters set in @xprtargs and
3022  * adds it to @clnt.
3023  * If @setup is given, it is called to test and/or configure the new
3024  * transport first; a nonzero return means rpc_clnt_add_xprt() itself
3025  * will not add it.
3025  *
3026  */
3027 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
3028                 struct xprt_create *xprtargs,
3029                 int (*setup)(struct rpc_clnt *,
3030                         struct rpc_xprt_switch *,
3031                         struct rpc_xprt *,
3032                         void *),
3033                 void *data)
3034 {
3035         struct rpc_xprt_switch *xps;
3036         struct rpc_xprt *xprt;
3037         unsigned long connect_timeout;
3038         unsigned long reconnect_timeout;
3039         unsigned char resvport, reuseport;
3040         int ret = 0, ident;
3041
3042         rcu_read_lock();
3043         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3044         xprt = xprt_iter_xprt(&clnt->cl_xpi);
3045         if (xps == NULL || xprt == NULL) {
3046                 rcu_read_unlock();
3047                 xprt_switch_put(xps);
3048                 return -EAGAIN;
3049         }
3050         resvport = xprt->resvport;
3051         reuseport = xprt->reuseport;
3052         connect_timeout = xprt->connect_timeout;
3053         reconnect_timeout = xprt->max_reconnect_timeout;
3054         ident = xprt->xprt_class->ident;
3055         rcu_read_unlock();
3056
3057         if (!xprtargs->ident)
3058                 xprtargs->ident = ident;
3059         xprtargs->xprtsec = clnt->cl_xprtsec;
3060         xprt = xprt_create_transport(xprtargs);
3061         if (IS_ERR(xprt)) {
3062                 ret = PTR_ERR(xprt);
3063                 goto out_put_switch;
3064         }
3065         xprt->resvport = resvport;
3066         xprt->reuseport = reuseport;
3067         if (xprt->ops->set_connect_timeout != NULL)
3068                 xprt->ops->set_connect_timeout(xprt,
3069                                 connect_timeout,
3070                                 reconnect_timeout);
3071
3072         rpc_xprt_switch_set_roundrobin(xps);
3073         if (setup) {
3074                 ret = setup(clnt, xps, xprt, data);
3075                 if (ret != 0)
3076                         goto out_put_xprt;
3077         }
3078         rpc_xprt_switch_add_xprt(xps, xprt);
3079 out_put_xprt:
3080         xprt_put(xprt);
3081 out_put_switch:
3082         xprt_switch_put(xps);
3083         return ret;
3084 }
3085 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
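
/*
 * Illustrative usage sketch (not part of this file): a typical caller fills
 * in a struct xprt_create describing the new endpoint and passes
 * rpc_clnt_test_and_add_xprt() as the setup callback so the transport is
 * pinged before being added.  The address variables are hypothetical.
 *
 *	struct xprt_create xprtargs = {
 *		.net		= rpc_net_ns(clnt),
 *		.dstaddr	= (struct sockaddr *)&new_addr,
 *		.addrlen	= new_addrlen,
 *		.servername	= servername,
 *	};
 *	int err;
 *
 *	err = rpc_clnt_add_xprt(clnt, &xprtargs,
 *				rpc_clnt_test_and_add_xprt, NULL);
 */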
3086
3087 static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3088                                   struct rpc_xprt *xprt,
3089                                   struct rpc_add_xprt_test *data)
3090 {
3091         struct rpc_xprt_switch *xps;
3092         struct rpc_xprt *main_xprt;
3093         int status = 0;
3094
3095         xprt_get(xprt);
3096
3097         rcu_read_lock();
3098         main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3099         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3100         status = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3101                                    (struct sockaddr *)&main_xprt->addr);
3102         rcu_read_unlock();
3103         xprt_put(main_xprt);
3104         if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3105                 goto out;
3106
3107         status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3108 out:
3109         xprt_put(xprt);
3110         xprt_switch_put(xps);
3111         return status;
3112 }
3113
3114 /* rpc_clnt_probe_trunked_xprts -- probe offlined transports for session trunking
3115  * @clnt rpc_clnt structure
3116  *
3117  * For each offlined transport found in the rpc_clnt structure call
3118  * the function rpc_xprt_probe_trunked() which will determine if this
3119  * transport still belongs to the trunking group.
3120  */
3121 void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3122                                   struct rpc_add_xprt_test *data)
3123 {
3124         struct rpc_xprt_iter xpi;
3125         int ret;
3126
3127         ret = rpc_clnt_xprt_iter_offline_init(clnt, &xpi);
3128         if (ret)
3129                 return;
3130         for (;;) {
3131                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
3132
3133                 if (!xprt)
3134                         break;
3135                 ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3136                 xprt_put(xprt);
3137                 if (ret < 0)
3138                         break;
3139                 xprt_iter_rewind(&xpi);
3140         }
3141         xprt_iter_destroy(&xpi);
3142 }
3143 EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
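
/*
 * Illustrative sketch (not part of this file): the probe above is driven by
 * a struct rpc_add_xprt_test supplied by the upper layer.  The callback
 * shown is hypothetical and only illustrates the expected shape.
 *
 *	static void example_add_xprt_test(struct rpc_clnt *clnt,
 *					  struct rpc_xprt *xprt, void *data)
 *	{
 *		// decide whether the probed transport should go back online
 *	}
 *
 *	struct rpc_add_xprt_test probe = {
 *		.add_xprt_test	= example_add_xprt_test,
 *		.data		= NULL,
 *	};
 *
 *	rpc_clnt_probe_trunked_xprts(clnt, &probe);
 */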
3144
3145 static int rpc_xprt_offline(struct rpc_clnt *clnt,
3146                             struct rpc_xprt *xprt,
3147                             void *data)
3148 {
3149         struct rpc_xprt *main_xprt;
3150         struct rpc_xprt_switch *xps;
3151         int err = 0;
3152
3153         xprt_get(xprt);
3154
3155         rcu_read_lock();
3156         main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3157         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3158         err = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3159                                 (struct sockaddr *)&main_xprt->addr);
3160         rcu_read_unlock();
3161         xprt_put(main_xprt);
3162         if (err)
3163                 goto out;
3164
3165         if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3166                 err = -EINTR;
3167                 goto out;
3168         }
3169         xprt_set_offline_locked(xprt, xps);
3170
3171         xprt_release_write(xprt, NULL);
3172 out:
3173         xprt_put(xprt);
3174         xprt_switch_put(xps);
3175         return err;
3176 }
3177
3178 /* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3179  * @clnt rpc_clnt structure
3180  *
3181  * For each active transport found in the rpc_clnt structure call
3182  * the function rpc_xprt_offline() which will identify trunked transports
3183  * and will mark them offline.
3184  */
3185 void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3186 {
3187         rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3188 }
3189 EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3190
3191 struct connect_timeout_data {
3192         unsigned long connect_timeout;
3193         unsigned long reconnect_timeout;
3194 };
3195
3196 static int
3197 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3198                 struct rpc_xprt *xprt,
3199                 void *data)
3200 {
3201         struct connect_timeout_data *timeo = data;
3202
3203         if (xprt->ops->set_connect_timeout)
3204                 xprt->ops->set_connect_timeout(xprt,
3205                                 timeo->connect_timeout,
3206                                 timeo->reconnect_timeout);
3207         return 0;
3208 }
3209
3210 void
3211 rpc_set_connect_timeout(struct rpc_clnt *clnt,
3212                 unsigned long connect_timeout,
3213                 unsigned long reconnect_timeout)
3214 {
3215         struct connect_timeout_data timeout = {
3216                 .connect_timeout = connect_timeout,
3217                 .reconnect_timeout = reconnect_timeout,
3218         };
3219         rpc_clnt_iterate_for_each_xprt(clnt,
3220                         rpc_xprt_set_connect_timeout,
3221                         &timeout);
3222 }
3223 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
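
/*
 * Illustrative sketch (editorial addition): tighten the connect and
 * reconnect timeouts, in jiffies, on every transport of a client. The
 * 5s/15s values are arbitrary examples; transports that do not implement
 * ->set_connect_timeout simply ignore the request.
 */
static void __maybe_unused example_tighten_connect_timeouts(struct rpc_clnt *clnt)
{
	rpc_set_connect_timeout(clnt, 5 * HZ, 15 * HZ);
}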
3224
3225 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
3226 {
3227         rcu_read_lock();
3228         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3229         rcu_read_unlock();
3230 }
3231 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
3232
3233 void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3234 {
3235         struct rpc_xprt_switch *xps;
3236
3237         rcu_read_lock();
3238         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3239         rcu_read_unlock();
3240         xprt_set_online_locked(xprt, xps);
3241 }
3242
3243 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3244 {
3245         if (rpc_clnt_xprt_switch_has_addr(clnt, (const struct sockaddr *)&xprt->addr)) {
3246                 rpc_clnt_xprt_set_online(clnt, xprt);
3247                 return;
3248         }
3249         rcu_read_lock();
3250         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3251                                  xprt);
3252         rcu_read_unlock();
3253 }
3254 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3255
3256 void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3257 {
3258         struct rpc_xprt_switch *xps;
3259
3260         rcu_read_lock();
3261         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3262         rpc_xprt_switch_remove_xprt(xps, xprt, 0);
3264         xps->xps_nunique_destaddr_xprts--;
3265         rcu_read_unlock();
3266 }
3267 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);
3268
3269 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3270                                    const struct sockaddr *sap)
3271 {
3272         struct rpc_xprt_switch *xps;
3273         bool ret;
3274
3275         rcu_read_lock();
3276         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3277         ret = rpc_xprt_switch_has_addr(xps, sap);
3278         rcu_read_unlock();
3279         return ret;
3280 }
3281 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
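
/*
 * Illustrative sketch (editorial addition): create and attach a transport
 * only if its destination address is not already present in the switch.
 * The function name is hypothetical; xprt_create_transport() and
 * struct xprt_create are the usual way callers obtain a new rpc_xprt.
 */
static int __maybe_unused example_add_unique_xprt(struct rpc_clnt *clnt,
						  struct xprt_create *xprtargs)
{
	struct rpc_xprt *xprt;

	/* Nothing to do when the destination is already known. */
	if (rpc_clnt_xprt_switch_has_addr(clnt, xprtargs->dstaddr))
		return 0;

	xprt = xprt_create_transport(xprtargs);
	if (IS_ERR(xprt))
		return PTR_ERR(xprt);

	rpc_clnt_xprt_switch_add_xprt(clnt, xprt);
	/* The switch takes its own reference; drop the creation reference. */
	xprt_put(xprt);
	return 0;
}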
3282
3283 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3284 static void rpc_show_header(void)
3285 {
3286         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3287                 "-timeout ---ops-- --prog-- -proc- --action-- -queue-\n");
3288 }
3289
3290 static void rpc_show_task(const struct rpc_clnt *clnt,
3291                           const struct rpc_task *task)
3292 {
3293         const char *rpc_waitq = "none";
3294
3295         if (RPC_IS_QUEUED(task))
3296                 rpc_waitq = rpc_qname(task->tk_waitqueue);
3297
3298         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3299                 task->tk_pid, task->tk_flags, task->tk_status,
3300                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3301                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3302                 task->tk_action, rpc_waitq);
3303 }
3304
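/*
 * Dump every task of every RPC client in @net to the kernel log.
 * (Editorial note: this is normally triggered from the sunrpc debugging
 * sysctl, e.g. by a write to /proc/sys/sunrpc/rpc_debug.)
 */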
3305 void rpc_show_tasks(struct net *net)
3306 {
3307         struct rpc_clnt *clnt;
3308         struct rpc_task *task;
3309         int header = 0;
3310         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3311
3312         spin_lock(&sn->rpc_client_lock);
3313         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3314                 spin_lock(&clnt->cl_lock);
3315                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3316                         if (!header) {
3317                                 rpc_show_header();
3318                                 header++;
3319                         }
3320                         rpc_show_task(clnt, task);
3321                 }
3322                 spin_unlock(&clnt->cl_lock);
3323         }
3324         spin_unlock(&sn->rpc_client_lock);
3325 }
3326 #endif
3327
3328 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3329 static int
3330 rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3331                 struct rpc_xprt *xprt,
3332                 void *dummy)
3333 {
3334         return xprt_enable_swap(xprt);
3335 }
3336
3337 int
3338 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3339 {
3340         while (clnt != clnt->cl_parent)
3341                 clnt = clnt->cl_parent;
3342         if (atomic_inc_return(&clnt->cl_swapper) == 1)
3343                 return rpc_clnt_iterate_for_each_xprt(clnt,
3344                                 rpc_clnt_swap_activate_callback, NULL);
3345         return 0;
3346 }
3347 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3348
3349 static int
3350 rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3351                 struct rpc_xprt *xprt,
3352                 void *dummy)
3353 {
3354         xprt_disable_swap(xprt);
3355         return 0;
3356 }
3357
3358 void
3359 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3360 {
3361         while (clnt != clnt->cl_parent)
3362                 clnt = clnt->cl_parent;
3363         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3364                 rpc_clnt_iterate_for_each_xprt(clnt,
3365                                 rpc_clnt_swap_deactivate_callback, NULL);
3366 }
3367 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
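
/*
 * Illustrative sketch (editorial addition): balanced use of the swap
 * helpers around the lifetime of a swap file backed by this client.
 * The function name and the placement of the swap I/O are assumptions.
 */
static int __maybe_unused example_swapfile_lifetime(struct rpc_clnt *clnt)
{
	int ret;

	/* The first activation walks all transports and enables swap. */
	ret = rpc_clnt_swap_activate(clnt);
	if (ret)
		return ret;

	/* ... the swap file is in use here ... */

	/* The last deactivation walks the transports again and disables swap. */
	rpc_clnt_swap_deactivate(clnt);
	return 0;
}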
3368 #endif /* CONFIG_SUNRPC_SWAP */