/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
-#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct connection {
struct socket *sock; /* NULL if not connected */
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
-#define CF_SHUTDOWN 9
-#define CF_CONNECTED 10
-#define CF_RECONNECT 11
-#define CF_DELAY_CONNECT 12
-#define CF_EOF 13
+#define CF_CONNECTED 9
+#define CF_RECONNECT 10
+#define CF_DELAY_CONNECT 11
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
- atomic_t writequeue_cnt;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
struct connection *sendcon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
- wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
unsigned char *rx_buf;
int rx_buflen;
int rx_leftover;
int (*listen_validate)(void);
void (*listen_sockopts)(struct socket *sock);
int (*listen_bind)(struct socket *sock);
- /* What to do to shutdown */
- void (*shutdown_action)(struct connection *con);
- /* What to do to eof check */
- bool (*eof_condition)(struct connection *con);
};
static struct listen_sock_callbacks {
return NULL;
}
-static bool tcp_eof_condition(struct connection *con)
-{
- return atomic_read(&con->writequeue_cnt);
-}
-
static int dlm_con_init(struct connection *con, int nodeid)
{
con->rx_buflen = dlm_config.ci_buffer_size;
mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
- atomic_set(&con->writequeue_cnt, 0);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
- init_waitqueue_head(&con->shutdown_wait);
return 0;
}
}
list_del(&e->list);
- atomic_dec(&e->con->writequeue_cnt);
kref_put(&e->ref, dlm_page_release);
}
clear_bit(CF_CONNECTED, &con->flags);
clear_bit(CF_DELAY_CONNECT, &con->flags);
clear_bit(CF_RECONNECT, &con->flags);
- clear_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
}
-static void shutdown_connection(struct connection *con)
-{
- int ret;
-
- flush_work(&con->swork);
-
- mutex_lock(&con->sock_mutex);
- /* nothing to shutdown */
- if (!con->sock) {
- mutex_unlock(&con->sock_mutex);
- return;
- }
-
- set_bit(CF_SHUTDOWN, &con->flags);
- ret = kernel_sock_shutdown(con->sock, SHUT_WR);
- mutex_unlock(&con->sock_mutex);
- if (ret) {
- log_print("Connection %p failed to shutdown: %d will force close",
- con, ret);
- goto force_close;
- } else {
- ret = wait_event_timeout(con->shutdown_wait,
- !test_bit(CF_SHUTDOWN, &con->flags),
- DLM_SHUTDOWN_WAIT_TIMEOUT);
- if (ret == 0) {
- log_print("Connection %p shutdown timed out, will force close",
- con);
- goto force_close;
- }
- }
-
- return;
-
-force_close:
- clear_bit(CF_SHUTDOWN, &con->flags);
- close_connection(con, false, true, true);
-}
-
-static void dlm_tcp_shutdown(struct connection *con)
-{
- if (con->othercon)
- shutdown_connection(con->othercon);
- shutdown_connection(con);
-}
-
static int con_realloc_receive_buf(struct connection *con, int newlen)
{
unsigned char *newbuf;
log_print("connection %p got EOF from %d",
con, con->nodeid);
- if (dlm_proto_ops->eof_condition &&
- dlm_proto_ops->eof_condition(con)) {
- set_bit(CF_EOF, &con->flags);
- mutex_unlock(&con->sock_mutex);
- } else {
- mutex_unlock(&con->sock_mutex);
- close_connection(con, false, true, false);
-
- /* handling for tcp shutdown */
- clear_bit(CF_SHUTDOWN, &con->flags);
- wake_up(&con->shutdown_wait);
- }
-
+ mutex_unlock(&con->sock_mutex);
+ close_connection(con, false, true, false);
/* signal to breaking receive worker */
ret = -1;
} else {
kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
- atomic_inc(&con->writequeue_cnt);
if (cb)
cb(data);
}
spin_unlock(&con->writequeue_lock);
- /* close if we got EOF */
- if (test_and_clear_bit(CF_EOF, &con->flags)) {
- mutex_unlock(&con->sock_mutex);
- close_connection(con, false, false, true);
-
- /* handling for tcp shutdown */
- clear_bit(CF_SHUTDOWN, &con->flags);
- wake_up(&con->shutdown_wait);
- } else {
- mutex_unlock(&con->sock_mutex);
- }
-
- return;
-
out:
mutex_unlock(&con->sock_mutex);
return;
return 0;
}
-static void shutdown_conn(struct connection *con)
-{
- if (dlm_proto_ops->shutdown_action)
- dlm_proto_ops->shutdown_action(con);
-}
-
void dlm_lowcomms_shutdown(void)
{
- int idx;
-
restore_callbacks(listen_con.sock);
if (recv_workqueue)
flush_workqueue(send_workqueue);
dlm_close_sock(&listen_con.sock);
+}
+
+void dlm_lowcomms_shutdown_node(int nodeid, bool force)
+{
+ struct connection *con;
+ int idx;
idx = srcu_read_lock(&connections_srcu);
- foreach_conn(shutdown_conn);
+ con = nodeid2con(nodeid, 0);
+ if (WARN_ON_ONCE(!con)) {
+ srcu_read_unlock(&connections_srcu, idx);
+ return;
+ }
+
+ WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
+ clean_one_writequeue(con);
+ if (con->othercon)
+ clean_one_writequeue(con->othercon);
+ close_connection(con, true, true, true);
srcu_read_unlock(&connections_srcu, idx);
}
.listen_validate = dlm_tcp_listen_validate,
.listen_sockopts = dlm_tcp_listen_sockopts,
.listen_bind = dlm_tcp_listen_bind,
- .shutdown_action = dlm_tcp_shutdown,
- .eof_condition = tcp_eof_condition,
};
static int dlm_sctp_bind(struct socket *sock)