agent: Add a nice_agent_recv_nonblocking() function
authorPhilip Withnall <philip.withnall@collabora.co.uk>
Thu, 9 Jan 2014 17:59:11 +0000 (17:59 +0000)
committerOlivier Crête <olivier.crete@collabora.com>
Fri, 31 Jan 2014 06:48:59 +0000 (01:48 -0500)
This is a non-blocking variant of nice_agent_recv(), and will be used
internally by the GPollableInputStream implementation. External
implementations may use it as well.

It reserves the right to iterate the main context, but doesn’t currently
do so.

agent/agent.c
agent/agent.h
docs/reference/libnice/libnice-sections.txt
nice/libnice.sym

index 231f5b4..0669b9c 100644 (file)
@@ -2637,6 +2637,181 @@ done:
 }
 
 /**
+ * nice_agent_recv_nonblocking:
+ * @agent: a #NiceAgent
+ * @stream_id: the ID of the stream to receive on
+ * @component_id: the ID of the component to receive on
+ * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
+ * to write the received data into, of length at least @buf_len
+ * @buf_len: length of @buf
+ * @cancellable: (allow-none): a #GCancellable to allow the operation to be
+ * cancelled from another thread, or %NULL
+ * @error: (allow-none): return location for a #GError, or %NULL
+ *
+ * Try to receive data from the given stream/component combination on @agent,
+ * without blocking. If receiving data would block, -1 is returned and a
+ * %G_IO_ERROR_WOULD_BLOCK is set in @error. If any other error occurs, -1 is
+ * returned. Otherwise, 0 is returned if (and only if) @buf_len is 0. In all
+ * other cases, the number of bytes read into @buf is returned, and will be
+ * greater than 0.
+ *
+ * For a reliable @agent, this function will receive as many bytes as possible
+ * up to @buf_len. For a non-reliable @agent, it will receive a single message.
+ * In this case, @buf must be big enough to contain the entire message (65536
+ * bytes), or any excess data may be silently dropped.
+ *
+ * As this function is non-blocking, @cancellable is included only for parity
+ * with nice_agent_recv(). If @cancellable is cancelled before this function is
+ * called, a %G_IO_ERROR_CANCELLED error will be returned immediately.
+ *
+ * This must not be used in combination with nice_agent_attach_recv() on the
+ * same stream/component pair.
+ *
+ * Internally, this may iterate the current thread’s default main context.
+ *
+ * If the stream/component pair doesn’t exist, or if a suitable candidate socket
+ * hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
+ * returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
+ * cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
+ *
+ * Returns: the number of bytes received into @buf on success (guaranteed to be
+ * greater than 0 unless @buf_len is 0), or -1 on error
+ *
+ * Since: 0.1.5
+ */
+NICEAPI_EXPORT gssize
+nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
+    guint component_id, guint8 *buf, gsize buf_len, GCancellable *cancellable,
+    GError **error)
+{
+  Component *component;
+  Stream *stream;
+  gssize total_len = 0;
+  gboolean received_enough = FALSE, error_reported = FALSE;
+  gboolean all_sockets_would_block = FALSE;
+  GError *child_error = NULL;
+
+  if (buf_len == 0)
+    return 0;
+
+  /* Support cancellation at the beginning only. */
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  /* Try and receive some data. */
+  agent_lock ();
+
+  if (!agent_find_component (agent, stream_id, component_id,
+          &stream, &component)) {
+    g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
+                 "Invalid stream/component.");
+    total_len = -1;
+    goto done;
+  }
+
+  /* For a reliable stream, grab any data from the pseudo-TCP input buffer
+   * before trying the sockets (which we try to see if there’s any more data
+   * available to read without blocking). */
+  if (agent->reliable && component->tcp != NULL &&
+      pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
+    gssize len;
+
+    len = pseudo_tcp_socket_recv (component->tcp, (gchar *) buf, buf_len);
+    adjust_tcp_clock (agent, stream, component);
+
+    nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from pseudo-TCP read "
+        "buffer.", G_STRFUNC, len);
+
+    if (len < 0 &&
+        pseudo_tcp_socket_get_error (component->tcp) == EWOULDBLOCK) {
+      len = 0;
+    } else if (len < 0 &&
+        pseudo_tcp_socket_get_error (component->tcp) == ENOTCONN) {
+      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
+          "Error reading data from pseudo-TCP socket: not connected.");
+    } else if (len < 0) {
+      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
+          "Error reading data from pseudo-TCP socket.");
+    } else if (len > 0) {
+      /* Got some data! */
+      buf += len;
+      buf_len -= len;
+      total_len += len;
+    }
+
+    received_enough = ((gsize) total_len == buf_len);
+    error_reported = (len < 0);
+  }
+
+  /* Each call to agent_recv_locked() will either receive some data or a socket
+   * error (including EWOULDBLOCK). (Cancellation is not supported.) If *any*
+   * socket returns an error, discard all the data in @buf and return an error
+   * from nice_agent_recv_nonblocking() overall.
+   *
+   * In reliable mode, iterate the loop enough to receive at least one byte.
+   * In non-reliable mode, iterate the loop to receive a single message. */
+  while (!received_enough && !error_reported && !all_sockets_would_block) {
+    GSList *i;
+    gssize len = 0;
+
+    for (i = component->socket_sources; i != NULL; i = i->next) {
+      SocketSource *socket_source = i->data;
+
+      /* Actually read the data. This will return 0 if the data has already been
+       * handled (e.g. for STUN control packets). */
+      len = agent_recv_locked (agent, stream, component,
+          socket_source->socket, buf, buf_len);
+
+      nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from socket %p.",
+         G_STRFUNC, len, socket_source->socket);
+
+      if (len < 0) {
+        g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
+            "Unable to receive from socket %p. Detaching.",
+            socket_source->socket);
+
+        break;
+      } else if (len > 0) {
+        /* Got some data! */
+        buf += len;
+        buf_len -= len;
+        total_len += len;
+
+        break;
+      }
+    }
+
+    received_enough =
+       ((agent->reliable && (gsize) total_len == buf_len) ||
+        (!agent->reliable && total_len > 0));
+    error_reported = (len < 0);
+    all_sockets_would_block = (len == 0);
+  }
+
+  nice_debug ("%s: total_len: %" G_GSSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT,
+      G_STRFUNC, total_len, buf_len);
+
+  if (error_reported) {
+    total_len = -1;
+  } else if (total_len == 0 && all_sockets_would_block) {
+    g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+        g_strerror (EAGAIN));
+    total_len = -1;
+  }
+
+done:
+  g_assert ((child_error != NULL) == (total_len == -1));
+  g_assert (total_len != 0);
+
+  if (child_error != NULL)
+    g_propagate_error (error, child_error);
+
+  agent_unlock ();
+
+  return total_len;
+}
+
+/**
  * nice_agent_send_full:
  * @agent: a #NiceAgent
  * @stream_id: the ID of the stream to send to
index f7e808b..8b4c974 100644 (file)
@@ -711,6 +711,16 @@ nice_agent_recv (
     GCancellable *cancellable,
     GError **error);
 
+gssize
+nice_agent_recv_nonblocking (
+    NiceAgent *agent,
+    guint stream_id,
+    guint component_id,
+    guint8 *buf,
+    gsize buf_len,
+    GCancellable *cancellable,
+    GError **error);
+
 /**
  * nice_agent_set_selected_pair:
  * @agent: The #NiceAgent Object
index 74fdb24..f763133 100644 (file)
@@ -25,6 +25,7 @@ nice_agent_get_selected_pair
 nice_agent_send
 nice_agent_send_full
 nice_agent_recv
+nice_agent_recv_nonblocking
 nice_agent_attach_recv
 nice_agent_set_selected_pair
 nice_agent_set_selected_remote_candidate
index bc25f66..f075a57 100644 (file)
@@ -17,6 +17,7 @@ nice_address_to_string
 nice_agent_add_local_address
 nice_agent_add_stream
 nice_agent_recv
+nice_agent_recv_nonblocking
 nice_agent_attach_recv
 nice_agent_gather_candidates
 nice_agent_generate_local_candidate_sdp