Move dataurisrc element from -bad
[platform/upstream/gstreamer.git] / tests / check / elements / queue.c
1 /* GStreamer
2  *
3  * unit test for queue
4  *
5  * Copyright (C) <2006> Stefan Kost <ensonic@users.sf.net>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #include <unistd.h>
24
25 #include <gst/check/gstcheck.h>
26
27 #define UNDERRUN_LOCK() (g_mutex_lock (&underrun_mutex))
28 #define UNDERRUN_UNLOCK() (g_mutex_unlock (&underrun_mutex))
29 #define UNDERRUN_SIGNAL() (g_cond_signal (&underrun_cond))
30 #define UNDERRUN_WAIT() (g_cond_wait (&underrun_cond, &underrun_mutex))
31
32 static GstElement *queue;
33
34 /* For ease of programming we use globals to keep refs for our floating
35  * src and sink pads we create; otherwise we always have to do get_pad,
36  * get_peer, and then remove references in every test function */
37 static GstPad *mysrcpad;
38 static GstPad *mysinkpad;
39 static GstPad *qsrcpad;
40 static gulong probe_id;
41
42 static gint overrun_count;
43
44 static GMutex underrun_mutex;
45 static GCond underrun_cond;
46 static gint underrun_count;
47
48 static GMutex events_lock;
49 static GCond events_cond;
50 static gint events_count;
51 static GList *events;
52
53 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
54     GST_PAD_SINK,
55     GST_PAD_ALWAYS,
56     GST_STATIC_CAPS_ANY);
57 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
58     GST_PAD_SRC,
59     GST_PAD_ALWAYS,
60     GST_STATIC_CAPS_ANY);
61
62 static void
63 queue_overrun (GstElement * queue, gpointer user_data)
64 {
65   overrun_count++;
66   GST_DEBUG ("queue overrun %d", overrun_count);
67 }
68
69 static void
70 queue_underrun (GstElement * queue, gpointer user_data)
71 {
72   UNDERRUN_LOCK ();
73   underrun_count++;
74   GST_DEBUG ("queue underrun %d", underrun_count);
75   UNDERRUN_SIGNAL ();
76   UNDERRUN_UNLOCK ();
77 }
78
79 static gboolean
80 event_func (GstPad * pad, GstObject * parent, GstEvent * event)
81 {
82   GST_DEBUG ("%s event", GST_EVENT_TYPE_NAME (event));
83
84   g_mutex_lock (&events_lock);
85
86   events = g_list_append (events, event);
87   ++events_count;
88
89   g_cond_broadcast (&events_cond);
90   g_mutex_unlock (&events_lock);
91
92   return TRUE;
93 }
94
95 static void
96 block_src (void)
97 {
98   qsrcpad = gst_element_get_static_pad (queue, "src");
99   probe_id = gst_pad_add_probe (qsrcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
100       NULL, NULL, NULL);
101 }
102
103 static void
104 unblock_src (void)
105 {
106   gst_pad_remove_probe (qsrcpad, probe_id);
107   gst_object_unref (qsrcpad);
108 }
109
110 static void
111 setup (void)
112 {
113   GST_DEBUG ("setup_queue");
114
115   queue = gst_check_setup_element ("queue");
116   g_signal_connect (queue, "underrun", G_CALLBACK (queue_underrun), NULL);
117
118   mysrcpad = gst_check_setup_src_pad (queue, &srctemplate);
119   gst_pad_set_active (mysrcpad, TRUE);
120
121   mysinkpad = NULL;
122
123   overrun_count = 0;
124
125   underrun_count = 0;
126
127
128   g_mutex_init (&events_lock);
129   g_cond_init (&events_cond);
130   events_count = 0;
131   events = NULL;
132 }
133
134 static void
135 cleanup (void)
136 {
137   GST_DEBUG ("cleanup_queue");
138
139   gst_check_drop_buffers ();
140
141   while (events != NULL) {
142     gst_event_unref (GST_EVENT (events->data));
143     events = g_list_delete_link (events, events);
144   }
145   events_count = 0;
146   g_mutex_clear (&events_lock);
147   g_cond_clear (&events_cond);
148
149   if (mysinkpad != NULL) {
150     gst_pad_set_active (mysinkpad, FALSE);
151     gst_check_teardown_sink_pad (queue);
152   }
153
154   gst_pad_set_active (mysrcpad, FALSE);
155   gst_check_teardown_src_pad (queue);
156
157   gst_check_teardown_element (queue);
158   queue = NULL;
159 }
160
161 /* setup the sinkpad on a playing queue element. gst_check_setup_sink_pad()
162  * does not work in this case since it does not activate the pad before linking
163  * it. */
164 static GstPad *
165 setup_sink_pad (GstElement * element, GstStaticPadTemplate * tmpl)
166 {
167   GstPad *srcpad;
168   GstPad *sinkpad;
169
170   sinkpad = gst_pad_new_from_static_template (tmpl, "sink");
171   fail_if (sinkpad == NULL);
172   srcpad = gst_element_get_static_pad (element, "src");
173   fail_if (srcpad == NULL);
174   gst_pad_set_chain_function (sinkpad, gst_check_chain_func);
175   gst_pad_set_event_function (sinkpad, event_func);
176   gst_pad_set_active (sinkpad, TRUE);
177   fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK);
178   gst_object_unref (srcpad);
179
180   return sinkpad;
181 }
182
183 /* set queue size to 2 buffers
184  * pull 1 buffer
185  * check over/underuns
186  */
187 GST_START_TEST (test_non_leaky_underrun)
188 {
189   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
190   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, NULL);
191   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
192   gst_pad_set_active (mysinkpad, TRUE);
193
194   GST_DEBUG ("starting");
195
196   UNDERRUN_LOCK ();
197   fail_unless (gst_element_set_state (queue,
198           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
199       "could not set to playing");
200   UNDERRUN_WAIT ();
201   UNDERRUN_UNLOCK ();
202
203   fail_unless (overrun_count == 0);
204   fail_unless (underrun_count == 1);
205
206   GST_DEBUG ("stopping");
207   fail_unless (gst_element_set_state (queue,
208           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
209 }
210
211 GST_END_TEST;
212
213 static void
214 queue_overrun_link_and_activate (GstElement * queue, gpointer user_data)
215 {
216   GST_DEBUG ("queue overrun");
217   overrun_count++;
218
219   /* link the src pad of the queue to make it dequeue buffers */
220   mysinkpad = setup_sink_pad (queue, &sinktemplate);
221
222   unblock_src ();
223 }
224
225 /* set queue size to 2 buffers
226  * push 2 buffers
227  * check over/underuns
228  * push 1 more buffer
229  * check over/underuns again
230  */
231 GST_START_TEST (test_non_leaky_overrun)
232 {
233   GstBuffer *buffer1;
234   GstBuffer *buffer2;
235   GstBuffer *buffer3;
236   GstBuffer *buffer;
237   GstSegment segment;
238
239   g_signal_connect (queue, "overrun",
240       G_CALLBACK (queue_overrun_link_and_activate), NULL);
241   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, NULL);
242
243   block_src ();
244
245   GST_DEBUG ("starting");
246
247   UNDERRUN_LOCK ();
248   fail_unless (gst_element_set_state (queue,
249           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
250       "could not set to playing");
251   UNDERRUN_WAIT ();
252   UNDERRUN_UNLOCK ();
253
254   gst_segment_init (&segment, GST_FORMAT_BYTES);
255   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
256   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
257
258   fail_unless (underrun_count == 1);
259   fail_unless (overrun_count == 0);
260
261   buffer1 = gst_buffer_new_and_alloc (4);
262   /* pushing gives away my reference */
263   gst_pad_push (mysrcpad, buffer1);
264
265   GST_DEBUG ("added 1st");
266   fail_unless (overrun_count == 0);
267   fail_unless (underrun_count == 1);
268
269   buffer2 = gst_buffer_new_and_alloc (4);
270   gst_pad_push (mysrcpad, buffer2);
271
272   GST_DEBUG ("added 2nd");
273   fail_unless (overrun_count == 0);
274   fail_unless (underrun_count == 1);
275
276   buffer3 = gst_buffer_new_and_alloc (4);
277   /* the next call to gst_pad_push will emit the overrun signal. The signal
278    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
279    * activates and links mysinkpad. The queue task then dequeues a buffer and
280    * gst_pad_push() will return. */
281   gst_pad_push (mysrcpad, buffer3);
282
283   GST_DEBUG ("added 3rd");
284   fail_unless (overrun_count == 1);
285
286   /* lock the check_mutex to block the first buffer pushed to mysinkpad */
287   g_mutex_lock (&check_mutex);
288   /* now let the queue push all buffers */
289   while (g_list_length (buffers) < 3) {
290     g_cond_wait (&check_cond, &check_mutex);
291   }
292   g_mutex_unlock (&check_mutex);
293
294   fail_unless (overrun_count == 1);
295   /* make sure we get the underrun signal before we check underrun_count */
296   UNDERRUN_LOCK ();
297   while (underrun_count < 2) {
298     UNDERRUN_WAIT ();
299   }
300   /* we can't check the underrun_count here safely because when adding the 3rd
301    * buffer, the queue lock is released to emit the overrun signal and the
302    * downstream part can then push and empty the queue and signal an additional
303    * underrun */
304   /* fail_unless_equals_int (underrun_count, 2); */
305   UNDERRUN_UNLOCK ();
306
307   buffer = g_list_nth (buffers, 0)->data;
308   fail_unless (buffer == buffer1);
309
310   buffer = g_list_nth (buffers, 1)->data;
311   fail_unless (buffer == buffer2);
312
313   GST_DEBUG ("stopping");
314   fail_unless (gst_element_set_state (queue,
315           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
316 }
317
318 GST_END_TEST;
319
320 /* set queue size to 2 buffers
321  * push 2 buffers
322  * check over/underuns
323  * push 1 more buffer
324  * check over/underuns again
325  * check which buffer was leaked
326  */
327 GST_START_TEST (test_leaky_upstream)
328 {
329   GstBuffer *buffer1;
330   GstBuffer *buffer2;
331   GstBuffer *buffer3;
332   GstBuffer *buffer;
333   GstSegment segment;
334
335   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
336   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, "leaky", 1, NULL);
337
338   GST_DEBUG ("starting");
339
340   block_src ();
341
342   UNDERRUN_LOCK ();
343   fail_unless (gst_element_set_state (queue,
344           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
345       "could not set to playing");
346   UNDERRUN_WAIT ();
347   UNDERRUN_UNLOCK ();
348
349   gst_segment_init (&segment, GST_FORMAT_BYTES);
350   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
351   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
352
353   fail_unless (overrun_count == 0);
354   fail_unless (underrun_count == 1);
355
356   buffer1 = gst_buffer_new_and_alloc (4);
357   /* pushing gives away my reference */
358   gst_pad_push (mysrcpad, buffer1);
359
360   GST_DEBUG ("added 1st");
361   fail_unless (overrun_count == 0);
362   fail_unless (underrun_count == 1);
363
364   buffer2 = gst_buffer_new_and_alloc (4);
365   gst_pad_push (mysrcpad, buffer2);
366
367   GST_DEBUG ("added 2nd");
368   fail_unless (overrun_count == 0);
369   fail_unless (underrun_count == 1);
370
371   buffer3 = gst_buffer_new_and_alloc (4);
372   /* buffer4 will be leaked, keep a ref so refcount can be checked below */
373   gst_buffer_ref (buffer3);
374   gst_pad_push (mysrcpad, buffer3);
375
376   GST_DEBUG ("added 3nd");
377   /* it still triggers overrun when leaking */
378   fail_unless (overrun_count == 1);
379   fail_unless (underrun_count == 1);
380
381   /* wait for underrun and check that we got buffer1 and buffer2 only */
382   UNDERRUN_LOCK ();
383   mysinkpad = setup_sink_pad (queue, &sinktemplate);
384   unblock_src ();
385   UNDERRUN_WAIT ();
386   UNDERRUN_UNLOCK ();
387
388   fail_unless (overrun_count == 1);
389   fail_unless (underrun_count == 2);
390
391   fail_unless (g_list_length (buffers) == 2);
392
393   buffer = g_list_nth (buffers, 0)->data;
394   fail_unless (buffer == buffer1);
395
396   buffer = g_list_nth (buffers, 1)->data;
397   fail_unless (buffer == buffer2);
398
399   ASSERT_BUFFER_REFCOUNT (buffer3, "buffer", 1);
400   gst_buffer_unref (buffer3);
401
402   GST_DEBUG ("stopping");
403   fail_unless (gst_element_set_state (queue,
404           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
405 }
406
407 GST_END_TEST;
408
409 /* set queue size to 2 buffers
410  * push 2 buffers
411  * check over/underuns
412  * push 1 more buffer
413  * check over/underuns again
414  * check which buffer was leaked
415  */
416 GST_START_TEST (test_leaky_downstream)
417 {
418   GstBuffer *buffer1;
419   GstBuffer *buffer2;
420   GstBuffer *buffer3;
421   GstBuffer *buffer;
422   GstSegment segment;
423
424   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
425   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, "leaky", 2, NULL);
426
427   GST_DEBUG ("starting");
428
429   block_src ();
430
431   UNDERRUN_LOCK ();
432   fail_unless (gst_element_set_state (queue,
433           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
434       "could not set to playing");
435   UNDERRUN_WAIT ();
436   UNDERRUN_UNLOCK ();
437
438   gst_segment_init (&segment, GST_FORMAT_BYTES);
439   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
440   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
441
442   fail_unless (overrun_count == 0);
443   fail_unless (underrun_count == 1);
444
445   buffer1 = gst_buffer_new_and_alloc (4);
446   /* pushing gives away one reference */
447   /* buffer1 will be leaked, keep a ref so refcount can be checked below */
448   gst_buffer_ref (buffer1);
449   gst_pad_push (mysrcpad, buffer1);
450
451   GST_DEBUG ("added 1st");
452   fail_unless (overrun_count == 0);
453   fail_unless (underrun_count == 1);
454
455   buffer2 = gst_buffer_new_and_alloc (4);
456   gst_pad_push (mysrcpad, buffer2);
457
458   GST_DEBUG ("added 2nd");
459   fail_unless (overrun_count == 0);
460   fail_unless (underrun_count == 1);
461
462   buffer3 = gst_buffer_new_and_alloc (4);
463   gst_pad_push (mysrcpad, buffer3);
464
465   GST_DEBUG ("added 3rd");
466   /* it still triggers overrun when leaking */
467   fail_unless (overrun_count == 1);
468   fail_unless (underrun_count == 1);
469
470   /* wait for underrun and check that we got buffer1 and buffer2 only */
471   UNDERRUN_LOCK ();
472   mysinkpad = setup_sink_pad (queue, &sinktemplate);
473   unblock_src ();
474   UNDERRUN_WAIT ();
475   UNDERRUN_UNLOCK ();
476
477   fail_unless (overrun_count == 1);
478   fail_unless (underrun_count == 2);
479
480   fail_unless (g_list_length (buffers) == 2);
481
482   ASSERT_BUFFER_REFCOUNT (buffer1, "buffer", 1);
483   gst_buffer_unref (buffer1);
484
485   buffer = g_list_nth (buffers, 0)->data;
486   fail_unless (buffer == buffer2);
487
488   buffer = g_list_nth (buffers, 1)->data;
489   fail_unless (buffer == buffer3);
490
491   GST_DEBUG ("stopping");
492   fail_unless (gst_element_set_state (queue,
493           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
494 }
495
496 GST_END_TEST;
497
498 /* set queue size to 6 buffers and 7 seconds
499  * push 7 buffers with and without duration
500  * check current-level-time
501  */
502 GST_START_TEST (test_time_level)
503 {
504   GstBuffer *buffer = NULL;
505   GstClockTime time;
506   GstSegment segment;
507
508   g_signal_connect (queue, "overrun",
509       G_CALLBACK (queue_overrun_link_and_activate), NULL);
510   g_object_set (G_OBJECT (queue), "max-size-buffers", 6, NULL);
511   g_object_set (G_OBJECT (queue), "max-size-time", 7 * GST_SECOND, NULL);
512
513   GST_DEBUG ("starting");
514
515   block_src ();
516
517   UNDERRUN_LOCK ();
518   fail_unless (gst_element_set_state (queue,
519           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
520       "could not set to playing");
521   UNDERRUN_WAIT ();
522   UNDERRUN_UNLOCK ();
523
524   gst_segment_init (&segment, GST_FORMAT_BYTES);
525   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
526   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
527
528   /* push buffer without duration */
529   buffer = gst_buffer_new_and_alloc (4);
530   GST_BUFFER_TIMESTAMP (buffer) = GST_SECOND;
531   /* pushing gives away my reference */
532   gst_pad_push (mysrcpad, buffer);
533
534   /* level should be 1 seconds because buffer has no duration and starts at 1
535    * SECOND (sparse stream). */
536   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
537   fail_if (time != GST_SECOND);
538
539   /* second push should set the level to 2 second */
540   buffer = gst_buffer_new_and_alloc (4);
541   GST_BUFFER_TIMESTAMP (buffer) = 2 * GST_SECOND;
542   gst_pad_push (mysrcpad, buffer);
543
544   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
545   fail_if (time != 2 * GST_SECOND);
546
547   /* third push should set the level to 4 seconds, the 1 second diff with the
548    * previous buffer (without duration) and the 1 second duration of this
549    * buffer. */
550   buffer = gst_buffer_new_and_alloc (4);
551   GST_BUFFER_TIMESTAMP (buffer) = 3 * GST_SECOND;
552   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
553   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
554   gst_pad_push (mysrcpad, buffer);
555
556   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
557   fail_if (time != 4 * GST_SECOND);
558
559   /* fourth push should set the level to 6 seconds, the 2 second diff with the
560    * previous buffer, same duration. */
561   buffer = gst_buffer_new_and_alloc (4);
562   GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
563   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
564   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
565   gst_pad_push (mysrcpad, buffer);
566
567   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
568   fail_if (time != 6 * GST_SECOND);
569
570   /* fifth push should not adjust the level, the timestamp and duration are the
571    * same, meaning the previous buffer did not really have a duration. */
572   buffer = gst_buffer_new_and_alloc (4);
573   GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
574   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
575   gst_pad_push (mysrcpad, buffer);
576
577   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
578   fail_if (time != 6 * GST_SECOND);
579
580   /* sixth push should adjust the level with 1 second, we now know the
581    * previous buffer actually had a duration of 2 SECONDS */
582   buffer = gst_buffer_new_and_alloc (4);
583   GST_BUFFER_TIMESTAMP (buffer) = 7 * GST_SECOND;
584   gst_pad_push (mysrcpad, buffer);
585
586   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
587   fail_if (time != 7 * GST_SECOND);
588
589   /* eighth push should cause overrun */
590   fail_unless (overrun_count == 0);
591   buffer = gst_buffer_new_and_alloc (4);
592   GST_BUFFER_TIMESTAMP (buffer) = 8 * GST_SECOND;
593   /* the next call to gst_pad_push will emit the overrun signal. The signal
594    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
595    * activates and links mysinkpad. The queue task then dequeues a buffer and
596    * gst_pad_push() will return. */
597   gst_pad_push (mysrcpad, buffer);
598
599   fail_unless (overrun_count == 1);
600
601   GST_DEBUG ("stopping");
602   fail_unless (gst_element_set_state (queue,
603           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
604 }
605
606 GST_END_TEST;
607
608 GST_START_TEST (test_time_level_task_not_started)
609 {
610   GstEvent *event;
611   GstClockTime time;
612   GstSegment segment;
613
614   GST_DEBUG ("starting");
615
616   block_src ();
617
618   UNDERRUN_LOCK ();
619   fail_unless (gst_element_set_state (queue,
620           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
621       "could not set to playing");
622   UNDERRUN_WAIT ();
623   UNDERRUN_UNLOCK ();
624
625   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
626
627   gst_segment_init (&segment, GST_FORMAT_TIME);
628   segment.start = 1 * GST_SECOND;
629   segment.stop = 5 * GST_SECOND;
630   segment.time = 0;
631   segment.position = 1 * GST_SECOND;
632
633   event = gst_event_new_segment (&segment);
634   gst_pad_push_event (mysrcpad, event);
635
636   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
637   fail_if (time != 0 * GST_SECOND);
638
639   segment.base = 4 * GST_SECOND;
640   event = gst_event_new_segment (&segment);
641   gst_pad_push_event (mysrcpad, event);
642
643   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
644   GST_DEBUG ("time now %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
645   fail_if (time != 4 * GST_SECOND);
646
647   unblock_src ();
648
649   GST_DEBUG ("stopping");
650   fail_unless (gst_element_set_state (queue,
651           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
652 }
653
654 GST_END_TEST;
655
656 GST_START_TEST (test_sticky_not_linked)
657 {
658   GstEvent *event;
659   GstSegment segment;
660   gboolean ret;
661   GstFlowReturn flow_ret;
662
663   GST_DEBUG ("starting");
664
665   g_object_set (queue, "max-size-buffers", 1, NULL);
666
667   UNDERRUN_LOCK ();
668   fail_unless (gst_element_set_state (queue,
669           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
670       "could not set to playing");
671   UNDERRUN_WAIT ();
672   UNDERRUN_UNLOCK ();
673
674   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
675
676   gst_segment_init (&segment, GST_FORMAT_TIME);
677   segment.start = 1 * GST_SECOND;
678   segment.stop = 5 * GST_SECOND;
679   segment.time = 0;
680   segment.position = 1 * GST_SECOND;
681
682   event = gst_event_new_segment (&segment);
683   ret = gst_pad_push_event (mysrcpad, event);
684   fail_unless (ret == TRUE);
685
686   /* the first few buffers can return OK as they are queued and gst_queue_loop
687    * is woken up, tries to push and sets ->srcresult to NOT_LINKED
688    */
689   flow_ret = GST_FLOW_OK;
690   while (flow_ret != GST_FLOW_NOT_LINKED)
691     flow_ret = gst_pad_push (mysrcpad, gst_buffer_new ());
692
693   /* send a new sticky event so that it will be pushed on the next gst_pad_push
694    */
695   event = gst_event_new_segment (&segment);
696   ret = gst_pad_push_event (mysrcpad, event);
697   fail_unless (ret == TRUE);
698
699   /* make sure that gst_queue_sink_event doesn't return FALSE if the queue is
700    * unlinked, as that would make gst_pad_push return ERROR
701    */
702   flow_ret = gst_pad_push (mysrcpad, gst_buffer_new ());
703   fail_unless_equals_int (flow_ret, GST_FLOW_NOT_LINKED);
704
705   GST_DEBUG ("stopping");
706   fail_unless (gst_element_set_state (queue,
707           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
708 }
709
710 GST_END_TEST;
711
712 #if 0
713 static gboolean
714 event_equals_newsegment (GstEvent * event, gboolean update, gdouble rate,
715     GstFormat format, gint64 start, gint64 stop, gint64 position)
716 {
717   gboolean ns_update;
718   gdouble ns_rate, ns_arate;
719   GstFormat ns_format;
720   gint64 ns_start;
721   gint64 ns_stop;
722   gint64 ns_position;
723
724   if (GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT) {
725     return FALSE;
726   }
727
728   gst_event_parse_new_segment (event, &ns_update, &ns_rate, &ns_arate,
729       &ns_format, &ns_start, &ns_stop, &ns_position);
730
731   GST_DEBUG ("update %d, rate %lf, format %s, start %" GST_TIME_FORMAT
732       ", stop %" GST_TIME_FORMAT ", position %" GST_TIME_FORMAT, ns_update,
733       ns_rate, gst_format_get_name (ns_format), GST_TIME_ARGS (ns_start),
734       GST_TIME_ARGS (ns_stop), GST_TIME_ARGS (ns_position));
735
736   return (ns_update == update && ns_rate == rate && ns_format == format &&
737       ns_start == start && ns_stop == stop && ns_position == position);
738 }
739
740 GST_START_TEST (test_newsegment)
741 {
742   GstEvent *event;
743   GstBuffer *buffer1;
744   GstBuffer *buffer2;
745   GstBuffer *buffer;
746
747   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
748   g_object_set (G_OBJECT (queue), "max-size-buffers", 1, "max-size-time",
749       (guint64) 0, "leaky", 2, NULL);
750
751   GST_DEBUG ("starting");
752
753   fail_unless (gst_element_set_state (queue,
754           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
755       "could not set to playing");
756   fail_unless (overrun_count == 0);
757   fail_unless (underrun_count == 0);
758
759   event = gst_event_new_new_segment (FALSE, 2.0, 1.0, GST_FORMAT_TIME, 0,
760       2 * GST_SECOND, 0);
761   gst_pad_push_event (mysrcpad, event);
762
763   GST_DEBUG ("added 1st newsegment");
764   fail_unless (overrun_count == 0);
765   fail_unless (underrun_count == 0);
766
767   event = gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME, 0,
768       3 * GST_SECOND, 0);
769   gst_pad_push_event (mysrcpad, event);
770
771   GST_DEBUG ("added 2nd newsegment");
772   fail_unless (overrun_count == 0);
773   fail_unless (underrun_count == 0);
774
775   event = gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME,
776       4 * GST_SECOND, 5 * GST_SECOND, 4 * GST_SECOND);
777   gst_pad_push_event (mysrcpad, event);
778
779   GST_DEBUG ("added 3rd newsegment");
780   fail_unless (overrun_count == 0);
781   fail_unless (underrun_count == 0);
782
783   buffer1 = gst_buffer_new_and_alloc (4);
784   /* buffer1 will be leaked, keep a ref so refcount can be checked below */
785   gst_buffer_ref (buffer1);
786   /* pushing gives away one reference */
787   gst_pad_push (mysrcpad, buffer1);
788
789   GST_DEBUG ("added 1st buffer");
790   fail_unless (overrun_count == 0);
791   fail_unless (underrun_count == 0);
792
793   buffer2 = gst_buffer_new_and_alloc (4);
794   /* next push will cause overrun and leak all newsegment events and buffer1 */
795   gst_pad_push (mysrcpad, buffer2);
796
797   GST_DEBUG ("added 2nd buffer");
798   /* it still triggers overrun when leaking */
799   fail_unless (overrun_count == 1);
800   fail_unless (underrun_count == 0);
801
802   /* wait for underrun and check that we got one accumulated newsegment event,
803    * one real newsegment event and buffer2 only */
804   UNDERRUN_LOCK ();
805   mysinkpad = setup_sink_pad (queue, &sinktemplate);
806   UNDERRUN_WAIT ();
807   UNDERRUN_UNLOCK ();
808
809   fail_unless (overrun_count == 1);
810   fail_unless (underrun_count == 1);
811
812   fail_unless (g_list_length (events) == 2);
813
814   event = g_list_nth (events, 0)->data;
815   fail_unless (event_equals_newsegment (event, FALSE, 1.0, GST_FORMAT_TIME, 0,
816           4 * GST_SECOND, 0));
817
818   event = g_list_nth (events, 1)->data;
819   fail_unless (event_equals_newsegment (event, FALSE, 1.0, GST_FORMAT_TIME,
820           4 * GST_SECOND, 5 * GST_SECOND, 4 * GST_SECOND));
821
822   fail_unless (g_list_length (buffers) == 1);
823
824   ASSERT_BUFFER_REFCOUNT (buffer1, "buffer", 1);
825   gst_buffer_unref (buffer1);
826
827   buffer = g_list_nth (buffers, 0)->data;
828   fail_unless (buffer == buffer2);
829
830   GST_DEBUG ("stopping");
831   fail_unless (gst_element_set_state (queue,
832           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
833 }
834
835 GST_END_TEST;
836 #endif
837
838 static gpointer
839 thread_func (gpointer data)
840 {
841   int i = 0;
842   for (i = 0; i < 100; i++) {
843     GstCaps *caps;
844     GstQuery *query;
845     gboolean ok;
846     caps = gst_caps_new_any ();
847     query = gst_query_new_allocation (caps, FALSE);
848     ok = gst_pad_peer_query (mysrcpad, query);
849     gst_query_unref (query);
850     gst_caps_unref (caps);
851     query = NULL;
852     caps = NULL;
853
854     if (!ok)
855       break;
856   }
857
858   return NULL;
859 }
860
861 static gboolean query_func (GstPad * pad, GstObject * parent, GstQuery * query);
862
863 static gboolean
864 query_func (GstPad * pad, GstObject * parent, GstQuery * query)
865 {
866
867   g_usleep (1000);
868   return TRUE;
869 }
870
871 GST_START_TEST (test_queries_while_flushing)
872 {
873   GstEvent *event;
874   GThread *thread;
875   int i;
876
877   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
878   gst_pad_set_query_function (mysinkpad, query_func);
879   gst_pad_set_active (mysinkpad, TRUE);
880
881   /* hard to reproduce, so just run it a few times in a row */
882   for (i = 0; i < 500; ++i) {
883     GST_DEBUG ("starting");
884     UNDERRUN_LOCK ();
885     fail_unless (gst_element_set_state (queue,
886             GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
887         "could not set to playing");
888     UNDERRUN_WAIT ();
889     UNDERRUN_UNLOCK ();
890
891     thread = g_thread_new ("deactivating thread", thread_func, NULL);
892     g_usleep (1000);
893
894     event = gst_event_new_flush_start ();
895     gst_pad_push_event (mysrcpad, event);
896
897     g_thread_join (thread);
898
899     GST_DEBUG ("stopping");
900     fail_unless (gst_element_set_state (queue,
901             GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS,
902         "could not set to null");
903   }
904 }
905
906 GST_END_TEST;
907
908 static gpointer
909 push_event_thread_func (gpointer data)
910 {
911   GstEvent *event;
912
913   event = GST_EVENT (data);
914
915   GST_DEBUG ("pushing event %p on pad %p", event, mysrcpad);
916   gst_pad_push_event (mysrcpad, event);
917
918   return NULL;
919 }
920
921 GST_START_TEST (test_state_change_when_flushing)
922 {
923   GstEvent *event;
924   GThread *thread;
925
926   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
927   gst_pad_set_active (mysinkpad, TRUE);
928
929   fail_unless (gst_element_set_state (queue, GST_STATE_PAUSED) ==
930       GST_STATE_CHANGE_SUCCESS);
931
932   event = gst_event_new_flush_start ();
933   gst_pad_push_event (mysrcpad, event);
934
935   event = gst_event_new_flush_stop (TRUE);
936   thread = g_thread_new ("send event", push_event_thread_func, event);
937
938   GST_DEBUG ("changing state to READY");
939   fail_unless (gst_element_set_state (queue, GST_STATE_READY) ==
940       GST_STATE_CHANGE_SUCCESS);
941   GST_DEBUG ("state changed");
942
943   g_thread_join (thread);
944
945   fail_unless (gst_element_set_state (queue, GST_STATE_NULL) ==
946       GST_STATE_CHANGE_SUCCESS);
947 }
948
949 GST_END_TEST;
950
951 GST_START_TEST (test_time_level_buffer_list)
952 {
953   GstBuffer *buffer = NULL;
954   GstBufferList *buffer_list = NULL;
955   GstClockTime time;
956   guint buffers;
957   GstSegment segment;
958
959   g_signal_connect (queue, "overrun",
960       G_CALLBACK (queue_overrun_link_and_activate), NULL);
961   g_object_set (G_OBJECT (queue), "max-size-buffers", 11, NULL);
962   g_object_set (G_OBJECT (queue), "max-size-time",
963       G_GUINT64_CONSTANT (7000) * GST_MSECOND, NULL);
964
965   GST_DEBUG ("starting");
966
967   block_src ();
968
969   UNDERRUN_LOCK ();
970   fail_unless (gst_element_set_state (queue,
971           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
972       "could not set to playing");
973   UNDERRUN_WAIT ();
974   UNDERRUN_UNLOCK ();
975
976   gst_segment_init (&segment, GST_FORMAT_BYTES);
977   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
978   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
979
980   /* push buffer without duration */
981   buffer = gst_buffer_new_and_alloc (4);
982   GST_BUFFER_TIMESTAMP (buffer) = 1000 * GST_MSECOND;
983   /* pushing gives away my reference */
984   gst_pad_push (mysrcpad, buffer);
985
986   /* level should be 1 seconds because buffer has no duration and starts at 1
987    * SECOND (sparse stream). */
988   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
989   fail_unless_equals_uint64 (time, 1000 * GST_MSECOND);
990   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
991   fail_unless_equals_int (buffers, 1);
992
993   /* second push should set the level to 2 second */
994   buffer_list = gst_buffer_list_new ();
995   buffer = gst_buffer_new_and_alloc (4);
996   GST_BUFFER_TIMESTAMP (buffer) = 1500 * GST_MSECOND;
997   gst_buffer_list_add (buffer_list, buffer);
998   buffer = gst_buffer_new_and_alloc (4);
999   GST_BUFFER_TIMESTAMP (buffer) = 2000 * GST_MSECOND;
1000   gst_buffer_list_add (buffer_list, buffer);
1001   gst_pad_push_list (mysrcpad, buffer_list);
1002
1003   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1004   fail_unless_equals_uint64 (time, 2000 * GST_MSECOND);
1005   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1006   fail_unless_equals_int (buffers, 3);
1007
1008   /* third push should set the level to 4 seconds, the 1 second diff with the
1009    * previous buffer (without duration) and the 1 second duration of this
1010    * buffer. */
1011   buffer_list = gst_buffer_list_new ();
1012   buffer = gst_buffer_new_and_alloc (4);
1013   GST_BUFFER_TIMESTAMP (buffer) = 3000 * GST_MSECOND;
1014   GST_BUFFER_DURATION (buffer) = 500 * GST_MSECOND;
1015   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1016   gst_buffer_list_add (buffer_list, buffer);
1017   buffer = gst_buffer_new_and_alloc (4);
1018   GST_BUFFER_TIMESTAMP (buffer) = 3500 * GST_MSECOND;
1019   GST_BUFFER_DURATION (buffer) = 500 * GST_MSECOND;
1020   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1021   gst_buffer_list_add (buffer_list, buffer);
1022   gst_pad_push_list (mysrcpad, buffer_list);
1023
1024   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1025   fail_unless_equals_uint64 (time, 4000 * GST_MSECOND);
1026   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1027   fail_unless_equals_int (buffers, 5);
1028
1029   /* fourth push should set the level to 6 seconds, the 2 second diff with the
1030    * previous buffer, same duration. */
1031   buffer_list = gst_buffer_list_new ();
1032   buffer = gst_buffer_new_and_alloc (4);
1033   GST_BUFFER_TIMESTAMP (buffer) = 5000 * GST_MSECOND;
1034   GST_BUFFER_DURATION (buffer) = 1000 * GST_MSECOND;
1035   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1036   gst_buffer_list_add (buffer_list, buffer);
1037   gst_pad_push_list (mysrcpad, buffer_list);
1038
1039   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1040   fail_unless_equals_uint64 (time, 6000 * GST_MSECOND);
1041   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1042   fail_unless_equals_int (buffers, 6);
1043
1044   /* fifth push should not adjust the level, the timestamp and duration are the
1045    * same, meaning the previous buffer did not really have a duration. */
1046   buffer_list = gst_buffer_list_new ();
1047   buffer = gst_buffer_new_and_alloc (4);
1048   GST_BUFFER_TIMESTAMP (buffer) = 5000 * GST_MSECOND;
1049   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1050   gst_buffer_list_add (buffer_list, buffer);
1051   buffer = gst_buffer_new_and_alloc (4);
1052   GST_BUFFER_TIMESTAMP (buffer) = 5250 * GST_MSECOND;
1053   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1054   gst_buffer_list_add (buffer_list, buffer);
1055   buffer = gst_buffer_new_and_alloc (4);
1056   GST_BUFFER_TIMESTAMP (buffer) = 5500 * GST_MSECOND;
1057   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1058   gst_buffer_list_add (buffer_list, buffer);
1059   buffer = gst_buffer_new_and_alloc (4);
1060   GST_BUFFER_TIMESTAMP (buffer) = 5750 * GST_MSECOND;
1061   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1062   gst_buffer_list_add (buffer_list, buffer);
1063   gst_pad_push_list (mysrcpad, buffer_list);
1064
1065   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1066   fail_unless_equals_uint64 (time, 6000 * GST_MSECOND);
1067   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1068   fail_unless_equals_int (buffers, 10);
1069
1070   /* sixth push should adjust the level with 1 second, we now know the
1071    * previous buffer actually had a duration of 2 SECONDS */
1072   buffer = gst_buffer_new_and_alloc (4);
1073   GST_BUFFER_TIMESTAMP (buffer) = 7000 * GST_MSECOND;
1074   gst_pad_push (mysrcpad, buffer);
1075
1076   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1077   fail_unless_equals_uint64 (time, 7000 * GST_MSECOND);
1078   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1079   fail_unless_equals_int (buffers, 11);
1080
1081   /* eighth push should cause overrun */
1082   fail_unless (overrun_count == 0);
1083   buffer_list = gst_buffer_list_new ();
1084   buffer = gst_buffer_new_and_alloc (4);
1085   GST_BUFFER_TIMESTAMP (buffer) = 8000 * GST_MSECOND;
1086   /* the next call to gst_pad_push will emit the overrun signal. The signal
1087    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
1088    * activates and links mysinkpad. The queue task then dequeues a buffer and
1089    * gst_pad_push() will return. */
1090   gst_buffer_list_add (buffer_list, buffer);
1091   gst_pad_push_list (mysrcpad, buffer_list);
1092
1093   fail_unless (overrun_count == 1);
1094
1095   GST_DEBUG ("stopping");
1096   fail_unless (gst_element_set_state (queue,
1097           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
1098 }
1099
1100 GST_END_TEST;
1101
1102 GST_START_TEST (test_initial_events_nodelay)
1103 {
1104   GstSegment segment;
1105   GstEvent *event;
1106   GstCaps *caps;
1107   gboolean ret;
1108
1109   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
1110   gst_pad_set_event_function (mysinkpad, event_func);
1111   gst_pad_set_active (mysinkpad, TRUE);
1112
1113   GST_DEBUG ("starting");
1114
1115   fail_unless (gst_element_set_state (queue,
1116           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
1117       "could not set to playing");
1118
1119   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
1120
1121   caps = gst_caps_new_empty_simple ("foo/x-bar");
1122   ret = gst_pad_push_event (mysrcpad, gst_event_new_caps (caps));
1123   gst_caps_unref (caps);
1124   fail_unless (ret == TRUE);
1125
1126   gst_segment_init (&segment, GST_FORMAT_TIME);
1127   ret = gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
1128   fail_unless (ret == TRUE);
1129
1130   g_mutex_lock (&events_lock);
1131   while (events_count < 3) {
1132     g_cond_wait (&events_cond, &events_lock);
1133   }
1134   g_mutex_unlock (&events_lock);
1135
1136   fail_unless_equals_int (g_list_length (events), 3);
1137   event = g_list_nth_data (events, 0);
1138   fail_unless_equals_int (GST_EVENT_TYPE (event), GST_EVENT_STREAM_START);
1139   event = g_list_nth_data (events, 1);
1140   fail_unless_equals_int (GST_EVENT_TYPE (event), GST_EVENT_CAPS);
1141   event = g_list_nth_data (events, 2);
1142   fail_unless_equals_int (GST_EVENT_TYPE (event), GST_EVENT_SEGMENT);
1143
1144   gst_element_set_state (queue, GST_STATE_NULL);
1145 }
1146
1147 GST_END_TEST;
1148
1149 static Suite *
1150 queue_suite (void)
1151 {
1152   Suite *s = suite_create ("queue");
1153   TCase *tc_chain = tcase_create ("general");
1154
1155   suite_add_tcase (s, tc_chain);
1156   tcase_add_checked_fixture (tc_chain, setup, cleanup);
1157   tcase_add_test (tc_chain, test_non_leaky_underrun);
1158   tcase_add_test (tc_chain, test_non_leaky_overrun);
1159   tcase_add_test (tc_chain, test_leaky_upstream);
1160   tcase_add_test (tc_chain, test_leaky_downstream);
1161   tcase_add_test (tc_chain, test_time_level);
1162   tcase_add_test (tc_chain, test_time_level_task_not_started);
1163   tcase_add_test (tc_chain, test_queries_while_flushing);
1164   tcase_add_test (tc_chain, test_state_change_when_flushing);
1165 #if 0
1166   tcase_add_test (tc_chain, test_newsegment);
1167 #endif
1168   tcase_add_test (tc_chain, test_sticky_not_linked);
1169   tcase_add_test (tc_chain, test_time_level_buffer_list);
1170   tcase_add_test (tc_chain, test_initial_events_nodelay);
1171
1172   return s;
1173 }
1174
1175 GST_CHECK_MAIN (queue);