ceph: search cache position for dcache readdir
[platform/kernel/linux-starfive.git] / fs / ceph / dir.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/spinlock.h>
4 #include <linux/fs_struct.h>
5 #include <linux/namei.h>
6 #include <linux/slab.h>
7 #include <linux/sched.h>
8
9 #include "super.h"
10 #include "mds_client.h"
11
12 /*
13  * Directory operations: readdir, lookup, create, link, unlink,
14  * rename, etc.
15  */
16
17 /*
18  * Ceph MDS operations are specified in terms of a base ino and
19  * relative path.  Thus, the client can specify an operation on a
20  * specific inode (e.g., a getattr due to fstat(2)), or as a path
21  * relative to, say, the root directory.
22  *
23  * Normally, we limit ourselves to strict inode ops (no path component)
24  * or dentry operations (a single path component relative to an ino).  The
25  * exception to this is open_root_dentry(), which will open the mount
26  * point by name.
27  */
28
/*
 * Tentative declaration of the dentry operations table so that
 * ceph_init_dentry() below can take its address; the initializer is
 * not part of this section of the file.
 */
const struct dentry_operations ceph_dentry_ops;
30
31 /*
32  * Initialize ceph dentry state.
33  */
34 int ceph_init_dentry(struct dentry *dentry)
35 {
36         struct ceph_dentry_info *di;
37
38         if (dentry->d_fsdata)
39                 return 0;
40
41         di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
42         if (!di)
43                 return -ENOMEM;          /* oh well */
44
45         spin_lock(&dentry->d_lock);
46         if (dentry->d_fsdata) {
47                 /* lost a race */
48                 kmem_cache_free(ceph_dentry_cachep, di);
49                 goto out_unlock;
50         }
51
52         if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP)
53                 d_set_d_op(dentry, &ceph_dentry_ops);
54         else if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
55                 d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
56         else
57                 d_set_d_op(dentry, &ceph_snap_dentry_ops);
58
59         di->dentry = dentry;
60         di->lease_session = NULL;
61         dentry->d_time = jiffies;
62         /* avoid reordering d_fsdata setup so that the check above is safe */
63         smp_mb();
64         dentry->d_fsdata = di;
65         ceph_dentry_lru_add(dentry);
66 out_unlock:
67         spin_unlock(&dentry->d_lock);
68         return 0;
69 }
70
71 /*
72  * for readdir, we encode the directory frag and offset within that
73  * frag into f_pos.
74  */
75 static unsigned fpos_frag(loff_t p)
76 {
77         return p >> 32;
78 }
79 static unsigned fpos_off(loff_t p)
80 {
81         return p & 0xffffffff;
82 }
83
84 static int fpos_cmp(loff_t l, loff_t r)
85 {
86         int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
87         if (v)
88                 return v;
89         return (int)(fpos_off(l) - fpos_off(r));
90 }
91
92 /*
93  * make note of the last dentry we read, so we can
94  * continue at the same lexicographical point,
95  * regardless of what dir changes take place on the
96  * server.
97  */
98 static int note_last_dentry(struct ceph_file_info *fi, const char *name,
99                             int len, unsigned next_offset)
100 {
101         char *buf = kmalloc(len+1, GFP_KERNEL);
102         if (!buf)
103                 return -ENOMEM;
104         kfree(fi->last_name);
105         fi->last_name = buf;
106         memcpy(fi->last_name, name, len);
107         fi->last_name[len] = 0;
108         fi->next_offset = next_offset;
109         dout("note_last_dentry '%s'\n", fi->last_name);
110         return 0;
111 }
112
113
/*
 * Fetch the dentry stored in slot @idx of @parent's readdir cache.
 *
 * The cache is an array of dentry pointers kept in the directory
 * inode's page cache (dir->i_data); the directory's i_size is used as
 * the size in bytes of that array.  @cache_ctl remembers the currently
 * mapped page so that consecutive lookups on the same page avoid
 * another page lookup; the caller is expected to drop it with
 * ceph_readdir_cache_release() when done.
 *
 * Returns:
 *   - NULL if @idx lies at or beyond the end of the cache;
 *   - ERR_PTR(-EAGAIN) if the page is missing, the directory is no
 *     longer complete and ordered, the slot is empty, or the dentry is
 *     already dead;
 *   - otherwise the dentry, with a reference taken through
 *     lockref_get_not_dead() that the caller must dput().
 */
static struct dentry *
__dcache_find_get_entry(struct dentry *parent, u64 idx,
                        struct ceph_readdir_cache_control *cache_ctl)
{
        struct inode *dir = d_inode(parent);
        struct dentry *dentry;
        /* mask selecting the slot within one page of dentry pointers */
        unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
        /* byte offset of the slot within the cache, and its page index */
        loff_t ptr_pos = idx * sizeof(struct dentry *);
        pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;

        if (ptr_pos >= i_size_read(dir))
                return NULL;

        /* (re)map the cache page unless the right one is already held */
        if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
                ceph_readdir_cache_release(cache_ctl);
                cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
                if (!cache_ctl->page) {
                        dout(" page %lu not found\n", ptr_pgoff);
                        return ERR_PTR(-EAGAIN);
                }
                /* reading/filling the cache are serialized by
                   i_mutex, no need to use page lock */
                unlock_page(cache_ctl->page);
                cache_ctl->dentries = kmap(cache_ctl->page);
        }

        cache_ctl->index = idx & idx_mask;

        rcu_read_lock();
        spin_lock(&parent->d_lock);
        /* check i_size again here, because empty directory can be
         * marked as complete while not holding the i_mutex. */
        if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
                dentry = cache_ctl->dentries[cache_ctl->index];
        else
                dentry = NULL;
        spin_unlock(&parent->d_lock);
        /* take a reference only if the dentry is not already being killed */
        if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
                dentry = NULL;
        rcu_read_unlock();
        /* any failure past the size check means "retry via the MDS" */
        return dentry ? : ERR_PTR(-EAGAIN);
}
156
/*
 * When possible, we try to satisfy a readdir by peeking at the
 * dcache.  We make this work by carefully ordering dentries on
 * d_child when we initially get results back from the MDS, and
 * falling back to a "normal" sync readdir if any dentries in the dir
 * are dropped.
 *
 * Complete dir indicates that we have all dentries in the dir.  It is
 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
 * the MDS if/when the directory is modified).
 *
 * The function first binary-searches the readdir cache for the first
 * entry whose offset is not below ctx->pos (this presumably relies on
 * the cache being sorted by offset -- confirm against the code that
 * fills it), then walks the cache linearly from there, emitting every
 * entry that is still valid for @shared_gen.
 *
 * Returns 0 when the end of the cache was reached (CEPH_F_ATEND is
 * set) or when dir_emit() asked us to stop, the error from
 * __dcache_find_get_entry() (e.g. -EAGAIN, meaning the caller should
 * fall back to the MDS), or the error from note_last_dentry().
 */
static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                            u32 shared_gen)
{
        struct ceph_file_info *fi = file->private_data;
        struct dentry *parent = file->f_path.dentry;
        struct inode *dir = d_inode(parent);
        struct dentry *dentry, *last = NULL;
        struct ceph_dentry_info *di;
        struct ceph_readdir_cache_control cache_ctl = {};
        u64 idx = 0;
        int err = 0;

        dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);

        /* search start position */
        if (ctx->pos > 2) {
                /* number of dentry slots currently in the cache */
                u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
                while (count > 0) {
                        u64 step = count >> 1;
                        dentry = __dcache_find_get_entry(parent, idx + step,
                                                         &cache_ctl);
                        if (!dentry) {
                                /* use linear search */
                                idx = 0;
                                break;
                        }
                        if (IS_ERR(dentry)) {
                                err = PTR_ERR(dentry);
                                goto out;
                        }
                        di = ceph_dentry(dentry);
                        spin_lock(&dentry->d_lock);
                        if (fpos_cmp(di->offset, ctx->pos) < 0) {
                                /* probe is before the target: search right half */
                                idx += step + 1;
                                count -= step + 1;
                        } else {
                                /* probe is at/after the target: search left half */
                                count = step;
                        }
                        spin_unlock(&dentry->d_lock);
                        dput(dentry);
                }

                dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
        }


        for (;;) {
                bool emit_dentry = false;
                dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
                if (!dentry) {
                        /* walked off the end of the cache: directory done */
                        fi->flags |= CEPH_F_ATEND;
                        err = 0;
                        break;
                }
                if (IS_ERR(dentry)) {
                        err = PTR_ERR(dentry);
                        goto out;
                }

                di = ceph_dentry(dentry);
                spin_lock(&dentry->d_lock);
                /*
                 * Only emit positive dentries from the current shared
                 * generation that are not the snapdir or the .ceph
                 * inode, and that are not before the current position.
                 */
                if (di->lease_shared_gen == shared_gen &&
                    d_really_is_positive(dentry) &&
                    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
                    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
                    fpos_cmp(ctx->pos, di->offset) <= 0) {
                        emit_dentry = true;
                }
                spin_unlock(&dentry->d_lock);

                if (emit_dentry) {
                        dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
                                      dentry->d_name.len,
                                      ceph_translate_ino(dentry->d_sb,
                                                         d_inode(dentry)->i_ino),
                                      d_inode(dentry)->i_mode >> 12)) {
                                /* caller's buffer is full; resume here next time */
                                dput(dentry);
                                err = 0;
                                break;
                        }
                        ctx->pos++;

                        /* keep a reference to the most recently emitted dentry */
                        if (last)
                                dput(last);
                        last = dentry;
                } else {
                        dput(dentry);
                }
        }
out:
        ceph_readdir_cache_release(&cache_ctl);
        if (last) {
                int ret;
                /* record where we stopped so an MDS readdir can continue */
                di = ceph_dentry(last);
                ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
                                       fpos_off(di->offset) + 1);
                if (ret < 0)
                        err = ret;
                dput(last);
        }
        return err;
}
273
274 static int ceph_readdir(struct file *file, struct dir_context *ctx)
275 {
276         struct ceph_file_info *fi = file->private_data;
277         struct inode *inode = file_inode(file);
278         struct ceph_inode_info *ci = ceph_inode(inode);
279         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
280         struct ceph_mds_client *mdsc = fsc->mdsc;
281         unsigned frag = fpos_frag(ctx->pos);
282         int off = fpos_off(ctx->pos);
283         int err;
284         u32 ftype;
285         struct ceph_mds_reply_info_parsed *rinfo;
286
287         dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
288         if (fi->flags & CEPH_F_ATEND)
289                 return 0;
290
291         /* always start with . and .. */
292         if (ctx->pos == 0) {
293                 dout("readdir off 0 -> '.'\n");
294                 if (!dir_emit(ctx, ".", 1, 
295                             ceph_translate_ino(inode->i_sb, inode->i_ino),
296                             inode->i_mode >> 12))
297                         return 0;
298                 ctx->pos = 1;
299                 off = 1;
300         }
301         if (ctx->pos == 1) {
302                 ino_t ino = parent_ino(file->f_path.dentry);
303                 dout("readdir off 1 -> '..'\n");
304                 if (!dir_emit(ctx, "..", 2,
305                             ceph_translate_ino(inode->i_sb, ino),
306                             inode->i_mode >> 12))
307                         return 0;
308                 ctx->pos = 2;
309                 off = 2;
310         }
311
312         /* can we use the dcache? */
313         spin_lock(&ci->i_ceph_lock);
314         if (ceph_test_mount_opt(fsc, DCACHE) &&
315             !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
316             ceph_snap(inode) != CEPH_SNAPDIR &&
317             __ceph_dir_is_complete_ordered(ci) &&
318             __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
319                 u32 shared_gen = ci->i_shared_gen;
320                 spin_unlock(&ci->i_ceph_lock);
321                 err = __dcache_readdir(file, ctx, shared_gen);
322                 if (err != -EAGAIN)
323                         return err;
324                 frag = fpos_frag(ctx->pos);
325                 off = fpos_off(ctx->pos);
326         } else {
327                 spin_unlock(&ci->i_ceph_lock);
328         }
329
330         /* proceed with a normal readdir */
331 more:
332         /* do we have the correct frag content buffered? */
333         if (fi->frag != frag || fi->last_readdir == NULL) {
334                 struct ceph_mds_request *req;
335                 int op = ceph_snap(inode) == CEPH_SNAPDIR ?
336                         CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
337
338                 /* discard old result, if any */
339                 if (fi->last_readdir) {
340                         ceph_mdsc_put_request(fi->last_readdir);
341                         fi->last_readdir = NULL;
342                 }
343
344                 dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
345                      ceph_vinop(inode), frag, fi->last_name);
346                 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
347                 if (IS_ERR(req))
348                         return PTR_ERR(req);
349                 err = ceph_alloc_readdir_reply_buffer(req, inode);
350                 if (err) {
351                         ceph_mdsc_put_request(req);
352                         return err;
353                 }
354                 /* hints to request -> mds selection code */
355                 req->r_direct_mode = USE_AUTH_MDS;
356                 req->r_direct_hash = ceph_frag_value(frag);
357                 req->r_direct_is_hash = true;
358                 if (fi->last_name) {
359                         req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
360                         if (!req->r_path2) {
361                                 ceph_mdsc_put_request(req);
362                                 return -ENOMEM;
363                         }
364                 }
365                 req->r_dir_release_cnt = fi->dir_release_count;
366                 req->r_dir_ordered_cnt = fi->dir_ordered_count;
367                 req->r_readdir_cache_idx = fi->readdir_cache_idx;
368                 req->r_readdir_offset = fi->next_offset;
369                 req->r_args.readdir.frag = cpu_to_le32(frag);
370
371                 req->r_inode = inode;
372                 ihold(inode);
373                 req->r_dentry = dget(file->f_path.dentry);
374                 err = ceph_mdsc_do_request(mdsc, NULL, req);
375                 if (err < 0) {
376                         ceph_mdsc_put_request(req);
377                         return err;
378                 }
379                 dout("readdir got and parsed readdir result=%d"
380                      " on frag %x, end=%d, complete=%d\n", err, frag,
381                      (int)req->r_reply_info.dir_end,
382                      (int)req->r_reply_info.dir_complete);
383
384
385                 /* note next offset and last dentry name */
386                 rinfo = &req->r_reply_info;
387                 if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
388                         frag = le32_to_cpu(rinfo->dir_dir->frag);
389                         off = req->r_readdir_offset;
390                         fi->next_offset = off;
391                 }
392
393                 fi->frag = frag;
394                 fi->offset = fi->next_offset;
395                 fi->last_readdir = req;
396
397                 if (req->r_did_prepopulate) {
398                         fi->readdir_cache_idx = req->r_readdir_cache_idx;
399                         if (fi->readdir_cache_idx < 0) {
400                                 /* preclude from marking dir ordered */
401                                 fi->dir_ordered_count = 0;
402                         } else if (ceph_frag_is_leftmost(frag) && off == 2) {
403                                 /* note dir version at start of readdir so
404                                  * we can tell if any dentries get dropped */
405                                 fi->dir_release_count = req->r_dir_release_cnt;
406                                 fi->dir_ordered_count = req->r_dir_ordered_cnt;
407                         }
408                 } else {
409                         dout("readdir !did_prepopulate");
410                         /* disable readdir cache */
411                         fi->readdir_cache_idx = -1;
412                         /* preclude from marking dir complete */
413                         fi->dir_release_count = 0;
414                 }
415
416                 if (req->r_reply_info.dir_end) {
417                         kfree(fi->last_name);
418                         fi->last_name = NULL;
419                         if (ceph_frag_is_rightmost(frag))
420                                 fi->next_offset = 2;
421                         else
422                                 fi->next_offset = 0;
423                 } else {
424                         err = note_last_dentry(fi,
425                                        rinfo->dir_dname[rinfo->dir_nr-1],
426                                        rinfo->dir_dname_len[rinfo->dir_nr-1],
427                                        fi->next_offset + rinfo->dir_nr);
428                         if (err)
429                                 return err;
430                 }
431         }
432
433         rinfo = &fi->last_readdir->r_reply_info;
434         dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
435              rinfo->dir_nr, off, fi->offset);
436
437         ctx->pos = ceph_make_fpos(frag, off);
438         while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
439                 struct ceph_mds_reply_inode *in =
440                         rinfo->dir_in[off - fi->offset].in;
441                 struct ceph_vino vino;
442                 ino_t ino;
443
444                 dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
445                      off, off - fi->offset, rinfo->dir_nr, ctx->pos,
446                      rinfo->dir_dname_len[off - fi->offset],
447                      rinfo->dir_dname[off - fi->offset], in);
448                 BUG_ON(!in);
449                 ftype = le32_to_cpu(in->mode) >> 12;
450                 vino.ino = le64_to_cpu(in->ino);
451                 vino.snap = le64_to_cpu(in->snapid);
452                 ino = ceph_vino_to_ino(vino);
453                 if (!dir_emit(ctx,
454                             rinfo->dir_dname[off - fi->offset],
455                             rinfo->dir_dname_len[off - fi->offset],
456                             ceph_translate_ino(inode->i_sb, ino), ftype)) {
457                         dout("filldir stopping us...\n");
458                         return 0;
459                 }
460                 off++;
461                 ctx->pos++;
462         }
463
464         if (fi->last_name) {
465                 ceph_mdsc_put_request(fi->last_readdir);
466                 fi->last_readdir = NULL;
467                 goto more;
468         }
469
470         /* more frags? */
471         if (!ceph_frag_is_rightmost(frag)) {
472                 frag = ceph_frag_next(frag);
473                 off = 0;
474                 ctx->pos = ceph_make_fpos(frag, off);
475                 dout("readdir next frag is %x\n", frag);
476                 goto more;
477         }
478         fi->flags |= CEPH_F_ATEND;
479
480         /*
481          * if dir_release_count still matches the dir, no dentries
482          * were released during the whole readdir, and we should have
483          * the complete dir contents in our cache.
484          */
485         if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
486                 spin_lock(&ci->i_ceph_lock);
487                 if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
488                         dout(" marking %p complete and ordered\n", inode);
489                         /* use i_size to track number of entries in
490                          * readdir cache */
491                         BUG_ON(fi->readdir_cache_idx < 0);
492                         i_size_write(inode, fi->readdir_cache_idx *
493                                      sizeof(struct dentry*));
494                 } else {
495                         dout(" marking %p complete\n", inode);
496                 }
497                 __ceph_dir_set_complete(ci, fi->dir_release_count,
498                                         fi->dir_ordered_count);
499                 spin_unlock(&ci->i_ceph_lock);
500         }
501
502         dout("readdir %p file %p done.\n", inode, file);
503         return 0;
504 }
505
506 static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
507 {
508         if (fi->last_readdir) {
509                 ceph_mdsc_put_request(fi->last_readdir);
510                 fi->last_readdir = NULL;
511         }
512         kfree(fi->last_name);
513         fi->last_name = NULL;
514         fi->dir_release_count = 0;
515         fi->readdir_cache_idx = -1;
516         if (ceph_frag_is_leftmost(frag))
517                 fi->next_offset = 2;  /* compensate for . and .. */
518         else
519                 fi->next_offset = 0;
520         fi->flags &= ~CEPH_F_ATEND;
521 }
522
523 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
524 {
525         struct ceph_file_info *fi = file->private_data;
526         struct inode *inode = file->f_mapping->host;
527         loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
528         loff_t retval;
529
530         inode_lock(inode);
531         retval = -EINVAL;
532         switch (whence) {
533         case SEEK_CUR:
534                 offset += file->f_pos;
535         case SEEK_SET:
536                 break;
537         case SEEK_END:
538                 retval = -EOPNOTSUPP;
539         default:
540                 goto out;
541         }
542
543         if (offset >= 0) {
544                 if (offset != file->f_pos) {
545                         file->f_pos = offset;
546                         file->f_version = 0;
547                         fi->flags &= ~CEPH_F_ATEND;
548                 }
549                 retval = offset;
550
551                 if (offset == 0 ||
552                     fpos_frag(offset) != fi->frag ||
553                     fpos_off(offset) < fi->offset) {
554                         /* discard buffered readdir content on seekdir(0), or
555                          * seek to new frag, or seek prior to current chunk */
556                         dout("dir_llseek dropping %p content\n", file);
557                         reset_readdir(fi, fpos_frag(offset));
558                 } else if (fpos_cmp(offset, old_offset) > 0) {
559                         /* reset dir_release_count if we did a forward seek */
560                         fi->dir_release_count = 0;
561                         fi->readdir_cache_idx = -1;
562                 }
563         }
564 out:
565         inode_unlock(inode);
566         return retval;
567 }
568
569 /*
570  * Handle lookups for the hidden .snap directory.
571  */
572 int ceph_handle_snapdir(struct ceph_mds_request *req,
573                         struct dentry *dentry, int err)
574 {
575         struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
576         struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */
577
578         /* .snap dir? */
579         if (err == -ENOENT &&
580             ceph_snap(parent) == CEPH_NOSNAP &&
581             strcmp(dentry->d_name.name,
582                    fsc->mount_options->snapdir_name) == 0) {
583                 struct inode *inode = ceph_get_snapdir(parent);
584                 dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n",
585                      dentry, dentry, inode);
586                 BUG_ON(!d_unhashed(dentry));
587                 d_add(dentry, inode);
588                 err = 0;
589         }
590         return err;
591 }
592
593 /*
594  * Figure out final result of a lookup/open request.
595  *
596  * Mainly, make sure we return the final req->r_dentry (if it already
597  * existed) in place of the original VFS-provided dentry when they
598  * differ.
599  *
600  * Gracefully handle the case where the MDS replies with -ENOENT and
601  * no trace (which it may do, at its discretion, e.g., if it doesn't
602  * care to issue a lease on the negative dentry).
603  */
604 struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
605                                   struct dentry *dentry, int err)
606 {
607         if (err == -ENOENT) {
608                 /* no trace? */
609                 err = 0;
610                 if (!req->r_reply_info.head->is_dentry) {
611                         dout("ENOENT and no trace, dentry %p inode %p\n",
612                              dentry, d_inode(dentry));
613                         if (d_really_is_positive(dentry)) {
614                                 d_drop(dentry);
615                                 err = -ENOENT;
616                         } else {
617                                 d_add(dentry, NULL);
618                         }
619                 }
620         }
621         if (err)
622                 dentry = ERR_PTR(err);
623         else if (dentry != req->r_dentry)
624                 dentry = dget(req->r_dentry);   /* we got spliced */
625         else
626                 dentry = NULL;
627         return dentry;
628 }
629
630 static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
631 {
632         return ceph_ino(inode) == CEPH_INO_ROOT &&
633                 strncmp(dentry->d_name.name, ".ceph", 5) == 0;
634 }
635
636 /*
637  * Look up a single dir entry.  If there is a lookup intent, inform
638  * the MDS so that it gets our 'caps wanted' value in a single op.
639  */
640 static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
641                                   unsigned int flags)
642 {
643         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
644         struct ceph_mds_client *mdsc = fsc->mdsc;
645         struct ceph_mds_request *req;
646         int op;
647         int mask;
648         int err;
649
650         dout("lookup %p dentry %p '%pd'\n",
651              dir, dentry, dentry);
652
653         if (dentry->d_name.len > NAME_MAX)
654                 return ERR_PTR(-ENAMETOOLONG);
655
656         err = ceph_init_dentry(dentry);
657         if (err < 0)
658                 return ERR_PTR(err);
659
660         /* can we conclude ENOENT locally? */
661         if (d_really_is_negative(dentry)) {
662                 struct ceph_inode_info *ci = ceph_inode(dir);
663                 struct ceph_dentry_info *di = ceph_dentry(dentry);
664
665                 spin_lock(&ci->i_ceph_lock);
666                 dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
667                 if (strncmp(dentry->d_name.name,
668                             fsc->mount_options->snapdir_name,
669                             dentry->d_name.len) &&
670                     !is_root_ceph_dentry(dir, dentry) &&
671                     ceph_test_mount_opt(fsc, DCACHE) &&
672                     __ceph_dir_is_complete(ci) &&
673                     (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
674                         spin_unlock(&ci->i_ceph_lock);
675                         dout(" dir %p complete, -ENOENT\n", dir);
676                         d_add(dentry, NULL);
677                         di->lease_shared_gen = ci->i_shared_gen;
678                         return NULL;
679                 }
680                 spin_unlock(&ci->i_ceph_lock);
681         }
682
683         op = ceph_snap(dir) == CEPH_SNAPDIR ?
684                 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
685         req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
686         if (IS_ERR(req))
687                 return ERR_CAST(req);
688         req->r_dentry = dget(dentry);
689         req->r_num_caps = 2;
690
691         mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
692         if (ceph_security_xattr_wanted(dir))
693                 mask |= CEPH_CAP_XATTR_SHARED;
694         req->r_args.getattr.mask = cpu_to_le32(mask);
695
696         req->r_locked_dir = dir;
697         err = ceph_mdsc_do_request(mdsc, NULL, req);
698         err = ceph_handle_snapdir(req, dentry, err);
699         dentry = ceph_finish_lookup(req, dentry, err);
700         ceph_mdsc_put_request(req);  /* will dput(dentry) */
701         dout("lookup result=%p\n", dentry);
702         return dentry;
703 }
704
705 /*
706  * If we do a create but get no trace back from the MDS, follow up with
707  * a lookup (the VFS expects us to link up the provided dentry).
708  */
709 int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
710 {
711         struct dentry *result = ceph_lookup(dir, dentry, 0);
712
713         if (result && !IS_ERR(result)) {
714                 /*
715                  * We created the item, then did a lookup, and found
716                  * it was already linked to another inode we already
717                  * had in our cache (and thus got spliced). To not
718                  * confuse VFS (especially when inode is a directory),
719                  * we don't link our dentry to that inode, return an
720                  * error instead.
721                  *
722                  * This event should be rare and it happens only when
723                  * we talk to old MDS. Recent MDS does not send traceless
724                  * reply for request that creates new inode.
725                  */
726                 d_drop(result);
727                 return -ESTALE;
728         }
729         return PTR_ERR(result);
730 }
731
732 static int ceph_mknod(struct inode *dir, struct dentry *dentry,
733                       umode_t mode, dev_t rdev)
734 {
735         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
736         struct ceph_mds_client *mdsc = fsc->mdsc;
737         struct ceph_mds_request *req;
738         struct ceph_acls_info acls = {};
739         int err;
740
741         if (ceph_snap(dir) != CEPH_NOSNAP)
742                 return -EROFS;
743
744         err = ceph_pre_init_acls(dir, &mode, &acls);
745         if (err < 0)
746                 return err;
747
748         dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
749              dir, dentry, mode, rdev);
750         req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
751         if (IS_ERR(req)) {
752                 err = PTR_ERR(req);
753                 goto out;
754         }
755         req->r_dentry = dget(dentry);
756         req->r_num_caps = 2;
757         req->r_locked_dir = dir;
758         req->r_args.mknod.mode = cpu_to_le32(mode);
759         req->r_args.mknod.rdev = cpu_to_le32(rdev);
760         req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
761         req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
762         if (acls.pagelist) {
763                 req->r_pagelist = acls.pagelist;
764                 acls.pagelist = NULL;
765         }
766         err = ceph_mdsc_do_request(mdsc, dir, req);
767         if (!err && !req->r_reply_info.head->is_dentry)
768                 err = ceph_handle_notrace_create(dir, dentry);
769         ceph_mdsc_put_request(req);
770 out:
771         if (!err)
772                 ceph_init_inode_acls(d_inode(dentry), &acls);
773         else
774                 d_drop(dentry);
775         ceph_release_acls_info(&acls);
776         return err;
777 }
778
779 static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode,
780                        bool excl)
781 {
782         return ceph_mknod(dir, dentry, mode, 0);
783 }
784
785 static int ceph_symlink(struct inode *dir, struct dentry *dentry,
786                             const char *dest)
787 {
788         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
789         struct ceph_mds_client *mdsc = fsc->mdsc;
790         struct ceph_mds_request *req;
791         int err;
792
793         if (ceph_snap(dir) != CEPH_NOSNAP)
794                 return -EROFS;
795
796         dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
797         req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
798         if (IS_ERR(req)) {
799                 err = PTR_ERR(req);
800                 goto out;
801         }
802         req->r_path2 = kstrdup(dest, GFP_KERNEL);
803         if (!req->r_path2) {
804                 err = -ENOMEM;
805                 ceph_mdsc_put_request(req);
806                 goto out;
807         }
808         req->r_locked_dir = dir;
809         req->r_dentry = dget(dentry);
810         req->r_num_caps = 2;
811         req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
812         req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
813         err = ceph_mdsc_do_request(mdsc, dir, req);
814         if (!err && !req->r_reply_info.head->is_dentry)
815                 err = ceph_handle_notrace_create(dir, dentry);
816         ceph_mdsc_put_request(req);
817 out:
818         if (err)
819                 d_drop(dentry);
820         return err;
821 }
822
823 static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
824 {
825         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
826         struct ceph_mds_client *mdsc = fsc->mdsc;
827         struct ceph_mds_request *req;
828         struct ceph_acls_info acls = {};
829         int err = -EROFS;
830         int op;
831
832         if (ceph_snap(dir) == CEPH_SNAPDIR) {
833                 /* mkdir .snap/foo is a MKSNAP */
834                 op = CEPH_MDS_OP_MKSNAP;
835                 dout("mksnap dir %p snap '%pd' dn %p\n", dir,
836                      dentry, dentry);
837         } else if (ceph_snap(dir) == CEPH_NOSNAP) {
838                 dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
839                 op = CEPH_MDS_OP_MKDIR;
840         } else {
841                 goto out;
842         }
843
844         mode |= S_IFDIR;
845         err = ceph_pre_init_acls(dir, &mode, &acls);
846         if (err < 0)
847                 goto out;
848
849         req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
850         if (IS_ERR(req)) {
851                 err = PTR_ERR(req);
852                 goto out;
853         }
854
855         req->r_dentry = dget(dentry);
856         req->r_num_caps = 2;
857         req->r_locked_dir = dir;
858         req->r_args.mkdir.mode = cpu_to_le32(mode);
859         req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
860         req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
861         if (acls.pagelist) {
862                 req->r_pagelist = acls.pagelist;
863                 acls.pagelist = NULL;
864         }
865         err = ceph_mdsc_do_request(mdsc, dir, req);
866         if (!err &&
867             !req->r_reply_info.head->is_target &&
868             !req->r_reply_info.head->is_dentry)
869                 err = ceph_handle_notrace_create(dir, dentry);
870         ceph_mdsc_put_request(req);
871 out:
872         if (!err)
873                 ceph_init_inode_acls(d_inode(dentry), &acls);
874         else
875                 d_drop(dentry);
876         ceph_release_acls_info(&acls);
877         return err;
878 }
879
880 static int ceph_link(struct dentry *old_dentry, struct inode *dir,
881                      struct dentry *dentry)
882 {
883         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
884         struct ceph_mds_client *mdsc = fsc->mdsc;
885         struct ceph_mds_request *req;
886         int err;
887
888         if (ceph_snap(dir) != CEPH_NOSNAP)
889                 return -EROFS;
890
891         dout("link in dir %p old_dentry %p dentry %p\n", dir,
892              old_dentry, dentry);
893         req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
894         if (IS_ERR(req)) {
895                 d_drop(dentry);
896                 return PTR_ERR(req);
897         }
898         req->r_dentry = dget(dentry);
899         req->r_num_caps = 2;
900         req->r_old_dentry = dget(old_dentry);
901         req->r_locked_dir = dir;
902         req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
903         req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
904         /* release LINK_SHARED on source inode (mds will lock it) */
905         req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
906         err = ceph_mdsc_do_request(mdsc, dir, req);
907         if (err) {
908                 d_drop(dentry);
909         } else if (!req->r_reply_info.head->is_dentry) {
910                 ihold(d_inode(old_dentry));
911                 d_instantiate(dentry, d_inode(old_dentry));
912         }
913         ceph_mdsc_put_request(req);
914         return err;
915 }
916
917 /*
918  * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps.  If it
919  * looks like the link count will hit 0, drop any other caps (other
920  * than PIN) we don't specifically want (due to the file still being
921  * open).
922  */
923 static int drop_caps_for_unlink(struct inode *inode)
924 {
925         struct ceph_inode_info *ci = ceph_inode(inode);
926         int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
927
928         spin_lock(&ci->i_ceph_lock);
929         if (inode->i_nlink == 1) {
930                 drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
931                 ci->i_ceph_flags |= CEPH_I_NODELAY;
932         }
933         spin_unlock(&ci->i_ceph_lock);
934         return drop;
935 }
936
937 /*
938  * rmdir and unlink are differ only by the metadata op code
939  */
940 static int ceph_unlink(struct inode *dir, struct dentry *dentry)
941 {
942         struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
943         struct ceph_mds_client *mdsc = fsc->mdsc;
944         struct inode *inode = d_inode(dentry);
945         struct ceph_mds_request *req;
946         int err = -EROFS;
947         int op;
948
949         if (ceph_snap(dir) == CEPH_SNAPDIR) {
950                 /* rmdir .snap/foo is RMSNAP */
951                 dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry);
952                 op = CEPH_MDS_OP_RMSNAP;
953         } else if (ceph_snap(dir) == CEPH_NOSNAP) {
954                 dout("unlink/rmdir dir %p dn %p inode %p\n",
955                      dir, dentry, inode);
956                 op = d_is_dir(dentry) ?
957                         CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
958         } else
959                 goto out;
960         req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
961         if (IS_ERR(req)) {
962                 err = PTR_ERR(req);
963                 goto out;
964         }
965         req->r_dentry = dget(dentry);
966         req->r_num_caps = 2;
967         req->r_locked_dir = dir;
968         req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
969         req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
970         req->r_inode_drop = drop_caps_for_unlink(inode);
971         err = ceph_mdsc_do_request(mdsc, dir, req);
972         if (!err && !req->r_reply_info.head->is_dentry)
973                 d_delete(dentry);
974         ceph_mdsc_put_request(req);
975 out:
976         return err;
977 }
978
979 static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
980                        struct inode *new_dir, struct dentry *new_dentry)
981 {
982         struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb);
983         struct ceph_mds_client *mdsc = fsc->mdsc;
984         struct ceph_mds_request *req;
985         int op = CEPH_MDS_OP_RENAME;
986         int err;
987
988         if (ceph_snap(old_dir) != ceph_snap(new_dir))
989                 return -EXDEV;
990         if (ceph_snap(old_dir) != CEPH_NOSNAP) {
991                 if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
992                         op = CEPH_MDS_OP_RENAMESNAP;
993                 else
994                         return -EROFS;
995         }
996         dout("rename dir %p dentry %p to dir %p dentry %p\n",
997              old_dir, old_dentry, new_dir, new_dentry);
998         req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
999         if (IS_ERR(req))
1000                 return PTR_ERR(req);
1001         ihold(old_dir);
1002         req->r_dentry = dget(new_dentry);
1003         req->r_num_caps = 2;
1004         req->r_old_dentry = dget(old_dentry);
1005         req->r_old_dentry_dir = old_dir;
1006         req->r_locked_dir = new_dir;
1007         req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
1008         req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
1009         req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
1010         req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1011         /* release LINK_RDCACHE on source inode (mds will lock it) */
1012         req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
1013         if (d_really_is_positive(new_dentry))
1014                 req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
1015         err = ceph_mdsc_do_request(mdsc, old_dir, req);
1016         if (!err && !req->r_reply_info.head->is_dentry) {
1017                 /*
1018                  * Normally d_move() is done by fill_trace (called by
1019                  * do_request, above).  If there is no trace, we need
1020                  * to do it here.
1021                  */
1022
1023                 /* d_move screws up sibling dentries' offsets */
1024                 ceph_dir_clear_complete(old_dir);
1025                 ceph_dir_clear_complete(new_dir);
1026
1027                 d_move(old_dentry, new_dentry);
1028
1029                 /* ensure target dentry is invalidated, despite
1030                    rehashing bug in vfs_rename_dir */
1031                 ceph_invalidate_dentry_lease(new_dentry);
1032         }
1033         ceph_mdsc_put_request(req);
1034         return err;
1035 }
1036
1037 /*
1038  * Ensure a dentry lease will no longer revalidate.
1039  */
1040 void ceph_invalidate_dentry_lease(struct dentry *dentry)
1041 {
1042         spin_lock(&dentry->d_lock);
1043         dentry->d_time = jiffies;
1044         ceph_dentry(dentry)->lease_shared_gen = 0;
1045         spin_unlock(&dentry->d_lock);
1046 }
1047
1048 /*
1049  * Check if dentry lease is valid.  If not, delete the lease.  Try to
1050  * renew if the least is more than half up.
1051  */
1052 static int dentry_lease_is_valid(struct dentry *dentry)
1053 {
1054         struct ceph_dentry_info *di;
1055         struct ceph_mds_session *s;
1056         int valid = 0;
1057         u32 gen;
1058         unsigned long ttl;
1059         struct ceph_mds_session *session = NULL;
1060         struct inode *dir = NULL;
1061         u32 seq = 0;
1062
1063         spin_lock(&dentry->d_lock);
1064         di = ceph_dentry(dentry);
1065         if (di->lease_session) {
1066                 s = di->lease_session;
1067                 spin_lock(&s->s_gen_ttl_lock);
1068                 gen = s->s_cap_gen;
1069                 ttl = s->s_cap_ttl;
1070                 spin_unlock(&s->s_gen_ttl_lock);
1071
1072                 if (di->lease_gen == gen &&
1073                     time_before(jiffies, dentry->d_time) &&
1074                     time_before(jiffies, ttl)) {
1075                         valid = 1;
1076                         if (di->lease_renew_after &&
1077                             time_after(jiffies, di->lease_renew_after)) {
1078                                 /* we should renew */
1079                                 dir = d_inode(dentry->d_parent);
1080                                 session = ceph_get_mds_session(s);
1081                                 seq = di->lease_seq;
1082                                 di->lease_renew_after = 0;
1083                                 di->lease_renew_from = jiffies;
1084                         }
1085                 }
1086         }
1087         spin_unlock(&dentry->d_lock);
1088
1089         if (session) {
1090                 ceph_mdsc_lease_send_msg(session, dir, dentry,
1091                                          CEPH_MDS_LEASE_RENEW, seq);
1092                 ceph_put_mds_session(session);
1093         }
1094         dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
1095         return valid;
1096 }
1097
1098 /*
1099  * Check if directory-wide content lease/cap is valid.
1100  */
1101 static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
1102 {
1103         struct ceph_inode_info *ci = ceph_inode(dir);
1104         struct ceph_dentry_info *di = ceph_dentry(dentry);
1105         int valid = 0;
1106
1107         spin_lock(&ci->i_ceph_lock);
1108         if (ci->i_shared_gen == di->lease_shared_gen)
1109                 valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
1110         spin_unlock(&ci->i_ceph_lock);
1111         dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
1112              dir, (unsigned)ci->i_shared_gen, dentry,
1113              (unsigned)di->lease_shared_gen, valid);
1114         return valid;
1115 }
1116
1117 /*
1118  * Check if cached dentry can be trusted.
1119  */
1120 static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1121 {
1122         int valid = 0;
1123         struct dentry *parent;
1124         struct inode *dir;
1125
1126         if (flags & LOOKUP_RCU)
1127                 return -ECHILD;
1128
1129         dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
1130              dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
1131
1132         parent = dget_parent(dentry);
1133         dir = d_inode(parent);
1134
1135         /* always trust cached snapped dentries, snapdir dentry */
1136         if (ceph_snap(dir) != CEPH_NOSNAP) {
1137                 dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
1138                      dentry, d_inode(dentry));
1139                 valid = 1;
1140         } else if (d_really_is_positive(dentry) &&
1141                    ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
1142                 valid = 1;
1143         } else if (dentry_lease_is_valid(dentry) ||
1144                    dir_lease_is_valid(dir, dentry)) {
1145                 if (d_really_is_positive(dentry))
1146                         valid = ceph_is_any_caps(d_inode(dentry));
1147                 else
1148                         valid = 1;
1149         }
1150
1151         if (!valid) {
1152                 struct ceph_mds_client *mdsc =
1153                         ceph_sb_to_client(dir->i_sb)->mdsc;
1154                 struct ceph_mds_request *req;
1155                 int op, mask, err;
1156
1157                 op = ceph_snap(dir) == CEPH_SNAPDIR ?
1158                         CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
1159                 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
1160                 if (!IS_ERR(req)) {
1161                         req->r_dentry = dget(dentry);
1162                         req->r_num_caps = 2;
1163
1164                         mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
1165                         if (ceph_security_xattr_wanted(dir))
1166                                 mask |= CEPH_CAP_XATTR_SHARED;
1167                         req->r_args.getattr.mask = mask;
1168
1169                         req->r_locked_dir = dir;
1170                         err = ceph_mdsc_do_request(mdsc, NULL, req);
1171                         if (err == 0 || err == -ENOENT) {
1172                                 if (dentry == req->r_dentry) {
1173                                         valid = !d_unhashed(dentry);
1174                                 } else {
1175                                         d_invalidate(req->r_dentry);
1176                                         err = -EAGAIN;
1177                                 }
1178                         }
1179                         ceph_mdsc_put_request(req);
1180                         dout("d_revalidate %p lookup result=%d\n",
1181                              dentry, err);
1182                 }
1183         }
1184
1185         dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1186         if (valid) {
1187                 ceph_dentry_lru_touch(dentry);
1188         } else {
1189                 ceph_dir_clear_complete(dir);
1190         }
1191
1192         dput(parent);
1193         return valid;
1194 }
1195
1196 /*
1197  * Release our ceph_dentry_info.
1198  */
1199 static void ceph_d_release(struct dentry *dentry)
1200 {
1201         struct ceph_dentry_info *di = ceph_dentry(dentry);
1202
1203         dout("d_release %p\n", dentry);
1204         ceph_dentry_lru_del(dentry);
1205         if (di->lease_session)
1206                 ceph_put_mds_session(di->lease_session);
1207         kmem_cache_free(ceph_dentry_cachep, di);
1208         dentry->d_fsdata = NULL;
1209 }
1210
/*
 * Revalidation for snapdir dentries: unconditionally report the dentry
 * as valid.  Neither argument is examined.
 *
 * Eventually, we'll want to revalidate snapped metadata too...
 * probably...
 */
static int ceph_snapdir_d_revalidate(struct dentry *dentry,
				     unsigned int flags)
{
	const int always_valid = 1;

	return always_valid;
}
1220
1221 /*
1222  * When the VFS prunes a dentry from the cache, we need to clear the
1223  * complete flag on the parent directory.
1224  *
1225  * Called under dentry->d_lock.
1226  */
1227 static void ceph_d_prune(struct dentry *dentry)
1228 {
1229         dout("ceph_d_prune %p\n", dentry);
1230
1231         /* do we have a valid parent? */
1232         if (IS_ROOT(dentry))
1233                 return;
1234
1235         /* if we are not hashed, we don't affect dir's completeness */
1236         if (d_unhashed(dentry))
1237                 return;
1238
1239         /*
1240          * we hold d_lock, so d_parent is stable, and d_fsdata is never
1241          * cleared until d_release
1242          */
1243         ceph_dir_clear_complete(d_inode(dentry->d_parent));
1244 }
1245
1246 /*
1247  * read() on a dir.  This weird interface hack only works if mounted
1248  * with '-o dirstat'.
1249  */
1250 static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
1251                              loff_t *ppos)
1252 {
1253         struct ceph_file_info *cf = file->private_data;
1254         struct inode *inode = file_inode(file);
1255         struct ceph_inode_info *ci = ceph_inode(inode);
1256         int left;
1257         const int bufsize = 1024;
1258
1259         if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
1260                 return -EISDIR;
1261
1262         if (!cf->dir_info) {
1263                 cf->dir_info = kmalloc(bufsize, GFP_KERNEL);
1264                 if (!cf->dir_info)
1265                         return -ENOMEM;
1266                 cf->dir_info_len =
1267                         snprintf(cf->dir_info, bufsize,
1268                                 "entries:   %20lld\n"
1269                                 " files:    %20lld\n"
1270                                 " subdirs:  %20lld\n"
1271                                 "rentries:  %20lld\n"
1272                                 " rfiles:   %20lld\n"
1273                                 " rsubdirs: %20lld\n"
1274                                 "rbytes:    %20lld\n"
1275                                 "rctime:    %10ld.%09ld\n",
1276                                 ci->i_files + ci->i_subdirs,
1277                                 ci->i_files,
1278                                 ci->i_subdirs,
1279                                 ci->i_rfiles + ci->i_rsubdirs,
1280                                 ci->i_rfiles,
1281                                 ci->i_rsubdirs,
1282                                 ci->i_rbytes,
1283                                 (long)ci->i_rctime.tv_sec,
1284                                 (long)ci->i_rctime.tv_nsec);
1285         }
1286
1287         if (*ppos >= cf->dir_info_len)
1288                 return 0;
1289         size = min_t(unsigned, size, cf->dir_info_len-*ppos);
1290         left = copy_to_user(buf, cf->dir_info + *ppos, size);
1291         if (left == size)
1292                 return -EFAULT;
1293         *ppos += (size - left);
1294         return size - left;
1295 }
1296
1297 /*
1298  * We maintain a private dentry LRU.
1299  *
1300  * FIXME: this needs to be changed to a per-mds lru to be useful.
1301  */
1302 void ceph_dentry_lru_add(struct dentry *dn)
1303 {
1304         struct ceph_dentry_info *di = ceph_dentry(dn);
1305         struct ceph_mds_client *mdsc;
1306
1307         dout("dentry_lru_add %p %p '%pd'\n", di, dn, dn);
1308         mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1309         spin_lock(&mdsc->dentry_lru_lock);
1310         list_add_tail(&di->lru, &mdsc->dentry_lru);
1311         mdsc->num_dentry++;
1312         spin_unlock(&mdsc->dentry_lru_lock);
1313 }
1314
1315 void ceph_dentry_lru_touch(struct dentry *dn)
1316 {
1317         struct ceph_dentry_info *di = ceph_dentry(dn);
1318         struct ceph_mds_client *mdsc;
1319
1320         dout("dentry_lru_touch %p %p '%pd' (offset %lld)\n", di, dn, dn,
1321              di->offset);
1322         mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1323         spin_lock(&mdsc->dentry_lru_lock);
1324         list_move_tail(&di->lru, &mdsc->dentry_lru);
1325         spin_unlock(&mdsc->dentry_lru_lock);
1326 }
1327
1328 void ceph_dentry_lru_del(struct dentry *dn)
1329 {
1330         struct ceph_dentry_info *di = ceph_dentry(dn);
1331         struct ceph_mds_client *mdsc;
1332
1333         dout("dentry_lru_del %p %p '%pd'\n", di, dn, dn);
1334         mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1335         spin_lock(&mdsc->dentry_lru_lock);
1336         list_del_init(&di->lru);
1337         mdsc->num_dentry--;
1338         spin_unlock(&mdsc->dentry_lru_lock);
1339 }
1340
1341 /*
1342  * Return name hash for a given dentry.  This is dependent on
1343  * the parent directory's hash function.
1344  */
1345 unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
1346 {
1347         struct ceph_inode_info *dci = ceph_inode(dir);
1348
1349         switch (dci->i_dir_layout.dl_dir_hash) {
1350         case 0: /* for backward compat */
1351         case CEPH_STR_HASH_LINUX:
1352                 return dn->d_name.hash;
1353
1354         default:
1355                 return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
1356                                      dn->d_name.name, dn->d_name.len);
1357         }
1358 }
1359
1360 const struct file_operations ceph_dir_fops = {
1361         .read = ceph_read_dir,
1362         .iterate = ceph_readdir,
1363         .llseek = ceph_dir_llseek,
1364         .open = ceph_open,
1365         .release = ceph_release,
1366         .unlocked_ioctl = ceph_ioctl,
1367         .fsync = ceph_fsync,
1368 };
1369
1370 const struct file_operations ceph_snapdir_fops = {
1371         .iterate = ceph_readdir,
1372         .llseek = ceph_dir_llseek,
1373         .open = ceph_open,
1374         .release = ceph_release,
1375 };
1376
1377 const struct inode_operations ceph_dir_iops = {
1378         .lookup = ceph_lookup,
1379         .permission = ceph_permission,
1380         .getattr = ceph_getattr,
1381         .setattr = ceph_setattr,
1382         .setxattr = ceph_setxattr,
1383         .getxattr = ceph_getxattr,
1384         .listxattr = ceph_listxattr,
1385         .removexattr = ceph_removexattr,
1386         .get_acl = ceph_get_acl,
1387         .set_acl = ceph_set_acl,
1388         .mknod = ceph_mknod,
1389         .symlink = ceph_symlink,
1390         .mkdir = ceph_mkdir,
1391         .link = ceph_link,
1392         .unlink = ceph_unlink,
1393         .rmdir = ceph_unlink,
1394         .rename = ceph_rename,
1395         .create = ceph_create,
1396         .atomic_open = ceph_atomic_open,
1397 };
1398
1399 const struct inode_operations ceph_snapdir_iops = {
1400         .lookup = ceph_lookup,
1401         .permission = ceph_permission,
1402         .getattr = ceph_getattr,
1403         .mkdir = ceph_mkdir,
1404         .rmdir = ceph_unlink,
1405         .rename = ceph_rename,
1406 };
1407
1408 const struct dentry_operations ceph_dentry_ops = {
1409         .d_revalidate = ceph_d_revalidate,
1410         .d_release = ceph_d_release,
1411         .d_prune = ceph_d_prune,
1412 };
1413
1414 const struct dentry_operations ceph_snapdir_dentry_ops = {
1415         .d_revalidate = ceph_snapdir_d_revalidate,
1416         .d_release = ceph_d_release,
1417 };
1418
1419 const struct dentry_operations ceph_snap_dentry_ops = {
1420         .d_release = ceph_d_release,
1421         .d_prune = ceph_d_prune,
1422 };