ceph: direct requests in snapped namespace based on nonsnap parent
authorSage Weil <sage@newdream.net>
Mon, 16 Aug 2010 16:21:27 +0000 (09:21 -0700)
committerSage Weil <sage@newdream.net>
Sun, 22 Aug 2010 22:16:48 +0000 (15:16 -0700)
When making a request in the virtual snapdir or a snapped portion of the
namespace, we should choose the MDS based on the first nonsnap parent (and
its caps).  If that is not the best place, we will get forward hints to
find the right MDS in the cluster.  This fixes ESTALE errors when using
the .snap directory and namespace with multiple MDSs.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/mds_client.c

index 397a47b..8d1f11c 100644 (file)
@@ -560,6 +560,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
  *
  * Called under mdsc->mutex.
  */
+struct dentry *get_nonsnap_parent(struct dentry *dentry)
+{
+       while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
+               dentry = dentry->d_parent;
+       return dentry;
+}
+
 static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
 {
@@ -590,14 +597,29 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
-               if (req->r_dentry->d_inode) {
+               struct inode *dir = req->r_dentry->d_parent->d_inode;
+
+               if (dir->i_sb != mdsc->client->sb) {
+                       /* not this fs! */
+                       inode = req->r_dentry->d_inode;
+               } else if (ceph_snap(dir) != CEPH_NOSNAP) {
+                       /* direct snapped/virtual snapdir requests
+                        * based on parent dir inode */
+                       struct dentry *dn =
+                               get_nonsnap_parent(req->r_dentry->d_parent);
+                       inode = dn->d_inode;
+                       dout("__choose_mds using nonsnap parent %p\n", inode);
+               } else if (req->r_dentry->d_inode) {
+                       /* dentry target */
                        inode = req->r_dentry->d_inode;
                } else {
-                       inode = req->r_dentry->d_parent->d_inode;
+                       /* dir + name */
+                       inode = dir;
                        hash = req->r_dentry->d_name.hash;
                        is_hash = true;
                }
        }
+
        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)