caps: improve _do_simplify
[platform/upstream/gstreamer.git] / tests / check / gst / gstbus.c
1 /* GStreamer message bus unit tests
2  * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
3  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #include <gst/check/gstcheck.h>
22
23 static GstBus *test_bus = NULL;
24 static GMainLoop *main_loop;
25
26 #define NUM_MESSAGES 1000
27 #define NUM_THREADS 10
28
29 static gpointer
30 pound_bus_with_messages (gpointer data)
31 {
32   gint thread_id = GPOINTER_TO_INT (data);
33   gint i;
34
35   for (i = 0; i < NUM_MESSAGES; i++) {
36     GstMessage *m;
37     GstStructure *s;
38
39     s = gst_structure_new ("test_message",
40         "thread_id", G_TYPE_INT, thread_id, "msg_id", G_TYPE_INT, i, NULL);
41     m = gst_message_new_application (NULL, s);
42     gst_bus_post (test_bus, m);
43   }
44   return NULL;
45 }
46
47 static void
48 pull_messages (void)
49 {
50   GstMessage *m;
51   const GstStructure *s;
52   guint message_ids[NUM_THREADS];
53   gint i;
54
55   for (i = 0; i < NUM_THREADS; i++)
56     message_ids[i] = 0;
57
58   while (1) {
59     gint _t, _i;
60
61     m = gst_bus_pop (test_bus);
62     if (!m)
63       break;
64     g_return_if_fail (GST_MESSAGE_TYPE (m) == GST_MESSAGE_APPLICATION);
65
66     s = gst_message_get_structure (m);
67     if (!gst_structure_get_int (s, "thread_id", &_t))
68       g_critical ("Invalid message");
69     if (!gst_structure_get_int (s, "msg_id", &_i))
70       g_critical ("Invalid message");
71
72     g_return_if_fail (_t < NUM_THREADS);
73     g_return_if_fail (_i == message_ids[_t]++);
74
75     gst_message_unref (m);
76   }
77
78   for (i = 0; i < NUM_THREADS; i++)
79     g_return_if_fail (message_ids[i] == NUM_MESSAGES);
80 }
81
82 GST_START_TEST (test_hammer_bus)
83 {
84   GThread *threads[NUM_THREADS];
85   gint i;
86
87   test_bus = gst_bus_new ();
88
89   for (i = 0; i < NUM_THREADS; i++)
90     threads[i] = g_thread_create (pound_bus_with_messages, GINT_TO_POINTER (i),
91         TRUE, NULL);
92
93   for (i = 0; i < NUM_THREADS; i++)
94     g_thread_join (threads[i]);
95
96   pull_messages ();
97
98   gst_object_unref ((GstObject *) test_bus);
99 }
100
101 GST_END_TEST;
102
103 static gboolean
104 message_func_eos (GstBus * bus, GstMessage * message, guint * p_counter)
105 {
106   const GstStructure *s;
107   gint i;
108
109   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
110
111   GST_DEBUG ("got EOS message");
112
113   s = gst_message_get_structure (message);
114   if (!gst_structure_get_int (s, "msg_id", &i))
115     g_critical ("Invalid message");
116
117   if (p_counter != NULL)
118     *p_counter += 1;
119
120   return i != 9;
121 }
122
123 static gboolean
124 message_func_app (GstBus * bus, GstMessage * message, guint * p_counter)
125 {
126   const GstStructure *s;
127   gint i;
128
129   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
130       FALSE);
131
132   GST_DEBUG ("got APP message");
133
134   s = gst_message_get_structure (message);
135   if (!gst_structure_get_int (s, "msg_id", &i))
136     g_critical ("Invalid message");
137
138   if (p_counter != NULL)
139     *p_counter += 1;
140
141   return i != 9;
142 }
143
144 static gboolean
145 send_messages (gpointer data)
146 {
147   GstMessage *m;
148   GstStructure *s;
149   gint i;
150
151   for (i = 0; i < 10; i++) {
152     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
153     m = gst_message_new_application (NULL, s);
154     gst_bus_post (test_bus, m);
155     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
156     m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
157     gst_bus_post (test_bus, m);
158   }
159
160   return FALSE;
161 }
162
163 /* test if adding a signal watch for different message types calls the
164  * respective callbacks. */
165 GST_START_TEST (test_watch)
166 {
167   guint num_eos = 0;
168   guint num_app = 0;
169   guint id;
170
171   test_bus = gst_bus_new ();
172
173   main_loop = g_main_loop_new (NULL, FALSE);
174
175   id = gst_bus_add_watch (test_bus, gst_bus_async_signal_func, NULL);
176   fail_if (id == 0);
177   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
178       &num_eos);
179   g_signal_connect (test_bus, "message::application",
180       (GCallback) message_func_app, &num_app);
181
182   g_idle_add ((GSourceFunc) send_messages, NULL);
183   while (g_main_context_pending (NULL))
184     g_main_context_iteration (NULL, FALSE);
185
186   fail_unless_equals_int (num_eos, 10);
187   fail_unless_equals_int (num_app, 10);
188
189   g_source_remove (id);
190   g_main_loop_unref (main_loop);
191
192   gst_object_unref ((GstObject *) test_bus);
193 }
194
195 GST_END_TEST;
196
197 /* test if adding a signal watch for different message types calls the
198  * respective callbacks. */
199 GST_START_TEST (test_watch_with_custom_context)
200 {
201   GMainContext *ctx;
202   GSource *source;
203   guint num_eos = 0;
204   guint num_app = 0;
205   guint id;
206
207   test_bus = gst_bus_new ();
208
209   ctx = g_main_context_new ();
210   main_loop = g_main_loop_new (ctx, FALSE);
211
212   source = gst_bus_create_watch (test_bus);
213   g_source_set_callback (source, (GSourceFunc) gst_bus_async_signal_func, NULL,
214       NULL);
215   id = g_source_attach (source, ctx);
216   g_source_unref (source);
217   fail_if (id == 0);
218
219   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
220       &num_eos);
221   g_signal_connect (test_bus, "message::application",
222       (GCallback) message_func_app, &num_app);
223
224   source = g_idle_source_new ();
225   g_source_set_callback (source, (GSourceFunc) send_messages, NULL, NULL);
226   g_source_attach (source, ctx);
227   g_source_unref (source);
228
229   while (g_main_context_pending (ctx))
230     g_main_context_iteration (ctx, FALSE);
231
232   fail_unless_equals_int (num_eos, 10);
233   fail_unless_equals_int (num_app, 10);
234
235   g_source_remove (id);
236   g_main_loop_unref (main_loop);
237   g_main_context_unref (ctx);
238
239   gst_object_unref (test_bus);
240 }
241
242 GST_END_TEST;
243
244 /* test if adding a signal watch for different message types calls the
245  * respective callbacks. */
246 GST_START_TEST (test_add_watch_with_custom_context)
247 {
248   GMainContext *ctx;
249   GSource *source;
250   guint num_eos = 0;
251   guint num_app = 0;
252
253   test_bus = gst_bus_new ();
254
255   ctx = g_main_context_new ();
256   main_loop = g_main_loop_new (ctx, FALSE);
257
258   g_main_context_push_thread_default (ctx);
259   gst_bus_add_signal_watch (test_bus);
260   g_main_context_pop_thread_default (ctx);
261
262   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
263       &num_eos);
264   g_signal_connect (test_bus, "message::application",
265       (GCallback) message_func_app, &num_app);
266
267   source = g_idle_source_new ();
268   g_source_set_callback (source, (GSourceFunc) send_messages, NULL, NULL);
269   g_source_attach (source, ctx);
270   g_source_unref (source);
271
272   while (g_main_context_pending (ctx))
273     g_main_context_iteration (ctx, FALSE);
274
275   fail_unless_equals_int (num_eos, 10);
276   fail_unless_equals_int (num_app, 10);
277
278   g_main_loop_unref (main_loop);
279   g_main_context_unref (ctx);
280
281   gst_object_unref (test_bus);
282 }
283
284 GST_END_TEST;
285
286 static gint messages_seen;
287
288 static void
289 message_func (GstBus * bus, GstMessage * message, gpointer data)
290 {
291   g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
292
293   messages_seen++;
294 }
295
296 static void
297 send_5app_1el_1err_2app_messages (guint interval_usecs)
298 {
299   GstMessage *m;
300   GstStructure *s;
301   gint i;
302
303   for (i = 0; i < 5; i++) {
304     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
305     m = gst_message_new_application (NULL, s);
306     GST_LOG ("posting application message");
307     gst_bus_post (test_bus, m);
308     g_usleep (interval_usecs);
309   }
310   for (i = 0; i < 1; i++) {
311     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
312     m = gst_message_new_element (NULL, s);
313     GST_LOG ("posting element message");
314     gst_bus_post (test_bus, m);
315     g_usleep (interval_usecs);
316   }
317   for (i = 0; i < 1; i++) {
318     m = gst_message_new_error (NULL, NULL, "debug string");
319     GST_LOG ("posting error message");
320     gst_bus_post (test_bus, m);
321     g_usleep (interval_usecs);
322   }
323   for (i = 0; i < 2; i++) {
324     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
325     m = gst_message_new_application (NULL, s);
326     GST_LOG ("posting application message");
327     gst_bus_post (test_bus, m);
328     g_usleep (interval_usecs);
329   }
330 }
331
332 static void
333 send_10_app_messages (void)
334 {
335   GstMessage *m;
336   GstStructure *s;
337   gint i;
338
339   for (i = 0; i < 10; i++) {
340     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
341     m = gst_message_new_application (NULL, s);
342     gst_bus_post (test_bus, m);
343   }
344 }
345
346 /* test that you get the same messages from a poll as from signal watches. */
347 GST_START_TEST (test_watch_with_poll)
348 {
349   guint i;
350
351   test_bus = gst_bus_new ();
352   messages_seen = 0;
353
354   gst_bus_add_signal_watch (test_bus);
355   g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
356
357   send_10_app_messages ();
358
359   for (i = 0; i < 10; i++)
360     gst_message_unref (gst_bus_poll (test_bus, GST_MESSAGE_APPLICATION,
361             GST_CLOCK_TIME_NONE));
362
363   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
364   fail_unless (messages_seen == 10, "signal handler didn't get 10 messages");
365
366   gst_bus_remove_signal_watch (test_bus);
367
368   gst_object_unref (test_bus);
369 }
370
371 GST_END_TEST;
372
373 /* test that you get the messages with pop. */
374 GST_START_TEST (test_timed_pop)
375 {
376   guint i;
377
378   test_bus = gst_bus_new ();
379
380   send_10_app_messages ();
381
382   for (i = 0; i < 10; i++)
383     gst_message_unref (gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE));
384
385   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
386
387   gst_object_unref (test_bus);
388 }
389
390 GST_END_TEST;
391
392 /* test that you get the messages with pop_filtered */
393 GST_START_TEST (test_timed_pop_filtered)
394 {
395   GstMessage *msg;
396   guint i;
397
398   test_bus = gst_bus_new ();
399
400   send_10_app_messages ();
401   for (i = 0; i < 10; i++) {
402     msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
403         GST_MESSAGE_ANY);
404     fail_unless (msg != NULL);
405     gst_message_unref (msg);
406   }
407
408   /* should flush all messages on the bus with types not matching */
409   send_10_app_messages ();
410   msg = gst_bus_timed_pop_filtered (test_bus, 0,
411       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
412   fail_unless (msg == NULL);
413   msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 2,
414       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
415   fail_unless (msg == NULL);
416   /* there should be nothing on the bus now */
417   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
418   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ANY);
419   fail_unless (msg == NULL);
420
421   send_5app_1el_1err_2app_messages (0);
422   msg = gst_bus_timed_pop_filtered (test_bus, 0,
423       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
424   fail_unless (msg != NULL);
425   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
426   gst_message_unref (msg);
427   fail_unless (gst_bus_have_pending (test_bus), "expected messages on bus");
428   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_APPLICATION);
429   fail_unless (msg != NULL);
430   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
431   gst_message_unref (msg);
432   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
433   fail_unless (msg == NULL);
434
435   gst_object_unref (test_bus);
436 }
437
438 GST_END_TEST;
439
440 static gpointer
441 post_delayed_thread (gpointer data)
442 {
443   THREAD_START ();
444   send_5app_1el_1err_2app_messages (1 * G_USEC_PER_SEC);
445   return NULL;
446 }
447
448 /* test that you get the messages with pop_filtered if there's a timeout*/
449 GST_START_TEST (test_timed_pop_filtered_with_timeout)
450 {
451   GstMessage *msg;
452
453   MAIN_INIT ();
454
455   test_bus = gst_bus_new ();
456
457   MAIN_START_THREAD_FUNCTIONS (1, post_delayed_thread, NULL);
458
459   MAIN_SYNCHRONIZE ();
460
461   msg = gst_bus_timed_pop_filtered (test_bus, 2 * GST_SECOND,
462       GST_MESSAGE_ERROR);
463   fail_unless (msg == NULL, "Got unexpected %s message",
464       (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
465   msg = gst_bus_timed_pop_filtered (test_bus, (3 + 1 + 1 + 1) * GST_SECOND,
466       GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT);
467   fail_unless (msg != NULL, "expected element message, but got nothing");
468   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
469   gst_message_unref (msg);
470   msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
471       GST_MESSAGE_APPLICATION);
472   fail_unless (msg != NULL, "expected application message, but got nothing");
473   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
474   gst_message_unref (msg);
475   msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
476       GST_MESSAGE_APPLICATION);
477   fail_unless (msg != NULL, "expected application message, but got nothing");
478   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
479   gst_message_unref (msg);
480   msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 4,
481       GST_MESSAGE_TAG | GST_MESSAGE_ERROR);
482   fail_unless (msg == NULL, "Got unexpected %s message",
483       (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
484
485   MAIN_STOP_THREADS ();
486
487   gst_object_unref (test_bus);
488 }
489
490 GST_END_TEST;
491
492 /* test that you get the messages with pop from another thread. */
493 static gpointer
494 pop_thread (gpointer data)
495 {
496   GstBus *bus = GST_BUS_CAST (data);
497   guint i;
498
499   for (i = 0; i < 10; i++)
500     gst_message_unref (gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE));
501
502   return NULL;
503 }
504
505 GST_START_TEST (test_timed_pop_thread)
506 {
507   GThread *thread;
508   GError *error = NULL;
509
510   test_bus = gst_bus_new ();
511
512   thread = g_thread_create (pop_thread, test_bus, TRUE, &error);
513   fail_if (error != NULL);
514
515   send_10_app_messages ();
516
517   g_thread_join (thread);
518
519   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
520
521   /* try to pop a message without timeout. */
522   fail_if (gst_bus_timed_pop (test_bus, 0) != NULL);
523
524   /* with a small timeout */
525   fail_if (gst_bus_timed_pop (test_bus, 1000) != NULL);
526
527   gst_object_unref (test_bus);
528 }
529
530 GST_END_TEST;
531
532 static gboolean
533 cb_bus_call (GstBus * bus, GstMessage * msg, gpointer data)
534 {
535   GMainLoop *loop = data;
536
537   switch (GST_MESSAGE_TYPE (msg)) {
538     case GST_MESSAGE_EOS:
539     {
540       GST_INFO ("End-of-stream");
541       g_main_loop_quit (loop);
542       break;
543     }
544     case GST_MESSAGE_ERROR:
545     {
546       GError *err = NULL;
547
548       gst_message_parse_error (msg, &err, NULL);
549       g_error ("Error: %s", err->message);
550       g_error_free (err);
551
552       g_main_loop_quit (loop);
553       break;
554     }
555     default:
556     {
557       GST_LOG ("BUS MESSAGE: type=%s", GST_MESSAGE_TYPE_NAME (msg));
558       break;
559     }
560   }
561
562   return TRUE;
563 }
564
565 GST_START_TEST (test_custom_main_context)
566 {
567   GMainContext *ctx;
568   GMainLoop *loop;
569   GstElement *pipeline;
570   GstElement *src;
571   GstElement *sink;
572   GSource *source;
573   GstBus *bus;
574
575   ctx = g_main_context_new ();
576   loop = g_main_loop_new (ctx, FALSE);
577
578   pipeline = gst_pipeline_new (NULL);
579   src = gst_element_factory_make ("fakesrc", NULL);
580   g_object_set (src, "num-buffers", 2000, NULL);
581
582   sink = gst_element_factory_make ("fakesink", NULL);
583
584   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
585   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
586   fail_unless (gst_element_link (src, sink));
587
588   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
589   source = gst_bus_create_watch (bus);
590   g_source_attach (source, ctx);
591   g_source_set_callback (source, (GSourceFunc) cb_bus_call, loop, NULL);
592   g_source_unref (source);
593   gst_object_unref (bus);
594
595   GST_INFO ("starting pipeline");
596
597   gst_element_set_state (pipeline, GST_STATE_PLAYING);
598   gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
599
600   GST_INFO ("running event loop, ctx=%p", ctx);
601   g_main_loop_run (loop);
602
603   gst_element_set_state (pipeline, GST_STATE_NULL);
604
605   /* clean up */
606   if (ctx)
607     g_main_context_unref (ctx);
608   g_main_loop_unref (loop);
609   gst_object_unref (pipeline);
610 }
611
612 GST_END_TEST;
613
614 static Suite *
615 gst_bus_suite (void)
616 {
617   Suite *s = suite_create ("GstBus");
618   TCase *tc_chain = tcase_create ("stresstest");
619
620   tcase_set_timeout (tc_chain, 60);
621
622   suite_add_tcase (s, tc_chain);
623   tcase_add_test (tc_chain, test_hammer_bus);
624   tcase_add_test (tc_chain, test_watch);
625   tcase_add_test (tc_chain, test_watch_with_poll);
626   tcase_add_test (tc_chain, test_watch_with_custom_context);
627   tcase_add_test (tc_chain, test_add_watch_with_custom_context);
628   tcase_add_test (tc_chain, test_timed_pop);
629   tcase_add_test (tc_chain, test_timed_pop_thread);
630   tcase_add_test (tc_chain, test_timed_pop_filtered);
631   tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
632   tcase_add_test (tc_chain, test_custom_main_context);
633   return s;
634 }
635
636 GST_CHECK_MAIN (gst_bus);