libsocket_la_SOURCES = \
socket.h \
+ socket-priv.h \
socket.c \
udp-bsd.h \
udp-bsd.c \
#include "http.h"
#include "agent-priv.h"
+#include "socket-priv.h"
#include <string.h>
#include <stdlib.h>
} HttpPriv;
-struct to_be_sent {
- guint length;
- gchar *buf;
- NiceAddress to;
-};
-
-
static void socket_close (NiceSocket *sock);
static gint socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages);
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
-static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
- const NiceOutputMessage *messages, guint n_messages);
-static void free_to_be_sent (struct to_be_sent *tbs);
-
-
NiceSocket *
nice_http_socket_new (NiceSocket *base_socket,
NiceAddress *addr, gchar *username, gchar *password)
if (priv->recv_buf)
g_free (priv->recv_buf);
- g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
- g_queue_clear (&priv->send_queue);
+ nice_socket_free_send_queue (&priv->send_queue);
g_slice_free(HttpPriv, sock->priv);
}
case HTTP_STATE_CONNECTED:
{
gsize len;
- struct to_be_sent *tbs = NULL;
len = memcpy_ring_buffer_to_input_messages (priv,
recv_messages, n_recv_messages);
/* Send the pending data */
- while ((tbs = g_queue_pop_head (&priv->send_queue))) {
- /* We only queue reliable data */
- nice_socket_send_reliable (priv->base_socket, &tbs->to,
- tbs->length, tbs->buf);
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
- }
+ nice_socket_flush_send_queue (priv->base_socket,
+ &priv->send_queue);
return len;
}
} else if (priv->state == HTTP_STATE_ERROR) {
return -1;
} else {
- add_to_be_sent (sock, to, messages, n_messages);
+ nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
}
return n_messages;
return nice_socket_is_reliable (priv->base_socket);
}
-
-
-static void
-add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
- const NiceOutputMessage *messages, guint n_messages)
-{
- HttpPriv *priv = sock->priv;
- guint i;
-
- if (n_messages == 0)
- return;
-
- /* Compact the message’s buffers before queueing. */
- for (i = 0; i < n_messages; i++) {
- const NiceOutputMessage *message = &messages[i];
- struct to_be_sent *tbs = NULL;
- guint j;
- gsize message_len_remaining = output_message_get_size (message);
- gsize offset = 0;
-
- if (message_len_remaining == 0)
- continue;
-
- tbs = g_slice_new0 (struct to_be_sent);
- tbs->buf = g_malloc (message_len_remaining);
- tbs->length = message_len_remaining;
- if (to)
- tbs->to = *to;
- g_queue_push_tail (&priv->send_queue, tbs);
-
- for (j = 0;
- (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
- (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
- j++) {
- const GOutputVector *buffer = &message->buffers[j];
- gsize len;
-
- len = MIN (buffer->size, message_len_remaining);
- memcpy (tbs->buf + offset, buffer->buffer, len);
- message_len_remaining -= len;
- offset += len;
- }
- }
-}
-
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
-}
#include "pseudossl.h"
#include "agent-priv.h"
+#include "socket-priv.h"
#include <string.h>
} PseudoSSLPriv;
-struct to_be_sent {
- guint8 *buf; /* owned */
- gsize length;
- NiceAddress to;
-};
-
-
static const gchar SSL_SERVER_GOOGLE_HANDSHAKE[] = {
0x16, 0x03, 0x01, 0x00, 0x4a, 0x02, 0x00, 0x00,
0x46, 0x03, 0x01, 0x42, 0x85, 0x45, 0xa7, 0x27,
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
-static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
- const NiceOutputMessage *messages, guint n_messages);
-static void free_to_be_sent (struct to_be_sent *tbs);
-
-
NiceSocket *
nice_pseudossl_socket_new (NiceSocket *base_socket,
NicePseudoSSLSocketCompatibility compatibility)
if (priv->base_socket)
nice_socket_free (priv->base_socket);
- g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
- g_queue_clear (&priv->send_queue);
+ nice_socket_free_send_queue (&priv->send_queue);
g_slice_free(PseudoSSLPriv, sock->priv);
}
if (ret <= 0) {
return ret;
} else if (ret == 1 && server_handshake_valid(sock, &local_recv_buf)) {
- struct to_be_sent *tbs = NULL;
priv->handshaken = TRUE;
- while ((tbs = g_queue_pop_head (&priv->send_queue))) {
- /* We only queue reliable data */
- nice_socket_send_reliable (priv->base_socket, &tbs->to, tbs->length,
- (const gchar *) tbs->buf);
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
- }
+ nice_socket_flush_send_queue (priv->base_socket, &priv->send_queue);
} else {
if (priv->base_socket)
nice_socket_free (priv->base_socket);
return nice_socket_send_messages_reliable (priv->base_socket, to, messages,
n_messages);
} else {
- add_to_be_sent (sock, to, messages, n_messages);
+ nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
}
return n_messages;
}
return nice_socket_is_reliable (priv->base_socket);
}
-
-
-static void
-add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
- const NiceOutputMessage *messages, guint n_messages)
-{
- PseudoSSLPriv *priv = sock->priv;
- guint i;
-
- for (i = 0; i < n_messages; i++) {
- struct to_be_sent *tbs;
- const NiceOutputMessage *message = &messages[i];
- guint j;
- gsize offset = 0;
- gsize message_len;
-
- tbs = g_slice_new0 (struct to_be_sent);
-
- message_len = output_message_get_size (message);
-
- /* Compact the buffer. */
- tbs->buf = g_malloc (message_len);
- tbs->length = message_len;
- if (to != NULL)
- tbs->to = *to;
- g_queue_push_tail (&priv->send_queue, tbs);
-
- for (j = 0;
- (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
- (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
- j++) {
- const GOutputVector *buffer = &message->buffers[j];
- gsize len;
-
- len = MIN (message_len - offset, buffer->size);
- memcpy (tbs->buf + offset, buffer->buffer, len);
- offset += len;
- }
-
- g_assert (offset == message_len);
- }
-}
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
-}
--- /dev/null
+/*
+ * This file is part of the Nice GLib ICE library.
+ *
+ * (C) 2008-2009 Collabora Ltd.
+ * Contact: Youness Alaoui
+ * (C) 2008-2009 Nokia Corporation. All rights reserved.
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is the Nice GLib ICE library.
+ *
+ * The Initial Developers of the Original Code are Collabora Ltd and Nokia
+ * Corporation. All Rights Reserved.
+ *
+ * Contributors:
+ * Youness Alaoui, Collabora Ltd.
+ *
+ * Alternatively, the contents of this file may be used under the terms of the
+ * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
+ * case the provisions of LGPL are applicable instead of those above. If you
+ * wish to allow use of your version of this file only under the terms of the
+ * LGPL and not to allow others to use your version of this file under the
+ * MPL, indicate your decision by deleting the provisions above and replace
+ * them with the notice and other provisions required by the LGPL. If you do
+ * not delete the provisions above, a recipient may use your version of this
+ * file under either the MPL or the LGPL.
+ */
+
+#ifndef _SOCKET_PRIV_H
+#define _SOCKET_PRIV_H
+
+#include "socket.h"
+
+G_BEGIN_DECLS
+
+/**
+ * nice_socket_queue_send:
+ * @send_queue: The queue to add to
+ * @to : Destination
+ * @messages: Messages to queue
+ * @n_messages: Number of messages to queue
+ *
+ * Queue messages to be sent later into the GQueue
+ */
+void nice_socket_queue_send (GQueue *send_queue, const NiceAddress *to,
+ const NiceOutputMessage *messages, guint n_messages);
+
+/**
+ * nice_socket_queue_send_with_callback:
+ * @send_queue: The queue to add to
+ * @message: The message to queue
+ * @message_offset: Number of bytes to skip in the message
+ * @message_len: Total length of the message
+ * @head: Whether to add the message to the head of the queue or the tail
+ * @gsock: The #GSocket to create the callback on
+ * @io_source: Pointer to #GSource pointer to store the created source
+ * @context: #GMainContext to attach the @io_source to
+ * @cb: Callback function to call when the @gsock is writable
+ * @user_data: User data for @cb
+ *
+ * Queue (partial) message to be sent later and create a source to call @cb
+ * when the @gsock becomes writable.
+ * The @message_offset can be used if a partial write happened and some bytes
+ * were already written, in which case @head should be set to TRUE to add the
+ * message to the head of the queue.
+ */
+void nice_socket_queue_send_with_callback (GQueue *send_queue,
+ const NiceOutputMessage *message, gsize message_offset, gsize message_len,
+ gboolean head, GSocket *gsock, GSource **io_source, GMainContext *context,
+ GSourceFunc cb, gpointer user_data);
+
+/**
+ * nice_socket_flush_send_queue:
+ * @base_socket: Base socket to send on
+ * @send_queue: Queue to flush
+ *
+ * Send all the queued messages reliably to the base socket. We assume only
+ * reliable messages were queued and the underlying socket will handle the
+ * send.
+ */
+void nice_socket_flush_send_queue (NiceSocket *base_socket, GQueue *send_queue);
+
+/**
+ * nice_socket_flush_send_queue_to_socket:
+ * @gsock: GSocket to send on
+ * @send_queue: Queue to flush
+ *
+ * Send all the queued messages to the socket. If any message fails to be sent
+ * it will be readded to the queue and #FALSE will be returned, in which case
+ * the IO source must be kept to allow flushing the next time the socket
+ * is writable.
+ * If the queue gets flushed, #TRUE will be returned, in which case, the IO
+ * source should be destroyed.
+ *
+ * Returns: #TRUE if the queue was emptied, #FALSE if the socket would block.
+ */
+gboolean nice_socket_flush_send_queue_to_socket (GSocket *gsock,
+ GQueue *send_queue);
+
+/**
+ * nice_socket_free_send_queue:
+ * @send_queue: The send queue
+ *
+ * Frees every item in the send queue without sending them and empties the queue
+ */
+void nice_socket_free_send_queue (GQueue *send_queue);
+
+G_END_DECLS
+
+#endif /* _SOCKET_PRIV_H */
+
#include <glib.h>
#include "socket.h"
+#include "socket-priv.h"
+#include "agent-priv.h"
+#include <string.h>
+
+#ifndef G_OS_WIN32
+#include <unistd.h>
+#endif
+
+typedef struct _NiceSocketQueuedSend NiceSocketQueuedSend;
+
+struct _NiceSocketQueuedSend {
+ guint8 *buf; /* owned */
+ gsize length;
+ NiceAddress to;
+};
/**
* nice_socket_recv_messages:
g_slice_free (NiceSocket,sock);
}
}
+
+static void
+nice_socket_free_queued_send (NiceSocketQueuedSend *tbs)
+{
+ g_free (tbs->buf);
+ g_slice_free (NiceSocketQueuedSend, tbs);
+}
+
+void
+nice_socket_queue_send (GQueue *send_queue, const NiceAddress *to,
+ const NiceOutputMessage *messages, guint n_messages)
+{
+ guint i;
+
+ if (n_messages == 0)
+ return;
+
+ /* Compact the message’s buffers before queueing. */
+ for (i = 0; i < n_messages; i++) {
+ NiceSocketQueuedSend *tbs;
+ const NiceOutputMessage *message = &messages[i];
+ gsize message_len_remaining = output_message_get_size (message);
+ guint j;
+ gsize offset = 0;
+
+ if (message_len_remaining == 0)
+ continue;
+
+ /* Compact the buffer. */
+ tbs = g_slice_new0 (NiceSocketQueuedSend);
+ tbs->buf = g_malloc (message_len_remaining);
+ tbs->length = message_len_remaining;
+
+ if (to)
+ tbs->to = *to;
+ else
+ memset (&tbs->to, 0, sizeof(NiceAddress));
+ g_queue_push_tail (send_queue, tbs);
+
+ for (j = 0;
+ (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
+ j++) {
+ const GOutputVector *buffer = &message->buffers[j];
+ gsize len;
+
+ len = MIN (buffer->size, message_len_remaining);
+ memcpy (tbs->buf + offset, buffer->buffer, len);
+ message_len_remaining -= len;
+ offset += len;
+ }
+
+ g_assert (offset == tbs->length);
+ }
+}
+
+void nice_socket_queue_send_with_callback (GQueue *send_queue,
+ const NiceOutputMessage *message, gsize message_offset, gsize message_len,
+ gboolean head, GSocket *gsock, GSource **io_source, GMainContext *context,
+ GSourceFunc cb, gpointer user_data)
+{
+ NiceSocketQueuedSend *tbs;
+ guint j;
+ gsize offset = 0;
+
+ if (message_offset >= message_len)
+ return;
+
+ tbs = g_slice_new0 (NiceSocketQueuedSend);
+ tbs->length = message_len - message_offset;
+ tbs->buf = g_malloc (tbs->length);
+
+ if (head)
+ g_queue_push_head (send_queue, tbs);
+ else
+ g_queue_push_tail (send_queue, tbs);
+
+ if (io_source && gsock && context && cb && *io_source == NULL) {
+ *io_source = g_socket_create_source(gsock, G_IO_OUT, NULL);
+ g_source_set_callback (*io_source, (GSourceFunc) cb, user_data, NULL);
+ g_source_attach (*io_source, context);
+ }
+
+ /* Move the data into the buffer. */
+ for (j = 0;
+ (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
+ j++) {
+ const GOutputVector *buffer = &message->buffers[j];
+ gsize len;
+
+ /* Skip this buffer if it’s within @message_offset. */
+ if (buffer->size <= message_offset) {
+ message_offset -= buffer->size;
+ continue;
+ }
+
+ len = MIN (tbs->length - offset, buffer->size - message_offset);
+ memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
+ offset += len;
+ if (message_offset >= len)
+ message_offset -= len;
+ else
+ message_offset = 0;
+ }
+}
+
+void nice_socket_flush_send_queue (NiceSocket *base_socket, GQueue *send_queue)
+{
+ NiceSocketQueuedSend *tbs;
+
+ while ((tbs = g_queue_pop_head (send_queue))) {
+ NiceAddress *to = &tbs->to;
+
+ if (!nice_address_is_valid (to))
+ to = NULL;
+
+ /* We only queue reliable data */
+ nice_socket_send_reliable (base_socket, to,
+ tbs->length, (const gchar *) tbs->buf);
+ nice_socket_free_queued_send (tbs);
+ }
+}
+
+gboolean nice_socket_flush_send_queue_to_socket (GSocket *gsock,
+ GQueue *send_queue)
+{
+ NiceSocketQueuedSend *tbs;
+ GError *gerr = NULL;
+
+
+ while ((tbs = g_queue_pop_head (send_queue)) != NULL) {
+ int ret;
+
+ GOutputVector local_bufs = { tbs->buf, tbs->length };
+ ret = g_socket_send_message (gsock, NULL, &local_bufs, 1, NULL, 0,
+ G_SOCKET_MSG_NONE, NULL, &gerr);
+
+ if (ret < 0) {
+ if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ GOutputVector local_buf = { tbs->buf, tbs->length };
+ NiceOutputMessage local_message = {&local_buf, 1};
+
+ nice_socket_queue_send_with_callback (send_queue, &local_message,
+ 0, local_buf.size, TRUE, NULL, NULL, NULL, NULL, NULL);
+ nice_socket_free_queued_send (tbs);
+ g_error_free (gerr);
+ return FALSE;
+ }
+ g_clear_error (&gerr);
+ } else if (ret < (int) tbs->length) {
+ GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
+ NiceOutputMessage local_message = {&local_buf, 1};
+
+ nice_socket_queue_send_with_callback (send_queue, &local_message,
+ 0, local_buf.size, TRUE, NULL, NULL, NULL, NULL, NULL);
+ nice_socket_free_queued_send (tbs);
+ return FALSE;
+ }
+
+ nice_socket_free_queued_send (tbs);
+ }
+
+ return TRUE;
+}
+
+void
+nice_socket_free_send_queue (GQueue *send_queue)
+{
+ g_queue_foreach (send_queue, (GFunc) nice_socket_free_queued_send, NULL);
+ g_queue_clear (send_queue);
+}
#include "socks5.h"
#include "agent-priv.h"
+#include "socket-priv.h"
#include <string.h>
} Socks5Priv;
-struct to_be_sent {
- guint8 *buf; /* owned */
- gsize length;
- NiceAddress to;
-};
-
-
static void socket_close (NiceSocket *sock);
static gint socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages);
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
-static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
- const NiceOutputMessage *messages, guint n_messages);
-static void free_to_be_sent (struct to_be_sent *tbs);
-
NiceSocket *
nice_socks5_socket_new (NiceSocket *base_socket,
if (priv->password)
g_free (priv->password);
- g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
- g_queue_clear (&priv->send_queue);
+ nice_socket_free_send_queue (&priv->send_queue);
g_slice_free(Socks5Priv, sock->priv);
}
switch (data[1]) {
case 0x00:
if (data[2] == 0x00) {
- struct to_be_sent *tbs = NULL;
switch (data[3]) {
case 0x01: /* IPV4 bound address */
local_recv_buf.size = 6;
/* Unsupported address type */
goto error;
}
- while ((tbs = g_queue_pop_head (&priv->send_queue))) {
- /* We only queue reliable data */
- nice_socket_send_reliable (priv->base_socket, &tbs->to,
- tbs->length, (const gchar *) tbs->buf);
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
- }
+ nice_socket_flush_send_queue (priv->base_socket,
+ &priv->send_queue);
priv->state = SOCKS_STATE_CONNECTED;
} else {
/* Wrong reserved value */
} else if (priv->state == SOCKS_STATE_ERROR) {
return -1;
} else {
- add_to_be_sent (sock, to, messages, n_messages);
+ nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
}
return n_messages;
}
return nice_socket_is_reliable (priv->base_socket);
}
-
-static void
-add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
- const NiceOutputMessage *messages, guint n_messages)
-{
- Socks5Priv *priv = sock->priv;
- guint i;
-
- for (i = 0; i < n_messages; i++) {
- struct to_be_sent *tbs;
- const NiceOutputMessage *message = &messages[i];
- guint j;
- gsize offset = 0;
-
- tbs = g_slice_new0 (struct to_be_sent);
-
- /* Compact the buffer. */
- tbs->length = output_message_get_size (message);
- tbs->buf = g_malloc (tbs->length);
- if (to != NULL)
- tbs->to = *to;
- g_queue_push_tail (&priv->send_queue, tbs);
-
- for (j = 0;
- (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
- (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
- j++) {
- const GOutputVector *buffer = &message->buffers[j];
- gsize len;
-
- len = MIN (tbs->length - offset, buffer->size);
- memcpy (tbs->buf + offset, buffer->buffer, len);
- offset += len;
- }
-
- g_assert (offset == tbs->length);
- }
-}
-
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
-}
#include "tcp-bsd.h"
#include "agent-priv.h"
+#include "socket-priv.h"
#include <string.h>
#include <errno.h>
gboolean reliable;
} TcpPriv;
-struct to_be_sent {
- guint8 *buf;
- gsize length;
-};
-
#define MAX_QUEUE_LENGTH 20
static void socket_close (NiceSocket *sock);
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
-
-static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
- gsize message_offset, gsize message_len, gboolean head);
-static void free_to_be_sent (struct to_be_sent *tbs);
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
gpointer data);
g_source_destroy (priv->io_source);
g_source_unref (priv->io_source);
}
- g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
- g_queue_clear (&priv->send_queue);
+
+ nice_socket_free_send_queue (&priv->send_queue);
if (priv->context)
g_main_context_unref (priv->context);
if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
/* Queue the message and send it later. */
- add_to_be_sent (sock, message, 0, message_len, FALSE);
+ nice_socket_queue_send_with_callback (&priv->send_queue,
+ message, 0, message_len, FALSE, sock->fileno, &priv->io_source,
+ priv->context, (GSourceFunc) socket_send_more, sock);
ret = message_len;
}
g_error_free (gerr);
} else if ((gsize) ret < message_len) {
/* Partial send. */
- add_to_be_sent (sock, message, ret, message_len, TRUE);
+ nice_socket_queue_send_with_callback (&priv->send_queue,
+ message, ret, message_len, TRUE, sock->fileno, &priv->io_source,
+ priv->context, (GSourceFunc) socket_send_more, sock);
ret = message_len;
}
} else {
/* Only queue if we're sending reliably */
if (reliable) {
/* Queue the message and send it later. */
- add_to_be_sent (sock, message, 0, message_len, FALSE);
+ nice_socket_queue_send_with_callback (&priv->send_queue,
+ message, 0, message_len, FALSE, sock->fileno, &priv->io_source,
+ priv->context, (GSourceFunc) socket_send_more, sock);
ret = message_len;
} else {
/* non reliable send, so we shouldn't queue the message */
{
NiceSocket *sock = (NiceSocket *) data;
TcpPriv *priv = sock->priv;
- struct to_be_sent *tbs = NULL;
- GError *gerr = NULL;
agent_lock ();
return FALSE;
}
- while ((tbs = g_queue_pop_head (&priv->send_queue)) != NULL) {
- int ret;
-
- if(condition & G_IO_HUP) {
- /* connection hangs up */
- ret = -1;
- } else {
- GOutputVector local_bufs = { tbs->buf, tbs->length };
- ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0,
- G_SOCKET_MSG_NONE, NULL, &gerr);
- }
-
- if (ret < 0) {
- if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- GOutputVector local_buf = { tbs->buf, tbs->length };
- NiceOutputMessage local_message = {&local_buf, 1};
-
- add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
- free_to_be_sent (tbs);
- g_error_free (gerr);
- break;
- }
- g_clear_error (&gerr);
- } else if (ret < (int) tbs->length) {
- GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
- NiceOutputMessage local_message = {&local_buf, 1};
-
- add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
- free_to_be_sent (tbs);
- break;
- }
-
- free_to_be_sent (tbs);
- }
-
- if (g_queue_is_empty (&priv->send_queue)) {
+ /* connection hangs up or queue was emptied */
+ if (condition & G_IO_HUP ||
+ nice_socket_flush_send_queue_to_socket (sock->fileno,
+ &priv->send_queue)) {
g_source_destroy (priv->io_source);
g_source_unref (priv->io_source);
priv->io_source = NULL;
agent_unlock ();
return TRUE;
}
-
-
-/* Queue data starting at byte offset @message_offset from @message’s
- * buffers.
- *
- * Returns the message's length */
-static void
-add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
- gsize message_offset, gsize message_len, gboolean head)
-{
- TcpPriv *priv = sock->priv;
- struct to_be_sent *tbs;
- guint j;
- gsize offset = 0;
-
- if (message_offset >= message_len)
- return;
-
- tbs = g_slice_new0 (struct to_be_sent);
- tbs->length = message_len - message_offset;
- tbs->buf = g_malloc (tbs->length);
-
- if (head)
- g_queue_push_head (&priv->send_queue, tbs);
- else
- g_queue_push_tail (&priv->send_queue, tbs);
-
- if (priv->io_source == NULL) {
- priv->io_source = g_socket_create_source(sock->fileno, G_IO_OUT, NULL);
- g_source_set_callback (priv->io_source, (GSourceFunc) socket_send_more,
- sock, NULL);
- g_source_attach (priv->io_source, priv->context);
- }
-
- /* Move the data into the buffer. */
- for (j = 0;
- (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
- (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
- j++) {
- const GOutputVector *buffer = &message->buffers[j];
- gsize len;
-
- /* Skip this buffer if it’s within @message_offset. */
- if (buffer->size <= message_offset) {
- message_offset -= buffer->size;
- continue;
- }
-
- len = MIN (tbs->length - offset, buffer->size - message_offset);
- memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
- offset += len;
- if (message_offset >= len)
- message_offset -= len;
- else
- message_offset = 0;
- }
-}
-
-
-
-static void
-free_to_be_sent (struct to_be_sent *tbs)
-{
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
-}
-