// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partitioning varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}
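
/*
 * Most of the parsers below use the same versioned-encoding envelope as
 * the quota parser above: a struct_v byte, a struct_compat byte, and a
 * u32 struct_len bounding the payload.  A minimal sketch of the pattern
 * for a hypothetical u64 field "foo" (illustrative only, not part of
 * the protocol):
 *
 *	u8 struct_v, struct_compat;
 *	u32 struct_len;
 *
 *	ceph_decode_8_safe(p, end, struct_v, bad);
 *	ceph_decode_8_safe(p, end, struct_compat, bad);
 *	if (!struct_v || struct_compat != 1)
 *		goto bad;			// unknown encoding
 *	ceph_decode_32_safe(p, end, struct_len, bad);
 *	ceph_decode_need(p, end, struct_len, bad);
 *	end = *p + struct_len;			// clamp to this sub-struct
 *	ceph_decode_64_safe(p, end, foo, bad);	// versioned fields
 *	*p = end;			// skip unknown trailing fields
 */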

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }

                /* btime */
                ceph_decode_need(p, end, sizeof(info->btime), bad);
                ceph_decode_copy(p, &info->btime, sizeof(info->btime));

                /* change attribute */
                ceph_decode_64_safe(p, end, info->change_attr, bad);

                /* dir pin */
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
                } else {
                        info->dir_pin = -ENODATA;
                }

                /* snapshot birth time, remains zero for v<=2 */
                if (struct_v >= 3) {
                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
                        ceph_decode_copy(p, &info->snap_btime,
                                         sizeof(info->snap_btime));
                } else {
                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
                }

                /* snapshot count, remains zero for v<=3 */
                if (struct_v >= 4) {
                        ceph_decode_64_safe(p, end, info->rsnaps, bad);
                } else {
                        info->rsnaps = 0;
                }

                if (struct_v >= 5) {
                        u32 alen;

                        ceph_decode_32_safe(p, end, alen, bad);

                        while (alen--) {
                                u32 len;

                                /* key */
                                ceph_decode_32_safe(p, end, len, bad);
                                ceph_decode_skip_n(p, end, len, bad);
                                /* value */
                                ceph_decode_32_safe(p, end, len, bad);
                                ceph_decode_skip_n(p, end, len, bad);
                        }
                }

                /* fscrypt flag -- ignore */
                if (struct_v >= 6)
                        ceph_decode_skip_8(p, end, bad);

                info->fscrypt_auth = NULL;
                info->fscrypt_auth_len = 0;
                info->fscrypt_file = NULL;
                info->fscrypt_file_len = 0;
                if (struct_v >= 7) {
                        ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
                        if (info->fscrypt_auth_len) {
                                info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
                                                             GFP_KERNEL);
                                if (!info->fscrypt_auth)
                                        return -ENOMEM;
                                ceph_decode_copy_safe(p, end, info->fscrypt_auth,
                                                      info->fscrypt_auth_len, bad);
                        }
                        ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
                        if (info->fscrypt_file_len) {
                                info->fscrypt_file = kmalloc(info->fscrypt_file_len,
                                                             GFP_KERNEL);
                                if (!info->fscrypt_file)
                                        return -ENOMEM;
                                ceph_decode_copy_safe(p, end, info->fscrypt_file,
                                                      info->fscrypt_file_len, bad);
                        }
                }
                *p = end;
        } else {
                /* legacy (unversioned) struct */
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }

                if (features & CEPH_FEATURE_FS_BTIME) {
                        ceph_decode_need(p, end, sizeof(info->btime), bad);
                        ceph_decode_copy(p, &info->btime, sizeof(info->btime));
                        ceph_decode_64_safe(p, end, info->change_attr, bad);
                }

                info->dir_pin = -ENODATA;
                /* info->snap_btime and info->rsnaps remain zero */
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding whose struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features, u32 *altname_len, u8 **altname)
{
        u8 struct_v;
        u32 struct_len;
        void *lend;

        if (features == (u64)-1) {
                u8 struct_compat;

                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);

                /* struct_v is expected to be >= 1. we only understand
                 * encoding whose struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;

                ceph_decode_32_safe(p, end, struct_len, bad);
        } else {
                struct_len = sizeof(**lease);
                *altname_len = 0;
                *altname = NULL;
        }

        lend = *p + struct_len;
        ceph_decode_need(p, end, struct_len, bad);
        *lease = *p;
        *p += sizeof(**lease);

        if (features == (u64)-1) {
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, *altname_len, bad);
                        ceph_decode_need(p, end, *altname_len, bad);
                        *altname = *p;
                        *p += *altname_len;
                } else {
                        *altname = NULL;
                        *altname_len = 0;
                }
        }
        *p = lend;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features,
                                             &info->altname_len, &info->altname);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
                                    struct ceph_mds_request *req,
                                    u64 features)
{
        struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct inode *inode = d_inode(req->r_dentry);
                struct ceph_inode_info *ci = ceph_inode(inode);
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                struct fscrypt_str tname = FSTR_INIT(NULL, 0);
                struct fscrypt_str oname = FSTR_INIT(NULL, 0);
                struct ceph_fname fname;
                u32 altname_len, _name_len;
                u8 *altname, *_name;

                /* dentry */
                ceph_decode_32_safe(p, end, _name_len, bad);
                ceph_decode_need(p, end, _name_len, bad);
                _name = *p;
                *p += _name_len;
                dout("parsed dir dname '%.*s'\n", _name_len, _name);

                if (info->hash_order)
                        rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
                                                      _name, _name_len);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features,
                                             &altname_len, &altname);
                if (err)
                        goto out_bad;

                /*
                 * Try to decrypt the dentry names and update them
                 * in the ceph_mds_reply_dir_entry struct.
                 */
                fname.dir = inode;
                fname.name = _name;
                fname.name_len = _name_len;
                fname.ctext = altname;
                fname.ctext_len = altname_len;
                /*
                 * _name_len may be larger than altname_len, such as
                 * when the human-readable name length is in the range of
                 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
                 * in which case the copy in ceph_fname_to_usr would
                 * corrupt the data if there is no encryption key.
                 *
                 * Just set the no_copy flag; then, if there is no
                 * encryption key, oname.name will always be assigned
                 * to _name.
                 */
                fname.no_copy = true;
                if (altname_len == 0) {
                        /*
                         * Set tname to _name, and this will be used
                         * to do the base64_decode in-place. It's
                         * safe because the decoded string is always
                         * shorter, at most 3/4 the length of the
                         * original string.
                         */
                        tname.name = _name;

                        /*
                         * Set oname to _name too, and this will be
                         * used to do the decryption in-place.
                         */
                        oname.name = _name;
                        oname.len = _name_len;
                } else {
                        /*
                         * This will do the decryption in-place,
                         * directly from the altname ciphertext.
                         */
                        oname.name = altname;
                        oname.len = altname_len;
                }
                rde->is_nokey = false;
                err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
                if (err) {
                        pr_err("%s unable to decode %.*s, got %d\n", __func__,
                               _name_len, _name, err);
                        goto out_bad;
                }
                rde->name = oname.name;
                rde->name_len = oname.len;

                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        /* Skip over any unrecognized fields */
        *p = end;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE         xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        dout("got %u sets of delegated inodes\n", sets);
        while (sets--) {
                u64 start, len;

                ceph_decode_64_safe(p, end, start, bad);
                ceph_decode_64_safe(p, end, len, bad);

                /* Don't accept a delegation of system inodes */
                if (start < CEPH_INO_SYSTEM_BASE) {
                        pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
                                        start, len);
                        continue;
                }
                while (len--) {
                        int err = xa_insert(&s->s_delegated_inos, start++,
                                            DELEGATED_INO_AVAILABLE,
                                            GFP_KERNEL);
                        if (!err) {
                                dout("added delegated inode 0x%llx\n",
                                     start - 1);
                        } else if (err == -EBUSY) {
                                pr_warn("MDS delegated inode 0x%llx more than once.\n",
                                        start - 1);
                        } else {
                                return err;
                        }
                }
        }
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        unsigned long ino;
        void *val;

        xa_for_each(&s->s_delegated_inos, ino, val) {
                val = xa_erase(&s->s_delegated_inos, ino);
                if (val == DELEGATED_INO_AVAILABLE)
                        return ino;
        }
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
                         GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        if (sets)
                ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return 0;
}
#endif /* BITS_PER_LONG == 64 */
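
/*
 * Illustrative sketch of how the delegated-ino table above is meant to
 * be consumed (assumptions noted in-line): on 64-bit kernels it is a
 * plain xarray keyed by inode number, so a caller that wants an ino for
 * an async operation would claim one and, on failure, hand it back:
 *
 *	u64 ino = ceph_get_deleg_ino(session);	// 0 if none delegated
 *	if (ino) {
 *		// ... use ino for the async request (caller-specific) ...
 *		if (request_failed)			// hypothetical flag
 *			ceph_restore_deleg_ino(session, ino);
 *	}
 */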

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features, struct ceph_mds_session *s)
{
        int ret;

        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        /* Malformed reply? */
                        info->has_create_ino = false;
                } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
                        info->has_create_ino = true;
                        /* struct_v, struct_compat, and len */
                        ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        ret = ceph_parse_deleg_inos(p, end, s);
                        if (ret)
                                return ret;
                } else {
                        /* legacy */
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        info->has_create_ino = true;
                }
        } else {
                if (*p != end)
                        goto bad;
        }

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
                                      struct ceph_mds_reply_info_parsed *info,
                                      u64 features)
{
        u32 value_len;

        ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
        ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
        ceph_decode_skip_32(p, end, bad); /* skip payload length */

        ceph_decode_32_safe(p, end, value_len, bad);

        if (value_len == end - *p) {
                info->xattr_info.xattr_value = *p;
                info->xattr_info.xattr_value_len = value_len;
                *p = end;
                return value_len;
        }
bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_request *req,
                                  u64 features, struct ceph_mds_session *s)
{
        struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, req, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features, s);
        else if (op == CEPH_MDS_OP_GETVXATTR)
                return parse_reply_info_getvxattr(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
                            struct ceph_mds_request *req, u64 features)
{
        struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, req, features, s);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        ceph_msg_dump(msg);
        return err;
}
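
/*
 * For reference, the reply front decoded above is laid out as three
 * length-prefixed sections after the fixed header, with no trailing
 * bytes permitted:
 *
 *	struct ceph_mds_reply_head
 *	u32 trace_len,    trace payload   (dentry/inode trace)
 *	u32 extra_len,    extra payload   (op-specific: readdir/create/...)
 *	u32 snapblob_len, snap blob
 */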

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        int i;

        kfree(info->diri.fscrypt_auth);
        kfree(info->diri.fscrypt_file);
        kfree(info->targeti.fscrypt_auth);
        kfree(info->targeti.fscrypt_file);
        if (!info->dir_entries)
                return;

        for (i = 0; i < info->dir_nr; i++) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

                kfree(rde->inode.fscrypt_auth);
                kfree(rde->inode.fscrypt_file);
        }
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it just drops all the links, unhashes the dentry, and
 * then succeeds immediately.
 *
 * For any new create/link/rename/etc. requests that follow and use the
 * same file names, we must wait for the first reply of the inflight
 * unlink request, or the MDS may fail those following requests with
 * -EEXIST if the inflight async unlink request was delayed for some
 * reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but the
 * previously delayed async unlink request may later remove the
 * CDentry. That means the just-created file could be accidentally
 * deleted later.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories with the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
        struct dentry *pdentry = dentry->d_parent;
        struct dentry *udentry, *found = NULL;
        struct ceph_dentry_info *di;
        struct qstr dname;
        u32 hash = dentry->d_name.hash;
        int err;

        dname.name = dentry->d_name.name;
        dname.len = dentry->d_name.len;

        rcu_read_lock();
        hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
                                   hnode, hash) {
                udentry = di->dentry;

                spin_lock(&udentry->d_lock);
                if (udentry->d_name.hash != hash)
                        goto next;
                if (unlikely(udentry->d_parent != pdentry))
                        goto next;
                if (!hash_hashed(&di->hnode))
                        goto next;

                if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
                        pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
                                __func__, dentry, dentry);

                if (!d_same_name(udentry, pdentry, &dname))
                        goto next;

                spin_unlock(&udentry->d_lock);
                found = dget(udentry);
                break;
next:
                spin_unlock(&udentry->d_lock);
        }
        rcu_read_unlock();

        if (likely(!found))
                return 0;

        dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
             dentry, dentry, found, found);

        err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
                          TASK_KILLABLE);
        dput(found);
        return err;
}
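
/*
 * A creating caller is expected to serialize against conflicting
 * in-flight async unlinks before submitting its request, roughly like
 * this (illustrative sketch, error handling abbreviated):
 *
 *	err = ceph_wait_on_conflict_unlink(dentry);
 *	if (err)
 *		return err;	// interrupted by a fatal signal
 *	// ... build and submit the create/link/rename request ...
 */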


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_CLOSED: return "closed";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref))
                return s;
        return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        if (IS_ERR_OR_NULL(s))
                return;

        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                WARN_ON(mutex_is_locked(&s->s_mutex));
                xa_destroy(&s->s_delegated_inos);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        return ceph_get_mds_session(mdsc->sessions[mds]);
}
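
/*
 * Typical usage (sketch): look the session up under mdsc->mutex, which
 * takes a reference, then drop the reference once done:
 *
 *	mutex_lock(&mdsc->mutex);
 *	s = __ceph_lookup_mds_session(mdsc, mds);
 *	mutex_unlock(&mdsc->mutex);
 *	if (s) {
 *		// ... use s ...
 *		ceph_put_mds_session(s);
 *	}
 */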

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
                return ERR_PTR(-EIO);

        if (mds >= mdsc->mdsmap->possible_max_rank)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        atomic_set(&s->s_cap_gen, 1);
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        INIT_LIST_HEAD(&s->s_caps);
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        xa_init(&s->s_delegated_inos);
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

        INIT_LIST_HEAD(&s->s_cap_dirty);
        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}
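
/*
 * Worked example of the session-array growth above: registering mds 5
 * with max_sessions == 4 computes newmax = 1 << get_count_order(6),
 * i.e. 1 << 3 == 8, so the array grows to the next power of two rather
 * than by one slot per mds.
 */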

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
                                void (*cb)(struct ceph_mds_session *),
                                bool check_state)
{
        int mds;

        mutex_lock(&mdsc->mutex);
        for (mds = 0; mds < mdsc->max_sessions; ++mds) {
                struct ceph_mds_session *s;

                s = __ceph_lookup_mds_session(mdsc, mds);
                if (!s)
                        continue;

                if (check_state && !check_session_state(s)) {
                        ceph_put_mds_session(s);
                        continue;
                }

                mutex_unlock(&mdsc->mutex);
                cb(s);
                ceph_put_mds_session(s);
                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);
}
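
/*
 * Example (sketch): the callback runs with a session reference held and
 * mdsc->mutex dropped, so it may block or send messages. With a
 * hypothetical callback my_cb:
 *
 *	static void my_cb(struct ceph_mds_session *s) { ... }	// hypothetical
 *	ceph_mdsc_iterate_sessions(mdsc, my_cb, true);
 *
 * Passing check_state == true skips sessions that fail
 * check_session_state().
 */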

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        ceph_mdsc_release_dir_caps_no_check(req);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_parent) {
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
                iput(req->r_parent);
        }
        iput(req->r_target_inode);
        iput(req->r_new_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_cred(req->r_cred);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        kfree(req->r_fscrypt_auth);
        kfree(req->r_altname);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        WARN_ON_ONCE(!list_empty(&req->r_wait));
        kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        int ret = 0;

        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps) {
                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                        req->r_num_caps);
                if (ret < 0) {
                        pr_err("__register_request %p "
                               "failed to reserve caps: %d\n", req, ret);
                        /* set req->r_err to fail early from __do_request */
                        req->r_err = ret;
                        return;
                }
        }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_cred = get_current_cred();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                ihold(dir);
                req->r_unsafe_dir = dir;
                spin_lock(&ci->i_unsafe_lock);
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        /* Never leave an unregistered request on an unsafe list! */
        list_del_init(&req->r_unsafe_item);

        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
        struct inode *inode = NULL;

        while (dentry && !IS_ROOT(dentry)) {
                inode = d_inode_rcu(dentry);
                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                dentry = dentry->d_parent;
        }
        if (inode)
                inode = igrab(inode);
        return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req,
                        bool *random)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

        if (random)
                *random = false;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("%s using resend_mds mds%d\n", __func__,
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
                        inode = req->r_inode;
                        ihold(inode);
                } else {
                        /* req->r_dentry is non-null for LSSNAP request */
                        rcu_read_lock();
                        inode = get_nonsnap_parent(req->r_dentry);
                        rcu_read_unlock();
                        dout("%s using snapdir's parent %p\n", __func__, inode);
                }
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent;
                struct inode *dir;

                rcu_read_lock();
                parent = READ_ONCE(req->r_dentry->d_parent);
                dir = req->r_parent ? : d_inode_rcu(parent);

                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs or parent went negative */
                        inode = d_inode(req->r_dentry);
                        if (inode)
                                ihold(inode);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        inode = get_nonsnap_parent(parent);
                        dout("%s using nonsnap parent %p\n", __func__, inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = igrab(dir);
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        } else {
                                ihold(inode);
                        }
                }
                rcu_read_unlock();
        }

        dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
             hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
                                     __func__, inode, ceph_vinop(inode),
                                     frag.frag, mds, (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE &&
                                    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
                                        goto out;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, look for the authoritative mds. */
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
                                     __func__, inode, ceph_vinop(inode),
                                     frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE) {
                                        if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
                                                                  mds))
                                                goto out;
                                }
                        }
                        mode = USE_AUTH_MDS;
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                iput(inode);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
out:
        iput(inode);
        return mds;

random:
        if (random)
                *random = true;

        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("%s chose random mds%d\n", __func__, mds);
        return mds;
}
1422
1423
1424 /*
1425  * session messages
1426  */
1427 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1428 {
1429         struct ceph_msg *msg;
1430         struct ceph_mds_session_head *h;
1431
1432         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1433                            false);
1434         if (!msg) {
1435                 pr_err("ENOMEM creating session %s msg\n",
1436                        ceph_session_op_name(op));
1437                 return NULL;
1438         }
1439         h = msg->front.iov_base;
1440         h->op = cpu_to_le32(op);
1441         h->seq = cpu_to_le64(seq);
1442
1443         return msg;
1444 }
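
/*
 * Illustrative use, mirroring send_renew_caps() below: the caller
 * checks for NULL and hands the message to the connection layer,
 * which consumes the reference.
 *
 *      msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, seq);
 *      if (!msg)
 *              return -ENOMEM;
 *      ceph_con_send(&session->s_con, msg);
 */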
1445
1446 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1447 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1448 static int encode_supported_features(void **p, void *end)
1449 {
1450         static const size_t count = ARRAY_SIZE(feature_bits);
1451
1452         if (count > 0) {
1453                 size_t i;
1454                 size_t size = FEATURE_BYTES(count);
1455                 unsigned long bit;
1456
1457                 if (WARN_ON_ONCE(*p + 4 + size > end))
1458                         return -ERANGE;
1459
1460                 ceph_encode_32(p, size);
1461                 memset(*p, 0, size);
1462                 for (i = 0; i < count; i++) {
1463                         bit = feature_bits[i];
1464                         ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1465                 }
1466                 *p += size;
1467         } else {
1468                 if (WARN_ON_ONCE(*p + 4 > end))
1469                         return -ERANGE;
1470
1471                 ceph_encode_32(p, 0);
1472         }
1473
1474         return 0;
1475 }
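
/*
 * Worked example of the sizing above, assuming feature_bits is sorted
 * ascending (so feature_bits[count - 1] is the highest bit): with a
 * highest bit of, say, 17, FEATURE_BYTES() is DIV_ROUND_UP(18, 64) * 8
 * == 8, i.e. one 64-bit word, encoded on the wire as a 4-byte length
 * followed by 8 bitmap bytes.
 */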
1476
1477 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1478 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1479 static int encode_metric_spec(void **p, void *end)
1480 {
1481         static const size_t count = ARRAY_SIZE(metric_bits);
1482
1483         /* header */
1484         if (WARN_ON_ONCE(*p + 2 > end))
1485                 return -ERANGE;
1486
1487         ceph_encode_8(p, 1); /* version */
1488         ceph_encode_8(p, 1); /* compat */
1489
1490         if (count > 0) {
1491                 size_t i;
1492                 size_t size = METRIC_BYTES(count);
1493
1494                 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1495                         return -ERANGE;
1496
1497                 /* metric spec info length */
1498                 ceph_encode_32(p, 4 + size);
1499
1500                 /* metric spec */
1501                 ceph_encode_32(p, size);
1502                 memset(*p, 0, size);
1503                 for (i = 0; i < count; i++)
1504                         ((unsigned char *)(*p))[metric_bits[i] / 8] |= BIT(metric_bits[i] % 8);
1505                 *p += size;
1506         } else {
1507                 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1508                         return -ERANGE;
1509
1510                 /* metric spec info length */
1511                 ceph_encode_32(p, 4);
1512                 /* metric spec */
1513                 ceph_encode_32(p, 0);
1514         }
1515
1516         return 0;
1517 }
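
/*
 * Sketch of the resulting encoding (sizes in bytes):
 *
 *      u8  version
 *      u8  compat
 *      u32 metric spec info length (4 + size)
 *      u32 bitmap length (size)
 *      u8  bitmap[size]
 *
 * The bitmap is sized from the highest bit in metric_bits, which is
 * assumed to be sorted ascending.
 */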
1518
1519 /*
1520  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1521  * to include additional client metadata fields.
1522  */
1523 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1524 {
1525         struct ceph_msg *msg;
1526         struct ceph_mds_session_head *h;
1527         int i;
1528         int extra_bytes = 0;
1529         int metadata_key_count = 0;
1530         struct ceph_options *opt = mdsc->fsc->client->options;
1531         struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1532         size_t size, count;
1533         void *p, *end;
1534         int ret;
1535
1536         const char* metadata[][2] = {
1537                 {"hostname", mdsc->nodename},
1538                 {"kernel_version", init_utsname()->release},
1539                 {"entity_id", opt->name ? : ""},
1540                 {"root", fsopt->server_path ? : "/"},
1541                 {NULL, NULL}
1542         };
1543
1544         /* Calculate serialized length of metadata: each entry contributes two u32 length prefixes (8 bytes) plus its key and value bytes */
1545         extra_bytes = 4;  /* map length */
1546         for (i = 0; metadata[i][0]; ++i) {
1547                 extra_bytes += 8 + strlen(metadata[i][0]) +
1548                         strlen(metadata[i][1]);
1549                 metadata_key_count++;
1550         }
1551
1552         /* supported features */
1553         size = 0;
1554         count = ARRAY_SIZE(feature_bits);
1555         if (count > 0)
1556                 size = FEATURE_BYTES(count);
1557         extra_bytes += 4 + size;
1558
1559         /* metric spec */
1560         size = 0;
1561         count = ARRAY_SIZE(metric_bits);
1562         if (count > 0)
1563                 size = METRIC_BYTES(count);
1564         extra_bytes += 2 + 4 + 4 + size;
1565
1566         /* Allocate the message */
1567         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1568                            GFP_NOFS, false);
1569         if (!msg) {
1570                 pr_err("ENOMEM creating session open msg\n");
1571                 return ERR_PTR(-ENOMEM);
1572         }
1573         p = msg->front.iov_base;
1574         end = p + msg->front.iov_len;
1575
1576         h = p;
1577         h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1578         h->seq = cpu_to_le64(seq);
1579
1580         /*
1581          * Serialize client metadata into waiting buffer space, using
1582          * the format that userspace expects for map<string, string>
1583          *
1584          * ClientSession messages with metadata are v4
1585          */
1586         msg->hdr.version = cpu_to_le16(4);
1587         msg->hdr.compat_version = cpu_to_le16(1);
1588
1589         /* The write pointer, following the session_head structure */
1590         p += sizeof(*h);
1591
1592         /* Number of entries in the map */
1593         ceph_encode_32(&p, metadata_key_count);
1594
1595         /* Two length-prefixed strings for each entry in the map */
1596         for (i = 0; metadata[i][0]; ++i) {
1597                 size_t const key_len = strlen(metadata[i][0]);
1598                 size_t const val_len = strlen(metadata[i][1]);
1599
1600                 ceph_encode_32(&p, key_len);
1601                 memcpy(p, metadata[i][0], key_len);
1602                 p += key_len;
1603                 ceph_encode_32(&p, val_len);
1604                 memcpy(p, metadata[i][1], val_len);
1605                 p += val_len;
1606         }
1607
1608         ret = encode_supported_features(&p, end);
1609         if (ret) {
1610                 pr_err("encode_supported_features failed!\n");
1611                 ceph_msg_put(msg);
1612                 return ERR_PTR(ret);
1613         }
1614
1615         ret = encode_metric_spec(&p, end);
1616         if (ret) {
1617                 pr_err("encode_metric_spec failed!\n");
1618                 ceph_msg_put(msg);
1619                 return ERR_PTR(ret);
1620         }
1621
1622         msg->front.iov_len = p - msg->front.iov_base;
1623         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1624
1625         return msg;
1626 }
1627
1628 /*
1629  * send session open request.
1630  *
1631  * called under mdsc->mutex
1632  */
1633 static int __open_session(struct ceph_mds_client *mdsc,
1634                           struct ceph_mds_session *session)
1635 {
1636         struct ceph_msg *msg;
1637         int mstate;
1638         int mds = session->s_mds;
1639
1640         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
1641                 return -EIO;
1642
1643         /* wait for mds to go active? */
1644         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1645         dout("open_session to mds%d (%s)\n", mds,
1646              ceph_mds_state_name(mstate));
1647         session->s_state = CEPH_MDS_SESSION_OPENING;
1648         session->s_renew_requested = jiffies;
1649
1650         /* send connect message */
1651         msg = create_session_open_msg(mdsc, session->s_seq);
1652         if (IS_ERR(msg))
1653                 return PTR_ERR(msg);
1654         ceph_con_send(&session->s_con, msg);
1655         return 0;
1656 }
1657
1658 /*
1659  * open a session to the given export target mds
1660  *
1661  * called under mdsc->mutex
1662  */
1663 static struct ceph_mds_session *
1664 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1665 {
1666         struct ceph_mds_session *session;
1667         int ret;
1668
1669         session = __ceph_lookup_mds_session(mdsc, target);
1670         if (!session) {
1671                 session = register_session(mdsc, target);
1672                 if (IS_ERR(session))
1673                         return session;
1674         }
1675         if (session->s_state == CEPH_MDS_SESSION_NEW ||
1676             session->s_state == CEPH_MDS_SESSION_CLOSING) {
1677                 ret = __open_session(mdsc, session);
1678                 if (ret)
1679                         return ERR_PTR(ret);
1680         }
1681
1682         return session;
1683 }
1684
1685 struct ceph_mds_session *
1686 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1687 {
1688         struct ceph_mds_session *session;
1689
1690         dout("open_export_target_session to mds%d\n", target);
1691
1692         mutex_lock(&mdsc->mutex);
1693         session = __open_export_target_session(mdsc, target);
1694         mutex_unlock(&mdsc->mutex);
1695
1696         return session;
1697 }
1698
1699 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1700                                           struct ceph_mds_session *session)
1701 {
1702         struct ceph_mds_info *mi;
1703         struct ceph_mds_session *ts;
1704         int i, mds = session->s_mds;
1705
1706         if (mds >= mdsc->mdsmap->possible_max_rank)
1707                 return;
1708
1709         mi = &mdsc->mdsmap->m_info[mds];
1710         dout("open_export_target_sessions for mds%d (%d targets)\n",
1711              session->s_mds, mi->num_export_targets);
1712
1713         for (i = 0; i < mi->num_export_targets; i++) {
1714                 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1715                 ceph_put_mds_session(ts);
1716         }
1717 }
1718
1719 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1720                                            struct ceph_mds_session *session)
1721 {
1722         mutex_lock(&mdsc->mutex);
1723         __open_export_target_sessions(mdsc, session);
1724         mutex_unlock(&mdsc->mutex);
1725 }
1726
1727 /*
1728  * session caps
1729  */
1730
1731 static void detach_cap_releases(struct ceph_mds_session *session,
1732                                 struct list_head *target)
1733 {
1734         lockdep_assert_held(&session->s_cap_lock);
1735
1736         list_splice_init(&session->s_cap_releases, target);
1737         session->s_num_cap_releases = 0;
1738         dout("detach_cap_releases mds%d\n", session->s_mds);
1739 }
1740
1741 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1742                                  struct list_head *dispose)
1743 {
1744         while (!list_empty(dispose)) {
1745                 struct ceph_cap *cap;
1746                 /* dispose of each queued release without sending it */
1747                 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1748                 list_del(&cap->session_caps);
1749                 ceph_put_cap(mdsc, cap);
1750         }
1751 }
1752
1753 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1754                                      struct ceph_mds_session *session)
1755 {
1756         struct ceph_mds_request *req;
1757         struct rb_node *p;
1758
1759         dout("cleanup_session_requests mds%d\n", session->s_mds);
1760         mutex_lock(&mdsc->mutex);
1761         while (!list_empty(&session->s_unsafe)) {
1762                 req = list_first_entry(&session->s_unsafe,
1763                                        struct ceph_mds_request, r_unsafe_item);
1764                 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1765                                     req->r_tid);
1766                 if (req->r_target_inode)
1767                         mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1768                 if (req->r_unsafe_dir)
1769                         mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1770                 __unregister_request(mdsc, req);
1771         }
1772         /* zero r_attempts, so kick_requests() will re-send requests */
1773         p = rb_first(&mdsc->request_tree);
1774         while (p) {
1775                 req = rb_entry(p, struct ceph_mds_request, r_node);
1776                 p = rb_next(p);
1777                 if (req->r_session &&
1778                     req->r_session->s_mds == session->s_mds)
1779                         req->r_attempts = 0;
1780         }
1781         mutex_unlock(&mdsc->mutex);
1782 }
1783
1784 /*
1785  * Helper to safely iterate over all caps associated with a session, with
1786  * special care taken to handle a racing __ceph_remove_cap().
1787  *
1788  * Caller must hold session s_mutex.
1789  */
1790 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1791                               int (*cb)(struct inode *, int mds, void *),
1792                               void *arg)
1793 {
1794         struct list_head *p;
1795         struct ceph_cap *cap;
1796         struct inode *inode, *last_inode = NULL;
1797         struct ceph_cap *old_cap = NULL;
1798         int ret;
1799
1800         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1801         spin_lock(&session->s_cap_lock);
1802         p = session->s_caps.next;
1803         while (p != &session->s_caps) {
1804                 int mds;
1805
1806                 cap = list_entry(p, struct ceph_cap, session_caps);
1807                 inode = igrab(&cap->ci->netfs.inode);
1808                 if (!inode) {
1809                         p = p->next;
1810                         continue;
1811                 }
1812                 session->s_cap_iterator = cap;
1813                 mds = cap->mds;
1814                 spin_unlock(&session->s_cap_lock);
1815
1816                 if (last_inode) {
1817                         iput(last_inode);
1818                         last_inode = NULL;
1819                 }
1820                 if (old_cap) {
1821                         ceph_put_cap(session->s_mdsc, old_cap);
1822                         old_cap = NULL;
1823                 }
1824
1825                 ret = cb(inode, mds, arg);
1826                 last_inode = inode;
1827
1828                 spin_lock(&session->s_cap_lock);
1829                 p = p->next;
1830                 if (!cap->ci) {
1831                         dout("iterate_session_caps finishing cap %p removal\n",
1832                              cap);
1833                         BUG_ON(cap->session != session);
1834                         cap->session = NULL;
1835                         list_del_init(&cap->session_caps);
1836                         session->s_nr_caps--;
1837                         atomic64_dec(&session->s_mdsc->metric.total_caps);
1838                         if (cap->queue_release)
1839                                 __ceph_queue_cap_release(session, cap);
1840                         else
1841                                 old_cap = cap;  /* put_cap it w/o locks held */
1842                 }
1843                 if (ret < 0)
1844                         goto out;
1845         }
1846         ret = 0;
1847 out:
1848         session->s_cap_iterator = NULL;
1849         spin_unlock(&session->s_cap_lock);
1850
1851         iput(last_inode);
1852         if (old_cap)
1853                 ceph_put_cap(session->s_mdsc, old_cap);
1854
1855         return ret;
1856 }
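
/*
 * Rough shape of the race handling above: while s_cap_iterator points
 * at a cap, a concurrent __ceph_remove_cap() only clears cap->ci and
 * leaves the cap linked on the session list; the loop then finishes
 * the removal itself once it retakes s_cap_lock.  Inode and cap
 * references taken for the previous iteration are dropped outside the
 * lock.
 */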
1857
1858 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
1859 {
1860         struct ceph_inode_info *ci = ceph_inode(inode);
1861         bool invalidate = false;
1862         struct ceph_cap *cap;
1863         int iputs = 0;
1864
1865         spin_lock(&ci->i_ceph_lock);
1866         cap = __get_cap_for_mds(ci, mds);
1867         if (cap) {
1868                 dout(" removing cap %p, ci is %p, inode is %p\n",
1869                      cap, ci, &ci->netfs.inode);
1870
1871                 iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
1872         }
1873         spin_unlock(&ci->i_ceph_lock);
1874
1875         if (cap)
1876                 wake_up_all(&ci->i_cap_wq);
1877         if (invalidate)
1878                 ceph_queue_invalidate(inode);
1879         while (iputs--)
1880                 iput(inode);
1881         return 0;
1882 }
1883
1884 /*
1885  * caller must hold session s_mutex
1886  */
1887 static void remove_session_caps(struct ceph_mds_session *session)
1888 {
1889         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1890         struct super_block *sb = fsc->sb;
1891         LIST_HEAD(dispose);
1892
1893         dout("remove_session_caps on %p\n", session);
1894         ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1895
1896         wake_up_all(&fsc->mdsc->cap_flushing_wq);
1897
1898         spin_lock(&session->s_cap_lock);
1899         if (session->s_nr_caps > 0) {
1900                 struct inode *inode;
1901                 struct ceph_cap *cap, *prev = NULL;
1902                 struct ceph_vino vino;
1903                 /*
1904                  * iterate_session_caps() skips inodes that are being
1905                  * deleted, so we need to wait until deletions are complete.
1906                  * __wait_on_freeing_inode() is designed for the job,
1907                  * but it is not exported, so use the inode lookup
1908                  * function to get at it.
1909                  */
1910                 while (!list_empty(&session->s_caps)) {
1911                         cap = list_entry(session->s_caps.next,
1912                                          struct ceph_cap, session_caps);
1913                         if (cap == prev)
1914                                 break;
1915                         prev = cap;
1916                         vino = cap->ci->i_vino;
1917                         spin_unlock(&session->s_cap_lock);
1918
1919                         inode = ceph_find_inode(sb, vino);
1920                         iput(inode);
1921
1922                         spin_lock(&session->s_cap_lock);
1923                 }
1924         }
1925
1926         /* detach queued cap releases; dispose of them after unlocking */
1927         detach_cap_releases(session, &dispose);
1928
1929         BUG_ON(session->s_nr_caps > 0);
1930         BUG_ON(!list_empty(&session->s_cap_flushing));
1931         spin_unlock(&session->s_cap_lock);
1932         dispose_cap_releases(session->s_mdsc, &dispose);
1933 }
1934
1935 enum {
1936         RECONNECT,
1937         RENEWCAPS,
1938         FORCE_RO,
1939 };
1940
1941 /*
1942  * wake up any threads waiting on this session's caps.  if the cap is
1943  * old (didn't get renewed on the client reconnect), invalidate it now.
1944  *
1945  * caller must hold s_mutex.
1946  */
1947 static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
1948 {
1949         struct ceph_inode_info *ci = ceph_inode(inode);
1950         unsigned long ev = (unsigned long)arg;
1951
1952         if (ev == RECONNECT) {
1953                 spin_lock(&ci->i_ceph_lock);
1954                 ci->i_wanted_max_size = 0;
1955                 ci->i_requested_max_size = 0;
1956                 spin_unlock(&ci->i_ceph_lock);
1957         } else if (ev == RENEWCAPS) {
1958                 struct ceph_cap *cap;
1959
1960                 spin_lock(&ci->i_ceph_lock);
1961                 cap = __get_cap_for_mds(ci, mds);
1962                 /* mds did not re-issue stale cap */
1963                 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
1964                         cap->issued = cap->implemented = CEPH_CAP_PIN;
1965                 spin_unlock(&ci->i_ceph_lock);
1966         } else if (ev == FORCE_RO) {
1967         }
1968         wake_up_all(&ci->i_cap_wq);
1969         return 0;
1970 }
1971
1972 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1973 {
1974         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1975         ceph_iterate_session_caps(session, wake_up_session_cb,
1976                                   (void *)(unsigned long)ev);
1977 }
1978
1979 /*
1980  * Send periodic message to MDS renewing all currently held caps.  The
1981  * ack will reset the expiration for all caps from this session.
1982  *
1983  * caller holds s_mutex
1984  */
1985 static int send_renew_caps(struct ceph_mds_client *mdsc,
1986                            struct ceph_mds_session *session)
1987 {
1988         struct ceph_msg *msg;
1989         int state;
1990
1991         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1992             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1993                 pr_info("mds%d caps stale\n", session->s_mds);
1994         session->s_renew_requested = jiffies;
1995
1996         /* do not try to renew caps until a recovering mds has reconnected
1997          * with its clients. */
1998         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1999         if (state < CEPH_MDS_STATE_RECONNECT) {
2000                 dout("send_renew_caps ignoring mds%d (%s)\n",
2001                      session->s_mds, ceph_mds_state_name(state));
2002                 return 0;
2003         }
2004
2005         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
2006                 ceph_mds_state_name(state));
2007         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
2008                                       ++session->s_renew_seq);
2009         if (!msg)
2010                 return -ENOMEM;
2011         ceph_con_send(&session->s_con, msg);
2012         return 0;
2013 }
2014
2015 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
2016                              struct ceph_mds_session *session, u64 seq)
2017 {
2018         struct ceph_msg *msg;
2019
2020         dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
2021              session->s_mds, ceph_session_state_name(session->s_state), seq);
2022         msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
2023         if (!msg)
2024                 return -ENOMEM;
2025         ceph_con_send(&session->s_con, msg);
2026         return 0;
2027 }
2028
2029
2030 /*
2031  * Note new cap ttl, and any transition from stale to fresh.
2032  *
2033  * Called under session->s_mutex
2034  */
2035 static void renewed_caps(struct ceph_mds_client *mdsc,
2036                          struct ceph_mds_session *session, int is_renew)
2037 {
2038         int was_stale;
2039         int wake = 0;
2040
2041         spin_lock(&session->s_cap_lock);
2042         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2043
2044         session->s_cap_ttl = session->s_renew_requested +
2045                 mdsc->mdsmap->m_session_timeout * HZ;
2046
2047         if (was_stale) {
2048                 if (time_before(jiffies, session->s_cap_ttl)) {
2049                         pr_info("mds%d caps renewed\n", session->s_mds);
2050                         wake = 1;
2051                 } else {
2052                         pr_info("mds%d caps still stale\n", session->s_mds);
2053                 }
2054         }
2055         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
2056              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
2057              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
2058         spin_unlock(&session->s_cap_lock);
2059
2060         if (wake)
2061                 wake_up_session_caps(session, RENEWCAPS);
2062 }
2063
2064 /*
2065  * send a session close request
2066  */
2067 static int request_close_session(struct ceph_mds_session *session)
2068 {
2069         struct ceph_msg *msg;
2070
2071         dout("request_close_session mds%d state %s seq %lld\n",
2072              session->s_mds, ceph_session_state_name(session->s_state),
2073              session->s_seq);
2074         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
2075                                       session->s_seq);
2076         if (!msg)
2077                 return -ENOMEM;
2078         ceph_con_send(&session->s_con, msg);
2079         return 1;
2080 }
2081
2082 /*
2083  * Called with s_mutex held.
2084  */
2085 static int __close_session(struct ceph_mds_client *mdsc,
2086                          struct ceph_mds_session *session)
2087 {
2088         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
2089                 return 0;
2090         session->s_state = CEPH_MDS_SESSION_CLOSING;
2091         return request_close_session(session);
2092 }
2093
2094 static bool drop_negative_children(struct dentry *dentry)
2095 {
2096         struct dentry *child;
2097         bool all_negative = true;
2098
2099         if (!d_is_dir(dentry))
2100                 goto out;
2101
2102         spin_lock(&dentry->d_lock);
2103         list_for_each_entry(child, &dentry->d_subdirs, d_child) {
2104                 if (d_really_is_positive(child)) {
2105                         all_negative = false;
2106                         break;
2107                 }
2108         }
2109         spin_unlock(&dentry->d_lock);
2110
2111         if (all_negative)
2112                 shrink_dcache_parent(dentry);
2113 out:
2114         return all_negative;
2115 }
2116
2117 /*
2118  * Trim old(er) caps.
2119  *
2120  * Because we can't cache an inode without one or more caps, we do
2121  * this indirectly: if a cap is unused, we prune its aliases, at which
2122  * point the inode will hopefully get dropped too.
2123  *
2124  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
2125  * memory pressure from the MDS, though, so it needn't be perfect.
2126  */
2127 static int trim_caps_cb(struct inode *inode, int mds, void *arg)
2128 {
2129         int *remaining = arg;
2130         struct ceph_inode_info *ci = ceph_inode(inode);
2131         int used, wanted, oissued, mine;
2132         struct ceph_cap *cap;
2133
2134         if (*remaining <= 0)
2135                 return -1;
2136
2137         spin_lock(&ci->i_ceph_lock);
2138         cap = __get_cap_for_mds(ci, mds);
2139         if (!cap) {
2140                 spin_unlock(&ci->i_ceph_lock);
2141                 return 0;
2142         }
2143         mine = cap->issued | cap->implemented;
2144         used = __ceph_caps_used(ci);
2145         wanted = __ceph_caps_file_wanted(ci);
2146         oissued = __ceph_caps_issued_other(ci, cap);
2147
2148         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2149              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2150              ceph_cap_string(used), ceph_cap_string(wanted));
2151         if (cap == ci->i_auth_cap) {
2152                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
2153                     !list_empty(&ci->i_cap_snaps))
2154                         goto out;
2155                 if ((used | wanted) & CEPH_CAP_ANY_WR)
2156                         goto out;
2157                 /* Note: it's possible that i_filelock_ref becomes non-zero
2158                  * after dropping auth caps. It doesn't hurt because reply
2159                  * of lock mds request will re-add auth caps. */
2160                 if (atomic_read(&ci->i_filelock_ref) > 0)
2161                         goto out;
2162         }
2163         /* The inode has cached pages, but it's no longer used;
2164          * we can safely drop it */
2165         if (S_ISREG(inode->i_mode) &&
2166             wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2167             !(oissued & CEPH_CAP_FILE_CACHE)) {
2168                 used = 0;
2169                 oissued = 0;
2170         }
2171         if ((used | wanted) & ~oissued & mine)
2172                 goto out;   /* we need these caps */
2173
2174         if (oissued) {
2175                 /* we aren't the only cap.. just remove us */
2176                 ceph_remove_cap(cap, true);
2177                 (*remaining)--;
2178         } else {
2179                 struct dentry *dentry;
2180                 /* try dropping referring dentries */
2181                 spin_unlock(&ci->i_ceph_lock);
2182                 dentry = d_find_any_alias(inode);
2183                 if (dentry && drop_negative_children(dentry)) {
2184                         int count;
2185                         dput(dentry);
2186                         d_prune_aliases(inode);
2187                         count = atomic_read(&inode->i_count);
2188                         if (count == 1)
2189                                 (*remaining)--;
2190                         dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2191                              inode, cap, count);
2192                 } else {
2193                         dput(dentry);
2194                 }
2195                 return 0;
2196         }
2197
2198 out:
2199         spin_unlock(&ci->i_ceph_lock);
2200         return 0;
2201 }
2202
2203 /*
2204  * Trim session cap count down to some max number.
2205  */
2206 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2207                    struct ceph_mds_session *session,
2208                    int max_caps)
2209 {
2210         int trim_caps = session->s_nr_caps - max_caps;
2211
2212         dout("trim_caps mds%d start: %d / %d, trim %d\n",
2213              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2214         if (trim_caps > 0) {
2215                 int remaining = trim_caps;
2216
2217                 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2218                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2219                      session->s_mds, session->s_nr_caps, max_caps,
2220                         trim_caps - remaining);
2221         }
2222
2223         ceph_flush_cap_releases(mdsc, session);
2224         return 0;
2225 }
2226
2227 static int check_caps_flush(struct ceph_mds_client *mdsc,
2228                             u64 want_flush_tid)
2229 {
2230         int ret = 1;
2231
2232         spin_lock(&mdsc->cap_dirty_lock);
2233         if (!list_empty(&mdsc->cap_flush_list)) {
2234                 struct ceph_cap_flush *cf =
2235                         list_first_entry(&mdsc->cap_flush_list,
2236                                          struct ceph_cap_flush, g_list);
2237                 if (cf->tid <= want_flush_tid) {
2238                         dout("check_caps_flush still flushing tid "
2239                              "%llu <= %llu\n", cf->tid, want_flush_tid);
2240                         ret = 0;
2241                 }
2242         }
2243         spin_unlock(&mdsc->cap_dirty_lock);
2244         return ret;
2245 }
2246
2247 /*
2248  * wait until all dirty inode data has been flushed to disk.
2249  *
2250  * returns once we've flushed through want_flush_tid
2251  */
2252 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2253                             u64 want_flush_tid)
2254 {
2255         dout("wait_caps_flush want %llu\n", want_flush_tid);
2256
2257         wait_event(mdsc->cap_flushing_wq,
2258                    check_caps_flush(mdsc, want_flush_tid));
2259
2260         dout("wait_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2261 }
2262
2263 /*
2264  * called under s_mutex
2265  */
2266 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2267                                    struct ceph_mds_session *session)
2268 {
2269         struct ceph_msg *msg = NULL;
2270         struct ceph_mds_cap_release *head;
2271         struct ceph_mds_cap_item *item;
2272         struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2273         struct ceph_cap *cap;
2274         LIST_HEAD(tmp_list);
2275         int num_cap_releases;
2276         __le32  barrier, *cap_barrier;
2277
2278         down_read(&osdc->lock);
2279         barrier = cpu_to_le32(osdc->epoch_barrier);
2280         up_read(&osdc->lock);
2281
2282         spin_lock(&session->s_cap_lock);
2283 again:
2284         list_splice_init(&session->s_cap_releases, &tmp_list);
2285         num_cap_releases = session->s_num_cap_releases;
2286         session->s_num_cap_releases = 0;
2287         spin_unlock(&session->s_cap_lock);
2288
2289         while (!list_empty(&tmp_list)) {
2290                 if (!msg) {
2291                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2292                                         PAGE_SIZE, GFP_NOFS, false);
2293                         if (!msg)
2294                                 goto out_err;
2295                         head = msg->front.iov_base;
2296                         head->num = cpu_to_le32(0);
2297                         msg->front.iov_len = sizeof(*head);
2298
2299                         msg->hdr.version = cpu_to_le16(2);
2300                         msg->hdr.compat_version = cpu_to_le16(1);
2301                 }
2302
2303                 cap = list_first_entry(&tmp_list, struct ceph_cap,
2304                                         session_caps);
2305                 list_del(&cap->session_caps);
2306                 num_cap_releases--;
2307
2308                 head = msg->front.iov_base;
2309                 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2310                                    &head->num);
2311                 item = msg->front.iov_base + msg->front.iov_len;
2312                 item->ino = cpu_to_le64(cap->cap_ino);
2313                 item->cap_id = cpu_to_le64(cap->cap_id);
2314                 item->migrate_seq = cpu_to_le32(cap->mseq);
2315                 item->seq = cpu_to_le32(cap->issue_seq);
2316                 msg->front.iov_len += sizeof(*item);
2317
2318                 ceph_put_cap(mdsc, cap);
2319
2320                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2321                         /* append the cap_barrier field */
2322                         cap_barrier = msg->front.iov_base + msg->front.iov_len;
2323                         *cap_barrier = barrier;
2324                         msg->front.iov_len += sizeof(*cap_barrier);
2325
2326                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2327                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2328                         ceph_con_send(&session->s_con, msg);
2329                         msg = NULL;
2330                 }
2331         }
2332
2333         BUG_ON(num_cap_releases != 0);
2334
2335         spin_lock(&session->s_cap_lock);
2336         if (!list_empty(&session->s_cap_releases))
2337                 goto again;
2338         spin_unlock(&session->s_cap_lock);
2339
2340         if (msg) {
2341                 /* append the cap_barrier field */
2342                 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2343                 *cap_barrier = barrier;
2344                 msg->front.iov_len += sizeof(*cap_barrier);
2345
2346                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2347                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2348                 ceph_con_send(&session->s_con, msg);
2349         }
2350         return;
2351 out_err:
2352         pr_err("send_cap_releases mds%d, failed to allocate message\n",
2353                 session->s_mds);
2354         spin_lock(&session->s_cap_lock);
2355         list_splice(&tmp_list, &session->s_cap_releases);
2356         session->s_num_cap_releases += num_cap_releases;
2357         spin_unlock(&session->s_cap_lock);
2358 }
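
/*
 * Shape of each CEPH_MSG_CLIENT_CAPRELEASE message built above
 * (v2, compat v1):
 *
 *      struct ceph_mds_cap_release head        (head->num items follow)
 *      struct ceph_mds_cap_item item[head->num]
 *      __le32 cap_barrier                      (OSD epoch barrier)
 *
 * A message is sent as soon as it accumulates CEPH_CAPS_PER_RELEASE
 * items; any remainder goes out in a final message once the list is
 * drained.
 */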
2359
2360 static void ceph_cap_release_work(struct work_struct *work)
2361 {
2362         struct ceph_mds_session *session =
2363                 container_of(work, struct ceph_mds_session, s_cap_release_work);
2364
2365         mutex_lock(&session->s_mutex);
2366         if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2367             session->s_state == CEPH_MDS_SESSION_HUNG)
2368                 ceph_send_cap_releases(session->s_mdsc, session);
2369         mutex_unlock(&session->s_mutex);
2370         ceph_put_mds_session(session);
2371 }
2372
2373 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2374                              struct ceph_mds_session *session)
2375 {
2376         if (mdsc->stopping)
2377                 return;
2378
2379         ceph_get_mds_session(session);
2380         if (queue_work(mdsc->fsc->cap_wq,
2381                        &session->s_cap_release_work)) {
2382                 dout("cap release work queued\n");
2383         } else {
2384                 ceph_put_mds_session(session);
2385                 dout("failed to queue cap release work\n");
2386         }
2387 }
2388
2389 /*
2390  * caller holds session->s_cap_lock
2391  */
2392 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2393                               struct ceph_cap *cap)
2394 {
2395         list_add_tail(&cap->session_caps, &session->s_cap_releases);
2396         session->s_num_cap_releases++;
2397
2398         if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2399                 ceph_flush_cap_releases(session->s_mdsc, session);
2400 }
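
/*
 * Releases queued here are batched: each time another
 * CEPH_CAPS_PER_RELEASE entries have accumulated, the cap release
 * work is kicked so the messages are built and sent from the
 * workqueue rather than in the caller's context.
 */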
2401
2402 static void ceph_cap_reclaim_work(struct work_struct *work)
2403 {
2404         struct ceph_mds_client *mdsc =
2405                 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2406         int ret = ceph_trim_dentries(mdsc);
2407         if (ret == -EAGAIN)
2408                 ceph_queue_cap_reclaim_work(mdsc);
2409 }
2410
2411 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2412 {
2413         if (mdsc->stopping)
2414                 return;
2415
2416         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2417                 dout("caps reclaim work queued\n");
2418         } else {
2419                 dout("failed to queue caps reclaim work\n");
2420         }
2421 }
2422
2423 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2424 {
2425         int val;
2426         if (!nr)
2427                 return;
2428         val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2429         if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2430                 atomic_set(&mdsc->cap_reclaim_pending, 0);
2431                 ceph_queue_cap_reclaim_work(mdsc);
2432         }
2433 }
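
/*
 * The modulo test above fires roughly once per CEPH_CAPS_PER_RELEASE
 * caps.  Hypothetical example, assuming CEPH_CAPS_PER_RELEASE were
 * 128: with 120 already pending, nr == 16 gives val == 136, and
 * 136 % 128 == 8 < 16, so crossing the 128 boundary queues the
 * reclaim work and resets the pending count.
 */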
2434
2435 /*
2436  * requests
2437  */
2438
2439 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2440                                     struct inode *dir)
2441 {
2442         struct ceph_inode_info *ci = ceph_inode(dir);
2443         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2444         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2445         size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2446         unsigned int num_entries;
2447         int order;
2448
2449         spin_lock(&ci->i_ceph_lock);
2450         num_entries = ci->i_files + ci->i_subdirs;
2451         spin_unlock(&ci->i_ceph_lock);
2452         num_entries = max(num_entries, 1U);
2453         num_entries = min(num_entries, opt->max_readdir);
2454
2455         order = get_order(size * num_entries);
2456         while (order >= 0) {
2457                 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2458                                                              __GFP_NOWARN |
2459                                                              __GFP_ZERO,
2460                                                              order);
2461                 if (rinfo->dir_entries)
2462                         break;
2463                 order--;
2464         }
2465         if (!rinfo->dir_entries)
2466                 return -ENOMEM;
2467
2468         num_entries = (PAGE_SIZE << order) / size;
2469         num_entries = min(num_entries, opt->max_readdir);
2470
2471         rinfo->dir_buf_size = PAGE_SIZE << order;
2472         req->r_num_caps = num_entries + 1;
2473         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2474         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2475         return 0;
2476 }
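
/*
 * Buffer sizing sketch: get_order() yields the smallest page order
 * covering size * num_entries; on allocation failure we retry with
 * successively smaller orders, then recompute how many entries
 * actually fit in PAGE_SIZE << order.  r_num_caps reserves one cap
 * per entry plus one, presumably for the directory itself.
 */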
2477
2478 /*
2479  * Create an mds request.
2480  */
2481 struct ceph_mds_request *
2482 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2483 {
2484         struct ceph_mds_request *req;
2485
2486         req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2487         if (!req)
2488                 return ERR_PTR(-ENOMEM);
2489
2490         mutex_init(&req->r_fill_mutex);
2491         req->r_mdsc = mdsc;
2492         req->r_started = jiffies;
2493         req->r_start_latency = ktime_get();
2494         req->r_resend_mds = -1;
2495         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2496         INIT_LIST_HEAD(&req->r_unsafe_target_item);
2497         req->r_fmode = -1;
2498         req->r_feature_needed = -1;
2499         kref_init(&req->r_kref);
2500         RB_CLEAR_NODE(&req->r_node);
2501         INIT_LIST_HEAD(&req->r_wait);
2502         init_completion(&req->r_completion);
2503         init_completion(&req->r_safe_completion);
2504         INIT_LIST_HEAD(&req->r_unsafe_item);
2505
2506         ktime_get_coarse_real_ts64(&req->r_stamp);
2507
2508         req->r_op = op;
2509         req->r_direct_mode = mode;
2510         return req;
2511 }
2512
2513 /*
2514  * return the oldest (lowest tid) request in the request tree, or NULL if none.
2515  *
2516  * called under mdsc->mutex.
2517  */
2518 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2519 {
2520         if (RB_EMPTY_ROOT(&mdsc->request_tree))
2521                 return NULL;
2522         return rb_entry(rb_first(&mdsc->request_tree),
2523                         struct ceph_mds_request, r_node);
2524 }
2525
2526 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2527 {
2528         return mdsc->oldest_tid;
2529 }
2530
2531 #if IS_ENABLED(CONFIG_FS_ENCRYPTION)
2532 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2533 {
2534         struct inode *dir = req->r_parent;
2535         struct dentry *dentry = req->r_dentry;
2536         u8 *cryptbuf = NULL;
2537         u32 len = 0;
2538         int ret = 0;
2539
2540         /* only encode if we have parent and dentry */
2541         if (!dir || !dentry)
2542                 goto success;
2543
2544         /* No-op unless this is encrypted */
2545         if (!IS_ENCRYPTED(dir))
2546                 goto success;
2547
2548         ret = ceph_fscrypt_prepare_readdir(dir);
2549         if (ret < 0)
2550                 return ERR_PTR(ret);
2551
2552         /* No key? Just ignore it. */
2553         if (!fscrypt_has_encryption_key(dir))
2554                 goto success;
2555
2556         if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX,
2557                                           &len)) {
2558                 WARN_ON_ONCE(1);
2559                 return ERR_PTR(-ENAMETOOLONG);
2560         }
2561
2562         /* No need to append altname if name is short enough */
2563         if (len <= CEPH_NOHASH_NAME_MAX) {
2564                 len = 0;
2565                 goto success;
2566         }
2567
2568         cryptbuf = kmalloc(len, GFP_KERNEL);
2569         if (!cryptbuf)
2570                 return ERR_PTR(-ENOMEM);
2571
2572         ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len);
2573         if (ret) {
2574                 kfree(cryptbuf);
2575                 return ERR_PTR(ret);
2576         }
2577 success:
2578         *plen = len;
2579         return cryptbuf;
2580 }
2581 #else
2582 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2583 {
2584         *plen = 0;
2585         return NULL;
2586 }
2587 #endif
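
/*
 * Summary of the policy above: an alternate (encrypted) name is only
 * attached when the dentry sits in an encrypted directory with a
 * usable key and the ciphertext would exceed CEPH_NOHASH_NAME_MAX;
 * shorter names are sent in the path as usual, with *plen left 0.
 */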
2588
2589 /**
2590  * ceph_mdsc_build_path - build a path string to a given dentry
2591  * @dentry: dentry to which path should be built
2592  * @plen: returned length of string
2593  * @pbase: returned base inode number
2594  * @for_wire: is this path going to be sent to the MDS?
2595  *
2596  * Build a string that represents the path to the dentry. This is mostly called
2597  * for two different purposes:
2598  *
2599  * 1) we need to build a path string to send to the MDS (for_wire == true)
2600  * 2) we need a path string for local presentation (e.g. debugfs)
2601  *    (for_wire == false)
2602  *
2603  * The path is built in reverse, starting with the dentry. Walk back up toward
2604  * the root, building the path until the first non-snapped inode is reached
2605  * (for_wire) or the root inode is reached (!for_wire).
2606  *
2607  * Encode hidden .snap dirs as a double /, i.e.
2608  *   foo/.snap/bar -> foo//bar
2609  */
2610 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2611                            int for_wire)
2612 {
2613         struct dentry *cur;
2614         struct inode *inode;
2615         char *path;
2616         int pos;
2617         unsigned seq;
2618         u64 base;
2619
2620         if (!dentry)
2621                 return ERR_PTR(-EINVAL);
2622
2623         path = __getname();
2624         if (!path)
2625                 return ERR_PTR(-ENOMEM);
2626 retry:
2627         pos = PATH_MAX - 1;
2628         path[pos] = '\0';
2629
2630         seq = read_seqbegin(&rename_lock);
2631         cur = dget(dentry);
2632         for (;;) {
2633                 struct dentry *parent;
2634
2635                 spin_lock(&cur->d_lock);
2636                 inode = d_inode(cur);
2637                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2638                         dout("build_path path+%d: %p SNAPDIR\n",
2639                              pos, cur);
2640                         spin_unlock(&cur->d_lock);
2641                         parent = dget_parent(cur);
2642                 } else if (for_wire && inode && dentry != cur &&
2643                            ceph_snap(inode) == CEPH_NOSNAP) {
2644                         spin_unlock(&cur->d_lock);
2645                         pos++; /* get rid of any prepended '/' */
2646                         break;
2647                 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
2648                         pos -= cur->d_name.len;
2649                         if (pos < 0) {
2650                                 spin_unlock(&cur->d_lock);
2651                                 break;
2652                         }
2653                         memcpy(path + pos, cur->d_name.name, cur->d_name.len);
2654                         spin_unlock(&cur->d_lock);
2655                         parent = dget_parent(cur);
2656                 } else {
2657                         int len, ret;
2658                         char buf[NAME_MAX];
2659
2660                         /*
2661                          * Proactively copy name into buf, in case we need to
2662                          * present it as-is.
2663                          */
2664                         memcpy(buf, cur->d_name.name, cur->d_name.len);
2665                         len = cur->d_name.len;
2666                         spin_unlock(&cur->d_lock);
2667                         parent = dget_parent(cur);
2668
2669                         ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
2670                         if (ret < 0) {
2671                                 dput(parent);
2672                                 dput(cur);
2673                                 return ERR_PTR(ret);
2674                         }
2675
2676                         if (fscrypt_has_encryption_key(d_inode(parent))) {
2677                                 len = ceph_encode_encrypted_fname(d_inode(parent),
2678                                                                   cur, buf);
2679                                 if (len < 0) {
2680                                         dput(parent);
2681                                         dput(cur);
2682                                         return ERR_PTR(len);
2683                                 }
2684                         }
2685                         pos -= len;
2686                         if (pos < 0) {
2687                                 dput(parent);
2688                                 break;
2689                         }
2690                         memcpy(path + pos, buf, len);
2691                 }
2692                 dput(cur);
2693                 cur = parent;
2694
2695                 /* Are we at the root? */
2696                 if (IS_ROOT(cur))
2697                         break;
2698
2699                 /* Are we out of buffer? */
2700                 if (--pos < 0)
2701                         break;
2702
2703                 path[pos] = '/';
2704         }
2705         inode = d_inode(cur);
2706         base = inode ? ceph_ino(inode) : 0;
2707         dput(cur);
2708
2709         if (read_seqretry(&rename_lock, seq))
2710                 goto retry;
2711
2712         if (pos < 0) {
2713                 /*
2714                  * A rename didn't occur, but somehow we didn't end up where
2715                  * we thought we would. Throw a warning and try again.
2716                  */
2717                 pr_warn("build_path did not end path lookup where expected (pos = %d)\n",
2718                         pos);
2719                 goto retry;
2720         }
2721
2722         *pbase = base;
2723         *plen = PATH_MAX - 1 - pos;
2724         dout("build_path on %p %d built %llx '%.*s'\n",
2725              dentry, d_count(dentry), base, *plen, path + pos);
2726         return path + pos;
2727 }
2728
2729 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2730                              const char **ppath, int *ppathlen, u64 *pino,
2731                              bool *pfreepath, bool parent_locked)
2732 {
2733         char *path;
2734
2735         rcu_read_lock();
2736         if (!dir)
2737                 dir = d_inode_rcu(dentry->d_parent);
2738         if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
2739             !IS_ENCRYPTED(dir)) {
2740                 *pino = ceph_ino(dir);
2741                 rcu_read_unlock();
2742                 *ppath = dentry->d_name.name;
2743                 *ppathlen = dentry->d_name.len;
2744                 return 0;
2745         }
2746         rcu_read_unlock();
2747         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2748         if (IS_ERR(path))
2749                 return PTR_ERR(path);
2750         *ppath = path;
2751         *pfreepath = true;
2752         return 0;
2753 }
2754
2755 static int build_inode_path(struct inode *inode,
2756                             const char **ppath, int *ppathlen, u64 *pino,
2757                             bool *pfreepath)
2758 {
2759         struct dentry *dentry;
2760         char *path;
2761
2762         if (ceph_snap(inode) == CEPH_NOSNAP) {
2763                 *pino = ceph_ino(inode);
2764                 *ppathlen = 0;
2765                 return 0;
2766         }
2767         dentry = d_find_alias(inode);
2768         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2769         dput(dentry);
2770         if (IS_ERR(path))
2771                 return PTR_ERR(path);
2772         *ppath = path;
2773         *pfreepath = true;
2774         return 0;
2775 }
2776
2777 /*
2778  * request arguments may be specified via an inode *, a dentry *, or
2779  * an explicit ino+path.
2780  */
2781 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2782                                   struct inode *rdiri, const char *rpath,
2783                                   u64 rino, const char **ppath, int *pathlen,
2784                                   u64 *ino, bool *freepath, bool parent_locked)
2785 {
2786         int r = 0;
2787
2788         if (rinode) {
2789                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2790                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2791                      ceph_snap(rinode));
2792         } else if (rdentry) {
2793                 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2794                                         freepath, parent_locked);
2795                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2796                      *ppath);
2797         } else if (rpath || rino) {
2798                 *ino = rino;
2799                 *ppath = rpath;
2800                 *pathlen = rpath ? strlen(rpath) : 0;
2801                 dout(" path %.*s\n", *pathlen, rpath);
2802         }
2803
2804         return r;
2805 }
2806
2807 static void encode_mclientrequest_tail(void **p,
2808                                        const struct ceph_mds_request *req)
2809 {
2810         struct ceph_timespec ts;
2811         int i;
2812
2813         ceph_encode_timespec64(&ts, &req->r_stamp);
2814         ceph_encode_copy(p, &ts, sizeof(ts));
2815
2816         /* v4: gid_list */
2817         ceph_encode_32(p, req->r_cred->group_info->ngroups);
2818         for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2819                 ceph_encode_64(p, from_kgid(&init_user_ns,
2820                                             req->r_cred->group_info->gid[i]));
2821
2822         /* v5: altname */
2823         ceph_encode_32(p, req->r_altname_len);
2824         ceph_encode_copy(p, req->r_altname, req->r_altname_len);
2825
2826         /* v6: fscrypt_auth and fscrypt_file */
2827         if (req->r_fscrypt_auth) {
2828                 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2829
2830                 ceph_encode_32(p, authlen);
2831                 ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
2832         } else {
2833                 ceph_encode_32(p, 0);
2834         }
2835         if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
2836                 ceph_encode_32(p, sizeof(__le64));
2837                 ceph_encode_64(p, req->r_fscrypt_file);
2838         } else {
2839                 ceph_encode_32(p, 0);
2840         }
2841 }
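
/*
 * Resulting tail layout (cf. the version markers above):
 *
 *      struct ceph_timespec stamp
 *      u32 ngroups, u64 gid[ngroups]           (v4 gid_list)
 *      u32 len, u8 altname[len]                (v5 altname)
 *      u32 len, u8 fscrypt_auth[len]           (v6)
 *      u32 len, le64 fscrypt_file if len == 8  (v6)
 */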
2842
2843 static struct ceph_mds_request_head_legacy *
2844 find_legacy_request_head(void *p, u64 features)
2845 {
2846         bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2847         struct ceph_mds_request_head_old *ohead;
2848
2849         if (legacy)
2850                 return (struct ceph_mds_request_head_legacy *)p;
2851         ohead = (struct ceph_mds_request_head_old *)p;
2852         return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid;
2853 }
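
/*
 * In other words: a legacy head starts directly with the fields common
 * to all versions, while newer heads prepend a version member, so the
 * common fields begin at ohead->oldest_client_tid.  Callers can fill
 * the shared fields through the returned pointer either way.
 */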
2854
2855 /*
2856  * called under mdsc->mutex
2857  */
2858 static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2859                                                struct ceph_mds_request *req,
2860                                                bool drop_cap_releases)
2861 {
2862         int mds = session->s_mds;
2863         struct ceph_mds_client *mdsc = session->s_mdsc;
2864         struct ceph_msg *msg;
2865         struct ceph_mds_request_head_legacy *lhead;
2866         const char *path1 = NULL;
2867         const char *path2 = NULL;
2868         u64 ino1 = 0, ino2 = 0;
2869         int pathlen1 = 0, pathlen2 = 0;
2870         bool freepath1 = false, freepath2 = false;
2871         struct dentry *old_dentry = NULL;
2872         int len;
2873         u16 releases;
2874         void *p, *end;
2875         int ret;
2876         bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2877         bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
2878                                      &session->s_features);
2879
2880         ret = set_request_path_attr(req->r_inode, req->r_dentry,
2881                               req->r_parent, req->r_path1, req->r_ino1.ino,
2882                               &path1, &pathlen1, &ino1, &freepath1,
2883                               test_bit(CEPH_MDS_R_PARENT_LOCKED,
2884                                         &req->r_req_flags));
2885         if (ret < 0) {
2886                 msg = ERR_PTR(ret);
2887                 goto out;
2888         }
2889
2890         /* If r_old_dentry is set, then assume that its parent is locked */
2891         if (req->r_old_dentry &&
2892             !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
2893                 old_dentry = req->r_old_dentry;
2894         ret = set_request_path_attr(NULL, old_dentry,
2895                               req->r_old_dentry_dir,
2896                               req->r_path2, req->r_ino2.ino,
2897                               &path2, &pathlen2, &ino2, &freepath2, true);
2898         if (ret < 0) {
2899                 msg = ERR_PTR(ret);
2900                 goto out_free1;
2901         }
2902
2903         req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
2904         if (IS_ERR(req->r_altname)) {
2905                 msg = ERR_CAST(req->r_altname);
2906                 req->r_altname = NULL;
2907                 goto out_free2;
2908         }
2909
2910         /*
2911          * Old ceph versions that don't support the 32-bit retry/fwd
2912          * feature copy the raw memory directly when decoding requests,
2913          * while new versions decode the head based on its version
2914          * member, so we must make sure the encoding is compatible
2915          * with both.
2916          */
2917         if (legacy)
2918                 len = sizeof(struct ceph_mds_request_head_legacy);
2919         else if (old_version)
2920                 len = sizeof(struct ceph_mds_request_head_old);
2921         else
2922                 len = sizeof(struct ceph_mds_request_head);
2923
2924         /* filepaths */
2925         len += 2 * (1 + sizeof(u32) + sizeof(u64));
2926         len += pathlen1 + pathlen2;
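	/*
	 * Editor's note: ceph_encode_filepath() emits a one-byte encoding
	 * version (the "1" above), then the u64 ino and the u32 string
	 * length; the path bytes themselves are accounted for separately.
	 */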
2927
2928         /* cap releases */
2929         len += sizeof(struct ceph_mds_request_release) *
2930                 (!!req->r_inode_drop + !!req->r_dentry_drop +
2931                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2932
2933         if (req->r_dentry_drop)
2934                 len += pathlen1;
2935         if (req->r_old_dentry_drop)
2936                 len += pathlen2;
2937
2938         /* MClientRequest tail */
2939
2940         /* req->r_stamp */
2941         len += sizeof(struct ceph_timespec);
2942
2943         /* gid list */
2944         len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2945
2946         /* alternate name */
2947         len += sizeof(u32) + req->r_altname_len;
2948
2949         /* fscrypt_auth */
2950         len += sizeof(u32); // fscrypt_auth
2951         if (req->r_fscrypt_auth)
2952                 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2953
2954         /* fscrypt_file */
2955         len += sizeof(u32);
2956         if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
2957                 len += sizeof(__le64);
2958
2959         msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2960         if (!msg) {
2961                 msg = ERR_PTR(-ENOMEM);
2962                 goto out_free2;
2963         }
2964
2965         msg->hdr.tid = cpu_to_le64(req->r_tid);
2966
2967         lhead = find_legacy_request_head(msg->front.iov_base,
2968                                          session->s_con.peer_features);
2969
2970         /*
2971          * The ceph_mds_request_head_legacy didn't contain a version field, and
2972          * one was added when we moved the message version from 3->4.
2973          */
2974         if (legacy) {
2975                 msg->hdr.version = cpu_to_le16(3);
2976                 p = msg->front.iov_base + sizeof(*lhead);
2977         } else if (old_version) {
2978                 struct ceph_mds_request_head_old *ohead = msg->front.iov_base;
2979
2980                 msg->hdr.version = cpu_to_le16(4);
2981                 ohead->version = cpu_to_le16(1);
2982                 p = msg->front.iov_base + sizeof(*ohead);
2983         } else {
2984                 struct ceph_mds_request_head *nhead = msg->front.iov_base;
2985
2986                 msg->hdr.version = cpu_to_le16(6);
2987                 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2988                 p = msg->front.iov_base + sizeof(*nhead);
2989         }
2990
2991         end = msg->front.iov_base + msg->front.iov_len;
2992
2993         lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2994         lhead->op = cpu_to_le32(req->r_op);
2995         lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2996                                                   req->r_cred->fsuid));
2997         lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2998                                                   req->r_cred->fsgid));
2999         lhead->ino = cpu_to_le64(req->r_deleg_ino);
3000         lhead->args = req->r_args;
3001
3002         ceph_encode_filepath(&p, end, ino1, path1);
3003         ceph_encode_filepath(&p, end, ino2, path2);
3004
3005         /* make note of release offset, in case we need to replay */
3006         req->r_request_release_offset = p - msg->front.iov_base;
3007
3008         /* cap releases */
3009         releases = 0;
3010         if (req->r_inode_drop)
3011                 releases += ceph_encode_inode_release(&p,
3012                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
3013                       mds, req->r_inode_drop, req->r_inode_unless,
3014                       req->r_op == CEPH_MDS_OP_READDIR);
3015         if (req->r_dentry_drop) {
3016                 ret = ceph_encode_dentry_release(&p, req->r_dentry,
3017                                 req->r_parent, mds, req->r_dentry_drop,
3018                                 req->r_dentry_unless);
3019                 if (ret < 0)
3020                         goto out_err;
3021                 releases += ret;
3022         }
3023         if (req->r_old_dentry_drop) {
3024                 ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
3025                                 req->r_old_dentry_dir, mds,
3026                                 req->r_old_dentry_drop,
3027                                 req->r_old_dentry_unless);
3028                 if (ret < 0)
3029                         goto out_err;
3030                 releases += ret;
3031         }
3032         if (req->r_old_inode_drop)
3033                 releases += ceph_encode_inode_release(&p,
3034                       d_inode(req->r_old_dentry),
3035                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
3036
3037         if (drop_cap_releases) {
3038                 releases = 0;
3039                 p = msg->front.iov_base + req->r_request_release_offset;
3040         }
3041
3042         lhead->num_releases = cpu_to_le16(releases);
3043
3044         encode_mclientrequest_tail(&p, req);
3045
3046         if (WARN_ON_ONCE(p > end)) {
3047                 ceph_msg_put(msg);
3048                 msg = ERR_PTR(-ERANGE);
3049                 goto out_free2;
3050         }
3051
3052         msg->front.iov_len = p - msg->front.iov_base;
3053         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3054
3055         if (req->r_pagelist) {
3056                 struct ceph_pagelist *pagelist = req->r_pagelist;
3057                 ceph_msg_data_add_pagelist(msg, pagelist);
3058                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
3059         } else {
3060                 msg->hdr.data_len = 0;
3061         }
3062
3063         msg->hdr.data_off = cpu_to_le16(0);
3064
3065 out_free2:
3066         if (freepath2)
3067                 ceph_mdsc_free_path((char *)path2, pathlen2);
3068 out_free1:
3069         if (freepath1)
3070                 ceph_mdsc_free_path((char *)path1, pathlen1);
3071 out:
3072         return msg;
3073 out_err:
3074         ceph_msg_put(msg);
3075         msg = ERR_PTR(ret);
3076         goto out_free2;
3077 }
3078
3079 /*
3080  * called under mdsc->mutex on error, with no mutex held on
3081  * success.
3082  */
3083 static void complete_request(struct ceph_mds_client *mdsc,
3084                              struct ceph_mds_request *req)
3085 {
3086         req->r_end_latency = ktime_get();
3087
3088         if (req->r_callback)
3089                 req->r_callback(mdsc, req);
3090         complete_all(&req->r_completion);
3091 }
3092
3093 /*
3094  * called under mdsc->mutex
3095  */
3096 static int __prepare_send_request(struct ceph_mds_session *session,
3097                                   struct ceph_mds_request *req,
3098                                   bool drop_cap_releases)
3099 {
3100         int mds = session->s_mds;
3101         struct ceph_mds_client *mdsc = session->s_mdsc;
3102         struct ceph_mds_request_head_legacy *lhead;
3103         struct ceph_mds_request_head *nhead;
3104         struct ceph_msg *msg;
3105         int flags = 0, old_max_retry;
3106         bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
3107                                      &session->s_features);
3108
3109         /*
3110          * Avoid infinite retrying after overflow. The client keeps
3111          * increasing the retry count, and an old-version MDS has only
3112          * an 8-bit counter, so limit retries to at most 256 there.
3113          */
3114         if (req->r_attempts) {
3115                old_max_retry = sizeof_field(struct ceph_mds_request_head_old,
3116                                             num_retry);
3117                old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
3118                if ((old_version && req->r_attempts >= old_max_retry) ||
3119                    ((uint32_t)req->r_attempts >= U32_MAX)) {
3120                         pr_warn_ratelimited("%s request tid %llu seq overflow\n",
3121                                             __func__, req->r_tid);
3122                         return -EMULTIHOP;
3123                }
3124         }
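	/*
	 * Editor's note: num_retry in the old head is a single byte, so
	 * old_max_retry above evaluates to 1 << (1 * 8) = 256, matching
	 * the 256-attempt cap described in the comment.
	 */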
3125
3126         req->r_attempts++;
3127         if (req->r_inode) {
3128                 struct ceph_cap *cap =
3129                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
3130
3131                 if (cap)
3132                         req->r_sent_on_mseq = cap->mseq;
3133                 else
3134                         req->r_sent_on_mseq = -1;
3135         }
3136         dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
3137              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
3138
3139         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3140                 void *p;
3141
3142                 /*
3143                  * Replay.  Do not regenerate message (and rebuild
3144                  * paths, etc.); just use the original message.
3145                  * Rebuilding paths will break for renames because
3146                  * d_move mangles the src name.
3147                  */
3148                 msg = req->r_request;
3149                 lhead = find_legacy_request_head(msg->front.iov_base,
3150                                                  session->s_con.peer_features);
3151
3152                 flags = le32_to_cpu(lhead->flags);
3153                 flags |= CEPH_MDS_FLAG_REPLAY;
3154                 lhead->flags = cpu_to_le32(flags);
3155
3156                 if (req->r_target_inode)
3157                         lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
3158
3159                 lhead->num_retry = req->r_attempts - 1;
3160                 if (!old_version) {
3161                         nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3162                         nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3163                 }
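		/*
		 * Editor's note: the retry count is written both to the
		 * legacy 8-bit num_retry field and, for peers with the
		 * 32-bit retry/fwd feature, to the extended 32-bit
		 * ext_num_retry, so either kind of MDS sees a value.
		 */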
3164
3165                 /* remove cap/dentry releases from message */
3166                 lhead->num_releases = 0;
3167
3168                 p = msg->front.iov_base + req->r_request_release_offset;
3169                 encode_mclientrequest_tail(&p, req);
3170
3171                 msg->front.iov_len = p - msg->front.iov_base;
3172                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3173                 return 0;
3174         }
3175
3176         if (req->r_request) {
3177                 ceph_msg_put(req->r_request);
3178                 req->r_request = NULL;
3179         }
3180         msg = create_request_message(session, req, drop_cap_releases);
3181         if (IS_ERR(msg)) {
3182                 req->r_err = PTR_ERR(msg);
3183                 return PTR_ERR(msg);
3184         }
3185         req->r_request = msg;
3186
3187         lhead = find_legacy_request_head(msg->front.iov_base,
3188                                          session->s_con.peer_features);
3189         lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
3190         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3191                 flags |= CEPH_MDS_FLAG_REPLAY;
3192         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
3193                 flags |= CEPH_MDS_FLAG_ASYNC;
3194         if (req->r_parent)
3195                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
3196         lhead->flags = cpu_to_le32(flags);
3197         lhead->num_fwd = req->r_num_fwd;
3198         lhead->num_retry = req->r_attempts - 1;
3199         if (!old_version) {
3200                 nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3201                 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
3202                 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3203         }
3204
3205         dout(" r_parent = %p\n", req->r_parent);
3206         return 0;
3207 }
3208
3209 /*
3210  * called under mdsc->mutex
3211  */
3212 static int __send_request(struct ceph_mds_session *session,
3213                           struct ceph_mds_request *req,
3214                           bool drop_cap_releases)
3215 {
3216         int err;
3217
3218         err = __prepare_send_request(session, req, drop_cap_releases);
3219         if (!err) {
3220                 ceph_msg_get(req->r_request);
3221                 ceph_con_send(&session->s_con, req->r_request);
3222         }
3223
3224         return err;
3225 }
3226
3227 /*
3228  * send request, or put it on the appropriate wait list.
3229  */
3230 static void __do_request(struct ceph_mds_client *mdsc,
3231                         struct ceph_mds_request *req)
3232 {
3233         struct ceph_mds_session *session = NULL;
3234         int mds = -1;
3235         int err = 0;
3236         bool random;
3237
3238         if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3239                 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
3240                         __unregister_request(mdsc, req);
3241                 return;
3242         }
3243
3244         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
3245                 dout("do_request metadata corrupted\n");
3246                 err = -EIO;
3247                 goto finish;
3248         }
3249         if (req->r_timeout &&
3250             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
3251                 dout("do_request timed out\n");
3252                 err = -ETIMEDOUT;
3253                 goto finish;
3254         }
3255         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
3256                 dout("do_request forced umount\n");
3257                 err = -EIO;
3258                 goto finish;
3259         }
3260         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
3261                 if (mdsc->mdsmap_err) {
3262                         err = mdsc->mdsmap_err;
3263                         dout("do_request mdsmap err %d\n", err);
3264                         goto finish;
3265                 }
3266                 if (mdsc->mdsmap->m_epoch == 0) {
3267                         dout("do_request no mdsmap, waiting for map\n");
3268                         list_add(&req->r_wait, &mdsc->waiting_for_map);
3269                         return;
3270                 }
3271                 if (!(mdsc->fsc->mount_options->flags &
3272                       CEPH_MOUNT_OPT_MOUNTWAIT) &&
3273                     !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
3274                         err = -EHOSTUNREACH;
3275                         goto finish;
3276                 }
3277         }
3278
3279         put_request_session(req);
3280
3281         mds = __choose_mds(mdsc, req, &random);
3282         if (mds < 0 ||
3283             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
3284                 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3285                         err = -EJUKEBOX;
3286                         goto finish;
3287                 }
3288                 dout("do_request no mds or not active, waiting for map\n");
3289                 list_add(&req->r_wait, &mdsc->waiting_for_map);
3290                 return;
3291         }
3292
3293         /* get, open session */
3294         session = __ceph_lookup_mds_session(mdsc, mds);
3295         if (!session) {
3296                 session = register_session(mdsc, mds);
3297                 if (IS_ERR(session)) {
3298                         err = PTR_ERR(session);
3299                         goto finish;
3300                 }
3301         }
3302         req->r_session = ceph_get_mds_session(session);
3303
3304         dout("do_request mds%d session %p state %s\n", mds, session,
3305              ceph_session_state_name(session->s_state));
3306
3307         /*
3308          * Old MDS versions will crash when they see unknown ops
3309          */
3310         if (req->r_feature_needed > 0 &&
3311             !test_bit(req->r_feature_needed, &session->s_features)) {
3312                 err = -EOPNOTSUPP;
3313                 goto out_session;
3314         }
3315
3316         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
3317             session->s_state != CEPH_MDS_SESSION_HUNG) {
3318                 /*
3319                  * We cannot queue async requests since the caps and delegated
3320                  * inodes are bound to the session. Just return -EJUKEBOX and
3321                  * let the caller retry a sync request in that case.
3322                  */
3323                 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3324                         err = -EJUKEBOX;
3325                         goto out_session;
3326                 }
3327
3328                 /*
3329                  * If the session has been REJECTED, then return a hard error,
3330                  * unless it's a CLEANRECOVER mount, in which case we'll queue
3331                  * it to the mdsc queue.
3332                  */
3333                 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3334                         if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
3335                                 list_add(&req->r_wait, &mdsc->waiting_for_map);
3336                         else
3337                                 err = -EACCES;
3338                         goto out_session;
3339                 }
3340
3341                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
3342                     session->s_state == CEPH_MDS_SESSION_CLOSING) {
3343                         err = __open_session(mdsc, session);
3344                         if (err)
3345                                 goto out_session;
3346                         /* retry the same mds later */
3347                         if (random)
3348                                 req->r_resend_mds = mds;
3349                 }
3350                 list_add(&req->r_wait, &session->s_waiting);
3351                 goto out_session;
3352         }
3353
3354         /* send request */
3355         req->r_resend_mds = -1;   /* forget any previous mds hint */
3356
3357         if (req->r_request_started == 0)   /* note request start time */
3358                 req->r_request_started = jiffies;
3359
3360         /*
3361          * For async create we choose the auth MDS of the frag in the
3362          * parent directory to send the request, and usually this works
3363          * fine. But if the directory is migrated to another MDS before
3364          * the request can be handled, the request will be forwarded.
3365          *
3366          * And then the auth cap will be changed.
3367          */
3368         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3369                 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3370                 struct ceph_inode_info *ci;
3371                 struct ceph_cap *cap;
3372
3373                 /*
3374                  * The request may be handled very quickly, before the
3375                  * new inode has been linked to the dentry. When
3376                  * forwarding the request we need to wait for
3377                  * ceph_finish_async_create() to finish, which in theory
3378                  * should neither get stuck for long nor fail.
3379                  */
3380                 if (!d_inode(req->r_dentry)) {
3381                         err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3382                                           TASK_KILLABLE);
3383                         if (err) {
3384                                 mutex_lock(&req->r_fill_mutex);
3385                                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3386                                 mutex_unlock(&req->r_fill_mutex);
3387                                 goto out_session;
3388                         }
3389                 }
3390
3391                 ci = ceph_inode(d_inode(req->r_dentry));
3392
3393                 spin_lock(&ci->i_ceph_lock);
3394                 cap = ci->i_auth_cap;
3395                 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3396                         dout("do_request session changed for auth cap %d -> %d\n",
3397                              cap->session->s_mds, session->s_mds);
3398
3399                         /* Remove the auth cap from old session */
3400                         spin_lock(&cap->session->s_cap_lock);
3401                         cap->session->s_nr_caps--;
3402                         list_del_init(&cap->session_caps);
3403                         spin_unlock(&cap->session->s_cap_lock);
3404
3405                         /* Add the auth cap to the new session */
3406                         cap->mds = mds;
3407                         cap->session = session;
3408                         spin_lock(&session->s_cap_lock);
3409                         session->s_nr_caps++;
3410                         list_add_tail(&cap->session_caps, &session->s_caps);
3411                         spin_unlock(&session->s_cap_lock);
3412
3413                         change_auth_cap_ses(ci, session);
3414                 }
3415                 spin_unlock(&ci->i_ceph_lock);
3416         }
3417
3418         err = __send_request(session, req, false);
3419
3420 out_session:
3421         ceph_put_mds_session(session);
3422 finish:
3423         if (err) {
3424                 dout("__do_request early error %d\n", err);
3425                 req->r_err = err;
3426                 complete_request(mdsc, req);
3427                 __unregister_request(mdsc, req);
3428         }
3429         return;
3430 }
3431
3432 /*
3433  * called under mdsc->mutex
3434  */
3435 static void __wake_requests(struct ceph_mds_client *mdsc,
3436                             struct list_head *head)
3437 {
3438         struct ceph_mds_request *req;
3439         LIST_HEAD(tmp_list);
3440
3441         list_splice_init(head, &tmp_list);
3442
3443         while (!list_empty(&tmp_list)) {
3444                 req = list_entry(tmp_list.next,
3445                                  struct ceph_mds_request, r_wait);
3446                 list_del_init(&req->r_wait);
3447                 dout(" wake request %p tid %llu\n", req, req->r_tid);
3448                 __do_request(mdsc, req);
3449         }
3450 }
3451
3452 /*
3453  * Wake up threads with requests pending for @mds, so that they can
3454  * resubmit their requests to a possibly different mds.
3455  */
3456 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3457 {
3458         struct ceph_mds_request *req;
3459         struct rb_node *p = rb_first(&mdsc->request_tree);
3460
3461         dout("kick_requests mds%d\n", mds);
3462         while (p) {
3463                 req = rb_entry(p, struct ceph_mds_request, r_node);
3464                 p = rb_next(p);
3465                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3466                         continue;
3467                 if (req->r_attempts > 0)
3468                         continue; /* only new requests */
3469                 if (req->r_session &&
3470                     req->r_session->s_mds == mds) {
3471                         dout(" kicking tid %llu\n", req->r_tid);
3472                         list_del_init(&req->r_wait);
3473                         __do_request(mdsc, req);
3474                 }
3475         }
3476 }
3477
3478 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3479                               struct ceph_mds_request *req)
3480 {
3481         int err = 0;
3482
3483         /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3484         if (req->r_inode)
3485                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3486         if (req->r_parent) {
3487                 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3488                 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3489                             CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3490                 spin_lock(&ci->i_ceph_lock);
3491                 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3492                 __ceph_touch_fmode(ci, mdsc, fmode);
3493                 spin_unlock(&ci->i_ceph_lock);
3494         }
3495         if (req->r_old_dentry_dir)
3496                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3497                                   CEPH_CAP_PIN);
3498
3499         if (req->r_inode) {
3500                 err = ceph_wait_on_async_create(req->r_inode);
3501                 if (err) {
3502                         dout("%s: wait for async create returned: %d\n",
3503                              __func__, err);
3504                         return err;
3505                 }
3506         }
3507
3508         if (!err && req->r_old_inode) {
3509                 err = ceph_wait_on_async_create(req->r_old_inode);
3510                 if (err) {
3511                         dout("%s: wait for async create returned: %d\n",
3512                              __func__, err);
3513                         return err;
3514                 }
3515         }
3516
3517         dout("submit_request on %p for inode %p\n", req, dir);
3518         mutex_lock(&mdsc->mutex);
3519         __register_request(mdsc, req, dir);
3520         __do_request(mdsc, req);
3521         err = req->r_err;
3522         mutex_unlock(&mdsc->mutex);
3523         return err;
3524 }
3525
3526 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3527                            struct ceph_mds_request *req,
3528                            ceph_mds_request_wait_callback_t wait_func)
3529 {
3530         int err;
3531
3532         /* wait */
3533         dout("do_request waiting\n");
3534         if (wait_func) {
3535                 err = wait_func(mdsc, req);
3536         } else {
3537                 long timeleft = wait_for_completion_killable_timeout(
3538                                         &req->r_completion,
3539                                         ceph_timeout_jiffies(req->r_timeout));
3540                 if (timeleft > 0)
3541                         err = 0;
3542                 else if (!timeleft)
3543                         err = -ETIMEDOUT;  /* timed out */
3544                 else
3545                         err = timeleft;  /* killed */
3546         }
3547         dout("do_request waited, got %d\n", err);
3548         mutex_lock(&mdsc->mutex);
3549
3550         /* only abort if we didn't race with a real reply */
3551         if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3552                 err = le32_to_cpu(req->r_reply_info.head->result);
3553         } else if (err < 0) {
3554                 dout("aborted request %lld with %d\n", req->r_tid, err);
3555
3556                 /*
3557                  * ensure we aren't running concurrently with
3558                  * ceph_fill_trace or ceph_readdir_prepopulate, which
3559                  * rely on locks (dir mutex) held by our caller.
3560                  */
3561                 mutex_lock(&req->r_fill_mutex);
3562                 req->r_err = err;
3563                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3564                 mutex_unlock(&req->r_fill_mutex);
3565
3566                 if (req->r_parent &&
3567                     (req->r_op & CEPH_MDS_OP_WRITE))
3568                         ceph_invalidate_dir_request(req);
3569         } else {
3570                 err = req->r_err;
3571         }
3572
3573         mutex_unlock(&mdsc->mutex);
3574         return err;
3575 }
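/*
 * Editor's note: the three-way outcome above is deliberate.  If a real
 * reply raced in (CEPH_MDS_R_GOT_RESULT), its result wins even when the
 * wait itself was interrupted or timed out; only otherwise is the
 * request marked aborted, and that is done under r_fill_mutex so that
 * ceph_fill_trace() / ceph_readdir_prepopulate() cannot run against
 * locks held by our caller.
 */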
3576
3577 /*
3578  * Synchronously perform an mds request.  Take care of all of the
3579  * session setup, forwarding, and retry details.
3580  */
3581 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3582                          struct inode *dir,
3583                          struct ceph_mds_request *req)
3584 {
3585         int err;
3586
3587         dout("do_request on %p\n", req);
3588
3589         /* issue */
3590         err = ceph_mdsc_submit_request(mdsc, dir, req);
3591         if (!err)
3592                 err = ceph_mdsc_wait_request(mdsc, req, NULL);
3593         dout("do_request %p done, result %d\n", req, err);
3594         return err;
3595 }
3596
3597 /*
3598  * Invalidate dir's completeness, dentry lease state on an aborted MDS
3599  * namespace request.
3600  */
3601 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3602 {
3603         struct inode *dir = req->r_parent;
3604         struct inode *old_dir = req->r_old_dentry_dir;
3605
3606         dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3607
3608         ceph_dir_clear_complete(dir);
3609         if (old_dir)
3610                 ceph_dir_clear_complete(old_dir);
3611         if (req->r_dentry)
3612                 ceph_invalidate_dentry_lease(req->r_dentry);
3613         if (req->r_old_dentry)
3614                 ceph_invalidate_dentry_lease(req->r_old_dentry);
3615 }
3616
3617 /*
3618  * Handle mds reply.
3619  *
3620  * We take the session mutex and parse and process the reply immediately.
3621  * This preserves the logical ordering of replies, capabilities, etc., sent
3622  * by the MDS as they are applied to our local cache.
3623  */
3624 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3625 {
3626         struct ceph_mds_client *mdsc = session->s_mdsc;
3627         struct ceph_mds_request *req;
3628         struct ceph_mds_reply_head *head = msg->front.iov_base;
3629         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3630         struct ceph_snap_realm *realm;
3631         u64 tid;
3632         int err, result;
3633         int mds = session->s_mds;
3634         bool close_sessions = false;
3635
3636         if (msg->front.iov_len < sizeof(*head)) {
3637                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3638                 ceph_msg_dump(msg);
3639                 return;
3640         }
3641
3642         /* get request, session */
3643         tid = le64_to_cpu(msg->hdr.tid);
3644         mutex_lock(&mdsc->mutex);
3645         req = lookup_get_request(mdsc, tid);
3646         if (!req) {
3647                 dout("handle_reply on unknown tid %llu\n", tid);
3648                 mutex_unlock(&mdsc->mutex);
3649                 return;
3650         }
3651         dout("handle_reply %p\n", req);
3652
3653         /* correct session? */
3654         if (req->r_session != session) {
3655                 pr_err("mdsc_handle_reply got %llu on session mds%d"
3656                        " not mds%d\n", tid, session->s_mds,
3657                        req->r_session ? req->r_session->s_mds : -1);
3658                 mutex_unlock(&mdsc->mutex);
3659                 goto out;
3660         }
3661
3662         /* dup? */
3663         if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3664             (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3665                 pr_warn("got a dup %s reply on %llu from mds%d\n",
3666                            head->safe ? "safe" : "unsafe", tid, mds);
3667                 mutex_unlock(&mdsc->mutex);
3668                 goto out;
3669         }
3670         if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3671                 pr_warn("got unsafe after safe on %llu from mds%d\n",
3672                            tid, mds);
3673                 mutex_unlock(&mdsc->mutex);
3674                 goto out;
3675         }
3676
3677         result = le32_to_cpu(head->result);
3678
3679         if (head->safe) {
3680                 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3681                 __unregister_request(mdsc, req);
3682
3683                 /* last request during umount? */
3684                 if (mdsc->stopping && !__get_oldest_req(mdsc))
3685                         complete_all(&mdsc->safe_umount_waiters);
3686
3687                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3688                         /*
3689                          * We already handled the unsafe response, now do the
3690                          * cleanup.  No need to examine the response; the MDS
3691                          * doesn't include any result info in the safe
3692                          * response.  And even if it did, there is nothing
3693                          * useful we could do with a revised return value.
3694                          */
3695                         dout("got safe reply %llu, mds%d\n", tid, mds);
3696
3697                         mutex_unlock(&mdsc->mutex);
3698                         goto out;
3699                 }
3700         } else {
3701                 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3702                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3703         }
3704
3705         dout("handle_reply tid %lld result %d\n", tid, result);
3706         if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3707                 err = parse_reply_info(session, msg, req, (u64)-1);
3708         else
3709                 err = parse_reply_info(session, msg, req,
3710                                        session->s_con.peer_features);
3711         mutex_unlock(&mdsc->mutex);
3712
3713         /* Must find target inode outside of mutexes to avoid deadlocks */
3714         rinfo = &req->r_reply_info;
3715         if ((err >= 0) && rinfo->head->is_target) {
3716                 struct inode *in = xchg(&req->r_new_inode, NULL);
3717                 struct ceph_vino tvino = {
3718                         .ino  = le64_to_cpu(rinfo->targeti.in->ino),
3719                         .snap = le64_to_cpu(rinfo->targeti.in->snapid)
3720                 };
3721
3722                 /*
3723                  * If we ended up opening an existing inode, discard
3724                  * r_new_inode
3725                  */
3726                 if (req->r_op == CEPH_MDS_OP_CREATE &&
3727                     !req->r_reply_info.has_create_ino) {
3728                         /* This should never happen on an async create */
3729                         WARN_ON_ONCE(req->r_deleg_ino);
3730                         iput(in);
3731                         in = NULL;
3732                 }
3733
3734                 in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
3735                 if (IS_ERR(in)) {
3736                         err = PTR_ERR(in);
3737                         mutex_lock(&session->s_mutex);
3738                         goto out_err;
3739                 }
3740                 req->r_target_inode = in;
3741         }
3742
3743         mutex_lock(&session->s_mutex);
3744         if (err < 0) {
3745                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3746                 ceph_msg_dump(msg);
3747                 goto out_err;
3748         }
3749
3750         /* snap trace */
3751         realm = NULL;
3752         if (rinfo->snapblob_len) {
3753                 down_write(&mdsc->snap_rwsem);
3754                 err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
3755                                 rinfo->snapblob + rinfo->snapblob_len,
3756                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3757                                 &realm);
3758                 if (err) {
3759                         up_write(&mdsc->snap_rwsem);
3760                         close_sessions = true;
3761                         if (err == -EIO)
3762                                 ceph_msg_dump(msg);
3763                         goto out_err;
3764                 }
3765                 downgrade_write(&mdsc->snap_rwsem);
3766         } else {
3767                 down_read(&mdsc->snap_rwsem);
3768         }
3769
3770         /* insert trace into our cache */
3771         mutex_lock(&req->r_fill_mutex);
3772         current->journal_info = req;
3773         err = ceph_fill_trace(mdsc->fsc->sb, req);
3774         if (err == 0) {
3775                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3776                                     req->r_op == CEPH_MDS_OP_LSSNAP))
3777                         err = ceph_readdir_prepopulate(req, req->r_session);
3778         }
3779         current->journal_info = NULL;
3780         mutex_unlock(&req->r_fill_mutex);
3781
3782         up_read(&mdsc->snap_rwsem);
3783         if (realm)
3784                 ceph_put_snap_realm(mdsc, realm);
3785
3786         if (err == 0) {
3787                 if (req->r_target_inode &&
3788                     test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3789                         struct ceph_inode_info *ci =
3790                                 ceph_inode(req->r_target_inode);
3791                         spin_lock(&ci->i_unsafe_lock);
3792                         list_add_tail(&req->r_unsafe_target_item,
3793                                       &ci->i_unsafe_iops);
3794                         spin_unlock(&ci->i_unsafe_lock);
3795                 }
3796
3797                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3798         }
3799 out_err:
3800         mutex_lock(&mdsc->mutex);
3801         if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3802                 if (err) {
3803                         req->r_err = err;
3804                 } else {
3805                         req->r_reply =  ceph_msg_get(msg);
3806                         set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3807                 }
3808         } else {
3809                 dout("reply arrived after request %lld was aborted\n", tid);
3810         }
3811         mutex_unlock(&mdsc->mutex);
3812
3813         mutex_unlock(&session->s_mutex);
3814
3815         /* kick calling process */
3816         complete_request(mdsc, req);
3817
3818         ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3819                                      req->r_end_latency, err);
3820 out:
3821         ceph_mdsc_put_request(req);
3822
3823         /* Defer closing the sessions after s_mutex lock being released */
3824         if (close_sessions)
3825                 ceph_mdsc_close_sessions(mdsc);
3826         return;
3827 }
3828
3829
3830
3831 /*
3832  * handle mds notification that our request has been forwarded.
3833  */
3834 static void handle_forward(struct ceph_mds_client *mdsc,
3835                            struct ceph_mds_session *session,
3836                            struct ceph_msg *msg)
3837 {
3838         struct ceph_mds_request *req;
3839         u64 tid = le64_to_cpu(msg->hdr.tid);
3840         u32 next_mds;
3841         u32 fwd_seq;
3842         int err = -EINVAL;
3843         void *p = msg->front.iov_base;
3844         void *end = p + msg->front.iov_len;
3845         bool aborted = false;
3846
3847         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3848         next_mds = ceph_decode_32(&p);
3849         fwd_seq = ceph_decode_32(&p);
3850
3851         mutex_lock(&mdsc->mutex);
3852         req = lookup_get_request(mdsc, tid);
3853         if (!req) {
3854                 mutex_unlock(&mdsc->mutex);
3855                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3856                 return;  /* dup reply? */
3857         }
3858
3859         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3860                 dout("forward tid %llu aborted, unregistering\n", tid);
3861                 __unregister_request(mdsc, req);
3862         } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
3863                 /*
3864                  * Avoid infinite retrying after overflow.
3865                  *
3866                  * The MDS increases the fwd count, so on the client side
3867                  * a num_fwd that is less than the one saved in the
3868                  * request means the MDS is an old version whose 8-bit
3869                  * counter has overflowed.
3870                  */
3871                 mutex_lock(&req->r_fill_mutex);
3872                 req->r_err = -EMULTIHOP;
3873                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3874                 mutex_unlock(&req->r_fill_mutex);
3875                 aborted = true;
3876                 pr_warn_ratelimited("forward tid %llu seq overflow\n", tid);
3877         } else {
3878                 /* resend. forward race not possible; mds would drop */
3879                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3880                 BUG_ON(req->r_err);
3881                 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3882                 req->r_attempts = 0;
3883                 req->r_num_fwd = fwd_seq;
3884                 req->r_resend_mds = next_mds;
3885                 put_request_session(req);
3886                 __do_request(mdsc, req);
3887         }
3888         mutex_unlock(&mdsc->mutex);
3889
3890         /* kick calling process */
3891         if (aborted)
3892                 complete_request(mdsc, req);
3893         ceph_mdsc_put_request(req);
3894         return;
3895
3896 bad:
3897         pr_err("mdsc_handle_forward decode error err=%d\n", err);
3898         ceph_msg_dump(msg);
3899 }
3900
3901 static int __decode_session_metadata(void **p, void *end,
3902                                      bool *blocklisted)
3903 {
3904         /* map<string,string> */
3905         u32 n;
3906         bool err_str;
3907         ceph_decode_32_safe(p, end, n, bad);
3908         while (n-- > 0) {
3909                 u32 len;
3910                 ceph_decode_32_safe(p, end, len, bad);
3911                 ceph_decode_need(p, end, len, bad);
3912                 err_str = !strncmp(*p, "error_string", len);
3913                 *p += len;
3914                 ceph_decode_32_safe(p, end, len, bad);
3915                 ceph_decode_need(p, end, len, bad);
3916                 /*
3917                  * Match "blocklisted (blacklisted)" from newer MDSes,
3918                  * or "blacklisted" from older MDSes.
3919                  */
3920                 if (err_str && strnstr(*p, "blacklisted", len))
3921                         *blocklisted = true;
3922                 *p += len;
3923         }
3924         return 0;
3925 bad:
3926         return -1;
3927 }
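/*
 * Editor's note: the session metadata decoded above is a
 * map<string,string> laid out on the wire as:
 *
 *	u32 n
 *	n x { u32 key_len, key bytes, u32 val_len, val bytes }
 *
 * Only the value of the "error_string" key is inspected, to detect a
 * blocklisted (formerly "blacklisted") client.
 */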
3928
3929 /*
3930  * handle a mds session control message
3931  */
3932 static void handle_session(struct ceph_mds_session *session,
3933                            struct ceph_msg *msg)
3934 {
3935         struct ceph_mds_client *mdsc = session->s_mdsc;
3936         int mds = session->s_mds;
3937         int msg_version = le16_to_cpu(msg->hdr.version);
3938         void *p = msg->front.iov_base;
3939         void *end = p + msg->front.iov_len;
3940         struct ceph_mds_session_head *h;
3941         u32 op;
3942         u64 seq, features = 0;
3943         int wake = 0;
3944         bool blocklisted = false;
3945
3946         /* decode */
3947         ceph_decode_need(&p, end, sizeof(*h), bad);
3948         h = p;
3949         p += sizeof(*h);
3950
3951         op = le32_to_cpu(h->op);
3952         seq = le64_to_cpu(h->seq);
3953
3954         if (msg_version >= 3) {
3955                 u32 len;
3956                 /* for versions >= 2 and < 5, decode the metadata; skip
3957                  * it otherwise, as it's handled via flags.
3958                  */
3959                 if (msg_version >= 5)
3960                         ceph_decode_skip_map(&p, end, string, string, bad);
3961                 else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3962                         goto bad;
3963
3964                 /* version >= 3, feature bits */
3965                 ceph_decode_32_safe(&p, end, len, bad);
3966                 if (len) {
3967                         ceph_decode_64_safe(&p, end, features, bad);
3968                         p += len - sizeof(features);
3969                 }
3970         }
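	/*
	 * Editor's note: the feature bitmap is length-prefixed; only the
	 * first 64 bits are consumed into "features" above, and any
	 * remaining bytes are skipped.
	 */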
3971
3972         if (msg_version >= 5) {
3973                 u32 flags, len;
3974
3975                 /* version >= 4 */
3976                 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
3977                 ceph_decode_32_safe(&p, end, len, bad); /* len */
3978                 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
3979
3980                 /* version >= 5, flags   */
3981                 ceph_decode_32_safe(&p, end, flags, bad);
3982                 if (flags & CEPH_SESSION_BLOCKLISTED) {
3983                         pr_warn("mds%d session blocklisted\n", session->s_mds);
3984                         blocklisted = true;
3985                 }
3986         }
3987
3988         mutex_lock(&mdsc->mutex);
3989         if (op == CEPH_SESSION_CLOSE) {
3990                 ceph_get_mds_session(session);
3991                 __unregister_session(mdsc, session);
3992         }
3993         /* FIXME: this ttl calculation is generous */
3994         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3995         mutex_unlock(&mdsc->mutex);
3996
3997         mutex_lock(&session->s_mutex);
3998
3999         dout("handle_session mds%d %s %p state %s seq %llu\n",
4000              mds, ceph_session_op_name(op), session,
4001              ceph_session_state_name(session->s_state), seq);
4002
4003         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
4004                 session->s_state = CEPH_MDS_SESSION_OPEN;
4005                 pr_info("mds%d came back\n", session->s_mds);
4006         }
4007
4008         switch (op) {
4009         case CEPH_SESSION_OPEN:
4010                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4011                         pr_info("mds%d reconnect success\n", session->s_mds);
4012
4013                 if (session->s_state == CEPH_MDS_SESSION_OPEN) {
4014                         pr_notice("mds%d is already opened\n", session->s_mds);
4015                 } else {
4016                         session->s_state = CEPH_MDS_SESSION_OPEN;
4017                         session->s_features = features;
4018                         renewed_caps(mdsc, session, 0);
4019                         if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4020                                      &session->s_features))
4021                                 metric_schedule_delayed(&mdsc->metric);
4022                 }
4023
4024                 /*
4025                  * The connection may have been broken and the session on
4026                  * the client side reinitialized, so we need to update
4027                  * the seq anyway.
4028                  */
4029                 if (!session->s_seq && seq)
4030                         session->s_seq = seq;
4031
4032                 wake = 1;
4033                 if (mdsc->stopping)
4034                         __close_session(mdsc, session);
4035                 break;
4036
4037         case CEPH_SESSION_RENEWCAPS:
4038                 if (session->s_renew_seq == seq)
4039                         renewed_caps(mdsc, session, 1);
4040                 break;
4041
4042         case CEPH_SESSION_CLOSE:
4043                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4044                         pr_info("mds%d reconnect denied\n", session->s_mds);
4045                 session->s_state = CEPH_MDS_SESSION_CLOSED;
4046                 cleanup_session_requests(mdsc, session);
4047                 remove_session_caps(session);
4048                 wake = 2; /* for good measure */
4049                 wake_up_all(&mdsc->session_close_wq);
4050                 break;
4051
4052         case CEPH_SESSION_STALE:
4053                 pr_info("mds%d caps went stale, renewing\n",
4054                         session->s_mds);
4055                 atomic_inc(&session->s_cap_gen);
4056                 session->s_cap_ttl = jiffies - 1;
4057                 send_renew_caps(mdsc, session);
4058                 break;
4059
4060         case CEPH_SESSION_RECALL_STATE:
4061                 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
4062                 break;
4063
4064         case CEPH_SESSION_FLUSHMSG:
4065                 /* flush cap releases */
4066                 spin_lock(&session->s_cap_lock);
4067                 if (session->s_num_cap_releases)
4068                         ceph_flush_cap_releases(mdsc, session);
4069                 spin_unlock(&session->s_cap_lock);
4070
4071                 send_flushmsg_ack(mdsc, session, seq);
4072                 break;
4073
4074         case CEPH_SESSION_FORCE_RO:
4075                 dout("force_session_readonly %p\n", session);
4076                 spin_lock(&session->s_cap_lock);
4077                 session->s_readonly = true;
4078                 spin_unlock(&session->s_cap_lock);
4079                 wake_up_session_caps(session, FORCE_RO);
4080                 break;
4081
4082         case CEPH_SESSION_REJECT:
4083                 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
4084                 pr_info("mds%d rejected session\n", session->s_mds);
4085                 session->s_state = CEPH_MDS_SESSION_REJECTED;
4086                 cleanup_session_requests(mdsc, session);
4087                 remove_session_caps(session);
4088                 if (blocklisted)
4089                         mdsc->fsc->blocklisted = true;
4090                 wake = 2; /* for good measure */
4091                 break;
4092
4093         default:
4094                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
4095                 WARN_ON(1);
4096         }
4097
4098         mutex_unlock(&session->s_mutex);
4099         if (wake) {
4100                 mutex_lock(&mdsc->mutex);
4101                 __wake_requests(mdsc, &session->s_waiting);
4102                 if (wake == 2)
4103                         kick_requests(mdsc, mds);
4104                 mutex_unlock(&mdsc->mutex);
4105         }
4106         if (op == CEPH_SESSION_CLOSE)
4107                 ceph_put_mds_session(session);
4108         return;
4109
4110 bad:
4111         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
4112                (int)msg->front.iov_len);
4113         ceph_msg_dump(msg);
4114         return;
4115 }
4116
4117 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
4118 {
4119         int dcaps;
4120
4121         dcaps = xchg(&req->r_dir_caps, 0);
4122         if (dcaps) {
4123                 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4124                 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
4125         }
4126 }
4127
4128 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
4129 {
4130         int dcaps;
4131
4132         dcaps = xchg(&req->r_dir_caps, 0);
4133         if (dcaps) {
4134                 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4135                 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
4136                                                 dcaps);
4137         }
4138 }
4139
4140 /*
4141  * called under session->mutex.
4142  */
4143 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4144                                    struct ceph_mds_session *session)
4145 {
4146         struct ceph_mds_request *req, *nreq;
4147         struct rb_node *p;
4148
4149         dout("replay_unsafe_requests mds%d\n", session->s_mds);
4150
4151         mutex_lock(&mdsc->mutex);
4152         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4153                 __send_request(session, req, true);
4154
4155         /*
4156          * Also re-send old requests when the MDS enters the reconnect stage,
4157          * so the MDS can process completed requests in the clientreplay stage.
4158          */
4159         p = rb_first(&mdsc->request_tree);
4160         while (p) {
4161                 req = rb_entry(p, struct ceph_mds_request, r_node);
4162                 p = rb_next(p);
4163                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4164                         continue;
4165                 if (req->r_attempts == 0)
4166                         continue; /* only old requests */
4167                 if (!req->r_session)
4168                         continue;
4169                 if (req->r_session->s_mds != session->s_mds)
4170                         continue;
4171
4172                 ceph_mdsc_release_dir_caps_no_check(req);
4173
4174                 __send_request(session, req, true);
4175         }
4176         mutex_unlock(&mdsc->mutex);
4177 }
4178
4179 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4180 {
4181         struct ceph_msg *reply;
4182         struct ceph_pagelist *_pagelist;
4183         struct page *page;
4184         __le32 *addr;
4185         int err = -ENOMEM;
4186
4187         if (!recon_state->allow_multi)
4188                 return -ENOSPC;
4189
4190         /* can't handle a message that contains both caps and realms */
4191         BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4192
4193         /* pre-allocate new pagelist */
4194         _pagelist = ceph_pagelist_alloc(GFP_NOFS);
4195         if (!_pagelist)
4196                 return -ENOMEM;
4197
4198         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4199         if (!reply)
4200                 goto fail_msg;
4201
4202         /* placeholder for nr_caps */
4203         err = ceph_pagelist_encode_32(_pagelist, 0);
4204         if (err < 0)
4205                 goto fail;
4206
4207         if (recon_state->nr_caps) {
4208                 /* currently encoding caps */
4209                 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4210                 if (err)
4211                         goto fail;
4212         } else {
4213                 /* placeholder for nr_realms (currently encoding realms) */
4214                 err = ceph_pagelist_encode_32(_pagelist, 0);
4215                 if (err < 0)
4216                         goto fail;
4217         }
4218
4219         err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4220         if (err)
4221                 goto fail;
4222
4223         page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4224         addr = kmap_atomic(page);
4225         if (recon_state->nr_caps) {
4226                 /* currently encoding caps */
4227                 *addr = cpu_to_le32(recon_state->nr_caps);
4228         } else {
4229                 /* currently encoding realms */
4230                 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4231         }
4232         kunmap_atomic(addr);
4233
4234         reply->hdr.version = cpu_to_le16(5);
4235         reply->hdr.compat_version = cpu_to_le16(4);
4236
4237         reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4238         ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4239
4240         ceph_con_send(&recon_state->session->s_con, reply);
4241         ceph_pagelist_release(recon_state->pagelist);
4242
4243         recon_state->pagelist = _pagelist;
4244         recon_state->nr_caps = 0;
4245         recon_state->nr_realms = 0;
4246         recon_state->msg_version = 5;
4247         return 0;
4248 fail:
4249         ceph_msg_put(reply);
4250 fail_msg:
4251         ceph_pagelist_release(_pagelist);
4252         return err;
4253 }
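/*
 * Editor's note on the placeholder trick above: each pagelist begins
 * with u32 count(s) encoded as zero.  When a partial reconnect message
 * is flushed, its first page is mapped and the real count is patched in
 * place (nr_caps in the first u32, nr_realms in the second), and the
 * byte encoded as 1 flags that another reconnect message will follow.
 */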
4254
4255 static struct dentry* d_find_primary(struct inode *inode)
4256 {
4257         struct dentry *alias, *dn = NULL;
4258
4259         if (hlist_empty(&inode->i_dentry))
4260                 return NULL;
4261
4262         spin_lock(&inode->i_lock);
4263         if (hlist_empty(&inode->i_dentry))
4264                 goto out_unlock;
4265
4266         if (S_ISDIR(inode->i_mode)) {
4267                 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
4268                 if (!IS_ROOT(alias))
4269                         dn = dget(alias);
4270                 goto out_unlock;
4271         }
4272
4273         hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
4274                 spin_lock(&alias->d_lock);
4275                 if (!d_unhashed(alias) &&
4276                     (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
4277                         dn = dget_dlock(alias);
4278                 }
4279                 spin_unlock(&alias->d_lock);
4280                 if (dn)
4281                         break;
4282         }
4283 out_unlock:
4284         spin_unlock(&inode->i_lock);
4285         return dn;
4286 }
4287
4288 /*
4289  * Encode information about a cap for a reconnect with the MDS.
4290  */
4291 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
4292 {
4293         union {
4294                 struct ceph_mds_cap_reconnect v2;
4295                 struct ceph_mds_cap_reconnect_v1 v1;
4296         } rec;
4297         struct ceph_inode_info *ci = ceph_inode(inode);
4298         struct ceph_reconnect_state *recon_state = arg;
4299         struct ceph_pagelist *pagelist = recon_state->pagelist;
4300         struct dentry *dentry;
4301         struct ceph_cap *cap;
4302         char *path;
4303         int pathlen = 0, err;
4304         u64 pathbase;
4305         u64 snap_follows;
4306
4307         dentry = d_find_primary(inode);
4308         if (dentry) {
4309                 /* set pathbase to parent dir when msg_version >= 2 */
4310                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
4311                                             recon_state->msg_version >= 2);
4312                 dput(dentry);
4313                 if (IS_ERR(path)) {
4314                         err = PTR_ERR(path);
4315                         goto out_err;
4316                 }
4317         } else {
4318                 path = NULL;
4319                 pathbase = 0;
4320         }
4321
4322         spin_lock(&ci->i_ceph_lock);
4323         cap = __get_cap_for_mds(ci, mds);
4324         if (!cap) {
4325                 spin_unlock(&ci->i_ceph_lock);
4326                 err = 0;
4327                 goto out_err;
4328         }
4329         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
4330              inode, ceph_vinop(inode), cap, cap->cap_id,
4331              ceph_cap_string(cap->issued));
4332
4333         cap->seq = 0;        /* reset cap seq */
4334         cap->issue_seq = 0;  /* and issue_seq */
4335         cap->mseq = 0;       /* and migrate_seq */
4336         cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
4337
4338         /* These are lost when the session goes away */
4339         if (S_ISDIR(inode->i_mode)) {
4340                 if (cap->issued & CEPH_CAP_DIR_CREATE) {
4341                         ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
4342                         memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
4343                 }
4344                 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
4345         }
4346
4347         if (recon_state->msg_version >= 2) {
4348                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
4349                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4350                 rec.v2.issued = cpu_to_le32(cap->issued);
4351                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4352                 rec.v2.pathbase = cpu_to_le64(pathbase);
4353                 rec.v2.flock_len = (__force __le32)
4354                         ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
4355         } else {
4356                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
4357                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4358                 rec.v1.issued = cpu_to_le32(cap->issued);
4359                 rec.v1.size = cpu_to_le64(i_size_read(inode));
4360                 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
4361                 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
4362                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4363                 rec.v1.pathbase = cpu_to_le64(pathbase);
4364         }
4365
4366         if (list_empty(&ci->i_cap_snaps)) {
4367                 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
4368         } else {
4369                 struct ceph_cap_snap *capsnap =
4370                         list_first_entry(&ci->i_cap_snaps,
4371                                          struct ceph_cap_snap, ci_item);
4372                 snap_follows = capsnap->follows;
4373         }
4374         spin_unlock(&ci->i_ceph_lock);
4375
4376         if (recon_state->msg_version >= 2) {
4377                 int num_fcntl_locks, num_flock_locks;
4378                 struct ceph_filelock *flocks = NULL;
4379                 size_t struct_len, total_len = sizeof(u64);
4380                 u8 struct_v = 0;
4381
4382 encode_again:
4383                 if (rec.v2.flock_len) {
4384                         ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
4385                 } else {
4386                         num_fcntl_locks = 0;
4387                         num_flock_locks = 0;
4388                 }
4389                 if (num_fcntl_locks + num_flock_locks > 0) {
4390                         flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
4391                                                sizeof(struct ceph_filelock),
4392                                                GFP_NOFS);
4393                         if (!flocks) {
4394                                 err = -ENOMEM;
4395                                 goto out_err;
4396                         }
4397                         err = ceph_encode_locks_to_buffer(inode, flocks,
4398                                                           num_fcntl_locks,
4399                                                           num_flock_locks);
4400                         if (err) {
4401                                 kfree(flocks);
4402                                 flocks = NULL;
4403                                 if (err == -ENOSPC)
4404                                         goto encode_again;
4405                                 goto out_err;
4406                         }
4407                 } else {
4408                         kfree(flocks);
4409                         flocks = NULL;
4410                 }
4411
4412                 if (recon_state->msg_version >= 3) {
4413                         /* version, compat_version and struct_len */
4414                         total_len += 2 * sizeof(u8) + sizeof(u32);
4415                         struct_v = 2;
4416                 }
4417                 /*
4418                  * number of encoded locks is stable, so copy to pagelist
4419                  */
4420                 struct_len = 2 * sizeof(u32) +
4421                             (num_fcntl_locks + num_flock_locks) *
4422                             sizeof(struct ceph_filelock);
4423                 rec.v2.flock_len = cpu_to_le32(struct_len);
4424
4425                 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
4426
4427                 if (struct_v >= 2)
4428                         struct_len += sizeof(u64); /* snap_follows */
4429
4430                 total_len += struct_len;
4431
4432                 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4433                         err = send_reconnect_partial(recon_state);
4434                         if (err)
4435                                 goto out_freeflocks;
4436                         pagelist = recon_state->pagelist;
4437                 }
4438
4439                 err = ceph_pagelist_reserve(pagelist, total_len);
4440                 if (err)
4441                         goto out_freeflocks;
4442
4443                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4444                 if (recon_state->msg_version >= 3) {
4445                         ceph_pagelist_encode_8(pagelist, struct_v);
4446                         ceph_pagelist_encode_8(pagelist, 1);
4447                         ceph_pagelist_encode_32(pagelist, struct_len);
4448                 }
4449                 ceph_pagelist_encode_string(pagelist, path, pathlen);
4450                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4451                 ceph_locks_to_pagelist(flocks, pagelist,
4452                                        num_fcntl_locks, num_flock_locks);
4453                 if (struct_v >= 2)
4454                         ceph_pagelist_encode_64(pagelist, snap_follows);
4455 out_freeflocks:
4456                 kfree(flocks);
4457         } else {
4458                 err = ceph_pagelist_reserve(pagelist,
4459                                             sizeof(u64) + sizeof(u32) +
4460                                             pathlen + sizeof(rec.v1));
4461                 if (err)
4462                         goto out_err;
4463
4464                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4465                 ceph_pagelist_encode_string(pagelist, path, pathlen);
4466                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
4467         }
4468
4469 out_err:
4470         ceph_mdsc_free_path(path, pathlen);
4471         if (!err)
4472                 recon_state->nr_caps++;
4473         return err;
4474 }
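
/*
 * For reference, the record appended above for a single cap with
 * msg_version >= 3 is laid out as:
 *
 *   u64    ino
 *   u8     struct_v, u8 struct_compat, u32 struct_len  (v3+ envelope)
 *   string path                                        (u32 len + bytes)
 *   struct ceph_mds_cap_reconnect                      (rec.v2)
 *   u32    fcntl lock count + entries, u32 flock lock count + entries
 *   u64    snap_follows                                (struct_v >= 2)
 *
 * msg_version 2 omits the envelope and snap_follows; msg_version 1
 * sends rec.v1 with no lock data at all.
 */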
4475
4476 static int encode_snap_realms(struct ceph_mds_client *mdsc,
4477                               struct ceph_reconnect_state *recon_state)
4478 {
4479         struct rb_node *p;
4480         struct ceph_pagelist *pagelist = recon_state->pagelist;
4481         int err = 0;
4482
4483         if (recon_state->msg_version >= 4) {
4484                 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4485                 if (err < 0)
4486                         goto fail;
4487         }
4488
4489         /*
4490          * snaprealms.  we provide mds with the ino, seq (version), and
4491          * parent for all of our realms.  If the mds has any newer info,
4492          * it will tell us.
4493          */
4494         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4495                 struct ceph_snap_realm *realm =
4496                        rb_entry(p, struct ceph_snap_realm, node);
4497                 struct ceph_mds_snaprealm_reconnect sr_rec;
4498
4499                 if (recon_state->msg_version >= 4) {
4500                         size_t need = sizeof(u8) * 2 + sizeof(u32) +
4501                                       sizeof(sr_rec);
4502
4503                         if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4504                                 err = send_reconnect_partial(recon_state);
4505                                 if (err)
4506                                         goto fail;
4507                                 pagelist = recon_state->pagelist;
4508                         }
4509
4510                         err = ceph_pagelist_reserve(pagelist, need);
4511                         if (err)
4512                                 goto fail;
4513
4514                         ceph_pagelist_encode_8(pagelist, 1);
4515                         ceph_pagelist_encode_8(pagelist, 1);
4516                         ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4517                 }
4518
4519                 dout(" adding snap realm %llx seq %lld parent %llx\n",
4520                      realm->ino, realm->seq, realm->parent_ino);
4521                 sr_rec.ino = cpu_to_le64(realm->ino);
4522                 sr_rec.seq = cpu_to_le64(realm->seq);
4523                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
4524
4525                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4526                 if (err)
4527                         goto fail;
4528
4529                 recon_state->nr_realms++;
4530         }
4531 fail:
4532         return err;
4533 }
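
/*
 * For reference, each realm record emitted above with msg_version >= 4
 * is wrapped in a small envelope:
 *
 *   u8  struct_v      (1)
 *   u8  struct_compat (1)
 *   u32 struct_len    (sizeof(struct ceph_mds_snaprealm_reconnect))
 *   struct ceph_mds_snaprealm_reconnect { ino, seq, parent }
 *
 * Older versions send the bare struct with no envelope.
 */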
4534
4535
4536 /*
4537  * If an MDS fails and recovers, clients need to reconnect in order to
4538  * reestablish shared state.  This includes all caps issued through
4539  * this session _and_ the snap_realm hierarchy.  Because it's not
4540  * clear which snap realms the mds cares about, we send everything we
4541  * clear which snap realms the mds cares about, we send everything we
4542  * know about.  That ensures we'll then get any new info the
4543  *
4544  * This is a relatively heavyweight operation, but it's rare.
4545  */
4546 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4547                                struct ceph_mds_session *session)
4548 {
4549         struct ceph_msg *reply;
4550         int mds = session->s_mds;
4551         int err = -ENOMEM;
4552         struct ceph_reconnect_state recon_state = {
4553                 .session = session,
4554         };
4555         LIST_HEAD(dispose);
4556
4557         pr_info("mds%d reconnect start\n", mds);
4558
4559         recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4560         if (!recon_state.pagelist)
4561                 goto fail_nopagelist;
4562
4563         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4564         if (!reply)
4565                 goto fail_nomsg;
4566
4567         xa_destroy(&session->s_delegated_inos);
4568
4569         mutex_lock(&session->s_mutex);
4570         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4571         session->s_seq = 0;
4572
4573         dout("session %p state %s\n", session,
4574              ceph_session_state_name(session->s_state));
4575
4576         atomic_inc(&session->s_cap_gen);
4577
4578         spin_lock(&session->s_cap_lock);
4579         /* don't know if session is readonly */
4580         session->s_readonly = 0;
4581         /*
4582          * notify __ceph_remove_cap() that we are composing cap reconnect.
4583          * If a cap gets released before being added to the cap reconnect,
4584          * __ceph_remove_cap() should skip queuing cap release.
4585          */
4586         session->s_cap_reconnect = 1;
4587         /* drop old cap expires; we're about to reestablish that state */
4588         detach_cap_releases(session, &dispose);
4589         spin_unlock(&session->s_cap_lock);
4590         dispose_cap_releases(mdsc, &dispose);
4591
4592         /* trim unused caps to reduce MDS's cache rejoin time */
4593         if (mdsc->fsc->sb->s_root)
4594                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4595
4596         ceph_con_close(&session->s_con);
4597         ceph_con_open(&session->s_con,
4598                       CEPH_ENTITY_TYPE_MDS, mds,
4599                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4600
4601         /* replay unsafe requests */
4602         replay_unsafe_requests(mdsc, session);
4603
4604         ceph_early_kick_flushing_caps(mdsc, session);
4605
4606         down_read(&mdsc->snap_rwsem);
4607
4608         /* placeholder for nr_caps */
4609         err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4610         if (err)
4611                 goto fail;
4612
4613         if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4614                 recon_state.msg_version = 3;
4615                 recon_state.allow_multi = true;
4616         } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4617                 recon_state.msg_version = 3;
4618         } else {
4619                 recon_state.msg_version = 2;
4620         }
4621         /* traverse this session's caps */
4622         err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4623
4624         spin_lock(&session->s_cap_lock);
4625         session->s_cap_reconnect = 0;
4626         spin_unlock(&session->s_cap_lock);
4627
4628         if (err < 0)
4629                 goto fail;
4630
4631         /* check if all realms can be encoded into current message */
4632         if (mdsc->num_snap_realms) {
4633                 size_t total_len =
4634                         recon_state.pagelist->length +
4635                         mdsc->num_snap_realms *
4636                         sizeof(struct ceph_mds_snaprealm_reconnect);
4637                 if (recon_state.msg_version >= 4) {
4638                         /* number of realms */
4639                         total_len += sizeof(u32);
4640                         /* version, compat_version and struct_len */
4641                         total_len += mdsc->num_snap_realms *
4642                                      (2 * sizeof(u8) + sizeof(u32));
4643                 }
4644                 if (total_len > RECONNECT_MAX_SIZE) {
4645                         if (!recon_state.allow_multi) {
4646                                 err = -ENOSPC;
4647                                 goto fail;
4648                         }
4649                         if (recon_state.nr_caps) {
4650                                 err = send_reconnect_partial(&recon_state);
4651                                 if (err)
4652                                         goto fail;
4653                         }
4654                         recon_state.msg_version = 5;
4655                 }
4656         }
4657
4658         err = encode_snap_realms(mdsc, &recon_state);
4659         if (err < 0)
4660                 goto fail;
4661
4662         if (recon_state.msg_version >= 5) {
4663                 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4664                 if (err < 0)
4665                         goto fail;
4666         }
4667
4668         if (recon_state.nr_caps || recon_state.nr_realms) {
4669                 struct page *page =
4670                         list_first_entry(&recon_state.pagelist->head,
4671                                         struct page, lru);
4672                 __le32 *addr = kmap_atomic(page);
4673                 if (recon_state.nr_caps) {
4674                         WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4675                         *addr = cpu_to_le32(recon_state.nr_caps);
4676                 } else if (recon_state.msg_version >= 4) {
4677                         *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4678                 }
4679                 kunmap_atomic(addr);
4680         }
4681
4682         reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4683         if (recon_state.msg_version >= 4)
4684                 reply->hdr.compat_version = cpu_to_le16(4);
4685
4686         reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4687         ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4688
4689         ceph_con_send(&session->s_con, reply);
4690
4691         mutex_unlock(&session->s_mutex);
4692
4693         mutex_lock(&mdsc->mutex);
4694         __wake_requests(mdsc, &session->s_waiting);
4695         mutex_unlock(&mdsc->mutex);
4696
4697         up_read(&mdsc->snap_rwsem);
4698         ceph_pagelist_release(recon_state.pagelist);
4699         return;
4700
4701 fail:
4702         ceph_msg_put(reply);
4703         up_read(&mdsc->snap_rwsem);
4704         mutex_unlock(&session->s_mutex);
4705 fail_nomsg:
4706         ceph_pagelist_release(recon_state.pagelist);
4707 fail_nopagelist:
4708         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4709         return;
4710 }
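
/*
 * For reference, in the single-message case the CLIENT_RECONNECT
 * payload built above is, roughly: a u32 cap count (patched in once
 * known), the cap records, then the snap realm records (preceded by a
 * u32 realm count for msg_version >= 4).  With msg_version >= 5 a
 * trailing u8 flag is appended: 0 on this final message, 1 on the
 * partial messages sent by send_reconnect_partial().
 */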
4711
4712
4713 /*
4714  * compare old and new mdsmaps, kicking requests
4715  * and closing out old connections as necessary
4716  *
4717  * called under mdsc->mutex.
4718  */
4719 static void check_new_map(struct ceph_mds_client *mdsc,
4720                           struct ceph_mdsmap *newmap,
4721                           struct ceph_mdsmap *oldmap)
4722 {
4723         int i, j, err;
4724         int oldstate, newstate;
4725         struct ceph_mds_session *s;
4726         unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, BITS_PER_LONG)] = {0};
4727
4728         dout("check_new_map new %u old %u\n",
4729              newmap->m_epoch, oldmap->m_epoch);
4730
4731         if (newmap->m_info) {
4732                 for (i = 0; i < newmap->possible_max_rank; i++) {
4733                         for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
4734                                 set_bit(newmap->m_info[i].export_targets[j], targets);
4735                 }
4736         }
4737
4738         for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4739                 if (!mdsc->sessions[i])
4740                         continue;
4741                 s = mdsc->sessions[i];
4742                 oldstate = ceph_mdsmap_get_state(oldmap, i);
4743                 newstate = ceph_mdsmap_get_state(newmap, i);
4744
4745                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4746                      i, ceph_mds_state_name(oldstate),
4747                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4748                      ceph_mds_state_name(newstate),
4749                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4750                      ceph_session_state_name(s->s_state));
4751
4752                 if (i >= newmap->possible_max_rank) {
4753                         /* force close session for stopped mds */
4754                         ceph_get_mds_session(s);
4755                         __unregister_session(mdsc, s);
4756                         __wake_requests(mdsc, &s->s_waiting);
4757                         mutex_unlock(&mdsc->mutex);
4758
4759                         mutex_lock(&s->s_mutex);
4760                         cleanup_session_requests(mdsc, s);
4761                         remove_session_caps(s);
4762                         mutex_unlock(&s->s_mutex);
4763
4764                         ceph_put_mds_session(s);
4765
4766                         mutex_lock(&mdsc->mutex);
4767                         kick_requests(mdsc, i);
4768                         continue;
4769                 }
4770
4771                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4772                            ceph_mdsmap_get_addr(newmap, i),
4773                            sizeof(struct ceph_entity_addr))) {
4774                         /* just close it */
4775                         mutex_unlock(&mdsc->mutex);
4776                         mutex_lock(&s->s_mutex);
4777                         mutex_lock(&mdsc->mutex);
4778                         ceph_con_close(&s->s_con);
4779                         mutex_unlock(&s->s_mutex);
4780                         s->s_state = CEPH_MDS_SESSION_RESTARTING;
4781                 } else if (oldstate == newstate) {
4782                         continue;  /* nothing new with this mds */
4783                 }
4784
4785                 /*
4786                  * send reconnect?
4787                  */
4788                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4789                     newstate >= CEPH_MDS_STATE_RECONNECT) {
4790                         mutex_unlock(&mdsc->mutex);
4791                         clear_bit(i, targets);
4792                         send_mds_reconnect(mdsc, s);
4793                         mutex_lock(&mdsc->mutex);
4794                 }
4795
4796                 /*
4797                  * kick request on any mds that has gone active.
4798                  */
4799                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4800                     newstate >= CEPH_MDS_STATE_ACTIVE) {
4801                         if (oldstate != CEPH_MDS_STATE_CREATING &&
4802                             oldstate != CEPH_MDS_STATE_STARTING)
4803                                 pr_info("mds%d recovery completed\n", s->s_mds);
4804                         kick_requests(mdsc, i);
4805                         mutex_unlock(&mdsc->mutex);
4806                         mutex_lock(&s->s_mutex);
4807                         mutex_lock(&mdsc->mutex);
4808                         ceph_kick_flushing_caps(mdsc, s);
4809                         mutex_unlock(&s->s_mutex);
4810                         wake_up_session_caps(s, RECONNECT);
4811                 }
4812         }
4813
4814         /*
4815          * Only open and reconnect sessions that don't exist yet.
4816          */
4817         for (i = 0; i < newmap->possible_max_rank; i++) {
4818                 /*
4819                  * If the importing MDS crashes just after the
4820                  * EImportStart journal is flushed, then when a
4821                  * standby MDS takes over and replays the
4822                  * EImportStart journal, the new MDS daemon will
4823                  * wait for the client to reconnect, but the client
4824                  * may never have registered/opened the session.
4825                  *
4826                  * Try to reconnect to that MDS daemon if its rank
4827                  * number is in the export targets array and it is
4828                  * in the up:reconnect state.
4829                  */
4830                 newstate = ceph_mdsmap_get_state(newmap, i);
4831                 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
4832                         continue;
4833
4834                 /*
4835                  * In rare cases the session may already have been
4836                  * registered and opened by requests that chose
4837                  * random MDSes during the mdsc->mutex unlock/lock
4838                  * gap below.  But the related MDS daemon will just
4839                  * queue those requests and keep waiting for the
4840                  * client's reconnect request in up:reconnect state.
4841                  */
4842                 s = __ceph_lookup_mds_session(mdsc, i);
4843                 if (likely(!s)) {
4844                         s = __open_export_target_session(mdsc, i);
4845                         if (IS_ERR(s)) {
4846                                 err = PTR_ERR(s);
4847                                 pr_err("failed to open export target session, err %d\n",
4848                                        err);
4849                                 continue;
4850                         }
4851                 }
4852                 dout("send reconnect to export target mds.%d\n", i);
4853                 mutex_unlock(&mdsc->mutex);
4854                 send_mds_reconnect(mdsc, s);
4855                 ceph_put_mds_session(s);
4856                 mutex_lock(&mdsc->mutex);
4857         }
4858
4859         for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4860                 s = mdsc->sessions[i];
4861                 if (!s)
4862                         continue;
4863                 if (!ceph_mdsmap_is_laggy(newmap, i))
4864                         continue;
4865                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4866                     s->s_state == CEPH_MDS_SESSION_HUNG ||
4867                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
4868                         dout(" connecting to export targets of laggy mds%d\n",
4869                              i);
4870                         __open_export_target_sessions(mdsc, s);
4871                 }
4872         }
4873 }
4874
4875
4876
4877 /*
4878  * leases
4879  */
4880
4881 /*
4882  * caller must hold session s_mutex, dentry->d_lock
4883  */
4884 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4885 {
4886         struct ceph_dentry_info *di = ceph_dentry(dentry);
4887
4888         ceph_put_mds_session(di->lease_session);
4889         di->lease_session = NULL;
4890 }
4891
4892 static void handle_lease(struct ceph_mds_client *mdsc,
4893                          struct ceph_mds_session *session,
4894                          struct ceph_msg *msg)
4895 {
4896         struct super_block *sb = mdsc->fsc->sb;
4897         struct inode *inode;
4898         struct dentry *parent, *dentry;
4899         struct ceph_dentry_info *di;
4900         int mds = session->s_mds;
4901         struct ceph_mds_lease *h = msg->front.iov_base;
4902         u32 seq;
4903         struct ceph_vino vino;
4904         struct qstr dname;
4905         int release = 0;
4906
4907         dout("handle_lease from mds%d\n", mds);
4908
4909         if (!ceph_inc_mds_stopping_blocker(mdsc, session))
4910                 return;
4911
4912         /* decode */
4913         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4914                 goto bad;
4915         vino.ino = le64_to_cpu(h->ino);
4916         vino.snap = CEPH_NOSNAP;
4917         seq = le32_to_cpu(h->seq);
4918         dname.len = get_unaligned_le32(h + 1);
4919         if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4920                 goto bad;
4921         dname.name = (void *)(h + 1) + sizeof(u32);
4922
4923         /* lookup inode */
4924         inode = ceph_find_inode(sb, vino);
4925         dout("handle_lease %s, ino %llx %p %.*s\n",
4926              ceph_lease_op_name(h->action), vino.ino, inode,
4927              dname.len, dname.name);
4928
4929         mutex_lock(&session->s_mutex);
4930         if (!inode) {
4931                 dout("handle_lease no inode %llx\n", vino.ino);
4932                 goto release;
4933         }
4934
4935         /* dentry */
4936         parent = d_find_alias(inode);
4937         if (!parent) {
4938                 dout("no parent dentry on inode %p\n", inode);
4939                 WARN_ON(1);
4940                 goto release;  /* hrm... */
4941         }
4942         dname.hash = full_name_hash(parent, dname.name, dname.len);
4943         dentry = d_lookup(parent, &dname);
4944         dput(parent);
4945         if (!dentry)
4946                 goto release;
4947
4948         spin_lock(&dentry->d_lock);
4949         di = ceph_dentry(dentry);
4950         switch (h->action) {
4951         case CEPH_MDS_LEASE_REVOKE:
4952                 if (di->lease_session == session) {
4953                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4954                                 h->seq = cpu_to_le32(di->lease_seq);
4955                         __ceph_mdsc_drop_dentry_lease(dentry);
4956                 }
4957                 release = 1;
4958                 break;
4959
4960         case CEPH_MDS_LEASE_RENEW:
4961                 if (di->lease_session == session &&
4962                     di->lease_gen == atomic_read(&session->s_cap_gen) &&
4963                     di->lease_renew_from &&
4964                     di->lease_renew_after == 0) {
4965                         unsigned long duration =
4966                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4967
4968                         di->lease_seq = seq;
4969                         di->time = di->lease_renew_from + duration;
4970                         di->lease_renew_after = di->lease_renew_from +
4971                                 (duration >> 1);
4972                         di->lease_renew_from = 0;
4973                 }
4974                 break;
4975         }
4976         spin_unlock(&dentry->d_lock);
4977         dput(dentry);
4978
4979         if (!release)
4980                 goto out;
4981
4982 release:
4983         /* let's just reuse the same message */
4984         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4985         ceph_msg_get(msg);
4986         ceph_con_send(&session->s_con, msg);
4987
4988 out:
4989         mutex_unlock(&session->s_mutex);
4990         iput(inode);
4991
4992         ceph_dec_mds_stopping_blocker(mdsc);
4993         return;
4994
4995 bad:
4996         ceph_dec_mds_stopping_blocker(mdsc);
4997
4998         pr_err("corrupt lease message\n");
4999         ceph_msg_dump(msg);
5000 }
5001
5002 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
5003                               struct dentry *dentry, char action,
5004                               u32 seq)
5005 {
5006         struct ceph_msg *msg;
5007         struct ceph_mds_lease *lease;
5008         struct inode *dir;
5009         int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
5010
5011         dout("lease_send_msg dentry %p %s to mds%d\n",
5012              dentry, ceph_lease_op_name(action), session->s_mds);
5013
5014         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
5015         if (!msg)
5016                 return;
5017         lease = msg->front.iov_base;
5018         lease->action = action;
5019         lease->seq = cpu_to_le32(seq);
5020
5021         spin_lock(&dentry->d_lock);
5022         dir = d_inode(dentry->d_parent);
5023         lease->ino = cpu_to_le64(ceph_ino(dir));
5024         lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
5025
5026         put_unaligned_le32(dentry->d_name.len, lease + 1);
5027         memcpy((void *)(lease + 1) + 4,
5028                dentry->d_name.name, dentry->d_name.len);
5029         spin_unlock(&dentry->d_lock);
5030
5031         ceph_con_send(&session->s_con, msg);
5032 }
5033
5034 /*
5035  * lock/unlock the session, to wait for ongoing session activity to finish
5036  */
5037 static void lock_unlock_session(struct ceph_mds_session *s)
5038 {
5039         mutex_lock(&s->s_mutex);
5040         mutex_unlock(&s->s_mutex);
5041 }
5042
5043 static void maybe_recover_session(struct ceph_mds_client *mdsc)
5044 {
5045         struct ceph_fs_client *fsc = mdsc->fsc;
5046
5047         if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
5048                 return;
5049
5050         if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
5051                 return;
5052
5053         if (!READ_ONCE(fsc->blocklisted))
5054                 return;
5055
5056         pr_info("auto reconnect after being blocklisted\n");
5057         ceph_force_reconnect(fsc->sb);
5058 }
5059
5060 bool check_session_state(struct ceph_mds_session *s)
5061 {
5062         switch (s->s_state) {
5063         case CEPH_MDS_SESSION_OPEN:
5064                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
5065                         s->s_state = CEPH_MDS_SESSION_HUNG;
5066                         pr_info("mds%d hung\n", s->s_mds);
5067                 }
5068                 break;
5069         case CEPH_MDS_SESSION_CLOSING:
5070         case CEPH_MDS_SESSION_NEW:
5071         case CEPH_MDS_SESSION_RESTARTING:
5072         case CEPH_MDS_SESSION_CLOSED:
5073         case CEPH_MDS_SESSION_REJECTED:
5074                 return false;
5075         }
5076
5077         return true;
5078 }
5079
5080 /*
5081  * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
5082  * then we need to retransmit that request.
5083  */
5084 void inc_session_sequence(struct ceph_mds_session *s)
5085 {
5086         lockdep_assert_held(&s->s_mutex);
5087
5088         s->s_seq++;
5089
5090         if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
5091                 int ret;
5092
5093                 dout("resending session close request for mds%d\n", s->s_mds);
5094                 ret = request_close_session(s);
5095                 if (ret < 0)
5096                         pr_err("unable to close session to mds%d: %d\n",
5097                                s->s_mds, ret);
5098         }
5099 }
5100
5101 /*
5102  * delayed work -- periodically trim expired leases, renew caps with mds.  If
5103  * the @delay parameter is set to 0 or if it's more than 5 secs, the default
5104  * workqueue delay value of 5 secs will be used.
5105  */
5106 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
5107 {
5108         unsigned long max_delay = HZ * 5;
5109
5110         /* 5 secs default delay */
5111         if (!delay || (delay > max_delay))
5112                 delay = max_delay;
5113         schedule_delayed_work(&mdsc->delayed_work,
5114                               round_jiffies_relative(delay));
5115 }
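
/*
 * E.g. the mdsmap handler below calls schedule_delayed(mdsc, 0) to
 * (re)arm the work with the default 5 second delay, while
 * delayed_work() re-arms itself with whatever delay
 * ceph_check_delayed_caps() returns.
 */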
5116
5117 static void delayed_work(struct work_struct *work)
5118 {
5119         struct ceph_mds_client *mdsc =
5120                 container_of(work, struct ceph_mds_client, delayed_work.work);
5121         unsigned long delay;
5122         int renew_interval;
5123         int renew_caps;
5124         int i;
5125
5126         dout("mdsc delayed_work\n");
5127
5128         if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
5129                 return;
5130
5131         mutex_lock(&mdsc->mutex);
5132         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
5133         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
5134                                    mdsc->last_renew_caps);
5135         if (renew_caps)
5136                 mdsc->last_renew_caps = jiffies;
5137
5138         for (i = 0; i < mdsc->max_sessions; i++) {
5139                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
5140                 if (!s)
5141                         continue;
5142
5143                 if (!check_session_state(s)) {
5144                         ceph_put_mds_session(s);
5145                         continue;
5146                 }
5147                 mutex_unlock(&mdsc->mutex);
5148
5149                 mutex_lock(&s->s_mutex);
5150                 if (renew_caps)
5151                         send_renew_caps(mdsc, s);
5152                 else
5153                         ceph_con_keepalive(&s->s_con);
5154                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5155                     s->s_state == CEPH_MDS_SESSION_HUNG)
5156                         ceph_send_cap_releases(mdsc, s);
5157                 mutex_unlock(&s->s_mutex);
5158                 ceph_put_mds_session(s);
5159
5160                 mutex_lock(&mdsc->mutex);
5161         }
5162         mutex_unlock(&mdsc->mutex);
5163
5164         delay = ceph_check_delayed_caps(mdsc);
5165
5166         ceph_queue_cap_reclaim_work(mdsc);
5167
5168         ceph_trim_snapid_map(mdsc);
5169
5170         maybe_recover_session(mdsc);
5171
5172         schedule_delayed(mdsc, delay);
5173 }
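
/*
 * Note the early return above: once mdsc->stopping reaches
 * CEPH_MDSC_STOPPING_FLUSHED the work no longer re-arms itself, and
 * ceph_mdsc_stop() flushes any already-queued instance before the
 * client is torn down.
 */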
5174
5175 int ceph_mdsc_init(struct ceph_fs_client *fsc)
5176
5177 {
5178         struct ceph_mds_client *mdsc;
5179         int err;
5180
5181         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
5182         if (!mdsc)
5183                 return -ENOMEM;
5184         mdsc->fsc = fsc;
5185         mutex_init(&mdsc->mutex);
5186         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
5187         if (!mdsc->mdsmap) {
5188                 err = -ENOMEM;
5189                 goto err_mdsc;
5190         }
5191
5192         init_completion(&mdsc->safe_umount_waiters);
5193         spin_lock_init(&mdsc->stopping_lock);
5194         atomic_set(&mdsc->stopping_blockers, 0);
5195         init_completion(&mdsc->stopping_waiter);
5196         init_waitqueue_head(&mdsc->session_close_wq);
5197         INIT_LIST_HEAD(&mdsc->waiting_for_map);
5198         mdsc->quotarealms_inodes = RB_ROOT;
5199         mutex_init(&mdsc->quotarealms_inodes_mutex);
5200         init_rwsem(&mdsc->snap_rwsem);
5201         mdsc->snap_realms = RB_ROOT;
5202         INIT_LIST_HEAD(&mdsc->snap_empty);
5203         spin_lock_init(&mdsc->snap_empty_lock);
5204         mdsc->request_tree = RB_ROOT;
5205         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
5206         mdsc->last_renew_caps = jiffies;
5207         INIT_LIST_HEAD(&mdsc->cap_delay_list);
5208         INIT_LIST_HEAD(&mdsc->cap_wait_list);
5209         spin_lock_init(&mdsc->cap_delay_lock);
5210         INIT_LIST_HEAD(&mdsc->snap_flush_list);
5211         spin_lock_init(&mdsc->snap_flush_lock);
5212         mdsc->last_cap_flush_tid = 1;
5213         INIT_LIST_HEAD(&mdsc->cap_flush_list);
5214         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
5215         spin_lock_init(&mdsc->cap_dirty_lock);
5216         init_waitqueue_head(&mdsc->cap_flushing_wq);
5217         INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
5218         err = ceph_metric_init(&mdsc->metric);
5219         if (err)
5220                 goto err_mdsmap;
5221
5222         spin_lock_init(&mdsc->dentry_list_lock);
5223         INIT_LIST_HEAD(&mdsc->dentry_leases);
5224         INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
5225
5226         ceph_caps_init(mdsc);
5227         ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
5228
5229         spin_lock_init(&mdsc->snapid_map_lock);
5230         mdsc->snapid_map_tree = RB_ROOT;
5231         INIT_LIST_HEAD(&mdsc->snapid_map_lru);
5232
5233         init_rwsem(&mdsc->pool_perm_rwsem);
5234         mdsc->pool_perm_tree = RB_ROOT;
5235
5236         strscpy(mdsc->nodename, utsname()->nodename,
5237                 sizeof(mdsc->nodename));
5238
5239         fsc->mdsc = mdsc;
5240         return 0;
5241
5242 err_mdsmap:
5243         kfree(mdsc->mdsmap);
5244 err_mdsc:
5245         kfree(mdsc);
5246         return err;
5247 }
5248
5249 /*
5250  * Wait for safe replies on open mds requests.  If we time out, drop
5251  * all requests from the tree to avoid dangling dentry refs.
5252  */
5253 static void wait_requests(struct ceph_mds_client *mdsc)
5254 {
5255         struct ceph_options *opts = mdsc->fsc->client->options;
5256         struct ceph_mds_request *req;
5257
5258         mutex_lock(&mdsc->mutex);
5259         if (__get_oldest_req(mdsc)) {
5260                 mutex_unlock(&mdsc->mutex);
5261
5262                 dout("wait_requests waiting for requests\n");
5263                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
5264                                     ceph_timeout_jiffies(opts->mount_timeout));
5265
5266                 /* tear down remaining requests */
5267                 mutex_lock(&mdsc->mutex);
5268                 while ((req = __get_oldest_req(mdsc))) {
5269                         dout("wait_requests timed out on tid %llu\n",
5270                              req->r_tid);
5271                         list_del_init(&req->r_wait);
5272                         __unregister_request(mdsc, req);
5273                 }
5274         }
5275         mutex_unlock(&mdsc->mutex);
5276         dout("wait_requests done\n");
5277 }
5278
5279 void send_flush_mdlog(struct ceph_mds_session *s)
5280 {
5281         struct ceph_msg *msg;
5282
5283         /*
5284          * Pre-luminous MDS crashes when it sees an unknown session request
5285          */
5286         if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
5287                 return;
5288
5289         mutex_lock(&s->s_mutex);
5290         dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
5291              ceph_session_state_name(s->s_state), s->s_seq);
5292         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
5293                                       s->s_seq);
5294         if (!msg) {
5295                 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
5296                        s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5297         } else {
5298                 ceph_con_send(&s->s_con, msg);
5299         }
5300         mutex_unlock(&s->s_mutex);
5301 }
5302
5303 /*
5304  * called before mount is ro, and before dentries are torn down.
5305  * (hmm, does this still race with new lookups?)
5306  */
5307 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
5308 {
5309         dout("pre_umount\n");
5310         mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
5311
5312         ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
5313         ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
5314         ceph_flush_dirty_caps(mdsc);
5315         wait_requests(mdsc);
5316
5317         /*
5318          * wait for reply handlers to drop their request refs and
5319          * their inode/dcache refs
5320          */
5321         ceph_msgr_flush();
5322
5323         ceph_cleanup_quotarealms_inodes(mdsc);
5324 }
5325
5326 /*
5327  * flush the mdlog and wait for all write mds requests to flush.
5328  */
5329 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
5330                                                  u64 want_tid)
5331 {
5332         struct ceph_mds_request *req = NULL, *nextreq;
5333         struct ceph_mds_session *last_session = NULL;
5334         struct rb_node *n;
5335
5336         mutex_lock(&mdsc->mutex);
5337         dout("%s want %lld\n", __func__, want_tid);
5338 restart:
5339         req = __get_oldest_req(mdsc);
5340         while (req && req->r_tid <= want_tid) {
5341                 /* find next request */
5342                 n = rb_next(&req->r_node);
5343                 if (n)
5344                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
5345                 else
5346                         nextreq = NULL;
5347                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
5348                     (req->r_op & CEPH_MDS_OP_WRITE)) {
5349                         struct ceph_mds_session *s = req->r_session;
5350
5351                         if (!s) {
5352                                 req = nextreq;
5353                                 continue;
5354                         }
5355
5356                         /* write op */
5357                         ceph_mdsc_get_request(req);
5358                         if (nextreq)
5359                                 ceph_mdsc_get_request(nextreq);
5360                         s = ceph_get_mds_session(s);
5361                         mutex_unlock(&mdsc->mutex);
5362
5363                         /* send flush mdlog request to MDS */
5364                         if (last_session != s) {
5365                                 send_flush_mdlog(s);
5366                                 ceph_put_mds_session(last_session);
5367                                 last_session = s;
5368                         } else {
5369                                 ceph_put_mds_session(s);
5370                         }
5371                         dout("%s wait on %llu (want %llu)\n", __func__,
5372                              req->r_tid, want_tid);
5373                         wait_for_completion(&req->r_safe_completion);
5374
5375                         mutex_lock(&mdsc->mutex);
5376                         ceph_mdsc_put_request(req);
5377                         if (!nextreq)
5378                                 break;  /* no next request, so we're done! */
5379                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
5380                                 /* next request was removed from tree */
5381                                 ceph_mdsc_put_request(nextreq);
5382                                 goto restart;
5383                         }
5384                         ceph_mdsc_put_request(nextreq);  /* won't go away */
5385                 }
5386                 req = nextreq;
5387         }
5388         mutex_unlock(&mdsc->mutex);
5389         ceph_put_mds_session(last_session);
5390         dout("%s done\n", __func__);
5391 }
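
/*
 * The walk above visits requests in tid order, asks each affected
 * session to flush its mdlog at most once in a row (tracked via
 * last_session), and blocks on r_safe_completion until each write
 * request has been safely committed.
 */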
5392
5393 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
5394 {
5395         u64 want_tid, want_flush;
5396
5397         if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
5398                 return;
5399
5400         dout("sync\n");
5401         mutex_lock(&mdsc->mutex);
5402         want_tid = mdsc->last_tid;
5403         mutex_unlock(&mdsc->mutex);
5404
5405         ceph_flush_dirty_caps(mdsc);
5406         spin_lock(&mdsc->cap_dirty_lock);
5407         want_flush = mdsc->last_cap_flush_tid;
5408         if (!list_empty(&mdsc->cap_flush_list)) {
5409                 struct ceph_cap_flush *cf =
5410                         list_last_entry(&mdsc->cap_flush_list,
5411                                         struct ceph_cap_flush, g_list);
5412                 cf->wake = true;
5413         }
5414         spin_unlock(&mdsc->cap_dirty_lock);
5415
5416         dout("sync want tid %lld flush_seq %lld\n",
5417              want_tid, want_flush);
5418
5419         flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
5420         wait_caps_flush(mdsc, want_flush);
5421 }
5422
5423 /*
5424  * true if all sessions are closed, or we force unmount
5425  */
5426 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
5427 {
5428         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
5429                 return true;
5430         return atomic_read(&mdsc->num_sessions) <= skipped;
5431 }
5432
5433 /*
5434  * called after sb is ro or when metadata corrupted.
5435  */
5436 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
5437 {
5438         struct ceph_options *opts = mdsc->fsc->client->options;
5439         struct ceph_mds_session *session;
5440         int i;
5441         int skipped = 0;
5442
5443         dout("close_sessions\n");
5444
5445         /* close sessions */
5446         mutex_lock(&mdsc->mutex);
5447         for (i = 0; i < mdsc->max_sessions; i++) {
5448                 session = __ceph_lookup_mds_session(mdsc, i);
5449                 if (!session)
5450                         continue;
5451                 mutex_unlock(&mdsc->mutex);
5452                 mutex_lock(&session->s_mutex);
5453                 if (__close_session(mdsc, session) <= 0)
5454                         skipped++;
5455                 mutex_unlock(&session->s_mutex);
5456                 ceph_put_mds_session(session);
5457                 mutex_lock(&mdsc->mutex);
5458         }
5459         mutex_unlock(&mdsc->mutex);
5460
5461         dout("waiting for sessions to close\n");
5462         wait_event_timeout(mdsc->session_close_wq,
5463                            done_closing_sessions(mdsc, skipped),
5464                            ceph_timeout_jiffies(opts->mount_timeout));
5465
5466         /* tear down remaining sessions */
5467         mutex_lock(&mdsc->mutex);
5468         for (i = 0; i < mdsc->max_sessions; i++) {
5469                 if (mdsc->sessions[i]) {
5470                         session = ceph_get_mds_session(mdsc->sessions[i]);
5471                         __unregister_session(mdsc, session);
5472                         mutex_unlock(&mdsc->mutex);
5473                         mutex_lock(&session->s_mutex);
5474                         remove_session_caps(session);
5475                         mutex_unlock(&session->s_mutex);
5476                         ceph_put_mds_session(session);
5477                         mutex_lock(&mdsc->mutex);
5478                 }
5479         }
5480         WARN_ON(!list_empty(&mdsc->cap_delay_list));
5481         mutex_unlock(&mdsc->mutex);
5482
5483         ceph_cleanup_snapid_map(mdsc);
5484         ceph_cleanup_global_and_empty_realms(mdsc);
5485
5486         cancel_work_sync(&mdsc->cap_reclaim_work);
5487         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
5488
5489         dout("stopped\n");
5490 }
5491
5492 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
5493 {
5494         struct ceph_mds_session *session;
5495         int mds;
5496
5497         dout("force umount\n");
5498
5499         mutex_lock(&mdsc->mutex);
5500         for (mds = 0; mds < mdsc->max_sessions; mds++) {
5501                 session = __ceph_lookup_mds_session(mdsc, mds);
5502                 if (!session)
5503                         continue;
5504
5505                 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
5506                         __unregister_session(mdsc, session);
5507                 __wake_requests(mdsc, &session->s_waiting);
5508                 mutex_unlock(&mdsc->mutex);
5509
5510                 mutex_lock(&session->s_mutex);
5511                 __close_session(mdsc, session);
5512                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
5513                         cleanup_session_requests(mdsc, session);
5514                         remove_session_caps(session);
5515                 }
5516                 mutex_unlock(&session->s_mutex);
5517                 ceph_put_mds_session(session);
5518
5519                 mutex_lock(&mdsc->mutex);
5520                 kick_requests(mdsc, mds);
5521         }
5522         __wake_requests(mdsc, &mdsc->waiting_for_map);
5523         mutex_unlock(&mdsc->mutex);
5524 }
5525
5526 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
5527 {
5528         dout("stop\n");
5529         /*
5530          * Make sure the delayed work has stopped before releasing
5531          * the resources.
5532          *
5533          * cancel_delayed_work_sync() only guarantees that the work
5534          * finishes executing, but the delayed work can re-arm
5535          * itself again after that.
5536          */
5537         flush_delayed_work(&mdsc->delayed_work);
5538
5539         if (mdsc->mdsmap)
5540                 ceph_mdsmap_destroy(mdsc->mdsmap);
5541         kfree(mdsc->sessions);
5542         ceph_caps_finalize(mdsc);
5543         ceph_pool_perm_destroy(mdsc);
5544 }
5545
5546 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
5547 {
5548         struct ceph_mds_client *mdsc = fsc->mdsc;
5549         dout("mdsc_destroy %p\n", mdsc);
5550
5551         if (!mdsc)
5552                 return;
5553
5554         /* flush out any connection work with references to us */
5555         ceph_msgr_flush();
5556
5557         ceph_mdsc_stop(mdsc);
5558
5559         ceph_metric_destroy(&mdsc->metric);
5560
5561         fsc->mdsc = NULL;
5562         kfree(mdsc);
5563         dout("mdsc_destroy %p done\n", mdsc);
5564 }
5565
5566 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5567 {
5568         struct ceph_fs_client *fsc = mdsc->fsc;
5569         const char *mds_namespace = fsc->mount_options->mds_namespace;
5570         void *p = msg->front.iov_base;
5571         void *end = p + msg->front.iov_len;
5572         u32 epoch;
5573         u32 num_fs;
5574         u32 mount_fscid = (u32)-1;
5575         int err = -EINVAL;
5576
5577         ceph_decode_need(&p, end, sizeof(u32), bad);
5578         epoch = ceph_decode_32(&p);
5579
5580         dout("handle_fsmap epoch %u\n", epoch);
5581
5582         /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
5583         ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
5584
5585         ceph_decode_32_safe(&p, end, num_fs, bad);
5586         while (num_fs-- > 0) {
5587                 void *info_p, *info_end;
5588                 u32 info_len;
5589                 u32 fscid, namelen;
5590
5591                 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
5592                 p += 2;         // info_v, info_cv
5593                 info_len = ceph_decode_32(&p);
5594                 ceph_decode_need(&p, end, info_len, bad);
5595                 info_p = p;
5596                 info_end = p + info_len;
5597                 p = info_end;
5598
5599                 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
5600                 fscid = ceph_decode_32(&info_p);
5601                 namelen = ceph_decode_32(&info_p);
5602                 ceph_decode_need(&info_p, info_end, namelen, bad);
5603
5604                 if (mds_namespace &&
5605                     strlen(mds_namespace) == namelen &&
5606                     !strncmp(mds_namespace, (char *)info_p, namelen)) {
5607                         mount_fscid = fscid;
5608                         break;
5609                 }
5610         }
5611
5612         ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
5613         if (mount_fscid != (u32)-1) {
5614                 fsc->client->monc.fs_cluster_id = mount_fscid;
5615                 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
5616                                    0, true);
5617                 ceph_monc_renew_subs(&fsc->client->monc);
5618         } else {
5619                 err = -ENOENT;
5620                 goto err_out;
5621         }
5622         return;
5623
5624 bad:
5625         pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
5626         ceph_umount_begin(mdsc->fsc->sb);
5627         ceph_msg_dump(msg);
5628 err_out:
5629         mutex_lock(&mdsc->mutex);
5630         mdsc->mdsmap_err = err;
5631         __wake_requests(mdsc, &mdsc->waiting_for_map);
5632         mutex_unlock(&mdsc->mutex);
5633 }
5634
5635 /*
5636  * handle mds map update.
5637  */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        u32 epoch;
        u32 maplen;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        struct ceph_mdsmap *newmap, *oldmap;
        struct ceph_fsid fsid;
        int err = -EINVAL;

        ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
                return;
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

        /* do we need it? */
        mutex_lock(&mdsc->mutex);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                dout("handle_map epoch %u <= our %u\n",
                     epoch, mdsc->mdsmap->m_epoch);
                mutex_unlock(&mdsc->mutex);
                return;
        }

        newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
        if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad_unlock;
        }

        /* swap into place */
        if (mdsc->mdsmap) {
                oldmap = mdsc->mdsmap;
                mdsc->mdsmap = newmap;
                check_new_map(mdsc, newmap, oldmap);
                ceph_mdsmap_destroy(oldmap);
        } else {
                mdsc->mdsmap = newmap;  /* first mds map */
        }
        mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
                                       MAX_LFS_FILESIZE);

        __wake_requests(mdsc, &mdsc->waiting_for_map);
        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
                          mdsc->mdsmap->m_epoch);

        mutex_unlock(&mdsc->mutex);
        schedule_delayed(mdsc, 0);
        return;

bad_unlock:
        mutex_unlock(&mdsc->mutex);
bad:
        pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
        ceph_umount_begin(mdsc->fsc->sb);
        ceph_msg_dump(msg);
}

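/*
 * Connection reference counting: a connection pins its session, so
 * taking a reference on the connection really takes one on the
 * session.
 */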
static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        if (ceph_get_mds_session(s))
                return con;
        return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        ceph_put_mds_session(s);
}

/*
 * If the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;

        pr_warn("mds%d closed our session\n", s->s_mds);
        if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO)
                send_mds_reconnect(mdsc, s);
}

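/*
 * Dispatch an incoming message to the appropriate handler, after
 * verifying that the session is still registered with the mdsc.
 */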
static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        int type = le16_to_cpu(msg->hdr.type);

        mutex_lock(&mdsc->mutex);
        if (__verify_registered_session(mdsc, s) < 0) {
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
        mutex_unlock(&mdsc->mutex);

        switch (type) {
        case CEPH_MSG_MDS_MAP:
                ceph_mdsc_handle_mdsmap(mdsc, msg);
                break;
        case CEPH_MSG_FS_MAP_USER:
                ceph_mdsc_handle_fsmap(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);
                break;
        case CEPH_MSG_CLIENT_REPLY:
                handle_reply(s, msg);
                break;
        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
                handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
                ceph_handle_caps(s, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_handle_snap(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_LEASE:
                handle_lease(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_QUOTA:
                ceph_handle_quota(mdsc, s, msg);
                break;
        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
out:
        ceph_msg_put(msg);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;
        int ret;

        ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
                                         force_new, proto, NULL, NULL);
        if (ret)
                return ERR_PTR(ret);

        return auth;
}

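/*
 * The peer may send back an authorizer challenge (to protect against
 * replays); fold it into a fresh authorizer.
 */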
static int mds_add_authorizer_challenge(struct ceph_connection *con,
                                        void *challenge_buf,
                                        int challenge_buf_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
                                                  challenge_buf,
                                                  challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
                auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
                NULL, NULL, NULL, NULL);
}

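/*
 * Our authorizer is no longer valid (e.g. our ticket expired): throw
 * it away and re-check our authentication state with the monitors.
 */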
static int mds_invalidate_authorizer(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

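/*
 * The remaining auth hooks implement the msgr2 authentication
 * exchange: build the initial auth request, handle "auth more" round
 * trips, and finish with a session key and connection secret.
 */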
static int mds_get_auth_request(struct ceph_connection *con,
                                void *buf, int *buf_len,
                                void **authorizer, int *authorizer_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;
        int ret;

        ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
                                       buf, buf_len);
        if (ret)
                return ret;

        *authorizer = auth->authorizer_buf;
        *authorizer_len = auth->authorizer_buf_len;
        return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
                                      void *reply, int reply_len,
                                      void *buf, int *buf_len,
                                      void **authorizer, int *authorizer_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;
        int ret;

        ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
                                              buf, buf_len);
        if (ret)
                return ret;

        *authorizer = auth->authorizer_buf;
        *authorizer_len = auth->authorizer_buf_len;
        return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
                                u64 global_id, void *reply, int reply_len,
                                u8 *session_key, int *session_key_len,
                                u8 *con_secret, int *con_secret_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
                                               session_key, session_key_len,
                                               con_secret, con_secret_len);
}

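/*
 * The server rejected our auth attempt; see if re-authenticating with
 * the monitors can help before failing the connection with -EACCES.
 */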
static int mds_handle_auth_bad_method(struct ceph_connection *con,
                                      int used_proto, int result,
                                      const int *allowed_protos, int proto_cnt,
                                      const int *allowed_modes, int mode_cnt)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
        int ret;

        if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
                                            used_proto, result,
                                            allowed_protos, proto_cnt,
                                            allowed_modes, mode_cnt)) {
                ret = ceph_monc_validate_auth(monc);
                if (ret)
                        return ret;
        }

        return -EACCES;
}

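/*
 * Allocate a message to receive an incoming frame into, reusing any
 * partially-received message already attached to the connection.
 */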
static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr, int *skip)
{
        struct ceph_msg *msg;
        int type = (int)le16_to_cpu(hdr->type);
        int front_len = (int)le32_to_cpu(hdr->front_len);

        if (con->in_msg)
                return con->in_msg;

        *skip = 0;
        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
        if (!msg) {
                pr_err("unable to allocate msg type %d len %d\n",
                       type, front_len);
                return NULL;
        }

        return msg;
}

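/*
 * Sign outgoing messages and verify the signatures on incoming ones
 * with the keys negotiated during the session's auth handshake.
 */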
static int mds_sign_message(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_check_message_signature(auth, msg);
}

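/*
 * Connection operations for MDS sessions, covering both the legacy
 * (msgr1) authorizer hooks and the msgr2 auth exchange.
 */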
static const struct ceph_connection_operations mds_con_ops = {
        .get = mds_get_con,
        .put = mds_put_con,
        .alloc_msg = mds_alloc_msg,
        .dispatch = mds_dispatch,
        .peer_reset = mds_peer_reset,
        .get_authorizer = mds_get_authorizer,
        .add_authorizer_challenge = mds_add_authorizer_challenge,
        .verify_authorizer_reply = mds_verify_authorizer_reply,
        .invalidate_authorizer = mds_invalidate_authorizer,
        .sign_message = mds_sign_message,
        .check_message_signature = mds_check_message_signature,
        .get_auth_request = mds_get_auth_request,
        .handle_auth_reply_more = mds_handle_auth_reply_more,
        .handle_auth_done = mds_handle_auth_done,
        .handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */