Factor out the add_to_be_sent into a helper API for sockets
authorYouness Alaoui <youness.alaoui@collabora.co.uk>
Thu, 10 Apr 2014 02:13:08 +0000 (22:13 -0400)
committerOlivier Crête <olivier.crete@ocrete.ca>
Thu, 15 May 2014 13:44:00 +0000 (09:44 -0400)
The add_to_be_sent was duplicated in http/socks5/pseudossl/tcp-bsd and
had some small differences. It's better to factor it out so bug fixes
get applied to all of them and code is cleaner.

socket/Makefile.am
socket/http.c
socket/pseudossl.c
socket/socket-priv.h [new file with mode: 0644]
socket/socket.c
socket/socks5.c
socket/tcp-bsd.c

index e464e0f..6ebae37 100644 (file)
@@ -21,6 +21,7 @@ noinst_LTLIBRARIES = libsocket.la
 
 libsocket_la_SOURCES = \
        socket.h \
+       socket-priv.h \
        socket.c \
        udp-bsd.h \
        udp-bsd.c \
index 0268fbb..40ee697 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "http.h"
 #include "agent-priv.h"
+#include "socket-priv.h"
 
 #include <string.h>
 #include <stdlib.h>
@@ -83,13 +84,6 @@ typedef struct {
 } HttpPriv;
 
 
-struct to_be_sent {
-  guint length;
-  gchar *buf;
-  NiceAddress to;
-};
-
-
 static void socket_close (NiceSocket *sock);
 static gint socket_recv_messages (NiceSocket *sock,
     NiceInputMessage *recv_messages, guint n_recv_messages);
@@ -99,11 +93,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
     const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
 static gboolean socket_is_reliable (NiceSocket *sock);
 
-static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
-    const NiceOutputMessage *messages, guint n_messages);
-static void free_to_be_sent (struct to_be_sent *tbs);
-
-
 NiceSocket *
 nice_http_socket_new (NiceSocket *base_socket,
     NiceAddress *addr, gchar *username, gchar *password)
@@ -198,8 +187,7 @@ socket_close (NiceSocket *sock)
   if (priv->recv_buf)
     g_free (priv->recv_buf);
 
-  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
-  g_queue_clear (&priv->send_queue);
+  nice_socket_free_send_queue (&priv->send_queue);
 
   g_slice_free(HttpPriv, sock->priv);
 }
@@ -542,19 +530,13 @@ retry:
     case HTTP_STATE_CONNECTED:
       {
         gsize len;
-        struct to_be_sent *tbs = NULL;
 
         len = memcpy_ring_buffer_to_input_messages (priv,
             recv_messages, n_recv_messages);
 
         /* Send the pending data */
-        while ((tbs = g_queue_pop_head (&priv->send_queue))) {
-          /* We only queue reliable data */
-          nice_socket_send_reliable (priv->base_socket, &tbs->to,
-              tbs->length, tbs->buf);
-          g_free (tbs->buf);
-          g_slice_free (struct to_be_sent, tbs);
-        }
+        nice_socket_flush_send_queue (priv->base_socket,
+            &priv->send_queue);
 
         return len;
       }
@@ -616,7 +598,7 @@ socket_send_messages_reliable (NiceSocket *sock, const NiceAddress *to,
   } else if (priv->state == HTTP_STATE_ERROR) {
     return -1;
   } else {
-    add_to_be_sent (sock, to, messages, n_messages);
+    nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
   }
 
   return n_messages;
@@ -629,55 +611,3 @@ socket_is_reliable (NiceSocket *sock)
 
   return nice_socket_is_reliable (priv->base_socket);
 }
-
-
-static void
-add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
-    const NiceOutputMessage *messages, guint n_messages)
-{
-  HttpPriv *priv = sock->priv;
-  guint i;
-
-  if (n_messages == 0)
-    return;
-
-  /* Compact the message’s buffers before queueing. */
-  for (i = 0; i < n_messages; i++) {
-    const NiceOutputMessage *message = &messages[i];
-    struct to_be_sent *tbs = NULL;
-    guint j;
-    gsize message_len_remaining = output_message_get_size (message);
-    gsize offset = 0;
-
-    if (message_len_remaining == 0)
-      continue;
-
-    tbs = g_slice_new0 (struct to_be_sent);
-    tbs->buf = g_malloc (message_len_remaining);
-    tbs->length = message_len_remaining;
-    if (to)
-      tbs->to = *to;
-    g_queue_push_tail (&priv->send_queue, tbs);
-
-    for (j = 0;
-         (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
-         (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
-         j++) {
-      const GOutputVector *buffer = &message->buffers[j];
-      gsize len;
-
-      len = MIN (buffer->size, message_len_remaining);
-      memcpy (tbs->buf + offset, buffer->buffer, len);
-      message_len_remaining -= len;
-      offset += len;
-    }
-  }
-}
-
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
-  g_free (tbs->buf);
-  g_slice_free (struct to_be_sent, tbs);
-}
index a0253de..c713b2f 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "pseudossl.h"
 #include "agent-priv.h"
+#include "socket-priv.h"
 
 #include <string.h>
 
@@ -59,13 +60,6 @@ typedef struct {
 } PseudoSSLPriv;
 
 
-struct to_be_sent {
-  guint8 *buf;  /* owned */
-  gsize length;
-  NiceAddress to;
-};
-
-
 static const gchar SSL_SERVER_GOOGLE_HANDSHAKE[] = {
   0x16, 0x03, 0x01, 0x00, 0x4a, 0x02, 0x00, 0x00,
   0x46, 0x03, 0x01, 0x42, 0x85, 0x45, 0xa7, 0x27,
@@ -120,11 +114,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
     const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
 static gboolean socket_is_reliable (NiceSocket *sock);
 
-static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
-    const NiceOutputMessage *messages, guint n_messages);
-static void free_to_be_sent (struct to_be_sent *tbs);
-
-
 NiceSocket *
 nice_pseudossl_socket_new (NiceSocket *base_socket,
     NicePseudoSSLSocketCompatibility compatibility)
@@ -176,8 +165,7 @@ socket_close (NiceSocket *sock)
   if (priv->base_socket)
     nice_socket_free (priv->base_socket);
 
-  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
-  g_queue_clear (&priv->send_queue);
+  nice_socket_free_send_queue (&priv->send_queue);
 
   g_slice_free(PseudoSSLPriv, sock->priv);
 }
@@ -238,15 +226,8 @@ socket_recv_messages (NiceSocket *sock,
     if (ret <= 0) {
       return ret;
     } else if (ret == 1 && server_handshake_valid(sock, &local_recv_buf)) {
-      struct to_be_sent *tbs = NULL;
       priv->handshaken = TRUE;
-      while ((tbs = g_queue_pop_head (&priv->send_queue))) {
-        /* We only queue reliable data */
-        nice_socket_send_reliable (priv->base_socket, &tbs->to, tbs->length,
-            (const gchar *) tbs->buf);
-        g_free (tbs->buf);
-        g_slice_free (struct to_be_sent, tbs);
-      }
+      nice_socket_flush_send_queue (priv->base_socket, &priv->send_queue);
     } else {
       if (priv->base_socket)
         nice_socket_free (priv->base_socket);
@@ -294,7 +275,7 @@ socket_send_messages_reliable (NiceSocket *sock, const NiceAddress *to,
     return nice_socket_send_messages_reliable (priv->base_socket, to, messages,
         n_messages);
   } else {
-    add_to_be_sent (sock, to, messages, n_messages);
+    nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
   }
   return n_messages;
 }
@@ -306,52 +287,3 @@ socket_is_reliable (NiceSocket *sock)
 
   return nice_socket_is_reliable (priv->base_socket);
 }
-
-
-static void
-add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
-    const NiceOutputMessage *messages, guint n_messages)
-{
-  PseudoSSLPriv *priv = sock->priv;
-  guint i;
-
-  for (i = 0; i < n_messages; i++) {
-    struct to_be_sent *tbs;
-    const NiceOutputMessage *message = &messages[i];
-    guint j;
-    gsize offset = 0;
-    gsize message_len;
-
-    tbs = g_slice_new0 (struct to_be_sent);
-
-    message_len = output_message_get_size (message);
-
-   /* Compact the buffer. */
-    tbs->buf = g_malloc (message_len);
-    tbs->length = message_len;
-    if (to != NULL)
-      tbs->to = *to;
-    g_queue_push_tail (&priv->send_queue, tbs);
-
-    for (j = 0;
-         (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
-         (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
-         j++) {
-      const GOutputVector *buffer = &message->buffers[j];
-      gsize len;
-
-      len = MIN (message_len - offset, buffer->size);
-      memcpy (tbs->buf + offset, buffer->buffer, len);
-      offset += len;
-    }
-
-    g_assert (offset == message_len);
-  }
-}
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
-  g_free (tbs->buf);
-  g_slice_free (struct to_be_sent, tbs);
-}
diff --git a/socket/socket-priv.h b/socket/socket-priv.h
new file mode 100644 (file)
index 0000000..10286e3
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * This file is part of the Nice GLib ICE library.
+ *
+ * (C) 2008-2009 Collabora Ltd.
+ *  Contact: Youness Alaoui
+ * (C) 2008-2009 Nokia Corporation. All rights reserved.
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is the Nice GLib ICE library.
+ *
+ * The Initial Developers of the Original Code are Collabora Ltd and Nokia
+ * Corporation. All Rights Reserved.
+ *
+ * Contributors:
+ *   Youness Alaoui, Collabora Ltd.
+ *
+ * Alternatively, the contents of this file may be used under the terms of the
+ * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
+ * case the provisions of LGPL are applicable instead of those above. If you
+ * wish to allow use of your version of this file only under the terms of the
+ * LGPL and not to allow others to use your version of this file under the
+ * MPL, indicate your decision by deleting the provisions above and replace
+ * them with the notice and other provisions required by the LGPL. If you do
+ * not delete the provisions above, a recipient may use your version of this
+ * file under either the MPL or the LGPL.
+ */
+
+#ifndef _SOCKET_PRIV_H
+#define _SOCKET_PRIV_H
+
+#include "socket.h"
+
+G_BEGIN_DECLS
+
+/**
+ * nice_socket_queue_send:
+ * @send_queue: The queue to add to
+ * @to : Destination
+ * @messages: Messages to queue
+ * @n_messages: Number of messages to queue
+ *
+ * Queue messages to be sent later into the GQueue
+ */
+void nice_socket_queue_send (GQueue *send_queue, const NiceAddress *to,
+    const NiceOutputMessage *messages, guint n_messages);
+
+/**
+ * nice_socket_queue_send_with_callback:
+ * @send_queue: The queue to add to
+ * @message: The message to queue
+ * @message_offset: Number of bytes to skip in the message
+ * @message_len: Total length of the message
+ * @head: Whether to add the message to the head of the queue or the tail
+ * @gsock: The #GSocket to create the callback on
+ * @io_source: Pointer to #GSource pointer to store the created source
+ * @context: #GMainContext to attach the @io_source to
+ * @cb: Callback function to call when the @gsock is writable
+ * @user_data: User data for @cb
+ *
+ * Queue (partial) message to be sent later and create a source to call @cb
+ * when the @gsock becomes writable.
+ * The @message_offset can be used if a partial write happened and some bytes
+ * were already written, in which case @head should be set to TRUE to add the
+ * message to the head of the queue.
+ */
+void nice_socket_queue_send_with_callback (GQueue *send_queue,
+    const NiceOutputMessage *message, gsize message_offset, gsize message_len,
+    gboolean head, GSocket *gsock, GSource **io_source, GMainContext *context,
+    GSourceFunc cb, gpointer user_data);
+
+/**
+ * nice_socket_flush_send_queue:
+ * @base_socket: Base socket to send on
+ * @send_queue: Queue to flush
+ *
+ * Send all the queued messages reliably to the base socket. We assume only
+ * reliable messages were queued and the underlying socket will handle the
+ * send.
+ */
+void nice_socket_flush_send_queue (NiceSocket *base_socket, GQueue *send_queue);
+
+/**
+ * nice_socket_flush_send_queue_to_socket:
+ * @gsock: GSocket to send on
+ * @send_queue: Queue to flush
+ *
+ * Send all the queued messages to the socket. If any message fails to be sent
+ * it will be readded to the queue and #FALSE will be returned, in which case
+ * the IO source must be kept to allow flushing the next time the socket
+ * is writable.
+ * If the queue gets flushed, #TRUE will be returned, in which case, the IO
+ * source should be destroyed.
+ *
+ * Returns: #TRUE if the queue was emptied, #FALSE if the socket would block.
+ */
+gboolean nice_socket_flush_send_queue_to_socket (GSocket *gsock,
+    GQueue *send_queue);
+
+/**
+ * nice_socket_free_send_queue:
+ * @send_queue: The send queue
+ *
+ * Frees every item in the send queue without sending them and empties the queue
+ */
+void nice_socket_free_send_queue (GQueue *send_queue);
+
+G_END_DECLS
+
+#endif /* _SOCKET_PRIV_H */
+
index d7ce0a6..8c78a42 100644 (file)
 #include <glib.h>
 
 #include "socket.h"
+#include "socket-priv.h"
+#include "agent-priv.h"
 
+#include <string.h>
+
+#ifndef G_OS_WIN32
+#include <unistd.h>
+#endif
+
+typedef struct _NiceSocketQueuedSend NiceSocketQueuedSend;
+
+struct _NiceSocketQueuedSend {
+  guint8 *buf;  /* owned */
+  gsize length;
+  NiceAddress to;
+};
 
 /**
  * nice_socket_recv_messages:
@@ -225,3 +240,175 @@ nice_socket_free (NiceSocket *sock)
     g_slice_free (NiceSocket,sock);
   }
 }
+
+static void
+nice_socket_free_queued_send (NiceSocketQueuedSend *tbs)
+{
+  g_free (tbs->buf);
+  g_slice_free (NiceSocketQueuedSend, tbs);
+}
+
+void
+nice_socket_queue_send (GQueue *send_queue, const NiceAddress *to,
+    const NiceOutputMessage *messages, guint n_messages)
+{
+  guint i;
+
+  if (n_messages == 0)
+    return;
+
+  /* Compact the message’s buffers before queueing. */
+  for (i = 0; i < n_messages; i++) {
+    NiceSocketQueuedSend *tbs;
+    const NiceOutputMessage *message = &messages[i];
+    gsize message_len_remaining = output_message_get_size (message);
+    guint j;
+    gsize offset = 0;
+
+    if (message_len_remaining == 0)
+      continue;
+
+    /* Compact the buffer. */
+    tbs = g_slice_new0 (NiceSocketQueuedSend);
+    tbs->buf = g_malloc (message_len_remaining);
+    tbs->length = message_len_remaining;
+
+    if (to)
+      tbs->to = *to;
+    else
+      memset (&tbs->to, 0, sizeof(NiceAddress));
+    g_queue_push_tail (send_queue, tbs);
+
+    for (j = 0;
+         (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
+         (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
+         j++) {
+      const GOutputVector *buffer = &message->buffers[j];
+      gsize len;
+
+      len = MIN (buffer->size, message_len_remaining);
+      memcpy (tbs->buf + offset, buffer->buffer, len);
+      message_len_remaining -= len;
+      offset += len;
+    }
+
+    g_assert (offset == tbs->length);
+  }
+}
+
+void nice_socket_queue_send_with_callback (GQueue *send_queue,
+    const NiceOutputMessage *message, gsize message_offset, gsize message_len,
+    gboolean head, GSocket *gsock, GSource **io_source, GMainContext *context,
+    GSourceFunc cb, gpointer user_data)
+{
+  NiceSocketQueuedSend *tbs;
+  guint j;
+  gsize offset = 0;
+
+  if (message_offset >= message_len)
+    return;
+
+  tbs = g_slice_new0 (NiceSocketQueuedSend);
+  tbs->length = message_len - message_offset;
+  tbs->buf = g_malloc (tbs->length);
+
+  if (head)
+    g_queue_push_head (send_queue, tbs);
+  else
+    g_queue_push_tail (send_queue, tbs);
+
+  if (io_source && gsock && context && cb && *io_source == NULL) {
+    *io_source = g_socket_create_source(gsock, G_IO_OUT, NULL);
+    g_source_set_callback (*io_source, (GSourceFunc) cb, user_data, NULL);
+    g_source_attach (*io_source, context);
+  }
+
+  /* Move the data into the buffer. */
+  for (j = 0;
+       (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
+       (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
+       j++) {
+    const GOutputVector *buffer = &message->buffers[j];
+    gsize len;
+
+    /* Skip this buffer if it’s within @message_offset. */
+    if (buffer->size <= message_offset) {
+      message_offset -= buffer->size;
+      continue;
+    }
+
+    len = MIN (tbs->length - offset, buffer->size - message_offset);
+    memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
+    offset += len;
+    if (message_offset >= len)
+      message_offset -= len;
+    else
+      message_offset = 0;
+  }
+}
+
+void nice_socket_flush_send_queue (NiceSocket *base_socket, GQueue *send_queue)
+{
+  NiceSocketQueuedSend *tbs;
+
+  while ((tbs = g_queue_pop_head (send_queue))) {
+    NiceAddress *to = &tbs->to;
+
+    if (!nice_address_is_valid (to))
+      to = NULL;
+
+    /* We only queue reliable data */
+    nice_socket_send_reliable (base_socket, to,
+        tbs->length, (const gchar *) tbs->buf);
+    nice_socket_free_queued_send (tbs);
+  }
+}
+
+gboolean nice_socket_flush_send_queue_to_socket (GSocket *gsock,
+    GQueue *send_queue)
+{
+  NiceSocketQueuedSend *tbs;
+  GError *gerr = NULL;
+
+
+  while ((tbs = g_queue_pop_head (send_queue)) != NULL) {
+    int ret;
+
+    GOutputVector local_bufs = { tbs->buf, tbs->length };
+    ret = g_socket_send_message (gsock, NULL, &local_bufs, 1, NULL, 0,
+        G_SOCKET_MSG_NONE, NULL, &gerr);
+
+    if (ret < 0) {
+      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        GOutputVector local_buf = { tbs->buf, tbs->length };
+        NiceOutputMessage local_message = {&local_buf, 1};
+
+        nice_socket_queue_send_with_callback (send_queue, &local_message,
+            0, local_buf.size, TRUE, NULL, NULL, NULL, NULL, NULL);
+        nice_socket_free_queued_send (tbs);
+        g_error_free (gerr);
+        return FALSE;
+      }
+      g_clear_error (&gerr);
+    } else if (ret < (int) tbs->length) {
+      GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
+      NiceOutputMessage local_message = {&local_buf, 1};
+
+      nice_socket_queue_send_with_callback (send_queue, &local_message,
+          0, local_buf.size, TRUE, NULL, NULL, NULL, NULL, NULL);
+      nice_socket_free_queued_send (tbs);
+      return FALSE;
+    }
+
+    nice_socket_free_queued_send (tbs);
+  }
+
+  return TRUE;
+}
+
+void
+nice_socket_free_send_queue (GQueue *send_queue)
+{
+  g_queue_foreach (send_queue, (GFunc) nice_socket_free_queued_send, NULL);
+  g_queue_clear (send_queue);
+}
index af2d3d0..3d44e8e 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "socks5.h"
 #include "agent-priv.h"
+#include "socket-priv.h"
 
 #include <string.h>
 
@@ -69,13 +70,6 @@ typedef struct {
 } Socks5Priv;
 
 
-struct to_be_sent {
-  guint8 *buf;  /* owned */
-  gsize length;
-  NiceAddress to;
-};
-
-
 static void socket_close (NiceSocket *sock);
 static gint socket_recv_messages (NiceSocket *sock,
     NiceInputMessage *recv_messages, guint n_recv_messages);
@@ -85,10 +79,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
     const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
 static gboolean socket_is_reliable (NiceSocket *sock);
 
-static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
-    const NiceOutputMessage *messages, guint n_messages);
-static void free_to_be_sent (struct to_be_sent *tbs);
-
 
 NiceSocket *
 nice_socks5_socket_new (NiceSocket *base_socket,
@@ -157,8 +147,7 @@ socket_close (NiceSocket *sock)
   if (priv->password)
     g_free (priv->password);
 
-  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
-  g_queue_clear (&priv->send_queue);
+  nice_socket_free_send_queue (&priv->send_queue);
 
   g_slice_free(Socks5Priv, sock->priv);
 }
@@ -309,7 +298,6 @@ socket_recv_messages (NiceSocket *sock,
             switch (data[1]) {
               case 0x00:
                 if (data[2] == 0x00) {
-                  struct to_be_sent *tbs = NULL;
                   switch (data[3]) {
                     case 0x01: /* IPV4 bound address */
                       local_recv_buf.size = 6;
@@ -333,13 +321,8 @@ socket_recv_messages (NiceSocket *sock,
                       /* Unsupported address type */
                       goto error;
                   }
-                  while ((tbs = g_queue_pop_head (&priv->send_queue))) {
-                    /* We only queue reliable data */
-                    nice_socket_send_reliable (priv->base_socket, &tbs->to,
-                        tbs->length, (const gchar *) tbs->buf);
-                    g_free (tbs->buf);
-                    g_slice_free (struct to_be_sent, tbs);
-                  }
+                  nice_socket_flush_send_queue (priv->base_socket,
+                      &priv->send_queue);
                   priv->state = SOCKS_STATE_CONNECTED;
                 } else {
                   /* Wrong reserved value */
@@ -462,7 +445,7 @@ socket_send_messages_reliable (NiceSocket *sock, const NiceAddress *to,
   } else if (priv->state == SOCKS_STATE_ERROR) {
     return -1;
   } else {
-    add_to_be_sent (sock, to, messages, n_messages);
+    nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
   }
   return n_messages;
 }
@@ -476,49 +459,3 @@ socket_is_reliable (NiceSocket *sock)
   return nice_socket_is_reliable (priv->base_socket);
 }
 
-
-static void
-add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
-    const NiceOutputMessage *messages, guint n_messages)
-{
-  Socks5Priv *priv = sock->priv;
-  guint i;
-
-  for (i = 0; i < n_messages; i++) {
-    struct to_be_sent *tbs;
-    const NiceOutputMessage *message = &messages[i];
-    guint j;
-    gsize offset = 0;
-
-    tbs = g_slice_new0 (struct to_be_sent);
-
-    /* Compact the buffer. */
-    tbs->length = output_message_get_size (message);
-    tbs->buf = g_malloc (tbs->length);
-    if (to != NULL)
-      tbs->to = *to;
-    g_queue_push_tail (&priv->send_queue, tbs);
-
-    for (j = 0;
-         (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
-         (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
-         j++) {
-      const GOutputVector *buffer = &message->buffers[j];
-      gsize len;
-
-      len = MIN (tbs->length - offset, buffer->size);
-      memcpy (tbs->buf + offset, buffer->buffer, len);
-      offset += len;
-    }
-
-    g_assert (offset == tbs->length);
-  }
-}
-
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
-  g_free (tbs->buf);
-  g_slice_free (struct to_be_sent, tbs);
-}
index b344ed0..903cce4 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "tcp-bsd.h"
 #include "agent-priv.h"
+#include "socket-priv.h"
 
 #include <string.h>
 #include <errno.h>
@@ -62,11 +63,6 @@ typedef struct {
   gboolean reliable;
 } TcpPriv;
 
-struct to_be_sent {
-  guint8 *buf;
-  gsize length;
-};
-
 #define MAX_QUEUE_LENGTH 20
 
 static void socket_close (NiceSocket *sock);
@@ -78,10 +74,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
     const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
 static gboolean socket_is_reliable (NiceSocket *sock);
 
-
-static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
-    gsize message_offset, gsize message_len, gboolean head);
-static void free_to_be_sent (struct to_be_sent *tbs);
 static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
     gpointer data);
 
@@ -201,8 +193,8 @@ socket_close (NiceSocket *sock)
     g_source_destroy (priv->io_source);
     g_source_unref (priv->io_source);
   }
-  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
-  g_queue_clear (&priv->send_queue);
+
+  nice_socket_free_send_queue (&priv->send_queue);
 
   if (priv->context)
     g_main_context_unref (priv->context);
@@ -287,21 +279,27 @@ socket_send_message (NiceSocket *sock,
       if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
           g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
         /* Queue the message and send it later. */
-        add_to_be_sent (sock, message, 0, message_len, FALSE);
+        nice_socket_queue_send_with_callback (&priv->send_queue,
+            message, 0, message_len, FALSE, sock->fileno, &priv->io_source,
+            priv->context, (GSourceFunc) socket_send_more, sock);
         ret = message_len;
       }
 
       g_error_free (gerr);
     } else if ((gsize) ret < message_len) {
       /* Partial send. */
-      add_to_be_sent (sock, message, ret, message_len, TRUE);
+      nice_socket_queue_send_with_callback (&priv->send_queue,
+          message, ret, message_len, TRUE, sock->fileno, &priv->io_source,
+          priv->context, (GSourceFunc) socket_send_more, sock);
       ret = message_len;
     }
   } else {
     /* Only queue if we're sending reliably  */
     if (reliable) {
       /* Queue the message and send it later. */
-      add_to_be_sent (sock, message, 0, message_len, FALSE);
+      nice_socket_queue_send_with_callback (&priv->send_queue,
+          message, 0, message_len, FALSE, sock->fileno, &priv->io_source,
+          priv->context, (GSourceFunc) socket_send_more, sock);
       ret = message_len;
     } else {
       /* non reliable send, so we shouldn't queue the message */
@@ -381,8 +379,6 @@ socket_send_more (
 {
   NiceSocket *sock = (NiceSocket *) data;
   TcpPriv *priv = sock->priv;
-  struct to_be_sent *tbs = NULL;
-  GError *gerr = NULL;
 
   agent_lock ();
 
@@ -393,42 +389,10 @@ socket_send_more (
     return FALSE;
   }
 
-  while ((tbs = g_queue_pop_head (&priv->send_queue)) != NULL) {
-    int ret;
-
-    if(condition & G_IO_HUP) {
-      /* connection hangs up */
-      ret = -1;
-    } else {
-      GOutputVector local_bufs = { tbs->buf, tbs->length };
-      ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0,
-          G_SOCKET_MSG_NONE, NULL, &gerr);
-    }
-
-    if (ret < 0) {
-      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-        GOutputVector local_buf = { tbs->buf, tbs->length };
-        NiceOutputMessage local_message = {&local_buf, 1};
-
-        add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
-        free_to_be_sent (tbs);
-        g_error_free (gerr);
-        break;
-      }
-      g_clear_error (&gerr);
-    } else if (ret < (int) tbs->length) {
-      GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
-      NiceOutputMessage local_message = {&local_buf, 1};
-
-      add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
-      free_to_be_sent (tbs);
-      break;
-    }
-
-    free_to_be_sent (tbs);
-  }
-
-  if (g_queue_is_empty (&priv->send_queue)) {
+  /* connection hangs up or queue was emptied */
+  if (condition & G_IO_HUP ||
+      nice_socket_flush_send_queue_to_socket (sock->fileno,
+          &priv->send_queue)) {
     g_source_destroy (priv->io_source);
     g_source_unref (priv->io_source);
     priv->io_source = NULL;
@@ -440,70 +404,3 @@ socket_send_more (
   agent_unlock ();
   return TRUE;
 }
-
-
-/* Queue data starting at byte offset @message_offset from @message’s
- * buffers.
- *
- * Returns the message's length */
-static void
-add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
-    gsize message_offset, gsize message_len, gboolean head)
-{
-  TcpPriv *priv = sock->priv;
-  struct to_be_sent *tbs;
-  guint j;
-  gsize offset = 0;
-
-  if (message_offset >= message_len)
-    return;
-
-  tbs = g_slice_new0 (struct to_be_sent);
-  tbs->length = message_len - message_offset;
-  tbs->buf = g_malloc (tbs->length);
-
-  if (head)
-    g_queue_push_head (&priv->send_queue, tbs);
-  else
-    g_queue_push_tail (&priv->send_queue, tbs);
-
-  if (priv->io_source == NULL) {
-    priv->io_source = g_socket_create_source(sock->fileno, G_IO_OUT, NULL);
-    g_source_set_callback (priv->io_source, (GSourceFunc) socket_send_more,
-        sock, NULL);
-    g_source_attach (priv->io_source, priv->context);
-  }
-
-  /* Move the data into the buffer. */
-  for (j = 0;
-       (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
-       (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
-       j++) {
-    const GOutputVector *buffer = &message->buffers[j];
-    gsize len;
-
-    /* Skip this buffer if it’s within @message_offset. */
-    if (buffer->size <= message_offset) {
-      message_offset -= buffer->size;
-      continue;
-    }
-
-    len = MIN (tbs->length - offset, buffer->size - message_offset);
-    memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
-    offset += len;
-    if (message_offset >= len)
-      message_offset -= len;
-    else
-      message_offset = 0;
-  }
-}
-
-
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
-  g_free (tbs->buf);
-  g_slice_free (struct to_be_sent, tbs);
-}
-