check/gst/gstbin.c: Change for new bus API.
[platform/upstream/gstreamer.git] / check / gst / gstbus.c
1 /* GStreamer
2  * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
3  *
4  * gstbus.c: Unit test for the message bus
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #include <gst/check/gstcheck.h>
23
24 static GstBus *test_bus = NULL;
25 static GMainLoop *main_loop;
26
27 #define NUM_MESSAGES 1000
28 #define NUM_THREADS 10
29
30 static gpointer
31 pound_bus_with_messages (gpointer data)
32 {
33   gint thread_id = GPOINTER_TO_INT (data);
34   gint i;
35
36   for (i = 0; i < NUM_MESSAGES; i++) {
37     GstMessage *m;
38     GstStructure *s;
39
40     s = gst_structure_new ("test_message",
41         "thread_id", G_TYPE_INT, thread_id, "msg_id", G_TYPE_INT, i, NULL);
42     m = gst_message_new_application (NULL, s);
43     gst_bus_post (test_bus, m);
44   }
45   return NULL;
46 }
47
48 static void
49 pull_messages ()
50 {
51   GstMessage *m;
52   const GstStructure *s;
53   guint message_ids[NUM_THREADS];
54   gint i;
55
56   for (i = 0; i < NUM_THREADS; i++)
57     message_ids[i] = 0;
58
59   while (1) {
60     gint _t, _i;
61
62     m = gst_bus_pop (test_bus);
63     if (!m)
64       break;
65     g_return_if_fail (GST_MESSAGE_TYPE (m) == GST_MESSAGE_APPLICATION);
66
67     s = gst_message_get_structure (m);
68     if (!gst_structure_get_int (s, "thread_id", &_t))
69       g_critical ("Invalid message");
70     if (!gst_structure_get_int (s, "msg_id", &_i))
71       g_critical ("Invalid message");
72
73     g_return_if_fail (_t < NUM_THREADS);
74     g_return_if_fail (_i == message_ids[_t]++);
75
76     gst_message_unref (m);
77   }
78
79   for (i = 0; i < NUM_THREADS; i++)
80     g_return_if_fail (message_ids[i] == NUM_MESSAGES);
81 }
82
83 GST_START_TEST (test_hammer_bus)
84 {
85   GThread *threads[NUM_THREADS];
86   gint i;
87
88   test_bus = gst_bus_new ();
89
90   for (i = 0; i < NUM_THREADS; i++)
91     threads[i] = g_thread_create (pound_bus_with_messages, GINT_TO_POINTER (i),
92         TRUE, NULL);
93
94   for (i = 0; i < NUM_THREADS; i++)
95     g_thread_join (threads[i]);
96
97   pull_messages ();
98
99   gst_object_unref ((GstObject *) test_bus);
100 }
101 GST_END_TEST static gboolean
102 message_func_eos (GstBus * bus, GstMessage * message, gpointer data)
103 {
104   const GstStructure *s;
105   gint i;
106
107   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
108
109   GST_DEBUG ("got EOS message");
110
111   s = gst_message_get_structure (message);
112   if (!gst_structure_get_int (s, "msg_id", &i))
113     g_critical ("Invalid message");
114
115   return i != 9;
116 }
117
118 static gboolean
119 message_func_app (GstBus * bus, GstMessage * message, gpointer data)
120 {
121   const GstStructure *s;
122   gint i;
123
124   g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
125       FALSE);
126
127   GST_DEBUG ("got APP message");
128
129   s = gst_message_get_structure (message);
130   if (!gst_structure_get_int (s, "msg_id", &i))
131     g_critical ("Invalid message");
132
133   return i != 9;
134 }
135
136 static gboolean
137 send_messages (gpointer data)
138 {
139   GstMessage *m;
140   GstStructure *s;
141   gint i;
142
143   for (i = 0; i < 10; i++) {
144     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
145     m = gst_message_new_application (NULL, s);
146     gst_bus_post (test_bus, m);
147     s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
148     m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
149     gst_bus_post (test_bus, m);
150   }
151
152   return FALSE;
153 }
154
155 /* test if adding a signal watch for different message types calls the
156  * respective callbacks. */
157 GST_START_TEST (test_watch)
158 {
159   guint id;
160
161   test_bus = gst_bus_new ();
162
163   main_loop = g_main_loop_new (NULL, FALSE);
164
165   id = gst_bus_add_watch (test_bus, gst_bus_async_signal_func, NULL);
166   g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
167       NULL);
168   g_signal_connect (test_bus, "message::application",
169       (GCallback) message_func_app, NULL);
170
171   g_idle_add ((GSourceFunc) send_messages, NULL);
172   while (g_main_context_pending (NULL))
173     g_main_context_iteration (NULL, FALSE);
174
175   g_source_remove (id);
176   g_main_loop_unref (main_loop);
177
178   gst_object_unref ((GstObject *) test_bus);
179 }
180 GST_END_TEST Suite * gstbus_suite (void)
181 {
182   Suite *s = suite_create ("GstBus");
183   TCase *tc_chain = tcase_create ("stresstest");
184
185   tcase_set_timeout (tc_chain, 20);
186
187   suite_add_tcase (s, tc_chain);
188   tcase_add_test (tc_chain, test_hammer_bus);
189   tcase_add_test (tc_chain, test_watch);
190   return s;
191 }
192
193 int
194 main (int argc, char **argv)
195 {
196   int nf;
197
198   Suite *s = gstbus_suite ();
199   SRunner *sr = srunner_create (s);
200
201   gst_check_init (&argc, &argv);
202
203   srunner_run_all (sr, CK_NORMAL);
204   nf = srunner_ntests_failed (sr);
205   srunner_free (sr);
206
207   return nf;
208 }