Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Jun 2014 06:06:23 +0000 (23:06 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Jun 2014 06:06:23 +0000 (23:06 -0700)
Pull Ceph updates from Sage Weil:
 "This has a mix of bug fixes and cleanups.

  Alex's patch fixes a rare race in RBD.  Ilya's patches fix an ENOENT
  check when a second rbd image is mapped and a couple memory leaks.
  Zheng fixes several issues with fragmented directories and multiple
  MDSs.  Josh fixes a spin/sleep issue, and Josh and Guangliang's
  patches fix setting and unsetting RBD images read-only.

  Naturally there are several other cleanups mixed in for good measure"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  rbd: only set disk to read-only once
  rbd: move calls that may sleep out of spin lock range
  rbd: add ioctl for rbd
  ceph: use truncate_pagecache() instead of truncate_inode_pages()
  ceph: include time stamp in every MDS request
  rbd: fix ida/idr memory leak
  rbd: use reference counts for image requests
  rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
  rbd: make sure we have latest osdmap on 'rbd map'
  libceph: add ceph_monc_wait_osdmap()
  libceph: mon_get_version request infrastructure
  libceph: recognize poolop requests in debugfs
  ceph: refactor readpage_nounlock() to make the logic clearer
  mds: check cap ID when handling cap export message
  ceph: remember subtree root dirfrag's auth MDS
  ceph: introduce ceph_fill_fragtree()
  ceph: handle cap import atomically
  ceph: pre-allocate ceph_cap struct for ceph_add_cap()
  ceph: update inode fields according to issued caps
  rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
  ...

1  2 
fs/ceph/addr.c
fs/ceph/inode.c
fs/ceph/mds_client.c

diff --combined fs/ceph/addr.c
@@@ -211,18 -211,15 +211,15 @@@ static int readpage_nounlock(struct fil
                SetPageError(page);
                ceph_fscache_readpage_cancel(inode, page);
                goto out;
-       } else {
-               if (err < PAGE_CACHE_SIZE) {
-               /* zero fill remainder of page */
-                       zero_user_segment(page, err, PAGE_CACHE_SIZE);
-               } else {
-                       flush_dcache_page(page);
-               }
        }
-       SetPageUptodate(page);
+       if (err < PAGE_CACHE_SIZE)
+               /* zero fill remainder of page */
+               zero_user_segment(page, err, PAGE_CACHE_SIZE);
+       else
+               flush_dcache_page(page);
  
-       if (err >= 0)
-               ceph_readpage_to_fscache(inode, page);
+       SetPageUptodate(page);
+       ceph_readpage_to_fscache(inode, page);
  
  out:
        return err < 0 ? err : 0;
@@@ -694,7 -691,7 +691,7 @@@ static int ceph_writepages_start(struc
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
  
        if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
 -              pr_warning("writepage_start %p on forced umount\n", inode);
 +              pr_warn("writepage_start %p on forced umount\n", inode);
                return -EIO; /* we're in a forced umount, don't write! */
        }
        if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
@@@ -1187,8 -1184,8 +1184,8 @@@ static int ceph_write_end(struct file *
   * never get called.
   */
  static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
 -                            const struct iovec *iov,
 -                            loff_t pos, unsigned long nr_segs)
 +                            struct iov_iter *iter,
 +                            loff_t pos)
  {
        WARN_ON(1);
        return -EINVAL;
diff --combined fs/ceph/inode.c
@@@ -10,6 -10,7 +10,7 @@@
  #include <linux/writeback.h>
  #include <linux/vmalloc.h>
  #include <linux/posix_acl.h>
+ #include <linux/random.h>
  
  #include "super.h"
  #include "mds_client.h"
@@@ -179,9 -180,8 +180,8 @@@ struct ceph_inode_frag *__ceph_find_fra
   * specified, copy the frag delegation info to the caller if
   * it is present.
   */
- u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
-                    struct ceph_inode_frag *pfrag,
-                    int *found)
+ static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                             struct ceph_inode_frag *pfrag, int *found)
  {
        u32 t = ceph_frag_make(0, 0);
        struct ceph_inode_frag *frag;
        if (found)
                *found = 0;
  
-       mutex_lock(&ci->i_fragtree_mutex);
        while (1) {
                WARN_ON(!ceph_frag_contains_value(t, v));
                frag = __ceph_find_frag(ci, t);
        }
        dout("choose_frag(%x) = %x\n", v, t);
  
-       mutex_unlock(&ci->i_fragtree_mutex);
        return t;
  }
  
+ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                    struct ceph_inode_frag *pfrag, int *found)
+ {
+       u32 ret;
+       mutex_lock(&ci->i_fragtree_mutex);
+       ret = __ceph_choose_frag(ci, v, pfrag, found);
+       mutex_unlock(&ci->i_fragtree_mutex);
+       return ret;
+ }
  /*
   * Process dirfrag (delegation) info from the mds.  Include leaf
   * fragment in tree ONLY if ndist > 0.  Otherwise, only
@@@ -237,11 -245,17 +245,17 @@@ static int ceph_fill_dirfrag(struct ino
        u32 id = le32_to_cpu(dirinfo->frag);
        int mds = le32_to_cpu(dirinfo->auth);
        int ndist = le32_to_cpu(dirinfo->ndist);
+       int diri_auth = -1;
        int i;
        int err = 0;
  
+       spin_lock(&ci->i_ceph_lock);
+       if (ci->i_auth_cap)
+               diri_auth = ci->i_auth_cap->mds;
+       spin_unlock(&ci->i_ceph_lock);
        mutex_lock(&ci->i_fragtree_mutex);
-       if (ndist == 0) {
+       if (ndist == 0 && mds == diri_auth) {
                /* no delegation info needed. */
                frag = __ceph_find_frag(ci, id);
                if (!frag)
@@@ -286,6 -300,75 +300,75 @@@ out
        return err;
  }
  
+ static int ceph_fill_fragtree(struct inode *inode,
+                             struct ceph_frag_tree_head *fragtree,
+                             struct ceph_mds_reply_dirfrag *dirinfo)
+ {
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_frag *frag;
+       struct rb_node *rb_node;
+       int i;
+       u32 id, nsplits;
+       bool update = false;
+       mutex_lock(&ci->i_fragtree_mutex);
+       nsplits = le32_to_cpu(fragtree->nsplits);
+       if (nsplits) {
+               i = prandom_u32() % nsplits;
+               id = le32_to_cpu(fragtree->splits[i].frag);
+               if (!__ceph_find_frag(ci, id))
+                       update = true;
+       } else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
+               rb_node = rb_first(&ci->i_fragtree);
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
+                       update = true;
+       }
+       if (!update && dirinfo) {
+               id = le32_to_cpu(dirinfo->frag);
+               if (id != __ceph_choose_frag(ci, id, NULL, NULL))
+                       update = true;
+       }
+       if (!update)
+               goto out_unlock;
+       dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
+       rb_node = rb_first(&ci->i_fragtree);
+       for (i = 0; i < nsplits; i++) {
+               id = le32_to_cpu(fragtree->splits[i].frag);
+               frag = NULL;
+               while (rb_node) {
+                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+                       if (ceph_frag_compare(frag->frag, id) >= 0) {
+                               if (frag->frag != id)
+                                       frag = NULL;
+                               else
+                                       rb_node = rb_next(rb_node);
+                               break;
+                       }
+                       rb_node = rb_next(rb_node);
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       kfree(frag);
+                       frag = NULL;
+               }
+               if (!frag) {
+                       frag = __get_or_create_frag(ci, id);
+                       if (IS_ERR(frag))
+                               continue;
+               }
+               frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+               dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+       }
+       while (rb_node) {
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               rb_node = rb_next(rb_node);
+               rb_erase(&frag->node, &ci->i_fragtree);
+               kfree(frag);
+       }
+ out_unlock:
+       mutex_unlock(&ci->i_fragtree_mutex);
+       return 0;
+ }
  
  /*
   * initialize a newly allocated inode.
@@@ -341,7 -424,6 +424,6 @@@ struct inode *ceph_alloc_inode(struct s
        INIT_LIST_HEAD(&ci->i_cap_snaps);
        ci->i_head_snapc = NULL;
        ci->i_snap_caps = 0;
-       ci->i_cap_exporting_issued = 0;
  
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
@@@ -407,7 -489,7 +489,7 @@@ void ceph_destroy_inode(struct inode *i
  
        /*
         * we may still have a snap_realm reference if there are stray
-        * caps in i_cap_exporting_issued or i_snap_caps.
+        * caps in i_snap_caps.
         */
        if (ci->i_snap_realm) {
                struct ceph_mds_client *mdsc =
@@@ -582,22 -664,26 +664,26 @@@ static int fill_inode(struct inode *ino
                      unsigned long ttl_from, int cap_fmode,
                      struct ceph_cap_reservation *caps_reservation)
  {
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_reply_inode *info = iinfo->in;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int i;
-       int issued = 0, implemented;
+       int issued = 0, implemented, new_issued;
        struct timespec mtime, atime, ctime;
-       u32 nsplits;
-       struct ceph_inode_frag *frag;
-       struct rb_node *rb_node;
        struct ceph_buffer *xattr_blob = NULL;
+       struct ceph_cap *new_cap = NULL;
        int err = 0;
-       int queue_trunc = 0;
+       bool wake = false;
+       bool queue_trunc = false;
+       bool new_version = false;
  
        dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
             inode, ceph_vinop(inode), le64_to_cpu(info->version),
             ci->i_version);
  
+       /* prealloc new cap struct */
+       if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+               new_cap = ceph_get_cap(mdsc, caps_reservation);
        /*
         * prealloc xattr data, if it looks like we'll need it.  only
         * if len > 4 (meaning there are actually xattrs; the first 4
         *   3    2     skip
         *   3    3     update
         */
-       if (le64_to_cpu(info->version) > 0 &&
-           (ci->i_version & ~1) >= le64_to_cpu(info->version))
-               goto no_change;
-       
+       if (ci->i_version == 0 ||
+           ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+            le64_to_cpu(info->version) > (ci->i_version & ~1)))
+               new_version = true;
        issued = __ceph_caps_issued(ci, &implemented);
        issued |= implemented | __ceph_caps_dirty(ci);
+       new_issued = ~issued & le32_to_cpu(info->cap.caps);
  
        /* update inode */
        ci->i_version = le64_to_cpu(info->version);
        inode->i_version++;
        inode->i_rdev = le32_to_cpu(info->rdev);
+       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
  
-       if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+       if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+           (issued & CEPH_CAP_AUTH_EXCL) == 0) {
                inode->i_mode = le32_to_cpu(info->mode);
                inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
                inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
                     from_kgid(&init_user_ns, inode->i_gid));
        }
  
-       if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+       if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+           (issued & CEPH_CAP_LINK_EXCL) == 0)
                set_nlink(inode, le32_to_cpu(info->nlink));
  
-       /* be careful with mtime, atime, size */
-       ceph_decode_timespec(&atime, &info->atime);
-       ceph_decode_timespec(&mtime, &info->mtime);
-       ceph_decode_timespec(&ctime, &info->ctime);
-       queue_trunc = ceph_fill_file_size(inode, issued,
-                                         le32_to_cpu(info->truncate_seq),
-                                         le64_to_cpu(info->truncate_size),
-                                         le64_to_cpu(info->size));
-       ceph_fill_file_time(inode, issued,
-                           le32_to_cpu(info->time_warp_seq),
-                           &ctime, &mtime, &atime);
-       ci->i_layout = info->layout;
-       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+       if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+               /* be careful with mtime, atime, size */
+               ceph_decode_timespec(&atime, &info->atime);
+               ceph_decode_timespec(&mtime, &info->mtime);
+               ceph_decode_timespec(&ctime, &info->ctime);
+               ceph_fill_file_time(inode, issued,
+                               le32_to_cpu(info->time_warp_seq),
+                               &ctime, &mtime, &atime);
+       }
+       if (new_version ||
+           (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+               ci->i_layout = info->layout;
+               queue_trunc = ceph_fill_file_size(inode, issued,
+                                       le32_to_cpu(info->truncate_seq),
+                                       le64_to_cpu(info->truncate_size),
+                                       le64_to_cpu(info->size));
+               /* only update max_size on auth cap */
+               if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+                   ci->i_max_size != le64_to_cpu(info->max_size)) {
+                       dout("max_size %lld -> %llu\n", ci->i_max_size,
+                                       le64_to_cpu(info->max_size));
+                       ci->i_max_size = le64_to_cpu(info->max_size);
+               }
+       }
  
        /* xattrs */
        /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
                dout(" marking %p complete (empty)\n", inode);
                __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
        }
- no_change:
-       /* only update max_size on auth cap */
-       if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
-           ci->i_max_size != le64_to_cpu(info->max_size)) {
-               dout("max_size %lld -> %llu\n", ci->i_max_size,
-                    le64_to_cpu(info->max_size));
-               ci->i_max_size = le64_to_cpu(info->max_size);
-       }
-       spin_unlock(&ci->i_ceph_lock);
-       /* queue truncate if we saw i_size decrease */
-       if (queue_trunc)
-               ceph_queue_vmtruncate(inode);
-       /* populate frag tree */
-       /* FIXME: move me up, if/when version reflects fragtree changes */
-       nsplits = le32_to_cpu(info->fragtree.nsplits);
-       mutex_lock(&ci->i_fragtree_mutex);
-       rb_node = rb_first(&ci->i_fragtree);
-       for (i = 0; i < nsplits; i++) {
-               u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-               frag = NULL;
-               while (rb_node) {
-                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-                       if (ceph_frag_compare(frag->frag, id) >= 0) {
-                               if (frag->frag != id)
-                                       frag = NULL;
-                               else
-                                       rb_node = rb_next(rb_node);
-                               break;
-                       }
-                       rb_node = rb_next(rb_node);
-                       rb_erase(&frag->node, &ci->i_fragtree);
-                       kfree(frag);
-                       frag = NULL;
-               }
-               if (!frag) {
-                       frag = __get_or_create_frag(ci, id);
-                       if (IS_ERR(frag))
-                               continue;
-               }
-               frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
-               dout(" frag %x split by %d\n", frag->frag, frag->split_by);
-       }
-       while (rb_node) {
-               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-               rb_node = rb_next(rb_node);
-               rb_erase(&frag->node, &ci->i_fragtree);
-               kfree(frag);
-       }
-       mutex_unlock(&ci->i_fragtree_mutex);
  
        /* were we issued a capability? */
        if (info->cap.caps) {
                                     le32_to_cpu(info->cap.seq),
                                     le32_to_cpu(info->cap.mseq),
                                     le64_to_cpu(info->cap.realm),
-                                    info->cap.flags,
-                                    caps_reservation);
+                                    info->cap.flags, &new_cap);
+                       wake = true;
                } else {
-                       spin_lock(&ci->i_ceph_lock);
                        dout(" %p got snap_caps %s\n", inode,
                             ceph_cap_string(le32_to_cpu(info->cap.caps)));
                        ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
                        if (cap_fmode >= 0)
                                __ceph_get_fmode(ci, cap_fmode);
-                       spin_unlock(&ci->i_ceph_lock);
                }
        } else if (cap_fmode >= 0) {
 -              pr_warning("mds issued no caps on %llx.%llx\n",
 +              pr_warn("mds issued no caps on %llx.%llx\n",
                           ceph_vinop(inode));
                __ceph_get_fmode(ci, cap_fmode);
        }
+       spin_unlock(&ci->i_ceph_lock);
+       if (wake)
+               wake_up_all(&ci->i_cap_wq);
+       /* queue truncate if we saw i_size decrease */
+       if (queue_trunc)
+               ceph_queue_vmtruncate(inode);
+       /* populate frag tree */
+       if (S_ISDIR(inode->i_mode))
+               ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
  
        /* update delegation info? */
        if (dirinfo)
                ceph_fill_dirfrag(inode, dirinfo);
  
        err = 0;
  out:
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
        if (xattr_blob)
                ceph_buffer_put(xattr_blob);
        return err;
@@@ -1485,7 -1546,7 +1546,7 @@@ static void ceph_invalidate_work(struc
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&ci->i_ceph_lock);
  
-       truncate_inode_pages(inode->i_mapping, 0);
+       truncate_pagecache(inode, 0);
  
        spin_lock(&ci->i_ceph_lock);
        if (orig_gen == ci->i_rdcache_gen &&
@@@ -1588,7 -1649,7 +1649,7 @@@ retry
             ci->i_truncate_pending, to);
        spin_unlock(&ci->i_ceph_lock);
  
-       truncate_inode_pages(inode->i_mapping, to);
+       truncate_pagecache(inode, to);
  
        spin_lock(&ci->i_ceph_lock);
        if (to == ci->i_truncate_size) {
diff --combined fs/ceph/mds_client.c
@@@ -1558,6 -1558,8 +1558,8 @@@ ceph_mdsc_create_request(struct ceph_md
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
  
+       req->r_stamp = CURRENT_TIME;
        req->r_op = op;
        req->r_direct_mode = mode;
        return req;
@@@ -1783,7 -1785,8 +1785,8 @@@ static struct ceph_msg *create_request_
        }
  
        len = sizeof(*head) +
-               pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
+               pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
+               sizeof(struct timespec);
  
        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
                goto out_free2;
        }
  
+       msg->hdr.version = 2;
        msg->hdr.tid = cpu_to_le64(req->r_tid);
  
        head = msg->front.iov_base;
                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
        head->num_releases = cpu_to_le16(releases);
  
+       /* time stamp */
+       ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@@ -2218,13 -2225,13 +2225,13 @@@ static void handle_reply(struct ceph_md
        /* dup? */
        if ((req->r_got_unsafe && !head->safe) ||
            (req->r_got_safe && head->safe)) {
 -              pr_warning("got a dup %s reply on %llu from mds%d\n",
 +              pr_warn("got a dup %s reply on %llu from mds%d\n",
                           head->safe ? "safe" : "unsafe", tid, mds);
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
        if (req->r_got_safe && !head->safe) {
 -              pr_warning("got unsafe after safe on %llu from mds%d\n",
 +              pr_warn("got unsafe after safe on %llu from mds%d\n",
                           tid, mds);
                mutex_unlock(&mdsc->mutex);
                goto out;
@@@ -3525,7 -3532,7 +3532,7 @@@ static void peer_reset(struct ceph_conn
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
  
 -      pr_warning("mds%d closed our session\n", s->s_mds);
 +      pr_warn("mds%d closed our session\n", s->s_mds);
        send_mds_reconnect(mdsc, s);
  }