fs: dlm: simplify writequeue handling
authorAlexander Aring <aahringo@redhat.com>
Mon, 1 Mar 2021 22:05:16 +0000 (17:05 -0500)
committerDavid Teigland <teigland@redhat.com>
Tue, 9 Mar 2021 14:56:42 +0000 (08:56 -0600)
This patch cleans up the current dlm sending allocator handling by using
some named macros, list functionality and removes some goto statements.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/lowcomms.c

index 97d0c93..a97c69d 100644 (file)
@@ -102,6 +102,9 @@ struct listen_connection {
        struct work_struct rwork;
 };
 
+#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
+#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
+
 /* An entry waiting to be sent */
 struct writequeue_entry {
        struct list_head list;
@@ -1351,7 +1354,7 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
 {
        struct writequeue_entry *entry;
 
-       entry = kmalloc(sizeof(struct writequeue_entry), allocation);
+       entry = kzalloc(sizeof(*entry), allocation);
        if (!entry)
                return NULL;
 
@@ -1361,20 +1364,48 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
                return NULL;
        }
 
-       entry->offset = 0;
-       entry->len = 0;
-       entry->end = 0;
-       entry->users = 0;
        entry->con = con;
+       entry->users = 1;
 
        return entry;
 }
 
+static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
+                                            gfp_t allocation, char **ppc)
+{
+       struct writequeue_entry *e;
+
+       spin_lock(&con->writequeue_lock);
+       if (!list_empty(&con->writequeue)) {
+               e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
+               if (DLM_WQ_REMAIN_BYTES(e) >= len) {
+                       *ppc = page_address(e->page) + e->end;
+                       e->end += len;
+                       e->users++;
+                       spin_unlock(&con->writequeue_lock);
+
+                       return e;
+               }
+       }
+       spin_unlock(&con->writequeue_lock);
+
+       e = new_writequeue_entry(con, allocation);
+       if (!e)
+               return NULL;
+
+       *ppc = page_address(e->page);
+       e->end += len;
+
+       spin_lock(&con->writequeue_lock);
+       list_add_tail(&e->list, &con->writequeue);
+       spin_unlock(&con->writequeue_lock);
+
+       return e;
+};
+
 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
 {
        struct connection *con;
-       struct writequeue_entry *e;
-       int offset = 0;
 
        if (len > DEFAULT_BUFFER_SIZE ||
            len < sizeof(struct dlm_header)) {
@@ -1388,35 +1419,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
        if (!con)
                return NULL;
 
-       spin_lock(&con->writequeue_lock);
-       e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
-       if ((&e->list == &con->writequeue) ||
-           (PAGE_SIZE - e->end < len)) {
-               e = NULL;
-       } else {
-               offset = e->end;
-               e->end += len;
-               e->users++;
-       }
-       spin_unlock(&con->writequeue_lock);
-
-       if (e) {
-       got_one:
-               *ppc = page_address(e->page) + offset;
-               return e;
-       }
-
-       e = new_writequeue_entry(con, allocation);
-       if (e) {
-               spin_lock(&con->writequeue_lock);
-               offset = e->end;
-               e->end += len;
-               e->users++;
-               list_add_tail(&e->list, &con->writequeue);
-               spin_unlock(&con->writequeue_lock);
-               goto got_one;
-       }
-       return NULL;
+       return new_wq_entry(con, len, allocation, ppc);
 }
 
 void dlm_lowcomms_commit_buffer(void *mh)
@@ -1429,7 +1432,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
        users = --e->users;
        if (users)
                goto out;
-       e->len = e->end - e->offset;
+
+       e->len = DLM_WQ_LENGTH_BYTES(e);
        spin_unlock(&con->writequeue_lock);
 
        queue_work(send_workqueue, &con->swork);
@@ -1455,11 +1459,10 @@ static void send_to_sock(struct connection *con)
 
        spin_lock(&con->writequeue_lock);
        for (;;) {
-               e = list_entry(con->writequeue.next, struct writequeue_entry,
-                              list);
-               if ((struct list_head *) e == &con->writequeue)
+               if (list_empty(&con->writequeue))
                        break;
 
+               e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
                len = e->len;
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);