clock: Add signed time utilities
[platform/upstream/gstreamer.git] / tests / check / libs / collectpads.c
1 /*
2  * collectpads.c - GstCollectPads testsuite
3  * Copyright (C) 2006 Alessandro Decina <alessandro.d@gmail.com>
4  *
5  * Authors:
6  *   Alessandro Decina <alessandro.d@gmail.com>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 #ifdef HAVE_CONFIG_H
25 #  include "config.h"
26 #endif
27
28 #include <gst/check/gstcheck.h>
29 #include <gst/base/gstcollectpads.h>
30
31 /* dummy collectpads based element */
32
33 #define GST_TYPE_AGGREGATOR            (gst_aggregator_get_type ())
34 #define GST_AGGREGATOR(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator))
35 #define GST_AGGREGATOR_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass))
36 #define GST_AGGREGATOR_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass))
37
38 typedef struct _GstAggregator GstAggregator;
39 typedef struct _GstAggregatorClass GstAggregatorClass;
40
41 struct _GstAggregator
42 {
43   GstElement parent;
44   GstCollectPads *collect;
45   GstPad *srcpad;
46   GstPad *sinkpad[2];
47   gint padcount;
48   gboolean first;
49 };
50 struct _GstAggregatorClass
51 {
52   GstElementClass parent_class;
53 };
54
55 static GType gst_aggregator_get_type (void);
56
57 G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT);
58
59 static GstStaticPadTemplate gst_aggregator_src_template =
60 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
61     GST_STATIC_CAPS_ANY);
62
63 static GstStaticPadTemplate gst_aggregator_sink_template =
64 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
65     GST_STATIC_CAPS_ANY);
66
67 static GstFlowReturn
68 gst_agregator_collected (GstCollectPads * pads, gpointer user_data)
69 {
70   GstAggregator *aggregator = GST_AGGREGATOR (user_data);
71   GstBuffer *inbuf;
72   GstCollectData *collect_data = NULL;
73   guint outsize = 0;
74   GSList *walk;
75
76   walk = pads->data;
77   for (walk = pads->data; walk; walk = walk->next) {
78     GstCollectData *tmp = (GstCollectData *) walk->data;
79     if (tmp->buffer) {
80       collect_data = tmp;
81       break;
82     }
83   }
84
85   /* can only happen when no pads to collect or all EOS */
86   if (collect_data == NULL)
87     goto eos;
88
89   outsize = gst_buffer_get_size (collect_data->buffer);
90   inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize);
91   if (!inbuf)
92     goto eos;
93
94   if (aggregator->first) {
95     GstSegment segment;
96
97     gst_segment_init (&segment, GST_FORMAT_BYTES);
98     gst_pad_push_event (aggregator->srcpad,
99         gst_event_new_stream_start ("test"));
100     gst_pad_push_event (aggregator->srcpad, gst_event_new_segment (&segment));
101     aggregator->first = FALSE;
102   }
103
104   /* just forward the first buffer */
105   GST_DEBUG_OBJECT (aggregator, "forward buffer %p", inbuf);
106   return gst_pad_push (aggregator->srcpad, inbuf);
107   /* ERRORS */
108 eos:
109   {
110     GST_DEBUG_OBJECT (aggregator, "no data available, must be EOS");
111     gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
112     return GST_FLOW_EOS;
113   }
114 }
115
116 static GstPad *
117 gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ,
118     const gchar * unused, const GstCaps * caps)
119 {
120   GstAggregator *aggregator = GST_AGGREGATOR (element);
121   gchar *name;
122   GstPad *newpad;
123   gint padcount;
124
125   if (templ->direction != GST_PAD_SINK)
126     return NULL;
127
128   /* create new pad */
129   padcount = g_atomic_int_add (&aggregator->padcount, 1);
130   name = g_strdup_printf ("sink_%u", padcount);
131   newpad = gst_pad_new_from_template (templ, name);
132   g_free (name);
133
134   gst_collect_pads_add_pad (aggregator->collect, newpad,
135       sizeof (GstCollectData), NULL, TRUE);
136
137   /* takes ownership of the pad */
138   if (!gst_element_add_pad (GST_ELEMENT (aggregator), newpad))
139     goto could_not_add;
140
141   GST_DEBUG_OBJECT (aggregator, "added new pad %s", GST_OBJECT_NAME (newpad));
142   return newpad;
143
144   /* errors */
145 could_not_add:
146   {
147     GST_DEBUG_OBJECT (aggregator, "could not add pad");
148     gst_collect_pads_remove_pad (aggregator->collect, newpad);
149     gst_object_unref (newpad);
150     return NULL;
151   }
152 }
153
154 static void
155 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
156 {
157   GstAggregator *aggregator = GST_AGGREGATOR (element);
158
159   if (aggregator->collect)
160     gst_collect_pads_remove_pad (aggregator->collect, pad);
161   gst_element_remove_pad (element, pad);
162 }
163
164 static GstStateChangeReturn
165 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
166 {
167   GstAggregator *aggregator = GST_AGGREGATOR (element);
168   GstStateChangeReturn ret;
169
170   switch (transition) {
171     case GST_STATE_CHANGE_NULL_TO_READY:
172       break;
173     case GST_STATE_CHANGE_READY_TO_PAUSED:
174       gst_collect_pads_start (aggregator->collect);
175       break;
176     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
177       break;
178     case GST_STATE_CHANGE_PAUSED_TO_READY:
179       /* need to unblock the collectpads before calling the
180        * parent change_state so that streaming can finish */
181       gst_collect_pads_stop (aggregator->collect);
182       break;
183     default:
184       break;
185   }
186
187   ret =
188       GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state (element,
189       transition);
190
191   switch (transition) {
192     default:
193       break;
194   }
195
196   return ret;
197 }
198
199 static void
200 gst_aggregator_dispose (GObject * object)
201 {
202   GstAggregator *aggregator = GST_AGGREGATOR (object);
203
204   if (aggregator->collect) {
205     gst_object_unref (aggregator->collect);
206     aggregator->collect = NULL;
207   }
208
209   G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object);
210 }
211
212 static void
213 gst_aggregator_class_init (GstAggregatorClass * klass)
214 {
215   GObjectClass *gobject_class = (GObjectClass *) klass;
216   GstElementClass *gstelement_class = (GstElementClass *) klass;
217
218   gobject_class->dispose = gst_aggregator_dispose;
219
220   gst_element_class_add_pad_template (gstelement_class,
221       gst_static_pad_template_get (&gst_aggregator_src_template));
222   gst_element_class_add_pad_template (gstelement_class,
223       gst_static_pad_template_get (&gst_aggregator_sink_template));
224   gst_element_class_set_static_metadata (gstelement_class, "Aggregator",
225       "Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>");
226
227   gstelement_class->request_new_pad =
228       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
229   gstelement_class->release_pad =
230       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
231   gstelement_class->change_state =
232       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
233 }
234
235 static void
236 gst_aggregator_init (GstAggregator * agregator)
237 {
238   GstPadTemplate *template;
239
240   template = gst_static_pad_template_get (&gst_aggregator_src_template);
241   agregator->srcpad = gst_pad_new_from_template (template, "src");
242   gst_object_unref (template);
243
244   GST_PAD_SET_PROXY_CAPS (agregator->srcpad);
245   gst_element_add_pad (GST_ELEMENT (agregator), agregator->srcpad);
246
247   /* keep track of the sinkpads requested */
248   agregator->collect = gst_collect_pads_new ();
249   gst_collect_pads_set_function (agregator->collect,
250       GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator);
251
252   agregator->first = TRUE;
253 }
254
255 static gboolean
256 gst_agregator_plugin_init (GstPlugin * plugin)
257 {
258   return gst_element_register (plugin, "aggregator", GST_RANK_NONE,
259       GST_TYPE_AGGREGATOR);
260 }
261
262 static gboolean
263 gst_agregator_plugin_register (void)
264 {
265   return gst_plugin_register_static (GST_VERSION_MAJOR,
266       GST_VERSION_MINOR,
267       "aggregator",
268       "Combine buffers",
269       gst_agregator_plugin_init,
270       VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);
271 }
272
273
274 #define fail_unless_collected(expected)           \
275 G_STMT_START {                                    \
276   g_mutex_lock (&lock);                           \
277   while (expected == TRUE && collected == FALSE)  \
278     g_cond_wait (&cond, &lock);                   \
279   fail_unless_equals_int (collected, expected);   \
280   g_mutex_unlock (&lock);                         \
281 } G_STMT_END;
282
283 typedef struct
284 {
285   char foo;
286 } BadCollectData;
287
288 typedef struct
289 {
290   GstCollectData data;
291   GstPad *pad;
292   GstBuffer *buffer;
293   GstEvent *event;
294   GstFlowReturn expected_result;
295 } TestData;
296
297 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
298     GST_PAD_SRC,
299     GST_PAD_ALWAYS,
300     GST_STATIC_CAPS_ANY);
301
302 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
303     GST_PAD_SINK,
304     GST_PAD_ALWAYS,
305     GST_STATIC_CAPS_ANY);
306
307 static GstCollectPads *collect;
308 static gboolean collected;
309 static GstPad *agg_srcpad, *srcpad1, *srcpad2;
310 static GstPad *sinkpad1, *sinkpad2;
311 static TestData *data1, *data2;
312 static GstBuffer *outbuf1, *outbuf2;
313 static GstElement *agg;
314 gboolean fail_seek;
315 gint flush_start_events, flush_stop_events;
316
317 static GMutex lock;
318 static GCond cond;
319
320 static GstFlowReturn
321 collected_cb (GstCollectPads * pads, gpointer user_data)
322 {
323   outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
324   outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
325
326   g_mutex_lock (&lock);
327   collected = TRUE;
328   g_cond_signal (&cond);
329   g_mutex_unlock (&lock);
330
331   return GST_FLOW_OK;
332 }
333
334 static GstFlowReturn
335 handle_buffer_cb (GstCollectPads * pads, GstCollectData * data,
336     GstBuffer * buf, gpointer user_data)
337 {
338   GST_DEBUG ("collected buffers via callback");
339
340   outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
341   outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
342
343   g_mutex_lock (&lock);
344   collected = TRUE;
345   g_cond_signal (&cond);
346   g_mutex_unlock (&lock);
347
348   return GST_FLOW_OK;
349 }
350
351 static gpointer
352 push_buffer (gpointer user_data)
353 {
354   GstFlowReturn flow;
355   GstCaps *caps;
356   TestData *test_data = (TestData *) user_data;
357   GstSegment segment;
358
359   gst_pad_push_event (test_data->pad, gst_event_new_stream_start ("test"));
360
361   caps = gst_caps_new_empty_simple ("foo/x-bar");
362   gst_pad_push_event (test_data->pad, gst_event_new_caps (caps));
363   gst_caps_unref (caps);
364
365   gst_segment_init (&segment, GST_FORMAT_TIME);
366   gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));
367
368   flow = gst_pad_push (test_data->pad, test_data->buffer);
369   fail_unless (flow == test_data->expected_result, "got flow %s instead of OK",
370       gst_flow_get_name (flow));
371
372   return NULL;
373 }
374
375 static gpointer
376 push_event (gpointer user_data)
377 {
378   TestData *test_data = (TestData *) user_data;
379
380   fail_unless (gst_pad_push_event (test_data->pad, test_data->event) == TRUE);
381
382   return NULL;
383 }
384
385 static void
386 setup_default (void)
387 {
388   collect = gst_collect_pads_new ();
389
390   srcpad1 = gst_pad_new_from_static_template (&srctemplate, "src1");
391   srcpad2 = gst_pad_new_from_static_template (&srctemplate, "src2");
392   sinkpad1 = gst_pad_new_from_static_template (&sinktemplate, "sink1");
393   sinkpad2 = gst_pad_new_from_static_template (&sinktemplate, "sink2");
394   fail_unless (gst_pad_link (srcpad1, sinkpad1) == GST_PAD_LINK_OK);
395   fail_unless (gst_pad_link (srcpad2, sinkpad2) == GST_PAD_LINK_OK);
396
397   gst_pad_set_active (sinkpad1, TRUE);
398   gst_pad_set_active (sinkpad2, TRUE);
399   gst_pad_set_active (srcpad1, TRUE);
400   gst_pad_set_active (srcpad2, TRUE);
401
402   data1 = NULL;
403   data2 = NULL;
404   outbuf1 = NULL;
405   outbuf2 = NULL;
406   collected = FALSE;
407 }
408
409 static void
410 setup (void)
411 {
412   setup_default ();
413   gst_collect_pads_set_function (collect, collected_cb, NULL);
414 }
415
416 static void
417 setup_buffer_cb (void)
418 {
419   setup_default ();
420   gst_collect_pads_set_buffer_function (collect, handle_buffer_cb, NULL);
421 }
422
423 static void
424 teardown (void)
425 {
426   gst_object_unref (sinkpad1);
427   gst_object_unref (sinkpad2);
428   gst_object_unref (collect);
429 }
430
431 GST_START_TEST (test_pad_add_remove)
432 {
433   ASSERT_CRITICAL (gst_collect_pads_add_pad (collect, sinkpad1,
434           sizeof (BadCollectData), NULL, TRUE));
435
436   data1 = (TestData *) gst_collect_pads_add_pad (collect,
437       sinkpad1, sizeof (TestData), NULL, TRUE);
438   fail_unless (data1 != NULL);
439
440   fail_unless (gst_collect_pads_remove_pad (collect, sinkpad2) == FALSE);
441   fail_unless (gst_collect_pads_remove_pad (collect, sinkpad1) == TRUE);
442 }
443
444 GST_END_TEST;
445
446 GST_START_TEST (test_collect)
447 {
448   GstBuffer *buf1, *buf2;
449   GThread *thread1, *thread2;
450
451   data1 = (TestData *) gst_collect_pads_add_pad (collect,
452       sinkpad1, sizeof (TestData), NULL, TRUE);
453   fail_unless (data1 != NULL);
454
455   data2 = (TestData *) gst_collect_pads_add_pad (collect,
456       sinkpad2, sizeof (TestData), NULL, TRUE);
457   fail_unless (data2 != NULL);
458
459   buf1 = gst_buffer_new ();
460   buf2 = gst_buffer_new ();
461
462   /* start collect pads */
463   gst_collect_pads_start (collect);
464
465   /* push buffers on the pads */
466   data1->pad = srcpad1;
467   data1->buffer = buf1;
468   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
469   /* here thread1 is blocked and srcpad1 has a queued buffer */
470   fail_unless_collected (FALSE);
471
472   data2->pad = srcpad2;
473   data2->buffer = buf2;
474   thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
475
476   /* now both pads have a buffer */
477   fail_unless_collected (TRUE);
478
479   fail_unless (outbuf1 == buf1);
480   fail_unless (outbuf2 == buf2);
481
482   /* these will return immediately as at this point the threads have been
483    * unlocked and are finished */
484   g_thread_join (thread1);
485   g_thread_join (thread2);
486
487   gst_collect_pads_stop (collect);
488
489   gst_buffer_unref (buf1);
490   gst_buffer_unref (buf2);
491 }
492
493 GST_END_TEST;
494
495
496 GST_START_TEST (test_collect_eos)
497 {
498   GstBuffer *buf1;
499   GThread *thread1, *thread2;
500
501   data1 = (TestData *) gst_collect_pads_add_pad (collect,
502       sinkpad1, sizeof (TestData), NULL, TRUE);
503   fail_unless (data1 != NULL);
504
505   data2 = (TestData *) gst_collect_pads_add_pad (collect,
506       sinkpad2, sizeof (TestData), NULL, TRUE);
507   fail_unless (data2 != NULL);
508
509   buf1 = gst_buffer_new ();
510
511   /* start collect pads */
512   gst_collect_pads_start (collect);
513
514   /* push a buffer on srcpad1 and EOS on srcpad2 */
515   data1->pad = srcpad1;
516   data1->buffer = buf1;
517   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
518   /* here thread1 is blocked and srcpad1 has a queued buffer */
519   fail_unless_collected (FALSE);
520
521   data2->pad = srcpad2;
522   data2->event = gst_event_new_eos ();
523   thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
524   /* now sinkpad1 has a buffer and sinkpad2 has EOS */
525   fail_unless_collected (TRUE);
526
527   fail_unless (outbuf1 == buf1);
528   /* sinkpad2 has EOS so a NULL buffer is returned */
529   fail_unless (outbuf2 == NULL);
530
531   /* these will return immediately as when the data is popped the threads are
532    * unlocked and will terminate */
533   g_thread_join (thread1);
534   g_thread_join (thread2);
535
536   gst_collect_pads_stop (collect);
537
538   gst_buffer_unref (buf1);
539 }
540
541 GST_END_TEST;
542
543 GST_START_TEST (test_collect_twice)
544 {
545   GstBuffer *buf1, *buf2;
546   GThread *thread1, *thread2;
547
548   data1 = (TestData *) gst_collect_pads_add_pad (collect,
549       sinkpad1, sizeof (TestData), NULL, TRUE);
550   fail_unless (data1 != NULL);
551
552   data2 = (TestData *) gst_collect_pads_add_pad (collect,
553       sinkpad2, sizeof (TestData), NULL, TRUE);
554   fail_unless (data2 != NULL);
555
556   GST_INFO ("round 1");
557
558   buf1 = gst_buffer_new ();
559
560   /* start collect pads */
561   gst_collect_pads_start (collect);
562
563   /* queue a buffer */
564   data1->pad = srcpad1;
565   data1->buffer = buf1;
566   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
567   /* here thread1 is blocked and srcpad1 has a queued buffer */
568   fail_unless_collected (FALSE);
569
570   /* push EOS on the other pad */
571   data2->pad = srcpad2;
572   data2->event = gst_event_new_eos ();
573   thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
574
575   /* one of the pads has a buffer, the other has EOS */
576   fail_unless_collected (TRUE);
577
578   fail_unless (outbuf1 == buf1);
579   /* there's nothing to pop from the one which received EOS */
580   fail_unless (outbuf2 == NULL);
581
582   /* these will return immediately as at this point the threads have been
583    * unlocked and are finished */
584   g_thread_join (thread1);
585   g_thread_join (thread2);
586
587   gst_collect_pads_stop (collect);
588   collected = FALSE;
589
590   GST_INFO ("round 2");
591
592   buf2 = gst_buffer_new ();
593
594   /* clear EOS from pads */
595   gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE));
596   gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
597
598   /* start collect pads */
599   gst_collect_pads_start (collect);
600
601   /* push buffers on the pads */
602   data1->pad = srcpad1;
603   data1->buffer = buf1;
604   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
605   /* here thread1 is blocked and srcpad1 has a queued buffer */
606   fail_unless_collected (FALSE);
607
608   data2->pad = srcpad2;
609   data2->buffer = buf2;
610   thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
611
612   /* now both pads have a buffer */
613   fail_unless_collected (TRUE);
614
615   /* these will return immediately as at this point the threads have been
616    * unlocked and are finished */
617   g_thread_join (thread1);
618   g_thread_join (thread2);
619
620   gst_collect_pads_stop (collect);
621
622   gst_buffer_unref (buf1);
623   gst_buffer_unref (buf2);
624
625 }
626
627 GST_END_TEST;
628
629
630 /* Test the default collected buffer func */
631 GST_START_TEST (test_collect_default)
632 {
633   GstBuffer *buf1, *buf2;
634   GThread *thread1, *thread2;
635
636   data1 = (TestData *) gst_collect_pads_add_pad (collect,
637       sinkpad1, sizeof (TestData), NULL, TRUE);
638   fail_unless (data1 != NULL);
639
640   data2 = (TestData *) gst_collect_pads_add_pad (collect,
641       sinkpad2, sizeof (TestData), NULL, TRUE);
642   fail_unless (data2 != NULL);
643
644   buf1 = gst_buffer_new ();
645   GST_BUFFER_TIMESTAMP (buf1) = 0;
646   buf2 = gst_buffer_new ();
647   GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
648
649   /* start collect pads */
650   gst_collect_pads_start (collect);
651
652   /* push buffers on the pads */
653   data1->pad = srcpad1;
654   data1->buffer = buf1;
655   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
656   /* here thread1 is blocked and srcpad1 has a queued buffer */
657   fail_unless_collected (FALSE);
658
659   data2->pad = srcpad2;
660   data2->buffer = buf2;
661   thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
662
663   /* now both pads have a buffer */
664   fail_unless_collected (TRUE);
665
666   /* The default callback should have popped the buffer with lower timestamp,
667    * and this should therefore be NULL: */
668   fail_unless (outbuf1 == NULL);
669   /* While this one should still be pending: */
670   fail_unless (outbuf2 == buf2);
671
672   /* these will return immediately as at this point the threads have been
673    * unlocked and are finished */
674   g_thread_join (thread1);
675   g_thread_join (thread2);
676
677   gst_collect_pads_stop (collect);
678
679   gst_buffer_unref (buf1);
680   gst_buffer_unref (buf2);
681 }
682
683 GST_END_TEST;
684
685
686 #define NUM_BUFFERS 3
687 static void
688 handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
689 {
690   *count = *count + 1;
691 }
692
693 /* Test a linear pipeline using aggregator */
694 GST_START_TEST (test_linear_pipeline)
695 {
696   GstElement *pipeline, *src, *agg, *sink;
697   GstBus *bus;
698   GstMessage *msg;
699   gint count = 0;
700
701   pipeline = gst_pipeline_new ("pipeline");
702   src = gst_check_setup_element ("fakesrc");
703   g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
704       NULL);
705   agg = gst_check_setup_element ("aggregator");
706   sink = gst_check_setup_element ("fakesink");
707   g_object_set (sink, "signal-handoffs", TRUE, NULL);
708   g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
709
710   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
711   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
712   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
713   fail_unless (gst_element_link (src, agg));
714   fail_unless (gst_element_link (agg, sink));
715
716   bus = gst_element_get_bus (pipeline);
717   fail_if (bus == NULL);
718   gst_element_set_state (pipeline, GST_STATE_PLAYING);
719
720   msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
721   fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
722   gst_message_unref (msg);
723
724   fail_unless_equals_int (count, NUM_BUFFERS);
725
726   gst_element_set_state (pipeline, GST_STATE_NULL);
727   gst_object_unref (bus);
728   gst_object_unref (pipeline);
729 }
730
731 GST_END_TEST;
732
733 /* Test a linear pipeline using aggregator */
734 GST_START_TEST (test_branched_pipeline)
735 {
736   GstElement *pipeline, *src, *tee, *queue[2], *agg, *sink;
737   GstBus *bus;
738   GstMessage *msg;
739   gint count = 0;
740
741   pipeline = gst_pipeline_new ("pipeline");
742   src = gst_check_setup_element ("fakesrc");
743   g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
744       NULL);
745   tee = gst_check_setup_element ("tee");
746   queue[0] = gst_check_setup_element ("queue");
747   gst_object_set_name (GST_OBJECT (queue[0]), "queue0");
748   queue[1] = gst_check_setup_element ("queue");
749   gst_object_set_name (GST_OBJECT (queue[1]), "queue1");
750   agg = gst_check_setup_element ("aggregator");
751   sink = gst_check_setup_element ("fakesink");
752   g_object_set (sink, "signal-handoffs", TRUE, NULL);
753   g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
754
755   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
756   fail_unless (gst_bin_add (GST_BIN (pipeline), tee));
757   fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0]));
758   fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1]));
759   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
760   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
761   fail_unless (gst_element_link (src, tee));
762   fail_unless (gst_element_link (tee, queue[0]));
763   fail_unless (gst_element_link (tee, queue[1]));
764   fail_unless (gst_element_link (queue[0], agg));
765   fail_unless (gst_element_link (queue[1], agg));
766   fail_unless (gst_element_link (agg, sink));
767
768   bus = gst_element_get_bus (pipeline);
769   fail_if (bus == NULL);
770   gst_element_set_state (pipeline, GST_STATE_PLAYING);
771
772   msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
773   fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
774   gst_message_unref (msg);
775
776   /* we have two branches, but we still only forward buffers from one branch */
777   fail_unless_equals_int (count, NUM_BUFFERS * 2);
778
779   gst_element_set_state (pipeline, GST_STATE_NULL);
780   gst_object_unref (bus);
781   gst_object_unref (pipeline);
782 }
783
784 GST_END_TEST;
785
786 static GstPadProbeReturn
787 downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
788 {
789   if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
790     if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
791         GST_EVENT_FLUSH_START)
792       g_atomic_int_inc (&flush_start_events);
793     else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
794         GST_EVENT_FLUSH_STOP)
795       g_atomic_int_inc (&flush_stop_events);
796   } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
797     g_mutex_lock (&lock);
798     collected = TRUE;
799     g_cond_signal (&cond);
800     g_mutex_unlock (&lock);
801   }
802
803   return GST_PAD_PROBE_DROP;
804 }
805
806 static gboolean
807 src_event (GstPad * pad, GstObject * parent, GstEvent * event)
808 {
809   gboolean ret = TRUE;
810   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
811     if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) {
812       ret = FALSE;
813     }
814   }
815
816   gst_event_unref (event);
817   return ret;
818 }
819
820 static gboolean
821 agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
822 {
823   return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
824       pad, event);
825 }
826
827 static GstPad *
828 setup_src_pad (GstElement * element,
829     GstStaticPadTemplate * tmpl, const char *name)
830 {
831   GstPad *srcpad, *sinkpad;
832
833   srcpad = gst_pad_new_from_static_template (tmpl, "src");
834   sinkpad = gst_element_get_request_pad (element, name);
835   fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
836       "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
837   gst_pad_set_event_function (srcpad, src_event);
838   gst_pad_set_active (srcpad, TRUE);
839   gst_object_unref (sinkpad);
840
841   return srcpad;
842 }
843
844 static void
845 flush_setup (void)
846 {
847   agg = gst_check_setup_element ("aggregator");
848   agg_srcpad = gst_element_get_static_pad (agg, "src");
849   srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
850   srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");
851   gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
852       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
853       GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
854   gst_pad_set_event_function (agg_srcpad, agg_src_event);
855   data1 = g_new0 (TestData, 1);
856   data2 = g_new0 (TestData, 1);
857   g_atomic_int_set (&flush_start_events, 0);
858   g_atomic_int_set (&flush_stop_events, 0);
859   gst_element_set_state (agg, GST_STATE_PLAYING);
860 }
861
862 static void
863 flush_teardown (void)
864 {
865   gst_element_set_state (agg, GST_STATE_NULL);
866   gst_object_unref (agg);
867   gst_object_unref (agg_srcpad);
868   gst_object_unref (srcpad1);
869   gst_object_unref (srcpad2);
870   g_free (data1);
871   g_free (data2);
872 }
873
874 GST_START_TEST (test_flushing_seek_failure)
875 {
876   GstBuffer *buf1, *buf2;
877   GThread *thread1, *thread2;
878   GstEvent *event;
879
880   /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
881    * element failing to handle the seek (see src_event()). Check that the
882    * flushing seek logic doesn't get triggered by checking that the buffer
883    * queued on agg:sink_1 doesn't get flushed.
884    */
885
886   /* queue a buffer in agg:sink_1 */
887   buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
888   GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
889   data2->pad = srcpad2;
890   data2->buffer = buf2;
891   thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
892   fail_unless_collected (FALSE);
893
894   /* do the seek */
895   event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
896       GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
897   g_atomic_int_set (&fail_seek, TRUE);
898   fail_if (gst_pad_send_event (agg_srcpad, event));
899
900   /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
901   fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
902   fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
903
904   /* check that the flush events reached agg:src */
905   fail_unless_equals_int (flush_start_events, 1);
906   fail_unless_equals_int (flush_stop_events, 1);
907
908   /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
909    * should not have been flushed at this point */
910   buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
911   GST_BUFFER_TIMESTAMP (buf1) = 0;
912   data1->pad = srcpad1;
913   data1->buffer = buf1;
914   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
915   fail_unless_collected (TRUE);
916   collected = FALSE;
917
918   /* at this point thread1 must have returned */
919   g_thread_join (thread1);
920
921   /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
922    * the pushing thread returns */
923   data1->pad = srcpad1;
924   data1->event = gst_event_new_eos ();
925   thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
926   fail_unless_collected (TRUE);
927
928   g_thread_join (thread1);
929   g_thread_join (thread2);
930 }
931
932 GST_END_TEST;
933
934 GST_START_TEST (test_flushing_seek)
935 {
936   GstBuffer *buf1, *buf2;
937   GThread *thread1, *thread2;
938   GstEvent *event;
939
940   /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
941    * new flushing seek logic is triggered. On the first FLUSH_START call the
942    * buffers queued in collectpads should get flushed. Only one FLUSH_START and
943    * one FLUSH_STOP should be forwarded downstream.
944    */
945   buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
946   GST_BUFFER_TIMESTAMP (buf2) = 0;
947   data2->pad = srcpad2;
948   data2->buffer = buf2;
949   /* expect this buffer to be flushed */
950   data2->expected_result = GST_FLOW_FLUSHING;
951   thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
952
953   /* now do a successful flushing seek */
954   event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
955       GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
956   g_atomic_int_set (&fail_seek, FALSE);
957   fail_unless (gst_pad_send_event (agg_srcpad, event));
958
959   /* flushing starts once one of the upstream elements sends the first
960    * FLUSH_START */
961   fail_unless_equals_int (flush_start_events, 0);
962   fail_unless_equals_int (flush_stop_events, 0);
963
964   /* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends
965    * FLUSH_START downstream */
966   fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
967   fail_unless_equals_int (flush_start_events, 1);
968   fail_unless_equals_int (flush_stop_events, 0);
969   /* the first FLUSH_STOP is forwarded downstream */
970   fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
971   fail_unless_equals_int (flush_start_events, 1);
972   fail_unless_equals_int (flush_stop_events, 1);
973   /* at this point even the other pad agg:sink_1 should be flushing so thread2
974    * should have stopped */
975   g_thread_join (thread2);
976
977   /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
978    * that flushing completes once all the pads have been flushed */
979   buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
980   GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
981   data1->pad = srcpad1;
982   data1->buffer = buf1;
983   thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
984
985   /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
986    * sent downstream */
987   gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
988   gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
989
990   /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
991   fail_unless_equals_int (flush_start_events, 1);
992   fail_unless_equals_int (flush_stop_events, 1);
993
994   /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
995   data2->pad = srcpad2;
996   data2->event = gst_event_new_eos ();
997   thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
998   fail_unless_collected (TRUE);
999
1000   /* these will return immediately as at this point the threads have been
1001    * unlocked and are finished */
1002   g_thread_join (thread1);
1003   g_thread_join (thread2);
1004 }
1005
1006 GST_END_TEST;
1007
1008 static Suite *
1009 gst_collect_pads_suite (void)
1010 {
1011   Suite *suite;
1012   TCase *general, *buffers, *pipeline, *flush;
1013
1014   gst_agregator_plugin_register ();
1015
1016   suite = suite_create ("GstCollectPads");
1017
1018   general = tcase_create ("general");
1019   suite_add_tcase (suite, general);
1020   tcase_add_checked_fixture (general, setup, teardown);
1021   tcase_add_test (general, test_pad_add_remove);
1022   tcase_add_test (general, test_collect);
1023   tcase_add_test (general, test_collect_eos);
1024   tcase_add_test (general, test_collect_twice);
1025
1026   buffers = tcase_create ("buffers");
1027   suite_add_tcase (suite, buffers);
1028   tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown);
1029   tcase_add_test (buffers, test_collect_default);
1030
1031   pipeline = tcase_create ("pipeline");
1032   suite_add_tcase (suite, pipeline);
1033   tcase_add_test (pipeline, test_linear_pipeline);
1034   tcase_add_test (pipeline, test_branched_pipeline);
1035
1036   flush = tcase_create ("flush");
1037   suite_add_tcase (suite, flush);
1038   tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
1039   tcase_add_test (flush, test_flushing_seek_failure);
1040   tcase_add_test (flush, test_flushing_seek);
1041
1042   return suite;
1043 }
1044
1045 GST_CHECK_MAIN (gst_collect_pads);