Factor out pseudotcp so it's not used if the transport is already reliable
authorYouness Alaoui <youness.alaoui@collabora.co.uk>
Fri, 4 Apr 2014 06:12:02 +0000 (02:12 -0400)
committerOlivier Crête <olivier.crete@ocrete.ca>
Thu, 15 May 2014 13:43:12 +0000 (09:43 -0400)
agent/agent.c
agent/outputstream.c

index 72b4cd4..b0aea57 100644 (file)
@@ -133,6 +133,21 @@ static GStaticMutex agent_mutex = G_STATIC_REC_MUTEX_INIT;
 
 static void priv_free_upnp (NiceAgent *agent);
 
+static void pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data);
+static void pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data);
+static void pseudo_tcp_socket_writable (PseudoTcpSocket *sock, gpointer user_data);
+static void pseudo_tcp_socket_closed (PseudoTcpSocket *sock, guint32 err,
+    gpointer user_data);
+static PseudoTcpWriteResult pseudo_tcp_socket_write_packet (PseudoTcpSocket *sock,
+    const gchar *buffer, guint32 len, gpointer user_data);
+static void adjust_tcp_clock (NiceAgent *agent, Stream *stream, Component *component);
+
+static void nice_agent_dispose (GObject *object);
+static void nice_agent_get_property (GObject *object,
+  guint property_id, GValue *value, GParamSpec *pspec);
+static void nice_agent_set_property (GObject *object,
+  guint property_id, const GValue *value, GParamSpec *pspec);
+
 #if GLIB_CHECK_VERSION(2,31,8)
 void agent_lock (void)
 {
@@ -337,25 +352,6 @@ agent_find_component (
   return TRUE;
 }
 
-
-static void
-nice_agent_dispose (GObject *object);
-
-static void
-nice_agent_get_property (
-  GObject *object,
-  guint property_id,
-  GValue *value,
-  GParamSpec *pspec);
-
-static void
-nice_agent_set_property (
-  GObject *object,
-  guint property_id,
-  const GValue *value,
-  GParamSpec *pspec);
-
-
 static void
 nice_agent_class_init (NiceAgentClass *klass)
 {
@@ -1074,6 +1070,21 @@ nice_agent_set_property (
 
 }
 
+static void
+pseudo_tcp_socket_create (NiceAgent *agent, Stream *stream, Component *component)
+{
+  PseudoTcpCallbacks tcp_callbacks = {component,
+                                      pseudo_tcp_socket_opened,
+                                      pseudo_tcp_socket_readable,
+                                      pseudo_tcp_socket_writable,
+                                      pseudo_tcp_socket_closed,
+                                      pseudo_tcp_socket_write_packet};
+  component->tcp = pseudo_tcp_socket_new (0, &tcp_callbacks);
+  component->tcp_writable_cancellable = g_cancellable_new ();
+  nice_debug ("Agent %p: Create Pseudo Tcp Socket for component %d",
+      agent, component->id);
+}
+
 static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
     Component *component)
 {
@@ -1098,10 +1109,6 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
 }
 
 static void
-adjust_tcp_clock (NiceAgent *agent, Stream *stream, Component *component);
-
-
-static void
 pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
 {
   Component *component = user_data;
@@ -1571,7 +1578,9 @@ process_queued_tcp_packets (NiceAgent *agent, Stream *stream,
   guint stream_id = stream->id;
   guint component_id = component->id;
 
-  if (component->selected_pair.local == NULL || component->tcp == NULL)
+  if (component->selected_pair.local == NULL ||
+      (!nice_socket_is_reliable (component->selected_pair.local->sockptr)
+          && component->tcp == NULL))
     return;
 
   nice_debug ("%s: Sending outstanding packets for agent %p.", G_STRFUNC,
@@ -1625,17 +1634,14 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id,
   }
 
   if(agent->reliable) {
-    if (lcandidate->transport == NICE_CANDIDATE_TRANSPORT_UDP) {
-      if (component->tcp) {
-        process_queued_tcp_packets (agent, stream, component);
+    if (!nice_socket_is_reliable (lcandidate->sockptr)) {
+      if (!component->tcp)
+        pseudo_tcp_socket_create (agent, stream, component);
+      process_queued_tcp_packets (agent, stream, component);
 
-        pseudo_tcp_socket_connect (component->tcp);
-        pseudo_tcp_socket_notify_mtu (component->tcp, MAX_TCP_MTU);
-        adjust_tcp_clock (agent, stream, component);
-      } else {
-        nice_debug ("New reliable UDP pair selected but pseudo tcp socket in error");
-        return;
-      }
+      pseudo_tcp_socket_connect (component->tcp);
+      pseudo_tcp_socket_notify_mtu (component->tcp, MAX_TCP_MTU);
+      adjust_tcp_clock (agent, stream, component);
     } else {
       nice_debug ("ICE-TCP not yet supported");
       return;
@@ -1690,14 +1696,6 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui
           &stream, &component))
     return;
 
-  if (agent->reliable && component->tcp == NULL &&
-      state != NICE_COMPONENT_STATE_FAILED) {
-    nice_debug ("Agent %p: not changing component state for s%d:%d to %s "
-        "because pseudo tcp socket does not exist in reliable mode", agent,
-        stream->id, component->id, nice_component_state_to_string (state));
-    return;
-  }
-
   if (component->state != state && state < NICE_COMPONENT_STATE_LAST) {
     nice_debug ("Agent %p : stream %u component %u STATE-CHANGE %s -> %s.", agent,
         stream_id, component_id, nice_component_state_to_string (component->state),
@@ -1885,17 +1883,7 @@ nice_agent_add_stream (
     for (i = 0; i < n_components; i++) {
       Component *component = stream_find_component_by_id (stream, i + 1);
       if (component) {
-        PseudoTcpCallbacks tcp_callbacks = {component,
-                                            pseudo_tcp_socket_opened,
-                                            pseudo_tcp_socket_readable,
-                                            pseudo_tcp_socket_writable,
-                                            pseudo_tcp_socket_closed,
-                                            pseudo_tcp_socket_write_packet};
-        component->tcp = pseudo_tcp_socket_new (0, &tcp_callbacks);
-        component->tcp_writable_cancellable = g_cancellable_new ();
-        adjust_tcp_clock (agent, stream, component);
-        nice_debug ("Agent %p: Create Pseudo Tcp Socket for component %d",
-            agent, i+1);
+        pseudo_tcp_socket_create (agent, stream, component);
       } else {
         nice_debug ("Agent %p: couldn't find component %d", agent, i+1);
       }
@@ -2618,13 +2606,6 @@ _set_remote_candidates_locked (NiceAgent *agent, Stream *stream,
   const GSList *i;
   int added = 0;
 
-  if (agent->reliable && component->tcp == NULL) {
-    nice_debug ("Agent %p: not setting remote candidate for s%d:%d because "
-        "pseudo tcp socket does not exist in reliable mode", agent,
-        stream->id, component->id);
-    goto done;
-  }
-
  for (i = candidates; i && added >= 0; i = i->next) {
    NiceCandidate *d = (NiceCandidate*) i->data;
 
@@ -2654,7 +2635,6 @@ _set_remote_candidates_locked (NiceAgent *agent, Stream *stream,
      nice_debug ("Agent %p : Warning: unable to schedule any conn checks!", agent);
  }
 
- done:
  return added;
 }
 
@@ -2824,43 +2804,50 @@ agent_recv_message_unlocked (
   }
 
   /* Unhandled STUN; try handling TCP data, then pass to the client. */
-  if (message->length > 0 && component->tcp) {
-    /* If we don’t yet have an underlying selected socket, queue up the incoming
-     * data to handle later. This is because we can’t send ACKs (or, more
-     * importantly for the first few packets, SYNACKs) without an underlying
-     * socket. We’d rather wait a little longer for a pair to be selected, then
-     * process the incoming packets and send out ACKs, than try to process them
-     * now, fail to send the ACKs, and incur a timeout in our pseudo-TCP state
-     * machine. */
-    if (component->selected_pair.local == NULL) {
-      GOutputVector *vec = g_slice_new (GOutputVector);
-      vec->buffer = compact_input_message (message, &vec->size);
-      g_queue_push_tail (&component->queued_tcp_packets, vec);
-      nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
-          G_STRFUNC, vec->size, agent);
-
-      return RECV_OOB;
-    } else {
-      process_queued_tcp_packets (agent, stream, component);
-    }
+  if (message->length > 0  && agent->reliable) {
+    if (!nice_socket_is_reliable (nicesock) && component->tcp) {
+      /* If we don’t yet have an underlying selected socket, queue up the
+       * incoming data to handle later. This is because we can’t send ACKs (or,
+       * more importantly for the first few packets, SYNACKs) without an
+       * underlying socket. We’d rather wait a little longer for a pair to be
+       * selected, then process the incoming packets and send out ACKs, than try
+       * to process them now, fail to send the ACKs, and incur a timeout in our
+       * pseudo-TCP state machine. */
+      if (component->selected_pair.local == NULL) {
+        GOutputVector *vec = g_slice_new (GOutputVector);
+        vec->buffer = compact_input_message (message, &vec->size);
+        g_queue_push_tail (&component->queued_tcp_packets, vec);
+        nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
+            G_STRFUNC, vec->size, agent);
+
+        return RECV_OOB;
+      } else {
+        process_queued_tcp_packets (agent, stream, component);
+      }
 
-    /* Received data on a reliable connection. */
+      /* Received data on a reliable connection. */
 
-    nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSIZE_FORMAT,
-        G_STRFUNC, message->length);
-    pseudo_tcp_socket_notify_message (component->tcp, message);
+      nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSIZE_FORMAT,
+          G_STRFUNC, message->length);
+      pseudo_tcp_socket_notify_message (component->tcp, message);
 
-    adjust_tcp_clock (agent, stream, component);
+      adjust_tcp_clock (agent, stream, component);
 
-    /* Success! Handled out-of-band. */
-    retval = RECV_OOB;
-    goto done;
-  } else if (message->length > 0 && !component->tcp && agent->reliable) {
-    /* Received data on a reliable connection which has no TCP component. */
-    nice_debug ("Received data on a pseudo tcp FAILED component. Ignoring.");
+      /* Success! Handled out-of-band. */
+      retval = RECV_OOB;
+      goto done;
+    } else if (!nice_socket_is_reliable (nicesock)) {
+      nice_debug ("Received data on a pseudo tcp FAILED component. Ignoring.");
 
-    retval = RECV_OOB;
-    goto done;
+      retval = RECV_OOB;
+      goto done;
+    } else {
+      /* Received data on a reliable connection which has no TCP component. */
+      nice_debug ("Ice TCP unsupported\n");
+
+      retval = RECV_OOB;
+      goto done;
+    }
   }
 
 done:
@@ -3525,7 +3512,7 @@ nice_agent_send_messages_nonblocking_internal (
     }
 
     if(agent->reliable) {
-      if (component->selected_pair.local->transport == NICE_CANDIDATE_TRANSPORT_UDP) {
+      if (!nice_socket_is_reliable (component->selected_pair.local->sockptr)) {
         if (component->tcp != NULL) {
           /* Send on the pseudo-TCP socket. */
           n_sent = pseudo_tcp_socket_send_messages (component->tcp, messages,
@@ -3541,7 +3528,7 @@ nice_agent_send_messages_nonblocking_internal (
           }
         } else {
           g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
-              "Error writing data to failed pseudo-TCP socket.");
+              "Pseudo-TCP socket not connected.");
         }
       } else {
           g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
@@ -3849,69 +3836,73 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data)
 
   if (agent->reliable) {
 #define TCP_HEADER_SIZE 24 /* bytes */
-    guint8 local_header_buf[TCP_HEADER_SIZE];
-    /* FIXME: Currently, the critical path for reliable packet delivery has two
-     * memcpy()s: one into the pseudo-TCP receive buffer, and one out of it.
-     * This could moderately easily be reduced to one memcpy() in the common
-     * case of in-order packet delivery, by replacing local_body_buf with a
-     * pointer into the pseudo-TCP receive buffer. If it turns out the packet
-     * is out-of-order (which we can only know after parsing its header), the
-     * data will need to be moved in the buffer. If the packet *is* in order,
-     * however, the only memcpy() then needed is from the pseudo-TCP receive
-     * buffer to the client’s message buffers.
-     *
-     * In fact, in the case of a reliable agent with I/O callbacks, zero
-     * memcpy()s can be achieved (for in-order packet delivery) by emittin the
-     * I/O callback directly from the pseudo-TCP receive buffer. */
-    guint8 local_body_buf[MAX_BUFFER_SIZE];
-    GInputVector local_bufs[] = {
-      { local_header_buf, sizeof (local_header_buf) },
-      { local_body_buf, sizeof (local_body_buf) },
-    };
-    NiceInputMessage local_message = {
-      local_bufs, G_N_ELEMENTS (local_bufs), NULL, 0
-    };
-    RecvStatus retval = 0;
-
-    if (component->tcp == NULL) {
-      nice_debug ("Agent %p: not handling incoming packet for s%d:%d "
-          "because pseudo-TCP socket does not exist in reliable mode.", agent,
-          stream->id, component->id);
-      remove_source = TRUE;
-      goto done;
-    }
-
-    while (has_io_callback ||
-           (component->recv_messages != NULL &&
-            !nice_input_message_iter_is_at_end (&component->recv_messages_iter,
-                component->recv_messages, component->n_recv_messages))) {
-      /* Receive a single message. This will receive it into the given
-       * @local_bufs then, for pseudo-TCP, emit I/O callbacks or copy it into
-       * component->recv_messages in pseudo_tcp_socket_readable(). STUN packets
-       * will be parsed in-place. */
-      retval = agent_recv_message_unlocked (agent, stream, component,
-          socket_source->socket, &local_message);
-
-      nice_debug ("%s: %p: received %d valid messages with %" G_GSSIZE_FORMAT
-           " bytes", G_STRFUNC, agent, retval, local_message.length);
-
-      /* Don’t expect any valid messages to escape pseudo_tcp_socket_readable()
-       * when in reliable mode. */
-      g_assert_cmpint (retval, !=, RECV_SUCCESS);
-
-      if (retval == RECV_WOULD_BLOCK) {
-        /* EWOULDBLOCK. */
-        break;
-      } else if (retval == RECV_ERROR) {
-        /* Other error. */
-        nice_debug ("%s: error receiving message", G_STRFUNC);
+    if (!nice_socket_is_reliable (socket_source->socket)) {
+      guint8 local_header_buf[TCP_HEADER_SIZE];
+      /* FIXME: Currently, the critical path for reliable packet delivery has two
+       * memcpy()s: one into the pseudo-TCP receive buffer, and one out of it.
+       * This could moderately easily be reduced to one memcpy() in the common
+       * case of in-order packet delivery, by replacing local_body_buf with a
+       * pointer into the pseudo-TCP receive buffer. If it turns out the packet
+       * is out-of-order (which we can only know after parsing its header), the
+       * data will need to be moved in the buffer. If the packet *is* in order,
+       * however, the only memcpy() then needed is from the pseudo-TCP receive
+       * buffer to the client’s message buffers.
+       *
+       * In fact, in the case of a reliable agent with I/O callbacks, zero
+       * memcpy()s can be achieved (for in-order packet delivery) by emittin the
+       * I/O callback directly from the pseudo-TCP receive buffer. */
+      guint8 local_body_buf[MAX_BUFFER_SIZE];
+      GInputVector local_bufs[] = {
+        { local_header_buf, sizeof (local_header_buf) },
+        { local_body_buf, sizeof (local_body_buf) },
+      };
+      NiceInputMessage local_message = {
+        local_bufs, G_N_ELEMENTS (local_bufs), NULL, 0
+      };
+      RecvStatus retval = 0;
+
+      if (component->tcp == NULL) {
+        nice_debug ("Agent %p: not handling incoming packet for s%d:%d "
+            "because pseudo-TCP socket does not exist in reliable mode.", agent,
+            stream->id, component->id);
         remove_source = TRUE;
-        break;
+        goto done;
       }
 
-      has_io_callback = component_has_io_callback (component);
+      while (has_io_callback ||
+          (component->recv_messages != NULL &&
+              !nice_input_message_iter_is_at_end (&component->recv_messages_iter,
+                  component->recv_messages, component->n_recv_messages))) {
+        /* Receive a single message. This will receive it into the given
+         * @local_bufs then, for pseudo-TCP, emit I/O callbacks or copy it into
+         * component->recv_messages in pseudo_tcp_socket_readable(). STUN packets
+         * will be parsed in-place. */
+        retval = agent_recv_message_unlocked (agent, stream, component,
+            socket_source->socket, &local_message);
+
+        nice_debug ("%s: %p: received %d valid messages with %" G_GSSIZE_FORMAT
+            " bytes", G_STRFUNC, agent, retval, local_message.length);
+
+        /* Don’t expect any valid messages to escape pseudo_tcp_socket_readable()
+         * when in reliable mode. */
+        g_assert_cmpint (retval, !=, RECV_SUCCESS);
+
+        if (retval == RECV_WOULD_BLOCK) {
+          /* EWOULDBLOCK. */
+          break;
+        } else if (retval == RECV_ERROR) {
+          /* Other error. */
+          nice_debug ("%s: error receiving message", G_STRFUNC);
+          remove_source = TRUE;
+          break;
+        }
+
+        has_io_callback = component_has_io_callback (component);
+      }
+    } else {
+      nice_debug ("unsupported ice-tcp");
     }
-  } else if (!agent->reliable && has_io_callback) {
+  } else if (has_io_callback) {
     while (has_io_callback) {
       guint8 local_buf[MAX_BUFFER_SIZE];
       GInputVector local_bufs = { local_buf, sizeof (local_buf) };
@@ -3940,7 +3931,7 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data)
 
       has_io_callback = component_has_io_callback (component);
     }
-  } else if (!agent->reliable && component->recv_messages != NULL) {
+  } else if (component->recv_messages != NULL) {
     RecvStatus retval;
 
     /* Don’t want to trample over partially-valid buffers. */
@@ -4066,7 +4057,8 @@ nice_agent_set_selected_pair (
   /* step: stop connectivity checks (note: for the whole stream) */
   conn_check_prune_stream (agent, stream);
 
-  if (agent->reliable && component->tcp == NULL) {
+  if (agent->reliable && !nice_socket_is_reliable (pair.local->sockptr) &&
+      component->tcp == NULL) {
     nice_debug ("Agent %p: not setting selected pair for s%d:%d because "
         "pseudo tcp socket does not exist in reliable mode", agent,
         stream->id, component->id);
@@ -4178,6 +4170,8 @@ nice_agent_set_selected_remote_candidate (
   Stream *stream;
   NiceCandidate *lcandidate = NULL;
   gboolean ret = FALSE;
+  NiceCandidate *local = NULL, *remote = NULL;
+  guint64 priority;
 
   agent_lock();
 
@@ -4189,13 +4183,10 @@ nice_agent_set_selected_remote_candidate (
   /* step: stop connectivity checks (note: for the whole stream) */
   conn_check_prune_stream (agent, stream);
 
-
-  if (agent->reliable && component->tcp == NULL) {
-    nice_debug ("Agent %p: not setting selected remote candidate s%d:%d because "
-        "pseudo tcp socket does not exist in reliable mode", agent,
-        stream->id, component->id);
-    goto done;
-  }
+  /* Store previous selected pair */
+  local = component->selected_pair.local;
+  remote = component->selected_pair.remote;
+  priority = component->selected_pair.priority;
 
   /* step: set the selected pair */
   lcandidate = component_set_selected_remote_candidate (agent, component,
@@ -4203,6 +4194,19 @@ nice_agent_set_selected_remote_candidate (
   if (!lcandidate)
     goto done;
 
+  if (agent->reliable && !nice_socket_is_reliable (lcandidate->sockptr) &&
+      component->tcp == NULL) {
+    nice_debug ("Agent %p: not setting selected remote candidate s%d:%d because"
+        " pseudo tcp socket does not exist in reliable mode", agent,
+        stream->id, component->id);
+    /* Revert back to previous selected pair */
+    /* FIXME: by doing this, we lose the keepalive tick */
+    component->selected_pair.local = local;
+    component->selected_pair.remote = remote;
+    component->selected_pair.priority = priority;
+    goto done;
+  }
+
   /* step: change component state */
   agent_signal_component_state_change (agent, stream_id, component_id, NICE_COMPONENT_STATE_READY);
 
index 74b567b..c92b64d 100644 (file)
@@ -490,22 +490,24 @@ nice_output_stream_is_writable (GPollableOutputStream *stream)
         priv->stream_id);
     goto done;
   }
+  if (component->selected_pair.local != NULL) {
+    /* If it’s a reliable agent, see if there’s any space in the pseudo-TCP
+     * output buffer. */
+    if (!nice_socket_is_reliable (component->selected_pair.local->sockptr) &&
+        component->tcp != NULL) {
+      retval = pseudo_tcp_socket_can_send (component->tcp);
+      goto done;
+    }
 
-  /* If it’s a reliable agent, see if there’s any space in the pseudo-TCP output
-   * buffer. */
-  if (component->tcp != NULL) {
-    retval = pseudo_tcp_socket_can_send (component->tcp);
-    goto done;
-  }
-
-  /* Check whether any of the component’s FDs are pollable. */
-  for (i = component->socket_sources; i != NULL; i = i->next) {
-    SocketSource *socket_source = i->data;
-    NiceSocket *nicesock = socket_source->socket;
+    /* Check whether any of the component’s FDs are pollable. */
+    for (i = component->socket_sources; i != NULL; i = i->next) {
+      SocketSource *socket_source = i->data;
+      NiceSocket *nicesock = socket_source->socket;
 
-    if (g_socket_condition_check (nicesock->fileno, G_IO_OUT) != 0) {
-      retval = TRUE;
-      break;
+      if (g_socket_condition_check (nicesock->fileno, G_IO_OUT) != 0) {
+        retval = TRUE;
+        break;
+      }
     }
   }