tests: include config.h and don't include unix headers
[platform/upstream/gstreamer.git] / tests / check / gst / gstbus.c
1 /* GStreamer message bus unit tests
2  * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
3  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include <gst/check/gstcheck.h>
25
26 static GstBus *test_bus = NULL;
27 static GMainLoop *main_loop;
28
29 static GType foo_device_get_type (void);
30
31 #define NUM_MESSAGES 1000
32 #define NUM_THREADS 10
33
34 static gpointer
35 pound_bus_with_messages (gpointer data)
36 {
37   gint thread_id = GPOINTER_TO_INT (data);
38   gint i;
39
40   for (i = 0; i < NUM_MESSAGES; i++) {
41     GstMessage *m;
42     GstStructure *s;
43
44     s = gst_structure_new ("test_message",
45         "thread_id", G_TYPE_INT, thread_id, "msg_id", G_TYPE_INT, i, NULL);
46     m = gst_message_new_application (NULL, s);
47     gst_bus_post (test_bus, m);
48   }
49   return NULL;
50 }
51
52 static void
53 pull_messages (void)
54 {
55   GstMessage *m;
56   const GstStructure *s;
57   guint message_ids[NUM_THREADS];
58   gint i;
59
60   for (i = 0; i < NUM_THREADS; i++)
61     message_ids[i] = 0;
62
63   while (1) {
64     gint _t, _i;
65
66     m = gst_bus_pop (test_bus);
67     if (!m)
68       break;
69     g_return_if_fail (GST_MESSAGE_TYPE (m) == GST_MESSAGE_APPLICATION);
70
71     s = gst_message_get_structure (m);
72     if (!gst_structure_get_int (s, "thread_id", &_t))
73       g_critical ("Invalid message");
74     if (!gst_structure_get_int (s, "msg_id", &_i))
75       g_critical ("Invalid message");
76
77     g_return_if_fail (_t < NUM_THREADS);
78     g_return_if_fail (_i == message_ids[_t]++);
79
80     gst_message_unref (m);
81   }
82
83   for (i = 0; i < NUM_THREADS; i++)
84     g_return_if_fail (message_ids[i] == NUM_MESSAGES);
85 }
86
87 GST_START_TEST (test_hammer_bus)
88 {
89   GThread *threads[NUM_THREADS];
90   gint i;
91
92   test_bus = gst_bus_new ();
93
94   for (i = 0; i < NUM_THREADS; i++)
95     threads[i] = g_thread_try_new ("gst-check", pound_bus_with_messages,
96         GINT_TO_POINTER (i), NULL);
97
98   for (i = 0; i < NUM_THREADS; i++)
99     g_thread_join (threads[i]);
100
101   pull_messages ();
102
103   gst_object_unref ((GstObject *) test_bus);
104 }
105
106 GST_END_TEST;
107
108 static gboolean
109 message_func_eos (GstBus * bus, GstMessage * message, guint * p_counter)
110 {
111   const GstStructure *s;
112   gint i;
113
114   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
115
116   GST_DEBUG ("got EOS message");
117
118   s = gst_message_get_structure (message);
119   if (!gst_structure_get_int (s, "msg_id", &i))
120     g_critical ("Invalid message");
121
122   if (p_counter != NULL)
123     *p_counter += 1;
124
125   return i != 9;
126 }
127
128 static gboolean
129 message_func_app (GstBus * bus, GstMessage * message, guint * p_counter)
130 {
131   const GstStructure *s;
132   gint i;
133
134   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
135       FALSE);
136
137   GST_DEBUG ("got APP message");
138
139   s = gst_message_get_structure (message);
140   if (!gst_structure_get_int (s, "msg_id", &i))
141     g_critical ("Invalid message");
142
143   if (p_counter != NULL)
144     *p_counter += 1;
145
146   return i != 9;
147 }
148
149 static gboolean
150 send_messages (gpointer data)
151 {
152   GstMessage *m;
153   GstStructure *s;
154   gint i;
155
156   for (i = 0; i < 10; i++) {
157     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
158     m = gst_message_new_application (NULL, s);
159     gst_bus_post (test_bus, m);
160     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
161     m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
162     gst_bus_post (test_bus, m);
163   }
164
165   return FALSE;
166 }
167
168 /* test if adding a signal watch for different message types calls the
169  * respective callbacks. */
170 GST_START_TEST (test_watch)
171 {
172   guint num_eos = 0;
173   guint num_app = 0;
174   guint id;
175
176   test_bus = gst_bus_new ();
177
178   main_loop = g_main_loop_new (NULL, FALSE);
179
180   id = gst_bus_add_watch (test_bus, gst_bus_async_signal_func, NULL);
181   fail_if (id == 0);
182   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
183       &num_eos);
184   g_signal_connect (test_bus, "message::application",
185       (GCallback) message_func_app, &num_app);
186
187   g_idle_add ((GSourceFunc) send_messages, NULL);
188   while (g_main_context_pending (NULL))
189     g_main_context_iteration (NULL, FALSE);
190
191   fail_unless_equals_int (num_eos, 10);
192   fail_unless_equals_int (num_app, 10);
193
194   fail_unless (gst_bus_remove_watch (test_bus));
195   g_main_loop_unref (main_loop);
196
197   gst_object_unref ((GstObject *) test_bus);
198 }
199
200 GST_END_TEST;
201
202 /* test if adding a signal watch for different message types calls the
203  * respective callbacks. */
204 GST_START_TEST (test_watch_with_custom_context)
205 {
206   GMainContext *ctx;
207   GSource *source;
208   guint num_eos = 0;
209   guint num_app = 0;
210   guint id;
211
212   test_bus = gst_bus_new ();
213
214   ctx = g_main_context_new ();
215   main_loop = g_main_loop_new (ctx, FALSE);
216
217   source = gst_bus_create_watch (test_bus);
218   g_source_set_callback (source, (GSourceFunc) gst_bus_async_signal_func, NULL,
219       NULL);
220   id = g_source_attach (source, ctx);
221   g_source_unref (source);
222   fail_if (id == 0);
223
224   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
225       &num_eos);
226   g_signal_connect (test_bus, "message::application",
227       (GCallback) message_func_app, &num_app);
228
229   source = g_idle_source_new ();
230   g_source_set_callback (source, (GSourceFunc) send_messages, NULL, NULL);
231   g_source_attach (source, ctx);
232   g_source_unref (source);
233
234   while (g_main_context_pending (ctx))
235     g_main_context_iteration (ctx, FALSE);
236
237   fail_unless_equals_int (num_eos, 10);
238   fail_unless_equals_int (num_app, 10);
239
240   if ((source = g_main_context_find_source_by_id (ctx, id)))
241     g_source_destroy (source);
242   g_main_loop_unref (main_loop);
243   g_main_context_unref (ctx);
244
245   gst_object_unref (test_bus);
246 }
247
248 GST_END_TEST;
249
250 /* test if adding a signal watch for different message types calls the
251  * respective callbacks. */
252 GST_START_TEST (test_add_watch_with_custom_context)
253 {
254   GMainContext *ctx;
255   GSource *source;
256   guint num_eos = 0;
257   guint num_app = 0;
258
259   test_bus = gst_bus_new ();
260
261   ctx = g_main_context_new ();
262   main_loop = g_main_loop_new (ctx, FALSE);
263
264   g_main_context_push_thread_default (ctx);
265   gst_bus_add_signal_watch (test_bus);
266   g_main_context_pop_thread_default (ctx);
267
268   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
269       &num_eos);
270   g_signal_connect (test_bus, "message::application",
271       (GCallback) message_func_app, &num_app);
272
273   source = g_idle_source_new ();
274   g_source_set_callback (source, (GSourceFunc) send_messages, NULL, NULL);
275   g_source_attach (source, ctx);
276   g_source_unref (source);
277
278   while (g_main_context_pending (ctx))
279     g_main_context_iteration (ctx, FALSE);
280
281   fail_unless_equals_int (num_eos, 10);
282   fail_unless_equals_int (num_app, 10);
283
284   g_main_loop_unref (main_loop);
285   g_main_context_unref (ctx);
286
287   gst_object_unref (test_bus);
288 }
289
290 GST_END_TEST;
291
292 static gboolean
293 dummy_bus_func (GstBus * bus, GstMessage * msg, gpointer user_data)
294 {
295   return TRUE;
296 }
297
298 GST_START_TEST (test_remove_watch)
299 {
300   test_bus = gst_bus_new ();
301
302   /* removing a non-existing watch should fail */
303   fail_if (gst_bus_remove_watch (test_bus));
304
305   gst_bus_add_watch (test_bus, dummy_bus_func, NULL);
306
307   fail_unless (gst_bus_remove_watch (test_bus));
308
309   /* now it should fail to remove the watch again */
310   fail_if (gst_bus_remove_watch (test_bus));
311
312   gst_object_unref (test_bus);
313 }
314
315 GST_END_TEST;
316
317 static gint messages_seen;
318
319 static void
320 message_func (GstBus * bus, GstMessage * message, gpointer data)
321 {
322   g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
323
324   messages_seen++;
325 }
326
327 static void
328 send_5app_1el_1err_2app_1eos_messages (guint interval_usecs)
329 {
330   GstMessage *m;
331   GstStructure *s;
332   gint i;
333
334   for (i = 0; i < 5; i++) {
335     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
336     m = gst_message_new_application (NULL, s);
337     GST_LOG ("posting application message");
338     gst_bus_post (test_bus, m);
339     g_usleep (interval_usecs);
340   }
341   for (i = 0; i < 1; i++) {
342     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
343     m = gst_message_new_element (NULL, s);
344     GST_LOG ("posting element message");
345     gst_bus_post (test_bus, m);
346     g_usleep (interval_usecs);
347   }
348   for (i = 0; i < 1; i++) {
349     m = gst_message_new_error (NULL, NULL, "debug string");
350     GST_LOG ("posting error message");
351     gst_bus_post (test_bus, m);
352     g_usleep (interval_usecs);
353   }
354   for (i = 0; i < 2; i++) {
355     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
356     m = gst_message_new_application (NULL, s);
357     GST_LOG ("posting application message");
358     gst_bus_post (test_bus, m);
359     g_usleep (interval_usecs);
360   }
361   for (i = 0; i < 1; i++) {
362     m = gst_message_new_eos (NULL);
363     GST_LOG ("posting EOS message");
364     gst_bus_post (test_bus, m);
365     g_usleep (interval_usecs);
366   }
367 }
368
369 static void
370 send_extended_messages (guint interval_usecs)
371 {
372   GstMessage *msg;
373   GstDevice *device;
374
375   device = g_object_new (foo_device_get_type (), NULL);
376
377   msg = gst_message_new_device_added (NULL, device);
378   GST_LOG ("posting device-added message");
379   gst_bus_post (test_bus, msg);
380   g_usleep (interval_usecs);
381
382   msg = gst_message_new_device_removed (NULL, device);
383   GST_LOG ("posting device-removed message");
384   gst_bus_post (test_bus, msg);
385   g_usleep (interval_usecs);
386
387   gst_object_unref (device);
388 }
389
390
391 static void
392 send_10_app_messages (void)
393 {
394   GstMessage *m;
395   GstStructure *s;
396   gint i;
397
398   for (i = 0; i < 10; i++) {
399     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
400     m = gst_message_new_application (NULL, s);
401     gst_bus_post (test_bus, m);
402   }
403 }
404
405 /* test that you get the same messages from a poll as from signal watches. */
406 GST_START_TEST (test_watch_with_poll)
407 {
408   guint i;
409
410   test_bus = gst_bus_new ();
411   messages_seen = 0;
412
413   gst_bus_add_signal_watch (test_bus);
414   g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
415
416   send_10_app_messages ();
417
418   for (i = 0; i < 10; i++)
419     gst_message_unref (gst_bus_poll (test_bus, GST_MESSAGE_APPLICATION,
420             GST_CLOCK_TIME_NONE));
421
422   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
423   fail_unless (messages_seen == 10, "signal handler didn't get 10 messages");
424
425   gst_bus_remove_signal_watch (test_bus);
426
427   gst_object_unref (test_bus);
428 }
429
430 GST_END_TEST;
431
432 /* test that you get the messages with pop. */
433 GST_START_TEST (test_timed_pop)
434 {
435   guint i;
436
437   test_bus = gst_bus_new ();
438
439   send_10_app_messages ();
440
441   for (i = 0; i < 10; i++)
442     gst_message_unref (gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE));
443
444   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
445
446   gst_object_unref (test_bus);
447 }
448
449 GST_END_TEST;
450
451 typedef struct
452 {
453   GstDevice device;
454 } FooDevice;
455 typedef struct
456 {
457   GstDeviceClass device_klass;
458 } FooDeviceClass;
459
460 G_DEFINE_TYPE (FooDevice, foo_device, GST_TYPE_DEVICE);
461
462 static void
463 foo_device_class_init (FooDeviceClass * klass)
464 {
465   /* nothing to do here */
466 }
467
468 static void
469 foo_device_init (FooDevice * device)
470 {
471   /* nothing to do here */
472 }
473
474 /* test that you get the messages with pop_filtered */
475 GST_START_TEST (test_timed_pop_filtered)
476 {
477   GstMessage *msg;
478   guint i;
479
480   test_bus = gst_bus_new ();
481
482   send_10_app_messages ();
483   for (i = 0; i < 10; i++) {
484     msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
485         GST_MESSAGE_ANY);
486     fail_unless (msg != NULL);
487     gst_message_unref (msg);
488   }
489
490   /* should flush all messages on the bus with types not matching */
491   send_10_app_messages ();
492   msg = gst_bus_timed_pop_filtered (test_bus, 0,
493       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
494   fail_unless (msg == NULL);
495   msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 2,
496       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
497   fail_unless (msg == NULL);
498   /* there should be nothing on the bus now */
499   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
500   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ANY);
501   fail_unless (msg == NULL);
502
503   send_5app_1el_1err_2app_1eos_messages (0);
504   msg = gst_bus_timed_pop_filtered (test_bus, 0,
505       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
506   fail_unless (msg != NULL);
507   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
508   gst_message_unref (msg);
509   fail_unless (gst_bus_have_pending (test_bus), "expected messages on bus");
510   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_APPLICATION);
511   fail_unless (msg != NULL);
512   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
513   gst_message_unref (msg);
514   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
515   fail_unless (msg == NULL);
516
517   gst_object_unref (test_bus);
518
519   /* Test extended messages */
520   GST_DEBUG
521       ("Checking extended messages received from gst_bus_timed_pop_filtered");
522   test_bus = gst_bus_new ();
523
524   send_5app_1el_1err_2app_1eos_messages (0);
525   send_extended_messages (0);
526   send_5app_1el_1err_2app_1eos_messages (0);
527   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_EXTENDED);
528   fail_unless (msg != NULL);
529   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_DEVICE_ADDED);
530   gst_message_unref (msg);
531
532   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_EXTENDED);
533   fail_unless (msg != NULL);
534   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_DEVICE_REMOVED);
535   gst_message_unref (msg);
536   gst_object_unref (test_bus);
537
538   /* Now check extended messages don't appear when we don't ask for them */
539   GST_DEBUG
540       ("Checking extended messages *not* received from gst_bus_timed_pop_filtered when not wanted");
541   test_bus = gst_bus_new ();
542
543   send_extended_messages (0);
544   send_5app_1el_1err_2app_1eos_messages (0);
545
546   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
547   fail_unless (msg != NULL);
548   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ERROR);
549   gst_message_unref (msg);
550
551   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_EOS);
552   fail_unless (msg != NULL);
553   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_EOS);
554   gst_message_unref (msg);
555
556   gst_object_unref (test_bus);
557 }
558
559 GST_END_TEST;
560
561 static gpointer
562 post_delayed_thread (gpointer data)
563 {
564   THREAD_START ();
565   send_5app_1el_1err_2app_1eos_messages (1 * G_USEC_PER_SEC);
566   return NULL;
567 }
568
569 /* test that you get the messages with pop_filtered if there's a timeout*/
570 GST_START_TEST (test_timed_pop_filtered_with_timeout)
571 {
572   GstMessage *msg;
573
574   MAIN_INIT ();
575
576   test_bus = gst_bus_new ();
577
578   MAIN_START_THREAD_FUNCTIONS (1, post_delayed_thread, NULL);
579
580   MAIN_SYNCHRONIZE ();
581
582   msg = gst_bus_timed_pop_filtered (test_bus, 2 * GST_SECOND,
583       GST_MESSAGE_ERROR);
584   fail_unless (msg == NULL, "Got unexpected %s message",
585       (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
586   msg = gst_bus_timed_pop_filtered (test_bus, (3 + 1 + 1 + 1) * GST_SECOND,
587       GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT);
588   fail_unless (msg != NULL, "expected element message, but got nothing");
589   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
590   gst_message_unref (msg);
591   msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
592       GST_MESSAGE_APPLICATION);
593   fail_unless (msg != NULL, "expected application message, but got nothing");
594   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
595   gst_message_unref (msg);
596   msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
597       GST_MESSAGE_APPLICATION);
598   fail_unless (msg != NULL, "expected application message, but got nothing");
599   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
600   gst_message_unref (msg);
601   msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 4,
602       GST_MESSAGE_TAG | GST_MESSAGE_ERROR);
603   fail_unless (msg == NULL, "Got unexpected %s message",
604       (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
605
606   MAIN_STOP_THREADS ();
607
608   gst_object_unref (test_bus);
609 }
610
611 GST_END_TEST;
612
613 /* test that you get the messages with pop from another thread. */
614 static gpointer
615 pop_thread (gpointer data)
616 {
617   GstBus *bus = GST_BUS_CAST (data);
618   guint i;
619
620   for (i = 0; i < 10; i++)
621     gst_message_unref (gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE));
622
623   return NULL;
624 }
625
626 GST_START_TEST (test_timed_pop_thread)
627 {
628   GThread *thread;
629   GError *error = NULL;
630
631   test_bus = gst_bus_new ();
632
633   thread = g_thread_try_new ("gst-chek", pop_thread, test_bus, &error);
634   fail_if (error != NULL);
635
636   send_10_app_messages ();
637
638   g_thread_join (thread);
639
640   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
641
642   /* try to pop a message without timeout. */
643   fail_if (gst_bus_timed_pop (test_bus, 0) != NULL);
644
645   /* with a small timeout */
646   fail_if (gst_bus_timed_pop (test_bus, 1000) != NULL);
647
648   gst_object_unref (test_bus);
649 }
650
651 GST_END_TEST;
652
653 static gboolean
654 cb_bus_call (GstBus * bus, GstMessage * msg, gpointer data)
655 {
656   GMainLoop *loop = data;
657
658   switch (GST_MESSAGE_TYPE (msg)) {
659     case GST_MESSAGE_EOS:
660     {
661       GST_INFO ("End-of-stream");
662       g_main_loop_quit (loop);
663       break;
664     }
665     case GST_MESSAGE_ERROR:
666     {
667       GError *err = NULL;
668
669       gst_message_parse_error (msg, &err, NULL);
670       g_error ("Error: %s", err->message);
671       g_error_free (err);
672
673       g_main_loop_quit (loop);
674       break;
675     }
676     default:
677     {
678       GST_LOG ("BUS MESSAGE: type=%s", GST_MESSAGE_TYPE_NAME (msg));
679       break;
680     }
681   }
682
683   return TRUE;
684 }
685
686 GST_START_TEST (test_custom_main_context)
687 {
688   GMainContext *ctx;
689   GMainLoop *loop;
690   GstElement *pipeline;
691   GstElement *src;
692   GstElement *sink;
693   GSource *source;
694   GstBus *bus;
695
696   ctx = g_main_context_new ();
697   loop = g_main_loop_new (ctx, FALSE);
698
699   pipeline = gst_pipeline_new (NULL);
700   src = gst_element_factory_make ("fakesrc", NULL);
701   g_object_set (src, "num-buffers", 2000, NULL);
702
703   sink = gst_element_factory_make ("fakesink", NULL);
704
705   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
706   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
707   fail_unless (gst_element_link (src, sink));
708
709   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
710   source = gst_bus_create_watch (bus);
711   g_source_attach (source, ctx);
712   g_source_set_callback (source, (GSourceFunc) cb_bus_call, loop, NULL);
713   g_source_unref (source);
714   gst_object_unref (bus);
715
716   GST_INFO ("starting pipeline");
717
718   gst_element_set_state (pipeline, GST_STATE_PLAYING);
719   gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
720
721   GST_INFO ("running event loop, ctx=%p", ctx);
722   g_main_loop_run (loop);
723
724   gst_element_set_state (pipeline, GST_STATE_NULL);
725
726   /* clean up */
727   if (ctx)
728     g_main_context_unref (ctx);
729   g_main_loop_unref (loop);
730   gst_object_unref (pipeline);
731 }
732
733 GST_END_TEST;
734
735 static GstBusSyncReply
736 test_async_sync_handler (GstBus * bus, GstMessage * msg, gpointer user_data)
737 {
738   GArray *timestamps = user_data;
739   gint64 ts = g_get_monotonic_time () * 1000;   /* microsecs -> nanosecs */
740
741   g_array_append_val (timestamps, ts);
742   GST_INFO ("[msg %p] %" GST_PTR_FORMAT, msg, msg);
743
744   return GST_BUS_ASYNC;
745 }
746
747 static gpointer
748 post_10_app_messages_thread (gpointer data)
749 {
750   THREAD_START ();
751   send_10_app_messages ();
752   return NULL;
753 }
754
755 /* Test GST_BUS_ASYNC actually causes the thread posting the message to
756  * block until the message has been freed. We spawn a thread to post ten
757  * messages. We install a bus sync handler to get the timestamp of each
758  * message as it is being posted, and to return GST_BUS_ASYNC. In the main
759  * thread we sleep a bit after we pop off a message and before we free it.
760  * The posting thread should be blocked while the main thread sleeps, so
761  * we expect the interval as the messages are posted to be roughly the same
762  * as the sleep time in the main thread. g_usleep() is not super-precise, so
763  * we allow for some slack there, we just want to check that the posting
764  * thread was blocked at all really. */
765 GST_START_TEST (test_async_message)
766 {
767   GArray *timestamps;
768   guint i;
769
770   MAIN_INIT ();
771
772   timestamps = g_array_sized_new (FALSE, FALSE, sizeof (gint64), 10);
773
774   test_bus = gst_bus_new ();
775
776   gst_bus_set_sync_handler (test_bus, test_async_sync_handler, timestamps,
777       NULL);
778
779   MAIN_START_THREAD_FUNCTIONS (1, post_10_app_messages_thread, NULL);
780
781   MAIN_SYNCHRONIZE ();
782
783   for (i = 0; i < 10; i++) {
784     GstMessage *msg;
785
786     GST_LOG ("(%d) waiting for message..", i);
787     msg = gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE);
788     GST_LOG ("(%d) got message, sleeping a bit", i);
789     g_usleep (60 * GST_MSECOND / (GST_SECOND / G_USEC_PER_SEC));
790     GST_LOG ("(%d) about to free message", i);
791     gst_message_unref (msg);
792   }
793
794   for (i = 1; i < 10; i++) {
795     gint64 prev_ts = g_array_index (timestamps, gint64, i - 1);
796     gint64 ts = g_array_index (timestamps, gint64, i);
797     gint64 diff = ts - prev_ts;
798
799     fail_unless (prev_ts < ts);
800     fail_unless (diff >= 20 * GST_MSECOND, "interval between messages being "
801         "posted was just %" G_GINT64_FORMAT "ms", diff / GST_MSECOND);
802   }
803
804   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
805
806   MAIN_STOP_THREADS ();
807
808   gst_object_unref (test_bus);
809
810   g_array_unref (timestamps);
811 }
812
813 GST_END_TEST;
814
815 static Suite *
816 gst_bus_suite (void)
817 {
818   Suite *s = suite_create ("GstBus");
819   TCase *tc_chain = tcase_create ("stresstest");
820
821   tcase_set_timeout (tc_chain, 60);
822
823   suite_add_tcase (s, tc_chain);
824   tcase_add_test (tc_chain, test_hammer_bus);
825   tcase_add_test (tc_chain, test_watch);
826   tcase_add_test (tc_chain, test_watch_with_poll);
827   tcase_add_test (tc_chain, test_watch_with_custom_context);
828   tcase_add_test (tc_chain, test_add_watch_with_custom_context);
829   tcase_add_test (tc_chain, test_remove_watch);
830   tcase_add_test (tc_chain, test_timed_pop);
831   tcase_add_test (tc_chain, test_timed_pop_thread);
832   tcase_add_test (tc_chain, test_timed_pop_filtered);
833   tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
834   tcase_add_test (tc_chain, test_custom_main_context);
835   tcase_add_test (tc_chain, test_async_message);
836   return s;
837 }
838
839 GST_CHECK_MAIN (gst_bus);