Merging gst-docs
[platform/upstream/gstreamer.git] / subprojects / gstreamer / plugins / elements / gstmultiqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
4  * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
5  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * gstmultiqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 /**
26  * SECTION:element-multiqueue
27  * @title: multiqueue
28  * @see_also: #GstQueue
29  *
30  * Multiqueue is similar to a normal #GstQueue with the following additional
31  * features:
32  *
33  * 1) Multiple streamhandling
34  *
35  *  * The element handles queueing data on more than one stream at once. To
36  * achieve such a feature it has request sink pads (sink%u) and
37  * 'sometimes' src pads (src%u). When requesting a given sinkpad with gst_element_request_pad(),
38  * the associated srcpad for that stream will be created.
39  * Example: requesting sink1 will generate src1.
40  *
41  * 2) Non-starvation on multiple stream
42  *
43  * * If more than one stream is used with the element, the streams' queues
44  * will be dynamically grown (up to a limit), in order to ensure that no
45  * stream is risking data starvation. This guarantees that at any given
46  * time there are at least N bytes queued and available for each individual
47  * stream. If an EOS event comes through a srcpad, the associated queue will be
48  * considered as 'not-empty' in the queue-size-growing algorithm.
49  *
50  * 3) Non-linked srcpads graceful handling
51  *
52  * * In order to better support dynamic switching between streams, the multiqueue
53  * (unlike the current GStreamer queue) continues to push buffers on non-linked
54  * pads rather than shutting down. In addition, to prevent a non-linked stream from very quickly consuming all
55  * available buffers and thus 'racing ahead' of the other streams, the element
56  * must ensure that buffers and inlined events for a non-linked stream are pushed
57  * in the same order as they were received, relative to the other streams
58  * controlled by the element. This means that a buffer cannot be pushed to a
59  * non-linked pad any sooner than buffers in any other stream which were received
60  * before it.
61  *
62  * Data is queued until one of the limits specified by the
63  * #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
64  * #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
65  * more buffers into the queue will block the pushing thread until more space
66  * becomes available. #GstMultiQueue:extra-size-buffers,
67  *
68  *
69  * #GstMultiQueue:extra-size-bytes and #GstMultiQueue:extra-size-time are
70  * currently unused.
71  *
72  * The default queue size limits are 5 buffers, 10MB of data, or
73  * two second worth of data, whichever is reached first. Note that the number
74  * of buffers will dynamically grow depending on the fill level of
75  * other queues.
76  *
77  * The #GstMultiQueue::underrun signal is emitted when all of the queues
78  * are empty. The #GstMultiQueue::overrun signal is emitted when one of the
79  * queues is filled.
80  * Both signals are emitted from the context of the streaming thread.
81  *
82  * When using #GstMultiQueue:sync-by-running-time the unlinked streams will
83  * be throttled by the highest running-time of linked streams. This allows
84  * further relinking of those unlinked streams without them being in the
85  * future (i.e. to achieve gapless playback).
86  * When dealing with streams which have got different consumption requirements
87  * downstream (ex: video decoders which will consume more buffer (in time) than
88  * audio decoders), it is recommended to group streams of the same type
89  * by using the pad "group-id" property. This will further throttle streams
90  * in time within that group.
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #  include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/glib-compat-private.h>
99 #include <stdio.h>
100
101 #include "gstmultiqueue.h"
102 #include "gstcoreelementselements.h"
103
104 /* GstSingleQueue:
105  * @sinkpad: associated sink #GstPad
106  * @srcpad: associated source #GstPad
107  *
108  * Structure containing all information and properties about
109  * a single queue.
110  */
111 typedef struct _GstSingleQueue GstSingleQueue;
112
113 struct _GstSingleQueue
114 {
115   gint refcount;
116
117   /* unique identifier of the queue */
118   guint id;
119   /* group of streams to which this queue belongs to */
120   guint groupid;
121   GstClockTimeDiff group_high_time;
122
123   GWeakRef mqueue;
124   GWeakRef sinkpad;
125   GWeakRef srcpad;
126
127   /* flowreturn of previous srcpad push */
128   GstFlowReturn srcresult;
129   /* If something was actually pushed on
130    * this pad after flushing/pad activation
131    * and the srcresult corresponds to something
132    * real
133    */
134   gboolean pushed;
135
136   /* segments */
137   GstSegment sink_segment;
138   GstSegment src_segment;
139   gboolean has_src_segment;     /* preferred over initializing the src_segment to
140                                  * UNDEFINED as this doesn't requires adding ifs
141                                  * in every segment usage */
142
143   /* position of src/sink */
144   GstClockTimeDiff sinktime, srctime;
145   /* cached input value, used for interleave */
146   GstClockTimeDiff cached_sinktime;
147   /* TRUE if either position needs to be recalculated */
148   gboolean sink_tainted, src_tainted;
149
150   /* queue of data */
151   GstDataQueue *queue;
152   GstDataQueueSize max_size, extra_size;
153   GstClockTime cur_time;
154   gboolean is_eos;
155   gboolean is_segment_done;
156   gboolean is_sparse;
157   gboolean flushing;
158   gboolean active;
159
160   /* Protected by global lock */
161   guint32 nextid;               /* ID of the next object waiting to be pushed */
162   guint32 oldid;                /* ID of the last object pushed (last in a series) */
163   guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
164   GstClockTimeDiff next_time;   /* End running time of next buffer to be pushed */
165   GstClockTimeDiff last_time;   /* Start running time of last pushed buffer */
166   GCond turn;                   /* SingleQueue turn waiting conditional */
167
168   /* for serialized queries */
169   GCond query_handled;
170   gboolean last_query;
171   GstQuery *last_handled_query;
172
173   /* For interleave calculation */
174   GThread *thread;              /* Streaming thread of SingleQueue */
175   GstClockTime interleave;      /* Calculated interleve within the thread */
176 };
177
178 /* Extension of GstDataQueueItem structure for our usage */
179 typedef struct _GstMultiQueueItem GstMultiQueueItem;
180
181 struct _GstMultiQueueItem
182 {
183   GstMiniObject *object;
184   guint size;
185   guint64 duration;
186   gboolean visible;
187
188   GDestroyNotify destroy;
189   guint32 posid;
190
191   gboolean is_query;
192 };
193
194 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
195 static void gst_single_queue_unref (GstSingleQueue * squeue);
196 static GstSingleQueue *gst_single_queue_ref (GstSingleQueue * squeue);
197
198 static void wake_up_next_non_linked (GstMultiQueue * mq);
199 static void compute_high_id (GstMultiQueue * mq);
200 static void compute_high_time (GstMultiQueue * mq, guint groupid);
201 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
202 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
203
204 static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
205 static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
206 static void recheck_buffering_status (GstMultiQueue * mq);
207
208 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
209
210 static void calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq);
211
212 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
213     GST_PAD_SINK,
214     GST_PAD_REQUEST,
215     GST_STATIC_CAPS_ANY);
216
217 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
218     GST_PAD_SRC,
219     GST_PAD_SOMETIMES,
220     GST_STATIC_CAPS_ANY);
221
222 GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
223 #define GST_CAT_DEFAULT (multi_queue_debug)
224
225 /* Signals and args */
226 enum
227 {
228   SIGNAL_UNDERRUN,
229   SIGNAL_OVERRUN,
230   LAST_SIGNAL
231 };
232
233 /* default limits, we try to keep up to 2 seconds of data and if there is not
234  * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
235  * there is data in the queues. Normally, the byte and time limits are not hit
236  * in theses conditions. */
237 #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
238 #define DEFAULT_MAX_SIZE_BUFFERS 5
239 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
240
241 /* second limits. When we hit one of the above limits we are probably dealing
242  * with a badly muxed file and we scale the limits to these emergency values.
243  * This is currently not yet implemented.
244  * Since we dynamically scale the queue buffer size up to the limits but avoid
245  * going above the max-size-buffers when we can, we don't really need this
246  * additional extra size. */
247 #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
248 #define DEFAULT_EXTRA_SIZE_BUFFERS 5
249 #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
250
251 #define DEFAULT_USE_BUFFERING FALSE
252 #define DEFAULT_LOW_WATERMARK  0.01
253 #define DEFAULT_HIGH_WATERMARK 0.99
254 #define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
255 #define DEFAULT_USE_INTERLEAVE FALSE
256 #define DEFAULT_UNLINKED_CACHE_TIME 250 * GST_MSECOND
257
258 #define DEFAULT_MINIMUM_INTERLEAVE (250 * GST_MSECOND)
259
260 enum
261 {
262   PROP_0,
263   PROP_EXTRA_SIZE_BYTES,
264   PROP_EXTRA_SIZE_BUFFERS,
265   PROP_EXTRA_SIZE_TIME,
266   PROP_MAX_SIZE_BYTES,
267   PROP_MAX_SIZE_BUFFERS,
268   PROP_MAX_SIZE_TIME,
269   PROP_USE_BUFFERING,
270   PROP_LOW_PERCENT,
271   PROP_HIGH_PERCENT,
272   PROP_LOW_WATERMARK,
273   PROP_HIGH_WATERMARK,
274   PROP_SYNC_BY_RUNNING_TIME,
275   PROP_USE_INTERLEAVE,
276   PROP_UNLINKED_CACHE_TIME,
277   PROP_MINIMUM_INTERLEAVE,
278   PROP_STATS,
279   PROP_LAST
280 };
281
282 /* Explanation for buffer levels and percentages:
283  *
284  * The buffering_level functions here return a value in a normalized range
285  * that specifies the current fill level of a queue. The range goes from 0 to
286  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
287  *
288  * This is not to be confused with the buffering_percent value, which is
289  * a *relative* quantity - relative to the low/high watermarks.
290  * buffering_percent = 0% means overall buffering_level is at the low watermark.
291  * buffering_percent = 100% means overall buffering_level is at the high watermark.
292  * buffering_percent is used for determining if the fill level has reached
293  * the high watermark, and for producing BUFFERING messages. This value
294  * always uses a 0..100 range (since it is a percentage).
295  *
296  * To avoid future confusions, whenever "buffering level" is mentioned, it
297  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
298  * range. Whenever "buffering_percent" is mentioned, it refers to the
299  * percentage value that is relative to the low/high watermark. */
300
301 /* Using a buffering level range of 0..1000000 to allow for a
302  * resolution in ppm (1 ppm = 0.0001%) */
303 #define MAX_BUFFERING_LEVEL 1000000
304
305 /* How much 1% makes up in the buffer level range */
306 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
307
308 /* GstMultiQueuePad */
309
310 #define DEFAULT_PAD_GROUP_ID 0
311
312 enum
313 {
314   PROP_PAD_0,
315   PROP_PAD_GROUP_ID,
316   PROP_CURRENT_LEVEL_BUFFERS,
317   PROP_CURRENT_LEVEL_BYTES,
318   PROP_CURRENT_LEVEL_TIME,
319 };
320
321 #define GST_TYPE_MULTIQUEUE_PAD            (gst_multiqueue_pad_get_type())
322 #define GST_MULTIQUEUE_PAD(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePad))
323 #define GST_IS_MULTIQUEUE_PAD(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTIQUEUE_PAD))
324 #define GST_MULTIQUEUE_PAD_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
325 #define GST_IS_MULTIQUEUE_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_MULTIQUEUE_PAD))
326 #define GST_MULTIQUEUE_PAD_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
327
328 #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
329   g_mutex_lock (&q->qlock);                                              \
330 } G_STMT_END
331
332 #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
333   g_mutex_unlock (&q->qlock);                                            \
334 } G_STMT_END
335
336 #define SET_PERCENT(mq, perc) G_STMT_START {                             \
337   if (perc != mq->buffering_percent) {                                   \
338     mq->buffering_percent = perc;                                        \
339     mq->buffering_percent_changed = TRUE;                                \
340     GST_DEBUG_OBJECT (mq, "buffering %d percent", perc);                 \
341   }                                                                      \
342 } G_STMT_END
343
344 struct _GstMultiQueuePad
345 {
346   GstPad parent;
347
348   GstSingleQueue *sq;
349 };
350
351 struct _GstMultiQueuePadClass
352 {
353   GstPadClass parent_class;
354 };
355
356 GType gst_multiqueue_pad_get_type (void);
357
358 G_DEFINE_TYPE (GstMultiQueuePad, gst_multiqueue_pad, GST_TYPE_PAD);
359
360 static guint
361 gst_multiqueue_pad_get_group_id (GstMultiQueuePad * pad)
362 {
363   guint ret = 0;
364   GstMultiQueue *mq;
365
366   if (!pad->sq)
367     return 0;
368
369   mq = g_weak_ref_get (&pad->sq->mqueue);
370
371   if (mq) {
372     GST_OBJECT_LOCK (mq);
373   }
374
375   ret = pad->sq->groupid;
376
377   if (mq) {
378     GST_OBJECT_UNLOCK (mq);
379     gst_object_unref (mq);
380   }
381
382   return ret;
383 }
384
385 static guint
386 gst_multiqueue_pad_get_current_level_buffers (GstMultiQueuePad * pad)
387 {
388   GstSingleQueue *sq = pad->sq;
389   GstDataQueueSize level;
390   GstMultiQueue *mq;
391
392   if (!sq)
393     return 0;
394
395   mq = g_weak_ref_get (&pad->sq->mqueue);
396
397   if (mq) {
398     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
399   }
400
401   gst_data_queue_get_level (sq->queue, &level);
402
403   if (mq) {
404     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
405     gst_object_unref (mq);
406   }
407
408   return level.visible;
409 }
410
411 static guint
412 gst_multiqueue_pad_get_current_level_bytes (GstMultiQueuePad * pad)
413 {
414   GstSingleQueue *sq = pad->sq;
415   GstDataQueueSize level;
416   GstMultiQueue *mq;
417
418   if (!sq)
419     return 0;
420
421   mq = g_weak_ref_get (&pad->sq->mqueue);
422
423   if (mq) {
424     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
425   }
426
427   gst_data_queue_get_level (sq->queue, &level);
428
429   if (mq) {
430     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
431     gst_object_unref (mq);
432   }
433
434   return level.bytes;
435 }
436
437 static guint64
438 gst_multiqueue_pad_get_current_level_time (GstMultiQueuePad * pad)
439 {
440   GstSingleQueue *sq = pad->sq;
441   GstMultiQueue *mq;
442   guint64 ret;
443
444   if (!sq)
445     return 0;
446
447   mq = g_weak_ref_get (&pad->sq->mqueue);
448
449   if (mq) {
450     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
451   }
452
453   ret = sq->cur_time;
454
455   if (mq) {
456     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
457     gst_object_unref (mq);
458   }
459
460   return ret;
461 }
462
463 static void
464 gst_multiqueue_pad_get_property (GObject * object, guint prop_id,
465     GValue * value, GParamSpec * pspec)
466 {
467   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
468
469   switch (prop_id) {
470     case PROP_PAD_GROUP_ID:
471       g_value_set_uint (value, gst_multiqueue_pad_get_group_id (pad));
472       break;
473     case PROP_CURRENT_LEVEL_BUFFERS:{
474       g_value_set_uint (value,
475           gst_multiqueue_pad_get_current_level_buffers (pad));
476       break;
477     }
478     case PROP_CURRENT_LEVEL_BYTES:{
479       g_value_set_uint (value,
480           gst_multiqueue_pad_get_current_level_bytes (pad));
481       break;
482     }
483     case PROP_CURRENT_LEVEL_TIME:{
484       g_value_set_uint64 (value,
485           gst_multiqueue_pad_get_current_level_time (pad));
486       break;
487     }
488     default:
489       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
490       break;
491   }
492 }
493
494 static void
495 gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
496     const GValue * value, GParamSpec * pspec)
497 {
498   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
499
500   switch (prop_id) {
501     case PROP_PAD_GROUP_ID:
502       if (pad->sq) {
503         GstMultiQueue *mqueue = g_weak_ref_get (&pad->sq->mqueue);
504
505         if (mqueue)
506           GST_OBJECT_LOCK (mqueue);
507
508         pad->sq->groupid = g_value_get_uint (value);
509
510         if (mqueue) {
511           GST_OBJECT_UNLOCK (mqueue);
512           gst_object_unref (mqueue);
513         }
514       }
515       break;
516     default:
517       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
518       break;
519   }
520 }
521
522 static void
523 gst_multiqueue_pad_finalize (GObject * object)
524 {
525   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
526
527   if (pad->sq)
528     gst_single_queue_unref (pad->sq);
529
530   G_OBJECT_CLASS (gst_multiqueue_pad_parent_class)->finalize (object);
531 }
532
533 static void
534 gst_multiqueue_pad_class_init (GstMultiQueuePadClass * klass)
535 {
536   GObjectClass *gobject_class = (GObjectClass *) klass;
537
538   gobject_class->set_property = gst_multiqueue_pad_set_property;
539   gobject_class->get_property = gst_multiqueue_pad_get_property;
540   gobject_class->finalize = gst_multiqueue_pad_finalize;
541
542   /**
543    * GstMultiQueuePad:group-id:
544    *
545    * Group to which this pad belongs.
546    *
547    * Since: 1.10
548    */
549   g_object_class_install_property (gobject_class, PROP_PAD_GROUP_ID,
550       g_param_spec_uint ("group-id", "Group ID",
551           "Group to which this pad belongs", 0, G_MAXUINT32,
552           DEFAULT_PAD_GROUP_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553
554   /**
555    * GstMultiQueuePad:current-level-buffers:
556    *
557    * The corresponding queue's current level of buffers.
558    *
559    * Since: 1.18
560    */
561   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
562       g_param_spec_uint ("current-level-buffers", "Current level buffers",
563           "Current level buffers", 0, G_MAXUINT32,
564           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
565
566   /**
567    * GstMultiQueuePad:current-level-bytes:
568    *
569    * The corresponding queue's current level of bytes.
570    *
571    * Since: 1.18
572    */
573   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
574       g_param_spec_uint ("current-level-bytes", "Current level bytes",
575           "Current level bytes", 0, G_MAXUINT32,
576           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
577
578   /**
579    * GstMultiQueuePad:current-level-time:
580    *
581    * The corresponding queue's current level of time.
582    *
583    * Since: 1.18
584    */
585   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
586       g_param_spec_uint64 ("current-level-time", "Current level time",
587           "Current level time", 0, G_MAXUINT64,
588           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
589 }
590
591 static void
592 gst_multiqueue_pad_init (GstMultiQueuePad * pad)
593 {
594
595 }
596
597
598 /* Convenience function */
599 static inline GstClockTimeDiff
600 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
601 {
602   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
603
604   if (GST_CLOCK_TIME_IS_VALID (val)) {
605     gboolean sign =
606         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
607     if (sign > 0)
608       res = val;
609     else if (sign < 0)
610       res = -val;
611   }
612   return res;
613 }
614
615 static void gst_multi_queue_finalize (GObject * object);
616 static void gst_multi_queue_set_property (GObject * object,
617     guint prop_id, const GValue * value, GParamSpec * pspec);
618 static void gst_multi_queue_get_property (GObject * object,
619     guint prop_id, GValue * value, GParamSpec * pspec);
620
621 static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
622     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
623 static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
624 static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
625     element, GstStateChange transition);
626
627 static void gst_multi_queue_loop (GstPad * pad);
628
629 #define _do_init \
630   GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
631 #define gst_multi_queue_parent_class parent_class
632 G_DEFINE_TYPE_WITH_CODE (GstMultiQueue, gst_multi_queue, GST_TYPE_ELEMENT,
633     _do_init);
634 GST_ELEMENT_REGISTER_DEFINE (multiqueue, "multiqueue", GST_RANK_NONE,
635     GST_TYPE_MULTI_QUEUE);
636
637 static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
638
639 static void
640 gst_multi_queue_class_init (GstMultiQueueClass * klass)
641 {
642   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
643   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
644
645   gobject_class->set_property = gst_multi_queue_set_property;
646   gobject_class->get_property = gst_multi_queue_get_property;
647
648   /* SIGNALS */
649
650   /**
651    * GstMultiQueue::underrun:
652    * @multiqueue: the multiqueue instance
653    *
654    * This signal is emitted from the streaming thread when there is
655    * no data in any of the queues inside the multiqueue instance (underrun).
656    *
657    * This indicates either starvation or EOS from the upstream data sources.
658    */
659   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
660       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
661       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
662       NULL, G_TYPE_NONE, 0);
663
664   /**
665    * GstMultiQueue::overrun:
666    * @multiqueue: the multiqueue instance
667    *
668    * Reports that one of the queues in the multiqueue is full (overrun).
669    * A queue is full if the total amount of data inside it (num-buffers, time,
670    * size) is higher than the boundary values which can be set through the
671    * GObject properties.
672    *
673    * This can be used as an indicator of pre-roll.
674    */
675   gst_multi_queue_signals[SIGNAL_OVERRUN] =
676       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
677       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
678       NULL, G_TYPE_NONE, 0);
679
680   /* PROPERTIES */
681
682   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
683       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
684           "Max. amount of data in the queue (bytes, 0=disable)",
685           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
686           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
687           G_PARAM_STATIC_STRINGS));
688   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
689       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
690           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
691           DEFAULT_MAX_SIZE_BUFFERS,
692           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
693           G_PARAM_STATIC_STRINGS));
694   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
695       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
696           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
697           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
698           G_PARAM_STATIC_STRINGS));
699
700   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
701       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
702           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
703           " (NOT IMPLEMENTED)",
704           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
705           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
706   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
707       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
708           "Amount of buffers the queues can grow if one of them is empty (0=disable)"
709           " (NOT IMPLEMENTED)",
710           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
711           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
712   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
713       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
714           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
715           " (NOT IMPLEMENTED)",
716           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
717           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
718
719   /**
720    * GstMultiQueue:use-buffering:
721    *
722    * Enable the buffering option in multiqueue so that BUFFERING messages are
723    * emitted based on low-/high-percent thresholds.
724    */
725   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
726       g_param_spec_boolean ("use-buffering", "Use buffering",
727           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds "
728           "(0% = low-watermark, 100% = high-watermark)",
729           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
730           G_PARAM_STATIC_STRINGS));
731   /**
732    * GstMultiQueue:low-percent:
733    *
734    * Low threshold percent for buffering to start.
735    */
736   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
737       g_param_spec_int ("low-percent", "Low percent",
738           "Low threshold for buffering to start. Only used if use-buffering is True "
739           "(Deprecated: use low-watermark instead)",
740           0, 100, DEFAULT_LOW_WATERMARK * 100,
741           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
742   /**
743    * GstMultiQueue:high-percent:
744    *
745    * High threshold percent for buffering to finish.
746    */
747   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
748       g_param_spec_int ("high-percent", "High percent",
749           "High threshold for buffering to finish. Only used if use-buffering is True "
750           "(Deprecated: use high-watermark instead)",
751           0, 100, DEFAULT_HIGH_WATERMARK * 100,
752           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
753   /**
754    * GstMultiQueue:low-watermark:
755    *
756    * Low threshold watermark for buffering to start.
757    *
758    * Since: 1.10
759    */
760   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
761       g_param_spec_double ("low-watermark", "Low watermark",
762           "Low threshold for buffering to start. Only used if use-buffering is True",
763           0.0, 1.0, DEFAULT_LOW_WATERMARK,
764           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
765   /**
766    * GstMultiQueue:high-watermark:
767    *
768    * High threshold watermark for buffering to finish.
769    *
770    * Since: 1.10
771    */
772   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
773       g_param_spec_double ("high-watermark", "High watermark",
774           "High threshold for buffering to finish. Only used if use-buffering is True",
775           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
776           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
777
778   /**
779    * GstMultiQueue:sync-by-running-time:
780    *
781    * If enabled multiqueue will synchronize deactivated or not-linked streams
782    * to the activated and linked streams by taking the running time.
783    * Otherwise multiqueue will synchronize the deactivated or not-linked
784    * streams by keeping the order in which buffers and events arrived compared
785    * to active and linked streams.
786    */
787   g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
788       g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
789           "Synchronize deactivated or not-linked streams by running time",
790           DEFAULT_SYNC_BY_RUNNING_TIME,
791           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
792
793   g_object_class_install_property (gobject_class, PROP_USE_INTERLEAVE,
794       g_param_spec_boolean ("use-interleave", "Use interleave",
795           "Adjust time limits based on input interleave",
796           DEFAULT_USE_INTERLEAVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
797
798   g_object_class_install_property (gobject_class, PROP_UNLINKED_CACHE_TIME,
799       g_param_spec_uint64 ("unlinked-cache-time", "Unlinked cache time (ns)",
800           "Extra buffering in time for unlinked streams (if 'sync-by-running-time')",
801           0, G_MAXUINT64, DEFAULT_UNLINKED_CACHE_TIME,
802           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
803           G_PARAM_STATIC_STRINGS));
804
805   g_object_class_install_property (gobject_class, PROP_MINIMUM_INTERLEAVE,
806       g_param_spec_uint64 ("min-interleave-time", "Minimum interleave time",
807           "Minimum extra buffering for deinterleaving (size of the queues) when use-interleave=true",
808           0, G_MAXUINT64, DEFAULT_MINIMUM_INTERLEAVE,
809           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
810           G_PARAM_STATIC_STRINGS));
811
812   /**
813    * GstMultiQueue:stats:
814    *
815    * Various #GstMultiQueue statistics. This property returns a #GstStructure
816    * with name "application/x-gst-multi-queue-stats" with the following fields:
817    *
818    * - "queues" GST_TYPE_ARRAY    Contains one GstStructure named "queue_%d"
819    *   (where \%d is the queue's ID) per internal queue:
820    *   - "buffers" G_TYPE_UINT    The queue's current level of buffers
821    *   - "bytes" G_TYPE_UINT    The queue's current level of bytes
822    *   - "time" G_TYPE_UINT64    The queue's current level of time
823    *
824    * Since: 1.18
825    */
826   g_object_class_install_property (gobject_class, PROP_STATS,
827       g_param_spec_boxed ("stats", "Stats",
828           "Multiqueue Statistics",
829           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
830
831   gobject_class->finalize = gst_multi_queue_finalize;
832
833   gst_element_class_set_static_metadata (gstelement_class,
834       "MultiQueue",
835       "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
836   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
837       &sinktemplate, GST_TYPE_MULTIQUEUE_PAD);
838   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
839       &srctemplate, GST_TYPE_MULTIQUEUE_PAD);
840
841   gstelement_class->request_new_pad =
842       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
843   gstelement_class->release_pad =
844       GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
845   gstelement_class->change_state =
846       GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
847
848   gst_type_mark_as_plugin_api (GST_TYPE_MULTIQUEUE_PAD, 0);
849 }
850
851 static void
852 gst_multi_queue_init (GstMultiQueue * mqueue)
853 {
854   mqueue->nbqueues = 0;
855   mqueue->queues = NULL;
856
857   mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
858   mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
859   mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
860
861   mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
862   mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
863   mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
864
865   mqueue->use_buffering = DEFAULT_USE_BUFFERING;
866   mqueue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
867   mqueue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
868
869   mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
870   mqueue->use_interleave = DEFAULT_USE_INTERLEAVE;
871   mqueue->min_interleave_time = DEFAULT_MINIMUM_INTERLEAVE;
872   mqueue->unlinked_cache_time = DEFAULT_UNLINKED_CACHE_TIME;
873
874   mqueue->counter = 1;
875   mqueue->highid = -1;
876   mqueue->high_time = GST_CLOCK_STIME_NONE;
877
878   g_mutex_init (&mqueue->qlock);
879   g_mutex_init (&mqueue->buffering_post_lock);
880 }
881
882 static void
883 gst_multi_queue_finalize (GObject * object)
884 {
885   GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
886
887   g_list_free_full (mqueue->queues, (GDestroyNotify) gst_single_queue_unref);
888   mqueue->queues = NULL;
889   mqueue->queues_cookie++;
890
891   /* free/unref instance data */
892   g_mutex_clear (&mqueue->qlock);
893   g_mutex_clear (&mqueue->buffering_post_lock);
894
895   G_OBJECT_CLASS (parent_class)->finalize (object);
896 }
897
898 #define SET_CHILD_PROPERTY(mq,format) G_STMT_START {            \
899     GList * tmp = mq->queues;                                   \
900     while (tmp) {                                               \
901       GstSingleQueue *q = (GstSingleQueue*)tmp->data;           \
902       q->max_size.format = mq->max_size.format;                 \
903       update_buffering (mq, q);                                 \
904       gst_data_queue_limits_changed (q->queue);                 \
905       tmp = g_list_next(tmp);                                   \
906     };                                                          \
907 } G_STMT_END
908
909 static void
910 gst_multi_queue_set_property (GObject * object, guint prop_id,
911     const GValue * value, GParamSpec * pspec)
912 {
913   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
914
915   switch (prop_id) {
916     case PROP_MAX_SIZE_BYTES:
917       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
918       mq->max_size.bytes = g_value_get_uint (value);
919       SET_CHILD_PROPERTY (mq, bytes);
920       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
921       gst_multi_queue_post_buffering (mq);
922       break;
923     case PROP_MAX_SIZE_BUFFERS:
924     {
925       GList *tmp;
926       gint new_size = g_value_get_uint (value);
927
928       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
929
930       mq->max_size.visible = new_size;
931
932       tmp = mq->queues;
933       while (tmp) {
934         GstDataQueueSize size;
935         GstSingleQueue *q = (GstSingleQueue *) tmp->data;
936         gst_data_queue_get_level (q->queue, &size);
937
938         GST_DEBUG_OBJECT (mq, "Queue %d: Requested buffers size: %d,"
939             " current: %d, current max %d", q->id, new_size, size.visible,
940             q->max_size.visible);
941
942         /* do not reduce max size below current level if the single queue
943          * has grown because of empty queue */
944         if (new_size == 0) {
945           q->max_size.visible = new_size;
946         } else if (q->max_size.visible == 0) {
947           q->max_size.visible = MAX (new_size, size.visible);
948         } else if (new_size > size.visible) {
949           q->max_size.visible = new_size;
950         }
951         update_buffering (mq, q);
952         gst_data_queue_limits_changed (q->queue);
953         tmp = g_list_next (tmp);
954       }
955
956       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
957       gst_multi_queue_post_buffering (mq);
958
959       break;
960     }
961     case PROP_MAX_SIZE_TIME:
962       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
963       mq->max_size.time = g_value_get_uint64 (value);
964       SET_CHILD_PROPERTY (mq, time);
965       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
966       gst_multi_queue_post_buffering (mq);
967       break;
968     case PROP_EXTRA_SIZE_BYTES:
969       mq->extra_size.bytes = g_value_get_uint (value);
970       break;
971     case PROP_EXTRA_SIZE_BUFFERS:
972       mq->extra_size.visible = g_value_get_uint (value);
973       break;
974     case PROP_EXTRA_SIZE_TIME:
975       mq->extra_size.time = g_value_get_uint64 (value);
976       break;
977     case PROP_USE_BUFFERING:
978       mq->use_buffering = g_value_get_boolean (value);
979       recheck_buffering_status (mq);
980       break;
981     case PROP_LOW_PERCENT:
982       mq->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
983       /* Recheck buffering status - the new low_watermark value might
984        * be above the current fill level. If the old low_watermark one
985        * was below the current level, this means that mq->buffering is
986        * disabled and needs to be re-enabled. */
987       recheck_buffering_status (mq);
988       break;
989     case PROP_HIGH_PERCENT:
990       mq->high_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
991       recheck_buffering_status (mq);
992       break;
993     case PROP_LOW_WATERMARK:
994       mq->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
995       recheck_buffering_status (mq);
996       break;
997     case PROP_HIGH_WATERMARK:
998       mq->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
999       recheck_buffering_status (mq);
1000       break;
1001     case PROP_SYNC_BY_RUNNING_TIME:
1002       mq->sync_by_running_time = g_value_get_boolean (value);
1003       break;
1004     case PROP_USE_INTERLEAVE:
1005       mq->use_interleave = g_value_get_boolean (value);
1006       break;
1007     case PROP_UNLINKED_CACHE_TIME:
1008       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1009       mq->unlinked_cache_time = g_value_get_uint64 (value);
1010       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1011       gst_multi_queue_post_buffering (mq);
1012       break;
1013     case PROP_MINIMUM_INTERLEAVE:
1014       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1015       mq->min_interleave_time = g_value_get_uint64 (value);
1016       if (mq->use_interleave)
1017         calculate_interleave (mq, NULL);
1018       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1019       break;
1020     default:
1021       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1022       break;
1023   }
1024 }
1025
1026 /* Called with mutex held */
1027 static GstStructure *
1028 gst_multi_queue_get_stats (GstMultiQueue * mq)
1029 {
1030   GstStructure *ret =
1031       gst_structure_new_empty ("application/x-gst-multi-queue-stats");
1032   GList *tmp;
1033   GstSingleQueue *sq;
1034
1035   if (mq->queues != NULL) {
1036     GValue queues = G_VALUE_INIT;
1037     GValue v = G_VALUE_INIT;
1038
1039     g_value_init (&queues, GST_TYPE_ARRAY);
1040
1041     for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1042       GstDataQueueSize level;
1043       GstStructure *s;
1044       gchar *id;
1045       g_value_init (&v, GST_TYPE_STRUCTURE);
1046
1047       sq = (GstSingleQueue *) tmp->data;
1048       gst_data_queue_get_level (sq->queue, &level);
1049       id = g_strdup_printf ("queue_%d", sq->id);
1050       s = gst_structure_new (id,
1051           "buffers", G_TYPE_UINT, level.visible,
1052           "bytes", G_TYPE_UINT, level.bytes,
1053           "time", G_TYPE_UINT64, sq->cur_time, NULL);
1054       g_value_take_boxed (&v, s);
1055       gst_value_array_append_and_take_value (&queues, &v);
1056       g_free (id);
1057     }
1058     gst_structure_take_value (ret, "queues", &queues);
1059   }
1060
1061   return ret;
1062 }
1063
1064 static void
1065 gst_multi_queue_get_property (GObject * object, guint prop_id,
1066     GValue * value, GParamSpec * pspec)
1067 {
1068   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
1069
1070   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1071
1072   switch (prop_id) {
1073     case PROP_EXTRA_SIZE_BYTES:
1074       g_value_set_uint (value, mq->extra_size.bytes);
1075       break;
1076     case PROP_EXTRA_SIZE_BUFFERS:
1077       g_value_set_uint (value, mq->extra_size.visible);
1078       break;
1079     case PROP_EXTRA_SIZE_TIME:
1080       g_value_set_uint64 (value, mq->extra_size.time);
1081       break;
1082     case PROP_MAX_SIZE_BYTES:
1083       g_value_set_uint (value, mq->max_size.bytes);
1084       break;
1085     case PROP_MAX_SIZE_BUFFERS:
1086       g_value_set_uint (value, mq->max_size.visible);
1087       break;
1088     case PROP_MAX_SIZE_TIME:
1089       g_value_set_uint64 (value, mq->max_size.time);
1090       break;
1091     case PROP_USE_BUFFERING:
1092       g_value_set_boolean (value, mq->use_buffering);
1093       break;
1094     case PROP_LOW_PERCENT:
1095       g_value_set_int (value, mq->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
1096       break;
1097     case PROP_HIGH_PERCENT:
1098       g_value_set_int (value, mq->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
1099       break;
1100     case PROP_LOW_WATERMARK:
1101       g_value_set_double (value, mq->low_watermark /
1102           (gdouble) MAX_BUFFERING_LEVEL);
1103       break;
1104     case PROP_HIGH_WATERMARK:
1105       g_value_set_double (value, mq->high_watermark /
1106           (gdouble) MAX_BUFFERING_LEVEL);
1107       break;
1108     case PROP_SYNC_BY_RUNNING_TIME:
1109       g_value_set_boolean (value, mq->sync_by_running_time);
1110       break;
1111     case PROP_USE_INTERLEAVE:
1112       g_value_set_boolean (value, mq->use_interleave);
1113       break;
1114     case PROP_UNLINKED_CACHE_TIME:
1115       g_value_set_uint64 (value, mq->unlinked_cache_time);
1116       break;
1117     case PROP_MINIMUM_INTERLEAVE:
1118       g_value_set_uint64 (value, mq->min_interleave_time);
1119       break;
1120     case PROP_STATS:
1121       g_value_take_boxed (value, gst_multi_queue_get_stats (mq));
1122       break;
1123     default:
1124       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1125       break;
1126   }
1127
1128   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1129 }
1130
1131 static GstIterator *
1132 gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
1133 {
1134   GstIterator *it = NULL;
1135   GstPad *opad, *sinkpad, *srcpad;
1136   GstSingleQueue *squeue;
1137   GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
1138   GValue val = { 0, };
1139
1140   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1141   squeue = GST_MULTIQUEUE_PAD (pad)->sq;
1142   if (!squeue)
1143     goto out;
1144
1145   srcpad = g_weak_ref_get (&squeue->srcpad);
1146   sinkpad = g_weak_ref_get (&squeue->sinkpad);
1147   if (sinkpad == pad && srcpad) {
1148     opad = srcpad;
1149     gst_clear_object (&sinkpad);
1150
1151   } else if (srcpad == pad && sinkpad) {
1152     opad = sinkpad;
1153     gst_clear_object (&srcpad);
1154
1155   } else {
1156     gst_clear_object (&srcpad);
1157     gst_clear_object (&sinkpad);
1158     goto out;
1159   }
1160
1161   g_value_init (&val, GST_TYPE_PAD);
1162   g_value_set_object (&val, opad);
1163   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1164   g_value_unset (&val);
1165
1166   gst_object_unref (opad);
1167
1168 out:
1169   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1170
1171   return it;
1172 }
1173
1174
1175 /*
1176  * GstElement methods
1177  */
1178
1179 static GstPad *
1180 gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
1181     const gchar * name, const GstCaps * caps)
1182 {
1183   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
1184   GstSingleQueue *squeue;
1185   GstPad *new_pad;
1186   guint temp_id = -1;
1187
1188   if (name) {
1189     sscanf (name + 4, "_%u", &temp_id);
1190     GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
1191   }
1192
1193   /* Create a new single queue, add the sink and source pad and return the sink pad */
1194   squeue = gst_single_queue_new (mqueue, temp_id);
1195
1196   new_pad = squeue ? g_weak_ref_get (&squeue->sinkpad) : NULL;
1197   /* request pad assumes the element is owning the ref of the pad it returns */
1198   if (new_pad)
1199     gst_object_unref (new_pad);
1200
1201   GST_DEBUG_OBJECT (mqueue, "Returning pad %" GST_PTR_FORMAT, new_pad);
1202
1203   return new_pad;
1204 }
1205
1206 static void
1207 gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
1208 {
1209   GstPad *sinkpad = NULL, *srcpad = NULL;
1210   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
1211   GstSingleQueue *sq = NULL;
1212   GList *tmp;
1213
1214   GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1215
1216   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1217   /* Find which single queue it belongs to, knowing that it should be a sinkpad */
1218   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
1219     sq = (GstSingleQueue *) tmp->data;
1220     sinkpad = g_weak_ref_get (&sq->sinkpad);
1221
1222     if (sinkpad == pad) {
1223       srcpad = g_weak_ref_get (&sq->srcpad);
1224       break;
1225     }
1226
1227     gst_object_unref (sinkpad);
1228   }
1229
1230   if (!tmp) {
1231     gst_clear_object (&sinkpad);
1232     gst_clear_object (&srcpad);
1233     GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
1234     GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1235     return;
1236   }
1237
1238   /* FIXME: The removal of the singlequeue should probably not happen until it
1239    * finishes draining */
1240
1241   /* remove it from the list */
1242   mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
1243   mqueue->queues_cookie++;
1244
1245   /* FIXME : recompute next-non-linked */
1246   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1247
1248   /* delete SingleQueue */
1249   gst_data_queue_set_flushing (sq->queue, TRUE);
1250
1251   gst_pad_set_active (srcpad, FALSE);
1252   gst_pad_set_active (sinkpad, FALSE);
1253   gst_element_remove_pad (element, srcpad);
1254   gst_element_remove_pad (element, sinkpad);
1255   gst_object_unref (srcpad);
1256   gst_object_unref (sinkpad);
1257 }
1258
1259 static GstStateChangeReturn
1260 gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
1261 {
1262   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
1263   GstSingleQueue *sq = NULL;
1264   GstStateChangeReturn result;
1265
1266   switch (transition) {
1267     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1268       GList *tmp;
1269
1270       /* Set all pads to non-flushing */
1271       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1272       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
1273         sq = (GstSingleQueue *) tmp->data;
1274         sq->flushing = FALSE;
1275       }
1276
1277       /* the visible limit might not have been set on single queues that have grown because of other queueus were empty */
1278       SET_CHILD_PROPERTY (mqueue, visible);
1279
1280       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1281       gst_multi_queue_post_buffering (mqueue);
1282
1283       break;
1284     }
1285     case GST_STATE_CHANGE_PAUSED_TO_READY:{
1286       GList *tmp;
1287
1288       /* Un-wait all waiting pads */
1289       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1290       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
1291         sq = (GstSingleQueue *) tmp->data;
1292         sq->flushing = TRUE;
1293         g_cond_signal (&sq->turn);
1294
1295         sq->last_query = FALSE;
1296         g_cond_signal (&sq->query_handled);
1297       }
1298       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1299       break;
1300     }
1301     default:
1302       break;
1303   }
1304
1305   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1306
1307   switch (transition) {
1308     default:
1309       break;
1310   }
1311
1312   return result;
1313 }
1314
1315 static gboolean
1316 gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq)
1317 {
1318   gboolean res = FALSE;
1319   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
1320
1321   GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
1322
1323   if (srcpad) {
1324     res = gst_pad_start_task (srcpad,
1325         (GstTaskFunction) gst_multi_queue_loop, srcpad, NULL);
1326     gst_object_unref (srcpad);
1327   }
1328
1329   return res;
1330 }
1331
1332 static gboolean
1333 gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
1334 {
1335   gboolean result = FALSE;
1336   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
1337
1338   GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
1339   if (srcpad) {
1340     result = gst_pad_pause_task (srcpad);
1341     gst_object_unref (srcpad);
1342   }
1343
1344   sq->sink_tainted = sq->src_tainted = TRUE;
1345   return result;
1346 }
1347
1348 static gboolean
1349 gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
1350 {
1351   gboolean result = FALSE;
1352   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
1353
1354   GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id);
1355   if (srcpad) {
1356     result = gst_pad_stop_task (srcpad);
1357     gst_object_unref (srcpad);
1358   }
1359   sq->sink_tainted = sq->src_tainted = TRUE;
1360   return result;
1361 }
1362
1363 static void
1364 gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
1365     gboolean full)
1366 {
1367   GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
1368       sq->id);
1369
1370   if (flush) {
1371     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1372     sq->srcresult = GST_FLOW_FLUSHING;
1373     gst_data_queue_set_flushing (sq->queue, TRUE);
1374
1375     sq->flushing = TRUE;
1376
1377     /* wake up non-linked task */
1378     GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
1379         sq->id);
1380     g_cond_signal (&sq->turn);
1381     sq->last_query = FALSE;
1382     g_cond_signal (&sq->query_handled);
1383     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1384   } else {
1385     gst_single_queue_flush_queue (sq, full);
1386
1387     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1388     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1389     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1390     sq->has_src_segment = FALSE;
1391     /* All pads start off OK for a smooth kick-off */
1392     sq->srcresult = GST_FLOW_OK;
1393     sq->pushed = FALSE;
1394     sq->cur_time = 0;
1395     sq->max_size.visible = mq->max_size.visible;
1396     sq->is_eos = FALSE;
1397     sq->is_segment_done = FALSE;
1398     sq->nextid = 0;
1399     sq->oldid = 0;
1400     sq->last_oldid = G_MAXUINT32;
1401     sq->next_time = GST_CLOCK_STIME_NONE;
1402     sq->last_time = GST_CLOCK_STIME_NONE;
1403     sq->cached_sinktime = GST_CLOCK_STIME_NONE;
1404     sq->group_high_time = GST_CLOCK_STIME_NONE;
1405     gst_data_queue_set_flushing (sq->queue, FALSE);
1406
1407     /* We will become active again on the next buffer/gap */
1408     sq->active = FALSE;
1409
1410     /* Reset high time to be recomputed next */
1411     mq->high_time = GST_CLOCK_STIME_NONE;
1412
1413     sq->flushing = FALSE;
1414     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1415   }
1416 }
1417
1418 /* WITH LOCK TAKEN */
1419 static gint
1420 get_buffering_level (GstMultiQueue * mq, GstSingleQueue * sq)
1421 {
1422   GstDataQueueSize size;
1423   gint buffering_level, tmp;
1424
1425   gst_data_queue_get_level (sq->queue, &size);
1426
1427   GST_DEBUG_OBJECT (mq,
1428       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
1429       G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
1430       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1431
1432   /* get bytes and time buffer levels and take the max */
1433   if (sq->is_eos || sq->is_segment_done || sq->srcresult == GST_FLOW_NOT_LINKED
1434       || sq->is_sparse) {
1435     buffering_level = MAX_BUFFERING_LEVEL;
1436   } else {
1437     buffering_level = 0;
1438     if (sq->max_size.time > 0) {
1439       tmp =
1440           gst_util_uint64_scale (sq->cur_time,
1441           MAX_BUFFERING_LEVEL, sq->max_size.time);
1442       buffering_level = MAX (buffering_level, tmp);
1443     }
1444     if (sq->max_size.bytes > 0) {
1445       tmp =
1446           gst_util_uint64_scale_int (size.bytes,
1447           MAX_BUFFERING_LEVEL, sq->max_size.bytes);
1448       buffering_level = MAX (buffering_level, tmp);
1449     }
1450   }
1451
1452   return buffering_level;
1453 }
1454
1455 /* WITH LOCK TAKEN */
1456 static void
1457 update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
1458 {
1459   gint buffering_level, percent;
1460
1461   /* nothing to dowhen we are not in buffering mode */
1462   if (!mq->use_buffering)
1463     return;
1464
1465   buffering_level = get_buffering_level (mq, sq);
1466
1467   /* scale so that if buffering_level equals the high watermark,
1468    * the percentage is 100% */
1469   percent = gst_util_uint64_scale (buffering_level, 100, mq->high_watermark);
1470   /* clip */
1471   if (percent > 100)
1472     percent = 100;
1473
1474   if (mq->buffering) {
1475     if (buffering_level >= mq->high_watermark) {
1476       mq->buffering = FALSE;
1477     }
1478     /* make sure it increases */
1479     percent = MAX (mq->buffering_percent, percent);
1480
1481     SET_PERCENT (mq, percent);
1482   } else {
1483     GList *iter;
1484     gboolean is_buffering = TRUE;
1485
1486     for (iter = mq->queues; iter; iter = g_list_next (iter)) {
1487       GstSingleQueue *oq = (GstSingleQueue *) iter->data;
1488
1489       if (get_buffering_level (mq, oq) >= mq->high_watermark) {
1490         is_buffering = FALSE;
1491
1492         break;
1493       }
1494     }
1495
1496     if (is_buffering && buffering_level < mq->low_watermark) {
1497       mq->buffering = TRUE;
1498       SET_PERCENT (mq, percent);
1499     }
1500   }
1501 }
1502
1503 static void
1504 gst_multi_queue_post_buffering (GstMultiQueue * mq)
1505 {
1506   GstMessage *msg = NULL;
1507
1508   g_mutex_lock (&mq->buffering_post_lock);
1509   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1510   if (mq->buffering_percent_changed) {
1511     gint percent = mq->buffering_percent;
1512
1513     mq->buffering_percent_changed = FALSE;
1514
1515     GST_DEBUG_OBJECT (mq, "Going to post buffering: %d%%", percent);
1516     msg = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
1517   }
1518   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1519
1520   if (msg != NULL)
1521     gst_element_post_message (GST_ELEMENT_CAST (mq), msg);
1522
1523   g_mutex_unlock (&mq->buffering_post_lock);
1524 }
1525
1526 static void
1527 recheck_buffering_status (GstMultiQueue * mq)
1528 {
1529   if (!mq->use_buffering && mq->buffering) {
1530     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1531     mq->buffering = FALSE;
1532     GST_DEBUG_OBJECT (mq,
1533         "Buffering property disabled, but queue was still buffering; "
1534         "setting buffering percentage to 100%%");
1535     SET_PERCENT (mq, 100);
1536     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1537   }
1538
1539   if (mq->use_buffering) {
1540     GList *tmp;
1541     gint old_perc;
1542
1543     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1544
1545     /* force buffering percentage to be recalculated */
1546     old_perc = mq->buffering_percent;
1547     mq->buffering_percent = 0;
1548
1549     tmp = mq->queues;
1550     while (tmp) {
1551       GstSingleQueue *q = (GstSingleQueue *) tmp->data;
1552       update_buffering (mq, q);
1553       gst_data_queue_limits_changed (q->queue);
1554       tmp = g_list_next (tmp);
1555     }
1556
1557     GST_DEBUG_OBJECT (mq,
1558         "Recalculated buffering percentage: old: %d%% new: %d%%",
1559         old_perc, mq->buffering_percent);
1560
1561     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1562   }
1563
1564   gst_multi_queue_post_buffering (mq);
1565 }
1566
1567 static void
1568 calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
1569 {
1570   GstClockTimeDiff low, high;
1571   GstClockTime interleave, other_interleave = 0;
1572   GList *tmp;
1573
1574   low = high = GST_CLOCK_STIME_NONE;
1575   interleave = mq->interleave;
1576   /* Go over all single queues and calculate lowest/highest value */
1577   for (tmp = mq->queues; tmp; tmp = tmp->next) {
1578     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
1579     /* Ignore sparse streams for interleave calculation */
1580     if (oq->is_sparse)
1581       continue;
1582     /* If a stream is not active yet (hasn't received any buffers), set
1583      * a maximum interleave to allow it to receive more data */
1584     if (!oq->active) {
1585       GST_LOG_OBJECT (mq,
1586           "queue %d is not active yet, forcing interleave to 5s", oq->id);
1587       mq->interleave = 5 * GST_SECOND;
1588       /* Update max-size time */
1589       mq->max_size.time = mq->interleave;
1590       SET_CHILD_PROPERTY (mq, time);
1591       goto beach;
1592     }
1593
1594     /* Calculate within each streaming thread */
1595     if (sq && sq->thread != oq->thread) {
1596       if (oq->interleave > other_interleave)
1597         other_interleave = oq->interleave;
1598       continue;
1599     }
1600
1601     if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
1602       if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
1603         low = oq->cached_sinktime;
1604       if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
1605         high = oq->cached_sinktime;
1606     }
1607     GST_LOG_OBJECT (mq,
1608         "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
1609         " high:%" GST_STIME_FORMAT, oq->id,
1610         GST_STIME_ARGS (oq->cached_sinktime), GST_STIME_ARGS (low),
1611         GST_STIME_ARGS (high));
1612   }
1613
1614   if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
1615     interleave = high - low;
1616     /* Padding of interleave and minimum value */
1617     interleave = (150 * interleave / 100) + mq->min_interleave_time;
1618     if (sq)
1619       sq->interleave = interleave;
1620
1621     interleave = MAX (interleave, other_interleave);
1622
1623     /* Update the stored interleave if:
1624      * * No data has arrived yet (high == low)
1625      * * Or it went higher
1626      * * Or it went lower and we've gone past the previous interleave needed */
1627     if (high == low || interleave > mq->interleave ||
1628         ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
1629                         mq->interleave)) < low)
1630             && interleave < (mq->interleave * 3 / 4))) {
1631       /* Update the interleave */
1632       mq->interleave = interleave;
1633       mq->last_interleave_update = high;
1634       /* Update max-size time */
1635       mq->max_size.time = mq->interleave;
1636       SET_CHILD_PROPERTY (mq, time);
1637     }
1638   }
1639
1640 beach:
1641   GST_DEBUG_OBJECT (mq,
1642       "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
1643       GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
1644       " last_interleave_update:%" GST_STIME_FORMAT, GST_STIME_ARGS (low),
1645       GST_STIME_ARGS (high), GST_TIME_ARGS (interleave),
1646       GST_TIME_ARGS (mq->interleave),
1647       GST_STIME_ARGS (mq->last_interleave_update));
1648 }
1649
1650
1651 /* calculate the diff between running time on the sink and src of the queue.
1652  * This is the total amount of time in the queue.
1653  * WITH LOCK TAKEN */
1654 static void
1655 update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
1656 {
1657   GstClockTimeDiff sink_time, src_time;
1658
1659   if (sq->sink_tainted) {
1660     sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment,
1661         sq->sink_segment.position);
1662
1663     GST_DEBUG_OBJECT (mq,
1664         "queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
1665         GST_STIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
1666         GST_STIME_ARGS (sink_time));
1667
1668     if (G_UNLIKELY (sq->last_time == GST_CLOCK_STIME_NONE)) {
1669       /* If the single queue still doesn't have a last_time set, this means
1670        * that nothing has been pushed out yet.
1671        * In order for the high_time computation to be as efficient as possible,
1672        * we set the last_time */
1673       sq->last_time = sink_time;
1674     }
1675     if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) {
1676       /* if we have a time, we become untainted and use the time */
1677       sq->sink_tainted = FALSE;
1678       if (mq->use_interleave) {
1679         sq->cached_sinktime = sink_time;
1680         calculate_interleave (mq, sq);
1681       }
1682     }
1683   } else
1684     sink_time = sq->sinktime;
1685
1686   if (sq->src_tainted) {
1687     GstSegment *segment;
1688     gint64 position;
1689
1690     if (sq->has_src_segment) {
1691       segment = &sq->src_segment;
1692       position = sq->src_segment.position;
1693     } else {
1694       /*
1695        * If the src pad had no segment yet, use the sink segment
1696        * to avoid signalling overrun if the received sink segment has a
1697        * a position > max-size-time while the src pad time would be the default=0
1698        *
1699        * This can happen when switching pads on chained/adaptive streams and the
1700        * new chain has a segment with a much larger position
1701        */
1702       segment = &sq->sink_segment;
1703       position = sq->sink_segment.position;
1704     }
1705
1706     src_time = sq->srctime = my_segment_to_running_time (segment, position);
1707     /* if we have a time, we become untainted and use the time */
1708     if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) {
1709       sq->src_tainted = FALSE;
1710     }
1711   } else
1712     src_time = sq->srctime;
1713
1714   GST_DEBUG_OBJECT (mq,
1715       "queue %d, sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, sq->id,
1716       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
1717
1718   /* This allows for streams with out of order timestamping - sometimes the
1719    * emerging timestamp is later than the arriving one(s) */
1720   if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) &&
1721           GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time))
1722     sq->cur_time = sink_time - src_time;
1723   else
1724     sq->cur_time = 0;
1725
1726   /* updating the time level can change the buffering state */
1727   update_buffering (mq, sq);
1728
1729   return;
1730 }
1731
1732 /* take a SEGMENT event and apply the values to segment, updating the time
1733  * level of queue. */
1734 static void
1735 apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1736     GstSegment * segment)
1737 {
1738   gst_event_copy_segment (event, segment);
1739
1740   /* now configure the values, we use these to track timestamps on the
1741    * sinkpad. */
1742   if (segment->format != GST_FORMAT_TIME) {
1743     /* non-time format, pretent the current time segment is closed with a
1744      * 0 start and unknown stop time. */
1745     segment->format = GST_FORMAT_TIME;
1746     segment->start = 0;
1747     segment->stop = -1;
1748     segment->time = 0;
1749   }
1750   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1751
1752   /* Make sure we have a valid initial segment position (and not garbage
1753    * from upstream) */
1754   if (segment->rate > 0.0)
1755     segment->position = segment->start;
1756   else
1757     segment->position = segment->stop;
1758   if (segment == &sq->sink_segment)
1759     sq->sink_tainted = TRUE;
1760   else {
1761     sq->has_src_segment = TRUE;
1762     sq->src_tainted = TRUE;
1763   }
1764
1765   GST_DEBUG_OBJECT (mq,
1766       "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
1767
1768   /* segment can update the time level of the queue */
1769   update_time_level (mq, sq);
1770
1771   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1772   gst_multi_queue_post_buffering (mq);
1773 }
1774
1775 /* take a buffer and update segment, updating the time level of the queue. */
1776 static void
1777 apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
1778     GstClockTime duration, GstSegment * segment)
1779 {
1780   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1781
1782   /* if no timestamp is set, assume it's continuous with the previous
1783    * time */
1784   if (timestamp == GST_CLOCK_TIME_NONE)
1785     timestamp = segment->position;
1786
1787   /* add duration */
1788   if (duration != GST_CLOCK_TIME_NONE)
1789     timestamp += duration;
1790
1791   GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
1792       sq->id, segment == &sq->sink_segment ? "sink" : "src",
1793       GST_TIME_ARGS (timestamp));
1794
1795   segment->position = timestamp;
1796
1797   if (segment == &sq->sink_segment)
1798     sq->sink_tainted = TRUE;
1799   else
1800     sq->src_tainted = TRUE;
1801
1802   /* calc diff with other end */
1803   update_time_level (mq, sq);
1804   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1805   gst_multi_queue_post_buffering (mq);
1806 }
1807
1808 static void
1809 apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1810     GstSegment * segment)
1811 {
1812   GstClockTime timestamp;
1813   GstClockTime duration;
1814
1815   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1816
1817   gst_event_parse_gap (event, &timestamp, &duration);
1818
1819   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1820
1821     if (GST_CLOCK_TIME_IS_VALID (duration)) {
1822       timestamp += duration;
1823     }
1824
1825     GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
1826         sq->id, segment == &sq->sink_segment ? "sink" : "src",
1827         GST_TIME_ARGS (timestamp));
1828
1829     segment->position = timestamp;
1830
1831     if (segment == &sq->sink_segment)
1832       sq->sink_tainted = TRUE;
1833     else
1834       sq->src_tainted = TRUE;
1835
1836     /* calc diff with other end */
1837     update_time_level (mq, sq);
1838   }
1839
1840   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1841   gst_multi_queue_post_buffering (mq);
1842 }
1843
1844 static GstClockTimeDiff
1845 get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
1846 {
1847   GstClockTimeDiff time = GST_CLOCK_STIME_NONE;
1848
1849   if (GST_IS_BUFFER (object)) {
1850     GstBuffer *buf = GST_BUFFER_CAST (object);
1851     GstClockTime btime = GST_BUFFER_DTS_OR_PTS (buf);
1852
1853     if (GST_CLOCK_TIME_IS_VALID (btime)) {
1854       if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1855         btime += GST_BUFFER_DURATION (buf);
1856       time = my_segment_to_running_time (segment, btime);
1857     }
1858   } else if (GST_IS_BUFFER_LIST (object)) {
1859     GstBufferList *list = GST_BUFFER_LIST_CAST (object);
1860     gint i, n;
1861     GstBuffer *buf;
1862
1863     n = gst_buffer_list_length (list);
1864     for (i = 0; i < n; i++) {
1865       GstClockTime btime;
1866       buf = gst_buffer_list_get (list, i);
1867       btime = GST_BUFFER_DTS_OR_PTS (buf);
1868       if (GST_CLOCK_TIME_IS_VALID (btime)) {
1869         if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1870           btime += GST_BUFFER_DURATION (buf);
1871         time = my_segment_to_running_time (segment, btime);
1872         if (!end)
1873           goto done;
1874       } else if (!end) {
1875         goto done;
1876       }
1877     }
1878   } else if (GST_IS_EVENT (object)) {
1879     GstEvent *event = GST_EVENT_CAST (object);
1880
1881     /* For newsegment events return the running time of the start position */
1882     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1883       const GstSegment *new_segment;
1884
1885       gst_event_parse_segment (event, &new_segment);
1886       if (new_segment->format == GST_FORMAT_TIME) {
1887         time =
1888             my_segment_to_running_time ((GstSegment *) new_segment,
1889             new_segment->start);
1890       }
1891     } else if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
1892       GstClockTime ts, dur;
1893       gst_event_parse_gap (event, &ts, &dur);
1894       if (GST_CLOCK_TIME_IS_VALID (ts)) {
1895         if (GST_CLOCK_TIME_IS_VALID (dur))
1896           ts += dur;
1897         time = my_segment_to_running_time (segment, ts);
1898       }
1899     }
1900   }
1901
1902 done:
1903   return time;
1904 }
1905
1906 static GstFlowReturn
1907 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
1908     GstMiniObject * object, gboolean * allow_drop)
1909 {
1910   GstFlowReturn result = sq->srcresult;
1911   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
1912
1913   if (!srcpad) {
1914     GST_INFO_OBJECT (mq,
1915         "Pushing while corresponding sourcepad has been cleared");
1916     return GST_FLOW_FLUSHING;
1917   }
1918
1919   if (GST_IS_BUFFER (object)) {
1920     GstBuffer *buffer;
1921     GstClockTime timestamp, duration;
1922
1923     buffer = GST_BUFFER_CAST (object);
1924     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1925     duration = GST_BUFFER_DURATION (buffer);
1926
1927     apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1928
1929     /* Applying the buffer may have made the queue non-full again, unblock it if needed */
1930     gst_data_queue_limits_changed (sq->queue);
1931
1932     if (G_UNLIKELY (*allow_drop)) {
1933       GST_DEBUG_OBJECT (mq,
1934           "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
1935           sq->id, buffer, GST_TIME_ARGS (timestamp));
1936       gst_buffer_unref (buffer);
1937     } else {
1938       GST_DEBUG_OBJECT (mq,
1939           "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
1940           sq->id, buffer, GST_TIME_ARGS (timestamp));
1941       result = gst_pad_push (srcpad, buffer);
1942     }
1943   } else if (GST_IS_EVENT (object)) {
1944     GstEvent *event;
1945
1946     event = GST_EVENT_CAST (object);
1947
1948     switch (GST_EVENT_TYPE (event)) {
1949       case GST_EVENT_SEGMENT_DONE:
1950         *allow_drop = FALSE;
1951         break;
1952       case GST_EVENT_EOS:
1953         result = GST_FLOW_EOS;
1954         if (G_UNLIKELY (*allow_drop))
1955           *allow_drop = FALSE;
1956         break;
1957       case GST_EVENT_STREAM_START:
1958         result = GST_FLOW_OK;
1959         if (G_UNLIKELY (*allow_drop))
1960           *allow_drop = FALSE;
1961         break;
1962       case GST_EVENT_SEGMENT:
1963         apply_segment (mq, sq, event, &sq->src_segment);
1964         /* Applying the segment may have made the queue non-full again, unblock it if needed */
1965         gst_data_queue_limits_changed (sq->queue);
1966         if (G_UNLIKELY (*allow_drop)) {
1967           result = GST_FLOW_OK;
1968           *allow_drop = FALSE;
1969         }
1970         break;
1971       case GST_EVENT_GAP:
1972         apply_gap (mq, sq, event, &sq->src_segment);
1973         /* Applying the gap may have made the queue non-full again, unblock it if needed */
1974         gst_data_queue_limits_changed (sq->queue);
1975         break;
1976       default:
1977         break;
1978     }
1979
1980     if (G_UNLIKELY (*allow_drop)) {
1981       GST_DEBUG_OBJECT (mq,
1982           "SingleQueue %d : Dropping EOS event %p of type %s",
1983           sq->id, event, GST_EVENT_TYPE_NAME (event));
1984       gst_event_unref (event);
1985     } else {
1986       GST_DEBUG_OBJECT (mq,
1987           "SingleQueue %d : Pushing event %p of type %s",
1988           sq->id, event, GST_EVENT_TYPE_NAME (event));
1989
1990       gst_pad_push_event (srcpad, event);
1991     }
1992   } else if (GST_IS_QUERY (object)) {
1993     GstQuery *query;
1994     gboolean res;
1995
1996     query = GST_QUERY_CAST (object);
1997
1998     if (G_UNLIKELY (*allow_drop)) {
1999       GST_DEBUG_OBJECT (mq,
2000           "SingleQueue %d : Dropping EOS query %p", sq->id, query);
2001       gst_query_unref (query);
2002       res = FALSE;
2003     } else {
2004       res = gst_pad_peer_query (srcpad, query);
2005     }
2006
2007     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2008     sq->last_query = res;
2009     sq->last_handled_query = query;
2010     g_cond_signal (&sq->query_handled);
2011     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2012   } else {
2013     g_warning ("Unexpected object in singlequeue %u (refcounting problem?)",
2014         sq->id);
2015   }
2016
2017   gst_object_unref (srcpad);
2018   return result;
2019
2020   /* ERRORS */
2021 }
2022
2023 static GstMiniObject *
2024 gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
2025 {
2026   GstMiniObject *res;
2027
2028   res = item->object;
2029   item->object = NULL;
2030
2031   return res;
2032 }
2033
2034 static void
2035 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
2036 {
2037   if (!item->is_query && item->object)
2038     gst_mini_object_unref (item->object);
2039   g_slice_free (GstMultiQueueItem, item);
2040 }
2041
2042 /* takes ownership of passed mini object! */
2043 static GstMultiQueueItem *
2044 gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
2045 {
2046   GstMultiQueueItem *item;
2047
2048   item = g_slice_new (GstMultiQueueItem);
2049   item->object = object;
2050   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
2051   item->posid = curid;
2052   item->is_query = GST_IS_QUERY (object);
2053
2054   item->size = gst_buffer_get_size (GST_BUFFER_CAST (object));
2055   item->duration = GST_BUFFER_DURATION (object);
2056   if (item->duration == GST_CLOCK_TIME_NONE)
2057     item->duration = 0;
2058   item->visible = TRUE;
2059   return item;
2060 }
2061
2062 static GstMultiQueueItem *
2063 gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
2064 {
2065   GstMultiQueueItem *item;
2066
2067   item = g_slice_new (GstMultiQueueItem);
2068   item->object = object;
2069   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
2070   item->posid = curid;
2071   item->is_query = GST_IS_QUERY (object);
2072
2073   item->size = 0;
2074   item->duration = 0;
2075   item->visible = FALSE;
2076   return item;
2077 }
2078
2079 /* Each main loop attempts to push buffers until the return value
2080  * is not-linked. not-linked pads are not allowed to push data beyond
2081  * any linked pads, so they don't 'rush ahead of the pack'.
2082  */
2083 static void
2084 gst_multi_queue_loop (GstPad * pad)
2085 {
2086   GstSingleQueue *sq;
2087   GstMultiQueueItem *item;
2088   GstDataQueueItem *sitem;
2089   GstMultiQueue *mq;
2090   GstMiniObject *object = NULL;
2091   guint32 newid;
2092   GstFlowReturn result;
2093   GstClockTimeDiff next_time;
2094   gboolean is_buffer;
2095   gboolean do_update_buffering = FALSE;
2096   gboolean dropping = FALSE;
2097   GstPad *srcpad = NULL;
2098
2099   sq = GST_MULTIQUEUE_PAD (pad)->sq;
2100   mq = g_weak_ref_get (&sq->mqueue);
2101   srcpad = g_weak_ref_get (&sq->srcpad);
2102
2103   if (!mq || !srcpad)
2104     goto out_flushing;
2105
2106 next:
2107   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
2108
2109   if (sq->flushing)
2110     goto out_flushing;
2111
2112   /* Get something from the queue, blocking until that happens, or we get
2113    * flushed */
2114   if (!(gst_data_queue_pop (sq->queue, &sitem)))
2115     goto out_flushing;
2116
2117   item = (GstMultiQueueItem *) sitem;
2118   newid = item->posid;
2119
2120   /* steal the object and destroy the item */
2121   object = gst_multi_queue_item_steal_object (item);
2122   gst_multi_queue_item_destroy (item);
2123
2124   is_buffer = GST_IS_BUFFER (object);
2125
2126   /* Get running time of the item. Events will have GST_CLOCK_STIME_NONE */
2127   next_time = get_running_time (&sq->src_segment, object, FALSE);
2128
2129   GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
2130       sq->id, newid, sq->last_oldid);
2131
2132   /* If we're not-linked, we do some extra work because we might need to
2133    * wait before pushing. If we're linked but there's a gap in the IDs,
2134    * or it's the first loop, or we just passed the previous highid,
2135    * we might need to wake some sleeping pad up, so there's extra work
2136    * there too */
2137   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2138   if (sq->srcresult == GST_FLOW_NOT_LINKED
2139       || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
2140       || sq->last_oldid > mq->highid) {
2141     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
2142         gst_flow_get_name (sq->srcresult));
2143
2144     /* Check again if we're flushing after the lock is taken,
2145      * the flush flag might have been changed in the meantime */
2146     if (sq->flushing) {
2147       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2148       goto out_flushing;
2149     }
2150
2151     /* Update the nextid so other threads know when to wake us up */
2152     sq->nextid = newid;
2153     /* Take into account the extra cache time since we're unlinked */
2154     if (GST_CLOCK_STIME_IS_VALID (next_time))
2155       next_time += mq->unlinked_cache_time;
2156     sq->next_time = next_time;
2157
2158     /* Update the oldid (the last ID we output) for highid tracking */
2159     if (sq->last_oldid != G_MAXUINT32)
2160       sq->oldid = sq->last_oldid;
2161
2162     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2163       gboolean should_wait;
2164       /* Go to sleep until it's time to push this buffer */
2165
2166       /* Recompute the highid */
2167       compute_high_id (mq);
2168       /* Recompute the high time */
2169       compute_high_time (mq, sq->groupid);
2170
2171       GST_DEBUG_OBJECT (mq,
2172           "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
2173           GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time),
2174           GST_STIME_ARGS (next_time));
2175
2176       if (mq->sync_by_running_time) {
2177         if (sq->group_high_time == GST_CLOCK_STIME_NONE) {
2178           should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
2179               (mq->high_time == GST_CLOCK_STIME_NONE
2180               || next_time > mq->high_time);
2181         } else {
2182           should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
2183               next_time > sq->group_high_time;
2184         }
2185       } else
2186         should_wait = newid > mq->highid;
2187
2188       while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) {
2189
2190         GST_DEBUG_OBJECT (mq,
2191             "queue %d sleeping for not-linked wakeup with "
2192             "newid %u, highid %u, next_time %" GST_STIME_FORMAT
2193             ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
2194             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
2195
2196         /* Wake up all non-linked pads before we sleep */
2197         wake_up_next_non_linked (mq);
2198
2199         mq->numwaiting++;
2200         g_cond_wait (&sq->turn, &mq->qlock);
2201         mq->numwaiting--;
2202
2203         if (sq->flushing) {
2204           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2205           goto out_flushing;
2206         }
2207
2208         /* Recompute the high time and ID */
2209         compute_high_time (mq, sq->groupid);
2210         compute_high_id (mq);
2211
2212         GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
2213             "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
2214             ", high_time %" GST_STIME_FORMAT " mq high_time %" GST_STIME_FORMAT,
2215             sq->id, newid, mq->highid,
2216             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time),
2217             GST_STIME_ARGS (mq->high_time));
2218
2219         if (mq->sync_by_running_time) {
2220           if (sq->group_high_time == GST_CLOCK_STIME_NONE) {
2221             should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
2222                 (mq->high_time == GST_CLOCK_STIME_NONE
2223                 || next_time > mq->high_time);
2224           } else {
2225             should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
2226                 next_time > sq->group_high_time;
2227           }
2228         } else
2229           should_wait = newid > mq->highid;
2230       }
2231
2232       /* Re-compute the high_id in case someone else pushed */
2233       compute_high_id (mq);
2234       compute_high_time (mq, sq->groupid);
2235     } else {
2236       compute_high_id (mq);
2237       compute_high_time (mq, sq->groupid);
2238       /* Wake up all non-linked pads */
2239       wake_up_next_non_linked (mq);
2240     }
2241     /* We're done waiting, we can clear the nextid and nexttime */
2242     sq->nextid = 0;
2243     sq->next_time = GST_CLOCK_STIME_NONE;
2244   }
2245   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2246
2247   if (sq->flushing)
2248     goto out_flushing;
2249
2250   GST_LOG_OBJECT (mq, "sq:%d BEFORE PUSHING sq->srcresult: %s", sq->id,
2251       gst_flow_get_name (sq->srcresult));
2252
2253   /* Update time stats */
2254   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2255   next_time = get_running_time (&sq->src_segment, object, TRUE);
2256   if (GST_CLOCK_STIME_IS_VALID (next_time)) {
2257     if (sq->last_time == GST_CLOCK_STIME_NONE || sq->last_time < next_time)
2258       sq->last_time = next_time;
2259     if (mq->high_time == GST_CLOCK_STIME_NONE || mq->high_time <= next_time) {
2260       /* Wake up all non-linked pads now that we advanced the high time */
2261       mq->high_time = next_time;
2262       wake_up_next_non_linked (mq);
2263     }
2264   }
2265   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2266
2267   /* Try to push out the new object */
2268   result = gst_single_queue_push_one (mq, sq, object, &dropping);
2269   object = NULL;
2270
2271   /* Check if we pushed something already and if this is
2272    * now a switch from an active to a non-active stream.
2273    *
2274    * If it is, we reset all the waiting streams, let them
2275    * push another buffer to see if they're now active again.
2276    * This allows faster switching between streams and prevents
2277    * deadlocks if downstream does any waiting too.
2278    */
2279   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2280   if (sq->pushed && sq->srcresult == GST_FLOW_OK
2281       && result == GST_FLOW_NOT_LINKED) {
2282     GList *tmp;
2283
2284     GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
2285         sq->id);
2286
2287     compute_high_id (mq);
2288     compute_high_time (mq, sq->groupid);
2289     do_update_buffering = TRUE;
2290
2291     /* maybe no-one is waiting */
2292     if (mq->numwaiting > 0) {
2293       /* Else figure out which singlequeue(s) need waking up */
2294       for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2295         GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
2296
2297         if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
2298           GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
2299           sq2->pushed = FALSE;
2300           sq2->srcresult = GST_FLOW_OK;
2301           g_cond_signal (&sq2->turn);
2302         }
2303       }
2304     }
2305   }
2306
2307   if (is_buffer)
2308     sq->pushed = TRUE;
2309
2310   /* now hold on a bit;
2311    * can not simply throw this result to upstream, because
2312    * that might already be onto another segment, so we have to make
2313    * sure we are relaying the correct info wrt proper segment */
2314   if (result == GST_FLOW_EOS && !dropping &&
2315       sq->srcresult != GST_FLOW_NOT_LINKED) {
2316     GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
2317     dropping = TRUE;
2318     /* pretend we have not seen EOS yet for upstream's sake */
2319     result = sq->srcresult;
2320   } else if (dropping && gst_data_queue_is_empty (sq->queue)) {
2321     /* queue empty, so stop dropping
2322      * we can commit the result we have now,
2323      * which is either OK after a segment, or EOS */
2324     GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
2325     dropping = FALSE;
2326     result = GST_FLOW_EOS;
2327   }
2328   sq->srcresult = result;
2329   sq->last_oldid = newid;
2330
2331   if (do_update_buffering)
2332     update_buffering (mq, sq);
2333
2334   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2335   gst_multi_queue_post_buffering (mq);
2336
2337   GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)",
2338       sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (srcpad));
2339
2340   /* Need to make sure wake up any sleeping pads when we exit */
2341   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2342   if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (srcpad)
2343           || sq->srcresult == GST_FLOW_EOS)) {
2344     compute_high_time (mq, sq->groupid);
2345     compute_high_id (mq);
2346     wake_up_next_non_linked (mq);
2347   }
2348   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2349
2350   if (dropping)
2351     goto next;
2352
2353   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
2354       && result != GST_FLOW_EOS)
2355     goto out_flushing;
2356
2357 done:
2358   gst_clear_object (&mq);
2359   gst_clear_object (&srcpad);
2360
2361   return;
2362
2363 out_flushing:
2364   {
2365     if (object && !GST_IS_QUERY (object))
2366       gst_mini_object_unref (object);
2367
2368     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2369     sq->last_query = FALSE;
2370     g_cond_signal (&sq->query_handled);
2371
2372     /* Post an error message if we got EOS while downstream
2373      * has returned an error flow return. After EOS there
2374      * will be no further buffer which could propagate the
2375      * error upstream */
2376     if ((sq->is_eos || sq->is_segment_done) && sq->srcresult < GST_FLOW_EOS) {
2377       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2378       GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
2379     } else {
2380       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2381     }
2382
2383     /* upstream needs to see fatal result ASAP to shut things down,
2384      * but might be stuck in one of our other full queues;
2385      * so empty this one and trigger dynamic queue growth. At
2386      * this point the srcresult is not OK, NOT_LINKED
2387      * or EOS, i.e. a real failure */
2388     gst_single_queue_flush_queue (sq, FALSE);
2389     single_queue_underrun_cb (sq->queue, sq);
2390     gst_data_queue_set_flushing (sq->queue, TRUE);
2391     gst_pad_pause_task (srcpad);
2392     GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
2393         "SingleQueue[%d] task paused, reason:%s",
2394         sq->id, gst_flow_get_name (sq->srcresult));
2395     goto done;
2396   }
2397 }
2398
2399 /**
2400  * gst_multi_queue_chain:
2401  *
2402  * This is similar to GstQueue's chain function, except:
2403  * _ we don't have leak behaviours,
2404  * _ we push with a unique id (curid)
2405  */
2406 static GstFlowReturn
2407 gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2408 {
2409   GstSingleQueue *sq;
2410   GstMultiQueue *mq;
2411   GstMultiQueueItem *item = NULL;
2412   guint32 curid;
2413   GstClockTime timestamp, duration;
2414
2415   sq = GST_MULTIQUEUE_PAD (pad)->sq;
2416   mq = g_weak_ref_get (&sq->mqueue);
2417
2418   if (!mq)
2419     goto flushing;
2420
2421   /* if eos, we are always full, so avoid hanging incoming indefinitely */
2422   if (sq->is_eos)
2423     goto was_eos;
2424
2425   sq->active = TRUE;
2426
2427   /* Get a unique incrementing id */
2428   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2429
2430   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
2431   duration = GST_BUFFER_DURATION (buffer);
2432
2433   GST_LOG_OBJECT (mq,
2434       "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%"
2435       GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")",
2436       sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
2437       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration));
2438
2439   item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
2440
2441   /* Update interleave before pushing data into queue */
2442   if (mq->use_interleave) {
2443     GstClockTime val = timestamp;
2444     GstClockTimeDiff dval;
2445
2446     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2447     if (val == GST_CLOCK_TIME_NONE)
2448       val = sq->sink_segment.position;
2449     if (duration != GST_CLOCK_TIME_NONE)
2450       val += duration;
2451
2452     dval = my_segment_to_running_time (&sq->sink_segment, val);
2453     if (GST_CLOCK_STIME_IS_VALID (dval)) {
2454       sq->cached_sinktime = dval;
2455       GST_DEBUG_OBJECT (mq,
2456           "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
2457           GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
2458           GST_STIME_ARGS (sq->cached_sinktime));
2459       calculate_interleave (mq, sq);
2460     }
2461     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2462   }
2463
2464   if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
2465     goto flushing;
2466
2467   /* update time level, we must do this after pushing the data in the queue so
2468    * that we never end up filling the queue first. */
2469   apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
2470
2471 done:
2472   gst_clear_object (&mq);
2473   return sq->srcresult;
2474
2475   /* ERRORS */
2476 flushing:
2477   {
2478     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
2479         sq->id, gst_flow_get_name (sq->srcresult));
2480     if (item)
2481       gst_multi_queue_item_destroy (item);
2482     goto done;
2483   }
2484 was_eos:
2485   {
2486     GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS");
2487     gst_buffer_unref (buffer);
2488     gst_object_unref (mq);
2489     return GST_FLOW_EOS;
2490   }
2491 }
2492
2493 static gboolean
2494 gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
2495     GstPadMode mode, gboolean active)
2496 {
2497   gboolean res;
2498   GstSingleQueue *sq;
2499   GstMultiQueue *mq;
2500
2501   sq = GST_MULTIQUEUE_PAD (pad)->sq;
2502   mq = (GstMultiQueue *) gst_pad_get_parent (pad);
2503
2504   /* mq is NULL if the pad is activated/deactivated before being
2505    * added to the multiqueue */
2506   if (mq)
2507     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2508
2509   switch (mode) {
2510     case GST_PAD_MODE_PUSH:
2511       if (active) {
2512         /* All pads start off linked until they push one buffer */
2513         sq->srcresult = GST_FLOW_OK;
2514         sq->pushed = FALSE;
2515         gst_data_queue_set_flushing (sq->queue, FALSE);
2516       } else {
2517         sq->srcresult = GST_FLOW_FLUSHING;
2518         sq->last_query = FALSE;
2519         g_cond_signal (&sq->query_handled);
2520         gst_data_queue_set_flushing (sq->queue, TRUE);
2521
2522         /* Wait until streaming thread has finished */
2523         if (mq)
2524           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2525         GST_PAD_STREAM_LOCK (pad);
2526         if (mq)
2527           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2528         gst_data_queue_flush (sq->queue);
2529         if (mq)
2530           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2531         GST_PAD_STREAM_UNLOCK (pad);
2532         if (mq)
2533           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2534       }
2535       res = TRUE;
2536       break;
2537     default:
2538       res = FALSE;
2539       break;
2540   }
2541
2542   if (mq) {
2543     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2544     gst_object_unref (mq);
2545   }
2546
2547   return res;
2548 }
2549
2550 static GstFlowReturn
2551 gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
2552 {
2553   GstSingleQueue *sq;
2554   GstMultiQueue *mq;
2555   guint32 curid;
2556   GstMultiQueueItem *item;
2557   gboolean res = TRUE;
2558   GstFlowReturn flowret = GST_FLOW_OK;
2559   GstEventType type;
2560   GstEvent *sref = NULL;
2561   GstPad *srcpad;
2562
2563
2564   sq = GST_MULTIQUEUE_PAD (pad)->sq;
2565   mq = (GstMultiQueue *) parent;
2566   srcpad = g_weak_ref_get (&sq->srcpad);
2567
2568   if (!srcpad) {
2569     GST_INFO_OBJECT (pad, "Pushing while corresponding sourcepad has been"
2570         " removed already");
2571
2572     return GST_FLOW_FLUSHING;
2573   }
2574
2575   type = GST_EVENT_TYPE (event);
2576
2577   switch (type) {
2578     case GST_EVENT_STREAM_START:
2579     {
2580       if (mq->sync_by_running_time) {
2581         GstStreamFlags stream_flags;
2582         gst_event_parse_stream_flags (event, &stream_flags);
2583         if ((stream_flags & GST_STREAM_FLAG_SPARSE)) {
2584           GST_INFO_OBJECT (mq, "SingleQueue %d is a sparse stream", sq->id);
2585           sq->is_sparse = TRUE;
2586         }
2587       }
2588
2589       sq->thread = g_thread_self ();
2590
2591       /* Remove EOS flag */
2592       sq->is_eos = FALSE;
2593       break;
2594     }
2595     case GST_EVENT_FLUSH_START:
2596       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
2597           sq->id);
2598
2599       res = gst_pad_push_event (srcpad, event);
2600
2601       gst_single_queue_flush (mq, sq, TRUE, FALSE);
2602       gst_single_queue_pause (mq, sq);
2603       goto done;
2604
2605     case GST_EVENT_FLUSH_STOP:
2606       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
2607           sq->id);
2608
2609       res = gst_pad_push_event (srcpad, event);
2610
2611       gst_single_queue_flush (mq, sq, FALSE, FALSE);
2612       gst_single_queue_start (mq, sq);
2613       goto done;
2614
2615     case GST_EVENT_SEGMENT:
2616       sq->is_segment_done = FALSE;
2617       sref = gst_event_ref (event);
2618       break;
2619     case GST_EVENT_GAP:
2620       /* take ref because the queue will take ownership and we need the event
2621        * afterwards to update the segment */
2622       sref = gst_event_ref (event);
2623       if (mq->use_interleave) {
2624         GstClockTime val, dur;
2625         GstClockTime stime;
2626         gst_event_parse_gap (event, &val, &dur);
2627         if (GST_CLOCK_TIME_IS_VALID (val)) {
2628           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2629           if (GST_CLOCK_TIME_IS_VALID (dur))
2630             val += dur;
2631           stime = my_segment_to_running_time (&sq->sink_segment, val);
2632           if (GST_CLOCK_STIME_IS_VALID (stime)) {
2633             sq->cached_sinktime = stime;
2634             calculate_interleave (mq, sq);
2635           }
2636           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2637         }
2638       }
2639       break;
2640
2641     default:
2642       if (!(GST_EVENT_IS_SERIALIZED (event))) {
2643         res = gst_pad_push_event (srcpad, event);
2644         goto done;
2645       }
2646       break;
2647   }
2648
2649   /* if eos, we are always full, so avoid hanging incoming indefinitely */
2650   if (sq->is_eos)
2651     goto was_eos;
2652
2653   /* Get an unique incrementing id. */
2654   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2655
2656   item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
2657
2658   GST_DEBUG_OBJECT (mq,
2659       "SingleQueue %d : Enqueuing event %p of type %s with id %d",
2660       sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
2661
2662   if (!gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))
2663     goto flushing;
2664
2665   /* mark EOS when we received one, we must do that after putting the
2666    * buffer in the queue because EOS marks the buffer as filled. */
2667   switch (type) {
2668     case GST_EVENT_SEGMENT_DONE:
2669       sq->is_segment_done = TRUE;
2670       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2671       update_buffering (mq, sq);
2672       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2673       single_queue_overrun_cb (sq->queue, sq);
2674       gst_multi_queue_post_buffering (mq);
2675       break;
2676     case GST_EVENT_EOS:
2677       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2678       sq->is_eos = TRUE;
2679
2680       /* Post an error message if we got EOS while downstream
2681        * has returned an error flow return. After EOS there
2682        * will be no further buffer which could propagate the
2683        * error upstream */
2684       if (sq->srcresult < GST_FLOW_EOS) {
2685         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2686         GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
2687       } else {
2688         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2689       }
2690
2691       /* EOS affects the buffering state */
2692       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2693       update_buffering (mq, sq);
2694       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2695       single_queue_overrun_cb (sq->queue, sq);
2696       gst_multi_queue_post_buffering (mq);
2697       break;
2698     case GST_EVENT_SEGMENT:
2699       apply_segment (mq, sq, sref, &sq->sink_segment);
2700       gst_event_unref (sref);
2701       /* a new segment allows us to accept more buffers if we got EOS
2702        * from downstream */
2703       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2704       if (sq->srcresult == GST_FLOW_EOS)
2705         sq->srcresult = GST_FLOW_OK;
2706       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2707       break;
2708     case GST_EVENT_GAP:
2709       sq->active = TRUE;
2710       apply_gap (mq, sq, sref, &sq->sink_segment);
2711       gst_event_unref (sref);
2712     default:
2713       break;
2714   }
2715
2716 done:
2717
2718   gst_object_unref (srcpad);
2719   if (res == FALSE)
2720     flowret = GST_FLOW_ERROR;
2721   GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id,
2722       gst_flow_get_name (flowret));
2723   return flowret;
2724
2725 flushing:
2726   {
2727     gst_object_unref (srcpad);
2728     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
2729         sq->id, gst_flow_get_name (sq->srcresult));
2730     if (sref)
2731       gst_event_unref (sref);
2732     gst_multi_queue_item_destroy (item);
2733     return sq->srcresult;
2734   }
2735 was_eos:
2736   {
2737     gst_object_unref (srcpad);
2738     GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return GST_FLOW_EOS");
2739     gst_event_unref (event);
2740     return GST_FLOW_EOS;
2741   }
2742 }
2743
2744 static gboolean
2745 gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
2746 {
2747   gboolean res;
2748   GstSingleQueue *sq;
2749   GstMultiQueue *mq;
2750
2751   sq = GST_MULTIQUEUE_PAD (pad)->sq;
2752   mq = (GstMultiQueue *) parent;
2753
2754   switch (GST_QUERY_TYPE (query)) {
2755     default:
2756       if (GST_QUERY_IS_SERIALIZED (query)) {
2757         guint32 curid;
2758         GstMultiQueueItem *item;
2759
2760         GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2761         if (sq->srcresult != GST_FLOW_OK)
2762           goto out_flushing;
2763
2764         /* serialized events go in the queue. We need to be certain that we
2765          * don't cause deadlocks waiting for the query return value. We check if
2766          * the queue is empty (nothing is blocking downstream and the query can
2767          * be pushed for sure) or we are not buffering. If we are buffering,
2768          * the pipeline waits to unblock downstream until our queue fills up
2769          * completely, which can not happen if we block on the query..
2770          * Therefore we only potentially block when we are not buffering. */
2771         if (!mq->use_buffering || gst_data_queue_is_empty (sq->queue)) {
2772           /* Get an unique incrementing id. */
2773           curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2774
2775           item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
2776
2777           GST_DEBUG_OBJECT (mq,
2778               "SingleQueue %d : Enqueuing query %p of type %s with id %d",
2779               sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
2780           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2781           res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
2782           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2783           if (!res || sq->flushing)
2784             goto out_flushing;
2785           /* it might be that the query has been taken out of the queue
2786            * while we were unlocked. So, we need to check if the last
2787            * handled query is the same one than the one we just
2788            * pushed. If it is, we don't need to wait for the condition
2789            * variable, otherwise we wait for the condition variable to
2790            * be signaled. */
2791           while (!sq->flushing && sq->srcresult == GST_FLOW_OK
2792               && sq->last_handled_query != query)
2793             g_cond_wait (&sq->query_handled, &mq->qlock);
2794           res = sq->last_query;
2795           sq->last_handled_query = NULL;
2796         } else {
2797           GST_DEBUG_OBJECT (mq, "refusing query, we are buffering and the "
2798               "queue is not empty");
2799           res = FALSE;
2800         }
2801         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2802       } else {
2803         /* default handling */
2804         res = gst_pad_query_default (pad, parent, query);
2805       }
2806       break;
2807   }
2808   return res;
2809
2810 out_flushing:
2811   {
2812     GST_DEBUG_OBJECT (mq, "Flushing");
2813     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2814     return FALSE;
2815   }
2816 }
2817
2818 static gboolean
2819 gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
2820     GstPadMode mode, gboolean active)
2821 {
2822   GstMultiQueue *mq;
2823   GstSingleQueue *sq;
2824   gboolean result;
2825
2826   sq = GST_MULTIQUEUE_PAD (pad)->sq;
2827   mq = g_weak_ref_get (&sq->mqueue);
2828
2829   if (!mq) {
2830     GST_ERROR_OBJECT (pad, "No multique set anymore, can't activate pad");
2831
2832     return FALSE;
2833   }
2834
2835   GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
2836
2837   switch (mode) {
2838     case GST_PAD_MODE_PUSH:
2839       if (active) {
2840         gst_single_queue_flush (mq, sq, FALSE, TRUE);
2841         result = parent ? gst_single_queue_start (mq, sq) : TRUE;
2842       } else {
2843         gst_single_queue_flush (mq, sq, TRUE, TRUE);
2844         result = gst_single_queue_stop (mq, sq);
2845       }
2846       break;
2847     default:
2848       result = FALSE;
2849       break;
2850   }
2851   gst_object_unref (mq);
2852   return result;
2853 }
2854
2855 static gboolean
2856 gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2857 {
2858   GstSingleQueue *sq = GST_MULTIQUEUE_PAD (pad)->sq;
2859   GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
2860   gboolean ret;
2861   GstPad *sinkpad = g_weak_ref_get (&sq->sinkpad);
2862
2863   if (!mq || !sinkpad) {
2864     gst_clear_object (&sinkpad);
2865     gst_clear_object (&mq);
2866     GST_INFO_OBJECT (pad, "No multique/sinkpad set anymore, flushing");
2867
2868     return FALSE;
2869   }
2870
2871   switch (GST_EVENT_TYPE (event)) {
2872     case GST_EVENT_RECONFIGURE:
2873       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2874       if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2875         sq->srcresult = GST_FLOW_OK;
2876         g_cond_signal (&sq->turn);
2877       }
2878       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2879
2880       ret = gst_pad_push_event (sinkpad, event);
2881       break;
2882     default:
2883       ret = gst_pad_push_event (sinkpad, event);
2884       break;
2885   }
2886
2887   gst_object_unref (sinkpad);
2888   gst_object_unref (mq);
2889
2890   return ret;
2891 }
2892
2893 static gboolean
2894 gst_multi_queue_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2895 {
2896   gboolean res;
2897
2898   /* FIXME, Handle position offset depending on queue size */
2899   switch (GST_QUERY_TYPE (query)) {
2900     default:
2901       /* default handling */
2902       res = gst_pad_query_default (pad, parent, query);
2903       break;
2904   }
2905   return res;
2906 }
2907
2908 /*
2909  * Next-non-linked functions
2910  */
2911
2912 /* WITH LOCK TAKEN */
2913 static void
2914 wake_up_next_non_linked (GstMultiQueue * mq)
2915 {
2916   GList *tmp;
2917
2918   /* maybe no-one is waiting */
2919   if (mq->numwaiting < 1)
2920     return;
2921
2922   if (mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (mq->high_time)) {
2923     /* Else figure out which singlequeue(s) need waking up */
2924     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2925       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2926       if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2927         GstClockTimeDiff high_time;
2928
2929         if (GST_CLOCK_STIME_IS_VALID (sq->group_high_time))
2930           high_time = sq->group_high_time;
2931         else
2932           high_time = mq->high_time;
2933
2934         if (GST_CLOCK_STIME_IS_VALID (sq->next_time) &&
2935             GST_CLOCK_STIME_IS_VALID (high_time)
2936             && sq->next_time <= high_time) {
2937           GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2938           g_cond_signal (&sq->turn);
2939         }
2940       }
2941     }
2942   } else {
2943     /* Else figure out which singlequeue(s) need waking up */
2944     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2945       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2946       if (sq->srcresult == GST_FLOW_NOT_LINKED &&
2947           sq->nextid != 0 && sq->nextid <= mq->highid) {
2948         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2949         g_cond_signal (&sq->turn);
2950       }
2951     }
2952   }
2953 }
2954
2955 /* WITH LOCK TAKEN */
2956 static void
2957 compute_high_id (GstMultiQueue * mq)
2958 {
2959   /* The high-id is either the highest id among the linked pads, or if all
2960    * pads are not-linked, it's the lowest not-linked pad */
2961   GList *tmp;
2962   guint32 lowest = G_MAXUINT32;
2963   guint32 highid = G_MAXUINT32;
2964
2965   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2966     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2967     GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
2968
2969     if (!srcpad) {
2970       GST_INFO_OBJECT (mq,
2971           "srcpad has been removed already... ignoring single queue");
2972
2973       continue;
2974     }
2975
2976     GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
2977         sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
2978
2979     /* No need to consider queues which are not waiting */
2980     if (sq->nextid == 0) {
2981       GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2982       gst_object_unref (srcpad);
2983       continue;
2984     }
2985
2986     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2987       if (sq->nextid < lowest)
2988         lowest = sq->nextid;
2989     } else if (!GST_PAD_IS_EOS (srcpad) && sq->srcresult != GST_FLOW_EOS) {
2990       /* If we don't have a global highid, or the global highid is lower than
2991        * this single queue's last outputted id, store the queue's one,
2992        * unless the singlequeue output is at EOS */
2993       if ((highid == G_MAXUINT32) || (sq->oldid > highid))
2994         highid = sq->oldid;
2995     }
2996     gst_object_unref (srcpad);
2997   }
2998
2999   if (highid == G_MAXUINT32 || lowest < highid)
3000     mq->highid = lowest;
3001   else
3002     mq->highid = highid;
3003
3004   GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
3005       lowest);
3006 }
3007
3008 /* WITH LOCK TAKEN */
3009 static void
3010 compute_high_time (GstMultiQueue * mq, guint groupid)
3011 {
3012   /* The high-time is either the highest last time among the linked
3013    * pads, or if all pads are not-linked, it's the lowest nex time of
3014    * not-linked pad */
3015   GList *tmp;
3016   GstClockTimeDiff highest = GST_CLOCK_STIME_NONE;
3017   GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
3018   GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE;
3019   GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE;
3020   GstClockTimeDiff res;
3021   /* Number of streams which belong to groupid */
3022   guint group_count = 0;
3023
3024   if (!mq->sync_by_running_time)
3025     /* return GST_CLOCK_STIME_NONE; */
3026     return;
3027
3028   for (tmp = mq->queues; tmp; tmp = tmp->next) {
3029     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
3030     GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
3031
3032     if (!srcpad) {
3033       GST_INFO_OBJECT (mq,
3034           "srcpad has been removed already... ignoring single queue");
3035
3036       continue;
3037     }
3038
3039     GST_LOG_OBJECT (mq,
3040         "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT
3041         ", last_time:%" GST_STIME_FORMAT ", srcresult:%s", sq->id, sq->groupid,
3042         GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time),
3043         gst_flow_get_name (sq->srcresult));
3044
3045     if (sq->groupid == groupid)
3046       group_count++;
3047
3048     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
3049       /* No need to consider queues which are not waiting */
3050       if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
3051         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
3052         gst_object_unref (srcpad);
3053         continue;
3054       }
3055
3056       if (lowest == GST_CLOCK_STIME_NONE || sq->next_time < lowest)
3057         lowest = sq->next_time;
3058       if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE
3059               || sq->next_time < group_low))
3060         group_low = sq->next_time;
3061     } else if (!GST_PAD_IS_EOS (srcpad) && sq->srcresult != GST_FLOW_EOS) {
3062       /* If we don't have a global high time, or the global high time
3063        * is lower than this single queue's last outputted time, store
3064        * the queue's one, unless the singlequeue output is at EOS. */
3065       if (highest == GST_CLOCK_STIME_NONE
3066           || (sq->last_time != GST_CLOCK_STIME_NONE && sq->last_time > highest))
3067         highest = sq->last_time;
3068       if (sq->groupid == groupid && (group_high == GST_CLOCK_STIME_NONE
3069               || (sq->last_time != GST_CLOCK_STIME_NONE
3070                   && sq->last_time > group_high)))
3071         group_high = sq->last_time;
3072     }
3073     GST_LOG_OBJECT (mq,
3074         "highest now %" GST_STIME_FORMAT " lowest %" GST_STIME_FORMAT,
3075         GST_STIME_ARGS (highest), GST_STIME_ARGS (lowest));
3076     if (sq->groupid == groupid)
3077       GST_LOG_OBJECT (mq,
3078           "grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT,
3079           GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low));
3080
3081     gst_object_unref (srcpad);
3082   }
3083
3084   if (highest == GST_CLOCK_STIME_NONE)
3085     mq->high_time = lowest;
3086   else
3087     mq->high_time = highest;
3088
3089   /* If there's only one stream of a given type, use the global high */
3090   if (group_count < 2)
3091     res = GST_CLOCK_STIME_NONE;
3092   else if (group_high == GST_CLOCK_STIME_NONE)
3093     res = group_low;
3094   else
3095     res = group_high;
3096
3097   GST_LOG_OBJECT (mq, "group count %d for groupid %u", group_count, groupid);
3098   GST_LOG_OBJECT (mq,
3099       "MQ High time is now : %" GST_STIME_FORMAT ", group %d high time %"
3100       GST_STIME_FORMAT ", lowest non-linked %" GST_STIME_FORMAT,
3101       GST_STIME_ARGS (mq->high_time), groupid, GST_STIME_ARGS (mq->high_time),
3102       GST_STIME_ARGS (lowest));
3103
3104   for (tmp = mq->queues; tmp; tmp = tmp->next) {
3105     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
3106     if (groupid == sq->groupid)
3107       sq->group_high_time = res;
3108   }
3109 }
3110
3111 #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
3112      ((q)->max_size.format) <= (value))
3113
3114 /*
3115  * GstSingleQueue functions
3116  */
3117 static void
3118 single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
3119 {
3120   GList *tmp;
3121   GstDataQueueSize size;
3122   gboolean filled = TRUE;
3123   gboolean empty_found = FALSE;
3124   GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
3125
3126   if (!mq) {
3127     GST_ERROR ("No multique set anymore, not doing anything");
3128
3129     return;
3130   }
3131
3132   gst_data_queue_get_level (sq->queue, &size);
3133
3134   GST_LOG_OBJECT (mq,
3135       "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %"
3136       G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible,
3137       sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time,
3138       sq->max_size.time);
3139
3140   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
3141
3142   /* check if we reached the hard time/bytes limits;
3143      time limit is only taken into account for non-sparse streams */
3144   if (sq->is_eos || IS_FILLED (sq, bytes, size.bytes) ||
3145       (!sq->is_sparse && IS_FILLED (sq, time, sq->cur_time))) {
3146     goto done;
3147   }
3148
3149   /* Search for empty queues */
3150   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
3151     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
3152
3153     if (oq == sq)
3154       continue;
3155
3156     if (oq->srcresult == GST_FLOW_NOT_LINKED) {
3157       GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id);
3158       continue;
3159     }
3160
3161     GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
3162     if (gst_data_queue_is_empty (oq->queue) && !oq->is_sparse) {
3163       GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
3164       empty_found = TRUE;
3165       break;
3166     }
3167   }
3168
3169   /* if hard limits are not reached then we allow one more buffer in the full
3170    * queue, but only if any of the other singelqueues are empty */
3171   if (empty_found) {
3172     if (IS_FILLED (sq, visible, size.visible)) {
3173       sq->max_size.visible = size.visible + 1;
3174       GST_DEBUG_OBJECT (mq,
3175           "Bumping single queue %d max visible to %d",
3176           sq->id, sq->max_size.visible);
3177       filled = FALSE;
3178     }
3179   }
3180
3181 done:
3182   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
3183   gst_object_unref (mq);
3184
3185   /* Overrun is always forwarded, since this is blocking the upstream element */
3186   if (filled) {
3187     GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id);
3188     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
3189   }
3190 }
3191
3192 static void
3193 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
3194 {
3195   gboolean empty = TRUE;
3196   GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
3197   GList *tmp;
3198
3199   if (!mq) {
3200     GST_ERROR ("No multique set anymore, not doing anything");
3201
3202     return;
3203   }
3204
3205   if (sq->srcresult == GST_FLOW_NOT_LINKED) {
3206     GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
3207     gst_object_unref (mq);
3208     return;
3209   } else {
3210     GST_LOG_OBJECT (mq,
3211         "Single Queue %d is empty, Checking other single queues", sq->id);
3212   }
3213
3214   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
3215   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
3216     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
3217
3218     if (gst_data_queue_is_full (oq->queue)) {
3219       GstDataQueueSize size;
3220
3221       gst_data_queue_get_level (oq->queue, &size);
3222       if (IS_FILLED (oq, visible, size.visible)) {
3223         oq->max_size.visible = size.visible + 1;
3224         GST_DEBUG_OBJECT (mq,
3225             "queue %d is filled, bumping its max visible to %d", oq->id,
3226             oq->max_size.visible);
3227         gst_data_queue_limits_changed (oq->queue);
3228       }
3229     }
3230     if (!gst_data_queue_is_empty (oq->queue) || oq->is_sparse)
3231       empty = FALSE;
3232   }
3233   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
3234   gst_object_unref (mq);
3235
3236   if (empty) {
3237     GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
3238     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
3239   }
3240 }
3241
3242 static gboolean
3243 single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
3244     guint64 time, GstSingleQueue * sq)
3245 {
3246   gboolean res;
3247   GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
3248
3249   if (!mq) {
3250     GST_ERROR ("No multique set anymore, let's say we are full");
3251
3252     return TRUE;
3253   }
3254
3255   GST_DEBUG_OBJECT (mq,
3256       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
3257       G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
3258       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
3259
3260   /* we are always filled on EOS */
3261   if (sq->is_eos || sq->is_segment_done) {
3262     res = TRUE;
3263     goto done;
3264   }
3265
3266   /* we never go past the max visible items unless we are in buffering mode */
3267   if (!mq->use_buffering && IS_FILLED (sq, visible, visible)) {
3268     res = TRUE;
3269     goto done;
3270   }
3271
3272   /* check time or bytes */
3273   res = IS_FILLED (sq, bytes, bytes);
3274   /* We only care about limits in time if we're not a sparse stream or
3275    * we're not syncing by running time */
3276   if (!sq->is_sparse || !mq->sync_by_running_time) {
3277     /* If unlinked, take into account the extra unlinked cache time */
3278     if (mq->sync_by_running_time && sq->srcresult == GST_FLOW_NOT_LINKED) {
3279       if (sq->cur_time > mq->unlinked_cache_time)
3280         res |= IS_FILLED (sq, time, sq->cur_time - mq->unlinked_cache_time);
3281       else
3282         res = FALSE;
3283     } else
3284       res |= IS_FILLED (sq, time, sq->cur_time);
3285   }
3286 done:
3287   gst_object_unref (mq);
3288
3289   return res;
3290 }
3291
3292 static void
3293 gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
3294 {
3295   GstDataQueueItem *sitem;
3296   GstMultiQueueItem *mitem;
3297   gboolean was_flushing = FALSE;
3298   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
3299   GstMultiQueue *mq = g_weak_ref_get (&sq->mqueue);
3300
3301   while (!gst_data_queue_is_empty (sq->queue)) {
3302     GstMiniObject *data;
3303
3304     /* FIXME: If this fails here although the queue is not empty,
3305      * we're flushing... but we want to rescue all sticky
3306      * events nonetheless.
3307      */
3308     if (!gst_data_queue_pop (sq->queue, &sitem)) {
3309       was_flushing = TRUE;
3310       gst_data_queue_set_flushing (sq->queue, FALSE);
3311       continue;
3312     }
3313
3314     mitem = (GstMultiQueueItem *) sitem;
3315
3316     data = sitem->object;
3317
3318     if (!full && !mitem->is_query && GST_IS_EVENT (data)
3319         && srcpad && GST_EVENT_IS_STICKY (data)
3320         && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
3321         && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
3322       gst_pad_store_sticky_event (srcpad, GST_EVENT_CAST (data));
3323     }
3324
3325     sitem->destroy (sitem);
3326   }
3327   gst_clear_object (&srcpad);
3328
3329   gst_data_queue_flush (sq->queue);
3330   if (was_flushing)
3331     gst_data_queue_set_flushing (sq->queue, TRUE);
3332
3333   if (mq) {
3334     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
3335     update_buffering (mq, sq);
3336     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
3337     gst_multi_queue_post_buffering (mq);
3338     gst_object_unref (mq);
3339   }
3340 }
3341
3342 static void
3343 gst_single_queue_unref (GstSingleQueue * sq)
3344 {
3345   if (g_atomic_int_dec_and_test (&sq->refcount)) {
3346     /* DRAIN QUEUE */
3347     gst_data_queue_flush (sq->queue);
3348     g_object_unref (sq->queue);
3349     g_cond_clear (&sq->turn);
3350     g_cond_clear (&sq->query_handled);
3351     g_weak_ref_clear (&sq->sinkpad);
3352     g_weak_ref_clear (&sq->srcpad);
3353     g_weak_ref_clear (&sq->mqueue);
3354     g_free (sq);
3355   }
3356 }
3357
3358
3359 static GstSingleQueue *
3360 gst_single_queue_ref (GstSingleQueue * squeue)
3361 {
3362   g_atomic_int_inc (&squeue->refcount);
3363
3364   return squeue;
3365 }
3366
3367 static GstSingleQueue *
3368 gst_single_queue_new (GstMultiQueue * mqueue, guint id)
3369 {
3370   GstPad *srcpad, *sinkpad;
3371   GstSingleQueue *sq;
3372   GstPadTemplate *templ;
3373   gchar *name;
3374   GList *tmp;
3375   guint temp_id = (id == -1) ? 0 : id;
3376
3377   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
3378
3379   /* Find an unused queue ID, if possible the passed one */
3380   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
3381     GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
3382     /* This works because the IDs are sorted in ascending order */
3383     if (sq2->id == temp_id) {
3384       /* If this ID was requested by the caller return NULL,
3385        * otherwise just get us the next one */
3386       if (id == -1) {
3387         temp_id = sq2->id + 1;
3388       } else {
3389         GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
3390         return NULL;
3391       }
3392     } else if (sq2->id > temp_id) {
3393       break;
3394     }
3395   }
3396
3397   sq = g_new0 (GstSingleQueue, 1);
3398   g_atomic_int_set (&sq->refcount, 1);
3399
3400   mqueue->nbqueues++;
3401   sq->id = temp_id;
3402   sq->groupid = DEFAULT_PAD_GROUP_ID;
3403   sq->group_high_time = GST_CLOCK_STIME_NONE;
3404
3405   mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
3406   mqueue->queues_cookie++;
3407
3408   /* copy over max_size and extra_size so we don't need to take the lock
3409    * any longer when checking if the queue is full. */
3410   sq->max_size.visible = mqueue->max_size.visible;
3411   sq->max_size.bytes = mqueue->max_size.bytes;
3412   sq->max_size.time = mqueue->max_size.time;
3413
3414   sq->extra_size.visible = mqueue->extra_size.visible;
3415   sq->extra_size.bytes = mqueue->extra_size.bytes;
3416   sq->extra_size.time = mqueue->extra_size.time;
3417
3418   GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
3419
3420   g_weak_ref_init (&sq->mqueue, mqueue);
3421   sq->srcresult = GST_FLOW_FLUSHING;
3422   sq->pushed = FALSE;
3423   sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
3424       single_queue_check_full,
3425       (GstDataQueueFullCallback) single_queue_overrun_cb,
3426       (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
3427   sq->is_eos = FALSE;
3428   sq->is_sparse = FALSE;
3429   sq->flushing = FALSE;
3430   sq->active = FALSE;
3431   gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
3432   gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
3433
3434   sq->nextid = 0;
3435   sq->oldid = 0;
3436   sq->next_time = GST_CLOCK_STIME_NONE;
3437   sq->last_time = GST_CLOCK_STIME_NONE;
3438   g_cond_init (&sq->turn);
3439   g_cond_init (&sq->query_handled);
3440
3441   sq->sinktime = GST_CLOCK_STIME_NONE;
3442   sq->srctime = GST_CLOCK_STIME_NONE;
3443   sq->sink_tainted = TRUE;
3444   sq->src_tainted = TRUE;
3445
3446   name = g_strdup_printf ("sink_%u", sq->id);
3447   templ = gst_static_pad_template_get (&sinktemplate);
3448   sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
3449       "direction", templ->direction, "template", templ, NULL);
3450   g_weak_ref_init (&sq->sinkpad, sinkpad);
3451   gst_object_unref (templ);
3452   g_free (name);
3453
3454   GST_MULTIQUEUE_PAD (sinkpad)->sq = sq;
3455
3456   gst_pad_set_chain_function (sinkpad,
3457       GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
3458   gst_pad_set_activatemode_function (sinkpad,
3459       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode));
3460   gst_pad_set_event_full_function (sinkpad,
3461       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
3462   gst_pad_set_query_function (sinkpad,
3463       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query));
3464   gst_pad_set_iterate_internal_links_function (sinkpad,
3465       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
3466   GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
3467
3468   name = g_strdup_printf ("src_%u", sq->id);
3469   templ = gst_static_pad_template_get (&srctemplate);
3470   srcpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
3471       "direction", templ->direction, "template", templ, NULL);
3472   g_weak_ref_init (&sq->srcpad, srcpad);
3473   gst_object_unref (templ);
3474   g_free (name);
3475
3476   GST_MULTIQUEUE_PAD (srcpad)->sq = gst_single_queue_ref (sq);
3477
3478   gst_pad_set_activatemode_function (srcpad,
3479       GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode));
3480   gst_pad_set_event_function (srcpad,
3481       GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
3482   gst_pad_set_query_function (srcpad,
3483       GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
3484   gst_pad_set_iterate_internal_links_function (srcpad,
3485       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
3486   GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
3487
3488   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
3489
3490   /* only activate the pads when we are not in the NULL state
3491    * and add the pad under the state_lock to prevent state changes
3492    * between activating and adding */
3493   g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
3494   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
3495     gst_pad_set_active (srcpad, TRUE);
3496     gst_pad_set_active (sinkpad, TRUE);
3497   }
3498   gst_element_add_pad (GST_ELEMENT (mqueue), srcpad);
3499   gst_element_add_pad (GST_ELEMENT (mqueue), sinkpad);
3500   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
3501     gst_single_queue_start (mqueue, sq);
3502   }
3503   g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
3504
3505   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
3506       sq->id);
3507
3508   return sq;
3509 }