rxrpc: Split the receive code
authorDavid Howells <dhowells@redhat.com>
Thu, 23 Jan 2020 13:01:33 +0000 (13:01 +0000)
committerDavid Howells <dhowells@redhat.com>
Thu, 1 Dec 2022 13:36:40 +0000 (13:36 +0000)
Split the code that handles packet reception in softirq mode as a prelude
to moving all the packet processing beyond routing to the appropriate call
and setting up of a new call out into process context.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

net/rxrpc/Makefile
net/rxrpc/ar-internal.h
net/rxrpc/input.c
net/rxrpc/io_thread.c [new file with mode: 0644]

index 7968747..e76d345 100644 (file)
@@ -16,6 +16,7 @@ rxrpc-y := \
        conn_service.o \
        input.o \
        insecure.o \
+       io_thread.o \
        key.o \
        local_event.o \
        local_object.o \
index 41a57c1..523cc9c 100644 (file)
@@ -946,6 +946,13 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 /*
  * input.c
  */
+void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
+                                  struct rxrpc_call *);
+
+/*
+ * io_thread.c
+ */
 int rxrpc_input_packet(struct sock *, struct sk_buff *);
 
 /*
index ab8b7a1..f4f6f3c 100644 (file)
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: GPL-2.0-or-later
-/* RxRPC packet reception
+/* Processing of received RxRPC packets
  *
- * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  */
 
@@ -1029,7 +1029,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
 /*
  * Process an incoming call packet.
  */
-static void rxrpc_input_call_packet(struct rxrpc_call *call,
+void rxrpc_input_call_packet(struct rxrpc_call *call,
                                    struct sk_buff *skb)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -1086,9 +1086,9 @@ no_free:
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
-                                         struct rxrpc_connection *conn,
-                                         struct rxrpc_call *call)
+void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
+                                  struct rxrpc_connection *conn,
+                                  struct rxrpc_call *call)
 {
        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_SERVER_AWAIT_ACK:
@@ -1109,363 +1109,3 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
        __rxrpc_disconnect_call(conn, call);
        spin_unlock(&rx->incoming_lock);
 }
-
-/*
- * post connection-level events to the connection
- * - this includes challenges, responses, some aborts and call terminal packet
- *   retransmission.
- */
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
-                                     struct sk_buff *skb)
-{
-       _enter("%p,%p", conn, skb);
-
-       skb_queue_tail(&conn->rx_queue, skb);
-       rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
-}
-
-/*
- * post endpoint-level events to the local endpoint
- * - this includes debug and version messages
- */
-static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
-                                      struct sk_buff *skb)
-{
-       _enter("%p,%p", local, skb);
-
-       if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
-               skb_queue_tail(&local->event_queue, skb);
-               rxrpc_queue_local(local);
-       } else {
-               rxrpc_free_skb(skb, rxrpc_skb_put_input);
-       }
-}
-
-/*
- * put a packet up for transport-level abort
- */
-static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
-       if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
-               skb_queue_tail(&local->reject_queue, skb);
-               rxrpc_queue_local(local);
-       } else {
-               rxrpc_free_skb(skb, rxrpc_skb_put_input);
-       }
-}
-
-/*
- * Extract the wire header from a packet and translate the byte order.
- */
-static noinline
-int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
-{
-       struct rxrpc_wire_header whdr;
-
-       /* dig out the RxRPC connection details */
-       if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
-               trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
-                                     tracepoint_string("bad_hdr"));
-               return -EBADMSG;
-       }
-
-       memset(sp, 0, sizeof(*sp));
-       sp->hdr.epoch           = ntohl(whdr.epoch);
-       sp->hdr.cid             = ntohl(whdr.cid);
-       sp->hdr.callNumber      = ntohl(whdr.callNumber);
-       sp->hdr.seq             = ntohl(whdr.seq);
-       sp->hdr.serial          = ntohl(whdr.serial);
-       sp->hdr.flags           = whdr.flags;
-       sp->hdr.type            = whdr.type;
-       sp->hdr.userStatus      = whdr.userStatus;
-       sp->hdr.securityIndex   = whdr.securityIndex;
-       sp->hdr._rsvd           = ntohs(whdr._rsvd);
-       sp->hdr.serviceId       = ntohs(whdr.serviceId);
-       return 0;
-}
-
-/*
- * Extract the abort code from an ABORT packet and stash it in skb->priority.
- */
-static bool rxrpc_extract_abort(struct sk_buff *skb)
-{
-       __be32 wtmp;
-
-       if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
-                         &wtmp, sizeof(wtmp)) < 0)
-               return false;
-       skb->priority = ntohl(wtmp);
-       return true;
-}
-
-/*
- * handle data received on the local endpoint
- * - may be called in interrupt context
- *
- * [!] Note that as this is called from the encap_rcv hook, the socket is not
- * held locked by the caller and nothing prevents sk_user_data on the UDP from
- * being cleared in the middle of processing this function.
- *
- * Called with the RCU read lock held from the IP layer via UDP.
- */
-int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
-{
-       struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
-       struct rxrpc_connection *conn;
-       struct rxrpc_channel *chan;
-       struct rxrpc_call *call = NULL;
-       struct rxrpc_skb_priv *sp;
-       struct rxrpc_peer *peer = NULL;
-       struct rxrpc_sock *rx = NULL;
-       unsigned int channel;
-
-       _enter("%p", udp_sk);
-
-       if (unlikely(!local)) {
-               kfree_skb(skb);
-               return 0;
-       }
-       if (skb->tstamp == 0)
-               skb->tstamp = ktime_get_real();
-
-       rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
-
-       skb_pull(skb, sizeof(struct udphdr));
-
-       /* The UDP protocol already released all skb resources;
-        * we are free to add our own data there.
-        */
-       sp = rxrpc_skb(skb);
-
-       /* dig out the RxRPC connection details */
-       if (rxrpc_extract_header(sp, skb) < 0)
-               goto bad_message;
-
-       if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
-               static int lose;
-               if ((lose++ & 7) == 7) {
-                       trace_rxrpc_rx_lose(sp);
-                       rxrpc_free_skb(skb, rxrpc_skb_put_lose);
-                       return 0;
-               }
-       }
-
-       if (skb->tstamp == 0)
-               skb->tstamp = ktime_get_real();
-       trace_rxrpc_rx_packet(sp);
-
-       switch (sp->hdr.type) {
-       case RXRPC_PACKET_TYPE_VERSION:
-               if (rxrpc_to_client(sp))
-                       goto discard;
-               rxrpc_post_packet_to_local(local, skb);
-               goto out;
-
-       case RXRPC_PACKET_TYPE_BUSY:
-               if (rxrpc_to_server(sp))
-                       goto discard;
-               fallthrough;
-       case RXRPC_PACKET_TYPE_ACK:
-       case RXRPC_PACKET_TYPE_ACKALL:
-               if (sp->hdr.callNumber == 0)
-                       goto bad_message;
-               break;
-       case RXRPC_PACKET_TYPE_ABORT:
-               if (!rxrpc_extract_abort(skb))
-                       return true; /* Just discard if malformed */
-               break;
-
-       case RXRPC_PACKET_TYPE_DATA:
-               if (sp->hdr.callNumber == 0 ||
-                   sp->hdr.seq == 0)
-                       goto bad_message;
-
-               /* Unshare the packet so that it can be modified for in-place
-                * decryption.
-                */
-               if (sp->hdr.securityIndex != 0) {
-                       struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
-                       if (!nskb) {
-                               rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem);
-                               goto out;
-                       }
-
-                       if (nskb != skb) {
-                               rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare);
-                               skb = nskb;
-                               rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
-                               sp = rxrpc_skb(skb);
-                       }
-               }
-               break;
-
-       case RXRPC_PACKET_TYPE_CHALLENGE:
-               if (rxrpc_to_server(sp))
-                       goto discard;
-               break;
-       case RXRPC_PACKET_TYPE_RESPONSE:
-               if (rxrpc_to_client(sp))
-                       goto discard;
-               break;
-
-               /* Packet types 9-11 should just be ignored. */
-       case RXRPC_PACKET_TYPE_PARAMS:
-       case RXRPC_PACKET_TYPE_10:
-       case RXRPC_PACKET_TYPE_11:
-               goto discard;
-
-       default:
-               goto bad_message;
-       }
-
-       if (sp->hdr.serviceId == 0)
-               goto bad_message;
-
-       if (rxrpc_to_server(sp)) {
-               /* Weed out packets to services we're not offering.  Packets
-                * that would begin a call are explicitly rejected and the rest
-                * are just discarded.
-                */
-               rx = rcu_dereference(local->service);
-               if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
-                           sp->hdr.serviceId != rx->second_service)) {
-                       if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
-                           sp->hdr.seq == 1)
-                               goto unsupported_service;
-                       goto discard;
-               }
-       }
-
-       conn = rxrpc_find_connection_rcu(local, skb, &peer);
-       if (conn) {
-               if (sp->hdr.securityIndex != conn->security_ix)
-                       goto wrong_security;
-
-               if (sp->hdr.serviceId != conn->service_id) {
-                       int old_id;
-
-                       if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
-                               goto reupgrade;
-                       old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
-                                        sp->hdr.serviceId);
-
-                       if (old_id != conn->orig_service_id &&
-                           old_id != sp->hdr.serviceId)
-                               goto reupgrade;
-               }
-
-               if (sp->hdr.callNumber == 0) {
-                       /* Connection-level packet */
-                       _debug("CONN %p {%d}", conn, conn->debug_id);
-                       rxrpc_post_packet_to_conn(conn, skb);
-                       goto out;
-               }
-
-               if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
-                       conn->hi_serial = sp->hdr.serial;
-
-               /* Call-bound packets are routed by connection channel. */
-               channel = sp->hdr.cid & RXRPC_CHANNELMASK;
-               chan = &conn->channels[channel];
-
-               /* Ignore really old calls */
-               if (sp->hdr.callNumber < chan->last_call)
-                       goto discard;
-
-               if (sp->hdr.callNumber == chan->last_call) {
-                       if (chan->call ||
-                           sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
-                               goto discard;
-
-                       /* For the previous service call, if completed
-                        * successfully, we discard all further packets.
-                        */
-                       if (rxrpc_conn_is_service(conn) &&
-                           chan->last_type == RXRPC_PACKET_TYPE_ACK)
-                               goto discard;
-
-                       /* But otherwise we need to retransmit the final packet
-                        * from data cached in the connection record.
-                        */
-                       if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
-                               trace_rxrpc_rx_data(chan->call_debug_id,
-                                                   sp->hdr.seq,
-                                                   sp->hdr.serial,
-                                                   sp->hdr.flags);
-                       rxrpc_post_packet_to_conn(conn, skb);
-                       goto out;
-               }
-
-               call = rcu_dereference(chan->call);
-
-               if (sp->hdr.callNumber > chan->call_id) {
-                       if (rxrpc_to_client(sp))
-                               goto reject_packet;
-                       if (call)
-                               rxrpc_input_implicit_end_call(rx, conn, call);
-                       call = NULL;
-               }
-
-               if (call) {
-                       if (sp->hdr.serviceId != call->service_id)
-                               call->service_id = sp->hdr.serviceId;
-                       if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
-                               call->rx_serial = sp->hdr.serial;
-                       if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
-                               set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
-               }
-       }
-
-       if (!call || refcount_read(&call->ref) == 0) {
-               if (rxrpc_to_client(sp) ||
-                   sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
-                       goto bad_message;
-               if (sp->hdr.seq != 1)
-                       goto discard;
-               call = rxrpc_new_incoming_call(local, rx, skb);
-               if (!call)
-                       goto reject_packet;
-       }
-
-       /* Process a call packet; this either discards or passes on the ref
-        * elsewhere.
-        */
-       rxrpc_input_call_packet(call, skb);
-       goto out;
-
-discard:
-       rxrpc_free_skb(skb, rxrpc_skb_put_input);
-out:
-       trace_rxrpc_rx_done(0, 0);
-       return 0;
-
-wrong_security:
-       trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
-                         RXKADINCONSISTENCY, EBADMSG);
-       skb->priority = RXKADINCONSISTENCY;
-       goto post_abort;
-
-unsupported_service:
-       trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
-                         RX_INVALID_OPERATION, EOPNOTSUPP);
-       skb->priority = RX_INVALID_OPERATION;
-       goto post_abort;
-
-reupgrade:
-       trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
-                         RX_PROTOCOL_ERROR, EBADMSG);
-       goto protocol_error;
-
-bad_message:
-       trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
-                         RX_PROTOCOL_ERROR, EBADMSG);
-protocol_error:
-       skb->priority = RX_PROTOCOL_ERROR;
-post_abort:
-       skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
-reject_packet:
-       trace_rxrpc_rx_done(skb->mark, skb->priority);
-       rxrpc_reject_packet(local, skb);
-       _leave(" [badmsg]");
-       return 0;
-}
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
new file mode 100644 (file)
index 0000000..d2aaad5
--- /dev/null
@@ -0,0 +1,370 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/* RxRPC packet reception
+ *
+ * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ */
+
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
+#include "ar-internal.h"
+
+/*
+ * post connection-level events to the connection
+ * - this includes challenges, responses, some aborts and call terminal packet
+ *   retransmission.
+ */
+static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
+                                     struct sk_buff *skb)
+{
+       _enter("%p,%p", conn, skb);
+
+       skb_queue_tail(&conn->rx_queue, skb);
+       rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
+}
+
+/*
+ * post endpoint-level events to the local endpoint
+ * - this includes debug and version messages
+ */
+static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
+                                      struct sk_buff *skb)
+{
+       _enter("%p,%p", local, skb);
+
+       if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
+               skb_queue_tail(&local->event_queue, skb);
+               rxrpc_queue_local(local);
+       } else {
+               rxrpc_free_skb(skb, rxrpc_skb_put_input);
+       }
+}
+
+/*
+ * put a packet up for transport-level abort
+ */
+static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
+{
+       if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
+               skb_queue_tail(&local->reject_queue, skb);
+               rxrpc_queue_local(local);
+       } else {
+               rxrpc_free_skb(skb, rxrpc_skb_put_input);
+       }
+}
+
+/*
+ * Extract the wire header from a packet and translate the byte order.
+ */
+static noinline
+int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
+{
+       struct rxrpc_wire_header whdr;
+
+       /* dig out the RxRPC connection details */
+       if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
+               trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
+                                     tracepoint_string("bad_hdr"));
+               return -EBADMSG;
+       }
+
+       memset(sp, 0, sizeof(*sp));
+       sp->hdr.epoch           = ntohl(whdr.epoch);
+       sp->hdr.cid             = ntohl(whdr.cid);
+       sp->hdr.callNumber      = ntohl(whdr.callNumber);
+       sp->hdr.seq             = ntohl(whdr.seq);
+       sp->hdr.serial          = ntohl(whdr.serial);
+       sp->hdr.flags           = whdr.flags;
+       sp->hdr.type            = whdr.type;
+       sp->hdr.userStatus      = whdr.userStatus;
+       sp->hdr.securityIndex   = whdr.securityIndex;
+       sp->hdr._rsvd           = ntohs(whdr._rsvd);
+       sp->hdr.serviceId       = ntohs(whdr.serviceId);
+       return 0;
+}
+
+/*
+ * Extract the abort code from an ABORT packet and stash it in skb->priority.
+ */
+static bool rxrpc_extract_abort(struct sk_buff *skb)
+{
+       __be32 wtmp;
+
+       if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
+                         &wtmp, sizeof(wtmp)) < 0)
+               return false;
+       skb->priority = ntohl(wtmp);
+       return true;
+}
+
+/*
+ * handle data received on the local endpoint
+ * - may be called in interrupt context
+ *
+ * [!] Note that as this is called from the encap_rcv hook, the socket is not
+ * held locked by the caller and nothing prevents sk_user_data on the UDP from
+ * being cleared in the middle of processing this function.
+ *
+ * Called with the RCU read lock held from the IP layer via UDP.
+ */
+int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
+{
+       struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
+       struct rxrpc_connection *conn;
+       struct rxrpc_channel *chan;
+       struct rxrpc_call *call = NULL;
+       struct rxrpc_skb_priv *sp;
+       struct rxrpc_peer *peer = NULL;
+       struct rxrpc_sock *rx = NULL;
+       unsigned int channel;
+
+       _enter("%p", udp_sk);
+
+       if (unlikely(!local)) {
+               kfree_skb(skb);
+               return 0;
+       }
+       if (skb->tstamp == 0)
+               skb->tstamp = ktime_get_real();
+
+       rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
+
+       skb_pull(skb, sizeof(struct udphdr));
+
+       /* The UDP protocol already released all skb resources;
+        * we are free to add our own data there.
+        */
+       sp = rxrpc_skb(skb);
+
+       /* dig out the RxRPC connection details */
+       if (rxrpc_extract_header(sp, skb) < 0)
+               goto bad_message;
+
+       if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
+               static int lose;
+               if ((lose++ & 7) == 7) {
+                       trace_rxrpc_rx_lose(sp);
+                       rxrpc_free_skb(skb, rxrpc_skb_put_lose);
+                       return 0;
+               }
+       }
+
+       if (skb->tstamp == 0)
+               skb->tstamp = ktime_get_real();
+       trace_rxrpc_rx_packet(sp);
+
+       switch (sp->hdr.type) {
+       case RXRPC_PACKET_TYPE_VERSION:
+               if (rxrpc_to_client(sp))
+                       goto discard;
+               rxrpc_post_packet_to_local(local, skb);
+               goto out;
+
+       case RXRPC_PACKET_TYPE_BUSY:
+               if (rxrpc_to_server(sp))
+                       goto discard;
+               fallthrough;
+       case RXRPC_PACKET_TYPE_ACK:
+       case RXRPC_PACKET_TYPE_ACKALL:
+               if (sp->hdr.callNumber == 0)
+                       goto bad_message;
+               break;
+       case RXRPC_PACKET_TYPE_ABORT:
+               if (!rxrpc_extract_abort(skb))
+                       return true; /* Just discard if malformed */
+               break;
+
+       case RXRPC_PACKET_TYPE_DATA:
+               if (sp->hdr.callNumber == 0 ||
+                   sp->hdr.seq == 0)
+                       goto bad_message;
+
+               /* Unshare the packet so that it can be modified for in-place
+                * decryption.
+                */
+               if (sp->hdr.securityIndex != 0) {
+                       struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
+                       if (!nskb) {
+                               rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem);
+                               goto out;
+                       }
+
+                       if (nskb != skb) {
+                               rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare);
+                               skb = nskb;
+                               rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
+                               sp = rxrpc_skb(skb);
+                       }
+               }
+               break;
+
+       case RXRPC_PACKET_TYPE_CHALLENGE:
+               if (rxrpc_to_server(sp))
+                       goto discard;
+               break;
+       case RXRPC_PACKET_TYPE_RESPONSE:
+               if (rxrpc_to_client(sp))
+                       goto discard;
+               break;
+
+               /* Packet types 9-11 should just be ignored. */
+       case RXRPC_PACKET_TYPE_PARAMS:
+       case RXRPC_PACKET_TYPE_10:
+       case RXRPC_PACKET_TYPE_11:
+               goto discard;
+
+       default:
+               goto bad_message;
+       }
+
+       if (sp->hdr.serviceId == 0)
+               goto bad_message;
+
+       if (rxrpc_to_server(sp)) {
+               /* Weed out packets to services we're not offering.  Packets
+                * that would begin a call are explicitly rejected and the rest
+                * are just discarded.
+                */
+               rx = rcu_dereference(local->service);
+               if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
+                           sp->hdr.serviceId != rx->second_service)) {
+                       if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
+                           sp->hdr.seq == 1)
+                               goto unsupported_service;
+                       goto discard;
+               }
+       }
+
+       conn = rxrpc_find_connection_rcu(local, skb, &peer);
+       if (conn) {
+               if (sp->hdr.securityIndex != conn->security_ix)
+                       goto wrong_security;
+
+               if (sp->hdr.serviceId != conn->service_id) {
+                       int old_id;
+
+                       if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+                               goto reupgrade;
+                       old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
+                                        sp->hdr.serviceId);
+
+                       if (old_id != conn->orig_service_id &&
+                           old_id != sp->hdr.serviceId)
+                               goto reupgrade;
+               }
+
+               if (sp->hdr.callNumber == 0) {
+                       /* Connection-level packet */
+                       _debug("CONN %p {%d}", conn, conn->debug_id);
+                       rxrpc_post_packet_to_conn(conn, skb);
+                       goto out;
+               }
+
+               if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
+                       conn->hi_serial = sp->hdr.serial;
+
+               /* Call-bound packets are routed by connection channel. */
+               channel = sp->hdr.cid & RXRPC_CHANNELMASK;
+               chan = &conn->channels[channel];
+
+               /* Ignore really old calls */
+               if (sp->hdr.callNumber < chan->last_call)
+                       goto discard;
+
+               if (sp->hdr.callNumber == chan->last_call) {
+                       if (chan->call ||
+                           sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
+                               goto discard;
+
+                       /* For the previous service call, if completed
+                        * successfully, we discard all further packets.
+                        */
+                       if (rxrpc_conn_is_service(conn) &&
+                           chan->last_type == RXRPC_PACKET_TYPE_ACK)
+                               goto discard;
+
+                       /* But otherwise we need to retransmit the final packet
+                        * from data cached in the connection record.
+                        */
+                       if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
+                               trace_rxrpc_rx_data(chan->call_debug_id,
+                                                   sp->hdr.seq,
+                                                   sp->hdr.serial,
+                                                   sp->hdr.flags);
+                       rxrpc_post_packet_to_conn(conn, skb);
+                       goto out;
+               }
+
+               call = rcu_dereference(chan->call);
+
+               if (sp->hdr.callNumber > chan->call_id) {
+                       if (rxrpc_to_client(sp))
+                               goto reject_packet;
+                       if (call)
+                               rxrpc_input_implicit_end_call(rx, conn, call);
+                       call = NULL;
+               }
+
+               if (call) {
+                       if (sp->hdr.serviceId != call->service_id)
+                               call->service_id = sp->hdr.serviceId;
+                       if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
+                               call->rx_serial = sp->hdr.serial;
+                       if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
+                               set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
+               }
+       }
+
+       if (!call || refcount_read(&call->ref) == 0) {
+               if (rxrpc_to_client(sp) ||
+                   sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
+                       goto bad_message;
+               if (sp->hdr.seq != 1)
+                       goto discard;
+               call = rxrpc_new_incoming_call(local, rx, skb);
+               if (!call)
+                       goto reject_packet;
+       }
+
+       /* Process a call packet; this either discards or passes on the ref
+        * elsewhere.
+        */
+       rxrpc_input_call_packet(call, skb);
+       goto out;
+
+discard:
+       rxrpc_free_skb(skb, rxrpc_skb_put_input);
+out:
+       trace_rxrpc_rx_done(0, 0);
+       return 0;
+
+wrong_security:
+       trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+                         RXKADINCONSISTENCY, EBADMSG);
+       skb->priority = RXKADINCONSISTENCY;
+       goto post_abort;
+
+unsupported_service:
+       trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+                         RX_INVALID_OPERATION, EOPNOTSUPP);
+       skb->priority = RX_INVALID_OPERATION;
+       goto post_abort;
+
+reupgrade:
+       trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+                         RX_PROTOCOL_ERROR, EBADMSG);
+       goto protocol_error;
+
+bad_message:
+       trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+                         RX_PROTOCOL_ERROR, EBADMSG);
+protocol_error:
+       skb->priority = RX_PROTOCOL_ERROR;
+post_abort:
+       skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
+reject_packet:
+       trace_rxrpc_rx_done(skb->mark, skb->priority);
+       rxrpc_reject_packet(local, skb);
+       _leave(" [badmsg]");
+       return 0;
+}