fs: dlm: don't allow half transmitted messages
authorAlexander Aring <aahringo@redhat.com>
Fri, 21 May 2021 19:08:48 +0000 (15:08 -0400)
committerDavid Teigland <teigland@redhat.com>
Tue, 25 May 2021 14:22:20 +0000 (09:22 -0500)
This patch will clean a dirty page buffer if a reconnect occurs. If a page
buffer was half transmitted we cannot start inside the middle of a dlm
message if a node connects again. I observed invalid length receptions
errors and was guessing that this behaviour occurs, after this patch I
never saw an invalid message length again. This patch might drops more
messages for dlm version 3.1 but 3.1 can't deal with half messages as
well, for 3.2 it might trigger more re-transmissions but will not leave dlm
in a broken state.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/lowcomms.c

index fe9113b..36adccc 100644 (file)
@@ -118,6 +118,7 @@ struct writequeue_entry {
        int len;
        int end;
        int users;
+       bool dirty;
        struct connection *con;
        struct list_head msgs;
        struct kref ref;
@@ -700,6 +701,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 }
 
+static void dlm_page_release(struct kref *kref)
+{
+       struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+                                                 ref);
+
+       __free_page(e->page);
+       kfree(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+       struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+       kref_put(&msg->entry->ref, dlm_page_release);
+       kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+       struct dlm_msg *msg, *tmp;
+
+       list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+               if (msg->orig_msg) {
+                       msg->orig_msg->retransmit = false;
+                       kref_put(&msg->orig_msg->ref, dlm_msg_release);
+               }
+
+               list_del(&msg->list);
+               kref_put(&msg->ref, dlm_msg_release);
+       }
+
+       list_del(&e->list);
+       atomic_dec(&e->con->writequeue_cnt);
+       kref_put(&e->ref, dlm_page_release);
+}
+
 static void dlm_close_sock(struct socket **sock)
 {
        if (*sock) {
@@ -714,6 +751,7 @@ static void close_connection(struct connection *con, bool and_other,
                             bool tx, bool rx)
 {
        bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+       struct writequeue_entry *e;
 
        if (tx && !closing && cancel_work_sync(&con->swork)) {
                log_print("canceled swork for node %d", con->nodeid);
@@ -732,6 +770,26 @@ static void close_connection(struct connection *con, bool and_other,
                close_connection(con->othercon, false, tx, rx);
        }
 
+       /* if we send a writequeue entry only a half way, we drop the
+        * whole entry because reconnection and that we not start of the
+        * middle of a msg which will confuse the other end.
+        *
+        * we can always drop messages because retransmits, but what we
+        * cannot allow is to transmit half messages which may be processed
+        * at the other side.
+        *
+        * our policy is to start on a clean state when disconnects, we don't
+        * know what's send/received on transport layer in this case.
+        */
+       spin_lock(&con->writequeue_lock);
+       if (!list_empty(&con->writequeue)) {
+               e = list_first_entry(&con->writequeue, struct writequeue_entry,
+                                    list);
+               if (e->dirty)
+                       free_entry(e);
+       }
+       spin_unlock(&con->writequeue_lock);
+
        con->rx_leftover = 0;
        con->retries = 0;
        clear_bit(CF_CONNECTED, &con->flags);
@@ -1026,41 +1084,6 @@ accept_err:
        return result;
 }
 
-static void dlm_page_release(struct kref *kref)
-{
-       struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
-                                                 ref);
-
-       __free_page(e->page);
-       kfree(e);
-}
-
-static void dlm_msg_release(struct kref *kref)
-{
-       struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
-
-       kref_put(&msg->entry->ref, dlm_page_release);
-       kfree(msg);
-}
-
-static void free_entry(struct writequeue_entry *e)
-{
-       struct dlm_msg *msg, *tmp;
-
-       list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
-               if (msg->orig_msg) {
-                       msg->orig_msg->retransmit = false;
-                       kref_put(&msg->orig_msg->ref, dlm_msg_release);
-               }
-               list_del(&msg->list);
-               kref_put(&msg->ref, dlm_msg_release);
-       }
-
-       list_del(&e->list);
-       atomic_dec(&e->con->writequeue_cnt);
-       kref_put(&e->ref, dlm_page_release);
-}
-
 /*
  * writequeue_entry_complete - try to delete and free write queue entry
  * @e: write queue entry to try to delete
@@ -1072,6 +1095,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 {
        e->offset += completed;
        e->len -= completed;
+       /* signal that page was half way transmitted */
+       e->dirty = true;
 
        if (e->len == 0 && e->users == 0)
                free_entry(e);