tests: filesrc: more Windows fixes
[platform/upstream/gstreamer.git] / tests / check / elements / fakesink.c
1 /* GStreamer
2  *
3  * unit test for fakesink
4  *
5  * Copyright (C) <2005> Thomas Vander Stichele <thomas at apestaart dot org>
6  *               <2007> Wim Taymans <wim@fluendo.com>
7  *               <2009> Tim-Philipp Müller <tim centricular net>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 #ifdef HAVE_CONFIG_H
25 #include "config.h"
26 #endif
27
28 #include <gst/base/gstpushsrc.h>
29 #include <gst/check/gstcheck.h>
30
31 typedef struct
32 {
33   GstPad *pad;
34   GstBuffer *buffer;
35   GThread *thread;
36   GstFlowReturn ret;
37 } ChainData;
38
39 static gpointer
40 chain_async_buffer (gpointer data)
41 {
42   ChainData *chain_data = (ChainData *) data;
43
44   chain_data->ret = gst_pad_chain (chain_data->pad, chain_data->buffer);
45
46   return chain_data;
47 }
48
49 static ChainData *
50 chain_async (GstPad * pad, GstBuffer * buffer)
51 {
52   GThread *thread;
53   ChainData *chain_data;
54   GError *error = NULL;
55
56   chain_data = g_new (ChainData, 1);
57   chain_data->pad = pad;
58   chain_data->buffer = buffer;
59   chain_data->ret = GST_FLOW_ERROR;
60
61   thread =
62       g_thread_try_new ("gst-check", chain_async_buffer, chain_data, &error);
63   if (error != NULL) {
64     g_warning ("could not create thread reason: %s", error->message);
65     g_free (chain_data);
66     return NULL;
67   }
68   chain_data->thread = thread;
69
70   return chain_data;
71 }
72
73 static GstFlowReturn
74 chain_async_return (ChainData * data)
75 {
76   GstFlowReturn ret;
77
78   g_thread_join (data->thread);
79   ret = data->ret;
80   g_free (data);
81
82   return ret;
83 }
84
85 GST_START_TEST (test_clipping)
86 {
87   GstElement *sink;
88   GstPad *sinkpad;
89   GstStateChangeReturn ret;
90
91   /* create sink */
92   sink = gst_element_factory_make ("fakesink", "sink");
93   fail_if (sink == NULL);
94
95   sinkpad = gst_element_get_static_pad (sink, "sink");
96   fail_if (sinkpad == NULL);
97
98   /* make element ready to accept data */
99   ret = gst_element_set_state (sink, GST_STATE_PAUSED);
100   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
101
102   gst_pad_send_event (sinkpad, gst_event_new_stream_start ("test"));
103
104   /* send segment */
105   {
106     GstEvent *event;
107     GstSegment segment;
108     gboolean eret;
109
110     GST_DEBUG ("sending segment");
111     gst_segment_init (&segment, GST_FORMAT_TIME);
112     segment.start = 1 * GST_SECOND;
113     segment.stop = 5 * GST_SECOND;
114     segment.time = 1 * GST_SECOND;
115
116     event = gst_event_new_segment (&segment);
117
118     eret = gst_pad_send_event (sinkpad, event);
119     fail_if (eret == FALSE);
120   }
121
122   /* new segment should not have finished preroll */
123   ret = gst_element_get_state (sink, NULL, NULL, 0);
124   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
125
126   /* send buffer that should be dropped */
127   {
128     GstBuffer *buffer;
129     GstFlowReturn fret;
130
131     buffer = gst_buffer_new ();
132     GST_BUFFER_TIMESTAMP (buffer) = 0;
133     GST_BUFFER_DURATION (buffer) = 1 * GST_MSECOND;
134
135     GST_DEBUG ("sending buffer to be dropped");
136     fret = gst_pad_chain (sinkpad, buffer);
137     fail_if (fret != GST_FLOW_OK);
138   }
139   /* dropped buffer should not have finished preroll */
140   ret = gst_element_get_state (sink, NULL, NULL, 0);
141   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
142
143   /* send buffer that should be dropped */
144   {
145     GstBuffer *buffer;
146     GstFlowReturn fret;
147
148     buffer = gst_buffer_new ();
149     GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
150     GST_BUFFER_DURATION (buffer) = 1 * GST_MSECOND;
151
152     GST_DEBUG ("sending buffer to be dropped");
153     fret = gst_pad_chain (sinkpad, buffer);
154     fail_if (fret != GST_FLOW_OK);
155   }
156   /* dropped buffer should not have finished preroll */
157   ret = gst_element_get_state (sink, NULL, NULL, 0);
158   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
159
160   /* send buffer that should block and finish preroll */
161   {
162     GstBuffer *buffer;
163     GstFlowReturn fret;
164     ChainData *data;
165     GstState current, pending;
166
167     buffer = gst_buffer_new ();
168     GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
169     GST_BUFFER_DURATION (buffer) = 1 * GST_MSECOND;
170
171     GST_DEBUG ("sending buffer to finish preroll");
172     data = chain_async (sinkpad, buffer);
173     fail_if (data == NULL);
174
175     /* state should now eventually change to PAUSED */
176     ret = gst_element_get_state (sink, &current, &pending, GST_CLOCK_TIME_NONE);
177     fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
178     fail_unless (current == GST_STATE_PAUSED);
179     fail_unless (pending == GST_STATE_VOID_PENDING);
180
181     /* playing should render the buffer */
182     ret = gst_element_set_state (sink, GST_STATE_PLAYING);
183     fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
184
185     /* and we should get a success return value */
186     fret = chain_async_return (data);
187     fail_if (fret != GST_FLOW_OK);
188   }
189
190   /* send some buffer that will be dropped or clipped, this can 
191    * only be observed in the debug log. */
192   {
193     GstBuffer *buffer;
194     GstFlowReturn fret;
195
196     buffer = gst_buffer_new ();
197     GST_BUFFER_TIMESTAMP (buffer) = 6 * GST_SECOND;
198     GST_BUFFER_DURATION (buffer) = 1 * GST_MSECOND;
199
200     /* should be dropped */
201     GST_DEBUG ("sending buffer to drop");
202     fret = gst_pad_chain (sinkpad, buffer);
203     fail_if (fret != GST_FLOW_OK);
204
205     buffer = gst_buffer_new ();
206     GST_BUFFER_TIMESTAMP (buffer) = 0 * GST_SECOND;
207     GST_BUFFER_DURATION (buffer) = 2 * GST_SECOND;
208
209     /* should be clipped */
210     GST_DEBUG ("sending buffer to clip");
211     fret = gst_pad_chain (sinkpad, buffer);
212     fail_if (fret != GST_FLOW_OK);
213
214     buffer = gst_buffer_new ();
215     GST_BUFFER_TIMESTAMP (buffer) = 4 * GST_SECOND;
216     GST_BUFFER_DURATION (buffer) = 2 * GST_SECOND;
217
218     /* should be clipped */
219     GST_DEBUG ("sending buffer to clip");
220     fret = gst_pad_chain (sinkpad, buffer);
221     fail_if (fret != GST_FLOW_OK);
222   }
223
224   gst_element_set_state (sink, GST_STATE_NULL);
225   gst_element_get_state (sink, NULL, NULL, GST_CLOCK_TIME_NONE);
226   gst_object_unref (sinkpad);
227   gst_object_unref (sink);
228 }
229
230 GST_END_TEST;
231
232 static gint num_preroll = 0;
233
234 static void
235 preroll_count (GstElement * sink)
236 {
237   num_preroll++;
238   GST_DEBUG ("got preroll handoff %d", num_preroll);
239 }
240
241 GST_START_TEST (test_preroll_sync)
242 {
243   GstElement *pipeline, *sink;
244   GstPad *sinkpad;
245   GstStateChangeReturn ret;
246
247   /* create sink */
248   pipeline = gst_pipeline_new ("pipeline");
249   fail_if (pipeline == NULL);
250
251   sink = gst_element_factory_make ("fakesink", "sink");
252   fail_if (sink == NULL);
253   g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
254   g_object_set (G_OBJECT (sink), "signal-handoffs", TRUE, NULL);
255   g_signal_connect (sink, "preroll-handoff", G_CALLBACK (preroll_count), NULL);
256
257   fail_unless (num_preroll == 0);
258
259   gst_bin_add (GST_BIN (pipeline), sink);
260
261   sinkpad = gst_element_get_static_pad (sink, "sink");
262   fail_if (sinkpad == NULL);
263
264   /* make pipeline and element ready to accept data */
265   ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
266   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
267
268   gst_pad_send_event (sinkpad, gst_event_new_stream_start ("test"));
269
270   /* send segment */
271   {
272     GstEvent *event;
273     GstSegment segment;
274     gboolean eret;
275
276     GST_DEBUG ("sending segment");
277     gst_segment_init (&segment, GST_FORMAT_TIME);
278     segment.start = 0 * GST_SECOND;
279     segment.stop = 102 * GST_SECOND;
280     segment.time = 0 * GST_SECOND;
281
282     event = gst_event_new_segment (&segment);
283     eret = gst_pad_send_event (sinkpad, event);
284     fail_if (eret == FALSE);
285   }
286
287   /* send buffer that should block and finish preroll */
288   {
289     GstBuffer *buffer;
290     GstFlowReturn fret;
291     ChainData *data;
292     GstState current, pending;
293
294     buffer = gst_buffer_new ();
295     GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
296     GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
297
298     GST_DEBUG ("sending buffer to finish preroll");
299     data = chain_async (sinkpad, buffer);
300     fail_if (data == NULL);
301
302     /* state should now eventually change to PAUSED */
303     ret =
304         gst_element_get_state (pipeline, &current, &pending,
305         GST_CLOCK_TIME_NONE);
306     fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
307     fail_unless (current == GST_STATE_PAUSED);
308     fail_unless (pending == GST_STATE_VOID_PENDING);
309
310     fail_unless (num_preroll == 1);
311
312     /* playing should render the buffer */
313     ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
314     fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
315
316     /* and we should get a success return value */
317     fret = chain_async_return (data);
318     fail_if (fret != GST_FLOW_OK);
319
320     /* now we are playing no new preroll was done */
321     fail_unless (num_preroll == 1);
322
323     buffer = gst_buffer_new ();
324     /* far in the future to make sure we block */
325     GST_BUFFER_TIMESTAMP (buffer) = 100 * GST_SECOND;
326     GST_BUFFER_DURATION (buffer) = 100 * GST_SECOND;
327     data = chain_async (sinkpad, buffer);
328     fail_if (data == NULL);
329
330     g_usleep (1000000);
331
332     /* pause again. Since the buffer has a humongous timestamp we likely
333      * interrupt the clock_wait and we should preroll on this buffer again */
334     ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
335     fail_unless (ret == GST_STATE_CHANGE_ASYNC);
336
337     ret =
338         gst_element_get_state (pipeline, &current, &pending,
339         GST_CLOCK_TIME_NONE);
340     fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
341     fail_unless (current == GST_STATE_PAUSED);
342     fail_unless (pending == GST_STATE_VOID_PENDING);
343
344     fail_unless (num_preroll == 2);
345
346     /* shutdown */
347     ret = gst_element_set_state (pipeline, GST_STATE_READY);
348     fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
349
350     /* should be wrong state now */
351     fret = chain_async_return (data);
352     fail_if (fret != GST_FLOW_FLUSHING);
353   }
354   gst_element_set_state (pipeline, GST_STATE_NULL);
355   gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
356   gst_object_unref (sinkpad);
357   gst_object_unref (pipeline);
358 }
359
360 GST_END_TEST;
361
362 /* after EOS, we refuse everything */
363 GST_START_TEST (test_eos)
364 {
365   GstElement *pipeline, *sink;
366   GstPad *sinkpad;
367   GstStateChangeReturn ret;
368   GstMessage *message;
369   GstBus *bus;
370
371   /* create sink */
372   pipeline = gst_pipeline_new ("pipeline");
373   fail_if (pipeline == NULL);
374
375   bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (pipeline));
376   fail_if (bus == NULL);
377
378   sink = gst_element_factory_make ("fakesink", "sink");
379   fail_if (sink == NULL);
380   g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
381
382   gst_bin_add (GST_BIN (pipeline), sink);
383
384   sinkpad = gst_element_get_static_pad (sink, "sink");
385   fail_if (sinkpad == NULL);
386
387   /* make pipeline and element ready to accept data */
388   ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
389   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
390
391   /* send EOS, this should work fine */
392   {
393     GstEvent *eos;
394     gboolean eret;
395
396     GST_DEBUG ("sending EOS");
397     eos = gst_event_new_eos ();
398
399     eret = gst_pad_send_event (sinkpad, eos);
400     fail_if (eret == FALSE);
401   }
402
403   /* wait for preroll */
404   gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
405
406   /* EOS should be on the bus at some point */
407   while (TRUE) {
408     GstMessageType type;
409
410     /* blocking wait for messages */
411     message = gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE);
412     type = GST_MESSAGE_TYPE (message);
413     gst_message_unref (message);
414
415     GST_DEBUG ("got message %s", gst_message_type_get_name (type));
416
417     if (type == GST_MESSAGE_EOS)
418       break;
419   }
420   gst_object_unref (bus);
421
422   /* send another EOS, this should fail */
423   {
424     GstEvent *eos;
425     gboolean eret;
426
427     GST_DEBUG ("sending second EOS");
428     eos = gst_event_new_eos ();
429
430     eret = gst_pad_send_event (sinkpad, eos);
431     fail_if (eret == TRUE);
432   }
433
434   /* send segment, this should fail */
435   {
436     GstEvent *event;
437     GstSegment segment;
438     gboolean eret;
439
440     GST_DEBUG ("sending segment");
441     gst_segment_init (&segment, GST_FORMAT_TIME);
442     segment.start = 0 * GST_SECOND;
443     segment.stop = 2 * GST_SECOND;
444     segment.time = 0 * GST_SECOND;
445     event = gst_event_new_segment (&segment);
446
447     eret = gst_pad_send_event (sinkpad, event);
448     fail_if (eret == TRUE);
449   }
450
451   /* send buffer that should fail after EOS */
452   {
453     GstBuffer *buffer;
454     GstFlowReturn fret;
455
456     buffer = gst_buffer_new ();
457     GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
458     GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
459
460     GST_DEBUG ("sending buffer");
461
462     /* buffer after EOS is not EOS */
463     fret = gst_pad_chain (sinkpad, buffer);
464     fail_unless (fret == GST_FLOW_EOS);
465   }
466
467   /* flush, EOS state is flushed again. */
468   {
469     GstEvent *event;
470     gboolean eret;
471
472     GST_DEBUG ("sending FLUSH_START");
473     event = gst_event_new_flush_start ();
474     eret = gst_pad_send_event (sinkpad, event);
475     fail_unless (eret == TRUE);
476
477     GST_DEBUG ("sending FLUSH_STOP");
478     event = gst_event_new_flush_stop (TRUE);
479     eret = gst_pad_send_event (sinkpad, event);
480     fail_unless (eret == TRUE);
481   }
482
483   gst_pad_send_event (sinkpad, gst_event_new_stream_start ("test"));
484
485   /* send segment, this should now work again */
486   {
487     GstEvent *event;
488     GstSegment segment;
489     gboolean eret;
490
491     GST_DEBUG ("sending segment");
492     gst_segment_init (&segment, GST_FORMAT_TIME);
493     segment.start = 0 * GST_SECOND;
494     segment.stop = 2 * GST_SECOND;
495     segment.time = 0 * GST_SECOND;
496     event = gst_event_new_segment (&segment);
497
498     eret = gst_pad_send_event (sinkpad, event);
499     fail_unless (eret == TRUE);
500   }
501
502   /* send buffer that should work and block */
503   {
504     GstBuffer *buffer;
505     GstFlowReturn fret;
506
507     buffer = gst_buffer_new ();
508     GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
509     GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
510
511     GST_DEBUG ("sending buffer");
512
513     fret = gst_pad_chain (sinkpad, buffer);
514     fail_unless (fret == GST_FLOW_OK);
515   }
516
517   gst_element_set_state (pipeline, GST_STATE_NULL);
518   gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
519   gst_object_unref (sinkpad);
520   gst_object_unref (pipeline);
521 }
522
523 GST_END_TEST;
524
525 /* test EOS triggered by the element */
526 GST_START_TEST (test_eos2)
527 {
528   GstElement *pipeline, *sink;
529   GstPad *sinkpad;
530   GstStateChangeReturn ret;
531
532   /* create sink */
533   pipeline = gst_pipeline_new ("pipeline");
534   fail_if (pipeline == NULL);
535
536   sink = gst_element_factory_make ("fakesink", "sink");
537   fail_if (sink == NULL);
538   g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
539   g_object_set (G_OBJECT (sink), "num-buffers", 1, NULL);
540
541   gst_bin_add (GST_BIN (pipeline), sink);
542
543   sinkpad = gst_element_get_static_pad (sink, "sink");
544   fail_if (sinkpad == NULL);
545
546   /* make pipeline and element ready to accept data */
547   ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
548   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
549
550   gst_pad_send_event (sinkpad, gst_event_new_stream_start ("test"));
551
552   /* send segment, this should work */
553   {
554     GstEvent *event;
555     GstSegment segment;
556     gboolean eret;
557
558     GST_DEBUG ("sending segment");
559     gst_segment_init (&segment, GST_FORMAT_TIME);
560     segment.start = 0 * GST_SECOND;
561     segment.stop = 2 * GST_SECOND;
562     segment.time = 0 * GST_SECOND;
563     event = gst_event_new_segment (&segment);
564
565     eret = gst_pad_send_event (sinkpad, event);
566     fail_if (eret == FALSE);
567   }
568
569   /* send buffer that should return EOS */
570   {
571     GstBuffer *buffer;
572     GstFlowReturn fret;
573
574     buffer = gst_buffer_new ();
575     GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
576     GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
577
578     GST_DEBUG ("sending buffer");
579
580     /* this buffer will generate EOS */
581     fret = gst_pad_chain (sinkpad, buffer);
582     fail_unless (fret == GST_FLOW_EOS);
583   }
584
585   /* send buffer that should return EOS */
586   {
587     GstBuffer *buffer;
588     GstFlowReturn fret;
589
590     buffer = gst_buffer_new ();
591     GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND;
592     GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
593
594     GST_DEBUG ("sending buffer");
595
596     fret = gst_pad_chain (sinkpad, buffer);
597     fail_unless (fret == GST_FLOW_EOS);
598   }
599
600   gst_element_set_state (pipeline, GST_STATE_NULL);
601   gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
602   gst_object_unref (sinkpad);
603   gst_object_unref (pipeline);
604 }
605
606 GST_END_TEST;
607
608 /* test position reporting before, during and after flush 
609  * in PAUSED and PLAYING */
610 GST_START_TEST (test_position)
611 {
612   GstElement *pipeline, *sink;
613   GstPad *sinkpad;
614   GstStateChangeReturn ret;
615   gboolean qret;
616   gint64 qcur;
617   GstBuffer *buffer;
618   GstFlowReturn fret;
619   ChainData *data;
620   GstEvent *event;
621   gboolean eret;
622   gint i;
623
624   /* create sink */
625   pipeline = gst_pipeline_new ("pipeline");
626   fail_if (pipeline == NULL);
627
628   sink = gst_element_factory_make ("fakesink", "sink");
629   fail_if (sink == NULL);
630   g_object_set (G_OBJECT (sink), "sync", TRUE, NULL);
631   g_object_set (G_OBJECT (sink), "num-buffers", 2, NULL);
632
633   gst_bin_add (GST_BIN (pipeline), sink);
634
635   sinkpad = gst_element_get_static_pad (sink, "sink");
636   fail_if (sinkpad == NULL);
637
638   /* do position query, this should fail, we have nothing received yet */
639   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
640   fail_unless (qret == FALSE);
641
642   ret = gst_element_set_state (pipeline, GST_STATE_READY);
643   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
644
645   /* do position query, this should fail, we have nothing received yet */
646   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
647   fail_unless (qret == FALSE);
648
649   /* make pipeline and element ready to accept data */
650   ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
651   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
652
653   /* do position query, this should fail, we have nothing received yet */
654   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
655   fail_unless (qret == FALSE);
656
657   gst_pad_send_event (sinkpad, gst_event_new_stream_start ("test"));
658
659   /* send segment, this should work */
660   {
661     GstSegment segment;
662
663     GST_DEBUG ("sending segment");
664     gst_segment_init (&segment, GST_FORMAT_TIME);
665     segment.start = 1 * GST_SECOND;
666     segment.stop = 3 * GST_SECOND;
667     segment.time = 1 * GST_SECOND;
668     event = gst_event_new_segment (&segment);
669
670     eret = gst_pad_send_event (sinkpad, event);
671     fail_if (eret == FALSE);
672   }
673
674   /* FIXME, do position query, this should succeed with the time value from the
675    * segment. */
676   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
677   fail_unless (qret == TRUE);
678   fail_unless (qcur == 1 * GST_SECOND);
679
680   /* send buffer that we will flush out */
681   buffer = gst_buffer_new ();
682   GST_BUFFER_TIMESTAMP (buffer) = 2 * GST_SECOND;
683   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
684
685   GST_DEBUG ("sending buffer");
686
687   /* this buffer causes the sink to preroll */
688   data = chain_async (sinkpad, buffer);
689   fail_if (data == NULL);
690
691   /* wait for preroll */
692   ret = gst_element_get_state (pipeline, NULL, NULL, -1);
693
694   /* do position query, this should succeed with the time value from the
695    * segment. */
696   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
697   fail_unless (qret == TRUE);
698   fail_unless (qcur == 1 * GST_SECOND);
699
700   /* start flushing, no timing is affected yet */
701   {
702     GST_DEBUG ("sending flush_start");
703     event = gst_event_new_flush_start ();
704
705     eret = gst_pad_send_event (sinkpad, event);
706     fail_if (eret == FALSE);
707   }
708
709   /* preroll buffer is flushed out */
710   fret = chain_async_return (data);
711   fail_unless (fret == GST_FLOW_FLUSHING);
712
713   /* do position query, this should succeed with the time value from the
714    * segment before the flush. */
715   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
716   fail_unless (qret == TRUE);
717   fail_unless (qcur == 1 * GST_SECOND);
718
719   /* stop flushing, timing is affected now */
720   {
721     GST_DEBUG ("sending flush_stop");
722     event = gst_event_new_flush_stop (TRUE);
723
724     eret = gst_pad_send_event (sinkpad, event);
725     fail_if (eret == FALSE);
726   }
727
728   /* do position query, this should fail, the segment is flushed */
729   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
730   fail_unless (qret == FALSE);
731
732   /* send segment, this should work */
733   {
734     GstSegment segment;
735
736     GST_DEBUG ("sending segment");
737     gst_segment_init (&segment, GST_FORMAT_TIME);
738     segment.start = 2 * GST_SECOND;
739     segment.stop = 4 * GST_SECOND;
740     segment.time = 1 * GST_SECOND;
741     event = gst_event_new_segment (&segment);
742
743     eret = gst_pad_send_event (sinkpad, event);
744     fail_if (eret == FALSE);
745   }
746
747   /* send buffer that should return OK */
748   buffer = gst_buffer_new ();
749   GST_BUFFER_TIMESTAMP (buffer) = 3 * GST_SECOND;
750   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
751
752   GST_DEBUG ("sending buffer");
753
754   /* this buffer causes the sink to preroll */
755   data = chain_async (sinkpad, buffer);
756   fail_if (data == NULL);
757
758   /* wait for preroll */
759   ret = gst_element_get_state (pipeline, NULL, NULL, -1);
760
761   /* do position query, this should succeed with the time value from the
762    * segment. */
763   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
764   fail_unless (qret == TRUE);
765   fail_unless (qcur == 1 * GST_SECOND);
766
767   ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
768   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
769
770   /* position now is increasing but never exceeds the boundaries of the segment */
771   for (i = 0; i < 5; i++) {
772     qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
773     GST_DEBUG ("position %" GST_TIME_FORMAT, GST_TIME_ARGS (qcur));
774     fail_unless (qret == TRUE);
775     fail_unless (qcur >= 1 * GST_SECOND && qcur <= 3 * GST_SECOND);
776     g_usleep (1000 * 250);
777   }
778
779   /* preroll buffer is rendered, we expect one more buffer after this one */
780   fret = chain_async_return (data);
781   fail_unless (fret == GST_FLOW_OK);
782
783   /* after rendering the position must be bigger then the stream_time of the
784    * buffer */
785   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
786   fail_unless (qret == TRUE);
787   fail_unless (qcur >= 2 * GST_SECOND && qcur <= 3 * GST_SECOND);
788
789   /* start flushing in PLAYING */
790   {
791     GST_DEBUG ("sending flush_start");
792     event = gst_event_new_flush_start ();
793
794     eret = gst_pad_send_event (sinkpad, event);
795     fail_if (eret == FALSE);
796   }
797
798   /* this should now just report the last stream time */
799   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
800   fail_unless (qret == TRUE);
801   fail_unless (qcur >= 2 * GST_SECOND && qcur <= 3 * GST_SECOND);
802
803   {
804     GST_DEBUG ("sending flush_stop");
805     event = gst_event_new_flush_stop (TRUE);
806
807     eret = gst_pad_send_event (sinkpad, event);
808     fail_if (eret == FALSE);
809   }
810
811   /* do position query, this should fail, the segment is flushed */
812   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
813   fail_unless (qret == FALSE);
814
815   /* send segment, this should work */
816   {
817     GstSegment segment;
818
819     GST_DEBUG ("sending segment");
820     gst_segment_init (&segment, GST_FORMAT_TIME);
821     segment.start = 2 * GST_SECOND;
822     segment.stop = 4 * GST_SECOND;
823     segment.time = 1 * GST_SECOND;
824     event = gst_event_new_segment (&segment);
825
826     eret = gst_pad_send_event (sinkpad, event);
827     fail_if (eret == FALSE);
828   }
829
830   /* send buffer that should return EOS */
831   buffer = gst_buffer_new ();
832   GST_BUFFER_TIMESTAMP (buffer) = 3 * GST_SECOND;
833   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
834
835   GST_DEBUG ("sending buffer");
836
837   /* this buffer causes the sink to preroll */
838   data = chain_async (sinkpad, buffer);
839   fail_if (data == NULL);
840
841   /* wait for preroll */
842   ret = gst_element_get_state (pipeline, NULL, NULL, -1);
843
844   /* preroll buffer is rendered, we expect no more buffer after this one */
845   fret = chain_async_return (data);
846   fail_unless (fret == GST_FLOW_EOS);
847
848   /* do position query, this should succeed with the stream time of the buffer
849    * against the clock. Since the buffer is synced against the clock, the time
850    * should be at least the stream time of the buffer. */
851   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
852   fail_unless (qret == TRUE);
853   fail_unless (qcur >= 2 * GST_SECOND && qcur <= 3 * GST_SECOND);
854
855   /* wait 2 more seconds, enough to test if the position was clipped correctly
856    * against the segment */
857   g_usleep (2 * G_USEC_PER_SEC);
858
859   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
860   fail_unless (qret == TRUE);
861   fail_unless (qcur == 3 * GST_SECOND);
862
863   GST_DEBUG ("going to PAUSED");
864
865   ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
866   fail_unless (ret == GST_STATE_CHANGE_ASYNC);
867
868   /* we report the time of the last start of the buffer. This is slightly
869    * incorrect, we should report the exact time when we paused but there is no
870    * record of that anywhere */
871   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
872   fail_unless (qret == TRUE);
873   fail_unless (qcur == 3 * GST_SECOND);
874
875   ret = gst_element_set_state (pipeline, GST_STATE_READY);
876   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
877
878   /* fails again because we are in the wrong state */
879   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
880   fail_unless (qret == FALSE);
881
882   gst_element_set_state (pipeline, GST_STATE_NULL);
883
884   qret = gst_element_query_position (sink, GST_FORMAT_TIME, &qcur);
885   fail_unless (qret == FALSE);
886
887   gst_object_unref (sinkpad);
888   gst_object_unref (pipeline);
889 }
890
891 GST_END_TEST;
892
893 /* like fakesrc, but also pushes an OOB event after each buffer */
894 typedef GstPushSrc OOBSource;
895 typedef GstPushSrcClass OOBSourceClass;
896
897 GType oob_source_get_type (void);
898 G_DEFINE_TYPE (OOBSource, oob_source, GST_TYPE_PUSH_SRC);
899
900 static GstFlowReturn
901 oob_source_create (GstPushSrc * src, GstBuffer ** p_buf)
902 {
903   *p_buf = gst_buffer_new ();
904
905   gst_pad_push_event (GST_BASE_SRC_PAD (src),
906       gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL));
907
908   return GST_FLOW_OK;
909 }
910
911 static void
912 oob_source_class_init (OOBSourceClass * klass)
913 {
914   static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("src",
915       GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
916   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
917   GstPushSrcClass *pushsrc_class = GST_PUSH_SRC_CLASS (klass);
918
919   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
920
921   pushsrc_class->create = GST_DEBUG_FUNCPTR (oob_source_create);
922 }
923
924 static void
925 oob_source_init (OOBSource * src)
926 {
927   /* nothing to do */
928 }
929
930 #define NOTIFY_RACE_NUM_PIPELINES 10
931
932 typedef struct
933 {
934   GstElement *src;
935   GstElement *queue;
936   GstElement *sink;
937   GstElement *pipe;
938 } NotifyRacePipeline;
939
940 static void
941 test_notify_race_setup_pipeline (NotifyRacePipeline * p)
942 {
943   GST_DEBUG ("Creating pipeline");
944   p->pipe = gst_pipeline_new ("pipeline");
945   p->src = g_object_new (oob_source_get_type (), NULL);
946
947   p->queue = gst_element_factory_make ("queue", NULL);
948   g_object_set (p->queue, "max-size-buffers", 2, NULL);
949
950   p->sink = gst_element_factory_make ("fakesink", NULL);
951   gst_bin_add (GST_BIN (p->pipe), p->src);
952   gst_bin_add (GST_BIN (p->pipe), p->queue);
953   gst_bin_add (GST_BIN (p->pipe), p->sink);
954   gst_element_link_many (p->src, p->queue, p->sink, NULL);
955
956   GST_DEBUG ("Setting pipeline to PLAYING");
957   fail_unless_equals_int (gst_element_set_state (p->pipe, GST_STATE_PLAYING),
958       GST_STATE_CHANGE_ASYNC);
959   GST_DEBUG ("Getting state");
960   fail_unless_equals_int (gst_element_get_state (p->pipe, NULL, NULL, -1),
961       GST_STATE_CHANGE_SUCCESS);
962 }
963
964 static void
965 test_notify_race_cleanup_pipeline (NotifyRacePipeline * p)
966 {
967   gst_element_set_state (p->pipe, GST_STATE_NULL);
968   gst_object_unref (p->pipe);
969   memset (p, 0, sizeof (NotifyRacePipeline));
970 }
971
972 /* we create N pipelines to make sure the notify race isn't per-class, but
973  * only per instance */
974 GST_START_TEST (test_notify_race)
975 {
976   NotifyRacePipeline pipelines[NOTIFY_RACE_NUM_PIPELINES];
977   int i;
978
979   for (i = 0; i < G_N_ELEMENTS (pipelines); ++i) {
980     GST_DEBUG ("Starting up pipeline %d", i);
981     test_notify_race_setup_pipeline (&pipelines[i]);
982   }
983
984   g_usleep (2 * G_USEC_PER_SEC);
985
986   for (i = 0; i < G_N_ELEMENTS (pipelines); ++i) {
987     GST_DEBUG ("Cleaning up pipeline %d", i);
988     test_notify_race_cleanup_pipeline (&pipelines[i]);
989   }
990 }
991
992 GST_END_TEST;
993
994 static void
995 last_message_cb (GObject * obj, GParamSpec * pspec, gpointer user_data)
996 {
997   gint *p_counter = user_data;
998   gchar *s, *end, *last_msg = NULL;
999   guint64 offset, count;
1000
1001   g_object_get (obj, "last-message", &last_msg, NULL);
1002   fail_unless (last_msg != NULL);
1003
1004   if (!strstr (last_msg, "chain"))
1005     goto skip;
1006
1007   GST_LOG_OBJECT (obj, "%s", last_msg);
1008
1009   s = strstr (last_msg, "offset: ");
1010   fail_unless (s != NULL);
1011
1012   s += strlen ("offset: ");
1013
1014   offset = g_ascii_strtoull (s, &end, 10);
1015   fail_unless (offset < G_MAXUINT64);
1016   fail_if (end == s);
1017
1018   count = *p_counter;
1019
1020   fail_unless_equals_int (count, offset);
1021
1022   *p_counter = count + 1;
1023
1024 skip:
1025
1026   g_free (last_msg);
1027 }
1028
1029 #define NUM_BUFFERS 500
1030
1031 GST_START_TEST (test_last_message_notify)
1032 {
1033   GstElement *pipe, *src, *tee, *q1, *q2, *sink1, *sink2;
1034   gint counter1 = 0;
1035   gint counter2 = 0;
1036   GstMessage *m;
1037
1038   pipe = gst_pipeline_new ("pipeline");
1039   src = gst_element_factory_make ("fakesrc", NULL);
1040   gst_util_set_object_arg (G_OBJECT (src), "sizetype", "fixed");
1041   g_object_set (src, "num-buffers", NUM_BUFFERS, "sizemax", 1, NULL);
1042
1043   tee = gst_element_factory_make ("tee", NULL);
1044
1045   q1 = gst_element_factory_make ("queue", NULL);
1046   sink1 = gst_element_factory_make ("fakesink", NULL);
1047   g_object_set (sink1, "silent", FALSE, NULL);
1048
1049   q2 = gst_element_factory_make ("queue", NULL);
1050   sink2 = gst_element_factory_make ("fakesink", NULL);
1051   g_object_set (sink2, "silent", FALSE, NULL);
1052
1053   gst_bin_add_many (GST_BIN (pipe), src, tee, q1, q2, sink1, sink2, NULL);
1054   fail_unless (gst_element_link_many (src, tee, NULL));
1055   fail_unless (gst_element_link_many (tee, q1, sink1, NULL));
1056   fail_unless (gst_element_link_many (tee, q2, sink2, NULL));
1057
1058   g_signal_connect (sink1, "notify::last-message",
1059       G_CALLBACK (last_message_cb), &counter1);
1060   g_signal_connect (sink2, "notify::last-message",
1061       G_CALLBACK (last_message_cb), &counter2);
1062
1063   GST_DEBUG ("Setting pipeline to PLAYING");
1064   fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_PLAYING),
1065       GST_STATE_CHANGE_ASYNC);
1066
1067   m = gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1, GST_MESSAGE_EOS);
1068   gst_message_unref (m);
1069
1070   fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_NULL),
1071       GST_STATE_CHANGE_SUCCESS);
1072
1073   fail_unless_equals_int (counter1, NUM_BUFFERS);
1074   fail_unless_equals_int (counter2, NUM_BUFFERS);
1075
1076   gst_object_unref (pipe);
1077 }
1078
1079 GST_END_TEST;
1080
1081 static void
1082 deep_notify_last_message_cb (GstObject * obj, GstObject * prop_obj,
1083     GParamSpec * pspec, gpointer user_data)
1084 {
1085   gint *counter_array = user_data;
1086   gint *p_counter;
1087   gchar *s, *end, *last_msg = NULL;
1088   guint64 offset, count;
1089
1090   if (strcmp (GST_OBJECT_NAME (prop_obj), "fakesink0") == 0)
1091     p_counter = counter_array;
1092   else if (strcmp (GST_OBJECT_NAME (prop_obj), "fakesink1") == 0)
1093     p_counter = counter_array + 1;
1094   else
1095     g_assert_not_reached ();
1096
1097   g_object_get (prop_obj, "last-message", &last_msg, NULL);
1098   fail_unless (last_msg != NULL);
1099
1100   if (!strstr (last_msg, "chain"))
1101     goto skip;
1102
1103   GST_LOG_OBJECT (prop_obj, "%s", last_msg);
1104
1105   s = strstr (last_msg, "offset: ");
1106   fail_unless (s != NULL);
1107
1108   s += strlen ("offset: ");
1109
1110   offset = g_ascii_strtoull (s, &end, 10);
1111   fail_unless (offset < G_MAXUINT64);
1112   fail_if (end == s);
1113
1114   count = *p_counter;
1115
1116 //  fail_unless_equals_int (count, offset);
1117
1118   *p_counter = count + 1;
1119
1120 skip:
1121
1122   g_free (last_msg);
1123 }
1124
1125 GST_START_TEST (test_last_message_deep_notify)
1126 {
1127   GstElement *pipe, *src, *tee, *q1, *q2, *sink1, *sink2;
1128   gint counter[2] = { 0, 0 };
1129   GstMessage *m;
1130
1131   pipe = gst_pipeline_new ("pipeline");
1132   src = gst_element_factory_make ("fakesrc", NULL);
1133   gst_util_set_object_arg (G_OBJECT (src), "sizetype", "fixed");
1134   g_object_set (src, "num-buffers", NUM_BUFFERS, "sizemax", 1, NULL);
1135
1136   tee = gst_element_factory_make ("tee", NULL);
1137
1138   q1 = gst_element_factory_make ("queue", NULL);
1139   sink1 = gst_element_factory_make ("fakesink", NULL);
1140   g_object_set (sink1, "silent", FALSE, NULL);
1141
1142   q2 = gst_element_factory_make ("queue", NULL);
1143   sink2 = gst_element_factory_make ("fakesink", NULL);
1144   g_object_set (sink2, "silent", FALSE, NULL);
1145
1146   gst_bin_add_many (GST_BIN (pipe), src, tee, q1, q2, sink1, sink2, NULL);
1147   fail_unless (gst_element_link_many (src, tee, NULL));
1148   fail_unless (gst_element_link_many (tee, q1, sink1, NULL));
1149   fail_unless (gst_element_link_many (tee, q2, sink2, NULL));
1150
1151   g_signal_connect (pipe, "deep-notify::last-message",
1152       G_CALLBACK (deep_notify_last_message_cb), counter);
1153
1154   GST_DEBUG ("Setting pipeline to PLAYING");
1155   fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_PLAYING),
1156       GST_STATE_CHANGE_ASYNC);
1157
1158   m = gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1, GST_MESSAGE_EOS);
1159   gst_message_unref (m);
1160
1161   fail_unless_equals_int (gst_element_set_state (pipe, GST_STATE_NULL),
1162       GST_STATE_CHANGE_SUCCESS);
1163
1164   GST_ERROR ("sink1: %d, sink2: %d, total: %d", counter[0], counter[1],
1165       counter[0] + counter[1]);
1166
1167   fail_unless_equals_int (counter[0], NUM_BUFFERS);
1168   fail_unless_equals_int (counter[1], NUM_BUFFERS);
1169 }
1170
1171 GST_END_TEST;
1172
1173 static Suite *
1174 fakesink_suite (void)
1175 {
1176   Suite *s = suite_create ("fakesink");
1177   TCase *tc_chain = tcase_create ("general");
1178
1179   tcase_set_timeout (tc_chain, 20);
1180
1181   suite_add_tcase (s, tc_chain);
1182   tcase_add_test (tc_chain, test_clipping);
1183   tcase_add_test (tc_chain, test_preroll_sync);
1184   tcase_add_test (tc_chain, test_eos);
1185   tcase_add_test (tc_chain, test_eos2);
1186   tcase_add_test (tc_chain, test_position);
1187   tcase_add_test (tc_chain, test_notify_race);
1188   tcase_add_test (tc_chain, test_last_message_notify);
1189   tcase_skip_broken_test (tc_chain, test_last_message_deep_notify);
1190
1191   return s;
1192 }
1193
1194 GST_CHECK_MAIN (fakesink);