memcpy_buffer_to_input_message (NiceInputMessage *message,
const guint8 *buffer, gsize buffer_length);
guint8 *
-compact_input_message (NiceInputMessage *message, gsize *buffer_length);
+compact_input_message (const NiceInputMessage *message, gsize *buffer_length);
guint8 *
compact_output_message (const NiceOutputMessage *message, gsize *buffer_length);
+gsize
+output_message_get_size (const NiceOutputMessage *message);
+
+
#endif /*_NICE_AGENT_PRIV_H */
#define MAX_TCP_MTU 1400 /* Use 1400 because of VPNs and we assume IEE 802.3 */
static void
-nice_debug_message_composition (NiceInputMessage *messages, guint n_messages);
+nice_debug_input_message_composition (const NiceInputMessage *messages,
+ guint n_messages);
G_DEFINE_TYPE (NiceAgent, nice_agent, G_TYPE_OBJECT);
* used in reliable mode, and there is no concept of a ‘message’, but is
* necessary because the calling API has no way of returning to the client
* and indicating that a message was partially sent. */
- if (message->length > pseudo_tcp_socket_get_available_send_space (self)) {
+ if (output_message_get_size (message) >
+ pseudo_tcp_socket_get_available_send_space (self)) {
return i;
}
nice_debug ("%s: Client buffers case: Received %d valid messages:",
G_STRFUNC, n_valid_messages);
- nice_debug_message_composition (component->recv_messages,
+ nice_debug_input_message_composition (component->recv_messages,
component->n_recv_messages);
if (n_valid_messages < 0) {
if (component->selected_pair.local != NULL) {
NiceSocket *sock;
NiceAddress *addr;
- GOutputVector local_buf;
- NiceOutputMessage local_message;
sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr;
- local_buf.buffer = buffer;
- local_buf.size = len;
- local_message.buffers = &local_buf;
- local_message.n_buffers = 1;
- local_message.length = len;
-
- if (nice_socket_send_messages (sock, addr, &local_message, 1)) {
+ if (nice_socket_send (sock, addr, len, buffer))
return WR_SUCCESS;
- }
} else {
nice_debug ("%s: WARNING: Failed to send pseudo-TCP packet from agent %p "
"as no pair has been selected yet.", G_STRFUNC, component->agent);
/* Print the composition of an array of messages. No-op if debugging is
* disabled. */
static void
-nice_debug_message_composition (NiceInputMessage *messages, guint n_messages)
+nice_debug_input_message_composition (const NiceInputMessage *messages,
+ guint n_messages)
{
#ifndef NDEBUG
guint i;
for (i = 0; i < n_messages; i++) {
- NiceInputMessage *message = &messages[i];
+ const NiceInputMessage *message = &messages[i];
guint j;
nice_debug ("Message %p (from: %p, length: %" G_GSIZE_FORMAT ")", message,
#endif
}
-/* Concatenate all the buffers in the given @recv_message into a single, newly
- * allocated, monolithic buffer which is returned. The length of the new buffer
- * is returned in @buffer_length, and should be equal to the length field of
- * @recv_message.
- *
- * The return value must be freed with g_free(). */
-guint8 *
-compact_input_message (NiceInputMessage *message, gsize *buffer_length)
+static guint8 *
+compact_message (const NiceOutputMessage *message, gsize buffer_length)
{
guint8 *buffer;
gsize offset = 0;
guint i;
- nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
- nice_debug_message_composition (message, 1);
-
- *buffer_length = message->length;
- buffer = g_malloc (*buffer_length);
+ buffer = g_malloc (buffer_length);
for (i = 0;
(message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[i].buffer != NULL);
i++) {
- gsize len = MIN (*buffer_length - offset, message->buffers[i].size);
+ gsize len = MIN (buffer_length - offset, message->buffers[i].size);
memcpy (buffer + offset, message->buffers[i].buffer, len);
offset += len;
}
return buffer;
}
+/* Concatenate all the buffers in the given @recv_message into a single, newly
+ * allocated, monolithic buffer which is returned. The length of the new buffer
+ * is returned in @buffer_length, and should be equal to the length field of
+ * @recv_message.
+ *
+ * The return value must be freed with g_free(). */
+guint8 *
+compact_input_message (const NiceInputMessage *message, gsize *buffer_length)
+{
+ nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
+ nice_debug_input_message_composition (message, 1);
+
+ /* This works as long as NiceInputMessage is a subset of eNiceOutputMessage */
+
+ *buffer_length = message->length;
+
+ return compact_message ((NiceOutputMessage *) message, *buffer_length);
+}
+
/* Returns the number of bytes copied. Silently drops any data from @buffer
* which doesn’t fit in @message. */
gsize
message->length += len;
}
- nice_debug_message_composition (message, 1);
+ nice_debug_input_message_composition (message, 1);
if (buffer_length > 0) {
g_warning ("Dropped %" G_GSIZE_FORMAT " bytes of data from the end of "
guint8 *
compact_output_message (const NiceOutputMessage *message, gsize *buffer_length)
{
- /* This works as long as NiceInputMessage and NiceOutputMessage are layed out
- * identically. */
- return compact_input_message ((NiceInputMessage *) message, buffer_length);
+ nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
+
+ *buffer_length = output_message_get_size (message);
+
+ return compact_message (message, *buffer_length);
+}
+
+gsize
+output_message_get_size (const NiceOutputMessage *message)
+{
+ guint i;
+ gsize message_len = 0;
+
+ /* Find the total size of the message */
+ for (i = 0;
+ (message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[i].buffer != NULL);
+ i++)
+ message_len += message->buffers[i].size;
+
+ return message_len;
}
/**
}
nice_debug ("%s: (%s):", G_STRFUNC, blocking ? "blocking" : "non-blocking");
- nice_debug_message_composition (messages, n_messages);
+ nice_debug_input_message_composition (messages, n_messages);
/* Set the component’s receive buffer. */
context = component_dup_io_context (component);
const gchar *buf)
{
GOutputVector local_buf = { buf, len };
- NiceOutputMessage local_message = { &local_buf, 1, len };
gint n_sent_messages;
+ NiceOutputMessage local_message = { &local_buf, 1 };
n_sent_messages = nice_agent_send_messages_nonblocking (agent, stream_id,
component_id, &local_message, 1, NULL, NULL);
* which contain data to transmit for this message
* @n_buffers: number of #GOutputVectors in @buffers, or -1 to indicate @buffers
* is %NULL-terminated
- * @length: total number of valid bytes contiguously stored in @buffers
*
* Represents a single message to transmit on the network. For
* reliable connections, this is essentially just an array of
typedef struct {
GOutputVector *buffers;
gint n_buffers;
- gsize length;
} NiceOutputMessage;
do {
GOutputVector local_buf = { (const guint8 *) buffer + len, count - len };
- NiceOutputMessage local_message = {&local_buf, 1, count - len};
+ NiceOutputMessage local_message = {&local_buf, 1};
/* Have to unlock while calling into the agent because
* it will take the agent lock which will cause a deadlock if one of
NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
NiceAgent *agent; /* owned */
GOutputVector local_buf = { buffer, count };
- NiceOutputMessage local_message = { &local_buf, 1, count };
+ NiceOutputMessage local_message = { &local_buf, 1 };
gint n_sent_messages;
/* Closed streams are not writeable. */
#endif
#include "http.h"
+#include "agent-priv.h"
#include <string.h>
#include <stdlib.h>
local_bufs.size = strlen (msg);
local_messages.buffers = &local_bufs;
local_messages.n_buffers = 1;
- local_messages.length = local_bufs.size;
nice_socket_send_messages (priv->base_socket, NULL, &local_messages, 1);
priv->state = HTTP_STATE_INIT;
message->buffers[j].size =
memcpy_ring_buffer_to_buffer (priv,
message->buffers[j].buffer, message->buffers[j].size);
- message->length += message->buffers[j].size;
}
}
const NiceOutputMessage *message = &messages[i];
struct to_be_sent *tbs = NULL;
guint j;
- gsize message_len_remaining = message->length;
+ gsize message_len_remaining = output_message_get_size (message);
gsize offset = 0;
- if (message->length == 0)
+ if (message_len_remaining == 0)
continue;
tbs = g_slice_new0 (struct to_be_sent);
- tbs->buf = g_malloc (message->length);
- tbs->length = message->length;
+ tbs->buf = g_malloc (message_len_remaining);
+ tbs->length = message_len_remaining;
if (to)
tbs->to = *to;
g_queue_push_tail (&priv->send_queue, tbs);
#endif
#include "pseudossl.h"
+#include "agent-priv.h"
#include <string.h>
const NiceOutputMessage *message = &messages[i];
guint j;
gsize offset = 0;
+ gsize message_len;
tbs = g_slice_new0 (struct to_be_sent);
- /* Compact the buffer. */
- tbs->buf = g_malloc (message->length);
- tbs->length = message->length;
+ message_len = output_message_get_size (message);
+
+ /* Compact the buffer. */
+ tbs->buf = g_malloc (message_len);
+ tbs->length = message_len;
if (to != NULL)
tbs->to = *to;
g_queue_push_tail (&priv->send_queue, tbs);
const GOutputVector *buffer = &message->buffers[j];
gsize len;
- len = MIN (message->length - offset, buffer->size);
+ len = MIN (message_len - offset, buffer->size);
memcpy (tbs->buf + offset, buffer->buffer, len);
offset += len;
}
- g_assert_cmpuint (offset, ==, message->length);
+ g_assert (offset == message_len);
}
}
const gchar *buf)
{
GOutputVector local_buf = { buf, len };
- NiceOutputMessage local_message = { &local_buf, 1, len };
+ NiceOutputMessage local_message = { &local_buf, 1};
gint ret;
ret = sock->send_messages (sock, to, &local_message, 1);
g_slice_free (NiceSocket,sock);
}
}
-
#endif
#include "socks5.h"
+#include "agent-priv.h"
#include <string.h>
tbs = g_slice_new0 (struct to_be_sent);
/* Compact the buffer. */
- tbs->buf = g_malloc (message->length);
- tbs->length = message->length;
+ tbs->length = output_message_get_size (message);
+ tbs->buf = g_malloc (tbs->length);
if (to != NULL)
tbs->to = *to;
g_queue_push_tail (&priv->send_queue, tbs);
const GOutputVector *buffer = &message->buffers[j];
gsize len;
- len = MIN (message->length - offset, buffer->size);
+ len = MIN (tbs->length - offset, buffer->size);
memcpy (tbs->buf + offset, buffer->buffer, len);
offset += len;
}
- g_assert_cmpuint (offset, ==, message->length);
+ g_assert (offset == tbs->length);
}
}
static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
- gsize message_offset, gboolean head);
+ gsize message_offset, gsize message_len, gboolean head);
static void free_to_be_sent (struct to_be_sent *tbs);
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
gpointer data);
TcpPriv *priv = sock->priv;
gssize ret;
GError *gerr = NULL;
+ gsize message_len;
/* Don't try to access the socket if it had an error, otherwise we risk a
* crash with SIGPIPE (Broken pipe) */
if (priv->error)
return -1;
+ message_len = output_message_get_size (message);
+
/* First try to send the data, don't send it later if it can be sent now
* this way we avoid allocating memory on every send */
if (g_queue_is_empty (&priv->send_queue)) {
if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
/* Queue the message and send it later. */
- add_to_be_sent (sock, message, 0, FALSE);
- ret = message->length;
+ add_to_be_sent (sock, message, 0, message_len, FALSE);
+ ret = message_len;
}
g_error_free (gerr);
- } else if ((gsize) ret < message->length) {
+ } else if ((gsize) ret < message_len) {
/* Partial send. */
- add_to_be_sent (sock, message, ret, TRUE);
- ret = message->length;
+ add_to_be_sent (sock, message, ret, message_len, TRUE);
+ ret = message_len;
}
} else {
/* FIXME: This dropping will break http/socks5/etc
}
/* Queue the message and send it later. */
- add_to_be_sent (sock, message, 0, FALSE);
- ret = message->length;
+ add_to_be_sent (sock, message, 0, message_len, FALSE);
+ ret = message_len;
}
return ret;
if (gerr != NULL &&
g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
GOutputVector local_buf = { tbs->buf, tbs->length };
- NiceOutputMessage local_message = {&local_buf, 1, local_buf.size};
+ NiceOutputMessage local_message = {&local_buf, 1};
- add_to_be_sent (sock, &local_message, 0, TRUE);
+ add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
free_to_be_sent (tbs);
g_error_free (gerr);
break;
g_error_free (gerr);
} else if (ret < (int) tbs->length) {
GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
- NiceOutputMessage local_message = {&local_buf, 1, local_buf.size};
+ NiceOutputMessage local_message = {&local_buf, 1};
- add_to_be_sent (sock, &local_message, 0, TRUE);
+ add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
free_to_be_sent (tbs);
break;
}
/* Queue data starting at byte offset @message_offset from @message’s
- * buffers. */
+ * buffers.
+ *
+ * Returns the message's length */
static void
add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
- gsize message_offset, gboolean head)
+ gsize message_offset, gsize message_len, gboolean head)
{
TcpPriv *priv = sock->priv;
struct to_be_sent *tbs;
guint j;
gsize offset = 0;
- if (message_offset >= message->length)
+ if (message_offset >= message_len)
return;
tbs = g_slice_new0 (struct to_be_sent);
- tbs->buf = g_malloc (message->length - message_offset);
- tbs->length = message->length - message_offset;
+ tbs->length = message_len - message_offset;
+ tbs->buf = g_malloc (tbs->length);
tbs->can_drop = !head;
if (head)
local_bufs = g_malloc_n (n_bufs + 2, sizeof (GOutputVector));
local_message.buffers = local_bufs;
local_message.n_buffers = n_bufs + 2;
- local_message.length = message->length;
/* Copy the existing buffers across. */
for (j = 0; j < n_bufs; j++) {
/* Header buffer. */
if (priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_GOOGLE) {
- header_buf = htons (message->length);
+ header_buf = htons (output_message_get_size (message));
local_bufs[0].buffer = &header_buf;
local_bufs[0].size = sizeof (header_buf);
- local_message.length += sizeof (header_buf);
} else {
/* Skip over the allocated header buffer. */
local_message.buffers++;
/* Tail buffer. */
if (priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_DRAFT9 ||
priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_RFC5766) {
- gsize padlen = (message->length % 4) ? 4 - (message->length % 4) : 0;
+ gsize message_len = output_message_get_size (message);
+ gsize padlen = (message_len % 4) ? 4 - (message_len % 4) : 0;
local_bufs[n_bufs].buffer = &padbuf;
local_bufs[n_bufs].size = padlen;
- local_message.length += padlen;
} else {
/* Skip over the allocated tail buffer. */
local_message.n_buffers--;
g_free (local_bufs);
if (ret == 1)
- return local_message.length;
+ return output_message_get_size (&local_message);
return ret;
}
if (binding) {
if (priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_DRAFT9 ||
priv->compatibility == NICE_TURN_SOCKET_COMPATIBILITY_RFC5766) {
- if (message->length + sizeof(uint32_t) <= sizeof(buffer)) {
+ gsize message_len = output_message_get_size (message);
+
+ if (message_len + sizeof(uint32_t) <= sizeof(buffer)) {
guint j;
uint16_t len16, channel16;
gsize message_offset = 0;
- len16 = htons ((uint16_t) message->length);
+ len16 = htons ((uint16_t) message_len);
channel16 = htons (binding->channel);
memcpy (buffer, &channel16, sizeof(uint16_t));
- memcpy (buffer + sizeof(uint16_t), &len16,sizeof(uint16_t));
+ memcpy (buffer + sizeof(uint16_t), &len16, sizeof(uint16_t));
/* FIXME: Slow path! This should be replaced by code which manipulates
* the GOutputVector array, rather than the buffer contents
const GOutputVector *out_buf = &message->buffers[j];
gsize out_len;
- out_len = MIN (message->length - message_offset, out_buf->size);
+ out_len = MIN (message_len - message_offset, out_buf->size);
memcpy (buffer + sizeof (uint32_t) + message_offset,
out_buf->buffer, out_len);
message_offset += out_len;
}
- msg_len = message->length + sizeof(uint32_t);
+ msg_len = message_len + sizeof(uint32_t);
} else {
return 0;
}
} else {
- NiceOutputMessage local_message = {
- message->buffers, message->n_buffers, message->length
- };
-
ret = nice_socket_send_messages (priv->base_socket, &priv->server_addr,
- &local_message, 1);
+ message, 1);
if (ret == 1)
- return message->length;
+ return output_message_get_size (message);
return ret;
}
} else {
return msg_len;
} else {
GOutputVector local_buf = { buffer, msg_len };
- NiceOutputMessage local_message = {&local_buf, 1, msg_len};
+ NiceOutputMessage local_message = {&local_buf, 1};
ret = nice_socket_send_messages (priv->base_socket, &priv->server_addr,
&local_message, 1);
/* Error condition pass through to the base socket. */
ret = nice_socket_send_messages (priv->base_socket, to, message, 1);
if (ret == 1)
- return message->length;
+ return output_message_get_size (message);
return ret;
}
message->buffers[0].buffer != NULL &&
message->buffers[1].buffer == NULL)) {
/* Fast path. Single massive buffer. */
- g_assert_cmpuint (message->length, <=, message->buffers[0].size);
-
len = nice_turn_socket_parse_recv (sock, from_sock,
message->from, message->length, message->buffers[0].buffer,
message->from, message->buffers[0].buffer, message->length);
send_messages[i].buffers = send_bufs + i * n_bufs_per_message;
send_messages[i].n_buffers = n_bufs_per_message;
- send_messages[i].length = 0;
}
/* Set up the receive buffers. Yay for dynamic tests! */
NiceOutputMessage *message = &((*messages)[i]);
guint j;
gsize max_message_size;
+ gsize message_len = 0;
message->n_buffers =
generate_buffer_count (test_data->transmit.buffer_count_strategy,
test_data->transmit_size_rand, buffer_offset);
message->buffers = g_malloc_n (message->n_buffers, sizeof (GOutputVector));
- message->length = 0;
/* Limit the overall message size to the smaller of (n_bytes / n_messages)
* and MAX_MESSAGE_SIZE, to ensure each message is non-empty. */
buf_len =
MIN (buf_len,
test_data->n_bytes - test_data->transmitted_bytes - total_buf_len);
- buf_len = MIN (buf_len, max_message_size - message->length);
+ buf_len = MIN (buf_len, max_message_size - message_len);
buffer->size = buf_len;
buf = g_malloc (buffer->size);
buffer->buffer = buf;
- message->length += buf_len;
+ message_len += buf_len;
total_buf_len += buf_len;
/* Fill it with data. */
buffer_offset += buf_len;
/* Reached the maximum UDP payload size? */
- if (message->length >= max_message_size) {
+ if (message_len >= max_message_size) {
message->n_buffers = j + 1;
break;
}
}
- g_assert_cmpuint (message->length, <=, max_message_size);
+ g_assert_cmpuint (message_len, <=, max_message_size);
}
}
g_free (*buf);
}
+static gsize
+output_message_get_size (const NiceOutputMessage *message)
+{
+ guint i;
+ gsize message_len = 0;
+
+ /* Find the total size of the message */
+ for (i = 0;
+ (message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[i].buffer != NULL);
+ i++)
+ message_len += message->buffers[i].size;
+
+ return message_len;
+}
+
/* Similar to notify_transmitted_buffer(), except it operates on an array of
* messages from generate_messages_to_transmit(). */
static void
guint j;
if (i < (guint) n_sent_messages)
- test_data->transmitted_bytes += message->length;
+ test_data->transmitted_bytes += output_message_get_size (message);
for (j = 0; j < (guint) message->n_buffers; j++) {
GOutputVector *buffer = &message->buffers[j];