Merge tag 'dlm-5.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
authorLinus Torvalds <torvalds@linux-foundation.org>
Tue, 13 Oct 2020 15:59:39 +0000 (08:59 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Tue, 13 Oct 2020 15:59:39 +0000 (08:59 -0700)
Pull dlm updates from David Teigland:
 "This set continues the ongoing rework of the low level communication
  layer in the dlm.

  The focus here is on improvements to connection handling, and
  reworking the receiving of messages"

* tag 'dlm-5.10' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: fix race in nodeid2con
  fs: dlm: rework receive handling
  fs: dlm: disallow buffer size below default
  fs: dlm: handle range check as callback
  fs: dlm: fix mark per nodeid setting
  fs: dlm: remove lock dependency warning
  fs: dlm: use free_con to free connection
  fs: dlm: handle possible othercon writequeues
  fs: dlm: move free writequeue into con free
  fs: dlm: fix configfs memory leak
  fs: dlm: fix dlm_local_addr memory leak
  fs: dlm: make connection hash lockless
  fs: dlm: synchronize dlm before shutdown

fs/dlm/Kconfig
fs/dlm/config.c
fs/dlm/config.h
fs/dlm/lowcomms.c
fs/dlm/midcomms.c
fs/dlm/midcomms.h

index f82a495..ee92634 100644 (file)
@@ -4,6 +4,7 @@ menuconfig DLM
        depends on INET
        depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n)
        select IP_SCTP
+       select SRCU
        help
        A general purpose distributed lock manager for kernel or userspace
        applications.
index 47f0b98..49c5f94 100644 (file)
@@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item,
 CONFIGFS_ATTR(cluster_, cluster_name);
 
 static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
-                          int *info_field, int check_zero,
+                          int *info_field, bool (*check_cb)(unsigned int x),
                           const char *buf, size_t len)
 {
        unsigned int x;
@@ -137,7 +137,7 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
        if (rc)
                return rc;
 
-       if (check_zero && !x)
+       if (check_cb && check_cb(x))
                return -EINVAL;
 
        *cl_field = x;
@@ -146,13 +146,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
        return len;
 }
 
-#define CLUSTER_ATTR(name, check_zero)                                        \
+#define CLUSTER_ATTR(name, check_cb)                                          \
 static ssize_t cluster_##name##_store(struct config_item *item, \
                const char *buf, size_t len) \
 {                                                                             \
        struct dlm_cluster *cl = config_item_to_cluster(item);                \
        return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name,         \
-                          check_zero, buf, len);                             \
+                          check_cb, buf, len);                               \
 }                                                                             \
 static ssize_t cluster_##name##_show(struct config_item *item, char *buf)     \
 {                                                                             \
@@ -161,20 +161,30 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf)     \
 }                                                                             \
 CONFIGFS_ATTR(cluster_, name);
 
-CLUSTER_ATTR(tcp_port, 1);
-CLUSTER_ATTR(buffer_size, 1);
-CLUSTER_ATTR(rsbtbl_size, 1);
-CLUSTER_ATTR(recover_timer, 1);
-CLUSTER_ATTR(toss_secs, 1);
-CLUSTER_ATTR(scan_secs, 1);
-CLUSTER_ATTR(log_debug, 0);
-CLUSTER_ATTR(log_info, 0);
-CLUSTER_ATTR(protocol, 0);
-CLUSTER_ATTR(mark, 0);
-CLUSTER_ATTR(timewarn_cs, 1);
-CLUSTER_ATTR(waitwarn_us, 0);
-CLUSTER_ATTR(new_rsb_count, 0);
-CLUSTER_ATTR(recover_callbacks, 0);
+static bool dlm_check_zero(unsigned int x)
+{
+       return !x;
+}
+
+static bool dlm_check_buffer_size(unsigned int x)
+{
+       return (x < DEFAULT_BUFFER_SIZE);
+}
+
+CLUSTER_ATTR(tcp_port, dlm_check_zero);
+CLUSTER_ATTR(buffer_size, dlm_check_buffer_size);
+CLUSTER_ATTR(rsbtbl_size, dlm_check_zero);
+CLUSTER_ATTR(recover_timer, dlm_check_zero);
+CLUSTER_ATTR(toss_secs, dlm_check_zero);
+CLUSTER_ATTR(scan_secs, dlm_check_zero);
+CLUSTER_ATTR(log_debug, NULL);
+CLUSTER_ATTR(log_info, NULL);
+CLUSTER_ATTR(protocol, NULL);
+CLUSTER_ATTR(mark, NULL);
+CLUSTER_ATTR(timewarn_cs, dlm_check_zero);
+CLUSTER_ATTR(waitwarn_us, NULL);
+CLUSTER_ATTR(new_rsb_count, NULL);
+CLUSTER_ATTR(recover_callbacks, NULL);
 
 static struct configfs_attribute *cluster_attrs[] = {
        [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port,
@@ -221,6 +231,7 @@ struct dlm_space {
        struct list_head members;
        struct mutex members_lock;
        int members_count;
+       struct dlm_nodes *nds;
 };
 
 struct dlm_comms {
@@ -430,6 +441,7 @@ static struct config_group *make_space(struct config_group *g, const char *name)
        INIT_LIST_HEAD(&sp->members);
        mutex_init(&sp->members_lock);
        sp->members_count = 0;
+       sp->nds = nds;
        return &sp->group;
 
  fail:
@@ -451,6 +463,7 @@ static void drop_space(struct config_group *g, struct config_item *i)
 static void release_space(struct config_item *i)
 {
        struct dlm_space *sp = config_item_to_space(i);
+       kfree(sp->nds);
        kfree(sp);
 }
 
@@ -857,18 +870,22 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
        return 0;
 }
 
-int dlm_comm_mark(int nodeid, unsigned int *mark)
+void dlm_comm_mark(int nodeid, unsigned int *mark)
 {
        struct dlm_comm *cm;
 
        cm = get_comm(nodeid);
-       if (!cm)
-               return -ENOENT;
+       if (!cm) {
+               *mark = dlm_config.ci_mark;
+               return;
+       }
 
-       *mark = cm->mark;
-       put_comm(cm);
+       if (cm->mark)
+               *mark = cm->mark;
+       else
+               *mark = dlm_config.ci_mark;
 
-       return 0;
+       put_comm(cm);
 }
 
 int dlm_our_nodeid(void)
@@ -889,7 +906,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 
 /* Config file defaults */
 #define DEFAULT_TCP_PORT       21064
-#define DEFAULT_BUFFER_SIZE     4096
 #define DEFAULT_RSBTBL_SIZE     1024
 #define DEFAULT_RECOVER_TIMER      5
 #define DEFAULT_TOSS_SECS         10
index f62996c..c210250 100644 (file)
@@ -12,6 +12,8 @@
 #ifndef __CONFIG_DOT_H__
 #define __CONFIG_DOT_H__
 
+#define DEFAULT_BUFFER_SIZE     4096
+
 struct dlm_config_node {
        int nodeid;
        int weight;
@@ -46,7 +48,7 @@ void dlm_config_exit(void);
 int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
                     int *count_out);
 int dlm_comm_seq(int nodeid, uint32_t *seq);
-int dlm_comm_mark(int nodeid, unsigned int *mark);
+void dlm_comm_mark(int nodeid, unsigned int *mark);
 int dlm_our_nodeid(void);
 int dlm_our_addr(struct sockaddr_storage *addr, int num);
 
index 5050fe0..79f56f1 100644 (file)
 #define MAX_SEND_MSG_COUNT 25
 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
 
-struct cbuf {
-       unsigned int base;
-       unsigned int len;
-       unsigned int mask;
-};
-
-static void cbuf_add(struct cbuf *cb, int n)
-{
-       cb->len += n;
-}
-
-static int cbuf_data(struct cbuf *cb)
-{
-       return ((cb->base + cb->len) & cb->mask);
-}
-
-static void cbuf_init(struct cbuf *cb, int size)
-{
-       cb->base = cb->len = 0;
-       cb->mask = size-1;
-}
-
-static void cbuf_eat(struct cbuf *cb, int n)
-{
-       cb->len  -= n;
-       cb->base += n;
-       cb->base &= cb->mask;
-}
-
-static bool cbuf_empty(struct cbuf *cb)
-{
-       return cb->len == 0;
-}
-
 struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
@@ -117,8 +83,6 @@ struct connection {
        int (*rx_action) (struct connection *); /* What to do when active */
        void (*connect_action) (struct connection *);   /* What to do to connect */
        void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
-       struct page *rx_page;
-       struct cbuf cb;
        int retries;
 #define MAX_CONNECT_RETRIES 3
        struct hlist_node list;
@@ -126,6 +90,10 @@ struct connection {
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
        wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
+       unsigned char *rx_buf;
+       int rx_buflen;
+       int rx_leftover;
+       struct rcu_head rcu;
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -167,8 +135,8 @@ static struct workqueue_struct *recv_workqueue;
 static struct workqueue_struct *send_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
-static DEFINE_MUTEX(connections_lock);
-static struct kmem_cache *con_cache;
+static DEFINE_SPINLOCK(connections_lock);
+DEFINE_STATIC_SRCU(connections_srcu);
 
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
@@ -184,15 +152,20 @@ static inline int nodeid_hash(int nodeid)
 
 static struct connection *__find_con(int nodeid)
 {
-       int r;
+       int r, idx;
        struct connection *con;
 
        r = nodeid_hash(nodeid);
 
-       hlist_for_each_entry(con, &connection_hash[r], list) {
-               if (con->nodeid == nodeid)
+       idx = srcu_read_lock(&connections_srcu);
+       hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
+               if (con->nodeid == nodeid) {
+                       srcu_read_unlock(&connections_srcu, idx);
                        return con;
+               }
        }
+       srcu_read_unlock(&connections_srcu, idx);
+
        return NULL;
 }
 
@@ -200,21 +173,25 @@ static struct connection *__find_con(int nodeid)
  * If 'allocation' is zero then we don't attempt to create a new
  * connection structure for this node.
  */
-static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
+static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 {
-       struct connection *con = NULL;
+       struct connection *con, *tmp;
        int r;
 
        con = __find_con(nodeid);
        if (con || !alloc)
                return con;
 
-       con = kmem_cache_zalloc(con_cache, alloc);
+       con = kzalloc(sizeof(*con), alloc);
        if (!con)
                return NULL;
 
-       r = nodeid_hash(nodeid);
-       hlist_add_head(&con->list, &connection_hash[r]);
+       con->rx_buflen = dlm_config.ci_buffer_size;
+       con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
+       if (!con->rx_buf) {
+               kfree(con);
+               return NULL;
+       }
 
        con->nodeid = nodeid;
        mutex_init(&con->sock_mutex);
@@ -233,31 +210,41 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
                        con->rx_action = zerocon->rx_action;
        }
 
+       r = nodeid_hash(nodeid);
+
+       spin_lock(&connections_lock);
+       /* Because multiple workqueues/threads call this function it can
+        * race on multiple CPUs. Instead of locking the hot path __find_con()
+        * we just check again, in the rare case of recently added nodes,
+        * under the protection of connections_lock. If this is the case we
+        * abort our connection creation and return the existing connection.
+        */
+       tmp = __find_con(nodeid);
+       if (tmp) {
+               spin_unlock(&connections_lock);
+               kfree(con->rx_buf);
+               kfree(con);
+               return tmp;
+       }
+
+       hlist_add_head_rcu(&con->list, &connection_hash[r]);
+       spin_unlock(&connections_lock);
+
        return con;
 }
 
 /* Loop round all connections */
 static void foreach_conn(void (*conn_func)(struct connection *c))
 {
-       int i;
-       struct hlist_node *n;
+       int i, idx;
        struct connection *con;
 
+       idx = srcu_read_lock(&connections_srcu);
        for (i = 0; i < CONN_HASH_SIZE; i++) {
-               hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
+               hlist_for_each_entry_rcu(con, &connection_hash[i], list)
                        conn_func(con);
        }
-}
-
-static struct connection *nodeid2con(int nodeid, gfp_t allocation)
-{
-       struct connection *con;
-
-       mutex_lock(&connections_lock);
-       con = __nodeid2con(nodeid, allocation);
-       mutex_unlock(&connections_lock);
-
-       return con;
+       srcu_read_unlock(&connections_srcu, idx);
 }
 
 static struct dlm_node_addr *find_node_addr(int nodeid)
@@ -614,11 +601,8 @@ static void close_connection(struct connection *con, bool and_other,
                /* Will only re-enter once. */
                close_connection(con->othercon, false, true, true);
        }
-       if (con->rx_page) {
-               __free_page(con->rx_page);
-               con->rx_page = NULL;
-       }
 
+       con->rx_leftover = 0;
        con->retries = 0;
        mutex_unlock(&con->sock_mutex);
        clear_bit(CF_CLOSING, &con->flags);
@@ -672,16 +656,33 @@ static void dlm_tcp_shutdown(struct connection *con)
        shutdown_connection(con);
 }
 
+static int con_realloc_receive_buf(struct connection *con, int newlen)
+{
+       unsigned char *newbuf;
+
+       newbuf = kmalloc(newlen, GFP_NOFS);
+       if (!newbuf)
+               return -ENOMEM;
+
+       /* copy any leftover from last receive */
+       if (con->rx_leftover)
+               memmove(newbuf, con->rx_buf, con->rx_leftover);
+
+       /* swap to new buffer space */
+       kfree(con->rx_buf);
+       con->rx_buflen = newlen;
+       con->rx_buf = newbuf;
+
+       return 0;
+}
+
 /* Data received from remote end */
 static int receive_from_sock(struct connection *con)
 {
-       int ret = 0;
-       struct msghdr msg = {};
-       struct kvec iov[2];
-       unsigned len;
-       int r;
        int call_again_soon = 0;
-       int nvec;
+       struct msghdr msg;
+       struct kvec iov;
+       int ret, buflen;
 
        mutex_lock(&con->sock_mutex);
 
@@ -689,71 +690,55 @@ static int receive_from_sock(struct connection *con)
                ret = -EAGAIN;
                goto out_close;
        }
+
        if (con->nodeid == 0) {
                ret = -EINVAL;
                goto out_close;
        }
 
-       if (con->rx_page == NULL) {
-               /*
-                * This doesn't need to be atomic, but I think it should
-                * improve performance if it is.
-                */
-               con->rx_page = alloc_page(GFP_ATOMIC);
-               if (con->rx_page == NULL)
+       /* realloc if we get new buffer size to read out */
+       buflen = dlm_config.ci_buffer_size;
+       if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
+               ret = con_realloc_receive_buf(con, buflen);
+               if (ret < 0)
                        goto out_resched;
-               cbuf_init(&con->cb, PAGE_SIZE);
        }
 
-       /*
-        * iov[0] is the bit of the circular buffer between the current end
-        * point (cb.base + cb.len) and the end of the buffer.
-        */
-       iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
-       iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
-       iov[1].iov_len = 0;
-       nvec = 1;
-
-       /*
-        * iov[1] is the bit of the circular buffer between the start of the
-        * buffer and the start of the currently used section (cb.base)
+       /* calculate new buffer parameter regarding last receive and
+        * possible leftover bytes
         */
-       if (cbuf_data(&con->cb) >= con->cb.base) {
-               iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
-               iov[1].iov_len = con->cb.base;
-               iov[1].iov_base = page_address(con->rx_page);
-               nvec = 2;
-       }
-       len = iov[0].iov_len + iov[1].iov_len;
-       iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);
+       iov.iov_base = con->rx_buf + con->rx_leftover;
+       iov.iov_len = con->rx_buflen - con->rx_leftover;
 
-       r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
+       memset(&msg, 0, sizeof(msg));
+       msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+       ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+                            msg.msg_flags);
        if (ret <= 0)
                goto out_close;
-       else if (ret == len)
+       else if (ret == iov.iov_len)
                call_again_soon = 1;
 
-       cbuf_add(&con->cb, ret);
-       ret = dlm_process_incoming_buffer(con->nodeid,
-                                         page_address(con->rx_page),
-                                         con->cb.base, con->cb.len,
-                                         PAGE_SIZE);
-       if (ret < 0) {
-               log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
-                         ret, page_address(con->rx_page), con->cb.base,
-                         con->cb.len, r);
-               cbuf_eat(&con->cb, r);
-       } else {
-               cbuf_eat(&con->cb, ret);
-       }
+       /* new buflen according to the bytes read plus leftover from last receive */
+       buflen = ret + con->rx_leftover;
+       ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
+       if (ret < 0)
+               goto out_close;
 
-       if (cbuf_empty(&con->cb) && !call_again_soon) {
-               __free_page(con->rx_page);
-               con->rx_page = NULL;
+       /* calculate the leftover bytes from processing and move them to the
+        * beginning of the receive buffer, so on the next receive we have
+        * the full message at the start address of the receive buffer.
+        */
+       con->rx_leftover = buflen - ret;
+       if (con->rx_leftover) {
+               memmove(con->rx_buf, con->rx_buf + ret,
+                       con->rx_leftover);
+               call_again_soon = true;
        }
 
        if (call_again_soon)
                goto out_resched;
+
        mutex_unlock(&con->sock_mutex);
        return 0;
 
@@ -791,13 +776,11 @@ static int accept_from_sock(struct connection *con)
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;
+       unsigned int mark;
 
-       mutex_lock(&connections_lock);
        if (!dlm_allow_conn) {
-               mutex_unlock(&connections_lock);
                return -1;
        }
-       mutex_unlock(&connections_lock);
 
        mutex_lock_nested(&con->sock_mutex, 0);
 
@@ -830,6 +813,9 @@ static int accept_from_sock(struct connection *con)
                return -1;
        }
 
+       dlm_comm_mark(nodeid, &mark);
+       sock_set_mark(newsock->sk, mark);
+
        log_print("got connection from %d", nodeid);
 
        /*  Check to see if we already have a connection to this node. This
@@ -847,13 +833,24 @@ static int accept_from_sock(struct connection *con)
                struct connection *othercon = newcon->othercon;
 
                if (!othercon) {
-                       othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
+                       othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
                        if (!othercon) {
                                log_print("failed to allocate incoming socket");
                                mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
+
+                       othercon->rx_buflen = dlm_config.ci_buffer_size;
+                       othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
+                       if (!othercon->rx_buf) {
+                               mutex_unlock(&newcon->sock_mutex);
+                               kfree(othercon);
+                               log_print("failed to allocate incoming socket receive buffer");
+                               result = -ENOMEM;
+                               goto accept_err;
+                       }
+
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        mutex_init(&othercon->sock_mutex);
@@ -975,6 +972,8 @@ static void sctp_connect_to_sock(struct connection *con)
                return;
        }
 
+       dlm_comm_mark(con->nodeid, &mark);
+
        mutex_lock(&con->sock_mutex);
 
        /* Some odd races can cause double-connects, ignore them */
@@ -999,11 +998,6 @@ static void sctp_connect_to_sock(struct connection *con)
        if (result < 0)
                goto socket_err;
 
-       /* set skb mark */
-       result = dlm_comm_mark(con->nodeid, &mark);
-       if (result < 0)
-               goto bind_err;
-
        sock_set_mark(sock->sk, mark);
 
        con->rx_action = receive_from_sock;
@@ -1076,6 +1070,8 @@ static void tcp_connect_to_sock(struct connection *con)
                return;
        }
 
+       dlm_comm_mark(con->nodeid, &mark);
+
        mutex_lock(&con->sock_mutex);
        if (con->retries++ > MAX_CONNECT_RETRIES)
                goto out;
@@ -1090,11 +1086,6 @@ static void tcp_connect_to_sock(struct connection *con)
        if (result < 0)
                goto out_err;
 
-       /* set skb mark */
-       result = dlm_comm_mark(con->nodeid, &mark);
-       if (result < 0)
-               goto out_err;
-
        sock_set_mark(sock->sk, mark);
 
        memset(&saddr, 0, sizeof(saddr));
@@ -1238,6 +1229,14 @@ static void init_local(void)
        }
 }
 
+static void deinit_local(void)
+{
+       int i;
+
+       for (i = 0; i < dlm_local_count; i++)
+               kfree(dlm_local_addr[i]);
+}
+
 /* Initialise SCTP socket and bind to all interfaces */
 static int sctp_listen_for_all(void)
 {
@@ -1546,13 +1545,6 @@ static void process_send_sockets(struct work_struct *work)
                send_to_sock(con);
 }
 
-
-/* Discard all entries on the write queues */
-static void clean_writequeues(void)
-{
-       foreach_conn(clean_one_writequeue);
-}
-
 static void work_stop(void)
 {
        if (recv_workqueue)
@@ -1608,26 +1600,34 @@ static void shutdown_conn(struct connection *con)
                con->shutdown_action(con);
 }
 
+static void connection_release(struct rcu_head *rcu)
+{
+       struct connection *con = container_of(rcu, struct connection, rcu);
+
+       kfree(con->rx_buf);
+       kfree(con);
+}
+
 static void free_conn(struct connection *con)
 {
        close_connection(con, true, true, true);
-       if (con->othercon)
-               kmem_cache_free(con_cache, con->othercon);
-       hlist_del(&con->list);
-       kmem_cache_free(con_cache, con);
+       spin_lock(&connections_lock);
+       hlist_del_rcu(&con->list);
+       spin_unlock(&connections_lock);
+       if (con->othercon) {
+               clean_one_writequeue(con->othercon);
+               call_rcu(&con->othercon->rcu, connection_release);
+       }
+       clean_one_writequeue(con);
+       call_rcu(&con->rcu, connection_release);
 }
 
 static void work_flush(void)
 {
-       int ok;
+       int ok, idx;
        int i;
-       struct hlist_node *n;
        struct connection *con;
 
-       if (recv_workqueue)
-               flush_workqueue(recv_workqueue);
-       if (send_workqueue)
-               flush_workqueue(send_workqueue);
        do {
                ok = 1;
                foreach_conn(stop_conn);
@@ -1635,9 +1635,10 @@ static void work_flush(void)
                        flush_workqueue(recv_workqueue);
                if (send_workqueue)
                        flush_workqueue(send_workqueue);
+               idx = srcu_read_lock(&connections_srcu);
                for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
-                       hlist_for_each_entry_safe(con, n,
-                                                 &connection_hash[i], list) {
+                       hlist_for_each_entry_rcu(con, &connection_hash[i],
+                                                list) {
                                ok &= test_bit(CF_READ_PENDING, &con->flags);
                                ok &= test_bit(CF_WRITE_PENDING, &con->flags);
                                if (con->othercon) {
@@ -1648,6 +1649,7 @@ static void work_flush(void)
                                }
                        }
                }
+               srcu_read_unlock(&connections_srcu, idx);
        } while (!ok);
 }
 
@@ -1656,16 +1658,18 @@ void dlm_lowcomms_stop(void)
        /* Set all the flags to prevent any
           socket activity.
        */
-       mutex_lock(&connections_lock);
        dlm_allow_conn = 0;
-       mutex_unlock(&connections_lock);
+
+       if (recv_workqueue)
+               flush_workqueue(recv_workqueue);
+       if (send_workqueue)
+               flush_workqueue(send_workqueue);
+
        foreach_conn(shutdown_conn);
        work_flush();
-       clean_writequeues();
        foreach_conn(free_conn);
        work_stop();
-
-       kmem_cache_destroy(con_cache);
+       deinit_local();
 }
 
 int dlm_lowcomms_start(void)
@@ -1684,16 +1688,9 @@ int dlm_lowcomms_start(void)
                goto fail;
        }
 
-       error = -ENOMEM;
-       con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
-                                     __alignof__(struct connection), 0,
-                                     NULL);
-       if (!con_cache)
-               goto fail;
-
        error = work_start();
        if (error)
-               goto fail_destroy;
+               goto fail;
 
        dlm_allow_conn = 1;
 
@@ -1710,12 +1707,8 @@ int dlm_lowcomms_start(void)
 fail_unlisten:
        dlm_allow_conn = 0;
        con = nodeid2con(0,0);
-       if (con) {
-               close_connection(con, false, true, true);
-               kmem_cache_free(con_cache, con);
-       }
-fail_destroy:
-       kmem_cache_destroy(con_cache);
+       if (con)
+               free_conn(con);
 fail:
        return error;
 }
index 921322d..fde3a6a 100644 (file)
  * into packets and sends them to the comms layer.
  */
 
+#include <asm/unaligned.h>
+
 #include "dlm_internal.h"
 #include "lowcomms.h"
 #include "config.h"
 #include "lock.h"
 #include "midcomms.h"
 
-
-static void copy_from_cb(void *dst, const void *base, unsigned offset,
-                        unsigned len, unsigned limit)
-{
-       unsigned copy = len;
-
-       if ((copy + offset) > limit)
-               copy = limit - offset;
-       memcpy(dst, base + offset, copy);
-       len -= copy;
-       if (len)
-               memcpy(dst + copy, base, len);
-}
-
 /*
  * Called from the low-level comms layer to process a buffer of
  * commands.
- *
- * Only complete messages are processed here, any "spare" bytes from
- * the end of a buffer are saved and tacked onto the front of the next
- * message that comes in. I doubt this will happen very often but we
- * need to be able to cope with it and I don't want the task to be waiting
- * for packets to come in when there is useful work to be done.
  */
 
-int dlm_process_incoming_buffer(int nodeid, const void *base,
-                               unsigned offset, unsigned len, unsigned limit)
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 {
-       union {
-               unsigned char __buf[DLM_INBUF_LEN];
-               /* this is to force proper alignment on some arches */
-               union dlm_packet p;
-       } __tmp;
-       union dlm_packet *p = &__tmp.p;
-       int ret = 0;
-       int err = 0;
+       const unsigned char *ptr = buf;
+       const struct dlm_header *hd;
        uint16_t msglen;
-       uint32_t lockspace;
-
-       while (len > sizeof(struct dlm_header)) {
-
-               /* Copy just the header to check the total length.  The
-                  message may wrap around the end of the buffer back to the
-                  start, so we need to use a temp buffer and copy_from_cb. */
-
-               copy_from_cb(p, base, offset, sizeof(struct dlm_header),
-                            limit);
-
-               msglen = le16_to_cpu(p->header.h_length);
-               lockspace = p->header.h_lockspace;
+       int ret = 0;
 
-               err = -EINVAL;
-               if (msglen < sizeof(struct dlm_header))
-                       break;
-               if (p->header.h_cmd == DLM_MSG) {
-                       if (msglen < sizeof(struct dlm_message))
-                               break;
-               } else {
-                       if (msglen < sizeof(struct dlm_rcom))
-                               break;
-               }
-               err = -E2BIG;
-               if (msglen > dlm_config.ci_buffer_size) {
-                       log_print("message size %d from %d too big, buf len %d",
-                                 msglen, nodeid, len);
-                       break;
+       while (len >= sizeof(struct dlm_header)) {
+               hd = (struct dlm_header *)ptr;
+
+               /* no message should be larger than this, otherwise we
+                * cannot deliver it to the upper layers
+                */
+               msglen = get_unaligned_le16(&hd->h_length);
+               if (msglen > DEFAULT_BUFFER_SIZE) {
+                       log_print("received invalid length header: %u, will abort message parsing",
+                                 msglen);
+                       return -EBADMSG;
                }
-               err = 0;
-
-               /* If only part of the full message is contained in this
-                  buffer, then do nothing and wait for lowcomms to call
-                  us again later with more data.  We return 0 meaning
-                  we've consumed none of the input buffer. */
 
+               /* the caller will take care that any leftover
+                * is parsed on the next call with more data
+                */
                if (msglen > len)
                        break;
 
-               /* Allocate a larger temp buffer if the full message won't fit
-                  in the buffer on the stack (which should work for most
-                  ordinary messages). */
-
-               if (msglen > sizeof(__tmp) && p == &__tmp.p) {
-                       p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
-                       if (p == NULL)
-                               return ret;
-               }
+               switch (hd->h_cmd) {
+               case DLM_MSG:
+                       if (msglen < sizeof(struct dlm_message)) {
+                               log_print("dlm msg too small: %u, will skip this message",
+                                         msglen);
+                               goto skip;
+                       }
 
-               copy_from_cb(p, base, offset, msglen, limit);
+                       break;
+               case DLM_RCOM:
+                       if (msglen < sizeof(struct dlm_rcom)) {
+                               log_print("dlm rcom msg too small: %u, will skip this message",
+                                         msglen);
+                               goto skip;
+                       }
 
-               BUG_ON(lockspace != p->header.h_lockspace);
+                       break;
+               default:
+                       log_print("unsupported h_cmd received: %u, will skip this message",
+                                 hd->h_cmd);
+                       goto skip;
+               }
 
+               /* for aligned memory access, we just copy the current message
+                * to the beginning of the buffer, which contains already parsed
+                * data, and should thus provide aligned access for upper layers
+                * because the start address of the buffer is an aligned
+                * address. This memmove can be removed when the upper layer
+                * is capable of unaligned memory access.
+                */
+               memmove(buf, ptr, msglen);
+               dlm_receive_buffer((union dlm_packet *)buf, nodeid);
+
+skip:
                ret += msglen;
-               offset += msglen;
-               offset &= (limit - 1);
                len -= msglen;
-
-               dlm_receive_buffer(p, nodeid);
+               ptr += msglen;
        }
 
-       if (p != &__tmp.p)
-               kfree(p);
-
-       return err ? err : ret;
+       return ret;
 }
 
index 2e122e8..61e90a9 100644 (file)
@@ -12,8 +12,7 @@
 #ifndef __MIDCOMMS_DOT_H__
 #define __MIDCOMMS_DOT_H__
 
-int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset,
-                               unsigned len, unsigned limit);
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
 
 #endif                         /* __MIDCOMMS_DOT_H__ */