And correct even more valid sparse warnings.
[platform/upstream/gstreamer.git] / tests / check / gst / gstbus.c
1 /* GStreamer message bus unit tests
2  * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
3  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #include <gst/check/gstcheck.h>
22
23 static GstBus *test_bus = NULL;
24 static GMainLoop *main_loop;
25
26 #define NUM_MESSAGES 1000
27 #define NUM_THREADS 10
28
29 static gpointer
30 pound_bus_with_messages (gpointer data)
31 {
32   gint thread_id = GPOINTER_TO_INT (data);
33   gint i;
34
35   for (i = 0; i < NUM_MESSAGES; i++) {
36     GstMessage *m;
37     GstStructure *s;
38
39     s = gst_structure_new ("test_message",
40         "thread_id", G_TYPE_INT, thread_id, "msg_id", G_TYPE_INT, i, NULL);
41     m = gst_message_new_application (NULL, s);
42     gst_bus_post (test_bus, m);
43   }
44   return NULL;
45 }
46
47 static void
48 pull_messages (void)
49 {
50   GstMessage *m;
51   const GstStructure *s;
52   guint message_ids[NUM_THREADS];
53   gint i;
54
55   for (i = 0; i < NUM_THREADS; i++)
56     message_ids[i] = 0;
57
58   while (1) {
59     gint _t, _i;
60
61     m = gst_bus_pop (test_bus);
62     if (!m)
63       break;
64     g_return_if_fail (GST_MESSAGE_TYPE (m) == GST_MESSAGE_APPLICATION);
65
66     s = gst_message_get_structure (m);
67     if (!gst_structure_get_int (s, "thread_id", &_t))
68       g_critical ("Invalid message");
69     if (!gst_structure_get_int (s, "msg_id", &_i))
70       g_critical ("Invalid message");
71
72     g_return_if_fail (_t < NUM_THREADS);
73     g_return_if_fail (_i == message_ids[_t]++);
74
75     gst_message_unref (m);
76   }
77
78   for (i = 0; i < NUM_THREADS; i++)
79     g_return_if_fail (message_ids[i] == NUM_MESSAGES);
80 }
81
82 GST_START_TEST (test_hammer_bus)
83 {
84   GThread *threads[NUM_THREADS];
85   gint i;
86
87   test_bus = gst_bus_new ();
88
89   for (i = 0; i < NUM_THREADS; i++)
90     threads[i] = g_thread_create (pound_bus_with_messages, GINT_TO_POINTER (i),
91         TRUE, NULL);
92
93   for (i = 0; i < NUM_THREADS; i++)
94     g_thread_join (threads[i]);
95
96   pull_messages ();
97
98   gst_object_unref ((GstObject *) test_bus);
99 }
100
101 GST_END_TEST;
102
103 static gboolean
104 message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
105 {
106   const GstStructure *s;
107   gint i;
108
109   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
110
111   GST_DEBUG ("got EOS message");
112
113   s = gst_message_get_structure (message);
114   if (!gst_structure_get_int (s, "msg_id", &i))
115     g_critical ("Invalid message");
116
117   return i != 9;
118 }
119
120 static gboolean
121 message_func_app (GstBus * bus, GstMessage * message, gpointer data)
122 {
123   const GstStructure *s;
124   gint i;
125
126   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
127       FALSE);
128
129   GST_DEBUG ("got APP message");
130
131   s = gst_message_get_structure (message);
132   if (!gst_structure_get_int (s, "msg_id", &i))
133     g_critical ("Invalid message");
134
135   return i != 9;
136 }
137
138 static gboolean
139 send_messages (gpointer data)
140 {
141   GstMessage *m;
142   GstStructure *s;
143   gint i;
144
145   for (i = 0; i < 10; i++) {
146     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
147     m = gst_message_new_application (NULL, s);
148     gst_bus_post (test_bus, m);
149     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
150     m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
151     gst_bus_post (test_bus, m);
152   }
153
154   return FALSE;
155 }
156
157 /* test if adding a signal watch for different message types calls the
158  * respective callbacks. */
159 GST_START_TEST (test_watch)
160 {
161   guint id;
162
163   test_bus = gst_bus_new ();
164
165   main_loop = g_main_loop_new (NULL, FALSE);
166
167   id = gst_bus_add_watch (test_bus, gst_bus_async_signal_func, NULL);
168   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
169       NULL);
170   g_signal_connect (test_bus, "message::application",
171       (GCallback) message_func_app, NULL);
172
173   g_idle_add ((GSourceFunc) send_messages, NULL);
174   while (g_main_context_pending (NULL))
175     g_main_context_iteration (NULL, FALSE);
176
177   g_source_remove (id);
178   g_main_loop_unref (main_loop);
179
180   gst_object_unref ((GstObject *) test_bus);
181 }
182
183 GST_END_TEST;
184
185 static gint messages_seen;
186
187 static void
188 message_func (GstBus * bus, GstMessage * message, gpointer data)
189 {
190   g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
191
192   messages_seen++;
193 }
194
195 static void
196 send_5app_1el_1err_2app_messages (guint interval_usecs)
197 {
198   GstMessage *m;
199   GstStructure *s;
200   gint i;
201
202   for (i = 0; i < 5; i++) {
203     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
204     m = gst_message_new_application (NULL, s);
205     GST_LOG ("posting application message");
206     gst_bus_post (test_bus, m);
207     g_usleep (interval_usecs);
208   }
209   for (i = 0; i < 1; i++) {
210     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
211     m = gst_message_new_element (NULL, s);
212     GST_LOG ("posting element message");
213     gst_bus_post (test_bus, m);
214     g_usleep (interval_usecs);
215   }
216   for (i = 0; i < 1; i++) {
217     m = gst_message_new_error (NULL, NULL, "debug string");
218     GST_LOG ("posting error message");
219     gst_bus_post (test_bus, m);
220     g_usleep (interval_usecs);
221   }
222   for (i = 0; i < 2; i++) {
223     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
224     m = gst_message_new_application (NULL, s);
225     GST_LOG ("posting application message");
226     gst_bus_post (test_bus, m);
227     g_usleep (interval_usecs);
228   }
229 }
230
231 static void
232 send_10_app_messages (void)
233 {
234   GstMessage *m;
235   GstStructure *s;
236   gint i;
237
238   for (i = 0; i < 10; i++) {
239     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
240     m = gst_message_new_application (NULL, s);
241     gst_bus_post (test_bus, m);
242   }
243 }
244
245 /* test that you get the same messages from a poll as from signal watches. */
246 GST_START_TEST (test_watch_with_poll)
247 {
248   guint i;
249
250   test_bus = gst_bus_new ();
251   messages_seen = 0;
252
253   gst_bus_add_signal_watch (test_bus);
254   g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
255
256   send_10_app_messages ();
257
258   for (i = 0; i < 10; i++)
259     gst_message_unref (gst_bus_poll (test_bus, GST_MESSAGE_APPLICATION,
260             GST_CLOCK_TIME_NONE));
261
262   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
263   fail_unless (messages_seen == 10, "signal handler didn't get 10 messages");
264
265   gst_bus_remove_signal_watch (test_bus);
266
267   gst_object_unref (test_bus);
268 }
269
270 GST_END_TEST;
271
272 /* test that you get the messages with pop. */
273 GST_START_TEST (test_timed_pop)
274 {
275   guint i;
276
277   test_bus = gst_bus_new ();
278
279   send_10_app_messages ();
280
281   for (i = 0; i < 10; i++)
282     gst_message_unref (gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE));
283
284   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
285
286   gst_object_unref (test_bus);
287 }
288
289 GST_END_TEST;
290
291 /* test that you get the messages with pop_filtered */
292 GST_START_TEST (test_timed_pop_filtered)
293 {
294   GstMessage *msg;
295   guint i;
296
297   test_bus = gst_bus_new ();
298
299   send_10_app_messages ();
300   for (i = 0; i < 10; i++) {
301     msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
302         GST_MESSAGE_ANY);
303     fail_unless (msg != NULL);
304     gst_message_unref (msg);
305   }
306
307   /* should flush all messages on the bus with types not matching */
308   send_10_app_messages ();
309   msg = gst_bus_timed_pop_filtered (test_bus, 0,
310       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
311   fail_unless (msg == NULL);
312   msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 2,
313       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
314   fail_unless (msg == NULL);
315   /* there should be nothing on the bus now */
316   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
317   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ANY);
318   fail_unless (msg == NULL);
319
320   send_5app_1el_1err_2app_messages (0);
321   msg = gst_bus_timed_pop_filtered (test_bus, 0,
322       GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
323   fail_unless (msg != NULL);
324   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
325   gst_message_unref (msg);
326   fail_unless (gst_bus_have_pending (test_bus), "expected messages on bus");
327   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_APPLICATION);
328   fail_unless (msg != NULL);
329   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
330   gst_message_unref (msg);
331   msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
332   fail_unless (msg == NULL);
333
334   gst_object_unref (test_bus);
335 }
336
337 GST_END_TEST;
338
339 static gpointer
340 post_delayed_thread (gpointer data)
341 {
342   THREAD_START ();
343   send_5app_1el_1err_2app_messages (1 * G_USEC_PER_SEC);
344   return NULL;
345 }
346
347 /* test that you get the messages with pop_filtered if there's a timeout*/
348 GST_START_TEST (test_timed_pop_filtered_with_timeout)
349 {
350   GstMessage *msg;
351
352   MAIN_INIT ();
353
354   test_bus = gst_bus_new ();
355
356   MAIN_START_THREAD_FUNCTIONS (1, post_delayed_thread, NULL);
357
358   MAIN_SYNCHRONIZE ();
359
360   msg = gst_bus_timed_pop_filtered (test_bus, 2 * GST_SECOND,
361       GST_MESSAGE_ERROR);
362   fail_unless (msg == NULL, "Got unexpected %s message",
363       (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
364   msg = gst_bus_timed_pop_filtered (test_bus, (3 + 1 + 1 + 1) * GST_SECOND,
365       GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT);
366   fail_unless (msg != NULL, "expected element message, but got nothing");
367   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
368   gst_message_unref (msg);
369   msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
370       GST_MESSAGE_APPLICATION);
371   fail_unless (msg != NULL, "expected application message, but got nothing");
372   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
373   gst_message_unref (msg);
374   msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
375       GST_MESSAGE_APPLICATION);
376   fail_unless (msg != NULL, "expected application message, but got nothing");
377   fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
378   gst_message_unref (msg);
379   msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 4,
380       GST_MESSAGE_TAG | GST_MESSAGE_ERROR);
381   fail_unless (msg == NULL, "Got unexpected %s message",
382       (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
383
384   MAIN_STOP_THREADS ();
385
386   gst_object_unref (test_bus);
387 }
388
389 GST_END_TEST;
390
391 /* test that you get the messages with pop from another thread. */
392 static gpointer
393 pop_thread (gpointer data)
394 {
395   GstBus *bus = GST_BUS_CAST (data);
396   guint i;
397
398   for (i = 0; i < 10; i++)
399     gst_message_unref (gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE));
400
401   return NULL;
402 }
403
404 GST_START_TEST (test_timed_pop_thread)
405 {
406   GThread *thread;
407   GError *error = NULL;
408
409   test_bus = gst_bus_new ();
410
411   thread = g_thread_create (pop_thread, test_bus, TRUE, &error);
412   fail_if (error != NULL);
413
414   send_10_app_messages ();
415
416   g_thread_join (thread);
417
418   fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
419
420   /* try to pop a message without timeout. */
421   fail_if (gst_bus_timed_pop (test_bus, 0) != NULL);
422
423   /* with a small timeout */
424   fail_if (gst_bus_timed_pop (test_bus, 1000) != NULL);
425
426   gst_object_unref (test_bus);
427 }
428
429 GST_END_TEST;
430
431 static Suite *
432 gst_bus_suite (void)
433 {
434   Suite *s = suite_create ("GstBus");
435   TCase *tc_chain = tcase_create ("stresstest");
436
437   tcase_set_timeout (tc_chain, 20);
438
439   suite_add_tcase (s, tc_chain);
440   tcase_add_test (tc_chain, test_hammer_bus);
441   tcase_add_test (tc_chain, test_watch);
442   tcase_add_test (tc_chain, test_watch_with_poll);
443   tcase_add_test (tc_chain, test_timed_pop);
444   tcase_add_test (tc_chain, test_timed_pop_thread);
445   tcase_add_test (tc_chain, test_timed_pop_filtered);
446   tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
447   return s;
448 }
449
450 GST_CHECK_MAIN (gst_bus);