dlm: use idr instead of list for recovered rsbs
authorDavid Teigland <teigland@redhat.com>
Tue, 15 May 2012 21:07:49 +0000 (16:07 -0500)
committerDavid Teigland <teigland@redhat.com>
Mon, 16 Jul 2012 19:17:52 +0000 (14:17 -0500)
When a large number of resources are being recovered,
a linear search of the recover_list takes a long time.
Use an idr in place of a list.

Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/dlm_internal.h
fs/dlm/lockspace.c
fs/dlm/rcom.c
fs/dlm/recover.c

index 3093207..a5f82d5 100644 (file)
@@ -288,6 +288,7 @@ struct dlm_rsb {
        int                     res_nodeid;
        int                     res_master_nodeid;
        int                     res_dir_nodeid;
+       int                     res_id;         /* for ls_recover_idr */
        uint32_t                res_lvbseq;
        uint32_t                res_hash;
        uint32_t                res_bucket;     /* rsbtbl */
@@ -587,6 +588,8 @@ struct dlm_ls {
        struct list_head        ls_recover_list;
        spinlock_t              ls_recover_list_lock;
        int                     ls_recover_list_count;
+       struct idr              ls_recover_idr;
+       spinlock_t              ls_recover_idr_lock;
        wait_queue_head_t       ls_wait_general;
        struct mutex            ls_clear_proc_locks;
 
index 065bb75..d4d3b31 100644 (file)
@@ -565,6 +565,8 @@ static int new_lockspace(const char *name, const char *cluster,
 
        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
+       idr_init(&ls->ls_recover_idr);
+       spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
@@ -636,6 +638,7 @@ static int new_lockspace(const char *name, const char *cluster,
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
+       idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
  out_lkbfree:
        idr_destroy(&ls->ls_lkbidr);
index c8c298d..87f1a56 100644 (file)
@@ -325,7 +325,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
        if (error)
                goto out;
        memcpy(rc->rc_buf, r->res_name, r->res_length);
-       rc->rc_id = (unsigned long) r;
+       rc->rc_id = (unsigned long) r->res_id;
 
        send_rcom(ls, mh, rc);
  out:
index 3c025fe..ff6f276 100644 (file)
@@ -277,22 +277,6 @@ static void recover_list_del(struct dlm_rsb *r)
        dlm_put_rsb(r);
 }
 
-static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
-{
-       struct dlm_rsb *r = NULL;
-
-       spin_lock(&ls->ls_recover_list_lock);
-
-       list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
-               if (id == (unsigned long) r)
-                       goto out;
-       }
-       r = NULL;
- out:
-       spin_unlock(&ls->ls_recover_list_lock);
-       return r;
-}
-
 static void recover_list_clear(struct dlm_ls *ls)
 {
        struct dlm_rsb *r, *s;
@@ -313,6 +297,94 @@ static void recover_list_clear(struct dlm_ls *ls)
        spin_unlock(&ls->ls_recover_list_lock);
 }
 
+static int recover_idr_empty(struct dlm_ls *ls)
+{
+       int empty = 1;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       if (ls->ls_recover_list_count)
+               empty = 0;
+       spin_unlock(&ls->ls_recover_idr_lock);
+
+       return empty;
+}
+
+static int recover_idr_add(struct dlm_rsb *r)
+{
+       struct dlm_ls *ls = r->res_ls;
+       int rv, id;
+
+       rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
+       if (!rv)
+               return -ENOMEM;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       if (r->res_id) {
+               spin_unlock(&ls->ls_recover_idr_lock);
+               return -1;
+       }
+       rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
+       if (rv) {
+               spin_unlock(&ls->ls_recover_idr_lock);
+               return rv;
+       }
+       r->res_id = id;
+       ls->ls_recover_list_count++;
+       dlm_hold_rsb(r);
+       spin_unlock(&ls->ls_recover_idr_lock);
+       return 0;
+}
+
+static void recover_idr_del(struct dlm_rsb *r)
+{
+       struct dlm_ls *ls = r->res_ls;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       idr_remove(&ls->ls_recover_idr, r->res_id);
+       r->res_id = 0;
+       ls->ls_recover_list_count--;
+       spin_unlock(&ls->ls_recover_idr_lock);
+
+       dlm_put_rsb(r);
+}
+
+static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
+{
+       struct dlm_rsb *r;
+
+       spin_lock(&ls->ls_recover_idr_lock);
+       r = idr_find(&ls->ls_recover_idr, (int)id);
+       spin_unlock(&ls->ls_recover_idr_lock);
+       return r;
+}
+
+static int recover_idr_clear_rsb(int id, void *p, void *data)
+{
+       struct dlm_ls *ls = data;
+       struct dlm_rsb *r = p;
+
+       r->res_id = 0;
+       r->res_recover_locks_count = 0;
+       ls->ls_recover_list_count--;
+
+       dlm_put_rsb(r);
+       return 0;
+}
+
+static void recover_idr_clear(struct dlm_ls *ls)
+{
+       spin_lock(&ls->ls_recover_idr_lock);
+       idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
+       idr_remove_all(&ls->ls_recover_idr);
+
+       if (ls->ls_recover_list_count != 0) {
+               log_error(ls, "warning: recover_list_count %d",
+                         ls->ls_recover_list_count);
+               ls->ls_recover_list_count = 0;
+       }
+       spin_unlock(&ls->ls_recover_idr_lock);
+}
+
 
 /* Master recovery: find new master node for rsb's that were
    mastered on nodes that have been removed.
@@ -408,7 +480,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count)
                set_new_master(r);
                error = 0;
        } else {
-               recover_list_add(r);
+               recover_idr_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
        }
 
@@ -493,10 +565,10 @@ int dlm_recover_masters(struct dlm_ls *ls)
 
        log_debug(ls, "dlm_recover_masters %u of %u", count, total);
 
-       error = dlm_wait_function(ls, &recover_list_empty);
+       error = dlm_wait_function(ls, &recover_idr_empty);
  out:
        if (error)
-               recover_list_clear(ls);
+               recover_idr_clear(ls);
        return error;
 }
 
@@ -505,7 +577,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct dlm_rsb *r;
        int ret_nodeid, new_master;
 
-       r = recover_list_find(ls, rc->rc_id);
+       r = recover_idr_find(ls, rc->rc_id);
        if (!r) {
                log_error(ls, "dlm_recover_master_reply no id %llx",
                          (unsigned long long)rc->rc_id);
@@ -524,9 +596,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
        r->res_nodeid = new_master;
        set_new_master(r);
        unlock_rsb(r);
-       recover_list_del(r);
+       recover_idr_del(r);
 
-       if (recover_list_empty(ls))
+       if (recover_idr_empty(ls))
                wake_up(&ls->ls_wait_general);
  out:
        return 0;