2 * collectpads.c - GstCollectPads testsuite
3 * Copyright (C) 2006 Alessandro Decina <alessandro.d@gmail.com>
6 * Alessandro Decina <alessandro.d@gmail.com>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
28 #include <gst/check/gstcheck.h>
29 #include <gst/base/gstcollectpads.h>
31 /* dummy collectpads based element */
/* GObject boilerplate for the test element: the GType accessor macro, the
 * instance/class cast macros and the forward typedefs for the instance and
 * class structures. */
33 #define GST_TYPE_AGGREGATOR (gst_aggregator_get_type ())
34 #define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator))
35 #define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass))
36 #define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass))
38 typedef struct _GstAggregator GstAggregator;
39 typedef struct _GstAggregatorClass GstAggregatorClass;
44 GstCollectPads *collect;
/* Class structure of the test element. The only member visible in this
 * excerpt is the GstElementClass parent. */
50 struct _GstAggregatorClass
52 GstElementClass parent_class;
/* Forward declaration of the type getter, followed by the G_DEFINE_TYPE
 * invocation that defines GstAggregator with GST_TYPE_ELEMENT as parent. */
55 static GType gst_aggregator_get_type (void);
57 G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT);
/* Static pad templates of the test element: an always-present source pad
 * named "src" and request sink pads named "sink_%u".
 * NOTE(review): the caps argument of both templates is not visible in this
 * excerpt. */
59 static GstStaticPadTemplate gst_aggregator_src_template =
60 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
63 static GstStaticPadTemplate gst_aggregator_sink_template =
64 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
/* Collected callback of the test element; gst_aggregator_init installs it on
 * the element's GstCollectPads with the element itself as user_data.
 *
 * It walks pads->data to select one GstCollectData, takes that pad's whole
 * queued buffer and pushes it on the source pad. Before the first buffer it
 * also pushes a stream-start event and a GST_FORMAT_BYTES segment. When no
 * collect data was selected, EOS is pushed on the source pad instead.
 *
 * NOTE(review): the loop body that selects collect_data, the jump taken when
 * collect_data is NULL and the return of the EOS path are not visible in this
 * excerpt. */
68 gst_agregator_collected (GstCollectPads * pads, gpointer user_data)
70 GstAggregator *aggregator = GST_AGGREGATOR (user_data);
72 GstCollectData *collect_data = NULL;
77 for (walk = pads->data; walk; walk = walk->next) {
78 GstCollectData *tmp = (GstCollectData *) walk->data;
85 /* can only happen when no pads to collect or all EOS */
86 if (collect_data == NULL)
/* request exactly the size of the queued buffer, i.e. take all of it */
89 outsize = gst_buffer_get_size (collect_data->buffer);
90 inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize);
/* one-time stream setup, guarded by the 'first' flag set in
 * gst_aggregator_init */
94 if (aggregator->first) {
97 gst_segment_init (&segment, GST_FORMAT_BYTES);
98 gst_pad_push_event (aggregator->srcpad,
99 gst_event_new_stream_start ("test"));
100 gst_pad_push_event (aggregator->srcpad, gst_event_new_segment (&segment));
101 aggregator->first = FALSE;
104 /* just forward the first buffer */
105 GST_DEBUG_OBJECT (aggregator, "forward buffer %p", inbuf);
106 return gst_pad_push (aggregator->srcpad, inbuf);
/* EOS path: nothing could be collected, so signal end of stream downstream */
110 GST_DEBUG_OBJECT (aggregator, "no data available, must be EOS");
111 gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
/* GstElement request_new_pad implementation (installed in
 * gst_aggregator_class_init).
 *
 * Creates a sink pad named "sink_<n>" from templ, where n is the previous
 * value of an atomically incremented per-element counter, registers the pad
 * with the element's GstCollectPads and adds it to the element. When adding
 * the pad to the element fails, the pad is removed from the collectpads again
 * and unreffed.
 *
 * The 'unused' and 'caps' arguments are not referenced in the visible lines.
 * NOTE(review): the handling of a non-sink template, the release of 'name'
 * and the return statements are not visible in this excerpt. */
117 gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ,
118 const gchar * unused, const GstCaps * caps)
120 GstAggregator *aggregator = GST_AGGREGATOR (element);
125 if (templ->direction != GST_PAD_SINK)
/* g_atomic_int_add returns the value before the addition, so numbering of
 * the pads starts at the counter's initial value */
129 padcount = g_atomic_int_add (&aggregator->padcount, 1);
130 name = g_strdup_printf ("sink_%u", padcount);
131 newpad = gst_pad_new_from_template (templ, name);
/* plain GstCollectData, no destroy notify, lock argument TRUE */
134 gst_collect_pads_add_pad (aggregator->collect, newpad,
135 sizeof (GstCollectData), NULL, TRUE);
137 /* takes ownership of the pad */
138 if (!gst_element_add_pad (GST_ELEMENT (aggregator), newpad))
141 GST_DEBUG_OBJECT (aggregator, "added new pad %s", GST_OBJECT_NAME (newpad));
/* failure path: undo the collectpads registration and drop the pad */
147 GST_DEBUG_OBJECT (aggregator, "could not add pad");
148 gst_collect_pads_remove_pad (aggregator->collect, newpad);
149 gst_object_unref (newpad);
/* GstElement release_pad implementation (installed in
 * gst_aggregator_class_init): unregisters the pad from the collectpads, if
 * the element still has one, and then removes the pad from the element. */
155 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
157 GstAggregator *aggregator = GST_AGGREGATOR (element);
/* collect is set to NULL in gst_aggregator_dispose, hence the guard */
159 if (aggregator->collect)
160 gst_collect_pads_remove_pad (aggregator->collect, pad);
161 gst_element_remove_pad (element, pad);
/* GstElement change_state implementation (installed in
 * gst_aggregator_class_init).
 *
 * Starts the collectpads on READY->PAUSED and stops them on PAUSED->READY
 * before chaining up to the parent class' change_state.
 * NOTE(review): the cases of the second switch and the return statement are
 * not visible in this excerpt. */
164 static GstStateChangeReturn
165 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
167 GstAggregator *aggregator = GST_AGGREGATOR (element);
168 GstStateChangeReturn ret;
/* work that has to happen before the parent class handles the transition */
170 switch (transition) {
171 case GST_STATE_CHANGE_NULL_TO_READY:
173 case GST_STATE_CHANGE_READY_TO_PAUSED:
174 gst_collect_pads_start (aggregator->collect);
176 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
178 case GST_STATE_CHANGE_PAUSED_TO_READY:
179 /* need to unblock the collectpads before calling the
180 * parent change_state so that streaming can finish */
181 gst_collect_pads_stop (aggregator->collect);
/* chain up to the parent class */
188 GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state (element,
191 switch (transition) {
/* GObject dispose implementation: drops the element's reference on its
 * GstCollectPads and chains up to the parent class. The pointer is cleared
 * after the unref, so a repeated dispose call does not unref it again. */
200 gst_aggregator_dispose (GObject * object)
202 GstAggregator *aggregator = GST_AGGREGATOR (object);
204 if (aggregator->collect) {
205 gst_object_unref (aggregator->collect);
206 aggregator->collect = NULL;
209 G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object);
/* Class initializer: installs the dispose handler, registers the source and
 * sink pad templates and the element metadata, and hooks up the pad request,
 * pad release and state change virtual methods. */
213 gst_aggregator_class_init (GstAggregatorClass * klass)
215 GObjectClass *gobject_class = (GObjectClass *) klass;
216 GstElementClass *gstelement_class = (GstElementClass *) klass;
218 gobject_class->dispose = gst_aggregator_dispose;
220 gst_element_class_add_pad_template (gstelement_class,
221 gst_static_pad_template_get (&gst_aggregator_src_template));
222 gst_element_class_add_pad_template (gstelement_class,
223 gst_static_pad_template_get (&gst_aggregator_sink_template));
224 gst_element_class_set_static_metadata (gstelement_class, "Aggregator",
225 "Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>");
227 gstelement_class->request_new_pad =
228 GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
229 gstelement_class->release_pad =
230 GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
231 gstelement_class->change_state =
232 GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
/* Instance initializer: creates the "src" pad from the static source
 * template, marks it as proxying caps and adds it to the element, then
 * creates the GstCollectPads, installs gst_agregator_collected as its
 * collected function (with the element as user data) and sets the 'first'
 * flag that gst_agregator_collected checks before its first push. */
236 gst_aggregator_init (GstAggregator * agregator)
238 GstPadTemplate *template;
240 template = gst_static_pad_template_get (&gst_aggregator_src_template);
241 agregator->srcpad = gst_pad_new_from_template (template, "src");
/* the template reference obtained above is no longer needed */
242 gst_object_unref (template);
244 GST_PAD_SET_PROXY_CAPS (agregator->srcpad);
245 gst_element_add_pad (GST_ELEMENT (agregator), agregator->srcpad);
247 /* keep track of the sinkpads requested */
248 agregator->collect = gst_collect_pads_new ();
249 gst_collect_pads_set_function (agregator->collect,
250 GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator);
252 agregator->first = TRUE;
/* Plugin init function: registers the test element under the factory name
 * "aggregator" with rank GST_RANK_NONE and returns the result of the
 * registration. */
256 gst_agregator_plugin_init (GstPlugin * plugin)
258 return gst_element_register (plugin, "aggregator", GST_RANK_NONE,
259 GST_TYPE_AGGREGATOR);
/* Registers the test plugin statically, with gst_agregator_plugin_init as
 * its init function, and returns the result of gst_plugin_register_static.
 * Called from gst_collect_pads_suite before the test cases are created.
 * NOTE(review): the arguments between the major version and the init
 * function are not visible in this excerpt. */
263 gst_agregator_plugin_register (void)
265 return gst_plugin_register_static (GST_VERSION_MAJOR,
269 gst_agregator_plugin_init,
270 VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);
/* Asserts, while holding 'lock', that the global 'collected' flag equals
 * 'expected'. When TRUE is expected the macro first waits on 'cond' until the
 * flag is set; when FALSE is expected the flag is compared immediately
 * without waiting. 'expected' is expanded more than once, so pass an
 * expression without side effects. */
274 #define fail_unless_collected(expected) \
276 g_mutex_lock (&lock); \
277 while (expected == TRUE && collected == FALSE) \
278 g_cond_wait (&cond, &lock); \
279 fail_unless_equals_int (collected, expected); \
280 g_mutex_unlock (&lock); \
294 GstFlowReturn expected_result;
/* Pad templates for the pads the tests create themselves, both with ANY
 * caps.
 * NOTE(review): the direction and presence arguments are not visible in this
 * excerpt. */
297 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
300 GST_STATIC_CAPS_ANY);
302 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
305 GST_STATIC_CAPS_ANY);
/* State shared between the fixtures, the tests and the callbacks. */
/* collectpads instance used by the non-element tests */
307 static GstCollectPads *collect;
/* flag checked by fail_unless_collected while holding 'lock' */
308 static gboolean collected;
309 static GstPad *agg_srcpad, *srcpad1, *srcpad2;
310 static GstPad *sinkpad1, *sinkpad2;
/* per-pad test data; also passed to the pusher threads as their argument */
311 static TestData *data1, *data2;
/* buffers popped by collected_cb / handle_buffer_cb */
312 static GstBuffer *outbuf1, *outbuf2;
313 static GstElement *agg;
/* counters updated atomically in downstream_probe_cb; unlike the variables
 * above they are not declared static */
315 gint flush_start_events, flush_stop_events;
/* Collected function used by the tests that drive a bare GstCollectPads:
 * pops one buffer from each of the two test pads into outbuf1/outbuf2 and
 * then signals 'cond' while holding 'lock' to wake a test thread waiting in
 * fail_unless_collected.
 * NOTE(review): the statement between lock and signal (presumably setting
 * 'collected') and the return value are not visible in this excerpt. */
321 collected_cb (GstCollectPads * pads, gpointer user_data)
323 outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
324 outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
326 g_mutex_lock (&lock);
328 g_cond_signal (&cond);
329 g_mutex_unlock (&lock);
/* Buffer function installed by setup_buffer_cb. It does not use the 'data'
 * and 'buf' arguments in the visible lines; instead it pops one buffer from
 * each of the two test pads into outbuf1/outbuf2 and signals 'cond' while
 * holding 'lock'.
 * NOTE(review): the statement between lock and signal (presumably setting
 * 'collected') and the return value are not visible in this excerpt. */
335 handle_buffer_cb (GstCollectPads * pads, GstCollectData * data,
336 GstBuffer * buf, gpointer user_data)
338 GST_DEBUG ("collected buffers via callback");
340 outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
341 outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
343 g_mutex_lock (&lock);
345 g_cond_signal (&cond);
346 g_mutex_unlock (&lock);
/* Thread function used by the tests; user_data is a TestData. It pushes
 * stream-start, "foo/x-bar" caps and a GST_FORMAT_TIME segment on
 * test_data->pad, then pushes test_data->buffer and asserts that the flow
 * return equals test_data->expected_result.
 * NOTE(review): the failure message always says "instead of OK" although the
 * comparison is against expected_result, which test_flushing_seek sets to
 * GST_FLOW_FLUSHING. */
352 push_buffer (gpointer user_data)
356 TestData *test_data = (TestData *) user_data;
359 gst_pad_push_event (test_data->pad, gst_event_new_stream_start ("test"));
361 caps = gst_caps_new_empty_simple ("foo/x-bar");
362 gst_pad_push_event (test_data->pad, gst_event_new_caps (caps));
363 gst_caps_unref (caps);
365 gst_segment_init (&segment, GST_FORMAT_TIME);
366 gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));
368 flow = gst_pad_push (test_data->pad, test_data->buffer);
369 fail_unless (flow == test_data->expected_result, "got flow %s instead of OK",
370 gst_flow_get_name (flow));
/* Thread function used by the tests; user_data is a TestData. It pushes
 * test_data->event on test_data->pad and asserts that the push succeeded. */
376 push_event (gpointer user_data)
378 TestData *test_data = (TestData *) user_data;
380 fail_unless (gst_pad_push_event (test_data->pad, test_data->event) == TRUE);
388 collect = gst_collect_pads_new ();
390 srcpad1 = gst_pad_new_from_static_template (&srctemplate, "src1");
391 srcpad2 = gst_pad_new_from_static_template (&srctemplate, "src2");
392 sinkpad1 = gst_pad_new_from_static_template (&sinktemplate, "sink1");
393 sinkpad2 = gst_pad_new_from_static_template (&sinktemplate, "sink2");
394 fail_unless (gst_pad_link (srcpad1, sinkpad1) == GST_PAD_LINK_OK);
395 fail_unless (gst_pad_link (srcpad2, sinkpad2) == GST_PAD_LINK_OK);
397 gst_pad_set_active (sinkpad1, TRUE);
398 gst_pad_set_active (sinkpad2, TRUE);
399 gst_pad_set_active (srcpad1, TRUE);
400 gst_pad_set_active (srcpad2, TRUE);
413 gst_collect_pads_set_function (collect, collected_cb, NULL);
/* Fixture setup of the "buffers" test case (see gst_collect_pads_suite):
 * installs handle_buffer_cb as the buffer function of the global collectpads.
 * NOTE(review): at least one body line is not visible in this excerpt;
 * presumably the shared pad and collectpads setup runs first - confirm. */
417 setup_buffer_cb (void)
420 gst_collect_pads_set_buffer_function (collect, handle_buffer_cb, NULL);
426 gst_object_unref (sinkpad1);
427 gst_object_unref (sinkpad2);
428 gst_object_unref (collect);
/* Checks adding and removing pads on a GstCollectPads:
 *  - adding a pad with sizeof (BadCollectData) must raise a critical
 *    (BadCollectData is declared outside this excerpt);
 *  - adding sinkpad1 with sizeof (TestData) must succeed;
 *  - removing sinkpad2, which was never added, must fail;
 *  - removing sinkpad1 must succeed. */
431 GST_START_TEST (test_pad_add_remove)
433 ASSERT_CRITICAL (gst_collect_pads_add_pad (collect, sinkpad1,
434 sizeof (BadCollectData), NULL, TRUE));
436 data1 = (TestData *) gst_collect_pads_add_pad (collect,
437 sinkpad1, sizeof (TestData), NULL, TRUE);
438 fail_unless (data1 != NULL);
440 fail_unless (gst_collect_pads_remove_pad (collect, sinkpad2) == FALSE);
441 fail_unless (gst_collect_pads_remove_pad (collect, sinkpad1) == TRUE);
/* Basic collect test: a buffer pushed on only one of the two pads must not
 * trigger the collected callback; once the second pad has a buffer as well,
 * the callback must run and hand back exactly the two pushed buffers. */
446 GST_START_TEST (test_collect)
448 GstBuffer *buf1, *buf2;
449 GThread *thread1, *thread2;
451 data1 = (TestData *) gst_collect_pads_add_pad (collect,
452 sinkpad1, sizeof (TestData), NULL, TRUE);
453 fail_unless (data1 != NULL);
455 data2 = (TestData *) gst_collect_pads_add_pad (collect,
456 sinkpad2, sizeof (TestData), NULL, TRUE);
457 fail_unless (data2 != NULL);
459 buf1 = gst_buffer_new ();
460 buf2 = gst_buffer_new ();
462 /* start collect pads */
463 gst_collect_pads_start (collect);
465 /* push buffers on the pads */
466 data1->pad = srcpad1;
467 data1->buffer = buf1;
468 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
469 /* here thread1 is blocked and srcpad1 has a queued buffer */
470 fail_unless_collected (FALSE);
472 data2->pad = srcpad2;
473 data2->buffer = buf2;
474 thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
476 /* now both pads have a buffer */
477 fail_unless_collected (TRUE);
/* outbuf1/outbuf2 were filled in by collected_cb */
479 fail_unless (outbuf1 == buf1);
480 fail_unless (outbuf2 == buf2);
482 /* these will return immediately as at this point the threads have been
483 * unlocked and are finished */
484 g_thread_join (thread1);
485 g_thread_join (thread2);
487 gst_collect_pads_stop (collect);
489 gst_buffer_unref (buf1);
490 gst_buffer_unref (buf2);
/* Collect with one pad at EOS: a buffer on sinkpad1 alone must not trigger
 * the collected callback; after EOS arrives on sinkpad2 the callback must
 * run, yielding the buffer for pad 1 and NULL for pad 2. */
496 GST_START_TEST (test_collect_eos)
499 GThread *thread1, *thread2;
501 data1 = (TestData *) gst_collect_pads_add_pad (collect,
502 sinkpad1, sizeof (TestData), NULL, TRUE);
503 fail_unless (data1 != NULL);
505 data2 = (TestData *) gst_collect_pads_add_pad (collect,
506 sinkpad2, sizeof (TestData), NULL, TRUE);
507 fail_unless (data2 != NULL);
509 buf1 = gst_buffer_new ();
511 /* start collect pads */
512 gst_collect_pads_start (collect);
514 /* push a buffer on srcpad1 and EOS on srcpad2 */
515 data1->pad = srcpad1;
516 data1->buffer = buf1;
517 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
518 /* here thread1 is blocked and srcpad1 has a queued buffer */
519 fail_unless_collected (FALSE);
521 data2->pad = srcpad2;
522 data2->event = gst_event_new_eos ();
523 thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
524 /* now sinkpad1 has a buffer and sinkpad2 has EOS */
525 fail_unless_collected (TRUE);
527 fail_unless (outbuf1 == buf1);
528 /* sinkpad2 has EOS so a NULL buffer is returned */
529 fail_unless (outbuf2 == NULL);
531 /* these will return immediately as when the data is popped the threads are
532 * unlocked and will terminate */
533 g_thread_join (thread1);
534 g_thread_join (thread2);
536 gst_collect_pads_stop (collect);
538 gst_buffer_unref (buf1);
/* Runs two collect rounds on the same GstCollectPads.
 * Round 1: buffer on pad 1 plus EOS on pad 2, as in test_collect_eos.
 * Round 2: after stopping, FLUSH_STOP is pushed on both pads to clear EOS,
 * the collectpads are started again and a buffer is pushed on each pad; the
 * collected callback must run again.
 * NOTE(review): fail_unless_collected (FALSE) in round 2 requires the
 * 'collected' flag to have been reset after round 1; that reset is not
 * visible in this excerpt. */
543 GST_START_TEST (test_collect_twice)
545 GstBuffer *buf1, *buf2;
546 GThread *thread1, *thread2;
548 data1 = (TestData *) gst_collect_pads_add_pad (collect,
549 sinkpad1, sizeof (TestData), NULL, TRUE);
550 fail_unless (data1 != NULL);
552 data2 = (TestData *) gst_collect_pads_add_pad (collect,
553 sinkpad2, sizeof (TestData), NULL, TRUE);
554 fail_unless (data2 != NULL);
556 GST_INFO ("round 1");
558 buf1 = gst_buffer_new ();
560 /* start collect pads */
561 gst_collect_pads_start (collect);
564 data1->pad = srcpad1;
565 data1->buffer = buf1;
566 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
567 /* here thread1 is blocked and srcpad1 has a queued buffer */
568 fail_unless_collected (FALSE);
570 /* push EOS on the other pad */
571 data2->pad = srcpad2;
572 data2->event = gst_event_new_eos ();
573 thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
575 /* one of the pads has a buffer, the other has EOS */
576 fail_unless_collected (TRUE);
578 fail_unless (outbuf1 == buf1);
579 /* there's nothing to pop from the one which received EOS */
580 fail_unless (outbuf2 == NULL);
582 /* these will return immediately as at this point the threads have been
583 * unlocked and are finished */
584 g_thread_join (thread1);
585 g_thread_join (thread2);
587 gst_collect_pads_stop (collect);
590 GST_INFO ("round 2");
592 buf2 = gst_buffer_new ();
594 /* clear EOS from pads */
595 gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE));
596 gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
598 /* start collect pads */
599 gst_collect_pads_start (collect);
601 /* push buffers on the pads */
602 data1->pad = srcpad1;
/* the buffer from round 1 is pushed again on pad 1 */
603 data1->buffer = buf1;
604 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
605 /* here thread1 is blocked and srcpad1 has a queued buffer */
606 fail_unless_collected (FALSE);
608 data2->pad = srcpad2;
609 data2->buffer = buf2;
610 thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
612 /* now both pads have a buffer */
613 fail_unless_collected (TRUE);
615 /* these will return immediately as at this point the threads have been
616 * unlocked and are finished */
617 g_thread_join (thread1);
618 g_thread_join (thread2);
620 gst_collect_pads_stop (collect);
622 gst_buffer_unref (buf1);
623 gst_buffer_unref (buf2);
630 /* Test the default collected buffer func */
/* Registered in the "buffers" test case, whose fixture installs
 * handle_buffer_cb as buffer function instead of a collected function.
 * Two buffers with timestamps 0 and GST_SECOND are pushed, one per pad; the
 * test then expects that popping pad 1 yields NULL while the later buffer is
 * still queued on pad 2. */
631 GST_START_TEST (test_collect_default)
633 GstBuffer *buf1, *buf2;
634 GThread *thread1, *thread2;
636 data1 = (TestData *) gst_collect_pads_add_pad (collect,
637 sinkpad1, sizeof (TestData), NULL, TRUE);
638 fail_unless (data1 != NULL);
640 data2 = (TestData *) gst_collect_pads_add_pad (collect,
641 sinkpad2, sizeof (TestData), NULL, TRUE);
642 fail_unless (data2 != NULL);
644 buf1 = gst_buffer_new ();
645 GST_BUFFER_TIMESTAMP (buf1) = 0;
646 buf2 = gst_buffer_new ();
647 GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
649 /* start collect pads */
650 gst_collect_pads_start (collect);
652 /* push buffers on the pads */
653 data1->pad = srcpad1;
654 data1->buffer = buf1;
655 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
656 /* here thread1 is blocked and srcpad1 has a queued buffer */
657 fail_unless_collected (FALSE);
659 data2->pad = srcpad2;
660 data2->buffer = buf2;
661 thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
663 /* now both pads have a buffer */
664 fail_unless_collected (TRUE);
666 /* The default callback should have popped the buffer with lower timestamp,
667 * and this should therefore be NULL: */
668 fail_unless (outbuf1 == NULL);
669 /* While this one should still be pending: */
670 fail_unless (outbuf2 == buf2);
672 /* these will return immediately as at this point the threads have been
673 * unlocked and are finished */
674 g_thread_join (thread1);
675 g_thread_join (thread2);
677 gst_collect_pads_stop (collect);
679 gst_buffer_unref (buf1);
680 gst_buffer_unref (buf2);
/* Number of buffers the fakesrc in the pipeline tests is configured to
 * produce ("num-buffers"). */
686 #define NUM_BUFFERS 3
/* "handoff" signal handler connected to the fakesink in the pipeline tests,
 * with the address of the test's counter as user data.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * increments *count once per buffer - confirm. */
688 handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
693 /* Test a linear pipeline using aggregator */
/* Builds fakesrc ! aggregator ! fakesink, runs the pipeline until EOS (an
 * ERROR message fails the test) and checks that the handoff counter equals
 * NUM_BUFFERS.
 * NOTE(review): the declarations of bus, msg and count and the remaining
 * fakesrc properties are not visible in this excerpt. */
694 GST_START_TEST (test_linear_pipeline)
696 GstElement *pipeline, *src, *agg, *sink;
701 pipeline = gst_pipeline_new ("pipeline");
702 src = gst_check_setup_element ("fakesrc");
703 g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
705 agg = gst_check_setup_element ("aggregator");
706 sink = gst_check_setup_element ("fakesink");
707 g_object_set (sink, "signal-handoffs", TRUE, NULL);
708 g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
710 fail_unless (gst_bin_add (GST_BIN (pipeline), src));
711 fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
712 fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
713 fail_unless (gst_element_link (src, agg));
714 fail_unless (gst_element_link (agg, sink));
716 bus = gst_element_get_bus (pipeline);
717 fail_if (bus == NULL);
718 gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* a timeout of -1 makes the poll wait until EOS or ERROR is posted */
720 msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
721 fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
722 gst_message_unref (msg);
724 fail_unless_equals_int (count, NUM_BUFFERS);
726 gst_element_set_state (pipeline, GST_STATE_NULL);
727 gst_object_unref (bus);
728 gst_object_unref (pipeline);
733 /* Test a branched pipeline using aggregator */
/* Builds fakesrc ! tee, with two queue branches that both feed the
 * aggregator, followed by fakesink. Runs the pipeline until EOS (an ERROR
 * message fails the test) and checks the handoff counter.
 * NOTE(review): the declarations of bus, msg and count and the remaining
 * fakesrc properties are not visible in this excerpt. */
734 GST_START_TEST (test_branched_pipeline)
736 GstElement *pipeline, *src, *tee, *queue[2], *agg, *sink;
741 pipeline = gst_pipeline_new ("pipeline");
742 src = gst_check_setup_element ("fakesrc");
743 g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
745 tee = gst_check_setup_element ("tee");
746 queue[0] = gst_check_setup_element ("queue");
747 gst_object_set_name (GST_OBJECT (queue[0]), "queue0");
748 queue[1] = gst_check_setup_element ("queue");
749 gst_object_set_name (GST_OBJECT (queue[1]), "queue1");
750 agg = gst_check_setup_element ("aggregator");
751 sink = gst_check_setup_element ("fakesink");
752 g_object_set (sink, "signal-handoffs", TRUE, NULL);
753 g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
755 fail_unless (gst_bin_add (GST_BIN (pipeline), src));
756 fail_unless (gst_bin_add (GST_BIN (pipeline), tee));
757 fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0]));
758 fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1]));
759 fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
760 fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
761 fail_unless (gst_element_link (src, tee));
762 fail_unless (gst_element_link (tee, queue[0]));
763 fail_unless (gst_element_link (tee, queue[1]));
764 fail_unless (gst_element_link (queue[0], agg));
765 fail_unless (gst_element_link (queue[1], agg));
766 fail_unless (gst_element_link (agg, sink));
768 bus = gst_element_get_bus (pipeline);
769 fail_if (bus == NULL);
770 gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* a timeout of -1 makes the poll wait until EOS or ERROR is posted */
772 msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
773 fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
774 gst_message_unref (msg);
776 /* we have two branches, but we still only forward buffers from one branch */
/* NOTE(review): the assertion below expects NUM_BUFFERS * 2 handoffs, which
 * does not match the "one branch" wording of the comment above; the comment
 * looks stale - confirm. */
777 fail_unless_equals_int (count, NUM_BUFFERS * 2);
779 gst_element_set_state (pipeline, GST_STATE_NULL);
780 gst_object_unref (bus);
781 gst_object_unref (pipeline);
/* Pad probe installed on the aggregator's source pad by the flush fixture.
 * FLUSH_START and FLUSH_STOP events are counted atomically in
 * flush_start_events / flush_stop_events. Any other downstream data signals
 * 'cond' while holding 'lock', which wakes fail_unless_collected. The probe
 * always returns GST_PAD_PROBE_DROP.
 * NOTE(review): the statement between lock and signal (presumably setting
 * 'collected') is not visible in this excerpt. */
786 static GstPadProbeReturn
787 downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
/* the flush branch is tested first, so flush events never reach the
 * data branch below */
789 if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
790 if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
791 GST_EVENT_FLUSH_START)
792 g_atomic_int_inc (&flush_start_events);
793 else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
794 GST_EVENT_FLUSH_STOP)
795 g_atomic_int_inc (&flush_stop_events);
796 } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
797 g_mutex_lock (&lock);
799 g_cond_signal (&cond);
800 g_mutex_unlock (&lock);
803 return GST_PAD_PROBE_DROP;
/* Event function set on the test source pads by setup_src_pad. For a SEEK
 * event it atomically swaps the global fail_seek flag from TRUE to FALSE, so
 * the branch guarded by that swap is taken by at most one seek per arming of
 * the flag; the event is unreffed afterwards.
 * NOTE(review): the body of that branch and the return value are not visible
 * in this excerpt; per the comment in test_flushing_seek_failure this is
 * where a failing upstream seek is simulated - confirm. */
807 src_event (GstPad * pad, GstObject * parent, GstEvent * event)
810 if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
811 if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) {
816 gst_event_unref (event);
/* Event function set on the aggregator's source pad by the flush fixture:
 * delegates to gst_collect_pads_src_event_default using the collectpads of
 * the parent aggregator and returns its result.
 * NOTE(review): the remaining call arguments are not visible in this
 * excerpt. */
821 agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
823 return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
/* Creates a test source pad from tmpl and links it to a sink pad requested
 * from element; 'name' is the request pad name passed to the element, while
 * the created source pad itself is always named "src". The pad gets
 * src_event as its event function and is activated; the reference on the
 * requested sink pad is dropped again.
 * NOTE(review): the return statement is not visible in this excerpt; the
 * flush fixture assigns the result to srcpad1/srcpad2, so the source pad is
 * presumably returned - confirm. */
828 setup_src_pad (GstElement * element,
829 GstStaticPadTemplate * tmpl, const char *name)
831 GstPad *srcpad, *sinkpad;
833 srcpad = gst_pad_new_from_static_template (tmpl, "src");
834 sinkpad = gst_element_get_request_pad (element, name);
835 fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
836 "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
837 gst_pad_set_event_function (srcpad, src_event);
838 gst_pad_set_active (srcpad, TRUE);
839 gst_object_unref (sinkpad);
847 agg = gst_check_setup_element ("aggregator");
848 agg_srcpad = gst_element_get_static_pad (agg, "src");
849 srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
850 srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");
851 gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
852 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
853 GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
854 gst_pad_set_event_function (agg_srcpad, agg_src_event);
855 data1 = g_new0 (TestData, 1);
856 data2 = g_new0 (TestData, 1);
857 g_atomic_int_set (&flush_start_events, 0);
858 g_atomic_int_set (&flush_stop_events, 0);
859 gst_element_set_state (agg, GST_STATE_PLAYING);
/* Fixture teardown of the "flush" test case: sets the aggregator to NULL
 * state and drops the references on the element, its source pad and the two
 * test source pads.
 * NOTE(review): no release of data1/data2 is visible in this excerpt. */
863 flush_teardown (void)
865 gst_element_set_state (agg, GST_STATE_NULL);
866 gst_object_unref (agg);
867 gst_object_unref (agg_srcpad);
868 gst_object_unref (srcpad1);
869 gst_object_unref (srcpad2);
/* Flushing seek that fails upstream.
 * Steps: queue a buffer on agg:sink_1 from thread2; send a flushing seek to
 * agg:src with fail_seek armed and require gst_pad_send_event to fail; flush
 * srcpad1 and check that exactly one FLUSH_START and one FLUSH_STOP reached
 * agg:src; push a buffer on agg:sink_0 and require a collect; finally push
 * EOS on agg:sink_0 so the buffer still queued on agg:sink_1 is collected
 * and thread2 can finish. */
874 GST_START_TEST (test_flushing_seek_failure)
876 GstBuffer *buf1, *buf2;
877 GThread *thread1, *thread2;
880 /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
881 * element failing to handle the seek (see src_event()). Check that the
882 * flushing seek logic doesn't get triggered by checking that the buffer
883 * queued on agg:sink_1 doesn't get flushed.
886 /* queue a buffer in agg:sink_1 */
887 buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
888 GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
889 data2->pad = srcpad2;
890 data2->buffer = buf2;
891 thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
892 fail_unless_collected (FALSE);
895 event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
896 GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
897 g_atomic_int_set (&fail_seek, TRUE);
898 fail_if (gst_pad_send_event (agg_srcpad, event));
900 /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
901 fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
902 fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
904 /* check that the flush events reached agg:src */
905 fail_unless_equals_int (flush_start_events, 1);
906 fail_unless_equals_int (flush_stop_events, 1);
908 /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
909 * should not have been flushed at this point */
910 buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
911 GST_BUFFER_TIMESTAMP (buf1) = 0;
912 data1->pad = srcpad1;
913 data1->buffer = buf1;
914 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
915 fail_unless_collected (TRUE);
918 /* at this point thread1 must have returned */
919 g_thread_join (thread1);
921 /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
922 * the pushing thread returns */
923 data1->pad = srcpad1;
924 data1->event = gst_event_new_eos ();
925 thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
926 fail_unless_collected (TRUE);
928 g_thread_join (thread1);
929 g_thread_join (thread2);
/* Successful flushing seek.
 * Steps: queue a buffer on agg:sink_1 from thread2, expecting that push to
 * return GST_FLOW_FLUSHING; send a flushing seek to agg:src with fail_seek
 * cleared and require it to succeed; flush srcpad1 and check that the first
 * FLUSH_START and FLUSH_STOP are forwarded downstream; push a buffer on
 * agg:sink_0; flush srcpad2 and check that no further flush events are
 * forwarded; finally push EOS on agg:sink_1 so the buffer queued on
 * agg:sink_0 is collected. */
934 GST_START_TEST (test_flushing_seek)
936 GstBuffer *buf1, *buf2;
937 GThread *thread1, *thread2;
940 /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
941 * new flushing seek logic is triggered. On the first FLUSH_START call the
942 * buffers queued in collectpads should get flushed. Only one FLUSH_START and
943 * one FLUSH_STOP should be forwarded downstream.
945 buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
946 GST_BUFFER_TIMESTAMP (buf2) = 0;
947 data2->pad = srcpad2;
948 data2->buffer = buf2;
949 /* expect this buffer to be flushed */
950 data2->expected_result = GST_FLOW_FLUSHING;
951 thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
953 /* now do a successful flushing seek */
954 event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
955 GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
956 g_atomic_int_set (&fail_seek, FALSE);
957 fail_unless (gst_pad_send_event (agg_srcpad, event));
959 /* flushing starts once one of the upstream elements sends the first
961 fail_unless_equals_int (flush_start_events, 0);
962 fail_unless_equals_int (flush_stop_events, 0);
964 /* flush agg:sink_0. This flushes collectpads, calls ::flush() and sends
965 * FLUSH_START downstream */
966 fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
967 fail_unless_equals_int (flush_start_events, 1);
968 fail_unless_equals_int (flush_stop_events, 0);
969 /* the first FLUSH_STOP is forwarded downstream */
970 fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
971 fail_unless_equals_int (flush_start_events, 1);
972 fail_unless_equals_int (flush_stop_events, 1);
973 /* at this point even the other pad agg:sink_1 should be flushing so thread2
974 * should have stopped */
975 g_thread_join (thread2);
977 /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
978 * that flushing completes once all the pads have been flushed */
979 buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
980 GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
981 data1->pad = srcpad1;
982 data1->buffer = buf1;
983 thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
985 /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
987 gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
988 gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
990 /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
991 fail_unless_equals_int (flush_start_events, 1);
992 fail_unless_equals_int (flush_stop_events, 1);
994 /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
995 data2->pad = srcpad2;
996 data2->event = gst_event_new_eos ();
997 thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
998 fail_unless_collected (TRUE);
1000 /* these will return immediately as at this point the threads have been
1001 * unlocked and are finished */
1002 g_thread_join (thread1);
1003 g_thread_join (thread2);
/* Builds the "GstCollectPads" check suite. The test aggregator plugin is
 * registered first so the pipeline and flush tests can create an
 * "aggregator" element. Four test cases are added:
 *  - "general":  setup/teardown fixture, tests on a bare GstCollectPads
 *                with a collected function;
 *  - "buffers":  setup_buffer_cb/teardown fixture, buffer function test;
 *  - "pipeline": no fixture, full pipelines using the aggregator;
 *  - "flush":    flush_setup/flush_teardown fixture, flushing seek tests.
 * NOTE(review): the declaration of 'suite' and the return statement are not
 * visible in this excerpt. */
1009 gst_collect_pads_suite (void)
1012 TCase *general, *buffers, *pipeline, *flush;
1014 gst_agregator_plugin_register ();
1016 suite = suite_create ("GstCollectPads");
1018 general = tcase_create ("general");
1019 suite_add_tcase (suite, general);
1020 tcase_add_checked_fixture (general, setup, teardown);
1021 tcase_add_test (general, test_pad_add_remove);
1022 tcase_add_test (general, test_collect);
1023 tcase_add_test (general, test_collect_eos);
1024 tcase_add_test (general, test_collect_twice);
1026 buffers = tcase_create ("buffers");
1027 suite_add_tcase (suite, buffers);
1028 tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown);
1029 tcase_add_test (buffers, test_collect_default);
1031 pipeline = tcase_create ("pipeline");
1032 suite_add_tcase (suite, pipeline);
1033 tcase_add_test (pipeline, test_linear_pipeline);
1034 tcase_add_test (pipeline, test_branched_pipeline);
1036 flush = tcase_create ("flush");
1037 suite_add_tcase (suite, flush);
1038 tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
1039 tcase_add_test (flush, test_flushing_seek_failure);
1040 tcase_add_test (flush, test_flushing_seek);
/* Test program entry point, generated by the GstCheck GST_CHECK_MAIN macro
 * for the suite built by gst_collect_pads_suite. */
1045 GST_CHECK_MAIN (gst_collect_pads);