1e9e9ca676aea5f37218ff986ba8d96abe56410a
[platform/upstream/gstreamer.git] / tests / check / elements / queue.c
1 /* GStreamer
2  *
3  * unit test for queue
4  *
5  * Copyright (C) <2006> Stefan Kost <ensonic@users.sf.net>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #include <unistd.h>
24
25 #include <gst/check/gstcheck.h>
26
27 #define UNDERRUN_LOCK() (g_mutex_lock (&underrun_mutex))
28 #define UNDERRUN_UNLOCK() (g_mutex_unlock (&underrun_mutex))
29 #define UNDERRUN_SIGNAL() (g_cond_signal (&underrun_cond))
30 #define UNDERRUN_WAIT() (g_cond_wait (&underrun_cond, &underrun_mutex))
31
32 static GstElement *queue;
33
34 /* For ease of programming we use globals to keep refs for our floating
35  * src and sink pads we create; otherwise we always have to do get_pad,
36  * get_peer, and then remove references in every test function */
37 static GstPad *mysrcpad;
38 static GstPad *mysinkpad;
39 static GstPad *qsrcpad;
40 static gulong probe_id;
41
42 static gint overrun_count;
43
44 static GMutex underrun_mutex;
45 static GCond underrun_cond;
46 static gint underrun_count;
47
48 static GList *events;
49
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
51     GST_PAD_SINK,
52     GST_PAD_ALWAYS,
53     GST_STATIC_CAPS_ANY);
54 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
55     GST_PAD_SRC,
56     GST_PAD_ALWAYS,
57     GST_STATIC_CAPS_ANY);
58
59 static void
60 queue_overrun (GstElement * queue, gpointer user_data)
61 {
62   overrun_count++;
63   GST_DEBUG ("queue overrun %d", overrun_count);
64 }
65
66 static void
67 queue_underrun (GstElement * queue, gpointer user_data)
68 {
69   UNDERRUN_LOCK ();
70   underrun_count++;
71   GST_DEBUG ("queue underrun %d", underrun_count);
72   UNDERRUN_SIGNAL ();
73   UNDERRUN_UNLOCK ();
74 }
75
76 static gboolean
77 event_func (GstPad * pad, GstObject * parent, GstEvent * event)
78 {
79   GST_DEBUG ("%s event", gst_event_type_get_name (GST_EVENT_TYPE (event)));
80   events = g_list_append (events, event);
81
82   return TRUE;
83 }
84
85 static void
86 drop_events (void)
87 {
88   while (events != NULL) {
89     gst_event_unref (GST_EVENT (events->data));
90     events = g_list_delete_link (events, events);
91   }
92 }
93
94 static void
95 block_src (void)
96 {
97   qsrcpad = gst_element_get_static_pad (queue, "src");
98   probe_id = gst_pad_add_probe (qsrcpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
99       NULL, NULL, NULL);
100 }
101
102 static void
103 unblock_src (void)
104 {
105   gst_pad_remove_probe (qsrcpad, probe_id);
106   gst_object_unref (qsrcpad);
107 }
108
109 static void
110 setup (void)
111 {
112   GST_DEBUG ("setup_queue");
113
114   queue = gst_check_setup_element ("queue");
115   g_signal_connect (queue, "underrun", G_CALLBACK (queue_underrun), NULL);
116
117   mysrcpad = gst_check_setup_src_pad (queue, &srctemplate);
118   gst_pad_set_active (mysrcpad, TRUE);
119
120   mysinkpad = NULL;
121
122   overrun_count = 0;
123
124   underrun_count = 0;
125
126   events = NULL;
127 }
128
129 static void
130 cleanup (void)
131 {
132   GST_DEBUG ("cleanup_queue");
133
134   gst_check_drop_buffers ();
135
136   drop_events ();
137
138   if (mysinkpad != NULL) {
139     gst_pad_set_active (mysinkpad, FALSE);
140     gst_check_teardown_sink_pad (queue);
141   }
142
143   gst_pad_set_active (mysrcpad, FALSE);
144   gst_check_teardown_src_pad (queue);
145
146   gst_check_teardown_element (queue);
147   queue = NULL;
148 }
149
150 /* setup the sinkpad on a playing queue element. gst_check_setup_sink_pad()
151  * does not work in this case since it does not activate the pad before linking
152  * it. */
153 static GstPad *
154 setup_sink_pad (GstElement * element, GstStaticPadTemplate * tmpl)
155 {
156   GstPad *srcpad;
157   GstPad *sinkpad;
158
159   sinkpad = gst_pad_new_from_static_template (tmpl, "sink");
160   fail_if (sinkpad == NULL);
161   srcpad = gst_element_get_static_pad (element, "src");
162   fail_if (srcpad == NULL);
163   gst_pad_set_chain_function (sinkpad, gst_check_chain_func);
164   gst_pad_set_event_function (sinkpad, event_func);
165   gst_pad_set_active (sinkpad, TRUE);
166   fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK);
167   gst_object_unref (srcpad);
168
169   return sinkpad;
170 }
171
172 /* set queue size to 2 buffers
173  * pull 1 buffer
174  * check over/underuns
175  */
176 GST_START_TEST (test_non_leaky_underrun)
177 {
178   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
179   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, NULL);
180   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
181   gst_pad_set_active (mysinkpad, TRUE);
182
183   GST_DEBUG ("starting");
184
185   UNDERRUN_LOCK ();
186   fail_unless (gst_element_set_state (queue,
187           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
188       "could not set to playing");
189   UNDERRUN_WAIT ();
190   UNDERRUN_UNLOCK ();
191
192   fail_unless (overrun_count == 0);
193   fail_unless (underrun_count == 1);
194
195   GST_DEBUG ("stopping");
196   fail_unless (gst_element_set_state (queue,
197           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
198 }
199
200 GST_END_TEST;
201
202 static void
203 queue_overrun_link_and_activate (GstElement * queue, gpointer user_data)
204 {
205   GST_DEBUG ("queue overrun");
206   overrun_count++;
207
208   /* link the src pad of the queue to make it dequeue buffers */
209   mysinkpad = setup_sink_pad (queue, &sinktemplate);
210
211   unblock_src ();
212 }
213
214 /* set queue size to 2 buffers
215  * push 2 buffers
216  * check over/underuns
217  * push 1 more buffer
218  * check over/underuns again
219  */
220 GST_START_TEST (test_non_leaky_overrun)
221 {
222   GstBuffer *buffer1;
223   GstBuffer *buffer2;
224   GstBuffer *buffer3;
225   GstBuffer *buffer;
226   GstSegment segment;
227
228   g_signal_connect (queue, "overrun",
229       G_CALLBACK (queue_overrun_link_and_activate), NULL);
230   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, NULL);
231
232   block_src ();
233
234   GST_DEBUG ("starting");
235
236   UNDERRUN_LOCK ();
237   fail_unless (gst_element_set_state (queue,
238           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
239       "could not set to playing");
240   UNDERRUN_WAIT ();
241   UNDERRUN_UNLOCK ();
242
243   gst_segment_init (&segment, GST_FORMAT_BYTES);
244   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
245   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
246
247   fail_unless (underrun_count == 1);
248   fail_unless (overrun_count == 0);
249
250   buffer1 = gst_buffer_new_and_alloc (4);
251   /* pushing gives away my reference */
252   gst_pad_push (mysrcpad, buffer1);
253
254   GST_DEBUG ("added 1st");
255   fail_unless (overrun_count == 0);
256   fail_unless (underrun_count == 1);
257
258   buffer2 = gst_buffer_new_and_alloc (4);
259   gst_pad_push (mysrcpad, buffer2);
260
261   GST_DEBUG ("added 2nd");
262   fail_unless (overrun_count == 0);
263   fail_unless (underrun_count == 1);
264
265   buffer3 = gst_buffer_new_and_alloc (4);
266   /* the next call to gst_pad_push will emit the overrun signal. The signal
267    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
268    * activates and links mysinkpad. The queue task then dequeues a buffer and
269    * gst_pad_push() will return. */
270   gst_pad_push (mysrcpad, buffer3);
271
272   GST_DEBUG ("added 3rd");
273   fail_unless (overrun_count == 1);
274
275   /* lock the check_mutex to block the first buffer pushed to mysinkpad */
276   g_mutex_lock (&check_mutex);
277   /* now let the queue push all buffers */
278   while (g_list_length (buffers) < 3) {
279     g_cond_wait (&check_cond, &check_mutex);
280   }
281   g_mutex_unlock (&check_mutex);
282
283   fail_unless (overrun_count == 1);
284   /* make sure we get the underrun signal before we check underrun_count */
285   UNDERRUN_LOCK ();
286   while (underrun_count < 2) {
287     UNDERRUN_WAIT ();
288   }
289   /* we can't check the underrun_count here safely because when adding the 3rd
290    * buffer, the queue lock is released to emit the overrun signal and the
291    * downstream part can then push and empty the queue and signal an additional
292    * underrun */
293   /* fail_unless_equals_int (underrun_count, 2); */
294   UNDERRUN_UNLOCK ();
295
296   buffer = g_list_nth (buffers, 0)->data;
297   fail_unless (buffer == buffer1);
298
299   buffer = g_list_nth (buffers, 1)->data;
300   fail_unless (buffer == buffer2);
301
302   GST_DEBUG ("stopping");
303   fail_unless (gst_element_set_state (queue,
304           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
305 }
306
307 GST_END_TEST;
308
309 /* set queue size to 2 buffers
310  * push 2 buffers
311  * check over/underuns
312  * push 1 more buffer
313  * check over/underuns again
314  * check which buffer was leaked
315  */
316 GST_START_TEST (test_leaky_upstream)
317 {
318   GstBuffer *buffer1;
319   GstBuffer *buffer2;
320   GstBuffer *buffer3;
321   GstBuffer *buffer;
322   GstSegment segment;
323
324   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
325   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, "leaky", 1, NULL);
326
327   GST_DEBUG ("starting");
328
329   block_src ();
330
331   UNDERRUN_LOCK ();
332   fail_unless (gst_element_set_state (queue,
333           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
334       "could not set to playing");
335   UNDERRUN_WAIT ();
336   UNDERRUN_UNLOCK ();
337
338   gst_segment_init (&segment, GST_FORMAT_BYTES);
339   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
340   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
341
342   fail_unless (overrun_count == 0);
343   fail_unless (underrun_count == 1);
344
345   buffer1 = gst_buffer_new_and_alloc (4);
346   /* pushing gives away my reference */
347   gst_pad_push (mysrcpad, buffer1);
348
349   GST_DEBUG ("added 1st");
350   fail_unless (overrun_count == 0);
351   fail_unless (underrun_count == 1);
352
353   buffer2 = gst_buffer_new_and_alloc (4);
354   gst_pad_push (mysrcpad, buffer2);
355
356   GST_DEBUG ("added 2nd");
357   fail_unless (overrun_count == 0);
358   fail_unless (underrun_count == 1);
359
360   buffer3 = gst_buffer_new_and_alloc (4);
361   /* buffer4 will be leaked, keep a ref so refcount can be checked below */
362   gst_buffer_ref (buffer3);
363   gst_pad_push (mysrcpad, buffer3);
364
365   GST_DEBUG ("added 3nd");
366   /* it still triggers overrun when leaking */
367   fail_unless (overrun_count == 1);
368   fail_unless (underrun_count == 1);
369
370   /* wait for underrun and check that we got buffer1 and buffer2 only */
371   UNDERRUN_LOCK ();
372   mysinkpad = setup_sink_pad (queue, &sinktemplate);
373   unblock_src ();
374   UNDERRUN_WAIT ();
375   UNDERRUN_UNLOCK ();
376
377   fail_unless (overrun_count == 1);
378   fail_unless (underrun_count == 2);
379
380   fail_unless (g_list_length (buffers) == 2);
381
382   buffer = g_list_nth (buffers, 0)->data;
383   fail_unless (buffer == buffer1);
384
385   buffer = g_list_nth (buffers, 1)->data;
386   fail_unless (buffer == buffer2);
387
388   ASSERT_BUFFER_REFCOUNT (buffer3, "buffer", 1);
389   gst_buffer_unref (buffer3);
390
391   GST_DEBUG ("stopping");
392   fail_unless (gst_element_set_state (queue,
393           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
394 }
395
396 GST_END_TEST;
397
398 /* set queue size to 2 buffers
399  * push 2 buffers
400  * check over/underuns
401  * push 1 more buffer
402  * check over/underuns again
403  * check which buffer was leaked
404  */
405 GST_START_TEST (test_leaky_downstream)
406 {
407   GstBuffer *buffer1;
408   GstBuffer *buffer2;
409   GstBuffer *buffer3;
410   GstBuffer *buffer;
411   GstSegment segment;
412
413   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
414   g_object_set (G_OBJECT (queue), "max-size-buffers", 2, "leaky", 2, NULL);
415
416   GST_DEBUG ("starting");
417
418   block_src ();
419
420   UNDERRUN_LOCK ();
421   fail_unless (gst_element_set_state (queue,
422           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
423       "could not set to playing");
424   UNDERRUN_WAIT ();
425   UNDERRUN_UNLOCK ();
426
427   gst_segment_init (&segment, GST_FORMAT_BYTES);
428   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
429   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
430
431   fail_unless (overrun_count == 0);
432   fail_unless (underrun_count == 1);
433
434   buffer1 = gst_buffer_new_and_alloc (4);
435   /* pushing gives away one reference */
436   /* buffer1 will be leaked, keep a ref so refcount can be checked below */
437   gst_buffer_ref (buffer1);
438   gst_pad_push (mysrcpad, buffer1);
439
440   GST_DEBUG ("added 1st");
441   fail_unless (overrun_count == 0);
442   fail_unless (underrun_count == 1);
443
444   buffer2 = gst_buffer_new_and_alloc (4);
445   gst_pad_push (mysrcpad, buffer2);
446
447   GST_DEBUG ("added 2nd");
448   fail_unless (overrun_count == 0);
449   fail_unless (underrun_count == 1);
450
451   buffer3 = gst_buffer_new_and_alloc (4);
452   gst_pad_push (mysrcpad, buffer3);
453
454   GST_DEBUG ("added 3rd");
455   /* it still triggers overrun when leaking */
456   fail_unless (overrun_count == 1);
457   fail_unless (underrun_count == 1);
458
459   /* wait for underrun and check that we got buffer1 and buffer2 only */
460   UNDERRUN_LOCK ();
461   mysinkpad = setup_sink_pad (queue, &sinktemplate);
462   unblock_src ();
463   UNDERRUN_WAIT ();
464   UNDERRUN_UNLOCK ();
465
466   fail_unless (overrun_count == 1);
467   fail_unless (underrun_count == 2);
468
469   fail_unless (g_list_length (buffers) == 2);
470
471   ASSERT_BUFFER_REFCOUNT (buffer1, "buffer", 1);
472   gst_buffer_unref (buffer1);
473
474   buffer = g_list_nth (buffers, 0)->data;
475   fail_unless (buffer == buffer2);
476
477   buffer = g_list_nth (buffers, 1)->data;
478   fail_unless (buffer == buffer3);
479
480   GST_DEBUG ("stopping");
481   fail_unless (gst_element_set_state (queue,
482           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
483 }
484
485 GST_END_TEST;
486
487 /* set queue size to 6 buffers and 7 seconds
488  * push 7 buffers with and without duration
489  * check current-level-time
490  */
491 GST_START_TEST (test_time_level)
492 {
493   GstBuffer *buffer = NULL;
494   GstClockTime time;
495   GstSegment segment;
496
497   g_signal_connect (queue, "overrun",
498       G_CALLBACK (queue_overrun_link_and_activate), NULL);
499   g_object_set (G_OBJECT (queue), "max-size-buffers", 6, NULL);
500   g_object_set (G_OBJECT (queue), "max-size-time", 7 * GST_SECOND, NULL);
501
502   GST_DEBUG ("starting");
503
504   block_src ();
505
506   UNDERRUN_LOCK ();
507   fail_unless (gst_element_set_state (queue,
508           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
509       "could not set to playing");
510   UNDERRUN_WAIT ();
511   UNDERRUN_UNLOCK ();
512
513   gst_segment_init (&segment, GST_FORMAT_BYTES);
514   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
515   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
516
517   /* push buffer without duration */
518   buffer = gst_buffer_new_and_alloc (4);
519   GST_BUFFER_TIMESTAMP (buffer) = GST_SECOND;
520   /* pushing gives away my reference */
521   gst_pad_push (mysrcpad, buffer);
522
523   /* level should be 1 seconds because buffer has no duration and starts at 1
524    * SECOND (sparse stream). */
525   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
526   fail_if (time != GST_SECOND);
527
528   /* second push should set the level to 2 second */
529   buffer = gst_buffer_new_and_alloc (4);
530   GST_BUFFER_TIMESTAMP (buffer) = 2 * GST_SECOND;
531   gst_pad_push (mysrcpad, buffer);
532
533   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
534   fail_if (time != 2 * GST_SECOND);
535
536   /* third push should set the level to 4 seconds, the 1 second diff with the
537    * previous buffer (without duration) and the 1 second duration of this
538    * buffer. */
539   buffer = gst_buffer_new_and_alloc (4);
540   GST_BUFFER_TIMESTAMP (buffer) = 3 * GST_SECOND;
541   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
542   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
543   gst_pad_push (mysrcpad, buffer);
544
545   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
546   fail_if (time != 4 * GST_SECOND);
547
548   /* fourth push should set the level to 6 seconds, the 2 second diff with the
549    * previous buffer, same duration. */
550   buffer = gst_buffer_new_and_alloc (4);
551   GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
552   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
553   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
554   gst_pad_push (mysrcpad, buffer);
555
556   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
557   fail_if (time != 6 * GST_SECOND);
558
559   /* fifth push should not adjust the level, the timestamp and duration are the
560    * same, meaning the previous buffer did not really have a duration. */
561   buffer = gst_buffer_new_and_alloc (4);
562   GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
563   GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
564   gst_pad_push (mysrcpad, buffer);
565
566   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
567   fail_if (time != 6 * GST_SECOND);
568
569   /* sixth push should adjust the level with 1 second, we now know the
570    * previous buffer actually had a duration of 2 SECONDS */
571   buffer = gst_buffer_new_and_alloc (4);
572   GST_BUFFER_TIMESTAMP (buffer) = 7 * GST_SECOND;
573   gst_pad_push (mysrcpad, buffer);
574
575   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
576   fail_if (time != 7 * GST_SECOND);
577
578   /* eighth push should cause overrun */
579   fail_unless (overrun_count == 0);
580   buffer = gst_buffer_new_and_alloc (4);
581   GST_BUFFER_TIMESTAMP (buffer) = 8 * GST_SECOND;
582   /* the next call to gst_pad_push will emit the overrun signal. The signal
583    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
584    * activates and links mysinkpad. The queue task then dequeues a buffer and
585    * gst_pad_push() will return. */
586   gst_pad_push (mysrcpad, buffer);
587
588   fail_unless (overrun_count == 1);
589
590   GST_DEBUG ("stopping");
591   fail_unless (gst_element_set_state (queue,
592           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
593 }
594
595 GST_END_TEST;
596
597 GST_START_TEST (test_time_level_task_not_started)
598 {
599   GstEvent *event;
600   GstClockTime time;
601   GstSegment segment;
602
603   GST_DEBUG ("starting");
604
605   block_src ();
606
607   UNDERRUN_LOCK ();
608   fail_unless (gst_element_set_state (queue,
609           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
610       "could not set to playing");
611   UNDERRUN_WAIT ();
612   UNDERRUN_UNLOCK ();
613
614   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
615
616   gst_segment_init (&segment, GST_FORMAT_TIME);
617   segment.start = 1 * GST_SECOND;
618   segment.stop = 5 * GST_SECOND;
619   segment.time = 0;
620   segment.position = 1 * GST_SECOND;
621
622   event = gst_event_new_segment (&segment);
623   gst_pad_push_event (mysrcpad, event);
624
625   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
626   fail_if (time != 0 * GST_SECOND);
627
628   segment.base = 4 * GST_SECOND;
629   event = gst_event_new_segment (&segment);
630   gst_pad_push_event (mysrcpad, event);
631
632   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
633   GST_DEBUG ("time now %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
634   fail_if (time != 4 * GST_SECOND);
635
636   unblock_src ();
637
638   GST_DEBUG ("stopping");
639   fail_unless (gst_element_set_state (queue,
640           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
641 }
642
643 GST_END_TEST;
644
645 GST_START_TEST (test_sticky_not_linked)
646 {
647   GstEvent *event;
648   GstSegment segment;
649   gboolean ret;
650   GstFlowReturn flow_ret;
651
652   GST_DEBUG ("starting");
653
654   g_object_set (queue, "max-size-buffers", 1, NULL);
655
656   UNDERRUN_LOCK ();
657   fail_unless (gst_element_set_state (queue,
658           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
659       "could not set to playing");
660   UNDERRUN_WAIT ();
661   UNDERRUN_UNLOCK ();
662
663   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
664
665   gst_segment_init (&segment, GST_FORMAT_TIME);
666   segment.start = 1 * GST_SECOND;
667   segment.stop = 5 * GST_SECOND;
668   segment.time = 0;
669   segment.position = 1 * GST_SECOND;
670
671   event = gst_event_new_segment (&segment);
672   ret = gst_pad_push_event (mysrcpad, event);
673   fail_unless (ret == TRUE);
674
675   /* the first few buffers can return OK as they are queued and gst_queue_loop
676    * is woken up, tries to push and sets ->srcresult to NOT_LINKED
677    */
678   flow_ret = GST_FLOW_OK;
679   while (flow_ret != GST_FLOW_NOT_LINKED)
680     flow_ret = gst_pad_push (mysrcpad, gst_buffer_new ());
681
682   /* send a new sticky event so that it will be pushed on the next gst_pad_push
683    */
684   event = gst_event_new_segment (&segment);
685   ret = gst_pad_push_event (mysrcpad, event);
686   fail_unless (ret == TRUE);
687
688   /* make sure that gst_queue_sink_event doesn't return FALSE if the queue is
689    * unlinked, as that would make gst_pad_push return ERROR
690    */
691   flow_ret = gst_pad_push (mysrcpad, gst_buffer_new ());
692   fail_unless_equals_int (flow_ret, GST_FLOW_NOT_LINKED);
693
694   GST_DEBUG ("stopping");
695   fail_unless (gst_element_set_state (queue,
696           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
697 }
698
699 GST_END_TEST;
700
701 #if 0
702 static gboolean
703 event_equals_newsegment (GstEvent * event, gboolean update, gdouble rate,
704     GstFormat format, gint64 start, gint64 stop, gint64 position)
705 {
706   gboolean ns_update;
707   gdouble ns_rate, ns_arate;
708   GstFormat ns_format;
709   gint64 ns_start;
710   gint64 ns_stop;
711   gint64 ns_position;
712
713   if (GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT) {
714     return FALSE;
715   }
716
717   gst_event_parse_new_segment (event, &ns_update, &ns_rate, &ns_arate,
718       &ns_format, &ns_start, &ns_stop, &ns_position);
719
720   GST_DEBUG ("update %d, rate %lf, format %s, start %" GST_TIME_FORMAT
721       ", stop %" GST_TIME_FORMAT ", position %" GST_TIME_FORMAT, ns_update,
722       ns_rate, gst_format_get_name (ns_format), GST_TIME_ARGS (ns_start),
723       GST_TIME_ARGS (ns_stop), GST_TIME_ARGS (ns_position));
724
725   return (ns_update == update && ns_rate == rate && ns_format == format &&
726       ns_start == start && ns_stop == stop && ns_position == position);
727 }
728
729 GST_START_TEST (test_newsegment)
730 {
731   GstEvent *event;
732   GstBuffer *buffer1;
733   GstBuffer *buffer2;
734   GstBuffer *buffer;
735
736   g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), NULL);
737   g_object_set (G_OBJECT (queue), "max-size-buffers", 1, "max-size-time",
738       (guint64) 0, "leaky", 2, NULL);
739
740   GST_DEBUG ("starting");
741
742   fail_unless (gst_element_set_state (queue,
743           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
744       "could not set to playing");
745   fail_unless (overrun_count == 0);
746   fail_unless (underrun_count == 0);
747
748   event = gst_event_new_new_segment (FALSE, 2.0, 1.0, GST_FORMAT_TIME, 0,
749       2 * GST_SECOND, 0);
750   gst_pad_push_event (mysrcpad, event);
751
752   GST_DEBUG ("added 1st newsegment");
753   fail_unless (overrun_count == 0);
754   fail_unless (underrun_count == 0);
755
756   event = gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME, 0,
757       3 * GST_SECOND, 0);
758   gst_pad_push_event (mysrcpad, event);
759
760   GST_DEBUG ("added 2nd newsegment");
761   fail_unless (overrun_count == 0);
762   fail_unless (underrun_count == 0);
763
764   event = gst_event_new_new_segment (FALSE, 1.0, 1.0, GST_FORMAT_TIME,
765       4 * GST_SECOND, 5 * GST_SECOND, 4 * GST_SECOND);
766   gst_pad_push_event (mysrcpad, event);
767
768   GST_DEBUG ("added 3rd newsegment");
769   fail_unless (overrun_count == 0);
770   fail_unless (underrun_count == 0);
771
772   buffer1 = gst_buffer_new_and_alloc (4);
773   /* buffer1 will be leaked, keep a ref so refcount can be checked below */
774   gst_buffer_ref (buffer1);
775   /* pushing gives away one reference */
776   gst_pad_push (mysrcpad, buffer1);
777
778   GST_DEBUG ("added 1st buffer");
779   fail_unless (overrun_count == 0);
780   fail_unless (underrun_count == 0);
781
782   buffer2 = gst_buffer_new_and_alloc (4);
783   /* next push will cause overrun and leak all newsegment events and buffer1 */
784   gst_pad_push (mysrcpad, buffer2);
785
786   GST_DEBUG ("added 2nd buffer");
787   /* it still triggers overrun when leaking */
788   fail_unless (overrun_count == 1);
789   fail_unless (underrun_count == 0);
790
791   /* wait for underrun and check that we got one accumulated newsegment event,
792    * one real newsegment event and buffer2 only */
793   UNDERRUN_LOCK ();
794   mysinkpad = setup_sink_pad (queue, &sinktemplate);
795   UNDERRUN_WAIT ();
796   UNDERRUN_UNLOCK ();
797
798   fail_unless (overrun_count == 1);
799   fail_unless (underrun_count == 1);
800
801   fail_unless (g_list_length (events) == 2);
802
803   event = g_list_nth (events, 0)->data;
804   fail_unless (event_equals_newsegment (event, FALSE, 1.0, GST_FORMAT_TIME, 0,
805           4 * GST_SECOND, 0));
806
807   event = g_list_nth (events, 1)->data;
808   fail_unless (event_equals_newsegment (event, FALSE, 1.0, GST_FORMAT_TIME,
809           4 * GST_SECOND, 5 * GST_SECOND, 4 * GST_SECOND));
810
811   fail_unless (g_list_length (buffers) == 1);
812
813   ASSERT_BUFFER_REFCOUNT (buffer1, "buffer", 1);
814   gst_buffer_unref (buffer1);
815
816   buffer = g_list_nth (buffers, 0)->data;
817   fail_unless (buffer == buffer2);
818
819   GST_DEBUG ("stopping");
820   fail_unless (gst_element_set_state (queue,
821           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
822 }
823
824 GST_END_TEST;
825 #endif
826
827 static gpointer
828 thread_func (gpointer data)
829 {
830   int i = 0;
831   for (i = 0; i < 100; i++) {
832     GstCaps *caps;
833     GstQuery *query;
834     gboolean ok;
835     caps = gst_caps_new_any ();
836     query = gst_query_new_allocation (caps, FALSE);
837     ok = gst_pad_peer_query (mysrcpad, query);
838     gst_query_unref (query);
839     gst_caps_unref (caps);
840     query = NULL;
841     caps = NULL;
842
843     if (!ok)
844       break;
845   }
846
847   return NULL;
848 }
849
850 static gboolean query_func (GstPad * pad, GstObject * parent, GstQuery * query);
851
852 static gboolean
853 query_func (GstPad * pad, GstObject * parent, GstQuery * query)
854 {
855
856   g_usleep (1000);
857   return TRUE;
858 }
859
860 GST_START_TEST (test_queries_while_flushing)
861 {
862   GstEvent *event;
863   GThread *thread;
864   int i;
865
866   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
867   gst_pad_set_query_function (mysinkpad, query_func);
868   gst_pad_set_active (mysinkpad, TRUE);
869
870   /* hard to reproduce, so just run it a few times in a row */
871   for (i = 0; i < 500; ++i) {
872     GST_DEBUG ("starting");
873     UNDERRUN_LOCK ();
874     fail_unless (gst_element_set_state (queue,
875             GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
876         "could not set to playing");
877     UNDERRUN_WAIT ();
878     UNDERRUN_UNLOCK ();
879
880     thread = g_thread_new ("deactivating thread", thread_func, NULL);
881     g_usleep (1000);
882
883     event = gst_event_new_flush_start ();
884     gst_pad_push_event (mysrcpad, event);
885
886     g_thread_join (thread);
887
888     GST_DEBUG ("stopping");
889     fail_unless (gst_element_set_state (queue,
890             GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS,
891         "could not set to null");
892   }
893 }
894
895 GST_END_TEST;
896
897 static gpointer
898 push_event_thread_func (gpointer data)
899 {
900   GstEvent *event;
901
902   event = GST_EVENT (data);
903
904   GST_DEBUG ("pushing event %p on pad %p", event, mysrcpad);
905   gst_pad_push_event (mysrcpad, event);
906
907   return NULL;
908 }
909
910 GST_START_TEST (test_state_change_when_flushing)
911 {
912   GstEvent *event;
913   GThread *thread;
914
915   mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate);
916   gst_pad_set_active (mysinkpad, TRUE);
917
918   fail_unless (gst_element_set_state (queue, GST_STATE_PAUSED) ==
919       GST_STATE_CHANGE_SUCCESS);
920
921   event = gst_event_new_flush_start ();
922   gst_pad_push_event (mysrcpad, event);
923
924   event = gst_event_new_flush_stop (TRUE);
925   thread = g_thread_new ("send event", push_event_thread_func, event);
926
927   GST_DEBUG ("changing state to READY");
928   fail_unless (gst_element_set_state (queue, GST_STATE_READY) ==
929       GST_STATE_CHANGE_SUCCESS);
930   GST_DEBUG ("state changed");
931
932   g_thread_join (thread);
933
934   fail_unless (gst_element_set_state (queue, GST_STATE_NULL) ==
935       GST_STATE_CHANGE_SUCCESS);
936 }
937
938 GST_END_TEST;
939
940 GST_START_TEST (test_time_level_buffer_list)
941 {
942   GstBuffer *buffer = NULL;
943   GstBufferList *buffer_list = NULL;
944   GstClockTime time;
945   guint buffers;
946   GstSegment segment;
947
948   g_signal_connect (queue, "overrun",
949       G_CALLBACK (queue_overrun_link_and_activate), NULL);
950   g_object_set (G_OBJECT (queue), "max-size-buffers", 11, NULL);
951   g_object_set (G_OBJECT (queue), "max-size-time",
952       G_GUINT64_CONSTANT (7000) * GST_MSECOND, NULL);
953
954   GST_DEBUG ("starting");
955
956   block_src ();
957
958   UNDERRUN_LOCK ();
959   fail_unless (gst_element_set_state (queue,
960           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
961       "could not set to playing");
962   UNDERRUN_WAIT ();
963   UNDERRUN_UNLOCK ();
964
965   gst_segment_init (&segment, GST_FORMAT_BYTES);
966   gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
967   gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment));
968
969   /* push buffer without duration */
970   buffer = gst_buffer_new_and_alloc (4);
971   GST_BUFFER_TIMESTAMP (buffer) = 1000 * GST_MSECOND;
972   /* pushing gives away my reference */
973   gst_pad_push (mysrcpad, buffer);
974
975   /* level should be 1 seconds because buffer has no duration and starts at 1
976    * SECOND (sparse stream). */
977   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
978   fail_unless_equals_uint64 (time, 1000 * GST_MSECOND);
979   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
980   fail_unless_equals_int (buffers, 1);
981
982   /* second push should set the level to 2 second */
983   buffer_list = gst_buffer_list_new ();
984   buffer = gst_buffer_new_and_alloc (4);
985   GST_BUFFER_TIMESTAMP (buffer) = 1500 * GST_MSECOND;
986   gst_buffer_list_add (buffer_list, buffer);
987   buffer = gst_buffer_new_and_alloc (4);
988   GST_BUFFER_TIMESTAMP (buffer) = 2000 * GST_MSECOND;
989   gst_buffer_list_add (buffer_list, buffer);
990   gst_pad_push_list (mysrcpad, buffer_list);
991
992   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
993   fail_unless_equals_uint64 (time, 2000 * GST_MSECOND);
994   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
995   fail_unless_equals_int (buffers, 3);
996
997   /* third push should set the level to 4 seconds, the 1 second diff with the
998    * previous buffer (without duration) and the 1 second duration of this
999    * buffer. */
1000   buffer_list = gst_buffer_list_new ();
1001   buffer = gst_buffer_new_and_alloc (4);
1002   GST_BUFFER_TIMESTAMP (buffer) = 3000 * GST_MSECOND;
1003   GST_BUFFER_DURATION (buffer) = 500 * GST_MSECOND;
1004   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1005   gst_buffer_list_add (buffer_list, buffer);
1006   buffer = gst_buffer_new_and_alloc (4);
1007   GST_BUFFER_TIMESTAMP (buffer) = 3500 * GST_MSECOND;
1008   GST_BUFFER_DURATION (buffer) = 500 * GST_MSECOND;
1009   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1010   gst_buffer_list_add (buffer_list, buffer);
1011   gst_pad_push_list (mysrcpad, buffer_list);
1012
1013   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1014   fail_unless_equals_uint64 (time, 4000 * GST_MSECOND);
1015   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1016   fail_unless_equals_int (buffers, 5);
1017
1018   /* fourth push should set the level to 6 seconds, the 2 second diff with the
1019    * previous buffer, same duration. */
1020   buffer_list = gst_buffer_list_new ();
1021   buffer = gst_buffer_new_and_alloc (4);
1022   GST_BUFFER_TIMESTAMP (buffer) = 5000 * GST_MSECOND;
1023   GST_BUFFER_DURATION (buffer) = 1000 * GST_MSECOND;
1024   ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
1025   gst_buffer_list_add (buffer_list, buffer);
1026   gst_pad_push_list (mysrcpad, buffer_list);
1027
1028   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1029   fail_unless_equals_uint64 (time, 6000 * GST_MSECOND);
1030   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1031   fail_unless_equals_int (buffers, 6);
1032
1033   /* fifth push should not adjust the level, the timestamp and duration are the
1034    * same, meaning the previous buffer did not really have a duration. */
1035   buffer_list = gst_buffer_list_new ();
1036   buffer = gst_buffer_new_and_alloc (4);
1037   GST_BUFFER_TIMESTAMP (buffer) = 5000 * GST_MSECOND;
1038   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1039   gst_buffer_list_add (buffer_list, buffer);
1040   buffer = gst_buffer_new_and_alloc (4);
1041   GST_BUFFER_TIMESTAMP (buffer) = 5250 * GST_MSECOND;
1042   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1043   gst_buffer_list_add (buffer_list, buffer);
1044   buffer = gst_buffer_new_and_alloc (4);
1045   GST_BUFFER_TIMESTAMP (buffer) = 5500 * GST_MSECOND;
1046   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1047   gst_buffer_list_add (buffer_list, buffer);
1048   buffer = gst_buffer_new_and_alloc (4);
1049   GST_BUFFER_TIMESTAMP (buffer) = 5750 * GST_MSECOND;
1050   GST_BUFFER_DURATION (buffer) = 250 * GST_MSECOND;
1051   gst_buffer_list_add (buffer_list, buffer);
1052   gst_pad_push_list (mysrcpad, buffer_list);
1053
1054   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1055   fail_unless_equals_uint64 (time, 6000 * GST_MSECOND);
1056   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1057   fail_unless_equals_int (buffers, 10);
1058
1059   /* sixth push should adjust the level with 1 second, we now know the
1060    * previous buffer actually had a duration of 2 SECONDS */
1061   buffer = gst_buffer_new_and_alloc (4);
1062   GST_BUFFER_TIMESTAMP (buffer) = 7000 * GST_MSECOND;
1063   gst_pad_push (mysrcpad, buffer);
1064
1065   g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
1066   fail_unless_equals_uint64 (time, 7000 * GST_MSECOND);
1067   g_object_get (G_OBJECT (queue), "current-level-buffers", &buffers, NULL);
1068   fail_unless_equals_int (buffers, 11);
1069
1070   /* eighth push should cause overrun */
1071   fail_unless (overrun_count == 0);
1072   buffer_list = gst_buffer_list_new ();
1073   buffer = gst_buffer_new_and_alloc (4);
1074   GST_BUFFER_TIMESTAMP (buffer) = 8000 * GST_MSECOND;
1075   /* the next call to gst_pad_push will emit the overrun signal. The signal
1076    * handler queue_overrun_link_and_activate() (above) increases overrun_count,
1077    * activates and links mysinkpad. The queue task then dequeues a buffer and
1078    * gst_pad_push() will return. */
1079   gst_buffer_list_add (buffer_list, buffer);
1080   gst_pad_push_list (mysrcpad, buffer_list);
1081
1082   fail_unless (overrun_count == 1);
1083
1084   GST_DEBUG ("stopping");
1085   fail_unless (gst_element_set_state (queue,
1086           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
1087 }
1088
1089 GST_END_TEST;
1090
1091 static Suite *
1092 queue_suite (void)
1093 {
1094   Suite *s = suite_create ("queue");
1095   TCase *tc_chain = tcase_create ("general");
1096
1097   suite_add_tcase (s, tc_chain);
1098   tcase_add_checked_fixture (tc_chain, setup, cleanup);
1099   tcase_add_test (tc_chain, test_non_leaky_underrun);
1100   tcase_add_test (tc_chain, test_non_leaky_overrun);
1101   tcase_add_test (tc_chain, test_leaky_upstream);
1102   tcase_add_test (tc_chain, test_leaky_downstream);
1103   tcase_add_test (tc_chain, test_time_level);
1104   tcase_add_test (tc_chain, test_time_level_task_not_started);
1105   tcase_add_test (tc_chain, test_queries_while_flushing);
1106   tcase_add_test (tc_chain, test_state_change_when_flushing);
1107 #if 0
1108   tcase_add_test (tc_chain, test_newsegment);
1109 #endif
1110   tcase_add_test (tc_chain, test_sticky_not_linked);
1111   tcase_add_test (tc_chain, test_time_level_buffer_list);
1112
1113   return s;
1114 }
1115
1116 GST_CHECK_MAIN (queue);