GFS2: Use RCU/hlist_bl based hash for quotas
authorSteven Whitehouse <swhiteho@redhat.com>
Thu, 12 Dec 2013 10:47:59 +0000 (10:47 +0000)
committerSteven Whitehouse <swhiteho@redhat.com>
Tue, 14 Jan 2014 19:27:56 +0000 (19:27 +0000)
Prior to this patch, GFS2 kept all the quotas for each
super block in a single linked list. This is rather slow
when there are large numbers of quotas.

This patch introduces a hlist_bl based hash table, similar
to the one used for glocks. The initial look up of the quota
is now lockless in the case where it is already cached,
although we still have to take the per quota spinlock in
order to bump the ref count. Either way though, this is a
big improvement on what was there before.

The qd_lock and the per super block list is preserved, for
the time being. However it is intended that since this is no
longer used for its original role, it should be possible to
shrink the number of items on that list in due course and
remove the requirement to take qd_lock in qd_get.

Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Cc: Abhijith Das <adas@redhat.com>
Cc: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
fs/gfs2/incore.h
fs/gfs2/main.c
fs/gfs2/quota.c
fs/gfs2/quota.h

index a99f60c98845db04ccc733feb1a6c0a4e09d4d6e..59d99ec9d8755fad7eeb573c439f697a61404fcb 100644 (file)
@@ -428,10 +428,13 @@ enum {
 };
 
 struct gfs2_quota_data {
+       struct hlist_bl_node qd_hlist;
        struct list_head qd_list;
        struct kqid qd_id;
+       struct gfs2_sbd *qd_sbd;
        struct lockref qd_lockref;
        struct list_head qd_lru;
+       unsigned qd_hash;
 
        unsigned long qd_flags;         /* QDF_... */
 
@@ -450,6 +453,7 @@ struct gfs2_quota_data {
 
        u64 qd_sync_gen;
        unsigned long qd_last_warn;
+       struct rcu_head qd_rcu;
 };
 
 struct gfs2_trans {
index 0650db2541ef6ff1739f0321bdf132f084a9f23f..c272e73063dede68b4e36c4a3ffc63b682a0c898 100644 (file)
@@ -76,6 +76,7 @@ static int __init init_gfs2_fs(void)
 
        gfs2_str2qstr(&gfs2_qdot, ".");
        gfs2_str2qstr(&gfs2_qdotdot, "..");
+       gfs2_quota_hash_init();
 
        error = gfs2_sys_init();
        if (error)
index 1b6b3675ee1d546b15b78bdf5dc1ee9fb5823b0d..a1df01d381a859da72f91317d5bee16b35c02e94 100644 (file)
 #include <linux/dqblk_xfs.h>
 #include <linux/lockref.h>
 #include <linux/list_lru.h>
+#include <linux/rcupdate.h>
+#include <linux/rculist_bl.h>
+#include <linux/bit_spinlock.h>
+#include <linux/jhash.h>
 
 #include "gfs2.h"
 #include "incore.h"
 #include "inode.h"
 #include "util.h"
 
-/* Lock order: qd_lock -> qd->lockref.lock -> lru lock */
+#define GFS2_QD_HASH_SHIFT      12
+#define GFS2_QD_HASH_SIZE       (1 << GFS2_QD_HASH_SHIFT)
+#define GFS2_QD_HASH_MASK       (GFS2_QD_HASH_SIZE - 1)
+
+/* Lock order: qd_lock -> bucket lock -> qd->lockref.lock -> lru lock */
 static DEFINE_SPINLOCK(qd_lock);
 struct list_lru gfs2_qd_lru;
 
+static struct hlist_bl_head qd_hash_table[GFS2_QD_HASH_SIZE];
+
+static unsigned int gfs2_qd_hash(const struct gfs2_sbd *sdp,
+                                const struct kqid qid)
+{
+       unsigned int h;
+
+       h = jhash(&sdp, sizeof(struct gfs2_sbd *), 0);
+       h = jhash(&qid, sizeof(struct kqid), h);
+
+       return h & GFS2_QD_HASH_MASK;
+}
+
+static inline void spin_lock_bucket(unsigned int hash)
+{
+        hlist_bl_lock(&qd_hash_table[hash]);
+}
+
+static inline void spin_unlock_bucket(unsigned int hash)
+{
+        hlist_bl_unlock(&qd_hash_table[hash]);
+}
+
+static void gfs2_qd_dealloc(struct rcu_head *rcu)
+{
+       struct gfs2_quota_data *qd = container_of(rcu, struct gfs2_quota_data, qd_rcu);
+       kmem_cache_free(gfs2_quotad_cachep, qd);
+}
+
 static void gfs2_qd_dispose(struct list_head *list)
 {
        struct gfs2_quota_data *qd;
@@ -87,6 +124,10 @@ static void gfs2_qd_dispose(struct list_head *list)
                list_del(&qd->qd_list);
                spin_unlock(&qd_lock);
 
+               spin_lock_bucket(qd->qd_hash);
+               hlist_bl_del_rcu(&qd->qd_hlist);
+               spin_unlock_bucket(qd->qd_hash);
+
                gfs2_assert_warn(sdp, !qd->qd_change);
                gfs2_assert_warn(sdp, !qd->qd_slot_count);
                gfs2_assert_warn(sdp, !qd->qd_bh_count);
@@ -95,7 +136,7 @@ static void gfs2_qd_dispose(struct list_head *list)
                atomic_dec(&sdp->sd_quota_count);
 
                /* Delete it from the common reclaim list */
-               kmem_cache_free(gfs2_quotad_cachep, qd);
+               call_rcu(&qd->qd_rcu, gfs2_qd_dealloc);
        }
 }
 
@@ -165,83 +206,95 @@ static u64 qd2offset(struct gfs2_quota_data *qd)
        return offset;
 }
 
-static int qd_alloc(struct gfs2_sbd *sdp, struct kqid qid,
-                   struct gfs2_quota_data **qdp)
+static struct gfs2_quota_data *qd_alloc(unsigned hash, struct gfs2_sbd *sdp, struct kqid qid)
 {
        struct gfs2_quota_data *qd;
        int error;
 
        qd = kmem_cache_zalloc(gfs2_quotad_cachep, GFP_NOFS);
        if (!qd)
-               return -ENOMEM;
+               return NULL;
 
+       qd->qd_sbd = sdp;
        qd->qd_lockref.count = 1;
        spin_lock_init(&qd->qd_lockref.lock);
        qd->qd_id = qid;
        qd->qd_slot = -1;
        INIT_LIST_HEAD(&qd->qd_lru);
+       qd->qd_hash = hash;
 
        error = gfs2_glock_get(sdp, qd2index(qd),
                              &gfs2_quota_glops, CREATE, &qd->qd_gl);
        if (error)
                goto fail;
 
-       *qdp = qd;
-
-       return 0;
+       return qd;
 
 fail:
        kmem_cache_free(gfs2_quotad_cachep, qd);
-       return error;
+       return NULL;
 }
 
-static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
-                 struct gfs2_quota_data **qdp)
+static struct gfs2_quota_data *gfs2_qd_search_bucket(unsigned int hash,
+                                                    const struct gfs2_sbd *sdp,
+                                                    struct kqid qid)
 {
-       struct gfs2_quota_data *qd = NULL, *new_qd = NULL;
-       int error, found;
-
-       *qdp = NULL;
+       struct gfs2_quota_data *qd;
+       struct hlist_bl_node *h;
 
-       for (;;) {
-               found = 0;
-               spin_lock(&qd_lock);
-               list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
-                       if (qid_eq(qd->qd_id, qid) &&
-                           lockref_get_not_dead(&qd->qd_lockref)) {
-                               list_lru_del(&gfs2_qd_lru, &qd->qd_lru);
-                               found = 1;
-                               break;
-                       }
+       hlist_bl_for_each_entry_rcu(qd, h, &qd_hash_table[hash], qd_hlist) {
+               if (!qid_eq(qd->qd_id, qid))
+                       continue;
+               if (qd->qd_sbd != sdp)
+                       continue;
+               if (lockref_get_not_dead(&qd->qd_lockref)) {
+                       list_lru_del(&gfs2_qd_lru, &qd->qd_lru);
+                       return qd;
                }
+       }
 
-               if (!found)
-                       qd = NULL;
+       return NULL;
+}
 
-               if (!qd && new_qd) {
-                       qd = new_qd;
-                       list_add(&qd->qd_list, &sdp->sd_quota_list);
-                       atomic_inc(&sdp->sd_quota_count);
-                       new_qd = NULL;
-               }
 
-               spin_unlock(&qd_lock);
+static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
+                 struct gfs2_quota_data **qdp)
+{
+       struct gfs2_quota_data *qd, *new_qd;
+       unsigned int hash = gfs2_qd_hash(sdp, qid);
 
-               if (qd) {
-                       if (new_qd) {
-                               gfs2_glock_put(new_qd->qd_gl);
-                               kmem_cache_free(gfs2_quotad_cachep, new_qd);
-                       }
-                       *qdp = qd;
-                       return 0;
-               }
+       rcu_read_lock();
+       *qdp = qd = gfs2_qd_search_bucket(hash, sdp, qid);
+       rcu_read_unlock();
 
-               error = qd_alloc(sdp, qid, &new_qd);
-               if (error)
-                       return error;
+       if (qd)
+               return 0;
+
+       new_qd = qd_alloc(hash, sdp, qid);
+       if (!new_qd)
+               return -ENOMEM;
+
+       spin_lock(&qd_lock);
+       spin_lock_bucket(hash);
+       *qdp = qd = gfs2_qd_search_bucket(hash, sdp, qid);
+       if (qd == NULL) {
+               *qdp = new_qd;
+               list_add(&new_qd->qd_list, &sdp->sd_quota_list);
+               hlist_bl_add_head_rcu(&new_qd->qd_hlist, &qd_hash_table[hash]);
+               atomic_inc(&sdp->sd_quota_count);
+       }
+       spin_unlock_bucket(hash);
+       spin_unlock(&qd_lock);
+
+       if (qd) {
+               gfs2_glock_put(new_qd->qd_gl);
+               kmem_cache_free(gfs2_quotad_cachep, new_qd);
        }
+
+       return 0;
 }
 
+
 static void qd_hold(struct gfs2_quota_data *qd)
 {
        struct gfs2_sbd *sdp = qd->qd_gl->gl_sbd;
@@ -1215,6 +1268,7 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
        unsigned int blocks = size >> sdp->sd_sb.sb_bsize_shift;
        unsigned int x, slot = 0;
        unsigned int found = 0;
+       unsigned int hash;
        u64 dblock;
        u32 extlen = 0;
        int error;
@@ -1272,8 +1326,9 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
                        if (!qc_change)
                                continue;
 
-                       error = qd_alloc(sdp, qc_id, &qd);
-                       if (error) {
+                       hash = gfs2_qd_hash(sdp, qc_id);
+                       qd = qd_alloc(hash, sdp, qc_id);
+                       if (qd == NULL) {
                                brelse(bh);
                                goto fail;
                        }
@@ -1289,6 +1344,10 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
                        atomic_inc(&sdp->sd_quota_count);
                        spin_unlock(&qd_lock);
 
+                       spin_lock_bucket(hash);
+                       hlist_bl_add_head_rcu(&qd->qd_hlist, &qd_hash_table[hash]);
+                       spin_unlock_bucket(hash);
+
                        found++;
                }
 
@@ -1335,11 +1394,16 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
                spin_unlock(&qd->qd_lockref.lock);
 
                list_del(&qd->qd_list);
+
                /* Also remove if this qd exists in the reclaim list */
                list_lru_del(&gfs2_qd_lru, &qd->qd_lru);
                atomic_dec(&sdp->sd_quota_count);
                spin_unlock(&qd_lock);
 
+               spin_lock_bucket(qd->qd_hash);
+               hlist_bl_del_rcu(&qd->qd_hlist);
+               spin_unlock_bucket(qd->qd_hash);
+
                if (!qd->qd_lockref.count) {
                        gfs2_assert_warn(sdp, !qd->qd_change);
                        gfs2_assert_warn(sdp, !qd->qd_slot_count);
@@ -1348,7 +1412,7 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
                gfs2_assert_warn(sdp, !qd->qd_bh_count);
 
                gfs2_glock_put(qd->qd_gl);
-               kmem_cache_free(gfs2_quotad_cachep, qd);
+               call_rcu(&qd->qd_rcu, gfs2_qd_dealloc);
 
                spin_lock(&qd_lock);
        }
@@ -1643,3 +1707,11 @@ const struct quotactl_ops gfs2_quotactl_ops = {
        .get_dqblk      = gfs2_get_dqblk,
        .set_dqblk      = gfs2_set_dqblk,
 };
+
+void __init gfs2_quota_hash_init(void)
+{
+       unsigned i;
+
+       for(i = 0; i < GFS2_QD_HASH_SIZE; i++)
+               INIT_HLIST_BL_HEAD(&qd_hash_table[i]);
+}
index 96e4f34a03b0d8e8eadf13a330e0e48907416bc6..55d506eb3c4a31ab956ab23e85158d0ddc4947ea 100644 (file)
@@ -57,5 +57,6 @@ static inline int gfs2_quota_lock_check(struct gfs2_inode *ip)
 extern const struct quotactl_ops gfs2_quotactl_ops;
 extern struct shrinker gfs2_qd_shrinker;
 extern struct list_lru gfs2_qd_lru;
+extern void __init gfs2_quota_hash_init(void);
 
 #endif /* __QUOTA_DOT_H__ */