orangefs: bufmap rewrite
authorAl Viro <viro@zeniv.linux.org.uk>
Sun, 14 Feb 2016 02:01:21 +0000 (21:01 -0500)
committerMike Marshall <hubcap@omnibond.com>
Fri, 19 Feb 2016 18:45:54 +0000 (13:45 -0500)
new waiting-for-slot logics:
* make request for slot wait for bufmap to be set up if it
comes before it's installed *OR* while it's running down
* make closing control device wait for all slots to be freed
* waiting itself rewritten to (open-coded) analogues of wait_event_...
primitives - we would need wait_event_locked() and, pardon an obscenely
long name, wait_event_interruptible_exclusive_timeout_locked().
* we never wait for more than slot_timeout_secs in total and,
if during the wait the daemon goes away, we only allow
ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS for it to come back.
* (cosmetical) bitmap is used instead of an array of zeroes and ones
* old (and only reached if we are about to corrupt memory) waiting
for daemon restart in service_operation() removed.

[Martin's fixes folded]

Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
fs/orangefs/devorangefs-req.c
fs/orangefs/orangefs-bufmap.c
fs/orangefs/orangefs-bufmap.h
fs/orangefs/waitqueue.c

index 6a7df12..790855a 100644 (file)
@@ -508,8 +508,7 @@ static int orangefs_devreq_release(struct inode *inode, struct file *file)
                     __func__);
 
        mutex_lock(&devreq_mutex);
-       if (orangefs_get_bufmap_init())
-               orangefs_bufmap_finalize();
+       orangefs_bufmap_finalize();
 
        open_access_count = -1;
 
@@ -527,6 +526,9 @@ static int orangefs_devreq_release(struct inode *inode, struct file *file)
         * them as purged and wake them up
         */
        purge_inprogress_ops();
+
+       orangefs_bufmap_run_down();
+
        gossip_debug(GOSSIP_DEV_DEBUG,
                     "pvfs2-client-core: device close complete\n");
        open_access_count = 0;
@@ -607,13 +609,8 @@ static long dispatch_ioctl_command(unsigned int command, unsigned long arg)
                                     (struct ORANGEFS_dev_map_desc __user *)
                                     arg,
                                     sizeof(struct ORANGEFS_dev_map_desc));
-               if (orangefs_get_bufmap_init()) {
-                       return -EINVAL;
-               } else {
-                       return ret ?
-                              -EIO :
-                              orangefs_bufmap_initialize(&user_desc);
-               }
+               /* WTF -EIO and not -EFAULT? */
+               return ret ? -EIO : orangefs_bufmap_initialize(&user_desc);
        case ORANGEFS_DEV_REMOUNT_ALL:
                gossip_debug(GOSSIP_DEV_DEBUG,
                             "%s: got ORANGEFS_DEV_REMOUNT_ALL\n",
index cd48466..96faf4e 100644 (file)
@@ -7,7 +7,133 @@
 #include "orangefs-kernel.h"
 #include "orangefs-bufmap.h"
 
-DECLARE_WAIT_QUEUE_HEAD(orangefs_bufmap_init_waitq);
+struct slot_map {
+       int c;
+       wait_queue_head_t q;
+       int count;
+       unsigned long *map;
+};
+
+static struct slot_map rw_map = {
+       .c = -1,
+       .q = __WAIT_QUEUE_HEAD_INITIALIZER(rw_map.q)
+};
+static struct slot_map readdir_map = {
+       .c = -1,
+       .q = __WAIT_QUEUE_HEAD_INITIALIZER(readdir_map.q)
+};
+
+
+static void install(struct slot_map *m, int count, unsigned long *map)
+{
+       spin_lock(&m->q.lock);
+       m->c = m->count = count;
+       m->map = map;
+       wake_up_all_locked(&m->q);
+       spin_unlock(&m->q.lock);
+}
+
+static void mark_killed(struct slot_map *m)
+{
+       spin_lock(&m->q.lock);
+       m->c -= m->count + 1;
+       spin_unlock(&m->q.lock);
+}
+
+static void run_down(struct slot_map *m)
+{
+       DEFINE_WAIT(wait);
+       spin_lock(&m->q.lock);
+       if (m->c != -1) {
+               for (;;) {
+                       if (likely(list_empty(&wait.task_list)))
+                               __add_wait_queue_tail(&m->q, &wait);
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+
+                       if (m->c == -1)
+                               break;
+
+                       spin_unlock(&m->q.lock);
+                       schedule();
+                       spin_lock(&m->q.lock);
+               }
+               __remove_wait_queue(&m->q, &wait);
+               __set_current_state(TASK_RUNNING);
+       }
+       m->map = NULL;
+       spin_unlock(&m->q.lock);
+}
+
+static void put(struct slot_map *m, int slot)
+{
+       int v;
+       spin_lock(&m->q.lock);
+       __clear_bit(slot, m->map);
+       v = ++m->c;
+       if (unlikely(v == 1))   /* no free slots -> one free slot */
+               wake_up_locked(&m->q);
+       else if (unlikely(v == -1))     /* finished dying */
+               wake_up_all_locked(&m->q);
+       spin_unlock(&m->q.lock);
+}
+
+static int wait_for_free(struct slot_map *m)
+{
+       long left = slot_timeout_secs * HZ;
+       DEFINE_WAIT(wait);
+
+       do {
+               long n = left, t;
+               if (likely(list_empty(&wait.task_list)))
+                       __add_wait_queue_tail_exclusive(&m->q, &wait);
+               set_current_state(TASK_INTERRUPTIBLE);
+
+               if (m->c > 0)
+                       break;
+
+               if (m->c < 0) {
+                       /* we are waiting for map to be installed */
+                       /* it would better be there soon, or we go away */
+                       if (n > ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ)
+                               n = ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ;
+               }
+               spin_unlock(&m->q.lock);
+               t = schedule_timeout(n);
+               spin_lock(&m->q.lock);
+               if (unlikely(!t) && n != left && m->c < 0)
+                       left = t;
+               else
+                       left = t + (left - n);
+               if (unlikely(signal_pending(current)))
+                       left = -EINTR;
+       } while (left > 0);
+
+       if (!list_empty(&wait.task_list))
+               list_del(&wait.task_list);
+       else if (left <= 0 && waitqueue_active(&m->q))
+               __wake_up_locked_key(&m->q, TASK_INTERRUPTIBLE, NULL);
+       __set_current_state(TASK_RUNNING);
+
+       if (likely(left > 0))
+               return 0;
+
+       return left < 0 ? -EINTR : -ETIMEDOUT;
+}
+
+static int get(struct slot_map *m)
+{
+       int res = 0;
+       spin_lock(&m->q.lock);
+       if (unlikely(m->c <= 0))
+               res = wait_for_free(m);
+       if (likely(!res)) {
+               m->c--;
+               res = find_first_zero_bit(m->map, m->count);
+               __set_bit(res, m->map);
+       }
+       spin_unlock(&m->q.lock);
+       return res;
+}
 
 /* used to describe mapped buffers */
 struct orangefs_bufmap_desc {
@@ -18,8 +144,6 @@ struct orangefs_bufmap_desc {
 };
 
 static struct orangefs_bufmap {
-       atomic_t refcnt;
-
        int desc_size;
        int desc_shift;
        int desc_count;
@@ -30,12 +154,12 @@ static struct orangefs_bufmap {
        struct orangefs_bufmap_desc *desc_array;
 
        /* array to track usage of buffer descriptors */
-       int *buffer_index_array;
-       spinlock_t buffer_index_lock;
+       unsigned long *buffer_index_array;
 
        /* array to track usage of buffer descriptors for readdir */
-       int readdir_index_array[ORANGEFS_READDIR_DEFAULT_DESC_COUNT];
-       spinlock_t readdir_index_lock;
+#define N DIV_ROUND_UP(ORANGEFS_READDIR_DEFAULT_DESC_COUNT, BITS_PER_LONG)
+       unsigned long readdir_index_array[N];
+#undef N
 } *__orangefs_bufmap;
 
 static DEFINE_SPINLOCK(orangefs_bufmap_lock);
@@ -58,30 +182,6 @@ orangefs_bufmap_free(struct orangefs_bufmap *bufmap)
        kfree(bufmap);
 }
 
-static struct orangefs_bufmap *orangefs_bufmap_ref(void)
-{
-       struct orangefs_bufmap *bufmap = NULL;
-
-       spin_lock(&orangefs_bufmap_lock);
-       if (__orangefs_bufmap) {
-               bufmap = __orangefs_bufmap;
-               atomic_inc(&bufmap->refcnt);
-       }
-       spin_unlock(&orangefs_bufmap_lock);
-       return bufmap;
-}
-
-static void orangefs_bufmap_unref(struct orangefs_bufmap *bufmap)
-{
-       if (atomic_dec_and_lock(&bufmap->refcnt, &orangefs_bufmap_lock)) {
-               __orangefs_bufmap = NULL;
-               spin_unlock(&orangefs_bufmap_lock);
-
-               orangefs_bufmap_unmap(bufmap);
-               orangefs_bufmap_free(bufmap);
-       }
-}
-
 /*
  * XXX: Can the size and shift change while the caller gives up the 
  * XXX: lock between calling this and doing something useful?
@@ -137,21 +237,18 @@ orangefs_bufmap_alloc(struct ORANGEFS_dev_map_desc *user_desc)
        if (!bufmap)
                goto out;
 
-       atomic_set(&bufmap->refcnt, 1);
        bufmap->total_size = user_desc->total_size;
        bufmap->desc_count = user_desc->count;
        bufmap->desc_size = user_desc->size;
        bufmap->desc_shift = ilog2(bufmap->desc_size);
 
-       spin_lock_init(&bufmap->buffer_index_lock);
        bufmap->buffer_index_array =
-               kcalloc(bufmap->desc_count, sizeof(int), GFP_KERNEL);
+               kzalloc(DIV_ROUND_UP(bufmap->desc_count, BITS_PER_LONG), GFP_KERNEL);
        if (!bufmap->buffer_index_array) {
                gossip_err("orangefs: could not allocate %d buffer indices\n",
                                bufmap->desc_count);
                goto out_free_bufmap;
        }
-       spin_lock_init(&bufmap->readdir_index_lock);
 
        bufmap->desc_array =
                kcalloc(bufmap->desc_count, sizeof(struct orangefs_bufmap_desc),
@@ -294,24 +391,18 @@ int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc)
        if (__orangefs_bufmap) {
                spin_unlock(&orangefs_bufmap_lock);
                gossip_err("orangefs: error: bufmap already initialized.\n");
-               ret = -EALREADY;
+               ret = -EINVAL;
                goto out_unmap_bufmap;
        }
        __orangefs_bufmap = bufmap;
+       install(&rw_map,
+               bufmap->desc_count,
+               bufmap->buffer_index_array);
+       install(&readdir_map,
+               ORANGEFS_READDIR_DEFAULT_DESC_COUNT,
+               bufmap->readdir_index_array);
        spin_unlock(&orangefs_bufmap_lock);
 
-       /*
-        * If there are operations in orangefs_bufmap_init_waitq, wake them up.
-        * This scenario occurs when the client-core is restarted and I/O
-        * requests in the in-progress or waiting tables are restarted.  I/O
-        * requests cannot be restarted until the shared memory system is
-        * completely re-initialized, so we put the I/O requests in this
-        * waitq until initialization has completed.  NOTE:  the I/O requests
-        * are also on a timer, so they don't wait forever just in case the
-        * client-core doesn't come back up.
-        */
-       wake_up_interruptible(&orangefs_bufmap_init_waitq);
-
        gossip_debug(GOSSIP_BUFMAP_DEBUG,
                     "orangefs_bufmap_initialize: exiting normally\n");
        return 0;
@@ -334,91 +425,28 @@ out:
  */
 void orangefs_bufmap_finalize(void)
 {
+       struct orangefs_bufmap *bufmap = __orangefs_bufmap;
+       if (!bufmap)
+               return;
        gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: called\n");
-       BUG_ON(!__orangefs_bufmap);
-       orangefs_bufmap_unref(__orangefs_bufmap);
+       mark_killed(&rw_map);
+       mark_killed(&readdir_map);
        gossip_debug(GOSSIP_BUFMAP_DEBUG,
                     "orangefs_bufmap_finalize: exiting normally\n");
 }
 
-struct slot_args {
-       int slot_count;
-       int *slot_array;
-       spinlock_t *slot_lock;
-       wait_queue_head_t *slot_wq;
-};
-
-static int wait_for_a_slot(struct slot_args *slargs, int *buffer_index)
+void orangefs_bufmap_run_down(void)
 {
-       int ret = -1;
-       int i = 0;
-       DEFINE_WAIT(wait_entry);
-
-       while (1) {
-               /*
-                * check for available desc, slot_lock is the appropriate
-                * index_lock
-                */
-               spin_lock(slargs->slot_lock);
-               prepare_to_wait_exclusive(slargs->slot_wq,
-                                         &wait_entry,
-                                         TASK_INTERRUPTIBLE);
-               for (i = 0; i < slargs->slot_count; i++)
-                       if (slargs->slot_array[i] == 0) {
-                               slargs->slot_array[i] = 1;
-                               *buffer_index = i;
-                               ret = 0;
-                               break;
-                       }
-               spin_unlock(slargs->slot_lock);
-
-               /* if we acquired a buffer, then break out of while */
-               if (ret == 0)
-                       break;
-
-               if (!signal_pending(current)) {
-                       gossip_debug(GOSSIP_BUFMAP_DEBUG,
-                                    "[BUFMAP]: waiting %d "
-                                    "seconds for a slot\n",
-                                    slot_timeout_secs);
-                       if (!schedule_timeout(slot_timeout_secs * HZ)) {
-                               gossip_debug(GOSSIP_BUFMAP_DEBUG,
-                                            "*** wait_for_a_slot timed out\n");
-                               ret = -ETIMEDOUT;
-                               break;
-                       }
-                       gossip_debug(GOSSIP_BUFMAP_DEBUG,
-                         "[BUFMAP]: woken up by a slot becoming available.\n");
-                       continue;
-               }
-
-               gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs: %s interrupted.\n",
-                            __func__);
-               ret = -EINTR;
-               break;
-       }
-
-       spin_lock(slargs->slot_lock);
-       finish_wait(slargs->slot_wq, &wait_entry);
-       spin_unlock(slargs->slot_lock);
-       return ret;
-}
-
-static void put_back_slot(struct slot_args *slargs, int buffer_index)
-{
-       /* slot_lock is the appropriate index_lock */
-       spin_lock(slargs->slot_lock);
-       if (buffer_index < 0 || buffer_index >= slargs->slot_count) {
-               spin_unlock(slargs->slot_lock);
+       struct orangefs_bufmap *bufmap = __orangefs_bufmap;
+       if (!bufmap)
                return;
-       }
-
-       /* put the desc back on the queue */
-       slargs->slot_array[buffer_index] = 0;
-       spin_unlock(slargs->slot_lock);
-
-       /* wake up anyone who may be sleeping on the queue */
-       wake_up_interruptible(slargs->slot_wq);
+       run_down(&rw_map);
+       run_down(&readdir_map);
+       spin_lock(&orangefs_bufmap_lock);
+       __orangefs_bufmap = NULL;
+       spin_unlock(&orangefs_bufmap_lock);
+       orangefs_bufmap_unmap(bufmap);
+       orangefs_bufmap_free(bufmap);
 }
 
 /*
@@ -431,23 +459,12 @@ static void put_back_slot(struct slot_args *slargs, int buffer_index)
  */
 int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index)
 {
-       struct orangefs_bufmap *bufmap = orangefs_bufmap_ref();
-       struct slot_args slargs;
-       int ret;
-
-       if (!bufmap) {
-               gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n");
-               return -EIO;
+       int ret = get(&rw_map);
+       if (ret >= 0) {
+               *mapp = __orangefs_bufmap;
+               *buffer_index = ret;
+               ret = 0;
        }
-
-       slargs.slot_count = bufmap->desc_count;
-       slargs.slot_array = bufmap->buffer_index_array;
-       slargs.slot_lock = &bufmap->buffer_index_lock;
-       slargs.slot_wq = &bufmap_waitq;
-       ret = wait_for_a_slot(&slargs, buffer_index);
-       if (ret)
-               orangefs_bufmap_unref(bufmap);
-       *mapp = bufmap;
        return ret;
 }
 
@@ -460,15 +477,7 @@ int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index)
  */
 void orangefs_bufmap_put(int buffer_index)
 {
-       struct slot_args slargs;
-       struct orangefs_bufmap *bufmap = __orangefs_bufmap;
-
-       slargs.slot_count = bufmap->desc_count;
-       slargs.slot_array = bufmap->buffer_index_array;
-       slargs.slot_lock = &bufmap->buffer_index_lock;
-       slargs.slot_wq = &bufmap_waitq;
-       put_back_slot(&slargs, buffer_index);
-       orangefs_bufmap_unref(bufmap);
+       put(&rw_map, buffer_index);
 }
 
 /*
@@ -484,36 +493,18 @@ void orangefs_bufmap_put(int buffer_index)
  */
 int orangefs_readdir_index_get(struct orangefs_bufmap **mapp, int *buffer_index)
 {
-       struct orangefs_bufmap *bufmap = orangefs_bufmap_ref();
-       struct slot_args slargs;
-       int ret;
-
-       if (!bufmap) {
-               gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n");
-               return -EIO;
+       int ret = get(&readdir_map);
+       if (ret >= 0) {
+               *mapp = __orangefs_bufmap;
+               *buffer_index = ret;
+               ret = 0;
        }
-
-       slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT;
-       slargs.slot_array = bufmap->readdir_index_array;
-       slargs.slot_lock = &bufmap->readdir_index_lock;
-       slargs.slot_wq = &readdir_waitq;
-       ret = wait_for_a_slot(&slargs, buffer_index);
-       if (ret)
-               orangefs_bufmap_unref(bufmap);
-       *mapp = bufmap;
        return ret;
 }
 
 void orangefs_readdir_index_put(struct orangefs_bufmap *bufmap, int buffer_index)
 {
-       struct slot_args slargs;
-
-       slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT;
-       slargs.slot_array = bufmap->readdir_index_array;
-       slargs.slot_lock = &bufmap->readdir_index_lock;
-       slargs.slot_wq = &readdir_waitq;
-       put_back_slot(&slargs, buffer_index);
-       orangefs_bufmap_unref(bufmap);
+       put(&readdir_map, buffer_index);
 }
 
 /*
index 2a2d426..f0684f0 100644 (file)
@@ -15,10 +15,10 @@ int orangefs_bufmap_shift_query(void);
 
 int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc);
 
-int orangefs_get_bufmap_init(void);
-
 void orangefs_bufmap_finalize(void);
 
+void orangefs_bufmap_run_down(void);
+
 int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index);
 
 void orangefs_bufmap_put(int buffer_index);
index 378cdcf..36eedd6 100644 (file)
@@ -155,67 +155,6 @@ retry_servicing:
                         * system
                         */
                        goto retry_servicing;
-
-               /* op uses shared memory */
-               if (orangefs_get_bufmap_init() == 0) {
-                       WARN_ON(1);
-                       /*
-                        * This operation uses the shared memory system AND
-                        * the system is not yet ready. This situation occurs
-                        * when the client-core is restarted AND there were
-                        * operations waiting to be processed or were already
-                        * in process.
-                        */
-                       gossip_debug(GOSSIP_WAIT_DEBUG,
-                                    "uses_shared_memory is true.\n");
-                       gossip_debug(GOSSIP_WAIT_DEBUG,
-                                    "Client core in-service status(%d).\n",
-                                    is_daemon_in_service());
-                       gossip_debug(GOSSIP_WAIT_DEBUG, "bufmap_init:%d.\n",
-                                    orangefs_get_bufmap_init());
-                       gossip_debug(GOSSIP_WAIT_DEBUG,
-                                    "operation's status is 0x%0x.\n",
-                                    op->op_state);
-
-                       /*
-                        * let process sleep for a few seconds so shared
-                        * memory system can be initialized.
-                        */
-                       prepare_to_wait(&orangefs_bufmap_init_waitq,
-                                       &wait_entry,
-                                       TASK_INTERRUPTIBLE);
-
-                       /*
-                        * Wait for orangefs_bufmap_initialize() to wake me up
-                        * within the allotted time.
-                        */
-                       ret = schedule_timeout(
-                               ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ);
-
-                       gossip_debug(GOSSIP_WAIT_DEBUG,
-                                    "Value returned from schedule_timeout:"
-                                    "%d.\n",
-                                    ret);
-                       gossip_debug(GOSSIP_WAIT_DEBUG,
-                                    "Is shared memory available? (%d).\n",
-                                    orangefs_get_bufmap_init());
-
-                       finish_wait(&orangefs_bufmap_init_waitq, &wait_entry);
-
-                       if (orangefs_get_bufmap_init() == 0) {
-                               gossip_err("%s:The shared memory system has not started in %d seconds after the client core restarted.  Aborting user's request(%s).\n",
-                                          __func__,
-                                          ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS,
-                                          get_opname_string(op));
-                               return -EIO;
-                       }
-
-                       /*
-                        * Return to the calling function and re-populate a
-                        * shared memory buffer.
-                        */
-                       return -EAGAIN;
-               }
        }
 
 out: