rxrpc: Add notification of end-of-Tx phase
authorDavid Howells <dhowells@redhat.com>
Tue, 29 Aug 2017 09:18:56 +0000 (10:18 +0100)
committerDavid Howells <dhowells@redhat.com>
Tue, 29 Aug 2017 09:55:20 +0000 (10:55 +0100)
Add a callback to rxrpc_kernel_send_data() so that a kernel service can get
a notification that the AF_RXRPC call has transitioned out the Tx phase and
is now waiting for a reply or a final ACK.

This is called from AF_RXRPC with the call state lock held so the
notification is guaranteed to come before any reply is passed back.

Further, modify the AFS filesystem to make use of this so that we don't have
to change the afs_call state before sending the last bit of data.

Signed-off-by: David Howells <dhowells@redhat.com>
Documentation/networking/rxrpc.txt
fs/afs/rxrpc.c
include/net/af_rxrpc.h
net/rxrpc/sendmsg.c

index 8c70ba5dee4d0072da0ac8666d987248758afa0f..92a3c3bd5ac3b2e4e7bf40e9fcd3fedd3e956832 100644 (file)
@@ -818,10 +818,15 @@ The kernel interface functions are as follows:
 
  (*) Send data through a call.
 
 
  (*) Send data through a call.
 
+       typedef void (*rxrpc_notify_end_tx_t)(struct sock *sk,
+                                             unsigned long user_call_ID,
+                                             struct sk_buff *skb);
+
        int rxrpc_kernel_send_data(struct socket *sock,
                                   struct rxrpc_call *call,
                                   struct msghdr *msg,
        int rxrpc_kernel_send_data(struct socket *sock,
                                   struct rxrpc_call *call,
                                   struct msghdr *msg,
-                                  size_t len);
+                                  size_t len,
+                                  rxrpc_notify_end_tx_t notify_end_rx);
 
      This is used to supply either the request part of a client call or the
      reply part of a server call.  msg.msg_iovlen and msg.msg_iov specify the
 
      This is used to supply either the request part of a client call or the
      reply part of a server call.  msg.msg_iovlen and msg.msg_iov specify the
@@ -832,6 +837,11 @@ The kernel interface functions are as follows:
      The msg must not specify a destination address, control data or any flags
      other than MSG_MORE.  len is the total amount of data to transmit.
 
      The msg must not specify a destination address, control data or any flags
      other than MSG_MORE.  len is the total amount of data to transmit.
 
+     notify_end_rx can be NULL or it can be used to specify a function to be
+     called when the call changes state to end the Tx phase.  This function is
+     called with the call-state spinlock held to prevent any reply or final ACK
+     from being delivered first.
+
  (*) Receive data from a call.
 
        int rxrpc_kernel_recv_data(struct socket *sock,
  (*) Receive data from a call.
 
        int rxrpc_kernel_recv_data(struct socket *sock,
index 10743043d431de03dab2e4ce665aab224af0381c..0bf191f0dbafa7d65277f227cabc4578e707eac5 100644 (file)
@@ -291,6 +291,19 @@ static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
        iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
 }
 
        iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
 }
 
+/*
+ * Advance the AFS call state when the RxRPC call ends the transmit phase.
+ */
+static void afs_notify_end_request_tx(struct sock *sock,
+                                     struct rxrpc_call *rxcall,
+                                     unsigned long call_user_ID)
+{
+       struct afs_call *call = (struct afs_call *)call_user_ID;
+
+       if (call->state == AFS_CALL_REQUESTING)
+               call->state = AFS_CALL_AWAIT_REPLY;
+}
+
 /*
  * attach the data from a bunch of pages on an inode to a call
  */
 /*
  * attach the data from a bunch of pages on an inode to a call
  */
@@ -310,14 +323,8 @@ static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
                bytes = msg->msg_iter.count;
                nr = msg->msg_iter.nr_segs;
 
                bytes = msg->msg_iter.count;
                nr = msg->msg_iter.nr_segs;
 
-               /* Have to change the state *before* sending the last
-                * packet as RxRPC might give us the reply before it
-                * returns from sending the request.
-                */
-               if (first + nr - 1 >= last)
-                       call->state = AFS_CALL_AWAIT_REPLY;
-               ret = rxrpc_kernel_send_data(afs_socket, call->rxcall,
-                                            msg, bytes);
+               ret = rxrpc_kernel_send_data(afs_socket, call->rxcall, msg,
+                                            bytes, afs_notify_end_request_tx);
                for (loop = 0; loop < nr; loop++)
                        put_page(bv[loop].bv_page);
                if (ret < 0)
                for (loop = 0; loop < nr; loop++)
                        put_page(bv[loop].bv_page);
                if (ret < 0)
@@ -409,7 +416,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
        if (!call->send_pages)
                call->state = AFS_CALL_AWAIT_REPLY;
        ret = rxrpc_kernel_send_data(afs_socket, rxcall,
        if (!call->send_pages)
                call->state = AFS_CALL_AWAIT_REPLY;
        ret = rxrpc_kernel_send_data(afs_socket, rxcall,
-                                    &msg, call->request_size);
+                                    &msg, call->request_size,
+                                    afs_notify_end_request_tx);
        if (ret < 0)
                goto error_do_abort;
 
        if (ret < 0)
                goto error_do_abort;
 
@@ -740,6 +748,20 @@ static int afs_deliver_cm_op_id(struct afs_call *call)
        return call->type->deliver(call);
 }
 
        return call->type->deliver(call);
 }
 
+/*
+ * Advance the AFS call state when an RxRPC service call ends the transmit
+ * phase.
+ */
+static void afs_notify_end_reply_tx(struct sock *sock,
+                                   struct rxrpc_call *rxcall,
+                                   unsigned long call_user_ID)
+{
+       struct afs_call *call = (struct afs_call *)call_user_ID;
+
+       if (call->state == AFS_CALL_REPLYING)
+               call->state = AFS_CALL_AWAIT_ACK;
+}
+
 /*
  * send an empty reply
  */
 /*
  * send an empty reply
  */
@@ -759,7 +781,8 @@ void afs_send_empty_reply(struct afs_call *call)
        msg.msg_flags           = 0;
 
        call->state = AFS_CALL_AWAIT_ACK;
        msg.msg_flags           = 0;
 
        call->state = AFS_CALL_AWAIT_ACK;
-       switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) {
+       switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0,
+                                      afs_notify_end_reply_tx)) {
        case 0:
                _leave(" [replied]");
                return;
        case 0:
                _leave(" [replied]");
                return;
@@ -797,7 +820,8 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
        msg.msg_flags           = 0;
 
        call->state = AFS_CALL_AWAIT_ACK;
        msg.msg_flags           = 0;
 
        call->state = AFS_CALL_AWAIT_ACK;
-       n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len);
+       n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len,
+                                  afs_notify_end_reply_tx);
        if (n >= 0) {
                /* Success */
                _leave(" [replied]");
        if (n >= 0) {
                /* Success */
                _leave(" [replied]");
index c172709787af316efaee4cfa94cc185ca989b5fb..07a47ee6f783e449a8f7ec2216a194e81fae10a3 100644 (file)
@@ -21,6 +21,8 @@ struct rxrpc_call;
 
 typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
                                  unsigned long);
 
 typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
                                  unsigned long);
+typedef void (*rxrpc_notify_end_tx_t)(struct sock *, struct rxrpc_call *,
+                                     unsigned long);
 typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
                                        unsigned long);
 typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
 typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
                                        unsigned long);
 typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
@@ -37,7 +39,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
                                           gfp_t,
                                           rxrpc_notify_rx_t);
 int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
                                           gfp_t,
                                           rxrpc_notify_rx_t);
 int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
-                          struct msghdr *, size_t);
+                          struct msghdr *, size_t,
+                          rxrpc_notify_end_tx_t);
 int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
                           void *, size_t, size_t *, bool, u32 *);
 bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
 int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
                           void *, size_t, size_t *, bool, u32 *);
 bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
index bc7f0241d92bfc9ee0fd63f494088b334fb15c47..344fdce89823f668a8fc3e21b4b45be2dd14f918 100644 (file)
@@ -100,12 +100,24 @@ static inline void rxrpc_instant_resend(struct rxrpc_call *call, int ix)
        spin_unlock_bh(&call->lock);
 }
 
        spin_unlock_bh(&call->lock);
 }
 
+/*
+ * Notify the owner of the call that the transmit phase is ended and the last
+ * packet has been queued.
+ */
+static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
+                               rxrpc_notify_end_tx_t notify_end_tx)
+{
+       if (notify_end_tx)
+               notify_end_tx(&rx->sk, call, call->user_call_ID);
+}
+
 /*
  * Queue a DATA packet for transmission, set the resend timeout and send the
  * packet immediately
  */
 /*
  * Queue a DATA packet for transmission, set the resend timeout and send the
  * packet immediately
  */
-static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
-                              bool last)
+static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
+                              struct sk_buff *skb, bool last,
+                              rxrpc_notify_end_tx_t notify_end_tx)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        rxrpc_seq_t seq = sp->hdr.seq;
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        rxrpc_seq_t seq = sp->hdr.seq;
@@ -141,6 +153,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
                switch (call->state) {
                case RXRPC_CALL_CLIENT_SEND_REQUEST:
                        call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
                switch (call->state) {
                case RXRPC_CALL_CLIENT_SEND_REQUEST:
                        call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
+                       rxrpc_notify_end_tx(rx, call, notify_end_tx);
                        break;
                case RXRPC_CALL_SERVER_ACK_REQUEST:
                        call->state = RXRPC_CALL_SERVER_SEND_REPLY;
                        break;
                case RXRPC_CALL_SERVER_ACK_REQUEST:
                        call->state = RXRPC_CALL_SERVER_SEND_REPLY;
@@ -153,6 +166,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
                                break;
                case RXRPC_CALL_SERVER_SEND_REPLY:
                        call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
                                break;
                case RXRPC_CALL_SERVER_SEND_REPLY:
                        call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
+                       rxrpc_notify_end_tx(rx, call, notify_end_tx);
                        break;
                default:
                        break;
                        break;
                default:
                        break;
@@ -189,7 +203,8 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
  */
 static int rxrpc_send_data(struct rxrpc_sock *rx,
                           struct rxrpc_call *call,
  */
 static int rxrpc_send_data(struct rxrpc_sock *rx,
                           struct rxrpc_call *call,
-                          struct msghdr *msg, size_t len)
+                          struct msghdr *msg, size_t len,
+                          rxrpc_notify_end_tx_t notify_end_tx)
 {
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
 {
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
@@ -350,7 +365,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
                        if (ret < 0)
                                goto out;
 
                        if (ret < 0)
                                goto out;
 
-                       rxrpc_queue_packet(call, skb, !msg_data_left(msg) && !more);
+                       rxrpc_queue_packet(rx, call, skb,
+                                          !msg_data_left(msg) && !more,
+                                          notify_end_tx);
                        skb = NULL;
                }
        } while (msg_data_left(msg) > 0);
                        skb = NULL;
                }
        } while (msg_data_left(msg) > 0);
@@ -611,7 +628,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
                /* Reply phase not begun or not complete for service call. */
                ret = -EPROTO;
        } else {
                /* Reply phase not begun or not complete for service call. */
                ret = -EPROTO;
        } else {
-               ret = rxrpc_send_data(rx, call, msg, len);
+               ret = rxrpc_send_data(rx, call, msg, len, NULL);
        }
 
        mutex_unlock(&call->user_mutex);
        }
 
        mutex_unlock(&call->user_mutex);
@@ -631,6 +648,7 @@ error_release_sock:
  * @call: The call to send data through
  * @msg: The data to send
  * @len: The amount of data to send
  * @call: The call to send data through
  * @msg: The data to send
  * @len: The amount of data to send
+ * @notify_end_tx: Notification that the last packet is queued.
  *
  * Allow a kernel service to send data on a call.  The call must be in an state
  * appropriate to sending data.  No control data should be supplied in @msg,
  *
  * Allow a kernel service to send data on a call.  The call must be in an state
  * appropriate to sending data.  No control data should be supplied in @msg,
@@ -638,7 +656,8 @@ error_release_sock:
  * more data to come, otherwise this data will end the transmission phase.
  */
 int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
  * more data to come, otherwise this data will end the transmission phase.
  */
 int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
-                          struct msghdr *msg, size_t len)
+                          struct msghdr *msg, size_t len,
+                          rxrpc_notify_end_tx_t notify_end_tx)
 {
        int ret;
 
 {
        int ret;
 
@@ -656,7 +675,8 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
        case RXRPC_CALL_SERVER_SEND_REPLY:
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
        case RXRPC_CALL_SERVER_SEND_REPLY:
-               ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len);
+               ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
+                                     notify_end_tx);
                break;
        case RXRPC_CALL_COMPLETE:
                read_lock_bh(&call->state_lock);
                break;
        case RXRPC_CALL_COMPLETE:
                read_lock_bh(&call->state_lock);