agent: Add support for vectored I/O for sends
authorPhilip Withnall <philip.withnall@collabora.co.uk>
Tue, 21 Jan 2014 15:56:18 +0000 (15:56 +0000)
committerOlivier Crête <olivier.crete@collabora.com>
Fri, 31 Jan 2014 06:49:07 +0000 (01:49 -0500)
Add one new public function, nice_agent_send_messages_nonblocking(),
which replaces nice_agent_send_full(). This isn’t an API break, because
nice_agent_send_full() hasn’t been in a release yet. The new API allows
sending multiple messages in a single call, and supports vectors of
buffers to transmit the messages from.

The existing nice_agent_send() API has been left untouched, although
it’s a bit of a bugbear because it’s non-blocking and doesn’t fit with
the new *_nonblocking() naming scheme. Oh well.

This doesn’t bring any notable changes to the number of memcpy()s on the
critical path: it remains at zero for the common cases and common socket
types. It introduces the possibility for future work to eliminate some
memcpy()s in more complex socket types, like tcp-turn and tcp-bsd, but
these optimisations have not been made yet. FIXME comments have been
added.

This includes modifications to the test-send-recv unit test to cover the
new API.

agent/agent.c
agent/agent.h
agent/outputstream.c
agent/pseudotcp.c
agent/pseudotcp.h
docs/reference/libnice/libnice-sections.txt
nice/libnice.sym
tests/test-send-recv.c

index d5bdf5f..4f7842b 100644 (file)
@@ -1028,6 +1028,62 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
       stream->id, component->id);
 }
 
+/* Will attempt to queue all @n_messages into the pseudo-TCP transmission
+ * buffer. This is always used in reliable mode, so essentially treats @messages
+ * as a massive flat array of buffers.
+ *
+ * Returns the number of messages successfully sent on success (which may be
+ * zero if sending the first buffer of the message would have blocked), or
+ * a negative number on error. */
+static gint
+pseudo_tcp_socket_send_messages (PseudoTcpSocket *self,
+    const NiceOutputMessage *messages, guint n_messages, GError **error)
+{
+  guint i;
+
+  for (i = 0; i < n_messages; i++) {
+    const NiceOutputMessage *message = &messages[i];
+    guint j;
+
+    /* If there’s not enough space for the entire message, bail now before
+     * queuing anything. This doesn’t gel with the fact this function is only
+     * used in reliable mode, and there is no concept of a ‘message’, but is
+     * necessary because the calling API has no way of returning to the client
+     * and indicating that a message was partially sent. */
+    if (message->length > pseudo_tcp_socket_get_available_send_space (self)) {
+      return i;
+    }
+
+    for (j = 0;
+         (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
+         (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
+         j++) {
+      const GOutputVector *buffer = &message->buffers[j];
+      gssize ret;
+
+      /* Send on the pseudo-TCP socket. */
+      ret = pseudo_tcp_socket_send (self, buffer->buffer, buffer->size);
+
+      /* In case of -1, the error is either EWOULDBLOCK or ENOTCONN, which both
+       * need the user to wait for the reliable-transport-writable signal */
+      if (ret < 0 && pseudo_tcp_socket_get_error (self) == EWOULDBLOCK) {
+        ret = 0;
+        return i;
+      } else if (ret < 0 && pseudo_tcp_socket_get_error (self) == ENOTCONN) {
+        g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+            "TCP connection is not yet established.");
+        return ret;
+      } else if (ret < 0) {
+        g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+            "Error writing data to pseudo-TCP socket.");
+        return ret;
+      }
+    }
+  }
+
+  return i;
+}
+
 /* Will fill up @messages from the first free byte onwards (as determined using
  * @iter). This is always used in reliable mode, so it essentially treats
  * @messages as a massive flat array of buffers.
@@ -1236,6 +1292,8 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
   if (component->selected_pair.local != NULL) {
     NiceSocket *sock;
     NiceAddress *addr;
+    GOutputVector local_buf;
+    NiceOutputMessage local_message;
 
     sock = component->selected_pair.local->sockptr;
 
@@ -1253,7 +1311,15 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
 #endif
 
     addr = &component->selected_pair.remote->addr;
-    if (nice_socket_send (sock, addr, len, buffer)) {
+
+    local_buf.buffer = buffer;
+    local_buf.size = len;
+    local_message.buffers = &local_buf;
+    local_message.n_buffers = 1;
+    local_message.to = addr;
+    local_message.length = len;
+
+    if (nice_socket_send_messages (sock, &local_message, 1)) {
       return WR_SUCCESS;
     }
   } else {
@@ -3141,62 +3207,25 @@ nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
   return local_messages.length;
 }
 
-/**
- * nice_agent_send_full:
- * @agent: a #NiceAgent
- * @stream_id: the ID of the stream to send to
- * @component_id: the ID of the component to send to
- * @buf: (array length=buf_len): data to transmit, of at least @buf_len bytes in
- * size
- * @buf_len: length of valid data in @buf, in bytes
- * @cancellable: (allow-none): a #GCancellable to cancel the operation from
- * another thread, or %NULL
- * @error: (allow-none): return location for a #GError, or %NULL
- *
- * Sends the data in @buf on the socket identified by the given stream/component
- * pair. Transmission is non-blocking, so a %G_IO_ERROR_WOULD_BLOCK error may be
- * returned if the send buffer is full.
- *
- * As with nice_agent_send(), the given component must be in
- * %NICE_COMPONENT_STATE_READY or, as a special case, in any state if it was
- * previously ready and was then restarted.
- *
- * On success, the number of bytes written to the socket will be returned (which
- * will always be @buf_len when in non-reliable mode, and may be less than
- * @buf_len when in reliable mode).
- *
- * On failure, -1 will be returned and @error will be set. If the #NiceAgent is
- * reliable and the socket is not yet connected, %G_IO_ERROR_BROKEN_PIPE will be
- * returned; if the write buffer is full, %G_IO_ERROR_WOULD_BLOCK will be
- * returned. In both cases, wait for the #NiceAgent::reliable-transport-writable
- * signal before trying again. If the given @stream_id or @component_id are
- * invalid or not yet connected, %G_IO_ERROR_BROKEN_PIPE will be returned.
- * %G_IO_ERROR_FAILED will be returned for other errors.
- *
- * Returns: the number of bytes sent (guaranteed to be greater than 0), or -1 on
- * error
- *
- * Since: 0.1.5
- */
-NICEAPI_EXPORT gssize
-nice_agent_send_full (
+NICEAPI_EXPORT gint
+nice_agent_send_messages_nonblocking (
   NiceAgent *agent,
   guint stream_id,
   guint component_id,
-  const guint8 *buf,
-  gsize buf_len,
+  const NiceOutputMessage *messages,
+  guint n_messages,
   GCancellable *cancellable,
   GError **error)
 {
   Stream *stream;
   Component *component;
-  gssize ret = -1;
+  gint n_sent_messages = -1;
   GError *child_error = NULL;
 
   g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
   g_return_val_if_fail (stream_id >= 1, -1);
   g_return_val_if_fail (component_id >= 1, -1);
-  g_return_val_if_fail (buf != NULL, -1);
+  g_return_val_if_fail (n_messages == 0 || messages != NULL, -1);
   g_return_val_if_fail (
       cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
   g_return_val_if_fail (error == NULL || *error == NULL, -1);
@@ -3218,79 +3247,75 @@ nice_agent_send_full (
 
   if (component->tcp != NULL) {
     /* Send on the pseudo-TCP socket. */
-    ret = pseudo_tcp_socket_send (component->tcp, (const gchar *) buf, buf_len);
+    n_sent_messages = pseudo_tcp_socket_send_messages (component->tcp, messages,
+        n_messages, &child_error);
     adjust_tcp_clock (agent, stream, component);
 
     if (!pseudo_tcp_socket_can_send (component->tcp))
       g_cancellable_reset (component->tcp_writable_cancellable);
 
-    /* In case of -1, the error is either EWOULDBLOCK or ENOTCONN, which both
-       need the user to wait for the reliable-transport-writable signal */
-    if (ret < 0 &&
-        pseudo_tcp_socket_get_error (component->tcp) == EWOULDBLOCK) {
-      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-          "Write would block.");
-      goto done;
-    } else if (ret < 0 &&
-        pseudo_tcp_socket_get_error (component->tcp) == ENOTCONN) {
-      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-          "TCP connection is not yet established.");
-      goto done;
-    } else if (ret < 0) {
+    if (n_sent_messages < 0) {
       /* Signal error */
       priv_pseudo_tcp_error (agent, stream, component);
-
-      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
-          "Error writing data to pseudo-TCP socket.");
-      goto done;
     }
   } else if (agent->reliable) {
     g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
         "Error writing data to failed pseudo-TCP socket.");
-    goto done;
   } else if (component->selected_pair.local != NULL) {
     NiceSocket *sock;
     NiceAddress *addr;
+    guint i;
 
 #ifndef NDEBUG
     gchar tmpbuf[INET6_ADDRSTRLEN];
     nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
 
-    nice_debug ("Agent %p : s%d:%d: sending %" G_GSIZE_FORMAT " bytes to "
-        "[%s]:%d", agent, stream_id, component_id, buf_len, tmpbuf,
+    nice_debug ("Agent %p : s%d:%d: sending %u messages to "
+        "[%s]:%d", agent, stream_id, component_id, n_messages, tmpbuf,
         nice_address_get_port (&component->selected_pair.remote->addr));
 #endif
 
     sock = component->selected_pair.local->sockptr;
     addr = &component->selected_pair.remote->addr;
 
-    if (nice_socket_send (sock, addr, buf_len, (const gchar *) buf)) {
-      /* Success: sent all the bytes. */
-      ret = buf_len;
-    } else {
-      /* Some error. Since nice_socket_send() provides absolutely no useful
-       * feedback, assume it’s EWOULDBLOCK. */
-      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-          "Error writing data to socket: probably would block.");
-      goto done;
+    /* Set the destination address. FIXME: This is ugly. */
+    for (i = 0; i < n_messages; i++) {
+      NiceOutputMessage *message = (NiceOutputMessage *) &messages[i];
+      message->to = addr;
+    }
+
+    n_sent_messages = nice_socket_send_messages (sock, messages, n_messages);
+
+    if (n_sent_messages < 0) {
+      g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
+          "Error writing data to socket.");
     }
   } else {
     /* Socket isn’t properly open yet. */
-    g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-        "Can’t send: No selected pair yet.");
-    goto done;
+    n_sent_messages = 0;  /* EWOULDBLOCK */
+  }
+
+  /* Handle errors and cancellations. */
+  if (n_sent_messages == 0) {
+    g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+        g_strerror (EAGAIN));
+    n_sent_messages = -1;
   }
 
+  nice_debug ("%s: n_sent_messages: %d, n_messages: %u", G_STRFUNC,
+      n_sent_messages, n_messages);
+
 done:
-  g_assert ((child_error != NULL) == (ret == -1));
-  g_assert (ret != 0);
+  g_assert ((child_error != NULL) == (n_sent_messages == -1));
+  g_assert (n_sent_messages != 0);
+  g_assert (n_sent_messages < 0 || (guint) n_sent_messages <= n_messages);
 
   if (child_error != NULL)
     g_propagate_error (error, child_error);
 
   agent_unlock ();
 
-  return ret;
+  return n_sent_messages;
 }
 
 NICEAPI_EXPORT gint
@@ -3301,8 +3326,16 @@ nice_agent_send (
   guint len,
   const gchar *buf)
 {
-  return nice_agent_send_full (agent, stream_id, component_id,
-      (const guint8 *) buf, len, NULL, NULL);
+  GOutputVector local_buf = { buf, len };
+  NiceOutputMessage local_message = { &local_buf, 1, NULL, len };
+  gint n_sent_messages;
+
+  n_sent_messages = nice_agent_send_messages_nonblocking (agent, stream_id,
+      component_id, &local_message, 1, NULL, NULL);
+
+  if (n_sent_messages == 1)
+    return len;
+  return n_sent_messages;
 }
 
 NICEAPI_EXPORT GSList *
index 6afc494..3f34506 100644 (file)
@@ -659,28 +659,29 @@ nice_agent_send (
   const gchar *buf);
 
 /**
- * nice_agent_send_full:
+ * nice_agent_send_messages_nonblocking:
  * @agent: a #NiceAgent
  * @stream_id: the ID of the stream to send to
  * @component_id: the ID of the component to send to
- * @buf: (array length=buf_len): data to transmit, of at least @buf_len bytes in
- * size
- * @buf_len: length of valid data in @buf, in bytes
+ * @messages: (array length=n_messages): array of messages to send, of at least
+ * @n_messages entries in length
+ * @n_messages: number of entries in @messages
  * @cancellable: (allow-none): a #GCancellable to cancel the operation from
  * another thread, or %NULL
  * @error: (allow-none): return location for a #GError, or %NULL
  *
- * Sends the data in @buf on the socket identified by the given stream/component
- * pair. Transmission is non-blocking, so a %G_IO_ERROR_WOULD_BLOCK error may be
- * returned if the send buffer is full.
+ * Sends multiple messages on the socket identified by the given
+ * stream/component pair. Transmission is non-blocking, so a
+ * %G_IO_ERROR_WOULD_BLOCK error may be returned if the send buffer is full.
  *
  * As with nice_agent_send(), the given component must be in
  * %NICE_COMPONENT_STATE_READY or, as a special case, in any state if it was
  * previously ready and was then restarted.
  *
- * On success, the number of bytes written to the socket will be returned (which
- * will always be @buf_len when in non-reliable mode, and may be less than
- * @buf_len when in reliable mode).
+ * On success, the number of messages written to the socket will be returned,
+ * which may be less than @n_messages if transmission would have blocked
+ * part-way through. Zero will be returned if @n_messages is zero, or if
+ * transmission would have blocked on the first message.
  *
  * On failure, -1 will be returned and @error will be set. If the #NiceAgent is
  * reliable and the socket is not yet connected, %G_IO_ERROR_BROKEN_PIPE will be
@@ -690,18 +691,17 @@ nice_agent_send (
  * invalid or not yet connected, %G_IO_ERROR_BROKEN_PIPE will be returned.
  * %G_IO_ERROR_FAILED will be returned for other errors.
  *
- * Returns: the number of bytes sent (guaranteed to be greater than 0), or -1 on
- * error
+ * Returns: the number of messages sent (may be zero), or -1 on error
  *
  * Since: 0.1.5
  */
-gssize
-nice_agent_send_full (
+gint
+nice_agent_send_messages_nonblocking (
     NiceAgent *agent,
     guint stream_id,
     guint component_id,
-    const guint8 *buf,
-    gsize buf_len,
+    const NiceOutputMessage *messages,
+    guint n_messages,
     GCancellable *cancellable,
     GError **error);
 
index ba2be0b..f96e190 100644 (file)
@@ -348,7 +348,8 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
     GCancellable *cancellable, GError **error)
 {
   NiceOutputStream *self = NICE_OUTPUT_STREAM (stream);
-  gssize len = -1, _len;
+  gssize len = -1;
+  gint n_sent_messages;
   GError *child_error = NULL;
   NiceAgent *agent = NULL;  /* owned */
   gulong cancel_id = 0, writeable_id;
@@ -401,6 +402,11 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
 
 
   do {
+    GOutputVector local_buf = { (const guint8 *) buffer + len, count - len };
+    NiceOutputMessage local_message = {
+      &local_buf, 1, NULL, count - len
+    };
+
     /* Have to unlock while calling into the agent because
      * it will take the agent lock which will cause a deadlock if one of
      * the callbacks is called.
@@ -408,23 +414,24 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
     write_data->writable = FALSE;
     g_mutex_unlock (&write_data->mutex);
 
-    _len = nice_agent_send_full (agent, self->priv->stream_id,
-        self->priv->component_id, (guint8 *) buffer + len, count - len,
+    n_sent_messages = nice_agent_send_messages_nonblocking (agent,
+        self->priv->stream_id, self->priv->component_id, &local_message, 1,
         cancellable, &child_error);
+
     g_mutex_lock (&write_data->mutex);
 
-    if (_len == -1 &&
+    if (n_sent_messages == -1 &&
         g_error_matches (child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
       /* EWOULDBLOCK. */
       g_clear_error (&child_error);
       if (!write_data->writable && !write_data->error)
         g_cond_wait (&write_data->cond, &write_data->mutex);
-    } else if (_len > 0) {
+    } else if (n_sent_messages > 0) {
       /* Success. */
-      len += _len;
+      len = count;
     } else {
       /* Other error. */
-      len = _len;
+      len = n_sent_messages;
       break;
     }
   } while ((gsize) len < count);
@@ -517,7 +524,9 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
 {
   NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
   NiceAgent *agent;  /* owned */
-  gssize len;
+  GOutputVector local_buf = { buffer, count };
+  NiceOutputMessage local_message = { &local_buf, 1, NULL, count };
+  gint n_sent_messages;
 
   /* Closed streams are not writeable. */
   if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
@@ -544,12 +553,12 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
     return -1;
   }
 
-  len = nice_agent_send_full (agent, priv->stream_id, priv->component_id,
-      buffer, count, NULL, error);
+  n_sent_messages = nice_agent_send_messages_nonblocking (agent,
+      priv->stream_id, priv->component_id, &local_message, 1, NULL, error);
 
   g_object_unref (agent);
 
-  return len;
+  return (n_sent_messages == 1) ? (gssize) count : n_sent_messages;
 }
 
 static GSource *
index be7cfdc..02a14bd 100644 (file)
@@ -1875,3 +1875,15 @@ pseudo_tcp_socket_can_send (PseudoTcpSocket *self)
 
   return (pseudo_tcp_fifo_get_write_remaining (&priv->sbuf) != 0);
 }
+
+gsize
+pseudo_tcp_socket_get_available_send_space (PseudoTcpSocket *self)
+{
+  PseudoTcpSocketPrivate *priv = self->priv;
+
+  if (priv->state != TCP_ESTABLISHED) {
+    return 0;
+  }
+
+  return pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
+}
index a43c3d4..5082c15 100644 (file)
@@ -458,6 +458,18 @@ gint pseudo_tcp_socket_get_available_bytes (PseudoTcpSocket *self);
 
 gboolean pseudo_tcp_socket_can_send (PseudoTcpSocket *self);
 
+/**
+ * pseudo_tcp_socket_get_available_send_space:
+ * @self: The #PseudoTcpSocket object.
+ *
+ * Gets the number of bytes of space available in the transmission buffer.
+ *
+ * Returns: The numbero f bytes, or 0 if the connection is not established.
+ *
+ * Since: 0.1.5
+ */
+gsize pseudo_tcp_socket_get_available_send_space (PseudoTcpSocket *self);
+
 G_END_DECLS
 
 #endif /* _PSEUDOTCP_H */
index e8fb305..9b69411 100644 (file)
@@ -24,7 +24,7 @@ nice_agent_get_remote_candidates
 nice_agent_get_local_candidates
 nice_agent_get_selected_pair
 nice_agent_send
-nice_agent_send_full
+nice_agent_send_messages_nonblocking
 nice_agent_recv
 nice_agent_recv_messages
 nice_agent_recv_nonblocking
index 601afc6..b1d0d15 100644 (file)
@@ -42,7 +42,7 @@ nice_agent_parse_remote_stream_sdp
 nice_agent_remove_stream
 nice_agent_restart
 nice_agent_send
-nice_agent_send_full
+nice_agent_send_messages_nonblocking
 nice_agent_set_port_range
 nice_agent_set_relay_info
 nice_agent_set_remote_candidates
index 1dd1de4..4650ba1 100644 (file)
@@ -64,6 +64,9 @@
 #include <unistd.h>
 #endif
 
+/* Maximum IP payload ((1 << 16) - 1), minus IP header, minus UDP header. */
+#define MAX_MESSAGE_SIZE (65535 - 20 - 8) /* bytes */
+
 typedef enum {
   STREAM_AGENT,  /* nice_agent_[send|recv]() */
   STREAM_AGENT_NONBLOCKING,  /* nice_agent_[send|recv]_nonblocking() */
@@ -74,7 +77,7 @@ typedef enum {
 
 typedef enum {
   BUFFER_SIZE_CONSTANT_LARGE,  /* always 65535 bytes */
-  BUFFER_SIZE_CONSTANT_SMALL,  /* always 1024 bytes */
+  BUFFER_SIZE_CONSTANT_SMALL,  /* always 4096 bytes */
   BUFFER_SIZE_CONSTANT_TINY,  /* always 1 byte */
   BUFFER_SIZE_ASCENDING,  /* ascending powers of 2 */
   BUFFER_SIZE_RANDOM,  /* random every time */
@@ -118,6 +121,7 @@ typedef struct {
   } receive;
   BufferDataStrategy buffer_data_strategy;
   gsize n_bytes;
+  guint n_messages;
 
   /* Test state. */
   GRand *transmit_size_rand;
@@ -125,6 +129,9 @@ typedef struct {
   gsize transmitted_bytes;
   gsize received_bytes;
   gsize *other_received_bytes;
+  guint transmitted_messages;
+  guint received_messages;
+  guint *other_received_messages;
 } TestData;
 
 /* Whether @stream_api is blocking (vs. non-blocking). */
@@ -327,13 +334,18 @@ generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
  *
  * @max_buffer_size may be used to limit the total size of all the buffers in
  * all the messages, for example to avoid blocking on receiving data which will
- * never be sent.
+ * never be sent. This only applies for blocking, reliable stream APIs.
+ *
+ * @max_n_messages may be used to limit the number of messages generated, to
+ * avoid blocking on receiving messages which will never be sent. This only
+ * applies for blocking, non-reliable stream APIs.
  *
  * @messages must be freed with g_free(), as must all of the buffer arrays and
  * the buffers themselves. */
 static void
 generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
-    NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size)
+    NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size,
+    guint max_n_messages)
 {
   TestData *test_data = data->user_data;
   guint i;
@@ -342,6 +354,10 @@ generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
   *n_messages =
       generate_message_count (test_data->receive.message_count_strategy,
           test_data->receive_size_rand, buffer_offset);
+
+  if (!data->reliable)
+    *n_messages = MIN (*n_messages, max_n_messages);
+
   *messages = g_malloc_n (*n_messages, sizeof (NiceInputMessage));
 
   for (i = 0; i < *n_messages; i++) {
@@ -364,12 +380,12 @@ generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
               test_data->receive_size_rand, buffer_offset);
 
       /* Trim the buffer length if it would otherwise cause the API to block. */
-      if (data->reliable)
+      if (data->reliable) {
         buf_len = MIN (buf_len, max_buffer_size);
+        max_buffer_size -= buf_len;
+      }
 
-      max_buffer_size -= buf_len;
       buffer->size = buf_len;
-
       buffer->buffer = g_malloc (buffer->size);
 
       /* Fill it with poison to try and detect incorrect writes. */
@@ -377,7 +393,7 @@ generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
 
       /* If we’ve hit the max_buffer_size, adjust the buffer and message counts
        * and run away. */
-      if (max_buffer_size == 0) {
+      if (data->reliable && max_buffer_size == 0) {
         message->n_buffers = j + 1;
         *n_messages = i + 1;
         return;
@@ -402,12 +418,17 @@ validate_received_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
   if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
     g_assert_cmpint (len, ==, buf_len);
 
-  /* Validate the buffer contents. */
+  /* Validate the buffer contents.
+   *
+   * Note: Buffers can only be validated up to valid_len. The buffer may
+   * have been re-used internally (e.g. by receiving a STUN message, then
+   * overwriting it with a data packet), so we can’t guarantee that the
+   * bytes beyond valid_len have been untouched. */
   expected_buf = g_malloc (buf_len);
   memset (expected_buf, 0xaa, buf_len);
   generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
       expected_buf, len);
-  g_assert (memcmp (*buf, expected_buf, buf_len) == 0);
+  g_assert (memcmp (*buf, expected_buf, len) == 0);
   g_free (expected_buf);
 
   test_data->received_bytes += len;
@@ -431,6 +452,8 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
   if (stream_api_is_blocking (test_data->stream_api))
     g_assert_cmpint (n_valid_messages, ==, n_messages);
 
+  test_data->received_messages += n_valid_messages;
+
   /* Validate the message contents. */
   for (i = 0; i < (guint) n_valid_messages; i++) {
     NiceInputMessage *message = &((*messages)[i]);
@@ -445,6 +468,7 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
       guint8 *expected_buf;
       gsize valid_len;
 
+      /* See note above about valid_len. */
       total_buf_len += buffer->size;
       valid_len = MIN (message_len_remaining, buffer->size);
 
@@ -452,7 +476,7 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
       memset (expected_buf, 0xaa, buffer->size);
       generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
           expected_buf, valid_len);
-      g_assert (memcmp (buffer->buffer, expected_buf, buffer->size) == 0);
+      g_assert (memcmp (buffer->buffer, expected_buf, valid_len) == 0);
       g_free (expected_buf);
 
       test_data->received_bytes += valid_len;
@@ -471,7 +495,7 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
     prev_message_len = message->length;
 
     /* If the API was blocking, it should have completely filled the message. */
-    if (stream_api_is_blocking (test_data->stream_api))
+    if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
       g_assert_cmpuint (message->length, ==, total_buf_len);
 
     g_assert (message->from == NULL);
@@ -501,6 +525,79 @@ generate_buffer_to_transmit (TestIOStreamThreadData *data, gsize buffer_offset,
       *buf, *buf_len);
 }
 
+/* Similar to generate_buffer_to_transmit(), except that it generates an array
+ * of NiceOutputMessages rather than a single buffer. */
+static void
+generate_messages_to_transmit (TestIOStreamThreadData *data,
+    gsize buffer_offset, NiceOutputMessage **messages, guint *n_messages)
+{
+  TestData *test_data = data->user_data;
+  guint i;
+  gsize total_buf_len = 0;
+
+  /* Determine the number of messages to send. */
+  *n_messages =
+      generate_message_count (test_data->transmit.message_count_strategy,
+          test_data->transmit_size_rand, buffer_offset);
+  *n_messages =
+      MIN (*n_messages,
+          test_data->n_messages - test_data->transmitted_messages);
+
+  *messages = g_malloc_n (*n_messages, sizeof (NiceOutputMessage));
+
+  for (i = 0; i < *n_messages; i++) {
+    NiceOutputMessage *message = &((*messages)[i]);
+    guint j;
+    gsize max_message_size;
+
+    message->n_buffers =
+        generate_buffer_count (test_data->transmit.buffer_count_strategy,
+            test_data->transmit_size_rand, buffer_offset);
+    message->buffers = g_malloc_n (message->n_buffers, sizeof (GOutputVector));
+    message->to = NULL;
+    message->length = 0;
+
+    /* Limit the overall message size to the smaller of (n_bytes / n_messages)
+     * and MAX_MESSAGE_SIZE, to ensure each message is non-empty. */
+    max_message_size =
+        MIN ((test_data->n_bytes / test_data->n_messages), MAX_MESSAGE_SIZE);
+
+    for (j = 0; j < (guint) message->n_buffers; j++) {
+      GOutputVector *buffer = &message->buffers[j];
+      gsize buf_len;
+      guint8 *buf;
+
+      buf_len =
+          generate_buffer_size (test_data->transmit.buffer_size_strategy,
+              test_data->transmit_size_rand, buffer_offset);
+      buf_len =
+          MIN (buf_len,
+              test_data->n_bytes - test_data->transmitted_bytes - total_buf_len);
+      buf_len = MIN (buf_len, max_message_size - message->length);
+
+      buffer->size = buf_len;
+      buf = g_malloc (buffer->size);
+      buffer->buffer = buf;
+      message->length += buf_len;
+      total_buf_len += buf_len;
+
+      /* Fill it with data. */
+      generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
+          buf, buf_len);
+
+      buffer_offset += buf_len;
+
+      /* Reached the maximum UDP payload size? */
+      if (message->length >= max_message_size) {
+        message->n_buffers = j + 1;
+        break;
+      }
+    }
+
+    g_assert_cmpuint (message->length, <=, max_message_size);
+  }
+}
+
 /* Validate the number of bytes transmitted, and update the test’s internal
  * state machine. Consumes @buf. */
 static void
@@ -517,6 +614,39 @@ notify_transmitted_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
   g_free (*buf);
 }
 
+/* Similar to notify_transmitted_buffer(), except it operates on an array of
+ * messages from generate_messages_to_transmit(). */
+static void
+notify_transmitted_messages (TestIOStreamThreadData *data, gsize buffer_offset,
+    NiceOutputMessage **messages, guint n_messages, gint n_sent_messages)
+{
+  TestData *test_data = data->user_data;
+  guint i;
+
+  g_assert_cmpint (n_sent_messages, <=, n_messages);
+  g_assert_cmpint (n_sent_messages, >=, 0);
+
+  test_data->transmitted_messages += n_sent_messages;
+
+  for (i = 0; i < n_messages; i++) {
+    NiceOutputMessage *message = &((*messages)[i]);
+    guint j;
+
+    if (i < (guint) n_sent_messages)
+      test_data->transmitted_bytes += message->length;
+
+    for (j = 0; j < (guint) message->n_buffers; j++) {
+      GOutputVector *buffer = &message->buffers[j];
+
+      g_free ((guint8 *) buffer->buffer);
+    }
+
+    g_free (message->buffers);
+  }
+
+  g_free (*messages);
+}
+
 /*
  * Implementation using nice_agent_recv_messages() and nice_agent_send().
  */
@@ -539,7 +669,8 @@ read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
 
     /* Initialise an array of messages to receive into. */
     generate_messages_to_receive (data, test_data->received_bytes, &messages,
-        &n_messages, test_data->n_bytes - test_data->received_bytes);
+        &n_messages, test_data->n_bytes - test_data->received_bytes,
+        test_data->n_messages - test_data->received_messages);
 
     /* Block on receiving some data. */
     n_valid_messages = nice_agent_recv_messages (data->agent, stream_id,
@@ -570,37 +701,26 @@ write_thread_agent_cb (GOutputStream *output_stream,
 
   while (test_data->transmitted_bytes < test_data->n_bytes) {
     GError *error = NULL;
-    guint8 *buf = NULL;
-    gsize buf_len = 0;
-    gssize _len;
-    gssize len = 0;
+    NiceOutputMessage *messages;
+    guint n_messages;
+    gint n_sent_messages;
 
     /* Generate a buffer to transmit. */
-    generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
-        &buf_len);
+    generate_messages_to_transmit (data, test_data->transmitted_bytes,
+        &messages, &n_messages);
 
-    /* Transmit it. */
+    /* Busy loop on receiving some data. */
     do {
-      _len = nice_agent_send_full (data->agent, stream_id, component_id,
-          buf + len, buf_len - len, NULL, &error);
-
-      /* Busy loop on EWOULDBLOCK. */
-      if (_len == -1 &&
-          g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-        g_clear_error (&error);
-        continue;
-      } else if (_len > 0) {
-        len += _len;
-      } else {
-        len = _len;
-      }
-
-      g_assert_no_error (error);
-    } while (len != -1 && (gsize) len < buf_len);
+      g_clear_error (&error);
+      n_sent_messages = nice_agent_send_messages_nonblocking (data->agent,
+          stream_id, component_id, messages, n_messages, NULL, &error);
+    } while (n_sent_messages == -1 &&
+        g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
+    g_assert_no_error (error);
 
     /* Update the test’s buffer generation state machine. */
-    notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf,
-        buf_len, len);
+    notify_transmitted_messages (data, test_data->transmitted_bytes, &messages,
+        n_messages, n_sent_messages);
   }
 }
 
@@ -628,7 +748,12 @@ read_thread_agent_nonblocking_cb (GInputStream *input_stream,
 
     /* Initialise an array of messages to receive into. */
     generate_messages_to_receive (data, test_data->received_bytes, &messages,
-        &n_messages, test_data->n_bytes - test_data->received_bytes);
+        &n_messages, test_data->n_bytes - test_data->received_bytes,
+        test_data->n_messages - test_data->received_messages);
+
+    /* Trim n_messages to avoid consuming the ‘done’ message. */
+    n_messages =
+        MIN (n_messages, test_data->n_messages - test_data->received_messages);
 
     /* Busy loop on receiving some data. */
     do {
@@ -889,18 +1014,21 @@ write_thread_gsource_cb (GOutputStream *output_stream,
 
 static void
 test_data_init (TestData *data, gboolean reliable, StreamApi stream_api,
-    gsize n_bytes, BufferSizeStrategy transmit_buffer_size_strategy,
+    gsize n_bytes, guint n_messages,
+    BufferSizeStrategy transmit_buffer_size_strategy,
     BufferCountStrategy transmit_buffer_count_strategy,
     MessageCountStrategy transmit_message_count_strategy,
     BufferSizeStrategy receive_buffer_size_strategy,
     BufferCountStrategy receive_buffer_count_strategy,
     MessageCountStrategy receive_message_count_strategy,
     BufferDataStrategy buffer_data_strategy, guint32 transmit_seed,
-    guint32 receive_seed, gsize *other_received_bytes)
+    guint32 receive_seed, gsize *other_received_bytes,
+    guint *other_received_messages)
 {
   data->reliable = reliable;
   data->stream_api = stream_api;
   data->n_bytes = n_bytes;
+  data->n_messages = n_messages;
   data->transmit.buffer_size_strategy = transmit_buffer_size_strategy;
   data->transmit.buffer_count_strategy = transmit_buffer_count_strategy;
   data->transmit.message_count_strategy = transmit_message_count_strategy;
@@ -913,6 +1041,9 @@ test_data_init (TestData *data, gboolean reliable, StreamApi stream_api,
   data->transmitted_bytes = 0;
   data->received_bytes = 0;
   data->other_received_bytes = other_received_bytes;
+  data->transmitted_messages = 0;
+  data->received_messages = 0;
+  data->other_received_messages = other_received_messages;
 }
 
 /*
@@ -926,7 +1057,7 @@ test_data_clear (TestData *data)
 }
 
 static void
-test (gboolean reliable, StreamApi stream_api, gsize n_bytes,
+test (gboolean reliable, StreamApi stream_api, gsize n_bytes, guint n_messages,
     BufferSizeStrategy transmit_buffer_size_strategy,
     BufferCountStrategy transmit_buffer_count_strategy,
     MessageCountStrategy transmit_message_count_strategy,
@@ -950,18 +1081,18 @@ test (gboolean reliable, StreamApi stream_api, gsize n_bytes,
       NULL, NULL },  /* STREAM_GSOURCE */
   };
 
-  test_data_init (&l_data, reliable, stream_api, n_bytes,
+  test_data_init (&l_data, reliable, stream_api, n_bytes, n_messages,
       transmit_buffer_size_strategy, transmit_buffer_count_strategy,
       transmit_message_count_strategy, receive_buffer_size_strategy,
       receive_buffer_count_strategy, receive_message_count_strategy,
       buffer_data_strategy, transmit_seed, receive_seed,
-      &r_data.received_bytes);
-  test_data_init (&r_data, reliable, stream_api, n_bytes,
+      &r_data.received_bytes, &r_data.received_messages);
+  test_data_init (&r_data, reliable, stream_api, n_bytes, n_messages,
       transmit_buffer_size_strategy, transmit_buffer_count_strategy,
       transmit_message_count_strategy, receive_buffer_size_strategy,
       receive_buffer_count_strategy, receive_message_count_strategy,
       buffer_data_strategy, transmit_seed, receive_seed,
-      &l_data.received_bytes);
+      &l_data.received_bytes, &l_data.received_messages);
 
   run_io_stream_test (deadlock_timeout, reliable, &callbacks[stream_api],
       &l_data, NULL, &r_data, NULL);
@@ -973,7 +1104,8 @@ test (gboolean reliable, StreamApi stream_api, gsize n_bytes,
 /* Options with default values. */
 guint32 option_transmit_seed = 0;
 guint32 option_receive_seed = 0;
-gsize option_n_bytes = 100000;
+gsize option_n_bytes = 10000;
+guint option_n_messages = 50;
 guint option_timeout = 1200;  /* seconds */
 gboolean option_long_mode = FALSE;
 
@@ -983,7 +1115,9 @@ static GOptionEntry entries[] = {
   { "receive-seed", 0, 0, G_OPTION_ARG_INT, &option_receive_seed,
     "Seed for reception RNG", "S" },
   { "n-bytes", 'n', 0, G_OPTION_ARG_INT64, &option_n_bytes,
-    "Number of bytes to send in each test (default 100000)", "N" },
+    "Number of bytes to send in each test (default 10000)", "N" },
+  { "n-messages", 'm', 0, G_OPTION_ARG_INT64, &option_n_messages,
+    "Number of messages to send in each test (default 50)", "M" },
   { "timeout", 't', 0, G_OPTION_ARG_INT, &option_timeout,
     "Deadlock detection timeout length, in seconds (default: 1200)", "S" },
   { "long-mode", 'l', 0, G_OPTION_ARG_NONE, &option_long_mode,
@@ -1006,6 +1140,7 @@ main (int argc, char *argv[])
   guint32 transmit_seed;
   guint32 receive_seed;
   gsize n_bytes;
+  guint n_messages;
   guint deadlock_timeout;
   gboolean long_mode;
   GOptionContext *context;
@@ -1026,6 +1161,7 @@ main (int argc, char *argv[])
   transmit_seed = option_transmit_seed;
   receive_seed = option_receive_seed;
   n_bytes = option_n_bytes;
+  n_messages = option_n_messages;
   deadlock_timeout = option_timeout;
   long_mode = option_long_mode;
 
@@ -1073,12 +1209,14 @@ main (int argc, char *argv[])
           receive_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
         }
 
-        g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, "
+        g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
             "%u, %u, %u, %u)…",
-            reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
+            reliable, stream_api, n_bytes, n_messages,
+            transmit_buffer_size_strategy,
             receive_buffer_size_strategy, buffer_data_strategy,
             transmit_seed, receive_seed);
-        test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
+        test (reliable, stream_api, n_bytes, n_messages,
+            transmit_buffer_size_strategy,
             transmit_buffer_count_strategy, transmit_message_count_strategy,
             receive_buffer_size_strategy, receive_buffer_count_strategy,
             receive_message_count_strategy, buffer_data_strategy,
@@ -1129,14 +1267,16 @@ main (int argc, char *argv[])
            receive_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE))
         continue;
 
-      g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, "
+      g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
           "%u, %u, %u, %u, %u, %u, %u, %u)…",
-          reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
+          reliable, stream_api, n_bytes, n_messages,
+          transmit_buffer_size_strategy,
           transmit_buffer_count_strategy, transmit_message_count_strategy,
           receive_buffer_size_strategy, receive_buffer_count_strategy,
           receive_message_count_strategy, buffer_data_strategy,
           transmit_seed, receive_seed);
-      test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
+      test (reliable, stream_api, n_bytes, n_messages,
+          transmit_buffer_size_strategy,
           transmit_buffer_count_strategy, transmit_message_count_strategy,
           receive_buffer_size_strategy, receive_buffer_count_strategy,
           receive_message_count_strategy, buffer_data_strategy,