SUNRPC: Release the transport of a relocated task with an assigned transport
[platform/kernel/linux-rpi.git] / net/sunrpc/clnt.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
51 /*
52  * All RPC clients are linked into this list
53  */
54
55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56
57
58 static void     call_start(struct rpc_task *task);
59 static void     call_reserve(struct rpc_task *task);
60 static void     call_reserveresult(struct rpc_task *task);
61 static void     call_allocate(struct rpc_task *task);
62 static void     call_encode(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 static void     call_status(struct rpc_task *task);
68 static void     call_transmit_status(struct rpc_task *task);
69 static void     call_refresh(struct rpc_task *task);
70 static void     call_refreshresult(struct rpc_task *task);
71 static void     call_connect(struct rpc_task *task);
72 static void     call_connect_status(struct rpc_task *task);
73
74 static int      rpc_encode_header(struct rpc_task *task,
75                                   struct xdr_stream *xdr);
76 static int      rpc_decode_header(struct rpc_task *task,
77                                   struct xdr_stream *xdr);
78 static int      rpc_ping(struct rpc_clnt *clnt);
79 static void     rpc_check_timeout(struct rpc_task *task);
80
81 static void rpc_register_client(struct rpc_clnt *clnt)
82 {
83         struct net *net = rpc_net_ns(clnt);
84         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
85
86         spin_lock(&sn->rpc_client_lock);
87         list_add(&clnt->cl_clients, &sn->all_clients);
88         spin_unlock(&sn->rpc_client_lock);
89 }
90
91 static void rpc_unregister_client(struct rpc_clnt *clnt)
92 {
93         struct net *net = rpc_net_ns(clnt);
94         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
95
96         spin_lock(&sn->rpc_client_lock);
97         list_del(&clnt->cl_clients);
98         spin_unlock(&sn->rpc_client_lock);
99 }
100
101 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
102 {
103         rpc_remove_client_dir(clnt);
104 }
105
106 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
107 {
108         struct net *net = rpc_net_ns(clnt);
109         struct super_block *pipefs_sb;
110
111         pipefs_sb = rpc_get_sb_net(net);
112         if (pipefs_sb) {
113                 __rpc_clnt_remove_pipedir(clnt);
114                 rpc_put_sb_net(net);
115         }
116 }
117
118 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
119                                     struct rpc_clnt *clnt)
120 {
121         static uint32_t clntid;
122         const char *dir_name = clnt->cl_program->pipe_dir_name;
123         char name[15];
124         struct dentry *dir, *dentry;
125
126         dir = rpc_d_lookup_sb(sb, dir_name);
127         if (dir == NULL) {
128                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
129                 return dir;
130         }
131         for (;;) {
132                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
133                 name[sizeof(name) - 1] = '\0';
134                 dentry = rpc_create_client_dir(dir, name, clnt);
135                 if (!IS_ERR(dentry))
136                         break;
137                 if (dentry == ERR_PTR(-EEXIST))
138                         continue;
139                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
140                                 " %s/%s, error %ld\n",
141                                 dir_name, name, PTR_ERR(dentry));
142                 break;
143         }
144         dput(dir);
145         return dentry;
146 }
147
148 static int
149 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
150 {
151         struct dentry *dentry;
152
153         if (clnt->cl_program->pipe_dir_name != NULL) {
154                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
155                 if (IS_ERR(dentry))
156                         return PTR_ERR(dentry);
157         }
158         return 0;
159 }
160
161 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
162 {
163         if (clnt->cl_program->pipe_dir_name == NULL)
164                 return 1;
165
166         switch (event) {
167         case RPC_PIPEFS_MOUNT:
168                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
169                         return 1;
170                 if (refcount_read(&clnt->cl_count) == 0)
171                         return 1;
172                 break;
173         case RPC_PIPEFS_UMOUNT:
174                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
175                         return 1;
176                 break;
177         }
178         return 0;
179 }
180
181 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
182                                    struct super_block *sb)
183 {
184         struct dentry *dentry;
185
186         switch (event) {
187         case RPC_PIPEFS_MOUNT:
188                 dentry = rpc_setup_pipedir_sb(sb, clnt);
189                 if (!dentry)
190                         return -ENOENT;
191                 if (IS_ERR(dentry))
192                         return PTR_ERR(dentry);
193                 break;
194         case RPC_PIPEFS_UMOUNT:
195                 __rpc_clnt_remove_pipedir(clnt);
196                 break;
197         default:
198                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
199                 return -ENOTSUPP;
200         }
201         return 0;
202 }
203
204 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
205                                 struct super_block *sb)
206 {
207         int error = 0;
208
209         for (;; clnt = clnt->cl_parent) {
210                 if (!rpc_clnt_skip_event(clnt, event))
211                         error = __rpc_clnt_handle_event(clnt, event, sb);
212                 if (error || clnt == clnt->cl_parent)
213                         break;
214         }
215         return error;
216 }
217
218 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
219 {
220         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
221         struct rpc_clnt *clnt;
222
223         spin_lock(&sn->rpc_client_lock);
224         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
225                 if (rpc_clnt_skip_event(clnt, event))
226                         continue;
227                 spin_unlock(&sn->rpc_client_lock);
228                 return clnt;
229         }
230         spin_unlock(&sn->rpc_client_lock);
231         return NULL;
232 }
233
234 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
235                             void *ptr)
236 {
237         struct super_block *sb = ptr;
238         struct rpc_clnt *clnt;
239         int error = 0;
240
241         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
242                 error = __rpc_pipefs_event(clnt, event, sb);
243                 if (error)
244                         break;
245         }
246         return error;
247 }
248
249 static struct notifier_block rpc_clients_block = {
250         .notifier_call  = rpc_pipefs_event,
251         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
252 };
253
254 int rpc_clients_notifier_register(void)
255 {
256         return rpc_pipefs_notifier_register(&rpc_clients_block);
257 }
258
259 void rpc_clients_notifier_unregister(void)
260 {
261         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
262 }
263
264 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
265                 struct rpc_xprt *xprt,
266                 const struct rpc_timeout *timeout)
267 {
268         struct rpc_xprt *old;
269
270         spin_lock(&clnt->cl_lock);
271         old = rcu_dereference_protected(clnt->cl_xprt,
272                         lockdep_is_held(&clnt->cl_lock));
273
274         if (!xprt_bound(xprt))
275                 clnt->cl_autobind = 1;
276
277         clnt->cl_timeout = timeout;
278         rcu_assign_pointer(clnt->cl_xprt, xprt);
279         spin_unlock(&clnt->cl_lock);
280
281         return old;
282 }
283
284 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
285 {
286         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
287                         nodename, sizeof(clnt->cl_nodename));
288 }
289
290 static int rpc_client_register(struct rpc_clnt *clnt,
291                                rpc_authflavor_t pseudoflavor,
292                                const char *client_name)
293 {
294         struct rpc_auth_create_args auth_args = {
295                 .pseudoflavor = pseudoflavor,
296                 .target_name = client_name,
297         };
298         struct rpc_auth *auth;
299         struct net *net = rpc_net_ns(clnt);
300         struct super_block *pipefs_sb;
301         int err;
302
303         rpc_clnt_debugfs_register(clnt);
304
305         pipefs_sb = rpc_get_sb_net(net);
306         if (pipefs_sb) {
307                 err = rpc_setup_pipedir(pipefs_sb, clnt);
308                 if (err)
309                         goto out;
310         }
311
312         rpc_register_client(clnt);
313         if (pipefs_sb)
314                 rpc_put_sb_net(net);
315
316         auth = rpcauth_create(&auth_args, clnt);
317         if (IS_ERR(auth)) {
318                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
319                                 pseudoflavor);
320                 err = PTR_ERR(auth);
321                 goto err_auth;
322         }
323         return 0;
324 err_auth:
325         pipefs_sb = rpc_get_sb_net(net);
326         rpc_unregister_client(clnt);
327         __rpc_clnt_remove_pipedir(clnt);
328 out:
329         if (pipefs_sb)
330                 rpc_put_sb_net(net);
331         rpc_sysfs_client_destroy(clnt);
332         rpc_clnt_debugfs_unregister(clnt);
333         return err;
334 }
335
336 static DEFINE_IDA(rpc_clids);
337
338 void rpc_cleanup_clids(void)
339 {
340         ida_destroy(&rpc_clids);
341 }
342
343 static int rpc_alloc_clid(struct rpc_clnt *clnt)
344 {
345         int clid;
346
347         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
348         if (clid < 0)
349                 return clid;
350         clnt->cl_clid = clid;
351         return 0;
352 }
353
354 static void rpc_free_clid(struct rpc_clnt *clnt)
355 {
356         ida_simple_remove(&rpc_clids, clnt->cl_clid);
357 }
358
359 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
360                 struct rpc_xprt_switch *xps,
361                 struct rpc_xprt *xprt,
362                 struct rpc_clnt *parent)
363 {
364         const struct rpc_program *program = args->program;
365         const struct rpc_version *version;
366         struct rpc_clnt *clnt = NULL;
367         const struct rpc_timeout *timeout;
368         const char *nodename = args->nodename;
369         int err;
370
371         err = rpciod_up();
372         if (err)
373                 goto out_no_rpciod;
374
375         err = -EINVAL;
376         if (args->version >= program->nrvers)
377                 goto out_err;
378         version = program->version[args->version];
379         if (version == NULL)
380                 goto out_err;
381
382         err = -ENOMEM;
383         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
384         if (!clnt)
385                 goto out_err;
386         clnt->cl_parent = parent ? : clnt;
387
388         err = rpc_alloc_clid(clnt);
389         if (err)
390                 goto out_no_clid;
391
392         clnt->cl_cred     = get_cred(args->cred);
393         clnt->cl_procinfo = version->procs;
394         clnt->cl_maxproc  = version->nrprocs;
395         clnt->cl_prog     = args->prognumber ? : program->number;
396         clnt->cl_vers     = version->number;
397         clnt->cl_stats    = program->stats;
398         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
399         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
400         err = -ENOMEM;
401         if (clnt->cl_metrics == NULL)
402                 goto out_no_stats;
403         clnt->cl_program  = program;
404         INIT_LIST_HEAD(&clnt->cl_tasks);
405         spin_lock_init(&clnt->cl_lock);
406
407         timeout = xprt->timeout;
408         if (args->timeout != NULL) {
409                 memcpy(&clnt->cl_timeout_default, args->timeout,
410                                 sizeof(clnt->cl_timeout_default));
411                 timeout = &clnt->cl_timeout_default;
412         }
413
414         rpc_clnt_set_transport(clnt, xprt, timeout);
415         xprt->main = true;
416         xprt_iter_init(&clnt->cl_xpi, xps);
417         xprt_switch_put(xps);
418
419         clnt->cl_rtt = &clnt->cl_rtt_default;
420         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
421
422         refcount_set(&clnt->cl_count, 1);
423
424         if (nodename == NULL)
425                 nodename = utsname()->nodename;
426         /* save the nodename */
427         rpc_clnt_set_nodename(clnt, nodename);
428
429         rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
430         err = rpc_client_register(clnt, args->authflavor, args->client_name);
431         if (err)
432                 goto out_no_path;
433         if (parent)
434                 refcount_inc(&parent->cl_count);
435
436         trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
437         return clnt;
438
439 out_no_path:
440         rpc_free_iostats(clnt->cl_metrics);
441 out_no_stats:
442         put_cred(clnt->cl_cred);
443         rpc_free_clid(clnt);
444 out_no_clid:
445         kfree(clnt);
446 out_err:
447         rpciod_down();
448 out_no_rpciod:
449         xprt_switch_put(xps);
450         xprt_put(xprt);
451         trace_rpc_clnt_new_err(program->name, args->servername, err);
452         return ERR_PTR(err);
453 }
454
455 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
456                                         struct rpc_xprt *xprt)
457 {
458         struct rpc_clnt *clnt = NULL;
459         struct rpc_xprt_switch *xps;
460
461         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
462                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
463                 xps = args->bc_xprt->xpt_bc_xps;
464                 xprt_switch_get(xps);
465         } else {
466                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
467                 if (xps == NULL) {
468                         xprt_put(xprt);
469                         return ERR_PTR(-ENOMEM);
470                 }
471                 if (xprt->bc_xprt) {
472                         xprt_switch_get(xps);
473                         xprt->bc_xprt->xpt_bc_xps = xps;
474                 }
475         }
476         clnt = rpc_new_client(args, xps, xprt, NULL);
477         if (IS_ERR(clnt))
478                 return clnt;
479
480         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
481                 int err = rpc_ping(clnt);
482                 if (err != 0) {
483                         rpc_shutdown_client(clnt);
484                         return ERR_PTR(err);
485                 }
486         }
487
488         clnt->cl_softrtry = 1;
489         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
490                 clnt->cl_softrtry = 0;
491                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
492                         clnt->cl_softerr = 1;
493         }
494
495         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
496                 clnt->cl_autobind = 1;
497         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
498                 clnt->cl_noretranstimeo = 1;
499         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
500                 clnt->cl_discrtry = 1;
501         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
502                 clnt->cl_chatty = 1;
503
504         return clnt;
505 }
506
507 /**
508  * rpc_create - create an RPC client and transport with one call
509  * @args: rpc_clnt create argument structure
510  *
511  * Creates and initializes an RPC transport and an RPC client.
512  *
513  * It can ping the server in order to determine if it is up, and to see if
514  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
515  * this behavior so asynchronous tasks can also use rpc_create.
516  */
517 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
518 {
519         struct rpc_xprt *xprt;
520         struct xprt_create xprtargs = {
521                 .net = args->net,
522                 .ident = args->protocol,
523                 .srcaddr = args->saddress,
524                 .dstaddr = args->address,
525                 .addrlen = args->addrsize,
526                 .servername = args->servername,
527                 .bc_xprt = args->bc_xprt,
528         };
529         char servername[48];
530         struct rpc_clnt *clnt;
531         int i;
532
533         if (args->bc_xprt) {
534                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
535                 xprt = args->bc_xprt->xpt_bc_xprt;
536                 if (xprt) {
537                         xprt_get(xprt);
538                         return rpc_create_xprt(args, xprt);
539                 }
540         }
541
542         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
543                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
544         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
545                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
546         /*
547          * If the caller chooses not to specify a hostname, whip
548          * up a string representation of the passed-in address.
549          */
550         if (xprtargs.servername == NULL) {
551                 struct sockaddr_un *sun =
552                                 (struct sockaddr_un *)args->address;
553                 struct sockaddr_in *sin =
554                                 (struct sockaddr_in *)args->address;
555                 struct sockaddr_in6 *sin6 =
556                                 (struct sockaddr_in6 *)args->address;
557
558                 servername[0] = '\0';
559                 switch (args->address->sa_family) {
560                 case AF_LOCAL:
561                         snprintf(servername, sizeof(servername), "%s",
562                                  sun->sun_path);
563                         break;
564                 case AF_INET:
565                         snprintf(servername, sizeof(servername), "%pI4",
566                                  &sin->sin_addr.s_addr);
567                         break;
568                 case AF_INET6:
569                         snprintf(servername, sizeof(servername), "%pI6",
570                                  &sin6->sin6_addr);
571                         break;
572                 default:
573                         /* caller wants default server name, but
574                          * address family isn't recognized. */
575                         return ERR_PTR(-EINVAL);
576                 }
577                 xprtargs.servername = servername;
578         }
579
580         xprt = xprt_create_transport(&xprtargs);
581         if (IS_ERR(xprt))
582                 return (struct rpc_clnt *)xprt;
583
584         /*
585          * By default, kernel RPC client connects from a reserved port.
586          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
587          * but it is always enabled for rpciod, which handles the connect
588          * operation.
589          */
590         xprt->resvport = 1;
591         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
592                 xprt->resvport = 0;
593         xprt->reuseport = 0;
594         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
595                 xprt->reuseport = 1;
596
597         clnt = rpc_create_xprt(args, xprt);
598         if (IS_ERR(clnt) || args->nconnect <= 1)
599                 return clnt;
600
601         for (i = 0; i < args->nconnect - 1; i++) {
602                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
603                         break;
604         }
605         return clnt;
606 }
607 EXPORT_SYMBOL_GPL(rpc_create);
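/*
 * Editorial usage sketch, not part of clnt.c: a minimal example of how a
 * kernel consumer might fill in struct rpc_create_args and call rpc_create().
 * The field names mirror those read by rpc_create() above; "example_program",
 * "net", "sap" and "salen" are hypothetical caller-supplied values.
 *
 *	struct rpc_create_args args = {
 *		.net		= net,
 *		.protocol	= XPRT_TRANSPORT_TCP,
 *		.address	= (struct sockaddr *)sap,
 *		.addrsize	= salen,
 *		.servername	= "server.example.org",
 *		.program	= &example_program,
 *		.version	= 3,
 *		.authflavor	= RPC_AUTH_UNIX,
 *		.flags		= RPC_CLNT_CREATE_NOPING,
 *	};
 *	struct rpc_clnt *clnt;
 *
 *	clnt = rpc_create(&args);
 *	if (IS_ERR(clnt))
 *		return PTR_ERR(clnt);
 *	...
 *	rpc_shutdown_client(clnt);
 */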
608
609 /*
610  * This function clones the RPC client structure. It allows us to share the
611  * same transport while varying parameters such as the authentication
612  * flavour.
613  */
614 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
615                                            struct rpc_clnt *clnt)
616 {
617         struct rpc_xprt_switch *xps;
618         struct rpc_xprt *xprt;
619         struct rpc_clnt *new;
620         int err;
621
622         err = -ENOMEM;
623         rcu_read_lock();
624         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
625         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
626         rcu_read_unlock();
627         if (xprt == NULL || xps == NULL) {
628                 xprt_put(xprt);
629                 xprt_switch_put(xps);
630                 goto out_err;
631         }
632         args->servername = xprt->servername;
633         args->nodename = clnt->cl_nodename;
634
635         new = rpc_new_client(args, xps, xprt, clnt);
636         if (IS_ERR(new))
637                 return new;
638
639         /* Turn off autobind on clones */
640         new->cl_autobind = 0;
641         new->cl_softrtry = clnt->cl_softrtry;
642         new->cl_softerr = clnt->cl_softerr;
643         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
644         new->cl_discrtry = clnt->cl_discrtry;
645         new->cl_chatty = clnt->cl_chatty;
646         new->cl_principal = clnt->cl_principal;
647         return new;
648
649 out_err:
650         trace_rpc_clnt_clone_err(clnt, err);
651         return ERR_PTR(err);
652 }
653
654 /**
655  * rpc_clone_client - Clone an RPC client structure
656  *
657  * @clnt: RPC client whose parameters are copied
658  *
659  * Returns a fresh RPC client or an ERR_PTR.
660  */
661 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
662 {
663         struct rpc_create_args args = {
664                 .program        = clnt->cl_program,
665                 .prognumber     = clnt->cl_prog,
666                 .version        = clnt->cl_vers,
667                 .authflavor     = clnt->cl_auth->au_flavor,
668                 .cred           = clnt->cl_cred,
669         };
670         return __rpc_clone_client(&args, clnt);
671 }
672 EXPORT_SYMBOL_GPL(rpc_clone_client);
673
674 /**
675  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
676  *
677  * @clnt: RPC client whose parameters are copied
678  * @flavor: security flavor for new client
679  *
680  * Returns a fresh RPC client or an ERR_PTR.
681  */
682 struct rpc_clnt *
683 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
684 {
685         struct rpc_create_args args = {
686                 .program        = clnt->cl_program,
687                 .prognumber     = clnt->cl_prog,
688                 .version        = clnt->cl_vers,
689                 .authflavor     = flavor,
690                 .cred           = clnt->cl_cred,
691         };
692         return __rpc_clone_client(&args, clnt);
693 }
694 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
695
696 /**
697  * rpc_switch_client_transport: switch the RPC transport on the fly
698  * @clnt: pointer to a struct rpc_clnt
699  * @args: pointer to the new transport arguments
700  * @timeout: pointer to the new timeout parameters
701  *
702  * This function allows the caller to switch the RPC transport for the
703  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
704  * server, for instance.  It assumes that the caller has ensured that
705  * there are no active RPC tasks by using some form of locking.
706  *
707  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
708  * negative errno is returned, and "clnt" continues to use the old
709  * xprt.
710  */
711 int rpc_switch_client_transport(struct rpc_clnt *clnt,
712                 struct xprt_create *args,
713                 const struct rpc_timeout *timeout)
714 {
715         const struct rpc_timeout *old_timeo;
716         rpc_authflavor_t pseudoflavor;
717         struct rpc_xprt_switch *xps, *oldxps;
718         struct rpc_xprt *xprt, *old;
719         struct rpc_clnt *parent;
720         int err;
721
722         xprt = xprt_create_transport(args);
723         if (IS_ERR(xprt))
724                 return PTR_ERR(xprt);
725
726         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
727         if (xps == NULL) {
728                 xprt_put(xprt);
729                 return -ENOMEM;
730         }
731
732         pseudoflavor = clnt->cl_auth->au_flavor;
733
734         old_timeo = clnt->cl_timeout;
735         old = rpc_clnt_set_transport(clnt, xprt, timeout);
736         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
737
738         rpc_unregister_client(clnt);
739         __rpc_clnt_remove_pipedir(clnt);
740         rpc_sysfs_client_destroy(clnt);
741         rpc_clnt_debugfs_unregister(clnt);
742
743         /*
744          * A new transport was created.  "clnt" therefore
745          * becomes the root of a new cl_parent tree.  clnt's
746          * children, if it has any, still point to the old xprt.
747          */
748         parent = clnt->cl_parent;
749         clnt->cl_parent = clnt;
750
751         /*
752          * The old rpc_auth cache cannot be re-used.  GSS
753          * contexts in particular are between a single
754          * client and server.
755          */
756         err = rpc_client_register(clnt, pseudoflavor, NULL);
757         if (err)
758                 goto out_revert;
759
760         synchronize_rcu();
761         if (parent != clnt)
762                 rpc_release_client(parent);
763         xprt_switch_put(oldxps);
764         xprt_put(old);
765         trace_rpc_clnt_replace_xprt(clnt);
766         return 0;
767
768 out_revert:
769         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
770         rpc_clnt_set_transport(clnt, old, old_timeo);
771         clnt->cl_parent = parent;
772         rpc_client_register(clnt, pseudoflavor, NULL);
773         xprt_switch_put(xps);
774         xprt_put(xprt);
775         trace_rpc_clnt_replace_xprt_err(clnt);
776         return err;
777 }
778 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
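/*
 * Editorial usage sketch, not part of clnt.c: switching a client to a new
 * server address with rpc_switch_client_transport().  As the kernel-doc
 * above notes, the caller must first ensure no RPC tasks are active;
 * "new_sap", "new_salen" and "hostname" are hypothetical caller values.
 *
 *	struct xprt_create xprtargs = {
 *		.ident		= XPRT_TRANSPORT_TCP,
 *		.net		= rpc_net_ns(clnt),
 *		.dstaddr	= (struct sockaddr *)new_sap,
 *		.addrlen	= new_salen,
 *		.servername	= hostname,
 *	};
 *	int err;
 *
 *	err = rpc_switch_client_transport(clnt, &xprtargs, clnt->cl_timeout);
 *	if (err)
 *		return err;	(clnt keeps using the old transport)
 */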
779
780 static
781 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
782 {
783         struct rpc_xprt_switch *xps;
784
785         rcu_read_lock();
786         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
787         rcu_read_unlock();
788         if (xps == NULL)
789                 return -EAGAIN;
790         xprt_iter_init_listall(xpi, xps);
791         xprt_switch_put(xps);
792         return 0;
793 }
794
795 /**
796  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
797  * @clnt: pointer to client
798  * @fn: function to apply
799  * @data: void pointer to function data
800  *
801  * Iterates through the list of RPC transports currently attached to the
802  * client and applies the function fn(clnt, xprt, data).
803  *
804  * On error, the iteration stops, and the function returns the error value.
805  */
806 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
807                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
808                 void *data)
809 {
810         struct rpc_xprt_iter xpi;
811         int ret;
812
813         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
814         if (ret)
815                 return ret;
816         for (;;) {
817                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
818
819                 if (!xprt)
820                         break;
821                 ret = fn(clnt, xprt, data);
822                 xprt_put(xprt);
823                 if (ret < 0)
824                         break;
825         }
826         xprt_iter_destroy(&xpi);
827         return ret;
828 }
829 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
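/*
 * Editorial usage sketch, not part of clnt.c: a callback with the signature
 * expected by rpc_clnt_iterate_for_each_xprt().  Returning a negative value
 * stops the iteration; the function and variable names are hypothetical.
 *
 *	static int example_count_xprt(struct rpc_clnt *clnt,
 *				      struct rpc_xprt *xprt, void *data)
 *	{
 *		unsigned int *count = data;
 *
 *		(*count)++;
 *		return 0;
 *	}
 *
 *	unsigned int count = 0;
 *
 *	rpc_clnt_iterate_for_each_xprt(clnt, example_count_xprt, &count);
 */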
830
831 /*
832  * Kill all tasks for the given client.
833  * XXX: kill their descendants as well?
834  */
835 void rpc_killall_tasks(struct rpc_clnt *clnt)
836 {
837         struct rpc_task *rovr;
838
839
840         if (list_empty(&clnt->cl_tasks))
841                 return;
842
843         /*
844          * Spin lock all_tasks to prevent changes...
845          */
846         trace_rpc_clnt_killall(clnt);
847         spin_lock(&clnt->cl_lock);
848         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
849                 rpc_signal_task(rovr);
850         spin_unlock(&clnt->cl_lock);
851 }
852 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
853
854 /*
855  * Properly shut down an RPC client, terminating all outstanding
856  * requests.
857  */
858 void rpc_shutdown_client(struct rpc_clnt *clnt)
859 {
860         might_sleep();
861
862         trace_rpc_clnt_shutdown(clnt);
863
864         while (!list_empty(&clnt->cl_tasks)) {
865                 rpc_killall_tasks(clnt);
866                 wait_event_timeout(destroy_wait,
867                         list_empty(&clnt->cl_tasks), 1*HZ);
868         }
869
870         rpc_release_client(clnt);
871 }
872 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
873
874 /*
875  * Free an RPC client
876  */
877 static void rpc_free_client_work(struct work_struct *work)
878 {
879         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
880
881         trace_rpc_clnt_free(clnt);
882
883         /* These might block on processes that might allocate memory,
884          * so they cannot be called in rpciod, so they are handled separately
885          * here.
886          */
887         rpc_sysfs_client_destroy(clnt);
888         rpc_clnt_debugfs_unregister(clnt);
889         rpc_free_clid(clnt);
890         rpc_clnt_remove_pipedir(clnt);
891         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
892
893         kfree(clnt);
894         rpciod_down();
895 }
896 static struct rpc_clnt *
897 rpc_free_client(struct rpc_clnt *clnt)
898 {
899         struct rpc_clnt *parent = NULL;
900
901         trace_rpc_clnt_release(clnt);
902         if (clnt->cl_parent != clnt)
903                 parent = clnt->cl_parent;
904         rpc_unregister_client(clnt);
905         rpc_free_iostats(clnt->cl_metrics);
906         clnt->cl_metrics = NULL;
907         xprt_iter_destroy(&clnt->cl_xpi);
908         put_cred(clnt->cl_cred);
909
910         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
911         schedule_work(&clnt->cl_work);
912         return parent;
913 }
914
915 /*
916  * Free an RPC client
917  */
918 static struct rpc_clnt *
919 rpc_free_auth(struct rpc_clnt *clnt)
920 {
921         /*
922          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
923          *       release remaining GSS contexts. This mechanism ensures
924          *       that it can do so safely.
925          */
926         if (clnt->cl_auth != NULL) {
927                 rpcauth_release(clnt->cl_auth);
928                 clnt->cl_auth = NULL;
929         }
930         if (refcount_dec_and_test(&clnt->cl_count))
931                 return rpc_free_client(clnt);
932         return NULL;
933 }
934
935 /*
936  * Release reference to the RPC client
937  */
938 void
939 rpc_release_client(struct rpc_clnt *clnt)
940 {
941         do {
942                 if (list_empty(&clnt->cl_tasks))
943                         wake_up(&destroy_wait);
944                 if (refcount_dec_not_one(&clnt->cl_count))
945                         break;
946                 clnt = rpc_free_auth(clnt);
947         } while (clnt != NULL);
948 }
949 EXPORT_SYMBOL_GPL(rpc_release_client);
950
951 /**
952  * rpc_bind_new_program - bind a new RPC program to an existing client
953  * @old: old rpc_client
954  * @program: rpc program to set
955  * @vers: rpc program version
956  *
957  * Clones the rpc client and sets up a new RPC program. This is mainly
958  * of use for enabling different RPC programs to share the same transport.
959  * The Sun NFSv2/v3 ACL protocol can do this.
960  */
961 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
962                                       const struct rpc_program *program,
963                                       u32 vers)
964 {
965         struct rpc_create_args args = {
966                 .program        = program,
967                 .prognumber     = program->number,
968                 .version        = vers,
969                 .authflavor     = old->cl_auth->au_flavor,
970                 .cred           = old->cl_cred,
971         };
972         struct rpc_clnt *clnt;
973         int err;
974
975         clnt = __rpc_clone_client(&args, old);
976         if (IS_ERR(clnt))
977                 goto out;
978         err = rpc_ping(clnt);
979         if (err != 0) {
980                 rpc_shutdown_client(clnt);
981                 clnt = ERR_PTR(err);
982         }
983 out:
984         return clnt;
985 }
986 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
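/*
 * Editorial usage sketch, not part of clnt.c: binding a sideband program
 * (e.g. the NFSv3 ACL protocol mentioned in the kernel-doc above) to an
 * existing client's transport.  "nfs_clnt" and "nfsacl_program" are
 * hypothetical names standing in for the caller's client and rpc_program.
 *
 *	struct rpc_clnt *acl_clnt;
 *
 *	acl_clnt = rpc_bind_new_program(nfs_clnt, &nfsacl_program, 3);
 *	if (IS_ERR(acl_clnt))
 *		return PTR_ERR(acl_clnt);
 */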
987
988 struct rpc_xprt *
989 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
990 {
991         struct rpc_xprt_switch *xps;
992
993         if (!xprt)
994                 return NULL;
995         rcu_read_lock();
996         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
997         atomic_long_inc(&xps->xps_queuelen);
998         rcu_read_unlock();
999         atomic_long_inc(&xprt->queuelen);
1000
1001         return xprt;
1002 }
1003
1004 static void
1005 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1006 {
1007         struct rpc_xprt_switch *xps;
1008
1009         atomic_long_dec(&xprt->queuelen);
1010         rcu_read_lock();
1011         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1012         atomic_long_dec(&xps->xps_queuelen);
1013         rcu_read_unlock();
1014
1015         xprt_put(xprt);
1016 }
1017
1018 void rpc_task_release_transport(struct rpc_task *task)
1019 {
1020         struct rpc_xprt *xprt = task->tk_xprt;
1021
1022         if (xprt) {
1023                 task->tk_xprt = NULL;
1024                 if (task->tk_client)
1025                         rpc_task_release_xprt(task->tk_client, xprt);
1026                 else
1027                         xprt_put(xprt);
1028         }
1029 }
1030 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1031
1032 void rpc_task_release_client(struct rpc_task *task)
1033 {
1034         struct rpc_clnt *clnt = task->tk_client;
1035
1036         rpc_task_release_transport(task);
1037         if (clnt != NULL) {
1038                 /* Remove from client task list */
1039                 spin_lock(&clnt->cl_lock);
1040                 list_del(&task->tk_task);
1041                 spin_unlock(&clnt->cl_lock);
1042                 task->tk_client = NULL;
1043
1044                 rpc_release_client(clnt);
1045         }
1046 }
1047
1048 static struct rpc_xprt *
1049 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1050 {
1051         struct rpc_xprt *xprt;
1052
1053         rcu_read_lock();
1054         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1055         rcu_read_unlock();
1056         return rpc_task_get_xprt(clnt, xprt);
1057 }
1058
1059 static struct rpc_xprt *
1060 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1061 {
1062         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1063 }
1064
1065 static
1066 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1067 {
1068         if (task->tk_xprt) {
1069                 if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1070                       (task->tk_flags & RPC_TASK_MOVEABLE)))
1071                         return;
1072                 xprt_release(task);
1073                 xprt_put(task->tk_xprt);
1074         }
1075         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1076                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1077         else
1078                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1079 }
1080
1081 static
1082 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1083 {
1084
1085         if (clnt != NULL) {
1086                 rpc_task_set_transport(task, clnt);
1087                 task->tk_client = clnt;
1088                 refcount_inc(&clnt->cl_count);
1089                 if (clnt->cl_softrtry)
1090                         task->tk_flags |= RPC_TASK_SOFT;
1091                 if (clnt->cl_softerr)
1092                         task->tk_flags |= RPC_TASK_TIMEOUT;
1093                 if (clnt->cl_noretranstimeo)
1094                         task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1095                 if (atomic_read(&clnt->cl_swapper))
1096                         task->tk_flags |= RPC_TASK_SWAPPER;
1097                 /* Add to the client's list of all tasks */
1098                 spin_lock(&clnt->cl_lock);
1099                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
1100                 spin_unlock(&clnt->cl_lock);
1101         }
1102 }
1103
1104 static void
1105 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1106 {
1107         if (msg != NULL) {
1108                 task->tk_msg.rpc_proc = msg->rpc_proc;
1109                 task->tk_msg.rpc_argp = msg->rpc_argp;
1110                 task->tk_msg.rpc_resp = msg->rpc_resp;
1111                 task->tk_msg.rpc_cred = msg->rpc_cred;
1112                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1113                         get_cred(task->tk_msg.rpc_cred);
1114         }
1115 }
1116
1117 /*
1118  * Default callback for async RPC calls
1119  */
1120 static void
1121 rpc_default_callback(struct rpc_task *task, void *data)
1122 {
1123 }
1124
1125 static const struct rpc_call_ops rpc_default_ops = {
1126         .rpc_call_done = rpc_default_callback,
1127 };
1128
1129 /**
1130  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1131  * @task_setup_data: pointer to task initialisation data
1132  */
1133 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1134 {
1135         struct rpc_task *task;
1136
1137         task = rpc_new_task(task_setup_data);
1138
1139         if (!RPC_IS_ASYNC(task))
1140                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1141
1142         rpc_task_set_client(task, task_setup_data->rpc_client);
1143         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1144
1145         if (task->tk_action == NULL)
1146                 rpc_call_start(task);
1147
1148         atomic_inc(&task->tk_count);
1149         rpc_execute(task);
1150         return task;
1151 }
1152 EXPORT_SYMBOL_GPL(rpc_run_task);
1153
1154 /**
1155  * rpc_call_sync - Perform a synchronous RPC call
1156  * @clnt: pointer to RPC client
1157  * @msg: RPC call parameters
1158  * @flags: RPC call flags
1159  */
1160 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1161 {
1162         struct rpc_task *task;
1163         struct rpc_task_setup task_setup_data = {
1164                 .rpc_client = clnt,
1165                 .rpc_message = msg,
1166                 .callback_ops = &rpc_default_ops,
1167                 .flags = flags,
1168         };
1169         int status;
1170
1171         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1172         if (flags & RPC_TASK_ASYNC) {
1173                 rpc_release_calldata(task_setup_data.callback_ops,
1174                         task_setup_data.callback_data);
1175                 return -EINVAL;
1176         }
1177
1178         task = rpc_run_task(&task_setup_data);
1179         if (IS_ERR(task))
1180                 return PTR_ERR(task);
1181         status = task->tk_status;
1182         rpc_put_task(task);
1183         return status;
1184 }
1185 EXPORT_SYMBOL_GPL(rpc_call_sync);
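/*
 * Editorial usage sketch, not part of clnt.c: a synchronous call.  The
 * rpc_message fields mirror those copied by rpc_task_set_rpc_message()
 * above; "example_procedures", "EXAMPLEPROC_GET", "args" and "res" are
 * hypothetical and would come from the caller's protocol definition.
 *
 *	struct rpc_message msg = {
 *		.rpc_proc	= &example_procedures[EXAMPLEPROC_GET],
 *		.rpc_argp	= &args,
 *		.rpc_resp	= &res,
 *	};
 *	int status;
 *
 *	status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
 *	if (status < 0)
 *		return status;
 */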
1186
1187 /**
1188  * rpc_call_async - Perform an asynchronous RPC call
1189  * @clnt: pointer to RPC client
1190  * @msg: RPC call parameters
1191  * @flags: RPC call flags
1192  * @tk_ops: RPC call ops
1193  * @data: user call data
1194  */
1195 int
1196 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1197                const struct rpc_call_ops *tk_ops, void *data)
1198 {
1199         struct rpc_task *task;
1200         struct rpc_task_setup task_setup_data = {
1201                 .rpc_client = clnt,
1202                 .rpc_message = msg,
1203                 .callback_ops = tk_ops,
1204                 .callback_data = data,
1205                 .flags = flags|RPC_TASK_ASYNC,
1206         };
1207
1208         task = rpc_run_task(&task_setup_data);
1209         if (IS_ERR(task))
1210                 return PTR_ERR(task);
1211         rpc_put_task(task);
1212         return 0;
1213 }
1214 EXPORT_SYMBOL_GPL(rpc_call_async);
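/*
 * Editorial usage sketch, not part of clnt.c: an asynchronous call.  The
 * completion callback runs via the rpc_call_ops table once the task
 * finishes; the ops structure and callback names here are hypothetical.
 *
 *	static void example_call_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status < 0)
 *			pr_warn("example call failed: %d\n", task->tk_status);
 *	}
 *
 *	static const struct rpc_call_ops example_call_ops = {
 *		.rpc_call_done	= example_call_done,
 *	};
 *
 *	err = rpc_call_async(clnt, &msg, RPC_TASK_SOFT,
 *			     &example_call_ops, NULL);
 */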
1215
1216 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1217 static void call_bc_encode(struct rpc_task *task);
1218
1219 /**
1220  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1221  * rpc_execute against it
1222  * @req: RPC request
1223  */
1224 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1225 {
1226         struct rpc_task *task;
1227         struct rpc_task_setup task_setup_data = {
1228                 .callback_ops = &rpc_default_ops,
1229                 .flags = RPC_TASK_SOFTCONN |
1230                         RPC_TASK_NO_RETRANS_TIMEOUT,
1231         };
1232
1233         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1234         /*
1235          * Create an rpc_task to send the data
1236          */
1237         task = rpc_new_task(&task_setup_data);
1238         xprt_init_bc_request(req, task);
1239
1240         task->tk_action = call_bc_encode;
1241         atomic_inc(&task->tk_count);
1242         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1243         rpc_execute(task);
1244
1245         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1246         return task;
1247 }
1248 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1249
1250 /**
1251  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1252  * @req: RPC request to prepare
1253  * @pages: vector of struct page pointers
1254  * @base: offset in first page where receive should start, in bytes
1255  * @len: expected size of the upper layer data payload, in bytes
1256  * @hdrsize: expected size of upper layer reply header, in XDR words
1257  *
1258  */
1259 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1260                              unsigned int base, unsigned int len,
1261                              unsigned int hdrsize)
1262 {
1263         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1264
1265         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1266         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1267 }
1268 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1269
1270 void
1271 rpc_call_start(struct rpc_task *task)
1272 {
1273         task->tk_action = call_start;
1274 }
1275 EXPORT_SYMBOL_GPL(rpc_call_start);
1276
1277 /**
1278  * rpc_peeraddr - extract remote peer address from clnt's xprt
1279  * @clnt: RPC client structure
1280  * @buf: target buffer
1281  * @bufsize: length of target buffer
1282  *
1283  * Returns the number of bytes that are actually in the stored address.
1284  */
1285 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1286 {
1287         size_t bytes;
1288         struct rpc_xprt *xprt;
1289
1290         rcu_read_lock();
1291         xprt = rcu_dereference(clnt->cl_xprt);
1292
1293         bytes = xprt->addrlen;
1294         if (bytes > bufsize)
1295                 bytes = bufsize;
1296         memcpy(buf, &xprt->addr, bytes);
1297         rcu_read_unlock();
1298
1299         return bytes;
1300 }
1301 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1302
1303 /**
1304  * rpc_peeraddr2str - return remote peer address in printable format
1305  * @clnt: RPC client structure
1306  * @format: address format
1307  *
1308  * NB: the lifetime of the memory referenced by the returned pointer is
1309  * the same as the rpc_xprt itself.  As long as the caller uses this
1310  * pointer, it must hold the RCU read lock.
1311  */
1312 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1313                              enum rpc_display_format_t format)
1314 {
1315         struct rpc_xprt *xprt;
1316
1317         xprt = rcu_dereference(clnt->cl_xprt);
1318
1319         if (xprt->address_strings[format] != NULL)
1320                 return xprt->address_strings[format];
1321         else
1322                 return "unprintable";
1323 }
1324 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
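/*
 * Editorial usage sketch, not part of clnt.c: as the kernel-doc above warns,
 * the returned string shares the rpc_xprt's lifetime, so the caller holds the
 * RCU read lock for as long as it uses the pointer.  RPC_DISPLAY_ADDR is one
 * of the standard rpc_display_format_t values.
 *
 *	rcu_read_lock();
 *	pr_info("peer is %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
 *	rcu_read_unlock();
 */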
1325
1326 static const struct sockaddr_in rpc_inaddr_loopback = {
1327         .sin_family             = AF_INET,
1328         .sin_addr.s_addr        = htonl(INADDR_ANY),
1329 };
1330
1331 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1332         .sin6_family            = AF_INET6,
1333         .sin6_addr              = IN6ADDR_ANY_INIT,
1334 };
1335
1336 /*
1337  * Try a getsockname() on a connected datagram socket.  Using a
1338  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1339  * This conserves the ephemeral port number space.
1340  *
1341  * Returns zero and fills in "buf" if successful; otherwise, a
1342  * negative errno is returned.
1343  */
1344 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1345                         struct sockaddr *buf)
1346 {
1347         struct socket *sock;
1348         int err;
1349
1350         err = __sock_create(net, sap->sa_family,
1351                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1352         if (err < 0) {
1353                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1354                 goto out;
1355         }
1356
1357         switch (sap->sa_family) {
1358         case AF_INET:
1359                 err = kernel_bind(sock,
1360                                 (struct sockaddr *)&rpc_inaddr_loopback,
1361                                 sizeof(rpc_inaddr_loopback));
1362                 break;
1363         case AF_INET6:
1364                 err = kernel_bind(sock,
1365                                 (struct sockaddr *)&rpc_in6addr_loopback,
1366                                 sizeof(rpc_in6addr_loopback));
1367                 break;
1368         default:
1369                 err = -EAFNOSUPPORT;
1370                 goto out;
1371         }
1372         if (err < 0) {
1373                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1374                 goto out_release;
1375         }
1376
1377         err = kernel_connect(sock, sap, salen, 0);
1378         if (err < 0) {
1379                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1380                 goto out_release;
1381         }
1382
1383         err = kernel_getsockname(sock, buf);
1384         if (err < 0) {
1385                 dprintk("RPC:       getsockname failed (%d)\n", err);
1386                 goto out_release;
1387         }
1388
1389         err = 0;
1390         if (buf->sa_family == AF_INET6) {
1391                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1392                 sin6->sin6_scope_id = 0;
1393         }
1394         dprintk("RPC:       %s succeeded\n", __func__);
1395
1396 out_release:
1397         sock_release(sock);
1398 out:
1399         return err;
1400 }
1401
1402 /*
1403  * Scraping a connected socket failed, so we don't have a usable
1404  * local address.  Fallback: generate an address that will prevent
1405  * the server from calling us back.
1406  *
1407  * Returns zero and fills in "buf" if successful; otherwise, a
1408  * negative errno is returned.
1409  */
1410 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1411 {
1412         switch (family) {
1413         case AF_INET:
1414                 if (buflen < sizeof(rpc_inaddr_loopback))
1415                         return -EINVAL;
1416                 memcpy(buf, &rpc_inaddr_loopback,
1417                                 sizeof(rpc_inaddr_loopback));
1418                 break;
1419         case AF_INET6:
1420                 if (buflen < sizeof(rpc_in6addr_loopback))
1421                         return -EINVAL;
1422                 memcpy(buf, &rpc_in6addr_loopback,
1423                                 sizeof(rpc_in6addr_loopback));
1424                 break;
1425         default:
1426                 dprintk("RPC:       %s: address family not supported\n",
1427                         __func__);
1428                 return -EAFNOSUPPORT;
1429         }
1430         dprintk("RPC:       %s: succeeded\n", __func__);
1431         return 0;
1432 }
1433
1434 /**
1435  * rpc_localaddr - discover local endpoint address for an RPC client
1436  * @clnt: RPC client structure
1437  * @buf: target buffer
1438  * @buflen: size of target buffer, in bytes
1439  *
1440  * Returns zero and fills in "buf" and "buflen" if successful;
1441  * otherwise, a negative errno is returned.
1442  *
1443  * This works even if the underlying transport is not currently connected,
1444  * or if the upper layer never previously provided a source address.
1445  *
1446  * The result of this function call is transient: multiple calls in
1447  * succession may give different results, depending on how local
1448  * networking configuration changes over time.
1449  */
1450 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1451 {
1452         struct sockaddr_storage address;
1453         struct sockaddr *sap = (struct sockaddr *)&address;
1454         struct rpc_xprt *xprt;
1455         struct net *net;
1456         size_t salen;
1457         int err;
1458
1459         rcu_read_lock();
1460         xprt = rcu_dereference(clnt->cl_xprt);
1461         salen = xprt->addrlen;
1462         memcpy(sap, &xprt->addr, salen);
1463         net = get_net(xprt->xprt_net);
1464         rcu_read_unlock();
1465
1466         rpc_set_port(sap, 0);
1467         err = rpc_sockname(net, sap, salen, buf);
1468         put_net(net);
1469         if (err != 0)
1470                 /* Couldn't discover local address, return ANYADDR */
1471                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1472         return 0;
1473 }
1474 EXPORT_SYMBOL_GPL(rpc_localaddr);
1475
1476 void
1477 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1478 {
1479         struct rpc_xprt *xprt;
1480
1481         rcu_read_lock();
1482         xprt = rcu_dereference(clnt->cl_xprt);
1483         if (xprt->ops->set_buffer_size)
1484                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1485         rcu_read_unlock();
1486 }
1487 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1488
1489 /**
1490  * rpc_net_ns - Get the network namespace for this RPC client
1491  * @clnt: RPC client to query
1492  *
1493  */
1494 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1495 {
1496         struct net *ret;
1497
1498         rcu_read_lock();
1499         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1500         rcu_read_unlock();
1501         return ret;
1502 }
1503 EXPORT_SYMBOL_GPL(rpc_net_ns);
1504
1505 /**
1506  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1507  * @clnt: RPC client to query
1508  *
1509  * For stream transports, this is one RPC record fragment (see RFC
1510  * 1831), as we don't support multi-record requests yet.  For datagram
1511  * transports, this is the size of an IP packet minus the IP, UDP, and
1512  * RPC header sizes.
1513  */
1514 size_t rpc_max_payload(struct rpc_clnt *clnt)
1515 {
1516         size_t ret;
1517
1518         rcu_read_lock();
1519         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1520         rcu_read_unlock();
1521         return ret;
1522 }
1523 EXPORT_SYMBOL_GPL(rpc_max_payload);
1524
1525 /**
1526  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1527  * @clnt: RPC client to query
1528  */
1529 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1530 {
1531         struct rpc_xprt *xprt;
1532         size_t ret;
1533
1534         rcu_read_lock();
1535         xprt = rcu_dereference(clnt->cl_xprt);
1536         ret = xprt->ops->bc_maxpayload(xprt);
1537         rcu_read_unlock();
1538         return ret;
1539 }
1540 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1541
1542 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1543 {
1544         struct rpc_xprt *xprt;
1545         unsigned int ret;
1546
1547         rcu_read_lock();
1548         xprt = rcu_dereference(clnt->cl_xprt);
1549         ret = xprt->ops->bc_num_slots(xprt);
1550         rcu_read_unlock();
1551         return ret;
1552 }
1553 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1554
1555 /**
1556  * rpc_force_rebind - force transport to check that remote port is unchanged
1557  * @clnt: client to rebind
1558  *
1559  */
1560 void rpc_force_rebind(struct rpc_clnt *clnt)
1561 {
1562         if (clnt->cl_autobind) {
1563                 rcu_read_lock();
1564                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1565                 rcu_read_unlock();
1566         }
1567 }
1568 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1569
1570 static int
1571 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1572 {
1573         task->tk_status = 0;
1574         task->tk_rpc_status = 0;
1575         task->tk_action = action;
1576         return 1;
1577 }
1578
1579 /*
1580  * Restart an (async) RPC call. Usually called from within the
1581  * exit handler.
1582  */
1583 int
1584 rpc_restart_call(struct rpc_task *task)
1585 {
1586         return __rpc_restart_call(task, call_start);
1587 }
1588 EXPORT_SYMBOL_GPL(rpc_restart_call);
1589
1590 /*
1591  * Restart an (async) RPC call from the call_prepare state.
1592  * Usually called from within the exit handler.
1593  */
1594 int
1595 rpc_restart_call_prepare(struct rpc_task *task)
1596 {
1597         if (task->tk_ops->rpc_call_prepare != NULL)
1598                 return __rpc_restart_call(task, rpc_prepare_task);
1599         return rpc_restart_call(task);
1600 }
1601 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1602
1603 const char
1604 *rpc_proc_name(const struct rpc_task *task)
1605 {
1606         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1607
1608         if (proc) {
1609                 if (proc->p_name)
1610                         return proc->p_name;
1611                 else
1612                         return "NULL";
1613         } else
1614                 return "no proc";
1615 }
1616
1617 static void
1618 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1619 {
1620         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1621         task->tk_rpc_status = rpc_status;
1622         rpc_exit(task, tk_status);
1623 }
1624
1625 static void
1626 rpc_call_rpcerror(struct rpc_task *task, int status)
1627 {
1628         __rpc_call_rpcerror(task, status, status);
1629 }
1630
1631 /*
1632  * 0.  Initial state
1633  *
1634  *     Other FSM states can be visited zero or more times, but
1635  *     this state is visited exactly once for each RPC.
1636  */
1637 static void
1638 call_start(struct rpc_task *task)
1639 {
1640         struct rpc_clnt *clnt = task->tk_client;
1641         int idx = task->tk_msg.rpc_proc->p_statidx;
1642
1643         trace_rpc_request(task);
1644
1645         /* Increment call count (version might not be valid for ping) */
1646         if (clnt->cl_program->version[clnt->cl_vers])
1647                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1648         clnt->cl_stats->rpccnt++;
1649         task->tk_action = call_reserve;
1650         rpc_task_set_transport(task, clnt);
1651 }
1652
1653 /*
1654  * 1.   Reserve an RPC call slot
1655  */
1656 static void
1657 call_reserve(struct rpc_task *task)
1658 {
1659         task->tk_status  = 0;
1660         task->tk_action  = call_reserveresult;
1661         xprt_reserve(task);
1662 }
1663
1664 static void call_retry_reserve(struct rpc_task *task);
1665
1666 /*
1667  * 1b.  Grok the result of xprt_reserve()
1668  */
1669 static void
1670 call_reserveresult(struct rpc_task *task)
1671 {
1672         int status = task->tk_status;
1673
1674         /*
1675          * After a call to xprt_reserve(), we must have either
1676          * a request slot or else an error status.
1677          */
1678         task->tk_status = 0;
1679         if (status >= 0) {
1680                 if (task->tk_rqstp) {
1681                         task->tk_action = call_refresh;
1682                         return;
1683                 }
1684
1685                 rpc_call_rpcerror(task, -EIO);
1686                 return;
1687         }
1688
1689         switch (status) {
1690         case -ENOMEM:
1691                 rpc_delay(task, HZ >> 2);
1692                 fallthrough;
1693         case -EAGAIN:   /* woken up; retry */
1694                 task->tk_action = call_retry_reserve;
1695                 return;
1696         default:
1697                 rpc_call_rpcerror(task, status);
1698         }
1699 }
1700
1701 /*
1702  * 1c.  Retry reserving an RPC call slot
1703  */
1704 static void
1705 call_retry_reserve(struct rpc_task *task)
1706 {
1707         task->tk_status  = 0;
1708         task->tk_action  = call_reserveresult;
1709         xprt_retry_reserve(task);
1710 }
1711
1712 /*
1713  * 2.   Bind and/or refresh the credentials
1714  */
1715 static void
1716 call_refresh(struct rpc_task *task)
1717 {
1718         task->tk_action = call_refreshresult;
1719         task->tk_status = 0;
1720         task->tk_client->cl_stats->rpcauthrefresh++;
1721         rpcauth_refreshcred(task);
1722 }
1723
1724 /*
1725  * 2a.  Process the results of a credential refresh
1726  */
1727 static void
1728 call_refreshresult(struct rpc_task *task)
1729 {
1730         int status = task->tk_status;
1731
1732         task->tk_status = 0;
1733         task->tk_action = call_refresh;
1734         switch (status) {
1735         case 0:
1736                 if (rpcauth_uptodatecred(task)) {
1737                         task->tk_action = call_allocate;
1738                         return;
1739                 }
1740                 /* Use rate-limiting and a max number of retries if refresh
1741                  * had status 0 but failed to update the cred.
1742                  */
1743                 fallthrough;
1744         case -ETIMEDOUT:
1745                 rpc_delay(task, 3*HZ);
1746                 fallthrough;
1747         case -EAGAIN:
1748                 status = -EACCES;
1749                 fallthrough;
1750         case -EKEYEXPIRED:
1751                 if (!task->tk_cred_retry)
1752                         break;
1753                 task->tk_cred_retry--;
1754                 trace_rpc_retry_refresh_status(task);
1755                 return;
1756         }
1757         trace_rpc_refresh_status(task);
1758         rpc_call_rpcerror(task, status);
1759 }
1760
1761 /*
1762  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1763  *      (Note: buffer memory is freed in xprt_release).
1764  */
1765 static void
1766 call_allocate(struct rpc_task *task)
1767 {
1768         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1769         struct rpc_rqst *req = task->tk_rqstp;
1770         struct rpc_xprt *xprt = req->rq_xprt;
1771         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1772         int status;
1773
1774         task->tk_status = 0;
1775         task->tk_action = call_encode;
1776
1777         if (req->rq_buffer)
1778                 return;
1779
1780         if (proc->p_proc != 0) {
1781                 BUG_ON(proc->p_arglen == 0);
1782                 if (proc->p_decode != NULL)
1783                         BUG_ON(proc->p_replen == 0);
1784         }
1785
1786         /*
1787          * Calculate the size (in quads) of the RPC call
1788          * and reply headers, and convert both values
1789          * to byte sizes.
1790          */
1791         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1792                            proc->p_arglen;
1793         req->rq_callsize <<= 2;
1794         /*
1795          * Note: the reply buffer must be large enough to hold at least
1796          * the 'struct accepted_reply' from RFC 5531.
1797          */
1798         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1799                         max_t(size_t, proc->p_replen, 2);
1800         req->rq_rcvsize <<= 2;
1801
1802         status = xprt->ops->buf_alloc(task);
1803         trace_rpc_buf_alloc(task, status);
1804         if (status == 0)
1805                 return;
1806         if (status != -ENOMEM) {
1807                 rpc_call_rpcerror(task, status);
1808                 return;
1809         }
1810
1811         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1812                 task->tk_action = call_allocate;
1813                 rpc_delay(task, HZ>>4);
1814                 return;
1815         }
1816
1817         rpc_call_rpcerror(task, -ERESTARTSYS);
1818 }
1819
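/*
 * A request needs (re-)encoding when it is not already queued for
 * transmission and either has never been sent, may still be
 * retransmitted on timeout, or the transport asks for a retransmit.
 */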
1820 static int
1821 rpc_task_need_encode(struct rpc_task *task)
1822 {
1823         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1824                 (!(task->tk_flags & RPC_TASK_SENT) ||
1825                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1826                  xprt_request_need_retransmit(task));
1827 }
1828
1829 static void
1830 rpc_xdr_encode(struct rpc_task *task)
1831 {
1832         struct rpc_rqst *req = task->tk_rqstp;
1833         struct xdr_stream xdr;
1834
1835         xdr_buf_init(&req->rq_snd_buf,
1836                      req->rq_buffer,
1837                      req->rq_callsize);
1838         xdr_buf_init(&req->rq_rcv_buf,
1839                      req->rq_rbuffer,
1840                      req->rq_rcvsize);
1841
1842         req->rq_reply_bytes_recvd = 0;
1843         req->rq_snd_buf.head[0].iov_len = 0;
1844         xdr_init_encode(&xdr, &req->rq_snd_buf,
1845                         req->rq_snd_buf.head[0].iov_base, req);
1846         xdr_free_bvec(&req->rq_snd_buf);
1847         if (rpc_encode_header(task, &xdr))
1848                 return;
1849
1850         task->tk_status = rpcauth_wrap_req(task, &xdr);
1851 }
1852
1853 /*
1854  * 3.   Encode arguments of an RPC call
1855  */
1856 static void
1857 call_encode(struct rpc_task *task)
1858 {
1859         if (!rpc_task_need_encode(task))
1860                 goto out;
1861
1862         /* Dequeue task from the receive queue while we're encoding */
1863         xprt_request_dequeue_xprt(task);
1864         /* Encode here so that rpcsec_gss can use correct sequence number. */
1865         rpc_xdr_encode(task);
1866         /* Did the encode result in an error condition? */
1867         if (task->tk_status != 0) {
1868                 /* Was the error nonfatal? */
1869                 switch (task->tk_status) {
1870                 case -EAGAIN:
1871                 case -ENOMEM:
1872                         rpc_delay(task, HZ >> 4);
1873                         break;
1874                 case -EKEYEXPIRED:
1875                         if (!task->tk_cred_retry) {
1876                                 rpc_exit(task, task->tk_status);
1877                         } else {
1878                                 task->tk_action = call_refresh;
1879                                 task->tk_cred_retry--;
1880                                 trace_rpc_retry_refresh_status(task);
1881                         }
1882                         break;
1883                 default:
1884                         rpc_call_rpcerror(task, task->tk_status);
1885                 }
1886                 return;
1887         }
1888
1889         /* Add task to reply queue before transmission to avoid races */
1890         if (rpc_reply_expected(task))
1891                 xprt_request_enqueue_receive(task);
1892         xprt_request_enqueue_transmit(task);
1893 out:
1894         task->tk_action = call_transmit;
1895         /* Check that the connection is OK */
1896         if (!xprt_bound(task->tk_xprt))
1897                 task->tk_action = call_bind;
1898         else if (!xprt_connected(task->tk_xprt))
1899                 task->tk_action = call_connect;
1900 }
1901
1902 /*
1903  * Helpers to check if the task was already transmitted, and
1904  * to take action when that is the case.
1905  */
1906 static bool
1907 rpc_task_transmitted(struct rpc_task *task)
1908 {
1909         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1910 }
1911
1912 static void
1913 rpc_task_handle_transmitted(struct rpc_task *task)
1914 {
1915         xprt_end_transmit(task);
1916         task->tk_action = call_transmit_status;
1917 }
1918
1919 /*
1920  * 4.   Get the server port number if not yet set
1921  */
1922 static void
1923 call_bind(struct rpc_task *task)
1924 {
1925         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1926
1927         if (rpc_task_transmitted(task)) {
1928                 rpc_task_handle_transmitted(task);
1929                 return;
1930         }
1931
1932         if (xprt_bound(xprt)) {
1933                 task->tk_action = call_connect;
1934                 return;
1935         }
1936
1937         task->tk_action = call_bind_status;
1938         if (!xprt_prepare_transmit(task))
1939                 return;
1940
1941         xprt->ops->rpcbind(task);
1942 }
1943
1944 /*
1945  * 4a.  Sort out bind result
1946  */
1947 static void
1948 call_bind_status(struct rpc_task *task)
1949 {
1950         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1951         int status = -EIO;
1952
1953         if (rpc_task_transmitted(task)) {
1954                 rpc_task_handle_transmitted(task);
1955                 return;
1956         }
1957
1958         if (task->tk_status >= 0)
1959                 goto out_next;
1960         if (xprt_bound(xprt)) {
1961                 task->tk_status = 0;
1962                 goto out_next;
1963         }
1964
1965         switch (task->tk_status) {
1966         case -ENOMEM:
1967                 rpc_delay(task, HZ >> 2);
1968                 goto retry_timeout;
1969         case -EACCES:
1970                 trace_rpcb_prog_unavail_err(task);
1971                 /* fail immediately if this is an RPC ping */
1972                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1973                         status = -EOPNOTSUPP;
1974                         break;
1975                 }
1976                 if (task->tk_rebind_retry == 0)
1977                         break;
1978                 task->tk_rebind_retry--;
1979                 rpc_delay(task, 3*HZ);
1980                 goto retry_timeout;
1981         case -ENOBUFS:
1982                 rpc_delay(task, HZ >> 2);
1983                 goto retry_timeout;
1984         case -EAGAIN:
1985                 goto retry_timeout;
1986         case -ETIMEDOUT:
1987                 trace_rpcb_timeout_err(task);
1988                 goto retry_timeout;
1989         case -EPFNOSUPPORT:
1990                 /* server doesn't support any rpcbind version we know of */
1991                 trace_rpcb_bind_version_err(task);
1992                 break;
1993         case -EPROTONOSUPPORT:
1994                 trace_rpcb_bind_version_err(task);
1995                 goto retry_timeout;
1996         case -ECONNREFUSED:             /* connection problems */
1997         case -ECONNRESET:
1998         case -ECONNABORTED:
1999         case -ENOTCONN:
2000         case -EHOSTDOWN:
2001         case -ENETDOWN:
2002         case -EHOSTUNREACH:
2003         case -ENETUNREACH:
2004         case -EPIPE:
2005                 trace_rpcb_unreachable_err(task);
2006                 if (!RPC_IS_SOFTCONN(task)) {
2007                         rpc_delay(task, 5*HZ);
2008                         goto retry_timeout;
2009                 }
2010                 status = task->tk_status;
2011                 break;
2012         default:
2013                 trace_rpcb_unrecognized_err(task);
2014         }
2015
2016         rpc_call_rpcerror(task, status);
2017         return;
2018 out_next:
2019         task->tk_action = call_connect;
2020         return;
2021 retry_timeout:
2022         task->tk_status = 0;
2023         task->tk_action = call_bind;
2024         rpc_check_timeout(task);
2025 }
2026
2027 /*
2028  * 4b.  Connect to the RPC server
2029  */
2030 static void
2031 call_connect(struct rpc_task *task)
2032 {
2033         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2034
2035         if (rpc_task_transmitted(task)) {
2036                 rpc_task_handle_transmitted(task);
2037                 return;
2038         }
2039
2040         if (xprt_connected(xprt)) {
2041                 task->tk_action = call_transmit;
2042                 return;
2043         }
2044
2045         task->tk_action = call_connect_status;
2046         if (task->tk_status < 0)
2047                 return;
2048         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2049                 rpc_call_rpcerror(task, -ENOTCONN);
2050                 return;
2051         }
2052         if (!xprt_prepare_transmit(task))
2053                 return;
2054         xprt_connect(task);
2055 }
2056
2057 /*
2058  * 4c.  Sort out connect result
2059  */
2060 static void
2061 call_connect_status(struct rpc_task *task)
2062 {
2063         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2064         struct rpc_clnt *clnt = task->tk_client;
2065         int status = task->tk_status;
2066
2067         if (rpc_task_transmitted(task)) {
2068                 rpc_task_handle_transmitted(task);
2069                 return;
2070         }
2071
2072         trace_rpc_connect_status(task);
2073
2074         if (task->tk_status == 0) {
2075                 clnt->cl_stats->netreconn++;
2076                 goto out_next;
2077         }
2078         if (xprt_connected(xprt)) {
2079                 task->tk_status = 0;
2080                 goto out_next;
2081         }
2082
2083         task->tk_status = 0;
2084         switch (status) {
2085         case -ECONNREFUSED:
2086                 /* A positive refusal suggests a rebind is needed. */
2087                 if (RPC_IS_SOFTCONN(task))
2088                         break;
2089                 if (clnt->cl_autobind) {
2090                         rpc_force_rebind(clnt);
2091                         goto out_retry;
2092                 }
2093                 fallthrough;
2094         case -ECONNRESET:
2095         case -ECONNABORTED:
2096         case -ENETDOWN:
2097         case -ENETUNREACH:
2098         case -EHOSTUNREACH:
2099         case -EPIPE:
2100         case -EPROTO:
2101                 xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2102                                             task->tk_rqstp->rq_connect_cookie);
2103                 if (RPC_IS_SOFTCONN(task))
2104                         break;
2105                 /* retry with existing socket, after a delay */
2106                 rpc_delay(task, 3*HZ);
2107                 fallthrough;
2108         case -EADDRINUSE:
2109         case -ENOTCONN:
2110         case -EAGAIN:
2111         case -ETIMEDOUT:
2112                 if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2113                     (task->tk_flags & RPC_TASK_MOVEABLE) &&
2114                     test_bit(XPRT_REMOVE, &xprt->state)) {
2115                         struct rpc_xprt *saved = task->tk_xprt;
2116                         struct rpc_xprt_switch *xps;
2117
2118                         rcu_read_lock();
2119                         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2120                         rcu_read_unlock();
2121                         if (xps->xps_nxprts > 1) {
2122                                 long value;
2123
2124                                 xprt_release(task);
2125                                 value = atomic_long_dec_return(&xprt->queuelen);
2126                                 if (value == 0)
2127                                         rpc_xprt_switch_remove_xprt(xps, saved);
2128                                 xprt_put(saved);
2129                                 task->tk_xprt = NULL;
2130                                 task->tk_action = call_start;
2131                         }
2132                         xprt_switch_put(xps);
2133                         if (!task->tk_xprt)
2134                                 return;
2135                 }
2136                 goto out_retry;
2137         case -ENOBUFS:
2138                 rpc_delay(task, HZ >> 2);
2139                 goto out_retry;
2140         }
2141         rpc_call_rpcerror(task, status);
2142         return;
2143 out_next:
2144         task->tk_action = call_transmit;
2145         return;
2146 out_retry:
2147         /* Check for timeouts before looping back to call_bind */
2148         task->tk_action = call_bind;
2149         rpc_check_timeout(task);
2150 }
2151
2152 /*
2153  * 5.   Transmit the RPC request, and wait for reply
2154  */
2155 static void
2156 call_transmit(struct rpc_task *task)
2157 {
2158         if (rpc_task_transmitted(task)) {
2159                 rpc_task_handle_transmitted(task);
2160                 return;
2161         }
2162
2163         task->tk_action = call_transmit_status;
2164         if (!xprt_prepare_transmit(task))
2165                 return;
2166         task->tk_status = 0;
2167         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2168                 if (!xprt_connected(task->tk_xprt)) {
2169                         task->tk_status = -ENOTCONN;
2170                         return;
2171                 }
2172                 xprt_transmit(task);
2173         }
2174         xprt_end_transmit(task);
2175 }
2176
2177 /*
2178  * 5a.  Handle cleanup after a transmission
2179  */
2180 static void
2181 call_transmit_status(struct rpc_task *task)
2182 {
2183         task->tk_action = call_status;
2184
2185         /*
2186          * Common case: success.  Force the compiler to put this
2187          * test first.
2188          */
2189         if (rpc_task_transmitted(task)) {
2190                 task->tk_status = 0;
2191                 xprt_request_wait_receive(task);
2192                 return;
2193         }
2194
2195         switch (task->tk_status) {
2196         default:
2197                 break;
2198         case -EBADMSG:
2199                 task->tk_status = 0;
2200                 task->tk_action = call_encode;
2201                 break;
2202                 /*
2203                  * Special cases: if we've been waiting on the
2204                  * socket's write_space() callback, or if the
2205                  * socket just returned a connection error,
2206                  * then hold onto the transport lock.
2207                  */
2208         case -ENOMEM:
2209         case -ENOBUFS:
2210                 rpc_delay(task, HZ>>2);
2211                 fallthrough;
2212         case -EBADSLT:
2213         case -EAGAIN:
2214                 task->tk_action = call_transmit;
2215                 task->tk_status = 0;
2216                 break;
2217         case -ECONNREFUSED:
2218         case -EHOSTDOWN:
2219         case -ENETDOWN:
2220         case -EHOSTUNREACH:
2221         case -ENETUNREACH:
2222         case -EPERM:
2223                 if (RPC_IS_SOFTCONN(task)) {
2224                         if (!task->tk_msg.rpc_proc->p_proc)
2225                                 trace_xprt_ping(task->tk_xprt,
2226                                                 task->tk_status);
2227                         rpc_call_rpcerror(task, task->tk_status);
2228                         return;
2229                 }
2230                 fallthrough;
2231         case -ECONNRESET:
2232         case -ECONNABORTED:
2233         case -EADDRINUSE:
2234         case -ENOTCONN:
2235         case -EPIPE:
2236                 task->tk_action = call_bind;
2237                 task->tk_status = 0;
2238                 break;
2239         }
2240         rpc_check_timeout(task);
2241 }
2242
2243 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2244 static void call_bc_transmit(struct rpc_task *task);
2245 static void call_bc_transmit_status(struct rpc_task *task);
2246
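/* The backchannel reply is already prepared; just queue it for transmission. */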
2247 static void
2248 call_bc_encode(struct rpc_task *task)
2249 {
2250         xprt_request_enqueue_transmit(task);
2251         task->tk_action = call_bc_transmit;
2252 }
2253
2254 /*
2255  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2256  * addition, disconnect on connectivity errors.
2257  */
2258 static void
2259 call_bc_transmit(struct rpc_task *task)
2260 {
2261         task->tk_action = call_bc_transmit_status;
2262         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2263                 if (!xprt_prepare_transmit(task))
2264                         return;
2265                 task->tk_status = 0;
2266                 xprt_transmit(task);
2267         }
2268         xprt_end_transmit(task);
2269 }
2270
2271 static void
2272 call_bc_transmit_status(struct rpc_task *task)
2273 {
2274         struct rpc_rqst *req = task->tk_rqstp;
2275
2276         if (rpc_task_transmitted(task))
2277                 task->tk_status = 0;
2278
2279         switch (task->tk_status) {
2280         case 0:
2281                 /* Success */
2282         case -ENETDOWN:
2283         case -EHOSTDOWN:
2284         case -EHOSTUNREACH:
2285         case -ENETUNREACH:
2286         case -ECONNRESET:
2287         case -ECONNREFUSED:
2288         case -EADDRINUSE:
2289         case -ENOTCONN:
2290         case -EPIPE:
2291                 break;
2292         case -ENOMEM:
2293         case -ENOBUFS:
2294                 rpc_delay(task, HZ>>2);
2295                 fallthrough;
2296         case -EBADSLT:
2297         case -EAGAIN:
2298                 task->tk_status = 0;
2299                 task->tk_action = call_bc_transmit;
2300                 return;
2301         case -ETIMEDOUT:
2302                 /*
2303                  * Problem reaching the server.  Disconnect and let the
2304                  * forechannel reestablish the connection.  The server will
2305                  * have to retransmit the backchannel request and we'll
2306                  * reprocess it.  Since these ops are idempotent, there's no
2307                  * need to cache our reply at this time.
2308                  */
2309                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2310                         "error: %d\n", task->tk_status);
2311                 xprt_conditional_disconnect(req->rq_xprt,
2312                         req->rq_connect_cookie);
2313                 break;
2314         default:
2315                 /*
2316                  * We were unable to reply and will have to drop the
2317                  * request.  The server should reconnect and retransmit.
2318                  */
2319                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2320                         "error: %d\n", task->tk_status);
2321                 break;
2322         }
2323         task->tk_action = rpc_exit_task;
2324 }
2325 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2326
2327 /*
2328  * 6.   Sort out the RPC call status
2329  */
2330 static void
2331 call_status(struct rpc_task *task)
2332 {
2333         struct rpc_clnt *clnt = task->tk_client;
2334         int             status;
2335
2336         if (!task->tk_msg.rpc_proc->p_proc)
2337                 trace_xprt_ping(task->tk_xprt, task->tk_status);
2338
2339         status = task->tk_status;
2340         if (status >= 0) {
2341                 task->tk_action = call_decode;
2342                 return;
2343         }
2344
2345         trace_rpc_call_status(task);
2346         task->tk_status = 0;
2347         switch(status) {
2348         case -EHOSTDOWN:
2349         case -ENETDOWN:
2350         case -EHOSTUNREACH:
2351         case -ENETUNREACH:
2352         case -EPERM:
2353                 if (RPC_IS_SOFTCONN(task))
2354                         goto out_exit;
2355                 /*
2356                  * Delay any retries for 3 seconds, then handle as if it
2357                  * were a timeout.
2358                  */
2359                 rpc_delay(task, 3*HZ);
2360                 fallthrough;
2361         case -ETIMEDOUT:
2362                 break;
2363         case -ECONNREFUSED:
2364         case -ECONNRESET:
2365         case -ECONNABORTED:
2366         case -ENOTCONN:
2367                 rpc_force_rebind(clnt);
2368                 break;
2369         case -EADDRINUSE:
2370                 rpc_delay(task, 3*HZ);
2371                 fallthrough;
2372         case -EPIPE:
2373         case -EAGAIN:
2374                 break;
2375         case -ENFILE:
2376         case -ENOBUFS:
2377         case -ENOMEM:
2378                 rpc_delay(task, HZ>>2);
2379                 break;
2380         case -EIO:
2381                 /* shutdown or soft timeout */
2382                 goto out_exit;
2383         default:
2384                 if (clnt->cl_chatty)
2385                         printk("%s: RPC call returned error %d\n",
2386                                clnt->cl_program->name, -status);
2387                 goto out_exit;
2388         }
2389         task->tk_action = call_encode;
2390         if (status != -ECONNRESET && status != -ECONNABORTED)
2391                 rpc_check_timeout(task);
2392         return;
2393 out_exit:
2394         rpc_call_rpcerror(task, status);
2395 }
2396
2397 static bool
2398 rpc_check_connected(const struct rpc_rqst *req)
2399 {
2400         /* No allocated request or transport? return true */
2401         if (!req || !req->rq_xprt)
2402                 return true;
2403         return xprt_connected(req->rq_xprt);
2404 }
2405
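/*
 * Common helper for the retry paths above.  Terminates the task if it
 * was signalled, or if a SOFT/SOFTCONN request has hit a major timeout;
 * otherwise the retry is allowed to proceed, with a rate-limited
 * warning when a hard request hits a major timeout.
 */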
2406 static void
2407 rpc_check_timeout(struct rpc_task *task)
2408 {
2409         struct rpc_clnt *clnt = task->tk_client;
2410
2411         if (RPC_SIGNALLED(task)) {
2412                 rpc_call_rpcerror(task, -ERESTARTSYS);
2413                 return;
2414         }
2415
2416         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2417                 return;
2418
2419         trace_rpc_timeout_status(task);
2420         task->tk_timeouts++;
2421
2422         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2423                 rpc_call_rpcerror(task, -ETIMEDOUT);
2424                 return;
2425         }
2426
2427         if (RPC_IS_SOFT(task)) {
2428                 /*
2429                  * Once a "no retrans timeout" soft task (a.k.a. NFSv4) has
2430                  * been sent, it should time out only if the transport
2431                  * connection gets terminally broken.
2432                  */
2433                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2434                     rpc_check_connected(task->tk_rqstp))
2435                         return;
2436
2437                 if (clnt->cl_chatty) {
2438                         pr_notice_ratelimited(
2439                                 "%s: server %s not responding, timed out\n",
2440                                 clnt->cl_program->name,
2441                                 task->tk_xprt->servername);
2442                 }
2443                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2444                         rpc_call_rpcerror(task, -ETIMEDOUT);
2445                 else
2446                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2447                 return;
2448         }
2449
2450         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2451                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2452                 if (clnt->cl_chatty) {
2453                         pr_notice_ratelimited(
2454                                 "%s: server %s not responding, still trying\n",
2455                                 clnt->cl_program->name,
2456                                 task->tk_xprt->servername);
2457                 }
2458         }
2459         rpc_force_rebind(clnt);
2460         /*
2461          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2462          * event? RFC2203 requires the server to drop all such requests.
2463          */
2464         rpcauth_invalcred(task);
2465 }
2466
2467 /*
2468  * 7.   Decode the RPC reply
2469  */
2470 static void
2471 call_decode(struct rpc_task *task)
2472 {
2473         struct rpc_clnt *clnt = task->tk_client;
2474         struct rpc_rqst *req = task->tk_rqstp;
2475         struct xdr_stream xdr;
2476         int err;
2477
2478         if (!task->tk_msg.rpc_proc->p_decode) {
2479                 task->tk_action = rpc_exit_task;
2480                 return;
2481         }
2482
2483         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2484                 if (clnt->cl_chatty) {
2485                         pr_notice_ratelimited("%s: server %s OK\n",
2486                                 clnt->cl_program->name,
2487                                 task->tk_xprt->servername);
2488                 }
2489                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2490         }
2491
2492         /*
2493          * Did we ever call xprt_complete_rqst()? If not, we should assume
2494          * the message is incomplete.
2495          */
2496         err = -EAGAIN;
2497         if (!req->rq_reply_bytes_recvd)
2498                 goto out;
2499
2500         /* Ensure that we see all writes made by xprt_complete_rqst()
2501          * before it changed req->rq_reply_bytes_recvd.
2502          */
2503         smp_rmb();
2504
2505         req->rq_rcv_buf.len = req->rq_private_buf.len;
2506         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2507
2508         /* Check that the softirq receive buffer is valid */
2509         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2510                                 sizeof(req->rq_rcv_buf)) != 0);
2511
2512         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2513                         req->rq_rcv_buf.head[0].iov_base, req);
2514         err = rpc_decode_header(task, &xdr);
2515 out:
2516         switch (err) {
2517         case 0:
2518                 task->tk_action = rpc_exit_task;
2519                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2520                 return;
2521         case -EAGAIN:
2522                 task->tk_status = 0;
2523                 if (task->tk_client->cl_discrtry)
2524                         xprt_conditional_disconnect(req->rq_xprt,
2525                                                     req->rq_connect_cookie);
2526                 task->tk_action = call_encode;
2527                 rpc_check_timeout(task);
2528                 break;
2529         case -EKEYREJECTED:
2530                 task->tk_action = call_reserve;
2531                 rpc_check_timeout(task);
2532                 rpcauth_invalcred(task);
2533                 /* Ensure we obtain a new XID if we retry! */
2534                 xprt_release(task);
2535         }
2536 }
2537
2538 static int
2539 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2540 {
2541         struct rpc_clnt *clnt = task->tk_client;
2542         struct rpc_rqst *req = task->tk_rqstp;
2543         __be32 *p;
2544         int error;
2545
2546         error = -EMSGSIZE;
2547         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2548         if (!p)
2549                 goto out_fail;
2550         *p++ = req->rq_xid;
2551         *p++ = rpc_call;
2552         *p++ = cpu_to_be32(RPC_VERSION);
2553         *p++ = cpu_to_be32(clnt->cl_prog);
2554         *p++ = cpu_to_be32(clnt->cl_vers);
2555         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2556
2557         error = rpcauth_marshcred(task, xdr);
2558         if (error < 0)
2559                 goto out_fail;
2560         return 0;
2561 out_fail:
2562         trace_rpc_bad_callhdr(task);
2563         rpc_call_rpcerror(task, error);
2564         return error;
2565 }
2566
2567 static noinline int
2568 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2569 {
2570         struct rpc_clnt *clnt = task->tk_client;
2571         int error;
2572         __be32 *p;
2573
2574         /* RFC-1014 says that the representation of XDR data must be a
2575          * multiple of four bytes
2576          * - if it isn't, pointer subtraction in the NFS client may give
2577          *   undefined results
2578          */
2579         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2580                 goto out_unparsable;
2581
2582         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2583         if (!p)
2584                 goto out_unparsable;
2585         p++;    /* skip XID */
2586         if (*p++ != rpc_reply)
2587                 goto out_unparsable;
2588         if (*p++ != rpc_msg_accepted)
2589                 goto out_msg_denied;
2590
2591         error = rpcauth_checkverf(task, xdr);
2592         if (error)
2593                 goto out_verifier;
2594
2595         p = xdr_inline_decode(xdr, sizeof(*p));
2596         if (!p)
2597                 goto out_unparsable;
2598         switch (*p) {
2599         case rpc_success:
2600                 return 0;
2601         case rpc_prog_unavail:
2602                 trace_rpc__prog_unavail(task);
2603                 error = -EPFNOSUPPORT;
2604                 goto out_err;
2605         case rpc_prog_mismatch:
2606                 trace_rpc__prog_mismatch(task);
2607                 error = -EPROTONOSUPPORT;
2608                 goto out_err;
2609         case rpc_proc_unavail:
2610                 trace_rpc__proc_unavail(task);
2611                 error = -EOPNOTSUPP;
2612                 goto out_err;
2613         case rpc_garbage_args:
2614         case rpc_system_err:
2615                 trace_rpc__garbage_args(task);
2616                 error = -EIO;
2617                 break;
2618         default:
2619                 goto out_unparsable;
2620         }
2621
2622 out_garbage:
2623         clnt->cl_stats->rpcgarbage++;
2624         if (task->tk_garb_retry) {
2625                 task->tk_garb_retry--;
2626                 task->tk_action = call_encode;
2627                 return -EAGAIN;
2628         }
2629 out_err:
2630         rpc_call_rpcerror(task, error);
2631         return error;
2632
2633 out_unparsable:
2634         trace_rpc__unparsable(task);
2635         error = -EIO;
2636         goto out_garbage;
2637
2638 out_verifier:
2639         trace_rpc_bad_verifier(task);
2640         goto out_garbage;
2641
2642 out_msg_denied:
2643         error = -EACCES;
2644         p = xdr_inline_decode(xdr, sizeof(*p));
2645         if (!p)
2646                 goto out_unparsable;
2647         switch (*p++) {
2648         case rpc_auth_error:
2649                 break;
2650         case rpc_mismatch:
2651                 trace_rpc__mismatch(task);
2652                 error = -EPROTONOSUPPORT;
2653                 goto out_err;
2654         default:
2655                 goto out_unparsable;
2656         }
2657
2658         p = xdr_inline_decode(xdr, sizeof(*p));
2659         if (!p)
2660                 goto out_unparsable;
2661         switch (*p++) {
2662         case rpc_autherr_rejectedcred:
2663         case rpc_autherr_rejectedverf:
2664         case rpcsec_gsserr_credproblem:
2665         case rpcsec_gsserr_ctxproblem:
2666                 if (!task->tk_cred_retry)
2667                         break;
2668                 task->tk_cred_retry--;
2669                 trace_rpc__stale_creds(task);
2670                 return -EKEYREJECTED;
2671         case rpc_autherr_badcred:
2672         case rpc_autherr_badverf:
2673                 /* possibly garbled cred/verf? */
2674                 if (!task->tk_garb_retry)
2675                         break;
2676                 task->tk_garb_retry--;
2677                 trace_rpc__bad_creds(task);
2678                 task->tk_action = call_encode;
2679                 return -EAGAIN;
2680         case rpc_autherr_tooweak:
2681                 trace_rpc__auth_tooweak(task);
2682                 pr_warn("RPC: server %s requires stronger authentication.\n",
2683                         task->tk_xprt->servername);
2684                 break;
2685         default:
2686                 goto out_unparsable;
2687         }
2688         goto out_err;
2689 }
2690
2691 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2692                 const void *obj)
2693 {
2694 }
2695
2696 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2697                 void *obj)
2698 {
2699         return 0;
2700 }
2701
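/*
 * The NULL procedure: no arguments, no results.  Used by rpc_ping() and
 * when probing candidate transports.
 */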
2702 static const struct rpc_procinfo rpcproc_null = {
2703         .p_encode = rpcproc_encode_null,
2704         .p_decode = rpcproc_decode_null,
2705 };
2706
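/* Make sure NULL requests use the normal retransmit-on-timeout behaviour. */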
2707 static void
2708 rpc_null_call_prepare(struct rpc_task *task, void *data)
2709 {
2710         task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2711         rpc_call_start(task);
2712 }
2713
2714 static const struct rpc_call_ops rpc_null_ops = {
2715         .rpc_call_prepare = rpc_null_call_prepare,
2716         .rpc_call_done = rpc_default_callback,
2717 };
2718
2719 static
2720 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2721                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2722                 const struct rpc_call_ops *ops, void *data)
2723 {
2724         struct rpc_message msg = {
2725                 .rpc_proc = &rpcproc_null,
2726         };
2727         struct rpc_task_setup task_setup_data = {
2728                 .rpc_client = clnt,
2729                 .rpc_xprt = xprt,
2730                 .rpc_message = &msg,
2731                 .rpc_op_cred = cred,
2732                 .callback_ops = ops ?: &rpc_null_ops,
2733                 .callback_data = data,
2734                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2735                          RPC_TASK_NULLCREDS,
2736         };
2737
2738         return rpc_run_task(&task_setup_data);
2739 }
2740
2741 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2742 {
2743         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2744 }
2745 EXPORT_SYMBOL_GPL(rpc_call_null);
2746
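/*
 * Send a synchronous NULL request to check that the server is reachable
 * and responds to this program and version.
 */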
2747 static int rpc_ping(struct rpc_clnt *clnt)
2748 {
2749         struct rpc_task *task;
2750         int status;
2751
2752         task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2753         if (IS_ERR(task))
2754                 return PTR_ERR(task);
2755         status = task->tk_status;
2756         rpc_put_task(task);
2757         return status;
2758 }
2759
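/* Callback data for the asynchronous NULL ping used when adding a transport */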
2760 struct rpc_cb_add_xprt_calldata {
2761         struct rpc_xprt_switch *xps;
2762         struct rpc_xprt *xprt;
2763 };
2764
2765 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2766 {
2767         struct rpc_cb_add_xprt_calldata *data = calldata;
2768
2769         if (task->tk_status == 0)
2770                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2771 }
2772
2773 static void rpc_cb_add_xprt_release(void *calldata)
2774 {
2775         struct rpc_cb_add_xprt_calldata *data = calldata;
2776
2777         xprt_put(data->xprt);
2778         xprt_switch_put(data->xps);
2779         kfree(data);
2780 }
2781
2782 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2783         .rpc_call_prepare = rpc_null_call_prepare,
2784         .rpc_call_done = rpc_cb_add_xprt_done,
2785         .rpc_release = rpc_cb_add_xprt_release,
2786 };
2787
2788 /**
2789  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2790  * @clnt: pointer to struct rpc_clnt
2791  * @xps: pointer to struct rpc_xprt_switch
2792  * @xprt: pointer to struct rpc_xprt
2793  * @dummy: unused
2794  */
2795 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2796                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2797                 void *dummy)
2798 {
2799         struct rpc_cb_add_xprt_calldata *data;
2800         struct rpc_task *task;
2801
2802         if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
2803                 rcu_read_lock();
2804                 pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2805                         "transport to server: %s\n", clnt->cl_max_connect,
2806                         rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2807                 rcu_read_unlock();
2808                 return -EINVAL;
2809         }
2810
2811         data = kmalloc(sizeof(*data), GFP_NOFS);
2812         if (!data)
2813                 return -ENOMEM;
2814         data->xps = xprt_switch_get(xps);
2815         data->xprt = xprt_get(xprt);
2816         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2817                 rpc_cb_add_xprt_release(data);
2818                 goto success;
2819         }
2820
2821         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2822                         &rpc_cb_add_xprt_call_ops, data);
2823         data->xps->xps_nunique_destaddr_xprts++;
2824         rpc_put_task(task);
2825 success:
2826         return 1;
2827 }
2828 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2829
2830 /**
2831  * rpc_clnt_setup_test_and_add_xprt()
2832  *
2833  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2834  *   1) the caller of the test function must dereference the rpc_xprt_switch
2835  *      and the rpc_xprt.
2836  *   2) the test function must call rpc_xprt_switch_add_xprt, usually in
2837  *      its rpc_call_done routine.
2838  *
2839  * Upon success (return of 1), it is the test function that adds the
2840  * new transport to the rpc_clnt's xprt switch.
2841  *
2842  * @clnt: struct rpc_clnt to get the new transport
2843  * @xps:  the rpc_xprt_switch to hold the new transport
2844  * @xprt: the rpc_xprt to test
2845  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2846  *        and test function call data
2847  */
2848 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2849                                      struct rpc_xprt_switch *xps,
2850                                      struct rpc_xprt *xprt,
2851                                      void *data)
2852 {
2853         struct rpc_task *task;
2854         struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2855         int status = -EADDRINUSE;
2856
2857         xprt = xprt_get(xprt);
2858         xprt_switch_get(xps);
2859
2860         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2861                 goto out_err;
2862
2863         /* Test the connection */
2864         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2865         if (IS_ERR(task)) {
2866                 status = PTR_ERR(task);
2867                 goto out_err;
2868         }
2869         status = task->tk_status;
2870         rpc_put_task(task);
2871
2872         if (status < 0)
2873                 goto out_err;
2874
2875         /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2876         xtest->add_xprt_test(clnt, xprt, xtest->data);
2877
2878         xprt_put(xprt);
2879         xprt_switch_put(xps);
2880
2881         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2882         return 1;
2883 out_err:
2884         xprt_put(xprt);
2885         xprt_switch_put(xps);
2886         pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2887                 status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2888         return status;
2889 }
2890 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2891
2892 /**
2893  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2894  * @clnt: pointer to struct rpc_clnt
2895  * @xprtargs: pointer to struct xprt_create
2896  * @setup: callback to test and/or set up the connection
2897  * @data: pointer to setup function data
2898  *
2899  * Creates a new transport using the parameters set in @xprtargs and
2900  * adds it to @clnt.
2901  * If a @setup callback is supplied, it is invoked to test and/or set
2902  * up the connection before the new transport is added.
2903  *
2904  */
2905 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2906                 struct xprt_create *xprtargs,
2907                 int (*setup)(struct rpc_clnt *,
2908                         struct rpc_xprt_switch *,
2909                         struct rpc_xprt *,
2910                         void *),
2911                 void *data)
2912 {
2913         struct rpc_xprt_switch *xps;
2914         struct rpc_xprt *xprt;
2915         unsigned long connect_timeout;
2916         unsigned long reconnect_timeout;
2917         unsigned char resvport, reuseport;
2918         int ret = 0, ident;
2919
2920         rcu_read_lock();
2921         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2922         xprt = xprt_iter_xprt(&clnt->cl_xpi);
2923         if (xps == NULL || xprt == NULL) {
2924                 rcu_read_unlock();
2925                 xprt_switch_put(xps);
2926                 return -EAGAIN;
2927         }
2928         resvport = xprt->resvport;
2929         reuseport = xprt->reuseport;
2930         connect_timeout = xprt->connect_timeout;
2931         reconnect_timeout = xprt->max_reconnect_timeout;
2932         ident = xprt->xprt_class->ident;
2933         rcu_read_unlock();
2934
2935         if (!xprtargs->ident)
2936                 xprtargs->ident = ident;
2937         xprt = xprt_create_transport(xprtargs);
2938         if (IS_ERR(xprt)) {
2939                 ret = PTR_ERR(xprt);
2940                 goto out_put_switch;
2941         }
2942         xprt->resvport = resvport;
2943         xprt->reuseport = reuseport;
2944         if (xprt->ops->set_connect_timeout != NULL)
2945                 xprt->ops->set_connect_timeout(xprt,
2946                                 connect_timeout,
2947                                 reconnect_timeout);
2948
2949         rpc_xprt_switch_set_roundrobin(xps);
2950         if (setup) {
2951                 ret = setup(clnt, xps, xprt, data);
2952                 if (ret != 0)
2953                         goto out_put_xprt;
2954         }
2955         rpc_xprt_switch_add_xprt(xps, xprt);
2956 out_put_xprt:
2957         xprt_put(xprt);
2958 out_put_switch:
2959         xprt_switch_put(xps);
2960         return ret;
2961 }
2962 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2963
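/*
 * Usage sketch (my_trunking_test and my_data are hypothetical,
 * caller-supplied names): create an additional transport described by
 * xprtargs and let the caller's test function decide whether to keep it:
 *
 *	struct rpc_add_xprt_test xtest = {
 *		.add_xprt_test	= my_trunking_test,
 *		.data		= my_data,
 *	};
 *
 *	err = rpc_clnt_add_xprt(clnt, &xprtargs,
 *				rpc_clnt_setup_test_and_add_xprt, &xtest);
 */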
2964 struct connect_timeout_data {
2965         unsigned long connect_timeout;
2966         unsigned long reconnect_timeout;
2967 };
2968
2969 static int
2970 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2971                 struct rpc_xprt *xprt,
2972                 void *data)
2973 {
2974         struct connect_timeout_data *timeo = data;
2975
2976         if (xprt->ops->set_connect_timeout)
2977                 xprt->ops->set_connect_timeout(xprt,
2978                                 timeo->connect_timeout,
2979                                 timeo->reconnect_timeout);
2980         return 0;
2981 }
2982
2983 void
2984 rpc_set_connect_timeout(struct rpc_clnt *clnt,
2985                 unsigned long connect_timeout,
2986                 unsigned long reconnect_timeout)
2987 {
2988         struct connect_timeout_data timeout = {
2989                 .connect_timeout = connect_timeout,
2990                 .reconnect_timeout = reconnect_timeout,
2991         };
2992         rpc_clnt_iterate_for_each_xprt(clnt,
2993                         rpc_xprt_set_connect_timeout,
2994                         &timeout);
2995 }
2996 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2997
2998 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2999 {
3000         rcu_read_lock();
3001         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3002         rcu_read_unlock();
3003 }
3004 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
3005
3006 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3007 {
3008         rcu_read_lock();
3009         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3010                                  xprt);
3011         rcu_read_unlock();
3012 }
3013 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3014
3015 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3016                                    const struct sockaddr *sap)
3017 {
3018         struct rpc_xprt_switch *xps;
3019         bool ret;
3020
3021         rcu_read_lock();
3022         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3023         ret = rpc_xprt_switch_has_addr(xps, sap);
3024         rcu_read_unlock();
3025         return ret;
3026 }
3027 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3028
3029 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3030 static void rpc_show_header(void)
3031 {
3032         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3033                 "-timeout ---ops--\n");
3034 }
3035
3036 static void rpc_show_task(const struct rpc_clnt *clnt,
3037                           const struct rpc_task *task)
3038 {
3039         const char *rpc_waitq = "none";
3040
3041         if (RPC_IS_QUEUED(task))
3042                 rpc_waitq = rpc_qname(task->tk_waitqueue);
3043
3044         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3045                 task->tk_pid, task->tk_flags, task->tk_status,
3046                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3047                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3048                 task->tk_action, rpc_waitq);
3049 }
3050
3051 void rpc_show_tasks(struct net *net)
3052 {
3053         struct rpc_clnt *clnt;
3054         struct rpc_task *task;
3055         int header = 0;
3056         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3057
3058         spin_lock(&sn->rpc_client_lock);
3059         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3060                 spin_lock(&clnt->cl_lock);
3061                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3062                         if (!header) {
3063                                 rpc_show_header();
3064                                 header++;
3065                         }
3066                         rpc_show_task(clnt, task);
3067                 }
3068                 spin_unlock(&clnt->cl_lock);
3069         }
3070         spin_unlock(&sn->rpc_client_lock);
3071 }
3072 #endif
3073
3074 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3075 static int
3076 rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3077                 struct rpc_xprt *xprt,
3078                 void *dummy)
3079 {
3080         return xprt_enable_swap(xprt);
3081 }
3082
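/*
 * The first activation enables swap handling on every transport; later
 * activations only bump the usage count.
 */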
3083 int
3084 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3085 {
3086         if (atomic_inc_return(&clnt->cl_swapper) == 1)
3087                 return rpc_clnt_iterate_for_each_xprt(clnt,
3088                                 rpc_clnt_swap_activate_callback, NULL);
3089         return 0;
3090 }
3091 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3092
3093 static int
3094 rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3095                 struct rpc_xprt *xprt,
3096                 void *dummy)
3097 {
3098         xprt_disable_swap(xprt);
3099         return 0;
3100 }
3101
3102 void
3103 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3104 {
3105         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3106                 rpc_clnt_iterate_for_each_xprt(clnt,
3107                                 rpc_clnt_swap_deactivate_callback, NULL);
3108 }
3109 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3110 #endif /* CONFIG_SUNRPC_SWAP */