2 * This file is part of the Nice GLib ICE library.
4 * (C) 2014 Collabora Ltd.
5 * Contact: Philip Withnall
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
17 * The Original Code is the Nice GLib ICE library.
19 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20 * Corporation. All Rights Reserved.
23 * Philip Withnall, Collabora Ltd.
25 * Alternatively, the contents of this file may be used under the terms of the
26 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
27 * case the provisions of LGPL are applicable instead of those above. If you
28 * wish to allow use of your version of this file only under the terms of the
29 * LGPL and not to allow others to use your version of this file under the
30 * MPL, indicate your decision by deleting the provisions above and replace
31 * them with the notice and other provisions required by the LGPL. If you do
32 * not delete the provisions above, a recipient may use your version of this
33 * file under either the MPL or the LGPL.
40 #include "test-io-stream-common.h"
52 /* Polls until @var becomes NULL/FALSE, running a non-blocking iteration of
 * @context along the way. The visible loop makes at most 13 attempts and sleeps
 * 1000 * 2^i microseconds on attempt i, i.e. at most roughly 8.2 seconds of
 * sleeping in total. */
53 #define WAIT_UNTIL_UNSET(var, context) \
58 for (i = 0; i < 13 && (var); i++) \
60 g_usleep (1000 * (1 << i)); \
61 g_main_context_iteration (context, FALSE); \
/* Timeout callback; run_io_stream_test() installs it with
 * g_timeout_add_seconds() as a deadlock watchdog. Logs @pointer and a "stuck"
 * message at debug level. Whatever follows the second log call (presumably the
 * abort itself) is not visible in this excerpt. */
67 static gboolean timer_cb (gpointer pointer)
69 g_debug ("test-thread:%s: %p", G_STRFUNC, pointer);
71 /* note: should not be reached, abort */
72 g_debug ("ERROR: test has got stuck, aborting...");
/* Start barrier for the test threads: the caller decrements the counter that
 * @data->start_count points to while holding @data->start_mutex, wakes the
 * other waiters, and then blocks until the counter is no longer above zero. */
78 wait_for_start (TestIOStreamThreadData *data)
80 g_mutex_lock (data->start_mutex);
/* Record this thread's arrival and wake waiters so they re-check the count. */
81 (*data->start_count)--;
82 g_cond_broadcast (data->start_cond);
/* Re-test the predicate after every wake-up; the mutex is released while
 * waiting and re-acquired before g_cond_wait() returns. */
83 while (*data->start_count > 0)
84 g_cond_wait (data->start_cond, data->start_mutex);
85 g_mutex_unlock (data->start_mutex);
/* Thread function for a writer thread. @user_data is the
 * TestIOStreamThreadData of one side of the test. The thread gets a private
 * GMainContext, passes the start barrier, waits until the stream is flagged
 * both open and ready, and then hands the output stream to the test's
 * write_thread callback. The return statement is not visible in this excerpt. */
89 write_thread_cb (gpointer user_data)
91 TestIOStreamThreadData *data = user_data;
92 GMainContext *main_context;
93 GOutputStream *output_stream = NULL;
/* Give this thread its own thread-default main context. */
95 main_context = g_main_context_new ();
96 g_main_context_push_thread_default (main_context);
98 /* Synchronise thread starting. */
99 wait_for_start (data);
101 /* Wait for the stream to be writeable. */
/* stream_open and stream_ready are set under write_mutex by the agent signal
 * handlers in this file, which also broadcast write_cond. */
102 g_mutex_lock (&data->write_mutex);
103 while (!(data->stream_open && data->stream_ready))
104 g_cond_wait (&data->write_cond, &data->write_mutex);
105 g_mutex_unlock (&data->write_mutex);
/* Run the test-specific writer on the output half of the I/O stream. */
108 output_stream = g_io_stream_get_output_stream (data->io_stream);
109 data->callbacks->write_thread (output_stream, data);
/* Undo the context set-up done at the top of the thread. */
111 g_main_context_pop_thread_default (main_context);
112 g_main_context_unref (main_context);
/* Thread function for a reader thread. @user_data is the
 * TestIOStreamThreadData of one side of the test. The thread gets a private
 * GMainContext, passes the start barrier, and hands the input stream to the
 * test's read_thread callback. Unlike the writer thread, the visible lines do
 * not wait for the stream to become open/ready first. The return statement is
 * not visible in this excerpt. */
118 read_thread_cb (gpointer user_data)
120 TestIOStreamThreadData *data = user_data;
121 GMainContext *main_context;
122 GInputStream *input_stream = NULL;
/* Give this thread its own thread-default main context. */
124 main_context = g_main_context_new ();
125 g_main_context_push_thread_default (main_context);
127 /* Synchronise thread starting. */
128 wait_for_start (data);
/* Run the test-specific reader on the input half of the I/O stream. */
131 input_stream = g_io_stream_get_input_stream (data->io_stream);
132 data->callbacks->read_thread (input_stream, data);
/* Undo the context set-up done at the top of the thread. */
134 g_main_context_pop_thread_default (main_context);
135 g_main_context_unref (main_context);
/* Thread function that runs one side's main loop. @user_data is that side's
 * TestIOStreamThreadData. It makes @data->main_context the thread-default
 * context, passes the start barrier, and then blocks in g_main_loop_run()
 * until the loop is quit. The return statement is not visible in this excerpt. */
141 main_thread_cb (gpointer user_data)
143 TestIOStreamThreadData *data = user_data;
145 g_main_context_push_thread_default (data->main_context);
147 /* Synchronise thread starting. */
148 wait_for_start (data);
150 /* Run the main context. */
151 g_main_loop_run (data->main_loop);
/* Reached only after the loop has been quit. */
153 g_main_context_pop_thread_default (data->main_context);
/* Handler connected to the agent's "candidate-gathering-done" signal (see
 * create_agent()). It copies @agent's local credentials and its local
 * candidates for component 1 over to the peer agent stored under the
 * "other-agent" object-data key. Some declarations and the remaining
 * parameters are on lines not visible in this excerpt. */
159 candidate_gathering_done_cb (NiceAgent *agent, guint stream_id,
162 NiceAgent *other = g_object_get_data (G_OBJECT (agent), "other-agent");
163 gchar *ufrag = NULL, *password = NULL;
/* Each agent keeps its own stream ID under the "stream-id" key; these stored
 * IDs are used below rather than the stream_id signal argument. */
168 tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
169 id = GPOINTER_TO_UINT (tmp);
170 tmp = g_object_get_data (G_OBJECT (other), "stream-id");
171 other_id = GPOINTER_TO_UINT (tmp);
/* Local credentials of this agent become the remote credentials of the peer.
 * NOTE(review): no release of ufrag/password is visible here — confirm that
 * the omitted lines free them. */
173 nice_agent_get_local_credentials (agent, id, &ufrag, &password);
174 nice_agent_set_remote_credentials (other,
175 other_id, ufrag, password);
/* Likewise for the candidates of component 1. */
179 cands = nice_agent_get_local_candidates (agent, id, 1);
180 g_assert (cands != NULL);
182 nice_agent_set_remote_candidates (other, other_id, 1, cands);
/* Free every candidate, then the list that held them. */
184 for (i = cands; i; i = i->next)
185 nice_candidate_free ((NiceCandidate *) i->data);
186 g_slist_free (cands);
/* Handler for the agent's "reliable-transport-writable" signal; create_agent()
 * only connects it when @data->reliable is set. It marks the stream as open,
 * wakes any thread waiting on write_cond, and forwards the event to the
 * test's optional reliable_transport_writable callback. */
190 reliable_transport_writable_cb (NiceAgent *agent, guint stream_id,
191 guint component_id, gpointer user_data)
193 TestIOStreamThreadData *data = user_data;
195 g_assert (data->reliable);
197 /* Signal writeability. */
198 g_mutex_lock (&data->write_mutex);
199 data->stream_open = TRUE;
200 g_cond_broadcast (&data->write_cond);
201 g_mutex_unlock (&data->write_mutex);
/* The callback is optional; when present it receives the output stream of the
 * GIOStream stored on the agent under the "io-stream" key. */
203 if (data->callbacks->reliable_transport_writable != NULL) {
204 GIOStream *io_stream;
205 GOutputStream *output_stream;
207 io_stream = g_object_get_data (G_OBJECT (agent), "io-stream");
208 g_assert (io_stream != NULL);
209 output_stream = g_io_stream_get_output_stream (io_stream);
211 data->callbacks->reliable_transport_writable (output_stream, agent,
212 stream_id, component_id, data);
/* Handler for the agent's "component-state-changed" signal. It sets
 * @data->stream_ready and wakes threads waiting on write_cond. The statement
 * controlled by the state check below is not visible in this excerpt;
 * presumably it returns early for states other than READY. */
217 component_state_changed_cb (NiceAgent *agent, guint stream_id,
218 guint component_id, guint state, gpointer user_data)
220 TestIOStreamThreadData *data = user_data;
222 if (state != NICE_COMPONENT_STATE_READY)
225 /* Signal stream state. */
226 g_mutex_lock (&data->write_mutex);
227 data->stream_ready = TRUE;
228 g_cond_broadcast (&data->write_cond);
229 g_mutex_unlock (&data->write_mutex);
/* Handler for the agent's "new-selected-pair" signal. It forwards all signal
 * arguments, plus the per-side test data, to the test's optional
 * new_selected_pair callback and does nothing when that callback is NULL. */
233 new_selected_pair_cb (NiceAgent *agent, guint stream_id, guint component_id,
234 gchar *lfoundation, gchar *rfoundation, gpointer user_data)
236 TestIOStreamThreadData *data = user_data;
238 if (data->callbacks->new_selected_pair != NULL) {
239 data->callbacks->new_selected_pair (agent, stream_id, component_id,
240 lfoundation, rfoundation, data);
/* Creates one NiceAgent for the test together with its own main context and
 * main loop, which are returned through @main_context and @main_loop.
 * The agent is bound to 127.0.0.1, has its signals connected to the handlers
 * in this file, and optionally gets a STUN server from the environment.
 * The return type, the declaration of the agent variable, and the return
 * statement are on lines not visible in this excerpt. */
245 create_agent (gboolean controlling_mode, TestIOStreamThreadData *data,
246 GMainContext **main_context, GMainLoop **main_loop)
249 NiceAddress base_addr;
250 const gchar *stun_server, *stun_server_port;
252 /* Create main contexts. */
253 *main_context = g_main_context_new ();
254 *main_loop = g_main_loop_new (*main_context, FALSE);
256 /* Use Google compatibility to ignore credentials. */
/* Two alternative constructors follow; the condition that selects between
 * them is on lines not visible here (presumably data->reliable). */
258 agent = nice_agent_new_reliable (*main_context, NICE_COMPATIBILITY_GOOGLE);
260 agent = nice_agent_new (*main_context, NICE_COMPATIBILITY_GOOGLE);
262 g_object_set (G_OBJECT (agent),
263 "controlling-mode", controlling_mode,
267 /* Specify which local interface to use. */
/* NOTE(review): the call that fills base_addr sits inside g_assert(). GLib
 * compiles g_assert() out when G_DISABLE_ASSERT is defined, which would leave
 * base_addr unset — confirm the tests are never built that way. */
268 g_assert (nice_address_set_from_string (&base_addr, "127.0.0.1"));
269 nice_agent_add_local_address (agent, &base_addr);
271 /* Hook up signals. */
/* The gathering-done handler receives controlling_mode packed into a pointer
 * as its user data; the other handlers receive @data. */
272 g_signal_connect (G_OBJECT (agent), "candidate-gathering-done",
273 (GCallback) candidate_gathering_done_cb,
274 GUINT_TO_POINTER (controlling_mode));
275 g_signal_connect (G_OBJECT (agent), "new-selected-pair",
276 (GCallback) new_selected_pair_cb, data);
277 g_signal_connect (G_OBJECT (agent), "component-state-changed",
278 (GCallback) component_state_changed_cb, data);
/* In reliable mode stream_open is set later by the writable signal handler.
 * The stream_open assignment below is presumably the non-reliable branch; the
 * line separating the two is not visible here. */
280 if (data->reliable) {
281 g_signal_connect (G_OBJECT (agent), "reliable-transport-writable",
282 (GCallback) reliable_transport_writable_cb, data);
284 data->stream_open = TRUE;
287 /* Configure the STUN server. */
288 stun_server = g_getenv ("NICE_STUN_SERVER");
289 stun_server_port = g_getenv ("NICE_STUN_SERVER_PORT");
/* NOTE(review): only stun_server is checked. g_getenv() returns NULL for an
 * unset variable, so with NICE_STUN_SERVER set and NICE_STUN_SERVER_PORT unset
 * the atoi() call below would receive NULL — confirm or add a guard. */
291 if (stun_server != NULL) {
292 g_object_set (G_OBJECT (agent),
293 "stun-server", stun_server,
294 "stun-server-port", atoi (stun_server_port),
/* Adds a stream with two components to @agent, asserts that a non-zero
 * stream ID was returned, and stores that ID on the agent under the
 * "stream-id" object-data key, where the other helpers in this file look it up. */
302 add_stream (NiceAgent *agent)
306 stream_id = nice_agent_add_stream (agent, 2);
307 g_assert_cmpuint (stream_id, >, 0);
309 g_object_set_data (G_OBJECT (agent), "stream-id",
310 GUINT_TO_POINTER (stream_id));
/* Starts candidate gathering on the stream previously registered by
 * add_stream() and, for reliable tests, obtains the GIOStream for component 1
 * of that stream and publishes it on the agent under the "io-stream" key. */
314 run_agent (TestIOStreamThreadData *data, NiceAgent *agent)
319 tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
320 stream_id = GPOINTER_TO_UINT (tmp);
322 nice_agent_gather_candidates (agent, stream_id);
/* The left-hand side of the assignment that receives the stream is on a line
 * not visible here; the following line reads data->io_stream, so presumably
 * that is the target. */
324 if (data->reliable) {
326 G_IO_STREAM (nice_agent_get_io_stream (agent, stream_id, 1));
327 g_object_set_data (G_OBJECT (agent), "io-stream", data->io_stream);
/* Presumably the non-reliable branch: no I/O stream is used. */
329 data->io_stream = NULL;
/* Thin wrapper that starts a named GThread running @thread_func with
 * user_data. The rest of the parameter list and anything done with the new
 * thread after creation are on lines not visible in this excerpt. */
334 spawn_thread (const gchar *thread_name, GThreadFunc thread_func,
339 thread = g_thread_new (thread_name, thread_func, user_data);
/* Entry point of the shared I/O-stream test harness. It builds two agents
 * ("L" and "R"), wires each one up as the other's peer, runs a main-loop
 * thread and a read thread for each side plus optional write threads, blocks
 * on an "error" main loop until the test quits it, and then tears everything
 * down.
 *
 * @deadlock_timeout: seconds before the timer_cb watchdog fires.
 * @reliable: copied into both sides' data; selects reliable-mode behaviour.
 * @callbacks: test-specific hooks; write threads are only spawned when
 *   callbacks->write_thread is non-NULL.
 * @l_user_data / @r_user_data: opaque per-side data for the callbacks.
 * @l_user_data_free / @r_user_data_free: optional destructors, called on the
 *   matching user data during teardown.
 *
 * Some declarations (e.g. of mutex, cond, stream_id) are on lines not visible
 * in this excerpt. */
346 run_io_stream_test (guint deadlock_timeout, gboolean reliable,
347 const TestIOStreamCallbacks *callbacks,
348 gpointer l_user_data, GDestroyNotify l_user_data_free,
349 gpointer r_user_data, GDestroyNotify r_user_data_free)
351 GMainLoop *error_loop;
352 GThread *l_main_thread, *r_main_thread;
353 GThread *l_write_thread, *l_read_thread, *r_write_thread, *r_read_thread;
354 TestIOStreamThreadData l_data = { NULL }, r_data = { NULL };
/* Number of threads expected at the start barrier: a main, a read and a
 * write thread per side. */
357 guint start_count = 6;
360 g_mutex_init (&mutex);
/* Loop on the default main context; this thread blocks in it below. */
363 error_loop = g_main_loop_new (NULL, FALSE);
365 /* Set up data structures. */
366 l_data.reliable = reliable;
367 l_data.error_loop = error_loop;
368 l_data.callbacks = callbacks;
369 l_data.user_data = l_user_data;
370 l_data.user_data_free = l_user_data_free;
372 g_cond_init (&l_data.write_cond);
373 g_mutex_init (&l_data.write_mutex);
374 l_data.stream_open = FALSE;
375 l_data.stream_ready = FALSE;
/* Both sides point at the same barrier mutex, condition and counter. */
376 l_data.start_mutex = &mutex;
377 l_data.start_cond = &cond;
378 l_data.start_count = &start_count;
380 r_data.reliable = reliable;
381 r_data.error_loop = error_loop;
382 r_data.callbacks = callbacks;
383 r_data.user_data = r_user_data;
384 r_data.user_data_free = r_user_data_free;
386 g_cond_init (&r_data.write_cond);
387 g_mutex_init (&r_data.write_mutex);
388 r_data.stream_open = FALSE;
389 r_data.stream_ready = FALSE;
390 r_data.start_mutex = &mutex;
391 r_data.start_cond = &cond;
392 r_data.start_count = &start_count;
/* Cross-link the two sides so each can inspect its peer's state. */
394 l_data.other = &r_data;
395 r_data.other = &l_data;
397 /* Create the L and R agents. */
/* L is created in controlling mode, R is not. */
398 l_data.agent = create_agent (TRUE, &l_data,
399 &l_data.main_context, &l_data.main_loop);
400 r_data.agent = create_agent (FALSE, &r_data,
401 &r_data.main_context, &r_data.main_loop);
/* candidate_gathering_done_cb() looks the peer up via this key. */
403 g_object_set_data (G_OBJECT (l_data.agent), "other-agent", r_data.agent);
404 g_object_set_data (G_OBJECT (r_data.agent), "other-agent", l_data.agent);
406 /* Add a timer to catch deadlocks. */
/* NOTE(review): the source ID returned here is discarded and no removal of
 * the timeout is visible in this function — confirm it cannot fire after a
 * test that finished normally. */
407 g_timeout_add_seconds (deadlock_timeout, timer_cb, NULL);
409 l_main_thread = spawn_thread ("libnice L main", main_thread_cb, &l_data);
410 r_main_thread = spawn_thread ("libnice R main", main_thread_cb, &r_data);
/* Streams are added and gathering is started before the I/O threads exist;
 * run_agent() is what fills in each side's io_stream. */
412 add_stream (l_data.agent);
413 add_stream (r_data.agent);
414 run_agent (&l_data, l_data.agent);
415 run_agent (&r_data, r_data.agent);
417 l_read_thread = spawn_thread ("libnice L read", read_thread_cb, &l_data);
418 r_read_thread = spawn_thread ("libnice R read", read_thread_cb, &r_data);
/* Write threads are optional. The lines between the two branches, and any
 * adjustment of start_count under the lock below, are not visible in this
 * excerpt; presumably the count is reduced to account for the two missing
 * write threads before the broadcast. */
420 if (callbacks->write_thread != NULL) {
421 l_write_thread = spawn_thread ("libnice L write", write_thread_cb, &l_data);
422 r_write_thread = spawn_thread ("libnice R write", write_thread_cb, &r_data);
424 g_mutex_lock (&mutex);
426 g_cond_broadcast (&cond);
427 g_mutex_unlock (&mutex);
429 l_write_thread = NULL;
430 r_write_thread = NULL;
433 /* Run loop for error timer */
/* Blocks until check_for_termination() (or other code) quits error_loop. */
434 g_main_loop_run (error_loop);
436 /* Clean up the main loops and threads. */
437 stop_main_loop (l_data.main_loop);
438 stop_main_loop (r_data.main_loop);
440 g_thread_join (l_read_thread);
441 g_thread_join (r_read_thread);
442 if (l_write_thread != NULL)
443 g_thread_join (l_write_thread);
444 if (r_write_thread != NULL)
445 g_thread_join (r_write_thread);
446 g_thread_join (l_main_thread);
447 g_thread_join (r_main_thread);
/* All worker threads have exited; release per-side resources. */
450 if (r_data.user_data_free != NULL)
451 r_data.user_data_free (r_data.user_data);
453 if (l_data.user_data_free != NULL)
454 l_data.user_data_free (l_data.user_data);
456 g_cond_clear (&r_data.write_cond);
457 g_mutex_clear (&r_data.write_mutex);
458 g_cond_clear (&l_data.write_cond);
459 g_mutex_clear (&l_data.write_mutex);
/* io_stream may already be NULL: it is never set for non-reliable tests and
 * check_for_termination() clears it. */
461 if (r_data.io_stream != NULL)
462 g_object_unref (r_data.io_stream);
463 if (l_data.io_stream != NULL)
464 g_object_unref (l_data.io_stream);
/* Remove each side's stream using the ID stored under "stream-id". The
 * assignment targets, and any guard around the removals, are on lines not
 * visible here. */
467 GPOINTER_TO_UINT (g_object_get_data (G_OBJECT (r_data.agent), "stream-id"));
469 nice_agent_remove_stream (r_data.agent, stream_id);
471 GPOINTER_TO_UINT (g_object_get_data (G_OBJECT (l_data.agent), "stream-id"));
473 nice_agent_remove_stream (l_data.agent, stream_id);
/* Register the agent fields as weak pointers so that they are reset to NULL
 * once the agents are finalised, then drop our references and poll (while
 * iterating each agent's context) until that has happened. */
475 g_object_add_weak_pointer (G_OBJECT (r_data.agent),
476 (gpointer *) &r_data.agent);
477 g_object_add_weak_pointer (G_OBJECT (l_data.agent),
478 (gpointer *) &l_data.agent);
480 g_object_unref (r_data.agent);
481 g_object_unref (l_data.agent);
483 WAIT_UNTIL_UNSET (r_data.agent, r_data.main_context);
484 WAIT_UNTIL_UNSET (l_data.agent, l_data.main_context);
486 g_main_loop_unref (r_data.main_loop);
487 g_main_loop_unref (l_data.main_loop);
489 g_main_context_unref (r_data.main_context);
490 g_main_context_unref (l_data.main_context);
492 g_main_loop_unref (error_loop);
494 g_mutex_clear (&mutex);
495 g_cond_clear (&cond);
498 /* Once we’ve received all the expected bytes, wait to finish sending all bytes,
499 * then send and wait for the close message. Finally, remove the stream.
501 * This must only be called from the read thread implementation. */
/* @data: test data of the calling side.
 * @recv_count / @other_recv_count: receive counters of this side and of the
 *   peer; they are only read in the visible lines.
 * @send_count: this side's send counter, polled until it reaches
 *   @expected_recv_count.
 * @expected_recv_count: number of units each side is expected to transfer.
 * Some statements (including any early return and updates to data->done) are
 * on lines not visible in this excerpt. */
503 check_for_termination (TestIOStreamThreadData *data, gsize *recv_count,
504 gsize *other_recv_count, volatile gsize *send_count, gsize expected_recv_count)
508 GError *error = NULL;
510 /* Wait for transmission to complete. */
/* NOTE(review): this is a polling loop on *send_count. If another thread
 * writes that counter, volatile alone does not provide synchronisation —
 * confirm how the writer updates it. */
511 while (*send_count < expected_recv_count) {
512 if (data->callbacks->wait_transmission_cb) {
513 data->callbacks->wait_transmission_cb (data->agent);
517 /* Send a close message. */
518 tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
519 stream_id = GPOINTER_TO_UINT (tmp);
521 /* Can't be certain enough to test for termination on non-reliable streams.
522 * There may be packet losses, etc
/* io_stream is only non-NULL for reliable tests (see run_agent()). */
524 if (data->io_stream) {
527 g_output_stream_close (g_io_stream_get_output_stream (data->io_stream),
530 g_assert_no_error (error);
/* After closing our output, the input side is expected to yield no further
 * bytes: the skip must succeed and return 0. */
532 len = g_input_stream_skip (g_io_stream_get_input_stream (data->io_stream),
533 1024 * 1024, NULL, &error);
534 g_assert_no_error (error);
535 g_assert_cmpint (len, ==, 0);
538 /* Remove the stream and run away. */
/* The stored stream ID is reset to 0 and the I/O stream reference dropped, so
 * later code that reads them sees the stream as already gone. */
539 nice_agent_remove_stream (data->agent, stream_id);
540 g_object_set_data (G_OBJECT (data->agent), "stream-id", GUINT_TO_POINTER (0));
541 g_clear_object (&data->io_stream);
/* Quit the harness loop once the peer has flagged itself as done. */
544 if (data->other->done)
545 g_main_loop_quit (data->error_loop);
547 /* If both sides have finished, quit the test main loop. */
/* Both counters must strictly exceed the expected count for this to fire. */
548 if (*recv_count > expected_recv_count &&
549 *other_recv_count > expected_recv_count) {
550 g_main_loop_quit (data->error_loop);
/* Asks @loop to quit from inside its own main context: an idle source is
 * attached to the loop's context with g_main_loop_quit() as its callback.
 * The source holds its own reference on @loop, released by the destroy
 * notify, so the loop stays alive until the source is destroyed. */
555 stop_main_loop (GMainLoop *loop)
557 GSource *src = g_idle_source_new ();
/* NOTE(review): g_main_loop_quit() returns void but is invoked through a
 * GSourceFunc, whose gboolean result decides whether the source is kept —
 * confirm this is acceptable on the supported platforms. */
558 g_source_set_callback (src, G_SOURCE_FUNC (g_main_loop_quit),
559 g_main_loop_ref (loop), (GDestroyNotify) g_main_loop_unref);
560 g_source_attach (src, g_main_loop_get_context (loop));
/* The context now owns a reference to the source; drop the local one. */
561 g_source_unref (src);