rxrpc: Offload the completion of service conn security to the I/O thread
authorDavid Howells <dhowells@redhat.com>
Fri, 21 Oct 2022 07:54:03 +0000 (08:54 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:32 +0000 (09:43 +0000)
Offload the completion of the challenge/response cycle on a service
connection to the I/O thread.  After the RESPONSE packet has been
successfully decrypted and verified by the work queue, offloading the
changing of the call states to the I/O thread makes iteration over the
conn's channel list simpler.

Do this by marking the RESPONSE skbuff and putting it onto the receive
queue for the I/O thread to collect.  We put it on the front of the queue
as we've already received the packet for it.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/conn_event.c
net/rxrpc/io_thread.c

index caeabd5..85671f4 100644 (file)
 #define rxrpc_skb_traces \
        EM(rxrpc_skb_eaten_by_unshare,          "ETN unshare  ") \
        EM(rxrpc_skb_eaten_by_unshare_nomem,    "ETN unshar-nm") \
+       EM(rxrpc_skb_get_conn_secured,          "GET conn-secd") \
        EM(rxrpc_skb_get_conn_work,             "GET conn-work") \
        EM(rxrpc_skb_get_local_work,            "GET locl-work") \
        EM(rxrpc_skb_get_reject_work,           "GET rej-work ") \
        EM(rxrpc_skb_new_error_report,          "NEW error-rpt") \
        EM(rxrpc_skb_new_jumbo_subpacket,       "NEW jumbo-sub") \
        EM(rxrpc_skb_new_unshared,              "NEW unshared ") \
+       EM(rxrpc_skb_put_conn_secured,          "PUT conn-secd") \
        EM(rxrpc_skb_put_conn_work,             "PUT conn-work") \
        EM(rxrpc_skb_put_error_report,          "PUT error-rep") \
        EM(rxrpc_skb_put_input,                 "PUT input    ") \
index e9ab061..e508ec2 100644 (file)
@@ -38,6 +38,7 @@ struct rxrpc_txbuf;
 enum rxrpc_skb_mark {
        RXRPC_SKB_MARK_PACKET,          /* Received packet */
        RXRPC_SKB_MARK_ERROR,           /* Error notification */
+       RXRPC_SKB_MARK_SERVICE_CONN_SECURED, /* Service connection response has been verified */
        RXRPC_SKB_MARK_REJECT_BUSY,     /* Reject with BUSY */
        RXRPC_SKB_MARK_REJECT_ABORT,    /* Reject with ABORT (code in skb->priority) */
 };
index 485d7f0..b204270 100644 (file)
@@ -248,7 +248,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
                               struct sk_buff *skb)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-       int loop, ret;
+       int ret;
 
        if (conn->state == RXRPC_CONN_ABORTED)
                return -ECONNABORTED;
@@ -269,22 +269,21 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
                if (ret < 0)
                        return ret;
 
-               spin_lock(&conn->bundle->channel_lock);
                spin_lock(&conn->state_lock);
-
-               if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) {
+               if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING)
                        conn->state = RXRPC_CONN_SERVICE;
-                       spin_unlock(&conn->state_lock);
-                       for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
-                               rxrpc_call_is_secure(
-                                       rcu_dereference_protected(
-                                               conn->channels[loop].call,
-                                               lockdep_is_held(&conn->bundle->channel_lock)));
-               } else {
-                       spin_unlock(&conn->state_lock);
-               }
+               spin_unlock(&conn->state_lock);
 
-               spin_unlock(&conn->bundle->channel_lock);
+               if (conn->state == RXRPC_CONN_SERVICE) {
+                       /* Offload call state flipping to the I/O thread.  As
+                        * we've already received the packet, put it on the
+                        * front of the queue.
+                        */
+                       skb->mark = RXRPC_SKB_MARK_SERVICE_CONN_SECURED;
+                       rxrpc_get_skb(skb, rxrpc_skb_get_conn_secured);
+                       skb_queue_head(&conn->local->rx_queue, skb);
+                       rxrpc_wake_up_io_thread(conn->local);
+               }
                return 0;
 
        default:
@@ -442,9 +441,28 @@ bool rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
  */
 void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
 {
+       unsigned int loop;
+
        if (test_and_clear_bit(RXRPC_CONN_EV_ABORT_CALLS, &conn->events))
                rxrpc_abort_calls(conn);
 
+       switch (skb->mark) {
+       case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
+               if (conn->state != RXRPC_CONN_SERVICE)
+                       break;
+
+               spin_lock(&conn->bundle->channel_lock);
+
+               for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
+                       rxrpc_call_is_secure(
+                               rcu_dereference_protected(
+                                       conn->channels[loop].call,
+                                       lockdep_is_held(&conn->bundle->channel_lock)));
+
+               spin_unlock(&conn->bundle->channel_lock);
+               break;
+       }
+
        /* Process delayed ACKs whose time has come. */
        if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
                rxrpc_process_delayed_final_acks(conn, false);
index 33fd239..751139b 100644 (file)
@@ -451,6 +451,7 @@ int rxrpc_io_thread(void *data)
 
                /* Process received packets and errors. */
                if ((skb = __skb_dequeue(&rx_queue))) {
+                       struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
                        switch (skb->mark) {
                        case RXRPC_SKB_MARK_PACKET:
                                skb->priority = 0;
@@ -463,6 +464,10 @@ int rxrpc_io_thread(void *data)
                                rxrpc_input_error(local, skb);
                                rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
                                break;
+                       case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
+                               rxrpc_input_conn_event(sp->conn, skb);
+                               rxrpc_put_connection(sp->conn, rxrpc_conn_put_poke);
+                               rxrpc_free_skb(skb, rxrpc_skb_put_conn_secured);
                                break;
                        default:
                                WARN_ON_ONCE(1);