From 0ef897be12b8b4cf297b6016e79ec97ec90f2cf6 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Thu, 15 Feb 2018 10:40:50 +0100 Subject: [PATCH] tipc: separate topology server listener socket from subcsriber sockets We move the listener socket to struct tipc_server and give it its own work item. This makes it easier to follow the code, and entails some simplifications in the reception code in subscriber sockets. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/server.c | 328 ++++++++++++++++++++++++------------------------------ 1 file changed, 147 insertions(+), 181 deletions(-) diff --git a/net/tipc/server.c b/net/tipc/server.c index 0abbdd6..0e351e8 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -64,7 +64,6 @@ * @tipc_conn_new: callback will be called when new connection is incoming * @tipc_conn_release: callback will be called before releasing the connection * @tipc_conn_recvmsg: callback will be called when message arrives - * @saddr: TIPC server address * @name: server name * @imp: message importance * @type: socket type @@ -74,10 +73,11 @@ struct tipc_server { spinlock_t idr_lock; /* for idr list */ int idr_in_use; struct net *net; + struct work_struct awork; struct workqueue_struct *rcv_wq; struct workqueue_struct *send_wq; int max_rcvbuf_size; - struct sockaddr_tipc *saddr; + struct socket *listener; char name[TIPC_SERVER_NAME_LEN]; }; @@ -106,7 +106,6 @@ struct tipc_conn { struct list_head sub_list; spinlock_t sub_lock; /* for subscription list */ struct work_struct rwork; - int (*rx_action) (struct tipc_conn *con); struct list_head outqueue; spinlock_t outqueue_lock; struct work_struct swork; @@ -121,12 +120,6 @@ struct outqueue_entry { static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); -static void tipc_clean_outqueues(struct tipc_conn *con); - -static struct tipc_conn *sock2con(struct sock *sk) -{ - return sk->sk_user_data; -} static bool connected(struct tipc_conn *con) { @@ -137,21 +130,21 @@ static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_server *s = con->server; - struct socket *sock = con->sock; + struct outqueue_entry *e, *safe; - if (sock) { - if (test_bit(CF_SERVER, &con->flags)) { - __module_get(sock->ops->owner); - __module_get(sock->sk->sk_prot_creator->owner); - } - sock_release(sock); - con->sock = NULL; - } spin_lock_bh(&s->idr_lock); idr_remove(&s->conn_idr, con->conid); s->idr_in_use--; spin_unlock_bh(&s->idr_lock); - tipc_clean_outqueues(con); + if (con->sock) + sock_release(con->sock); + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + kfree(e); + } + spin_unlock_bh(&con->outqueue_lock); kfree(con); } @@ -178,14 +171,14 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) } /* sock_data_ready - interrupt callback indicating the socket has data to read - * The queued job is launched in tipc_recv_from_sock() + * The queued work is launched into tipc_recv_work()->tipc_recv_from_sock() */ static void sock_data_ready(struct sock *sk) { struct tipc_conn *con; read_lock_bh(&sk->sk_callback_lock); - con = sock2con(sk); + con = sk->sk_user_data; if (connected(con)) { conn_get(con); if (!queue_work(con->server->rcv_wq, &con->rwork)) @@ -195,15 +188,15 @@ static void sock_data_ready(struct sock *sk) } /* sock_write_space - interrupt callback after a sendmsg EAGAIN - * Indicates that there now is more is space in the send buffer - * The queued job is launched in tipc_send_to_sock() + * Indicates that there now is more space in the send buffer + * The queued work is launched into tipc_send_work()->tipc_send_to_sock() */ static void sock_write_space(struct sock *sk) { struct tipc_conn *con; read_lock_bh(&sk->sk_callback_lock); - con = sock2con(sk); + con = sk->sk_user_data; if (connected(con)) { conn_get(con); if (!queue_work(con->server->send_wq, &con->swork)) @@ -212,23 +205,8 @@ static void sock_write_space(struct sock *sk) read_unlock_bh(&sk->sk_callback_lock); } -static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) -{ - struct sock *sk = sock->sk; - - write_lock_bh(&sk->sk_callback_lock); - - sk->sk_data_ready = sock_data_ready; - sk->sk_write_space = sock_write_space; - sk->sk_user_data = con; - - con->sock = sock; - - write_unlock_bh(&sk->sk_callback_lock); -} - /* tipc_con_delete_sub - delete a specific or all subscriptions - * for a given subscriber + * for a given subscriber */ static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) { @@ -268,6 +246,7 @@ static void tipc_close_conn(struct tipc_conn *con) /* Don't flush pending works, -just let them expire */ kernel_sock_shutdown(con->sock, SHUT_RDWR); + conn_put(con); } @@ -357,117 +336,8 @@ static int tipc_receive_from_sock(struct tipc_conn *con) return ret; } -static int tipc_accept_from_sock(struct tipc_conn *con) -{ - struct socket *sock = con->sock; - struct socket *newsock; - struct tipc_conn *newcon; - int ret; - - ret = kernel_accept(sock, &newsock, O_NONBLOCK); - if (ret < 0) - return ret; - - newcon = tipc_alloc_conn(con->server); - if (IS_ERR(newcon)) { - ret = PTR_ERR(newcon); - sock_release(newsock); - return ret; - } - - newcon->rx_action = tipc_receive_from_sock; - tipc_register_callbacks(newsock, newcon); - - /* Wake up receive process in case of 'SYN+' message */ - newsock->sk->sk_data_ready(newsock->sk); - return ret; -} - -static struct socket *tipc_create_listen_sock(struct tipc_conn *con) -{ - struct tipc_server *s = con->server; - struct socket *sock = NULL; - int imp = TIPC_CRITICAL_IMPORTANCE; - int ret; - - ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock); - if (ret < 0) - return NULL; - ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, - (char *)&imp, sizeof(imp)); - if (ret < 0) - goto create_err; - ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); - if (ret < 0) - goto create_err; - - con->rx_action = tipc_accept_from_sock; - ret = kernel_listen(sock, 0); - if (ret < 0) - goto create_err; - - /* As server's listening socket owner and creator is the same module, - * we have to decrease TIPC module reference count to guarantee that - * it remains zero after the server socket is created, otherwise, - * executing "rmmod" command is unable to make TIPC module deleted - * after TIPC module is inserted successfully. - * - * However, the reference count is ever increased twice in - * sock_create_kern(): one is to increase the reference count of owner - * of TIPC socket's proto_ops struct; another is to increment the - * reference count of owner of TIPC proto struct. Therefore, we must - * decrement the module reference count twice to ensure that it keeps - * zero after server's listening socket is created. Of course, we - * must bump the module reference count twice as well before the socket - * is closed. - */ - module_put(sock->ops->owner); - module_put(sock->sk->sk_prot_creator->owner); - set_bit(CF_SERVER, &con->flags); - - return sock; - -create_err: - kernel_sock_shutdown(sock, SHUT_RDWR); - sock_release(sock); - return NULL; -} - -static int tipc_open_listening_sock(struct tipc_server *s) -{ - struct socket *sock; - struct tipc_conn *con; - - con = tipc_alloc_conn(s); - if (IS_ERR(con)) - return PTR_ERR(con); - - sock = tipc_create_listen_sock(con); - if (!sock) { - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - kfree(con); - return -EINVAL; - } - - tipc_register_callbacks(sock, con); - return 0; -} - -static void tipc_clean_outqueues(struct tipc_conn *con) -{ - struct outqueue_entry *e, *safe; - - spin_lock_bh(&con->outqueue_lock); - list_for_each_entry_safe(e, safe, &con->outqueue, list) { - list_del(&e->list); - kfree(e); - } - spin_unlock_bh(&con->outqueue_lock); -} - -/* tipc_conn_queue_evt - interrupt level call from a subscription instance - * The queued job is launched in tipc_send_to_sock() +/* tipc_conn_queue_evt() - interrupt level call from a subscription instance + * The queued work is launched into tipc_send_work()->tipc_send_to_sock() */ void tipc_conn_queue_evt(struct net *net, int conid, u32 event, struct tipc_event *evt) @@ -588,9 +458,9 @@ static void tipc_send_to_sock(struct tipc_conn *con) 1, sizeof(*evt)); if (ret == -EWOULDBLOCK || ret == 0) { cond_resched(); - goto out; + return; } else if (ret < 0) { - goto err; + return tipc_close_conn(con); } } else { tipc_send_kern_top_evt(srv->net, evt); @@ -606,10 +476,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) kfree(e); } spin_unlock_bh(&con->outqueue_lock); -out: - return; -err: - tipc_close_conn(con); } static void tipc_recv_work(struct work_struct *work) @@ -618,7 +484,7 @@ static void tipc_recv_work(struct work_struct *work) int count = 0; while (connected(con)) { - if (con->rx_action(con)) + if (tipc_receive_from_sock(con)) break; /* Don't flood Rx machine */ @@ -640,10 +506,113 @@ static void tipc_send_work(struct work_struct *work) conn_put(con); } -static void tipc_work_stop(struct tipc_server *s) +static void tipc_accept_from_sock(struct work_struct *work) { - destroy_workqueue(s->rcv_wq); - destroy_workqueue(s->send_wq); + struct tipc_server *srv = container_of(work, struct tipc_server, awork); + struct socket *lsock = srv->listener; + struct socket *newsock; + struct tipc_conn *con; + struct sock *newsk; + int ret; + + while (1) { + ret = kernel_accept(lsock, &newsock, O_NONBLOCK); + if (ret < 0) + return; + con = tipc_alloc_conn(srv); + if (IS_ERR(con)) { + ret = PTR_ERR(con); + sock_release(newsock); + return; + } + /* Register callbacks */ + newsk = newsock->sk; + write_lock_bh(&newsk->sk_callback_lock); + newsk->sk_data_ready = sock_data_ready; + newsk->sk_write_space = sock_write_space; + newsk->sk_user_data = con; + con->sock = newsock; + write_unlock_bh(&newsk->sk_callback_lock); + + /* Wake up receive process in case of 'SYN+' message */ + newsk->sk_data_ready(newsk); + } +} + +/* listener_sock_data_ready - interrupt callback indicating new connection + * The queued job is launched into tipc_accept_from_sock() + */ +static void listener_sock_data_ready(struct sock *sk) +{ + struct tipc_server *srv; + + read_lock_bh(&sk->sk_callback_lock); + srv = sk->sk_user_data; + if (srv->listener) + queue_work(srv->rcv_wq, &srv->awork); + read_unlock_bh(&sk->sk_callback_lock); +} + +static int tipc_create_listener_sock(struct tipc_server *srv) +{ + int imp = TIPC_CRITICAL_IMPORTANCE; + struct socket *lsock = NULL; + struct sockaddr_tipc saddr; + struct sock *sk; + int rc; + + rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock); + if (rc < 0) + return rc; + + srv->listener = lsock; + sk = lsock->sk; + write_lock_bh(&sk->sk_callback_lock); + sk->sk_data_ready = listener_sock_data_ready; + sk->sk_user_data = srv; + write_unlock_bh(&sk->sk_callback_lock); + + rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&imp, sizeof(imp)); + if (rc < 0) + goto err; + + saddr.family = AF_TIPC; + saddr.addrtype = TIPC_ADDR_NAMESEQ; + saddr.addr.nameseq.type = TIPC_TOP_SRV; + saddr.addr.nameseq.lower = TIPC_TOP_SRV; + saddr.addr.nameseq.upper = TIPC_TOP_SRV; + saddr.scope = TIPC_NODE_SCOPE; + + rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr)); + if (rc < 0) + goto err; + rc = kernel_listen(lsock, 0); + if (rc < 0) + goto err; + + /* As server's listening socket owner and creator is the same module, + * we have to decrease TIPC module reference count to guarantee that + * it remains zero after the server socket is created, otherwise, + * executing "rmmod" command is unable to make TIPC module deleted + * after TIPC module is inserted successfully. + * + * However, the reference count is ever increased twice in + * sock_create_kern(): one is to increase the reference count of owner + * of TIPC socket's proto_ops struct; another is to increment the + * reference count of owner of TIPC proto struct. Therefore, we must + * decrement the module reference count twice to ensure that it keeps + * zero after server's listening socket is created. Of course, we + * must bump the module reference count twice as well before the socket + * is closed. + */ + module_put(lsock->ops->owner); + module_put(sk->sk_prot_creator->owner); + + return 0; +err: + sock_release(lsock); + return -EINVAL; } static int tipc_work_start(struct tipc_server *s) @@ -664,32 +633,26 @@ static int tipc_work_start(struct tipc_server *s) return 0; } +static void tipc_work_stop(struct tipc_server *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + int tipc_topsrv_start(struct net *net) { struct tipc_net *tn = tipc_net(net); const char name[] = "topology_server"; - struct sockaddr_tipc *saddr; struct tipc_server *srv; int ret; - saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); - if (!saddr) - return -ENOMEM; - saddr->family = AF_TIPC; - saddr->addrtype = TIPC_ADDR_NAMESEQ; - saddr->addr.nameseq.type = TIPC_TOP_SRV; - saddr->addr.nameseq.lower = TIPC_TOP_SRV; - saddr->addr.nameseq.upper = TIPC_TOP_SRV; - saddr->scope = TIPC_NODE_SCOPE; - srv = kzalloc(sizeof(*srv), GFP_ATOMIC); - if (!srv) { - kfree(saddr); + if (!srv) return -ENOMEM; - } - srv->net = net; - srv->saddr = saddr; - srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + + srv->net = net; + srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + INIT_WORK(&srv->awork, tipc_accept_from_sock); strncpy(srv->name, name, strlen(name) + 1); tn->topsrv = srv; @@ -703,7 +666,7 @@ int tipc_topsrv_start(struct net *net) if (ret < 0) return ret; - ret = tipc_open_listening_sock(srv); + ret = tipc_create_listener_sock(srv); if (ret < 0) tipc_work_stop(srv); @@ -713,6 +676,7 @@ int tipc_topsrv_start(struct net *net) void tipc_topsrv_stop(struct net *net) { struct tipc_server *srv = tipc_topsrv(net); + struct socket *lsock = srv->listener; struct tipc_conn *con; int id; @@ -725,10 +689,12 @@ void tipc_topsrv_stop(struct net *net) spin_lock_bh(&srv->idr_lock); } } + __module_get(lsock->ops->owner); + __module_get(lsock->sk->sk_prot_creator->owner); + sock_release(lsock); + srv->listener = NULL; spin_unlock_bh(&srv->idr_lock); - tipc_work_stop(srv); idr_destroy(&srv->conn_idr); - kfree(srv->saddr); kfree(srv); } -- 2.7.4