bf479aa8eee6c9fd25d52ba9e41d2413c928f74d
[platform/upstream/gstreamer.git] / tests / check / elements / multiqueue.c
1 /* GStreamer unit tests for multiqueue
2  *
3  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #include <unistd.h>
22
23 #include <gst/check/gstcheck.h>
24
25 static GStaticMutex _check_lock = G_STATIC_MUTEX_INIT;
26
27 static GstElement *
28 setup_multiqueue (GstElement * pipe, GstElement * inputs[],
29     GstElement * outputs[], guint num)
30 {
31   GstElement *mq;
32   guint i;
33
34   mq = gst_element_factory_make ("multiqueue", NULL);
35   fail_unless (mq != NULL, "failed to create 'multiqueue' element");
36
37   gst_bin_add (GST_BIN (pipe), mq);
38
39   for (i = 0; i < num; ++i) {
40     GstPad *sinkpad = NULL;
41     GstPad *srcpad = NULL;
42
43     /* create multiqueue sink (and source) pad */
44     sinkpad = gst_element_get_request_pad (mq, "sink%d");
45     fail_unless (sinkpad != NULL,
46         "failed to create multiqueue request pad #%u", i);
47
48     /* link input element N to the N-th multiqueue sink pad we just created */
49     if (inputs != NULL && inputs[i] != NULL) {
50       gst_bin_add (GST_BIN (pipe), inputs[i]);
51
52       srcpad = gst_element_get_static_pad (inputs[i], "src");
53       fail_unless (srcpad != NULL, "failed to find src pad for input #%u", i);
54
55       fail_unless_equals_int (GST_PAD_LINK_OK, gst_pad_link (srcpad, sinkpad));
56
57       gst_object_unref (srcpad);
58       srcpad = NULL;
59     }
60     gst_object_unref (sinkpad);
61     sinkpad = NULL;
62
63     /* link output element N to the N-th multiqueue src pad */
64     if (outputs != NULL && outputs[i] != NULL) {
65       gchar padname[10];
66
67       /* only the sink pads are by request, the source pads are sometimes pads,
68        * so this should return NULL */
69       srcpad = gst_element_get_request_pad (mq, "src%d");
70       fail_unless (srcpad == NULL);
71
72       g_snprintf (padname, sizeof (padname), "src%d", i);
73       srcpad = gst_element_get_static_pad (mq, padname);
74       fail_unless (srcpad != NULL, "failed to get multiqueue src pad #%u", i);
75       fail_unless (GST_PAD_IS_SRC (srcpad),
76           "%s:%s is not a source pad?!", GST_DEBUG_PAD_NAME (srcpad));
77
78       gst_bin_add (GST_BIN (pipe), outputs[i]);
79
80       sinkpad = gst_element_get_static_pad (outputs[i], "sink");
81       fail_unless (sinkpad != NULL, "failed to find sink pad of output #%u", i);
82       fail_unless (GST_PAD_IS_SINK (sinkpad));
83
84       fail_unless_equals_int (GST_PAD_LINK_OK, gst_pad_link (srcpad, sinkpad));
85
86       gst_object_unref (srcpad);
87       gst_object_unref (sinkpad);
88     }
89   }
90
91   return mq;
92 }
93
94 GST_START_TEST (test_simple_pipeline)
95 {
96   GstElement *pipe;
97   GstElement *inputs[1];
98   GstElement *outputs[1];
99   GstMessage *msg;
100
101   pipe = gst_pipeline_new ("pipeline");
102
103   inputs[0] = gst_element_factory_make ("fakesrc", NULL);
104   fail_unless (inputs[0] != NULL, "failed to create 'fakesrc' element");
105   g_object_set (inputs[0], "num-buffers", 256, NULL);
106
107   outputs[0] = gst_element_factory_make ("fakesink", NULL);
108   fail_unless (outputs[0] != NULL, "failed to create 'fakesink' element");
109
110   setup_multiqueue (pipe, inputs, outputs, 1);
111
112   gst_element_set_state (pipe, GST_STATE_PLAYING);
113
114   msg = gst_bus_poll (GST_ELEMENT_BUS (pipe),
115       GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
116
117   fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR,
118       "Expected EOS message, got ERROR message");
119   gst_message_unref (msg);
120
121   GST_LOG ("Got EOS, cleaning up");
122
123   gst_element_set_state (pipe, GST_STATE_NULL);
124   gst_object_unref (pipe);
125 }
126
127 GST_END_TEST;
128
129 GST_START_TEST (test_simple_shutdown_while_running)
130 {
131   GstElement *pipe;
132   GstElement *inputs[1];
133   GstElement *outputs[1];
134   GstMessage *msg;
135
136   pipe = gst_pipeline_new ("pipeline");
137
138   inputs[0] = gst_element_factory_make ("fakesrc", NULL);
139   fail_unless (inputs[0] != NULL, "failed to create 'fakesrc' element");
140
141   outputs[0] = gst_element_factory_make ("fakesink", NULL);
142   fail_unless (outputs[0] != NULL, "failed to create 'fakesink' element");
143
144   setup_multiqueue (pipe, inputs, outputs, 1);
145
146   gst_element_set_state (pipe, GST_STATE_PAUSED);
147
148   /* wait until pipeline is up and running */
149   msg = gst_bus_poll (GST_ELEMENT_BUS (pipe),
150       GST_MESSAGE_ERROR | GST_MESSAGE_ASYNC_DONE, -1);
151   fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR, "Got ERROR message");
152   gst_message_unref (msg);
153
154   GST_LOG ("pipeline is running now");
155   gst_element_set_state (pipe, GST_STATE_PAUSED);
156
157   /* wait a bit to accumulate some buffers in the queue (while it's blocking
158    * in the sink) */
159   msg =
160       gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_ERROR, GST_SECOND / 4);
161   if (msg)
162     g_error ("Got ERROR message");
163
164   /* now shut down only the sink, so the queue gets a wrong-state flow return */
165   gst_element_set_state (outputs[0], GST_STATE_NULL);
166   msg =
167       gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_ERROR, GST_SECOND / 2);
168   if (msg)
169     g_error ("Got ERROR message");
170
171   GST_LOG ("Cleaning up");
172
173   gst_element_set_state (pipe, GST_STATE_NULL);
174   gst_object_unref (pipe);
175 }
176
177 GST_END_TEST;
178
179 GST_START_TEST (test_simple_create_destroy)
180 {
181   GstElement *mq;
182
183   mq = gst_element_factory_make ("multiqueue", NULL);
184   gst_object_unref (mq);
185 }
186
187 GST_END_TEST;
188
189 GST_START_TEST (test_request_pads)
190 {
191   GstElement *mq;
192   GstPad *sink1, *sink2;
193
194   mq = gst_element_factory_make ("multiqueue", NULL);
195
196   sink1 = gst_element_get_request_pad (mq, "foo%d");
197   fail_unless (sink1 == NULL,
198       "Expected NULL pad, as there is no request pad template for 'foo%%d'");
199
200   sink1 = gst_element_get_request_pad (mq, "src%d");
201   fail_unless (sink1 == NULL,
202       "Expected NULL pad, as there is no request pad template for 'src%%d'");
203
204   sink1 = gst_element_get_request_pad (mq, "sink%d");
205   fail_unless (sink1 != NULL);
206   fail_unless (GST_IS_PAD (sink1));
207   fail_unless (GST_PAD_IS_SINK (sink1));
208   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink1));
209
210   sink2 = gst_element_get_request_pad (mq, "sink%d");
211   fail_unless (sink2 != NULL);
212   fail_unless (GST_IS_PAD (sink2));
213   fail_unless (GST_PAD_IS_SINK (sink2));
214   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink2));
215
216   fail_unless (sink1 != sink2);
217
218   GST_LOG ("Cleaning up");
219   gst_object_unref (sink1);
220   gst_object_unref (sink2);
221   gst_object_unref (mq);
222 }
223
224 GST_END_TEST;
225
226 static GstPad *
227 mq_sinkpad_to_srcpad (GstElement * mq, GstPad * sink)
228 {
229   GstPad *srcpad = NULL;
230
231   gchar *mq_sinkpad_name;
232   gchar *mq_srcpad_name;
233
234   mq_sinkpad_name = gst_pad_get_name (sink);
235   fail_unless (g_str_has_prefix (mq_sinkpad_name, "sink"));
236   mq_srcpad_name = g_strdup_printf ("src%s", mq_sinkpad_name + 4);
237   srcpad = gst_element_get_static_pad (mq, mq_srcpad_name);
238   fail_unless (srcpad != NULL);
239
240   g_free (mq_sinkpad_name);
241   g_free (mq_srcpad_name);
242
243   return srcpad;
244 }
245
246 GST_START_TEST (test_request_pads_named)
247 {
248   GstElement *mq;
249   GstPad *sink1, *sink2, *sink3, *sink4;
250
251   mq = gst_element_factory_make ("multiqueue", NULL);
252
253   sink1 = gst_element_get_request_pad (mq, "sink1");
254   fail_unless (sink1 != NULL);
255   fail_unless (GST_IS_PAD (sink1));
256   fail_unless (GST_PAD_IS_SINK (sink1));
257   fail_unless_equals_string (GST_PAD_NAME (sink1), "sink1");
258   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink1));
259
260   sink3 = gst_element_get_request_pad (mq, "sink3");
261   fail_unless (sink3 != NULL);
262   fail_unless (GST_IS_PAD (sink3));
263   fail_unless (GST_PAD_IS_SINK (sink3));
264   fail_unless_equals_string (GST_PAD_NAME (sink3), "sink3");
265   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink3));
266
267   sink2 = gst_element_get_request_pad (mq, "sink2");
268   fail_unless (sink2 != NULL);
269   fail_unless (GST_IS_PAD (sink2));
270   fail_unless (GST_PAD_IS_SINK (sink2));
271   fail_unless_equals_string (GST_PAD_NAME (sink2), "sink2");
272   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink2));
273
274   /* This gets us the first unused id, sink0 */
275   sink4 = gst_element_get_request_pad (mq, "sink%d");
276   fail_unless (sink4 != NULL);
277   fail_unless (GST_IS_PAD (sink4));
278   fail_unless (GST_PAD_IS_SINK (sink4));
279   fail_unless_equals_string (GST_PAD_NAME (sink4), "sink0");
280   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink4));
281
282   GST_LOG ("Cleaning up");
283   gst_object_unref (sink1);
284   gst_object_unref (sink2);
285   gst_object_unref (sink3);
286   gst_object_unref (sink4);
287   gst_object_unref (mq);
288 }
289
290 GST_END_TEST;
291
292 static GstCaps *
293 mq_dummypad_getcaps (GstPad * sinkpad)
294 {
295   return gst_caps_new_any ();
296 }
297
298 struct PadData
299 {
300   guint8 pad_num;
301   guint32 *max_linked_id_ptr;
302   guint32 *eos_count_ptr;
303   gboolean is_linked;
304   gboolean first_buf;
305   gint n_linked;
306
307   GMutex *mutex;
308   GCond *cond;
309 };
310
311 static GstFlowReturn
312 mq_dummypad_chain (GstPad * sinkpad, GstBuffer * buf)
313 {
314   guint32 cur_id;
315   struct PadData *pad_data;
316   guint8 *data;
317   gsize size;
318
319   pad_data = gst_pad_get_element_private (sinkpad);
320
321   g_static_mutex_lock (&_check_lock);
322   fail_if (pad_data == NULL);
323   /* Read an ID from the first 4 bytes of the buffer data and check it's
324    * what we expect */
325   data = gst_buffer_map (buf, &size, NULL, GST_MAP_READ);
326   fail_unless (size >= 4);
327   g_static_mutex_unlock (&_check_lock);
328   cur_id = GST_READ_UINT32_BE (data);
329   gst_buffer_unmap (buf, data, size);
330
331   g_mutex_lock (pad_data->mutex);
332
333   /* For not-linked pads, ensure that we're not running ahead of the 'linked'
334    * pads. The first buffer is allowed to get ahead, because otherwise things can't
335    * always pre-roll correctly */
336   if (!pad_data->is_linked) {
337     /* If there are no linked pads, we can't track a max_id for them :) */
338     if (pad_data->n_linked > 0 && !pad_data->first_buf) {
339       g_static_mutex_lock (&_check_lock);
340       fail_unless (cur_id <= *(pad_data->max_linked_id_ptr) + 1,
341           "Got buffer %u on pad %u before buffer %u was seen on a "
342           "linked pad (max: %u)", cur_id, pad_data->pad_num, cur_id - 1,
343           *(pad_data->max_linked_id_ptr));
344       g_static_mutex_unlock (&_check_lock);
345     }
346   } else {
347     /* Update the max_id value */
348     if (cur_id > *(pad_data->max_linked_id_ptr))
349       *(pad_data->max_linked_id_ptr) = cur_id;
350   }
351   pad_data->first_buf = FALSE;
352
353   g_mutex_unlock (pad_data->mutex);
354
355   /* Unref the buffer */
356   gst_buffer_unref (buf);
357
358   /* Return OK or not-linked as indicated */
359   return pad_data->is_linked ? GST_FLOW_OK : GST_FLOW_NOT_LINKED;
360 }
361
362 static gboolean
363 mq_dummypad_event (GstPad * sinkpad, GstEvent * event)
364 {
365   struct PadData *pad_data;
366
367   pad_data = gst_pad_get_element_private (sinkpad);
368   g_static_mutex_lock (&_check_lock);
369   fail_if (pad_data == NULL);
370   g_static_mutex_unlock (&_check_lock);
371
372   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
373     g_mutex_lock (pad_data->mutex);
374
375     /* Accumulate that we've seen the EOS and signal the main thread */
376     *(pad_data->eos_count_ptr) += 1;
377
378     GST_DEBUG ("EOS on pad %u", pad_data->pad_num);
379
380     g_cond_broadcast (pad_data->cond);
381     g_mutex_unlock (pad_data->mutex);
382   }
383
384   gst_event_unref (event);
385   return TRUE;
386 }
387
388 static void
389 run_output_order_test (gint n_linked)
390 {
391   /* This test creates a multiqueue with 2 linked output, and 3 outputs that 
392    * return 'not-linked' when data is pushed, then verifies that all buffers 
393    * are received on not-linked pads only after earlier buffers on the 
394    * 'linked' pads are made */
395   GstElement *pipe;
396   GstElement *mq;
397   GstPad *inputpads[5];
398   GstPad *sinkpads[5];
399   struct PadData pad_data[5];
400   guint32 max_linked_id;
401   guint32 eos_seen;
402   GMutex *mutex;
403   GCond *cond;
404   gint i;
405   const gint NPADS = 5;
406   const gint NBUFFERS = 1000;
407
408   mutex = g_mutex_new ();
409   cond = g_cond_new ();
410
411   pipe = gst_bin_new ("testbin");
412
413   mq = gst_element_factory_make ("multiqueue", NULL);
414   fail_unless (mq != NULL);
415   gst_bin_add (GST_BIN (pipe), mq);
416
417   /* No limits */
418   g_object_set (mq,
419       "max-size-bytes", (guint) 0,
420       "max-size-buffers", (guint) 0,
421       "max-size-time", (guint64) 0,
422       "extra-size-bytes", (guint) 0,
423       "extra-size-buffers", (guint) 0, "extra-size-time", (guint64) 0, NULL);
424
425   /* Construct NPADS dummy output pads. The first 'n_linked' return FLOW_OK, the rest
426    * return NOT_LINKED. The not-linked ones check the expected ordering of 
427    * output buffers */
428   for (i = 0; i < NPADS; i++) {
429     GstPad *mq_srcpad, *mq_sinkpad;
430     gchar *name;
431
432     name = g_strdup_printf ("dummysrc%d", i);
433     inputpads[i] = gst_pad_new (name, GST_PAD_SRC);
434     g_free (name);
435     gst_pad_set_getcaps_function (inputpads[i], mq_dummypad_getcaps);
436
437     mq_sinkpad = gst_element_get_request_pad (mq, "sink%d");
438     fail_unless (mq_sinkpad != NULL);
439     gst_pad_link (inputpads[i], mq_sinkpad);
440
441     gst_pad_set_active (inputpads[i], TRUE);
442
443     mq_srcpad = mq_sinkpad_to_srcpad (mq, mq_sinkpad);
444
445     name = g_strdup_printf ("dummysink%d", i);
446     sinkpads[i] = gst_pad_new (name, GST_PAD_SINK);
447     g_free (name);
448     gst_pad_set_chain_function (sinkpads[i], mq_dummypad_chain);
449     gst_pad_set_event_function (sinkpads[i], mq_dummypad_event);
450     gst_pad_set_getcaps_function (sinkpads[i], mq_dummypad_getcaps);
451
452     pad_data[i].pad_num = i;
453     pad_data[i].max_linked_id_ptr = &max_linked_id;
454     pad_data[i].eos_count_ptr = &eos_seen;
455     pad_data[i].is_linked = (i < n_linked ? TRUE : FALSE);
456     pad_data[i].n_linked = n_linked;
457     pad_data[i].cond = cond;
458     pad_data[i].mutex = mutex;
459     pad_data[i].first_buf = TRUE;
460     gst_pad_set_element_private (sinkpads[i], pad_data + i);
461
462     gst_pad_link (mq_srcpad, sinkpads[i]);
463     gst_pad_set_active (sinkpads[i], TRUE);
464
465     gst_object_unref (mq_sinkpad);
466     gst_object_unref (mq_srcpad);
467   }
468
469   /* Run the test. Push 1000 buffers through the multiqueue in a pattern */
470
471   max_linked_id = 0;
472   eos_seen = 0;
473   gst_element_set_state (pipe, GST_STATE_PLAYING);
474
475   for (i = 0; i < NBUFFERS; i++) {
476     const guint8 pad_pattern[] =
477         { 0, 0, 0, 0, 1, 1, 2, 1, 0, 2, 3, 2, 3, 1, 4 };
478     const guint n = sizeof (pad_pattern) / sizeof (guint8);
479     guint8 cur_pad;
480     GstBuffer *buf;
481     GstFlowReturn ret;
482     gpointer data;
483
484     cur_pad = pad_pattern[i % n];
485
486     buf = gst_buffer_new_and_alloc (4);
487     g_static_mutex_lock (&_check_lock);
488     fail_if (buf == NULL);
489     g_static_mutex_unlock (&_check_lock);
490
491     data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
492     GST_WRITE_UINT32_BE (data, i + 1);
493     gst_buffer_unmap (buf, data, 4);
494     GST_BUFFER_TIMESTAMP (buf) = (i + 1) * GST_SECOND;
495
496     ret = gst_pad_push (inputpads[cur_pad], buf);
497     g_static_mutex_lock (&_check_lock);
498     if (pad_data[cur_pad].is_linked) {
499       fail_unless (ret == GST_FLOW_OK,
500           "Push on pad %d returned %d when FLOW_OK was expected", cur_pad, ret);
501     } else {
502       /* Expect OK initially, then NOT_LINKED when the srcpad starts pushing */
503       fail_unless (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED,
504           "Push on pad %d returned %d when FLOW_OK or NOT_LINKED  was expected",
505           cur_pad, ret);
506     }
507     g_static_mutex_unlock (&_check_lock);
508   }
509   for (i = 0; i < NPADS; i++) {
510     gst_pad_push_event (inputpads[i], gst_event_new_eos ());
511   }
512
513   /* Wait while the buffers are processed */
514   g_mutex_lock (mutex);
515   /* We wait until EOS has been pushed on all linked pads */
516   while (eos_seen < n_linked) {
517     g_cond_wait (cond, mutex);
518   }
519   g_mutex_unlock (mutex);
520
521   /* Clean up */
522   for (i = 0; i < NPADS; i++) {
523     GstPad *mq_input = gst_pad_get_peer (inputpads[i]);
524
525     gst_pad_unlink (inputpads[i], mq_input);
526     gst_element_release_request_pad (mq, mq_input);
527     gst_object_unref (mq_input);
528     gst_object_unref (inputpads[i]);
529
530     gst_object_unref (sinkpads[i]);
531   }
532
533   gst_element_set_state (pipe, GST_STATE_NULL);
534   gst_object_unref (pipe);
535
536   g_cond_free (cond);
537   g_mutex_free (mutex);
538 }
539
540 GST_START_TEST (test_output_order)
541 {
542   run_output_order_test (2);
543   run_output_order_test (0);
544 }
545
546 GST_END_TEST;
547
548 GST_START_TEST (test_sparse_stream)
549 {
550   /* This test creates a multiqueue with 2 streams. One receives
551    * a constant flow of buffers, the other only gets one buffer, and then
552    * new-segment events, and returns not-linked. The multiqueue should not fill.
553    */
554   GstElement *pipe;
555   GstElement *mq;
556   GstPad *inputpads[2];
557   GstPad *sinkpads[2];
558   GstEvent *event;
559   struct PadData pad_data[2];
560   guint32 eos_seen, max_linked_id;
561   GMutex *mutex;
562   GCond *cond;
563   gint i;
564   const gint NBUFFERS = 100;
565
566   mutex = g_mutex_new ();
567   cond = g_cond_new ();
568
569   pipe = gst_pipeline_new ("testbin");
570   mq = gst_element_factory_make ("multiqueue", NULL);
571   fail_unless (mq != NULL);
572   gst_bin_add (GST_BIN (pipe), mq);
573
574   /* 1 second limit */
575   g_object_set (mq,
576       "max-size-bytes", (guint) 0,
577       "max-size-buffers", (guint) 0,
578       "max-size-time", (guint64) GST_SECOND,
579       "extra-size-bytes", (guint) 0,
580       "extra-size-buffers", (guint) 0, "extra-size-time", (guint64) 0, NULL);
581
582   /* Construct 2 dummy output pads. */
583   for (i = 0; i < 2; i++) {
584     GstPad *mq_srcpad, *mq_sinkpad;
585     gchar *name;
586
587     name = g_strdup_printf ("dummysrc%d", i);
588     inputpads[i] = gst_pad_new (name, GST_PAD_SRC);
589     g_free (name);
590     gst_pad_set_getcaps_function (inputpads[i], mq_dummypad_getcaps);
591
592     mq_sinkpad = gst_element_get_request_pad (mq, "sink%d");
593     fail_unless (mq_sinkpad != NULL);
594     gst_pad_link (inputpads[i], mq_sinkpad);
595
596     gst_pad_set_active (inputpads[i], TRUE);
597
598     mq_srcpad = mq_sinkpad_to_srcpad (mq, mq_sinkpad);
599
600     name = g_strdup_printf ("dummysink%d", i);
601     sinkpads[i] = gst_pad_new (name, GST_PAD_SINK);
602     g_free (name);
603     gst_pad_set_chain_function (sinkpads[i], mq_dummypad_chain);
604     gst_pad_set_event_function (sinkpads[i], mq_dummypad_event);
605     gst_pad_set_getcaps_function (sinkpads[i], mq_dummypad_getcaps);
606
607     pad_data[i].pad_num = i;
608     pad_data[i].max_linked_id_ptr = &max_linked_id;
609     pad_data[i].eos_count_ptr = &eos_seen;
610     pad_data[i].is_linked = (i == 0) ? TRUE : FALSE;
611     pad_data[i].n_linked = 1;
612     pad_data[i].cond = cond;
613     pad_data[i].mutex = mutex;
614     pad_data[i].first_buf = TRUE;
615     gst_pad_set_element_private (sinkpads[i], pad_data + i);
616
617     gst_pad_link (mq_srcpad, sinkpads[i]);
618     gst_pad_set_active (sinkpads[i], TRUE);
619
620     gst_object_unref (mq_sinkpad);
621     gst_object_unref (mq_srcpad);
622   }
623
624   /* Run the test. Push 100 buffers through the multiqueue */
625   max_linked_id = 0;
626   eos_seen = 0;
627
628   gst_element_set_state (pipe, GST_STATE_PLAYING);
629
630   /* Push 2 new segment events */
631   event =
632       gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME, 0, -1, 0);
633   gst_pad_push_event (inputpads[0], gst_event_ref (event));
634   gst_pad_push_event (inputpads[1], event);
635
636   for (i = 0; i < NBUFFERS; i++) {
637     GstBuffer *buf;
638     GstFlowReturn ret;
639     GstClockTime ts;
640     gpointer data;
641
642     ts = gst_util_uint64_scale_int (GST_SECOND, i, 10);
643
644     buf = gst_buffer_new_and_alloc (4);
645     g_static_mutex_lock (&_check_lock);
646     fail_if (buf == NULL);
647     g_static_mutex_unlock (&_check_lock);
648
649     data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
650     GST_WRITE_UINT32_BE (data, i + 1);
651     gst_buffer_unmap (buf, data, 4);
652
653     GST_BUFFER_TIMESTAMP (buf) = gst_util_uint64_scale_int (GST_SECOND, i, 10);
654
655     /* If i == 0, also push the buffer to the 2nd pad */
656     if (i == 0)
657       ret = gst_pad_push (inputpads[1], gst_buffer_ref (buf));
658
659     ret = gst_pad_push (inputpads[0], buf);
660     g_static_mutex_lock (&_check_lock);
661     fail_unless (ret == GST_FLOW_OK,
662         "Push on pad %d returned %d when FLOW_OK was expected", 0, ret);
663     g_static_mutex_unlock (&_check_lock);
664
665     /* Push a new segment update on the 2nd pad */
666     event =
667         gst_event_new_new_segment (TRUE, 1.0, 1.0, GST_FORMAT_TIME, ts, -1, ts);
668     gst_pad_push_event (inputpads[1], event);
669   }
670
671   event = gst_event_new_eos ();
672   gst_pad_push_event (inputpads[0], gst_event_ref (event));
673   gst_pad_push_event (inputpads[1], event);
674
675   /* Wait while the buffers are processed */
676   g_mutex_lock (mutex);
677   /* We wait until EOS has been pushed on all pads */
678   while (eos_seen < 2) {
679     g_cond_wait (cond, mutex);
680   }
681   g_mutex_unlock (mutex);
682
683   /* Clean up */
684   for (i = 0; i < 2; i++) {
685     GstPad *mq_input = gst_pad_get_peer (inputpads[i]);
686
687     gst_pad_unlink (inputpads[i], mq_input);
688     gst_element_release_request_pad (mq, mq_input);
689     gst_object_unref (mq_input);
690     gst_object_unref (inputpads[i]);
691
692     gst_object_unref (sinkpads[i]);
693   }
694
695   gst_element_set_state (pipe, GST_STATE_NULL);
696   gst_object_unref (pipe);
697
698   g_cond_free (cond);
699   g_mutex_free (mutex);
700 }
701
702 GST_END_TEST;
703
704 static Suite *
705 multiqueue_suite (void)
706 {
707   Suite *s = suite_create ("multiqueue");
708   TCase *tc_chain = tcase_create ("general");
709
710   suite_add_tcase (s, tc_chain);
711   tcase_add_test (tc_chain, test_simple_create_destroy);
712   tcase_add_test (tc_chain, test_simple_pipeline);
713   tcase_add_test (tc_chain, test_simple_shutdown_while_running);
714
715   tcase_add_test (tc_chain, test_request_pads);
716   tcase_add_test (tc_chain, test_request_pads_named);
717
718   tcase_add_test (tc_chain, test_output_order);
719
720   tcase_add_test (tc_chain, test_sparse_stream);
721   return s;
722 }
723
724 GST_CHECK_MAIN (multiqueue)