50f28964b517f703afe4f2f4516b5976b21d795d
[platform/upstream/gstreamer.git] / tests / check / elements / queue.c
1 /* GStreamer
2  *
3  * unit test for queue
4  *
5  * Copyright (C) <2006> Stefan Kost <ensonic@users.sf.net>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <gst/check/gstcheck.h>
27
28 #define UNDERRUN_LOCK() (g_mutex_lock (&underrun_mutex))
29 #define UNDERRUN_UNLOCK() (g_mutex_unlock (&underrun_mutex))
30 #define UNDERRUN_SIGNAL() (g_cond_signal (&underrun_cond))
31 #define UNDERRUN_WAIT() (g_cond_wait (&underrun_cond, &underrun_mutex))
32
33 static GstElement *queue;
34
35 /* For ease of programming we use globals to keep refs for our floating
36  * src and sink pads we create; otherwise we always have to do get_pad,
37  * get_peer, and then remove references in every test function */
38 static GstPad *mysrcpad;
39 static GstPad *mysinkpad;
40 static GstPad *qsrcpad;
41 static gulong probe_id;
42
43 static gint overrun_count;
44
45 static GMutex underrun_mutex;
46 static GCond underrun_cond;
47 static gint underrun_count;
48
49 static GMutex events_lock;
50 static GCond events_cond;
51 static gint events_count;
52 static GList *events;
53
54 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
55     GST_PAD_SINK,
56     GST_PAD_ALWAYS,
57     GST_STATIC_CAPS_ANY);
58 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
59     GST_PAD_SRC,
60     GST_PAD_ALWAYS,
61     GST_STATIC_CAPS_ANY);
62
63 static void
64 queue_overrun (GstElement * queue, gpointer user_data)
65 {
66   overrun_count++;
67   GST_DEBUG ("queue overrun %d", overrun_count);
68 }
69
70 static void
71 queue_underrun (GstElement * queue, gpointer user_data)
72 {
73   UNDERRUN_LOCK ();
74   underrun_count++;
75   GST_DEBUG ("queue underrun %d", underrun_count);
76   UNDERRUN_SIGNAL ();
77   UNDERRUN_UNLOCK ();
78 }
79
80 static gboolean
81 event_func (GstPad * pad, GstObject * parent, GstEvent * event)
82 {
83   GST_DEBUG ("%s event", GST_EVENT_TYPE_NAME (event));
84
85   g_mutex_lock (&events_lock);
86
87   events = g_list_append (events, event);
88   ++events_count;
89
90   g_cond_broadcast (&events_cond);
91   g_mutex_unlock (&events_lock);
92
93   return TRUE;
94 }
95
96 static void
97 block_src (void)
98 {
99   qsrcpad = gst_element_get_static_pad (queue, "src");
100   probe_id = gst_pad_add_probe (qsrcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
101       NULL, NULL, NULL);
102 }
103
104 static void
105 unblock_src (void)
106 {
107   gst_pad_remove_probe (qsrcpad, probe_id);
108   gst_object_unref (qsrcpad);
109 }
110
111 static void
112 setup (void)
113 {
114   GST_DEBUG ("setup_queue");
115
116   queue = gst_check_setup_element ("queue");
117   g_signal_connect (queue, "underrun", G_CALLBACK (queue_underrun), NULL);
118
119   mysrcpad = gst_check_setup_src_pad (queue, &srctemplate);
120   gst_pad_set_active (mysrcpad, TRUE);
121
122   mysinkpad = NULL;
123
124   overrun_count = 0;
125
126   underrun_count = 0;
127
128
129   g_mutex_init (&events_lock);
130   g_cond_init (&events_cond);
131   events_count = 0;
132   events = NULL;
133 }
134
135 static void
136 cleanup (void)
137 {
138   GST_DEBUG ("cleanup_queue");
139
140   gst_check_drop_buffers ();
141
142   while (events != NULL) {
143     gst_event_unref (GST_EVENT (events->data));
144     events = g_list_delete_link (events, events);
145   }
146   events_count = 0;
147   g_mutex_clear (&events_lock);
148   g_cond_clear (&events_cond);
149
150   if (mysinkpad != NULL) {
151     gst_pad_set_active (mysinkpad, FALSE);
152     gst_check_teardown_sink_pad (queue);
153   }
154
155   gst_pad_set_active (mysrcpad, FALSE);
156   gst_check_teardown_src_pad (queue);
157
158   gst_check_teardown_element (queue);
159   queue = NULL;
160 }
161
162 /* setup the sinkpad on a playing queue element. gst_check_setup_sink_pad()
163  * does not work in this case since it does not activate the pad before linking
164  * it. */
165 static GstPad *
166 setup_sink_pad (GstElement * element, GstStaticPadTemplate * tmpl)
167 {
168   GstPad *srcpad;
169   GstPad *sinkpad;
170
171   sinkpad = gst_pad_new_from_static_template (tmpl, "sink");
172   fail_if (sinkpad == NULL);
173   srcpad = gst_element_get_static_pad (element, "src");
174   fail_if (srcpad == NULL);
175   gst_pad_set_chain_function (sinkpad, gst_check_chain_func);
176   gst_pad_set_event_function (sinkpad, event_func);
177   gst_pad_set_active (sinkpad, TRUE);
178   fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK);
179   gst_object_unref (srcpad);
180
181   return sinkpad;
182 }
183
184 /* set queue size to 2 buffers
185  * pull 1 buffer
186  * check over/underuns
187  */
188 GST_START_TEST (test_non_leaky_underrun)
189 {
190   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
191   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, NULL);
192   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
193   gst_pad_set_active (mysinkpad, TRUE);
194
195   GST_DEBUG ("starting");
196
197   UNDERRUN_LOCK ();
198   fail_unless (gst_element_set_state (queue,
199           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
200       "could not set to playing");
201   UNDERRUN_WAIT ();
202   UNDERRUN_UNLOCK ();
203
204   fail_unless (overrun_count == 0);
205   fail_unless (underrun_count == 1);
206
207   GST_DEBUG ("stopping");
208   fail_unless (gst_element_set_state (queue,
209           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
210 }
211
212 GST_END_TEST;
213
214 static void
215 queue_overrun_link_and_activate (GstElement * queue, gpointer user_data)
216 {
217   GST_DEBUG ("queue overrun");
218   overrun_count++;
219
220   /* link the src pad of the queue to make it dequeue buffers */
221   mysinkpad = setup_sink_pad (queue, &sinktemplate);
222
223   unblock_src ();
224 }
225
226 /* set queue size to 2 buffers
227  * push 2 buffers
228  * check over/underuns
229  * push 1 more buffer
230  * check over/underuns again
231  */
232 GST_START_TEST (test_non_leaky_overrun)
233 {
234   GstBuffer *buffer1;
235   GstBuffer *buffer2;
236   GstBuffer *buffer3;
237   GstBuffer *buffer;
238   GstSegment segment;
239
240   g_signal_connect (queue, "overrun",
241       G_CALLBACK (queue_overrun_link_and_activate), NULL);
242   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, NULL);
243
244   block_src ();
245
246   GST_DEBUG ("starting");
247
248   UNDERRUN_LOCK ();
249   fail_unless (gst_element_set_state (queue,
250           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
251       "could not set to playing");
252   UNDERRUN_WAIT ();
253   UNDERRUN_UNLOCK ();
254
255   gst_segment_init (&segment, GST_FORMAT_BYTES);
256   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
257   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
258
259   fail_unless (underrun_count == 1);
260   fail_unless (overrun_count == 0);
261
262   buffer1 = gst_buffer_new_and_alloc (4);
263   /* pushing gives away my reference */
264   gst_pad_push (mysrcpad, buffer1);
265
266   GST_DEBUG ("added 1st");
267   fail_unless (overrun_count == 0);
268   fail_unless (underrun_count == 1);
269
270   buffer2 = gst_buffer_new_and_alloc (4);
271   gst_pad_push (mysrcpad, buffer2);
272
273   GST_DEBUG ("added 2nd");
274   fail_unless (overrun_count == 0);
275   fail_unless (underrun_count == 1);
276
277   buffer3 = gst_buffer_new_and_alloc (4);
278   /* the next call to gst_pad_push will emit the overrun signal. The signal
279    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
280    * activates and links mysinkpad. The queue task then dequeues a buffer and
281    * gst_pad_push() will return. */
282   gst_pad_push (mysrcpad, buffer3);
283
284   GST_DEBUG ("added 3rd");
285   fail_unless (overrun_count == 1);
286
287   /* lock the check_mutex to block the first buffer pushed to mysinkpad */
288   g_mutex_lock (&check_mutex);
289   /* now let the queue push all buffers */
290   while (g_list_length (buffers) < 3) {
291     g_cond_wait (&check_cond, &check_mutex);
292   }
293   g_mutex_unlock (&check_mutex);
294
295   fail_unless (overrun_count == 1);
296   /* make sure we get the underrun signal before we check underrun_count */
297   UNDERRUN_LOCK ();
298   while (underrun_count < 2) {
299     UNDERRUN_WAIT ();
300   }
301   /* we can't check the underrun_count here safely because when adding the 3rd
302    * buffer, the queue lock is released to emit the overrun signal and the
303    * downstream part can then push and empty the queue and signal an additional
304    * underrun */
305   /* fail_unless_equals_int (underrun_count, 2); */
306   UNDERRUN_UNLOCK ();
307
308   buffer = g_list_nth (buffers, 0)->data;
309   fail_unless (buffer == buffer1);
310
311   buffer = g_list_nth (buffers, 1)->data;
312   fail_unless (buffer == buffer2);
313
314   GST_DEBUG ("stopping");
315   fail_unless (gst_element_set_state (queue,
316           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
317 }
318
319 GST_END_TEST;
320
321 /* set queue size to 2 buffers
322  * push 2 buffers
323  * check over/underuns
324  * push 1 more buffer
325  * check over/underuns again
326  * check which buffer was leaked
327  */
328 GST_START_TEST (test_leaky_upstream)
329 {
330   GstBuffer *buffer1;
331   GstBuffer *buffer2;
332   GstBuffer *buffer3;
333   GstBuffer *buffer;
334   GstSegment segment;
335
336   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
337   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, "leaky", 1, NULL);
338
339   GST_DEBUG ("starting");
340
341   block_src ();
342
343   UNDERRUN_LOCK ();
344   fail_unless (gst_element_set_state (queue,
345           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
346       "could not set to playing");
347   UNDERRUN_WAIT ();
348   UNDERRUN_UNLOCK ();
349
350   gst_segment_init (&segment, GST_FORMAT_BYTES);
351   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
352   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
353
354   fail_unless (overrun_count == 0);
355   fail_unless (underrun_count == 1);
356
357   buffer1 = gst_buffer_new_and_alloc (4);
358   /* pushing gives away my reference */
359   gst_pad_push (mysrcpad, buffer1);
360
361   GST_DEBUG ("added 1st");
362   fail_unless (overrun_count == 0);
363   fail_unless (underrun_count == 1);
364
365   buffer2 = gst_buffer_new_and_alloc (4);
366   gst_pad_push (mysrcpad, buffer2);
367
368   GST_DEBUG ("added 2nd");
369   fail_unless (overrun_count == 0);
370   fail_unless (underrun_count == 1);
371
372   buffer3 = gst_buffer_new_and_alloc (4);
373   /* buffer4 will be leaked, keep a ref so refcount can be checked below */
374   gst_buffer_ref (buffer3);
375   gst_pad_push (mysrcpad, buffer3);
376
377   GST_DEBUG ("added 3nd");
378   /* it still triggers overrun when leaking */
379   fail_unless (overrun_count == 1);
380   fail_unless (underrun_count == 1);
381
382   /* wait for underrun and check that we got buffer1 and buffer2 only */
383   UNDERRUN_LOCK ();
384   mysinkpad = setup_sink_pad (queue, &sinktemplate);
385   unblock_src ();
386   UNDERRUN_WAIT ();
387   UNDERRUN_UNLOCK ();
388
389   fail_unless (overrun_count == 1);
390   fail_unless (underrun_count == 2);
391
392   fail_unless (g_list_length (buffers) == 2);
393
394   buffer = g_list_nth (buffers, 0)->data;
395   fail_unless (buffer == buffer1);
396
397   buffer = g_list_nth (buffers, 1)->data;
398   fail_unless (buffer == buffer2);
399
400   ASSERT_BUFFER_REFCOUNT (buffer3, "buffer", 1);
401   gst_buffer_unref (buffer3);
402
403   GST_DEBUG ("stopping");
404   fail_unless (gst_element_set_state (queue,
405           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
406 }
407
408 GST_END_TEST;
409
410 /* set queue size to 2 buffers
411  * push 2 buffers
412  * check over/underuns
413  * push 1 more buffer
414  * check over/underuns again
415  * check which buffer was leaked
416  */
417 GST_START_TEST (test_leaky_downstream)
418 {
419   GstBuffer *buffer1;
420   GstBuffer *buffer2;
421   GstBuffer *buffer3;
422   GstBuffer *buffer;
423   GstSegment segment;
424
425   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
426   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, "leaky", 2, NULL);
427
428   GST_DEBUG ("starting");
429
430   block_src ();
431
432   UNDERRUN_LOCK ();
433   fail_unless (gst_element_set_state (queue,
434           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
435       "could not set to playing");
436   UNDERRUN_WAIT ();
437   UNDERRUN_UNLOCK ();
438
439   gst_segment_init (&segment, GST_FORMAT_BYTES);
440   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
441   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
442
443   fail_unless (overrun_count == 0);
444   fail_unless (underrun_count == 1);
445
446   buffer1 = gst_buffer_new_and_alloc (4);
447   /* pushing gives away one reference */
448   /* buffer1 will be leaked, keep a ref so refcount can be checked below */
449   gst_buffer_ref (buffer1);
450   gst_pad_push (mysrcpad, buffer1);
451
452   GST_DEBUG ("added 1st");
453   fail_unless (overrun_count == 0);
454   fail_unless (underrun_count == 1);
455
456   buffer2 = gst_buffer_new_and_alloc (4);
457   gst_pad_push (mysrcpad, buffer2);
458
459   GST_DEBUG ("added 2nd");
460   fail_unless (overrun_count == 0);
461   fail_unless (underrun_count == 1);
462
463   buffer3 = gst_buffer_new_and_alloc (4);
464   gst_pad_push (mysrcpad, buffer3);
465
466   GST_DEBUG ("added 3rd");
467   /* it still triggers overrun when leaking */
468   fail_unless (overrun_count == 1);
469   fail_unless (underrun_count == 1);
470
471   /* wait for underrun and check that we got buffer1 and buffer2 only */
472   UNDERRUN_LOCK ();
473   mysinkpad = setup_sink_pad (queue, &sinktemplate);
474   unblock_src ();
475   UNDERRUN_WAIT ();
476   UNDERRUN_UNLOCK ();
477
478   fail_unless (overrun_count == 1);
479   fail_unless (underrun_count == 2);
480
481   fail_unless (g_list_length (buffers) == 2);
482
483   ASSERT_BUFFER_REFCOUNT (buffer1, "buffer", 1);
484   gst_buffer_unref (buffer1);
485
486   buffer = g_list_nth (buffers, 0)->data;
487   fail_unless (buffer == buffer2);
488
489   buffer = g_list_nth (buffers, 1)->data;
490   fail_unless (buffer == buffer3);
491
492   GST_DEBUG ("stopping");
493   fail_unless (gst_element_set_state (queue,
494           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
495 }
496
497 GST_END_TEST;
498
499 /* set queue size to 6 buffers and 7 seconds
500  * push 7 buffers with and without duration
501  * check current-level-time
502  */
503 GST_START_TEST (test_time_level)
504 {
505   GstBuffer *buffer = NULL;
506   GstClockTime time;
507   GstSegment segment;
508
509   g_signal_connect (queue, "overrun",
510       G_CALLBACK (queue_overrun_link_and_activate), NULL);
511   g_object_set (G_OBJECT (queue), "max-size-buffers", 6, NULL);
512   g_object_set (G_OBJECT (queue), "max-size-time", 7 * GST_SECOND, NULL);
513
514   GST_DEBUG ("starting");
515
516   block_src ();
517
518   UNDERRUN_LOCK ();
519   fail_unless (gst_element_set_state (queue,
520           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
521       "could not set to playing");
522   UNDERRUN_WAIT ();
523   UNDERRUN_UNLOCK ();
524
525   gst_segment_init (&segment, GST_FORMAT_BYTES);
526   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
527   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
528
529   /* push buffer without duration */
530   buffer = gst_buffer_new_and_alloc (4);
531   GST_BUFFER_TIMESTAMP (buffer) = GST_SECOND;
532   /* pushing gives away my reference */
533   gst_pad_push (mysrcpad, buffer);
534
535   /* level should be 1 seconds because buffer has no duration and starts at 1
536    * SECOND (sparse stream). */
537   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
538   fail_if (time != GST_SECOND);
539
540   /* second push should set the level to 2 second */
541   buffer = gst_buffer_new_and_alloc (4);
542   GST_BUFFER_TIMESTAMP (buffer) = 2 * GST_SECOND;
543   gst_pad_push (mysrcpad, buffer);
544
545   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
546   fail_if (time != 2 * GST_SECOND);
547
548   /* third push should set the level to 4 seconds, the 1 second diff with the
549    * previous buffer (without duration) and the 1 second duration of this
550    * buffer. */
551   buffer = gst_buffer_new_and_alloc (4);
552   GST_BUFFER_TIMESTAMP (buffer) = 3 * GST_SECOND;
553   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
554   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
555   gst_pad_push (mysrcpad, buffer);
556
557   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
558   fail_if (time != 4 * GST_SECOND);
559
560   /* fourth push should set the level to 6 seconds, the 2 second diff with the
561    * previous buffer, same duration. */
562   buffer = gst_buffer_new_and_alloc (4);
563   GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
564   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
565   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
566   gst_pad_push (mysrcpad, buffer);
567
568   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
569   fail_if (time != 6 * GST_SECOND);
570
571   /* fifth push should not adjust the level, the timestamp and duration are the
572    * same, meaning the previous buffer did not really have a duration. */
573   buffer = gst_buffer_new_and_alloc (4);
574   GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
575   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
576   gst_pad_push (mysrcpad, buffer);
577
578   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
579   fail_if (time != 6 * GST_SECOND);
580
581   /* sixth push should adjust the level with 1 second, we now know the
582    * previous buffer actually had a duration of 2 SECONDS */
583   buffer = gst_buffer_new_and_alloc (4);
584   GST_BUFFER_TIMESTAMP (buffer) = 7 * GST_SECOND;
585   gst_pad_push (mysrcpad, buffer);
586
587   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
588   fail_if (time != 7 * GST_SECOND);
589
590   /* eighth push should cause overrun */
591   fail_unless (overrun_count == 0);
592   buffer = gst_buffer_new_and_alloc (4);
593   GST_BUFFER_TIMESTAMP (buffer) = 8 * GST_SECOND;
594   /* the next call to gst_pad_push will emit the overrun signal. The signal
595    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
596    * activates and links mysinkpad. The queue task then dequeues a buffer and
597    * gst_pad_push() will return. */
598   gst_pad_push (mysrcpad, buffer);
599
600   fail_unless (overrun_count == 1);
601
602   GST_DEBUG ("stopping");
603   fail_unless (gst_element_set_state (queue,
604           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
605 }
606
607 GST_END_TEST;
608
609 GST_START_TEST (test_time_level_task_not_started)
610 {
611   GstEvent *event;
612   GstClockTime time;
613   GstSegment segment;
614
615   GST_DEBUG ("starting");
616
617   block_src ();
618
619   UNDERRUN_LOCK ();
620   fail_unless (gst_element_set_state (queue,
621           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
622       "could not set to playing");
623   UNDERRUN_WAIT ();
624   UNDERRUN_UNLOCK ();
625
626   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
627
628   gst_segment_init (&segment, GST_FORMAT_TIME);
629   segment.start = 1 * GST_SECOND;
630   segment.stop = 5 * GST_SECOND;
631   segment.time = 0;
632   segment.position = 1 * GST_SECOND;
633
634   event = gst_event_new_segment (&segment);
635   gst_pad_push_event (mysrcpad, event);
636
637   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
638   fail_if (time != 0 * GST_SECOND);
639
640   segment.base = 4 * GST_SECOND;
641   event = gst_event_new_segment (&segment);
642   gst_pad_push_event (mysrcpad, event);
643
644   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
645   GST_DEBUG ("time now %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
646   fail_if (time != 4 * GST_SECOND);
647
648   unblock_src ();
649
650   GST_DEBUG ("stopping");
651   fail_unless (gst_element_set_state (queue,
652           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
653 }
654
655 GST_END_TEST;
656
657 GST_START_TEST (test_sticky_not_linked)
658 {
659   GstEvent *event;
660   GstSegment segment;
661   gboolean ret;
662   GstFlowReturn flow_ret;
663
664   GST_DEBUG ("starting");
665
666   g_object_set (queue, "max-size-buffers", 1, NULL);
667
668   UNDERRUN_LOCK ();
669   fail_unless (gst_element_set_state (queue,
670           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
671       "could not set to playing");
672   UNDERRUN_WAIT ();
673   UNDERRUN_UNLOCK ();
674
675   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
676
677   gst_segment_init (&segment, GST_FORMAT_TIME);
678   segment.start = 1 * GST_SECOND;
679   segment.stop = 5 * GST_SECOND;
680   segment.time = 0;
681   segment.position = 1 * GST_SECOND;
682
683   event = gst_event_new_segment (&segment);
684   ret = gst_pad_push_event (mysrcpad, event);
685   fail_unless (ret == TRUE);
686
687   /* the first few buffers can return OK as they are queued and gst_queue_loop
688    * is woken up, tries to push and sets ->srcresult to NOT_LINKED
689    */
690   flow_ret = GST_FLOW_OK;
691   while (flow_ret != GST_FLOW_NOT_LINKED)
692     flow_ret = gst_pad_push (mysrcpad, gst_buffer_new ());
693
694   /* send a new sticky event so that it will be pushed on the next gst_pad_push
695    */
696   event = gst_event_new_segment (&segment);
697   ret = gst_pad_push_event (mysrcpad, event);
698   fail_unless (ret == TRUE);
699
700   /* make sure that gst_queue_sink_event doesn't return FALSE if the queue is
701    * unlinked, as that would make gst_pad_push return ERROR
702    */
703   flow_ret = gst_pad_push (mysrcpad, gst_buffer_new ());
704   fail_unless_equals_int (flow_ret, GST_FLOW_NOT_LINKED);
705
706   GST_DEBUG ("stopping");
707   fail_unless (gst_element_set_state (queue,
708           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
709 }
710
711 GST_END_TEST;
712
713 #if 0
714 static gboolean
715 event_equals_newsegment (GstEvent * event, gboolean update, gdouble rate,
716     GstFormat format, gint64 start, gint64 stop, gint64 position)
717 {
718   gboolean ns_update;
719   gdouble ns_rate, ns_arate;
720   GstFormat ns_format;
721   gint64 ns_start;
722   gint64 ns_stop;
723   gint64 ns_position;
724
725   if (GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT) {
726     return FALSE;
727   }
728
729   gst_event_parse_new_segment (event, &ns_update, &ns_rate, &ns_arate,
730       &ns_format, &ns_start, &ns_stop, &ns_position);
731
732   GST_DEBUG ("update %d, rate %lf, format %s, start %" GST_TIME_FORMAT
733       ", stop %" GST_TIME_FORMAT ", position %" GST_TIME_FORMAT, ns_update,
734       ns_rate, gst_format_get_name (ns_format), GST_TIME_ARGS (ns_start),
735       GST_TIME_ARGS (ns_stop), GST_TIME_ARGS (ns_position));
736
737   return (ns_update == update && ns_rate == rate && ns_format == format &&
738       ns_start == start && ns_stop == stop && ns_position == position);
739 }
740
741 GST_START_TEST (test_newsegment)
742 {
743   GstEvent *event;
744   GstBuffer *buffer1;
745   GstBuffer *buffer2;
746   GstBuffer *buffer;
747
748   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
749   g_object_set (G_OBJECT (queue), "max-size-buffers", 1, "max-size-time",
750       (guint64) 0, "leaky", 2, NULL);
751
752   GST_DEBUG ("starting");
753
754   fail_unless (gst_element_set_state (queue,
755           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
756       "could not set to playing");
757   fail_unless (overrun_count == 0);
758   fail_unless (underrun_count == 0);
759
760   event = gst_event_new_new_segment (FALSE, 2.0, 1.0, GST_FORMAT_TIME, 0,
761       2 * GST_SECOND, 0);
762   gst_pad_push_event (mysrcpad, event);
763
764   GST_DEBUG ("added 1st newsegment");
765   fail_unless (overrun_count == 0);
766   fail_unless (underrun_count == 0);
767
768   event = gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME, 0,
769       3 * GST_SECOND, 0);
770   gst_pad_push_event (mysrcpad, event);
771
772   GST_DEBUG ("added 2nd newsegment");
773   fail_unless (overrun_count == 0);
774   fail_unless (underrun_count == 0);
775
776   event = gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME,
777       4 * GST_SECOND, 5 * GST_SECOND, 4 * GST_SECOND);
778   gst_pad_push_event (mysrcpad, event);
779
780   GST_DEBUG ("added 3rd newsegment");
781   fail_unless (overrun_count == 0);
782   fail_unless (underrun_count == 0);
783
784   buffer1 = gst_buffer_new_and_alloc (4);
785   /* buffer1 will be leaked, keep a ref so refcount can be checked below */
786   gst_buffer_ref (buffer1);
787   /* pushing gives away one reference */
788   gst_pad_push (mysrcpad, buffer1);
789
790   GST_DEBUG ("added 1st buffer");
791   fail_unless (overrun_count == 0);
792   fail_unless (underrun_count == 0);
793
794   buffer2 = gst_buffer_new_and_alloc (4);
795   /* next push will cause overrun and leak all newsegment events and buffer1 */
796   gst_pad_push (mysrcpad, buffer2);
797
798   GST_DEBUG ("added 2nd buffer");
799   /* it still triggers overrun when leaking */
800   fail_unless (overrun_count == 1);
801   fail_unless (underrun_count == 0);
802
803   /* wait for underrun and check that we got one accumulated newsegment event,
804    * one real newsegment event and buffer2 only */
805   UNDERRUN_LOCK ();
806   mysinkpad = setup_sink_pad (queue, &sinktemplate);
807   UNDERRUN_WAIT ();
808   UNDERRUN_UNLOCK ();
809
810   fail_unless (overrun_count == 1);
811   fail_unless (underrun_count == 1);
812
813   fail_unless (g_list_length (events) == 2);
814
815   event = g_list_nth (events, 0)->data;
816   fail_unless (event_equals_newsegment (event, FALSE, 1.0, GST_FORMAT_TIME, 0,
817           4 * GST_SECOND, 0));
818
819   event = g_list_nth (events, 1)->data;
820   fail_unless (event_equals_newsegment (event, FALSE, 1.0, GST_FORMAT_TIME,
821           4 * GST_SECOND, 5 * GST_SECOND, 4 * GST_SECOND));
822
823   fail_unless (g_list_length (buffers) == 1);
824
825   ASSERT_BUFFER_REFCOUNT (buffer1, "buffer", 1);
826   gst_buffer_unref (buffer1);
827
828   buffer = g_list_nth (buffers, 0)->data;
829   fail_unless (buffer == buffer2);
830
831   GST_DEBUG ("stopping");
832   fail_unless (gst_element_set_state (queue,
833           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
834 }
835
836 GST_END_TEST;
837 #endif
838
839 static gpointer
840 thread_func (gpointer data)
841 {
842   int i = 0;
843   for (i = 0; i < 100; i++) {
844     GstCaps *caps;
845     GstQuery *query;
846     gboolean ok;
847     caps = gst_caps_new_any ();
848     query = gst_query_new_allocation (caps, FALSE);
849     ok = gst_pad_peer_query (mysrcpad, query);
850     gst_query_unref (query);
851     gst_caps_unref (caps);
852     query = NULL;
853     caps = NULL;
854
855     if (!ok)
856       break;
857   }
858
859   return NULL;
860 }
861
862 static gboolean query_func (GstPad * pad, GstObject * parent, GstQuery * query);
863
864 static gboolean
865 query_func (GstPad * pad, GstObject * parent, GstQuery * query)
866 {
867
868   g_usleep (1000);
869   return TRUE;
870 }
871
872 GST_START_TEST (test_queries_while_flushing)
873 {
874   GstEvent *event;
875   GThread *thread;
876   int i;
877
878   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
879   gst_pad_set_query_function (mysinkpad, query_func);
880   gst_pad_set_active (mysinkpad, TRUE);
881
882   /* hard to reproduce, so just run it a few times in a row */
883   for (i = 0; i < 500; ++i) {
884     GST_DEBUG ("starting");
885     UNDERRUN_LOCK ();
886     fail_unless (gst_element_set_state (queue,
887             GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
888         "could not set to playing");
889     UNDERRUN_WAIT ();
890     UNDERRUN_UNLOCK ();
891
892     thread = g_thread_new ("deactivating thread", thread_func, NULL);
893     g_usleep (1000);
894
895     event = gst_event_new_flush_start ();
896     gst_pad_push_event (mysrcpad, event);
897
898     g_thread_join (thread);
899
900     GST_DEBUG ("stopping");
901     fail_unless (gst_element_set_state (queue,
902             GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS,
903         "could not set to null");
904   }
905 }
906
907 GST_END_TEST;
908
909 static gpointer
910 push_event_thread_func (gpointer data)
911 {
912   GstEvent *event;
913
914   event = GST_EVENT (data);
915
916   GST_DEBUG ("pushing event %p on pad %p", event, mysrcpad);
917   gst_pad_push_event (mysrcpad, event);
918
919   return NULL;
920 }
921
922 GST_START_TEST (test_state_change_when_flushing)
923 {
924   GstEvent *event;
925   GThread *thread;
926
927   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
928   gst_pad_set_active (mysinkpad, TRUE);
929
930   fail_unless (gst_element_set_state (queue, GST_STATE_PAUSED) ==
931       GST_STATE_CHANGE_SUCCESS);
932
933   event = gst_event_new_flush_start ();
934   gst_pad_push_event (mysrcpad, event);
935
936   event = gst_event_new_flush_stop (TRUE);
937   thread = g_thread_new ("send event", push_event_thread_func, event);
938
939   GST_DEBUG ("changing state to READY");
940   fail_unless (gst_element_set_state (queue, GST_STATE_READY) ==
941       GST_STATE_CHANGE_SUCCESS);
942   GST_DEBUG ("state changed");
943
944   g_thread_join (thread);
945
946   fail_unless (gst_element_set_state (queue, GST_STATE_NULL) ==
947       GST_STATE_CHANGE_SUCCESS);
948 }
949
950 GST_END_TEST;
951
952 GST_START_TEST (test_time_level_buffer_list)
953 {
954   GstBuffer *buffer = NULL;
955   GstBufferList *buffer_list = NULL;
956   GstClockTime time;
957   guint buffers;
958   GstSegment segment;
959
960   g_signal_connect (queue, "overrun",
961       G_CALLBACK (queue_overrun_link_and_activate), NULL);
962   g_object_set (G_OBJECT (queue), "max-size-buffers", 11, NULL);
963   g_object_set (G_OBJECT (queue), "max-size-time",
964       G_GUINT64_CONSTANT (7000) * GST_MSECOND, NULL);
965
966   GST_DEBUG ("starting");
967
968   block_src ();
969
970   UNDERRUN_LOCK ();
971   fail_unless (gst_element_set_state (queue,
972           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
973       "could not set to playing");
974   UNDERRUN_WAIT ();
975   UNDERRUN_UNLOCK ();
976
977   gst_segment_init (&segment, GST_FORMAT_BYTES);
978   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
979   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
980
981   /* push buffer without duration */
982   buffer = gst_buffer_new_and_alloc (4);
983   GST_BUFFER_TIMESTAMP (buffer) = 1000 * GST_MSECOND;
984   /* pushing gives away my reference */
985   gst_pad_push (mysrcpad, buffer);
986
987   /* level should be 1 seconds because buffer has no duration and starts at 1
988    * SECOND (sparse stream). */
989   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
990   fail_unless_equals_uint64 (time, 1000 * GST_MSECOND);
991   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
992   fail_unless_equals_int (buffers, 1);
993
994   /* second push should set the level to 2 second */
995   buffer_list = gst_buffer_list_new ();
996   buffer = gst_buffer_new_and_alloc (4);
997   GST_BUFFER_TIMESTAMP (buffer) = 1500 * GST_MSECOND;
998   gst_buffer_list_add (buffer_list, buffer);
999   buffer = gst_buffer_new_and_alloc (4);
1000   GST_BUFFER_TIMESTAMP (buffer) = 2000 * GST_MSECOND;
1001   gst_buffer_list_add (buffer_list, buffer);
1002   gst_pad_push_list (mysrcpad, buffer_list);
1003
1004   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1005   fail_unless_equals_uint64 (time, 2000 * GST_MSECOND);
1006   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1007   fail_unless_equals_int (buffers, 3);
1008
1009   /* third push should set the level to 4 seconds, the 1 second diff with the
1010    * previous buffer (without duration) and the 1 second duration of this
1011    * buffer. */
1012   buffer_list = gst_buffer_list_new ();
1013   buffer = gst_buffer_new_and_alloc (4);
1014   GST_BUFFER_TIMESTAMP (buffer) = 3000 * GST_MSECOND;
1015   GST_BUFFER_DURATION (buffer) = 500 * GST_MSECOND;
1016   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1017   gst_buffer_list_add (buffer_list, buffer);
1018   buffer = gst_buffer_new_and_alloc (4);
1019   GST_BUFFER_TIMESTAMP (buffer) = 3500 * GST_MSECOND;
1020   GST_BUFFER_DURATION (buffer) = 500 * GST_MSECOND;
1021   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1022   gst_buffer_list_add (buffer_list, buffer);
1023   gst_pad_push_list (mysrcpad, buffer_list);
1024
1025   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1026   fail_unless_equals_uint64 (time, 4000 * GST_MSECOND);
1027   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1028   fail_unless_equals_int (buffers, 5);
1029
1030   /* fourth push should set the level to 6 seconds, the 2 second diff with the
1031    * previous buffer, same duration. */
1032   buffer_list = gst_buffer_list_new ();
1033   buffer = gst_buffer_new_and_alloc (4);
1034   GST_BUFFER_TIMESTAMP (buffer) = 5000 * GST_MSECOND;
1035   GST_BUFFER_DURATION (buffer) = 1000 * GST_MSECOND;
1036   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1037   gst_buffer_list_add (buffer_list, buffer);
1038   gst_pad_push_list (mysrcpad, buffer_list);
1039
1040   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1041   fail_unless_equals_uint64 (time, 6000 * GST_MSECOND);
1042   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1043   fail_unless_equals_int (buffers, 6);
1044
1045   /* fifth push should not adjust the level, the timestamp and duration are the
1046    * same, meaning the previous buffer did not really have a duration. */
1047   buffer_list = gst_buffer_list_new ();
1048   buffer = gst_buffer_new_and_alloc (4);
1049   GST_BUFFER_TIMESTAMP (buffer) = 5000 * GST_MSECOND;
1050   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1051   gst_buffer_list_add (buffer_list, buffer);
1052   buffer = gst_buffer_new_and_alloc (4);
1053   GST_BUFFER_TIMESTAMP (buffer) = 5250 * GST_MSECOND;
1054   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1055   gst_buffer_list_add (buffer_list, buffer);
1056   buffer = gst_buffer_new_and_alloc (4);
1057   GST_BUFFER_TIMESTAMP (buffer) = 5500 * GST_MSECOND;
1058   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1059   gst_buffer_list_add (buffer_list, buffer);
1060   buffer = gst_buffer_new_and_alloc (4);
1061   GST_BUFFER_TIMESTAMP (buffer) = 5750 * GST_MSECOND;
1062   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1063   gst_buffer_list_add (buffer_list, buffer);
1064   gst_pad_push_list (mysrcpad, buffer_list);
1065
1066   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1067   fail_unless_equals_uint64 (time, 6000 * GST_MSECOND);
1068   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1069   fail_unless_equals_int (buffers, 10);
1070
1071   /* sixth push should adjust the level with 1 second, we now know the
1072    * previous buffer actually had a duration of 2 SECONDS */
1073   buffer = gst_buffer_new_and_alloc (4);
1074   GST_BUFFER_TIMESTAMP (buffer) = 7000 * GST_MSECOND;
1075   gst_pad_push (mysrcpad, buffer);
1076
1077   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1078   fail_unless_equals_uint64 (time, 7000 * GST_MSECOND);
1079   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1080   fail_unless_equals_int (buffers, 11);
1081
1082   /* eighth push should cause overrun */
1083   fail_unless (overrun_count == 0);
1084   buffer_list = gst_buffer_list_new ();
1085   buffer = gst_buffer_new_and_alloc (4);
1086   GST_BUFFER_TIMESTAMP (buffer) = 8000 * GST_MSECOND;
1087   /* the next call to gst_pad_push will emit the overrun signal. The signal
1088    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
1089    * activates and links mysinkpad. The queue task then dequeues a buffer and
1090    * gst_pad_push() will return. */
1091   gst_buffer_list_add (buffer_list, buffer);
1092   gst_pad_push_list (mysrcpad, buffer_list);
1093
1094   fail_unless (overrun_count == 1);
1095
1096   GST_DEBUG ("stopping");
1097   fail_unless (gst_element_set_state (queue,
1098           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
1099 }
1100
1101 GST_END_TEST;
1102
1103 GST_START_TEST (test_initial_events_nodelay)
1104 {
1105   GstSegment segment;
1106   GstEvent *event;
1107   GstCaps *caps;
1108   gboolean ret;
1109
1110   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
1111   gst_pad_set_event_function (mysinkpad, event_func);
1112   gst_pad_set_active (mysinkpad, TRUE);
1113
1114   GST_DEBUG ("starting");
1115
1116   fail_unless (gst_element_set_state (queue,
1117           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
1118       "could not set to playing");
1119
1120   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
1121
1122   caps = gst_caps_new_empty_simple ("foo/x-bar");
1123   ret = gst_pad_push_event (mysrcpad, gst_event_new_caps (caps));
1124   gst_caps_unref (caps);
1125   fail_unless (ret == TRUE);
1126
1127   gst_segment_init (&segment, GST_FORMAT_TIME);
1128   ret = gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
1129   fail_unless (ret == TRUE);
1130
1131   g_mutex_lock (&events_lock);
1132   while (events_count < 3) {
1133     g_cond_wait (&events_cond, &events_lock);
1134   }
1135   g_mutex_unlock (&events_lock);
1136
1137   fail_unless_equals_int (g_list_length (events), 3);
1138   event = g_list_nth_data (events, 0);
1139   fail_unless_equals_int (GST_EVENT_TYPE (event), GST_EVENT_STREAM_START);
1140   event = g_list_nth_data (events, 1);
1141   fail_unless_equals_int (GST_EVENT_TYPE (event), GST_EVENT_CAPS);
1142   event = g_list_nth_data (events, 2);
1143   fail_unless_equals_int (GST_EVENT_TYPE (event), GST_EVENT_SEGMENT);
1144
1145   gst_element_set_state (queue, GST_STATE_NULL);
1146 }
1147
1148 GST_END_TEST;
1149
1150 static Suite *
1151 queue_suite (void)
1152 {
1153   Suite *s = suite_create ("queue");
1154   TCase *tc_chain = tcase_create ("general");
1155
1156   suite_add_tcase (s, tc_chain);
1157   tcase_add_checked_fixture (tc_chain, setup, cleanup);
1158   tcase_add_test (tc_chain, test_non_leaky_underrun);
1159   tcase_add_test (tc_chain, test_non_leaky_overrun);
1160   tcase_add_test (tc_chain, test_leaky_upstream);
1161   tcase_add_test (tc_chain, test_leaky_downstream);
1162   tcase_add_test (tc_chain, test_time_level);
1163   tcase_add_test (tc_chain, test_time_level_task_not_started);
1164   tcase_add_test (tc_chain, test_queries_while_flushing);
1165   tcase_add_test (tc_chain, test_state_change_when_flushing);
1166 #if 0
1167   tcase_add_test (tc_chain, test_newsegment);
1168 #endif
1169   tcase_add_test (tc_chain, test_sticky_not_linked);
1170   tcase_add_test (tc_chain, test_time_level_buffer_list);
1171   tcase_add_test (tc_chain, test_initial_events_nodelay);
1172
1173   return s;
1174 }
1175
1176 GST_CHECK_MAIN (queue);