#define CF_CONNECTED 10
#define CF_RECONNECT 11
#define CF_DELAY_CONNECT 12
+#define CF_EOF 13
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
+ atomic_t writequeue_cnt;
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
+ bool (*eof_condition)(struct connection *con); /* What to do to eof check */
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
return NULL;
}
+static bool tcp_eof_condition(struct connection *con)
+{
+ return atomic_read(&con->writequeue_cnt);
+}
+
static int dlm_con_init(struct connection *con, int nodeid)
{
con->rx_buflen = dlm_config.ci_buffer_size;
mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
+ atomic_set(&con->writequeue_cnt, 0);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
init_waitqueue_head(&con->shutdown_wait);
if (dlm_config.ci_protocol == 0) {
con->connect_action = tcp_connect_to_sock;
con->shutdown_action = dlm_tcp_shutdown;
+ con->eof_condition = tcp_eof_condition;
} else {
con->connect_action = sctp_connect_to_sock;
}
clear_bit(CF_CONNECTED, &con->flags);
clear_bit(CF_DELAY_CONNECT, &con->flags);
clear_bit(CF_RECONNECT, &con->flags);
+ clear_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
}
return -EAGAIN;
out_close:
- mutex_unlock(&con->sock_mutex);
if (ret == 0) {
- close_connection(con, false, true, false);
log_print("connection %p got EOF from %d",
con, con->nodeid);
- /* handling for tcp shutdown */
- clear_bit(CF_SHUTDOWN, &con->flags);
- wake_up(&con->shutdown_wait);
+
+ if (con->eof_condition && con->eof_condition(con)) {
+ set_bit(CF_EOF, &con->flags);
+ mutex_unlock(&con->sock_mutex);
+ } else {
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, true, false);
+
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ }
+
/* signal to breaking receive worker */
ret = -1;
+ } else {
+ mutex_unlock(&con->sock_mutex);
}
return ret;
}
if (e->len == 0 && e->users == 0) {
list_del(&e->list);
+ atomic_dec(&e->con->writequeue_cnt);
free_entry(e);
}
}
*ppc = page_address(e->page);
e->end += len;
+ atomic_inc(&con->writequeue_cnt);
spin_lock(&con->writequeue_lock);
list_add_tail(&e->list, &con->writequeue);
writequeue_entry_complete(e, ret);
}
spin_unlock(&con->writequeue_lock);
+
+ /* close if we got EOF */
+ if (test_and_clear_bit(CF_EOF, &con->flags)) {
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, false, true);
+
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ } else {
+ mutex_unlock(&con->sock_mutex);
+ }
+
+ return;
+
out:
mutex_unlock(&con->sock_mutex);
return;