int len;
int end;
int users;
+ bool dirty;
struct connection *con;
struct list_head msgs;
struct kref ref;
memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
+static void dlm_page_release(struct kref *kref)
+{
+ struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+ ref);
+
+ __free_page(e->page);
+ kfree(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+ struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+ kref_put(&msg->entry->ref, dlm_page_release);
+ kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+ struct dlm_msg *msg, *tmp;
+
+ list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+ if (msg->orig_msg) {
+ msg->orig_msg->retransmit = false;
+ kref_put(&msg->orig_msg->ref, dlm_msg_release);
+ }
+
+ list_del(&msg->list);
+ kref_put(&msg->ref, dlm_msg_release);
+ }
+
+ list_del(&e->list);
+ atomic_dec(&e->con->writequeue_cnt);
+ kref_put(&e->ref, dlm_page_release);
+}
+
static void dlm_close_sock(struct socket **sock)
{
if (*sock) {
bool tx, bool rx)
{
bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+ struct writequeue_entry *e;
if (tx && !closing && cancel_work_sync(&con->swork)) {
log_print("canceled swork for node %d", con->nodeid);
close_connection(con->othercon, false, tx, rx);
}
+ /* if we send a writequeue entry only a half way, we drop the
+ * whole entry because reconnection and that we not start of the
+ * middle of a msg which will confuse the other end.
+ *
+ * we can always drop messages because retransmits, but what we
+ * cannot allow is to transmit half messages which may be processed
+ * at the other side.
+ *
+ * our policy is to start on a clean state when disconnects, we don't
+ * know what's send/received on transport layer in this case.
+ */
+ spin_lock(&con->writequeue_lock);
+ if (!list_empty(&con->writequeue)) {
+ e = list_first_entry(&con->writequeue, struct writequeue_entry,
+ list);
+ if (e->dirty)
+ free_entry(e);
+ }
+ spin_unlock(&con->writequeue_lock);
+
con->rx_leftover = 0;
con->retries = 0;
clear_bit(CF_CONNECTED, &con->flags);
return result;
}
-static void dlm_page_release(struct kref *kref)
-{
- struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
- ref);
-
- __free_page(e->page);
- kfree(e);
-}
-
-static void dlm_msg_release(struct kref *kref)
-{
- struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
-
- kref_put(&msg->entry->ref, dlm_page_release);
- kfree(msg);
-}
-
-static void free_entry(struct writequeue_entry *e)
-{
- struct dlm_msg *msg, *tmp;
-
- list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
- if (msg->orig_msg) {
- msg->orig_msg->retransmit = false;
- kref_put(&msg->orig_msg->ref, dlm_msg_release);
- }
- list_del(&msg->list);
- kref_put(&msg->ref, dlm_msg_release);
- }
-
- list_del(&e->list);
- atomic_dec(&e->con->writequeue_cnt);
- kref_put(&e->ref, dlm_page_release);
-}
-
/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
{
e->offset += completed;
e->len -= completed;
+ /* signal that page was half way transmitted */
+ e->dirty = true;
if (e->len == 0 && e->users == 0)
free_entry(e);