2 * This file is part of the Nice GLib ICE library.
4 * (C) 2014 Collabora Ltd.
5 * Contact: Philip Withnall
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
17 * The Original Code is the Nice GLib ICE library.
19 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20 * Corporation. All Rights Reserved.
23 * Philip Withnall, Collabora Ltd.
25 * Alternatively, the contents of this file may be used under the terms of the
26 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
27 * case the provisions of LGPL are applicable instead of those above. If you
28 * wish to allow use of your version of this file only under the terms of the
29 * LGPL and not to allow others to use your version of this file under the
30 * MPL, indicate your decision by deleting the provisions above and replace
31 * them with the notice and other provisions required by the LGPL. If you do
32 * not delete the provisions above, a recipient may use your version of this
33 * file under either the MPL or the LGPL.
37 * This is a comprehensive unit test for send() and recv() behaviour in libnice,
38 * covering all APIs except the old nice_agent_attach_recv() one. It aims to
39 * test the correctness of reliable and non-reliable I/O through libnice, using
40 * a variety of data and a variety of buffer sizes.
42 * Abnormal features like error handling, zero-length buffer handling, stream
43 * closure and cancellation are not tested.
45 * This is *not* a performance test, and would require significant work to be
46 * useful as one. It allocates all of its buffers dynamically, and walks over
47 * them frequently to set and check data.
49 * Several of the strategies in the test make use of random numbers. The seed
50 * values for these are deterministically set (in main()), but may be specified
51 * on the command line to allow fuzzing.
59 #include "test-io-stream-common.h"
67 /* Maximum IP payload ((1 << 16) - 1), minus IP header, minus UDP header. */
68 #define MAX_MESSAGE_SIZE (65535 - 20 - 8) /* bytes */
71 STREAM_AGENT, /* nice_agent_[send|recv]() */
72 STREAM_AGENT_NONBLOCKING, /* nice_agent_[send|recv]_nonblocking() */
73 STREAM_GIO, /* Nice[Input|Output]Stream */
74 STREAM_GSOURCE, /* GPollable[Input|Output]Stream */
76 #define STREAM_API_N_ELEMENTS (STREAM_GSOURCE + 1)
79 BUFFER_SIZE_CONSTANT_LARGE, /* always 65535 bytes */
80 BUFFER_SIZE_CONSTANT_SMALL, /* always 4096 bytes */
81 BUFFER_SIZE_CONSTANT_TINY, /* always 1 byte */
82 BUFFER_SIZE_ASCENDING, /* ascending powers of 2 */
83 BUFFER_SIZE_RANDOM, /* random every time */
85 #define BUFFER_SIZE_STRATEGY_N_ELEMENTS (BUFFER_SIZE_RANDOM + 1)
88 BUFFER_COUNT_CONSTANT_ONE, /* always a single buffer */
89 BUFFER_COUNT_CONSTANT_TWO, /* always two buffers */
90 BUFFER_COUNT_RANDOM, /* random every time */
91 } BufferCountStrategy;
92 #define BUFFER_COUNT_STRATEGY_N_ELEMENTS (BUFFER_COUNT_RANDOM + 1)
95 MESSAGE_COUNT_CONSTANT_ONE, /* always a single message */
96 MESSAGE_COUNT_CONSTANT_TWO, /* always two messages */
97 MESSAGE_COUNT_RANDOM, /* random every time */
98 } MessageCountStrategy;
99 #define MESSAGE_COUNT_STRATEGY_N_ELEMENTS (MESSAGE_COUNT_RANDOM + 1)
102 BUFFER_DATA_CONSTANT, /* fill with 0xfe */
103 BUFFER_DATA_ASCENDING, /* ascending values for each byte */
104 BUFFER_DATA_PSEUDO_RANDOM, /* every byte is pseudo-random */
105 } BufferDataStrategy;
106 #define BUFFER_DATA_STRATEGY_N_ELEMENTS (BUFFER_DATA_PSEUDO_RANDOM + 1)
109 /* Test configuration (immutable per test run). */
111 StreamApi stream_api;
113 BufferSizeStrategy buffer_size_strategy;
114 BufferCountStrategy buffer_count_strategy;
115 MessageCountStrategy message_count_strategy;
118 BufferSizeStrategy buffer_size_strategy;
119 BufferCountStrategy buffer_count_strategy;
120 MessageCountStrategy message_count_strategy;
122 BufferDataStrategy buffer_data_strategy;
127 GRand *transmit_size_rand;
128 GRand *receive_size_rand;
129 gsize transmitted_bytes;
130 gsize received_bytes;
131 gsize *other_received_bytes;
132 guint transmitted_messages;
133 guint received_messages;
134 guint *other_received_messages;
137 /* Whether @stream_api is blocking (vs. non-blocking). */
139 stream_api_is_blocking (StreamApi stream_api)
141 switch (stream_api) {
145 case STREAM_AGENT_NONBLOCKING:
149 g_assert_not_reached ();
153 /* Whether @stream_api only works for reliable NiceAgents. */
155 stream_api_is_reliable_only (StreamApi stream_api)
157 switch (stream_api) {
162 case STREAM_AGENT_NONBLOCKING:
165 g_assert_not_reached ();
169 /* Whether @stream_api supports vectored I/O (multiple buffers or messages). */
171 stream_api_supports_vectored_io (StreamApi stream_api)
173 switch (stream_api) {
175 case STREAM_AGENT_NONBLOCKING:
181 g_assert_not_reached ();
185 /* Generate a size for the buffer containing the @buffer_offset-th byte.
186 * Guaranteed to be in the interval [1, 1 << 16). ((1 << 16) is the maximum
189 generate_buffer_size (BufferSizeStrategy strategy, GRand *grand,
193 case BUFFER_SIZE_CONSTANT_LARGE:
194 return (1 << 16) - 1;
196 case BUFFER_SIZE_CONSTANT_SMALL:
199 case BUFFER_SIZE_CONSTANT_TINY:
202 case BUFFER_SIZE_ASCENDING:
203 return CLAMP (1L << buffer_offset, 1, (1 << 16) - 1);
205 case BUFFER_SIZE_RANDOM:
206 return g_rand_int_range (grand, 1, 1 << 16);
209 g_assert_not_reached ();
213 /* Generate a number of buffers to allocate when receiving the @buffer_offset-th
214 * byte. Guaranteed to be in the interval [1, 100], where 100 was chosen
217 generate_buffer_count (BufferCountStrategy strategy, GRand *grand,
221 case BUFFER_COUNT_CONSTANT_ONE:
224 case BUFFER_COUNT_CONSTANT_TWO:
227 case BUFFER_COUNT_RANDOM:
228 return g_rand_int_range (grand, 1, 100 + 1);
231 g_assert_not_reached ();
235 /* Generate a number of messages to allocate and receive into when receiving the
236 * @buffer_offset-th byte. Guaranteed to be in the interval [1, 100], where 100
237 * was chosen arbitrarily.*/
239 generate_message_count (MessageCountStrategy strategy, GRand *grand,
243 case MESSAGE_COUNT_CONSTANT_ONE:
246 case MESSAGE_COUNT_CONSTANT_TWO:
249 case MESSAGE_COUNT_RANDOM:
250 return g_rand_int_range (grand, 1, 100 + 1);
253 g_assert_not_reached ();
257 /* Fill the given @buf with @buf_len bytes of generated data. The data is
258 * deterministically generated, so that:
259 * generate_buffer_data(_, I, buf, 2)
261 * generate_buffer_data(_, I+1, buf+1, 1)
262 * generate the same buf[I+1] byte, for all I.
264 * The generation strategies are generally chosen to produce data which makes
265 * send/receive errors (insertions, swaps, elisions) obvious. */
267 generate_buffer_data (BufferDataStrategy strategy, gsize buffer_offset,
268 guint8 *buf, gsize buf_len)
271 case BUFFER_DATA_CONSTANT:
272 memset (buf, 0xfe, buf_len);
275 case BUFFER_DATA_ASCENDING: {
278 for (i = 0; i < buf_len; i++) {
279 buf[i] = (i + buffer_offset) & 0xff;
285 case BUFFER_DATA_PSEUDO_RANDOM: {
288 /* This can’t use GRand, because then the number of calls to g_rand_*()
289 * methods would affect its output, and the bytes generated here have to be
290 * entirely deterministic on @buffer_offset.
292 * Instead, use something akin to a LCG, except without any feedback
293 * (because that would make it non-deterministic). The objective is to
294 * generate numbers which are sufficiently pseudo-random that it’s likely
295 * transpositions, elisions and insertions will be detected.
297 * The constants come from ‘ANSI C’ in:
298 * http://en.wikipedia.org/wiki/Linear_congruential_generator
300 for (i = 0; i < buf_len; i++) {
301 buf[i] = (1103515245 * (buffer_offset + i) + 12345) & 0xff;
308 g_assert_not_reached ();
312 /* Choose a size and allocate a receive buffer in @buf, ready to receive bytes
313 * starting at @buffer_offset into the stream. Fill the buffer with poison
314 * values to hopefully make incorrect writes/reads more obvious.
316 * @buf must be freed with g_free(). */
318 generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
319 guint8 **buf, gsize *buf_len)
321 TestData *test_data = data->user_data;
323 /* Allocate the buffer. */
324 *buf_len = generate_buffer_size (test_data->receive.buffer_size_strategy,
325 test_data->receive_size_rand, buffer_offset);
326 *buf = g_malloc (*buf_len);
328 /* Fill it with poison to try and detect incorrect writes. */
329 memset (*buf, 0xaa, *buf_len);
332 /* Similar to generate_buffer_to_receive(), but generate an entire message array
333 * with multiple buffers instead.
335 * @max_buffer_size may be used to limit the total size of all the buffers in
336 * all the messages, for example to avoid blocking on receiving data which will
337 * never be sent. This only applies for blocking, reliable stream APIs.
339 * @max_n_messages may be used to limit the number of messages generated, to
340 * avoid blocking on receiving messages which will never be sent. This only
341 * applies for blocking, non-reliable stream APIs.
343 * @messages must be freed with g_free(), as must all of the buffer arrays and
344 * the buffers themselves. */
346 generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
347 NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size,
348 guint max_n_messages)
350 TestData *test_data = data->user_data;
353 /* Allocate the messages. */
355 generate_message_count (test_data->receive.message_count_strategy,
356 test_data->receive_size_rand, buffer_offset);
359 *n_messages = MIN (*n_messages, max_n_messages);
361 *messages = g_malloc_n (*n_messages, sizeof (NiceInputMessage));
363 for (i = 0; i < *n_messages; i++) {
364 NiceInputMessage *message = &((*messages)[i]);
368 generate_buffer_count (test_data->receive.buffer_count_strategy,
369 test_data->receive_size_rand, buffer_offset);
370 message->buffers = g_malloc_n (message->n_buffers, sizeof (GInputVector));
371 message->from = NULL;
374 for (j = 0; j < (guint) message->n_buffers; j++) {
375 GInputVector *buffer = &message->buffers[j];
379 generate_buffer_size (test_data->receive.buffer_size_strategy,
380 test_data->receive_size_rand, buffer_offset);
382 /* Trim the buffer length if it would otherwise cause the API to block. */
383 if (data->reliable) {
384 buf_len = MIN (buf_len, max_buffer_size);
385 max_buffer_size -= buf_len;
388 buffer->size = buf_len;
389 buffer->buffer = g_malloc (buffer->size);
391 /* Fill it with poison to try and detect incorrect writes. */
392 memset (buffer->buffer, 0xaa, buffer->size);
394 /* If we’ve hit the max_buffer_size, adjust the buffer and message counts
396 if (data->reliable && max_buffer_size == 0) {
397 message->n_buffers = j + 1;
405 /* Validate the length and data of a received buffer of length @buf_len, filled
406 * with @len valid bytes. Updates the internal state machine to mark the bytes
407 * as received. This consumes @buf. */
409 validate_received_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
410 guint8 **buf, gsize buf_len, gssize len)
412 TestData *test_data = data->user_data;
413 guint8 *expected_buf;
415 g_assert_cmpint (len, <=, buf_len);
416 g_assert_cmpint (len, >=, 0);
418 if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
419 g_assert_cmpint (len, ==, buf_len);
421 /* Validate the buffer contents.
423 * Note: Buffers can only be validated up to valid_len. The buffer may
424 * have been re-used internally (e.g. by receiving a STUN message, then
425 * overwriting it with a data packet), so we can’t guarantee that the
426 * bytes beyond valid_len have been untouched. */
427 expected_buf = g_malloc (buf_len);
428 memset (expected_buf, 0xaa, buf_len);
429 generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
431 g_assert_cmpmem (*buf, len, expected_buf, len);
432 g_free (expected_buf);
434 test_data->received_bytes += len;
439 /* Similar to validate_received_buffer(), except it validates a message array
440 * instead of a single buffer. This consumes @messages. */
442 validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
443 NiceInputMessage *messages, guint n_messages, gint n_valid_messages)
445 TestData *test_data = data->user_data;
447 gsize prev_message_len = G_MAXSIZE;
449 g_assert_cmpint (n_valid_messages, <=, n_messages);
450 g_assert_cmpint (n_valid_messages, >=, 0);
452 if (stream_api_is_blocking (test_data->stream_api))
453 g_assert_cmpint (n_valid_messages, ==, n_messages);
455 test_data->received_messages += n_valid_messages;
457 /* Validate the message contents. */
458 for (i = 0; i < (guint) n_valid_messages; i++) {
459 NiceInputMessage *message = &messages[i];
461 gsize total_buf_len = 0;
462 gsize message_len_remaining = message->length;
464 g_assert_cmpint (message->n_buffers, >, 0);
466 for (j = 0; j < (guint) message->n_buffers; j++) {
467 GInputVector *buffer = &message->buffers[j];
470 /* See note above about valid_len. */
471 total_buf_len += buffer->size;
472 valid_len = MIN (message_len_remaining, buffer->size);
474 /* Only validate buffer content for reliable mode, anything could
475 * be received in UDP mode
477 if (test_data->reliable) {
478 guint8 *expected_buf;
480 expected_buf = g_malloc (buffer->size);
481 memset (expected_buf, 0xaa, buffer->size);
482 generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
483 expected_buf, valid_len);
484 g_assert_cmpmem (buffer->buffer, valid_len, expected_buf, valid_len);
485 g_free (expected_buf);
486 buffer_offset += valid_len;
487 message_len_remaining -= valid_len;
489 test_data->received_bytes += valid_len;
492 g_assert_cmpuint (message->length, <=, total_buf_len);
493 g_assert_cmpuint (message->length, >=, 0);
495 /* No non-empty messages can follow an empty message. */
496 if (prev_message_len == 0)
497 g_assert_cmpuint (message->length, ==, 0);
498 prev_message_len = message->length;
500 /* If the API was blocking, it should have completely filled the message. */
501 if (stream_api_is_blocking (test_data->stream_api) && data->reliable)
502 g_assert_cmpuint (message->length, ==, total_buf_len);
504 g_assert (message->from == NULL);
507 /* Free all messages. */
508 for (i = 0; i < (guint) n_messages; i++) {
509 NiceInputMessage *message = &messages[i];
512 for (j = 0; j < (guint) message->n_buffers; j++) {
513 GInputVector *buffer = &message->buffers[j];
515 g_free (buffer->buffer);
518 g_free (message->buffers);
524 /* Determine a size for the next transmit buffer, allocate it, and fill it with
525 * data to be transmitted. */
527 generate_buffer_to_transmit (TestIOStreamThreadData *data, gsize buffer_offset,
528 guint8 **buf, gsize *buf_len)
530 TestData *test_data = data->user_data;
532 /* Allocate the buffer. */
533 *buf_len = generate_buffer_size (test_data->transmit.buffer_size_strategy,
534 test_data->transmit_size_rand, buffer_offset);
535 *buf_len = MIN (*buf_len, test_data->n_bytes - test_data->transmitted_bytes);
536 *buf = g_malloc (*buf_len);
538 /* Fill it with data. */
539 generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
543 /* Similar to generate_buffer_to_transmit(), except that it generates an array
544 * of NiceOutputMessages rather than a single buffer. */
546 generate_messages_to_transmit (TestIOStreamThreadData *data,
547 gsize buffer_offset, NiceOutputMessage **messages, guint *n_messages)
549 TestData *test_data = data->user_data;
551 gsize total_buf_len = 0;
553 /* Determine the number of messages to send. */
555 generate_message_count (test_data->transmit.message_count_strategy,
556 test_data->transmit_size_rand, buffer_offset);
559 test_data->n_messages - test_data->transmitted_messages);
561 *messages = g_malloc_n (*n_messages, sizeof (NiceOutputMessage));
563 for (i = 0; i < *n_messages; i++) {
564 NiceOutputMessage *message = &((*messages)[i]);
566 gsize max_message_size;
567 gsize message_len = 0;
570 generate_buffer_count (test_data->transmit.buffer_count_strategy,
571 test_data->transmit_size_rand, buffer_offset);
572 message->buffers = g_malloc_n (message->n_buffers, sizeof (GOutputVector));
574 /* Limit the overall message size to the smaller of (n_bytes / n_messages)
575 * and MAX_MESSAGE_SIZE, to ensure each message is non-empty. */
577 MIN ((test_data->n_bytes / test_data->n_messages), MAX_MESSAGE_SIZE);
579 for (j = 0; j < (guint) message->n_buffers; j++) {
580 GOutputVector *buffer = &message->buffers[j];
585 generate_buffer_size (test_data->transmit.buffer_size_strategy,
586 test_data->transmit_size_rand, buffer_offset);
589 test_data->n_bytes - test_data->transmitted_bytes - total_buf_len);
590 buf_len = MIN (buf_len, max_message_size - message_len);
592 buffer->size = buf_len;
593 buf = g_malloc (buffer->size);
594 buffer->buffer = buf;
595 message_len += buf_len;
596 total_buf_len += buf_len;
598 /* Fill it with data. */
599 generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
602 buffer_offset += buf_len;
604 /* Reached the maximum UDP payload size? */
605 if (message_len >= max_message_size) {
606 message->n_buffers = j + 1;
611 g_assert_cmpuint (message_len, <=, max_message_size);
615 /* Validate the number of bytes transmitted, and update the test’s internal
616 * state machine. Consumes @buf. */
618 notify_transmitted_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
619 guint8 **buf, gsize buf_len, gssize len)
621 TestData *test_data = data->user_data;
623 g_assert_cmpint (len, <=, buf_len);
624 g_assert_cmpint (len, >=, 0);
626 test_data->transmitted_bytes += len;
632 output_message_get_size (const NiceOutputMessage *message)
635 gsize message_len = 0;
637 /* Find the total size of the message */
639 (message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
640 (message->n_buffers < 0 && message->buffers[i].buffer != NULL);
642 message_len += message->buffers[i].size;
647 /* Similar to notify_transmitted_buffer(), except it operates on an array of
648 * messages from generate_messages_to_transmit(). */
650 notify_transmitted_messages (TestIOStreamThreadData *data, gsize buffer_offset,
651 NiceOutputMessage **messages, guint n_messages, gint n_sent_messages)
653 TestData *test_data = data->user_data;
656 g_assert_cmpint (n_sent_messages, <=, n_messages);
657 g_assert_cmpint (n_sent_messages, >=, 0);
659 test_data->transmitted_messages += n_sent_messages;
661 for (i = 0; i < n_messages; i++) {
662 NiceOutputMessage *message = &((*messages)[i]);
665 if (i < (guint) n_sent_messages)
666 test_data->transmitted_bytes += output_message_get_size (message);
668 for (j = 0; j < (guint) message->n_buffers; j++) {
669 GOutputVector *buffer = &message->buffers[j];
671 g_free ((guint8 *) buffer->buffer);
674 g_free (message->buffers);
681 * Implementation using nice_agent_recv_messages() and nice_agent_send().
684 read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
686 TestData *test_data = data->user_data;
687 guint stream_id, component_id;
690 tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
691 stream_id = GPOINTER_TO_UINT (tmp);
694 while (test_data->received_bytes < test_data->n_bytes) {
695 GError *error = NULL;
696 NiceInputMessage *messages;
698 gint n_valid_messages;
700 /* Initialise an array of messages to receive into. */
701 generate_messages_to_receive (data, test_data->received_bytes, &messages,
702 &n_messages, test_data->n_bytes - test_data->received_bytes,
703 test_data->n_messages - test_data->received_messages);
705 /* Block on receiving some data. */
706 n_valid_messages = nice_agent_recv_messages (data->agent, stream_id,
707 component_id, messages, n_messages, NULL, &error);
708 g_assert_no_error (error);
710 /* Check the messages and update the test’s state machine. */
711 validate_received_messages (data, test_data->received_bytes, messages,
712 n_messages, n_valid_messages);
715 check_for_termination (data, &test_data->received_bytes,
716 test_data->other_received_bytes, &test_data->transmitted_bytes,
721 write_thread_agent_cb (GOutputStream *output_stream,
722 TestIOStreamThreadData *data)
724 TestData *test_data = data->user_data;
725 guint stream_id, component_id;
728 tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
729 stream_id = GPOINTER_TO_UINT (tmp);
732 while (test_data->transmitted_bytes < test_data->n_bytes) {
733 GError *error = NULL;
734 NiceOutputMessage *messages;
736 gint n_sent_messages;
738 /* Generate a buffer to transmit. */
739 generate_messages_to_transmit (data, test_data->transmitted_bytes,
740 &messages, &n_messages);
742 /* Busy loop on receiving some data. */
744 g_clear_error (&error);
745 n_sent_messages = nice_agent_send_messages_nonblocking (data->agent,
746 stream_id, component_id, messages, n_messages, NULL, &error);
747 } while (n_sent_messages == -1 &&
748 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
749 g_assert_no_error (error);
751 /* Update the test’s buffer generation state machine. */
752 notify_transmitted_messages (data, test_data->transmitted_bytes, &messages,
753 n_messages, n_sent_messages);
758 * Implementation using nice_agent_recv_nonblocking() and
759 * nice_agent_send_nonblocking().
762 read_thread_agent_nonblocking_cb (GInputStream *input_stream,
763 TestIOStreamThreadData *data)
765 TestData *test_data = data->user_data;
766 guint stream_id, component_id;
769 tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
770 stream_id = GPOINTER_TO_UINT (tmp);
773 while (test_data->received_bytes < test_data->n_bytes) {
774 GError *error = NULL;
775 NiceInputMessage *messages;
777 gint n_valid_messages;
779 /* Initialise an array of messages to receive into. */
780 generate_messages_to_receive (data, test_data->received_bytes, &messages,
781 &n_messages, test_data->n_bytes - test_data->received_bytes,
782 test_data->n_messages - test_data->received_messages);
784 /* Trim n_messages to avoid consuming the ‘done’ message. */
786 MIN (n_messages, test_data->n_messages - test_data->received_messages);
788 /* Busy loop on receiving some data. */
790 g_clear_error (&error);
791 n_valid_messages = nice_agent_recv_messages_nonblocking (data->agent,
792 stream_id, component_id, messages, n_messages, NULL, &error);
793 } while (n_valid_messages == -1 &&
794 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
795 g_assert_no_error (error);
797 /* Check the messages and update the test’s state machine. */
798 validate_received_messages (data, test_data->received_bytes, messages,
799 n_messages, n_valid_messages);
802 check_for_termination (data, &test_data->received_bytes,
803 test_data->other_received_bytes, &test_data->transmitted_bytes,
808 wait_transmission_cb (NiceAgent *agent)
813 GInputVector v = { &buffer, sizeof (buffer) };
814 NiceInputMessage message = { &v, 1, NULL, 0};
816 tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
817 stream_id = GPOINTER_TO_UINT (tmp);
819 /* While waiting for write thread to finish sending, keep also receiving so
820 * that any STUN messages from the peer still get processed. */
821 nice_agent_recv_messages_nonblocking (agent, stream_id, 1, &message, 1, NULL,
826 write_thread_agent_nonblocking_cb (GOutputStream *output_stream,
827 TestIOStreamThreadData *data)
829 /* FIXME: There is no nice_agent_send_nonblocking(); nice_agent_send() is
830 * non-blocking by default. */
831 write_thread_agent_cb (output_stream, data);
835 * Implementation using NiceInputStream and NiceOutputStream.
838 read_thread_gio_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
840 TestData *test_data = data->user_data;
842 while (test_data->received_bytes < test_data->n_bytes) {
843 GError *error = NULL;
848 /* Initialise a receive buffer. */
849 generate_buffer_to_receive (data, test_data->received_bytes, &buf,
852 /* Trim the receive buffer to avoid blocking on bytes which will never
854 buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
856 /* Block on receiving some data. */
857 len = g_input_stream_read (input_stream, buf, buf_len, NULL, &error);
858 g_assert_no_error (error);
860 /* Check the buffer and update the test’s state machine. */
861 validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
865 check_for_termination (data, &test_data->received_bytes,
866 test_data->other_received_bytes, &test_data->transmitted_bytes,
871 write_thread_gio_cb (GOutputStream *output_stream, TestIOStreamThreadData *data)
873 TestData *test_data = data->user_data;
875 while (test_data->transmitted_bytes < test_data->n_bytes) {
876 GError *error = NULL;
882 /* Generate a buffer to transmit. */
883 generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
888 len = g_output_stream_write (output_stream, buf + total_len,
889 buf_len - total_len, NULL, &error);
890 g_assert_no_error (error);
892 } while (total_len < buf_len);
894 /* Update the test’s buffer generation state machine. */
895 notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf,
901 * Implementation using GPollableInputStream and GPollableOutputStream.
903 * GSourceData is effectively the closure for the ‘for’ loop in other stream API
907 TestIOStreamThreadData *data;
908 GMainLoop *main_loop;
912 read_stream_cb (GObject *pollable_stream, gpointer _user_data)
914 GSourceData *gsource_data = _user_data;
915 TestIOStreamThreadData *data = gsource_data->data;
916 TestData *test_data = data->user_data;
917 GError *error = NULL;
922 /* Initialise a receive buffer. */
923 generate_buffer_to_receive (data, test_data->received_bytes, &buf, &buf_len);
925 /* Trim the receive buffer to avoid consuming the ‘done’ message. */
926 buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
928 /* Try to receive some data. */
929 len = g_pollable_input_stream_read_nonblocking (
930 G_POLLABLE_INPUT_STREAM (pollable_stream), buf, buf_len, NULL, &error);
933 g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
934 g_error_free (error);
936 return G_SOURCE_CONTINUE;
939 g_assert_no_error (error);
941 /* Check the buffer and update the test’s state machine. */
942 validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
945 /* Termination time? */
946 if (test_data->received_bytes == test_data->n_bytes) {
947 g_main_loop_quit (gsource_data->main_loop);
948 return G_SOURCE_REMOVE;
951 return G_SOURCE_CONTINUE;
955 read_thread_gsource_cb (GInputStream *input_stream,
956 TestIOStreamThreadData *data)
958 TestData *test_data = data->user_data;
959 GSourceData gsource_data;
960 GMainContext *main_context;
961 GMainLoop *main_loop;
962 GSource *stream_source;
964 main_context = g_main_context_ref_thread_default ();
965 main_loop = g_main_loop_new (main_context, FALSE);
967 gsource_data.data = data;
968 gsource_data.main_loop = main_loop;
971 g_pollable_input_stream_create_source (
972 G_POLLABLE_INPUT_STREAM (input_stream), NULL);
974 g_source_set_callback (stream_source, G_SOURCE_FUNC (read_stream_cb),
975 &gsource_data, NULL);
976 g_source_attach (stream_source, main_context);
978 /* Run the main loop. */
979 g_main_loop_run (main_loop);
981 g_source_destroy (stream_source);
982 g_source_unref (stream_source);
983 g_main_loop_unref (main_loop);
984 g_main_context_unref (main_context);
987 check_for_termination (data, &test_data->received_bytes,
988 test_data->other_received_bytes, &test_data->transmitted_bytes,
993 write_stream_cb (GObject *pollable_stream, gpointer _user_data)
995 GSourceData *gsource_data = _user_data;
996 TestIOStreamThreadData *data = gsource_data->data;
997 TestData *test_data = data->user_data;
998 GError *error = NULL;
1004 /* Initialise a receive buffer. */
1005 generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf,
1008 /* Try to transmit some data. */
1009 len = g_pollable_output_stream_write_nonblocking (
1010 G_POLLABLE_OUTPUT_STREAM (pollable_stream), buf, buf_len, NULL, &error);
1013 g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
1015 return G_SOURCE_CONTINUE;
1018 g_assert_no_error (error);
1020 /* Update the test’s buffer generation state machine. */
1021 notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf, buf_len,
1024 /* Termination time? */
1025 if (test_data->transmitted_bytes == test_data->n_bytes) {
1026 g_main_loop_quit (gsource_data->main_loop);
1031 return G_SOURCE_REMOVE;
1035 write_thread_gsource_cb (GOutputStream *output_stream,
1036 TestIOStreamThreadData *data)
1038 GSourceData gsource_data;
1039 GMainContext *main_context;
1040 GMainLoop *main_loop;
1041 GSource *stream_source;
1043 main_context = g_main_context_ref_thread_default ();
1044 main_loop = g_main_loop_new (main_context, FALSE);
1046 gsource_data.data = data;
1047 gsource_data.main_loop = main_loop;
1050 g_pollable_output_stream_create_source (
1051 G_POLLABLE_OUTPUT_STREAM (output_stream), NULL);
1053 g_source_set_callback (stream_source, G_SOURCE_FUNC (write_stream_cb),
1054 &gsource_data, NULL);
1055 g_source_attach (stream_source, main_context);
1057 /* Run the main loop. */
1058 g_main_loop_run (main_loop);
1060 g_source_destroy (stream_source);
1061 g_source_unref (stream_source);
1062 g_main_loop_unref (main_loop);
1063 g_main_context_unref (main_context);
1067 test_data_init (TestData *data, gboolean reliable, StreamApi stream_api,
1068 gsize n_bytes, guint n_messages,
1069 BufferSizeStrategy transmit_buffer_size_strategy,
1070 BufferCountStrategy transmit_buffer_count_strategy,
1071 MessageCountStrategy transmit_message_count_strategy,
1072 BufferSizeStrategy receive_buffer_size_strategy,
1073 BufferCountStrategy receive_buffer_count_strategy,
1074 MessageCountStrategy receive_message_count_strategy,
1075 BufferDataStrategy buffer_data_strategy, guint32 transmit_seed,
1076 guint32 receive_seed, gsize *other_received_bytes,
1077 guint *other_received_messages)
1079 data->reliable = reliable;
1080 data->stream_api = stream_api;
1081 data->n_bytes = n_bytes;
1082 data->n_messages = n_messages;
1083 data->transmit.buffer_size_strategy = transmit_buffer_size_strategy;
1084 data->transmit.buffer_count_strategy = transmit_buffer_count_strategy;
1085 data->transmit.message_count_strategy = transmit_message_count_strategy;
1086 data->receive.buffer_size_strategy = receive_buffer_size_strategy;
1087 data->receive.buffer_count_strategy = receive_buffer_count_strategy;
1088 data->receive.message_count_strategy = receive_message_count_strategy;
1089 data->buffer_data_strategy = buffer_data_strategy;
1090 data->transmit_size_rand = g_rand_new_with_seed (transmit_seed);
1091 data->receive_size_rand = g_rand_new_with_seed (receive_seed);
1092 data->transmitted_bytes = 0;
1093 data->received_bytes = 0;
1094 data->other_received_bytes = other_received_bytes;
1095 data->transmitted_messages = 0;
1096 data->received_messages = 0;
1097 data->other_received_messages = other_received_messages;
1104 test_data_clear (TestData *data)
1106 g_rand_free (data->receive_size_rand);
1107 g_rand_free (data->transmit_size_rand);
1111 test (gboolean reliable, StreamApi stream_api, gsize n_bytes, guint n_messages,
1112 BufferSizeStrategy transmit_buffer_size_strategy,
1113 BufferCountStrategy transmit_buffer_count_strategy,
1114 MessageCountStrategy transmit_message_count_strategy,
1115 BufferSizeStrategy receive_buffer_size_strategy,
1116 BufferCountStrategy receive_buffer_count_strategy,
1117 MessageCountStrategy receive_message_count_strategy,
1118 BufferDataStrategy buffer_data_strategy,
1119 guint32 transmit_seed, guint32 receive_seed,
1120 guint deadlock_timeout)
1122 TestData l_data, r_data;
1124 /* Indexed by StreamApi. */
1125 const TestIOStreamCallbacks callbacks[] = {
1126 { read_thread_agent_cb,
1127 write_thread_agent_cb, NULL, NULL, wait_transmission_cb }, /* STREAM_AGENT */
1128 { read_thread_agent_nonblocking_cb, write_thread_agent_nonblocking_cb,
1129 NULL, NULL, wait_transmission_cb }, /* STREAM_AGENT_NONBLOCKING */
1130 { read_thread_gio_cb, write_thread_gio_cb, NULL, NULL, NULL}, /* STREAM_GIO */
1131 { read_thread_gsource_cb, write_thread_gsource_cb,
1132 NULL, NULL, NULL }, /* STREAM_GSOURCE */
1135 test_data_init (&l_data, reliable, stream_api, n_bytes, n_messages,
1136 transmit_buffer_size_strategy, transmit_buffer_count_strategy,
1137 transmit_message_count_strategy, receive_buffer_size_strategy,
1138 receive_buffer_count_strategy, receive_message_count_strategy,
1139 buffer_data_strategy, transmit_seed, receive_seed,
1140 &r_data.received_bytes, &r_data.received_messages);
1141 test_data_init (&r_data, reliable, stream_api, n_bytes, n_messages,
1142 transmit_buffer_size_strategy, transmit_buffer_count_strategy,
1143 transmit_message_count_strategy, receive_buffer_size_strategy,
1144 receive_buffer_count_strategy, receive_message_count_strategy,
1145 buffer_data_strategy, transmit_seed, receive_seed,
1146 &l_data.received_bytes, &l_data.received_messages);
1148 run_io_stream_test (deadlock_timeout, reliable, &callbacks[stream_api],
1149 &l_data, NULL, &r_data, NULL);
1151 test_data_clear (&r_data);
1152 test_data_clear (&l_data);
1155 /* Options with default values. */
1156 guint32 option_transmit_seed = 0;
1157 guint32 option_receive_seed = 0;
1158 gsize option_n_bytes = 10000;
1159 guint option_n_messages = 50;
1160 guint option_timeout = 150; /* seconds */
1161 gboolean option_long_mode = FALSE;
1163 static GOptionEntry entries[] = {
1164 { "transmit-seed", 0, 0, G_OPTION_ARG_INT, &option_transmit_seed,
1165 "Seed for transmission RNG", "S" },
1166 { "receive-seed", 0, 0, G_OPTION_ARG_INT, &option_receive_seed,
1167 "Seed for reception RNG", "S" },
1168 { "n-bytes", 'n', 0, G_OPTION_ARG_INT64, &option_n_bytes,
1169 "Number of bytes to send in each test (default 10000)", "N" },
1170 { "n-messages", 'm', 0, G_OPTION_ARG_INT64, &option_n_messages,
1171 "Number of messages to send in each test (default 50)", "M" },
1172 { "timeout", 't', 0, G_OPTION_ARG_INT, &option_timeout,
1173 "Deadlock detection timeout length, in seconds (default: 15)", "S" },
1174 { "long-mode", 'l', 0, G_OPTION_ARG_NONE, &option_long_mode,
1175 "Enable all tests, rather than a fast subset", NULL },
1180 main (int argc, char *argv[])
1183 StreamApi stream_api;
1184 BufferSizeStrategy transmit_buffer_size_strategy;
1185 BufferCountStrategy transmit_buffer_count_strategy;
1186 MessageCountStrategy transmit_message_count_strategy;
1187 BufferSizeStrategy receive_buffer_size_strategy;
1188 BufferCountStrategy receive_buffer_count_strategy;
1189 MessageCountStrategy receive_message_count_strategy;
1190 BufferDataStrategy buffer_data_strategy;
1191 guint32 transmit_seed;
1192 guint32 receive_seed;
1195 guint deadlock_timeout;
1197 GOptionContext *context;
1198 GError *error = NULL;
1200 /* Argument parsing. Allow some of the test parameters to be specified on the
1202 context = g_option_context_new ("— test send()/recv() correctness");
1203 g_option_context_add_main_entries (context, entries, NULL);
1205 if (!g_option_context_parse (context, &argc, &argv, &error)) {
1206 g_printerr ("Option parsing failed: %s\n", error->message);
1207 g_error_free (error);
1208 g_option_context_free (context);
1212 /* Set up the defaults. */
1213 transmit_seed = option_transmit_seed;
1214 receive_seed = option_receive_seed;
1215 n_bytes = option_n_bytes;
1216 n_messages = option_n_messages;
1217 deadlock_timeout = option_timeout;
1218 long_mode = option_long_mode;
1222 WSAStartup (0x0202, &w);
1226 /* Quick mode. Just test each of the stream APIs in reliable and
1227 * non-reliable mode, with a single pair of buffer strategies, and a single
1231 for (reliable = 0; reliable < 2; reliable++) {
1233 for (stream_api = 0;
1234 (guint) stream_api < STREAM_API_N_ELEMENTS;
1236 /* GIO streams must always be reliable. */
1237 if (!reliable && stream_api_is_reliable_only (stream_api))
1240 /* Non-reliable socket receives require large buffers. */
1242 receive_buffer_size_strategy = BUFFER_SIZE_RANDOM;
1244 receive_buffer_size_strategy = BUFFER_SIZE_CONSTANT_LARGE;
1247 transmit_buffer_size_strategy = BUFFER_SIZE_RANDOM;
1248 buffer_data_strategy = BUFFER_DATA_PSEUDO_RANDOM;
1250 if (stream_api_supports_vectored_io (stream_api)) {
1251 transmit_buffer_count_strategy = BUFFER_COUNT_RANDOM;
1252 transmit_message_count_strategy = MESSAGE_COUNT_RANDOM;
1253 receive_buffer_count_strategy = BUFFER_COUNT_RANDOM;
1254 receive_message_count_strategy = MESSAGE_COUNT_RANDOM;
1256 transmit_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
1257 transmit_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
1258 receive_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
1259 receive_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
1262 g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
1264 reliable, stream_api, n_bytes, n_messages,
1265 transmit_buffer_size_strategy,
1266 receive_buffer_size_strategy, buffer_data_strategy,
1267 transmit_seed, receive_seed);
1268 test (reliable, stream_api, n_bytes, n_messages,
1269 transmit_buffer_size_strategy,
1270 transmit_buffer_count_strategy, transmit_message_count_strategy,
1271 receive_buffer_size_strategy, receive_buffer_count_strategy,
1272 receive_message_count_strategy, buffer_data_strategy,
1273 transmit_seed, receive_seed,
1281 #define STRATEGY_LOOP(V, L) for (V = 0; (guint) V < L##_N_ELEMENTS; V++)
1282 STRATEGY_LOOP(transmit_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
1283 STRATEGY_LOOP(transmit_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
1284 STRATEGY_LOOP(transmit_message_count_strategy, MESSAGE_COUNT_STRATEGY)
1285 STRATEGY_LOOP(receive_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
1286 STRATEGY_LOOP(receive_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
1287 STRATEGY_LOOP(receive_message_count_strategy, MESSAGE_COUNT_STRATEGY)
1288 STRATEGY_LOOP(buffer_data_strategy, BUFFER_DATA_STRATEGY)
1290 for (reliable = 0; reliable < 2; reliable++) {
1292 for (stream_api = 0;
1293 (guint) stream_api < STREAM_API_N_ELEMENTS;
1295 /* GIO streams must always be reliable. */
1296 if (!reliable && stream_api_is_reliable_only (stream_api))
1299 /* Non-reliable socket receives require large buffers. We don’t claim to
1300 * support using them with small (< 65536B) buffers, so don’t test
1303 receive_buffer_size_strategy != BUFFER_SIZE_CONSTANT_LARGE)
1306 /* Non-reliable socket transmits will always block with huge buffers. */
1308 transmit_buffer_size_strategy == BUFFER_SIZE_CONSTANT_LARGE)
1311 /* Stream APIs which don’t support vectored I/O must not be passed
1313 if (!stream_api_supports_vectored_io (stream_api) &&
1314 (transmit_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
1315 transmit_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE ||
1316 receive_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
1317 receive_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE))
1320 g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, "
1321 "%u, %u, %u, %u, %u, %u, %u, %u)…",
1322 reliable, stream_api, n_bytes, n_messages,
1323 transmit_buffer_size_strategy,
1324 transmit_buffer_count_strategy, transmit_message_count_strategy,
1325 receive_buffer_size_strategy, receive_buffer_count_strategy,
1326 receive_message_count_strategy, buffer_data_strategy,
1327 transmit_seed, receive_seed);
1328 test (reliable, stream_api, n_bytes, n_messages,
1329 transmit_buffer_size_strategy,
1330 transmit_buffer_count_strategy, transmit_message_count_strategy,
1331 receive_buffer_size_strategy, receive_buffer_count_strategy,
1332 receive_message_count_strategy, buffer_data_strategy,
1333 transmit_seed, receive_seed,
1339 g_option_context_free (context);