Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / plugins / elements / gstmultiqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
4  * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
5  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * gstmultiqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 /**
26  * SECTION:element-multiqueue
27  * @title: multiqueue
28  * @see_also: #GstQueue
29  *
30  * Multiqueue is similar to a normal #GstQueue with the following additional
31  * features:
32  *
33  * 1) Multiple streamhandling
34  *
35  *  * The element handles queueing data on more than one stream at once. To
36  * achieve such a feature it has request sink pads (sink%u) and
37  * 'sometimes' src pads (src%u). When requesting a given sinkpad with gst_element_request_pad(),
38  * the associated srcpad for that stream will be created.
39  * Example: requesting sink1 will generate src1.
40  *
41  * 2) Non-starvation on multiple stream
42  *
43  * * If more than one stream is used with the element, the streams' queues
44  * will be dynamically grown (up to a limit), in order to ensure that no
45  * stream is risking data starvation. This guarantees that at any given
46  * time there are at least N bytes queued and available for each individual
47  * stream. If an EOS event comes through a srcpad, the associated queue will be
48  * considered as 'not-empty' in the queue-size-growing algorithm.
49  *
50  * 3) Non-linked srcpads graceful handling
51  *
52  * * In order to better support dynamic switching between streams, the multiqueue
53  * (unlike the current GStreamer queue) continues to push buffers on non-linked
54  * pads rather than shutting down. In addition, to prevent a non-linked stream from very quickly consuming all
55  * available buffers and thus 'racing ahead' of the other streams, the element
56  * must ensure that buffers and inlined events for a non-linked stream are pushed
57  * in the same order as they were received, relative to the other streams
58  * controlled by the element. This means that a buffer cannot be pushed to a
59  * non-linked pad any sooner than buffers in any other stream which were received
60  * before it.
61  *
62  * Data is queued until one of the limits specified by the
63  * #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
64  * #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
65  * more buffers into the queue will block the pushing thread until more space
66  * becomes available. #GstMultiQueue:extra-size-buffers,
67  *
68  *
69  * #GstMultiQueue:extra-size-bytes and #GstMultiQueue:extra-size-time are
70  * currently unused.
71  *
72  * The default queue size limits are 5 buffers, 10MB of data, or
73  * two second worth of data, whichever is reached first. Note that the number
74  * of buffers will dynamically grow depending on the fill level of
75  * other queues.
76  *
77  * The #GstMultiQueue::underrun signal is emitted when all of the queues
78  * are empty. The #GstMultiQueue::overrun signal is emitted when one of the
79  * queues is filled.
80  * Both signals are emitted from the context of the streaming thread.
81  *
82  * When using #GstMultiQueue:sync-by-running-time the unlinked streams will
83  * be throttled by the highest running-time of linked streams. This allows
84  * further relinking of those unlinked streams without them being in the
85  * future (i.e. to achieve gapless playback).
86  * When dealing with streams which have got different consumption requirements
87  * downstream (ex: video decoders which will consume more buffer (in time) than
88  * audio decoders), it is recommended to group streams of the same type
89  * by using the pad "group-id" property. This will further throttle streams
90  * in time within that group.
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #  include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <stdio.h>
99 #include "gstmultiqueue.h"
100 #include <gst/glib-compat-private.h>
101
102 /**
103  * GstSingleQueue:
104  * @sinkpad: associated sink #GstPad
105  * @srcpad: associated source #GstPad
106  *
107  * Structure containing all information and properties about
108  * a single queue.
109  */
110 typedef struct _GstSingleQueue GstSingleQueue;
111
112 struct _GstSingleQueue
113 {
114   /* unique identifier of the queue */
115   guint id;
116   /* group of streams to which this queue belongs to */
117   guint groupid;
118   GstClockTimeDiff group_high_time;
119
120   GstMultiQueue *mqueue;
121
122   GstPad *sinkpad;
123   GstPad *srcpad;
124
125   /* flowreturn of previous srcpad push */
126   GstFlowReturn srcresult;
127   /* If something was actually pushed on
128    * this pad after flushing/pad activation
129    * and the srcresult corresponds to something
130    * real
131    */
132   gboolean pushed;
133
134   /* segments */
135   GstSegment sink_segment;
136   GstSegment src_segment;
137   gboolean has_src_segment;     /* preferred over initializing the src_segment to
138                                  * UNDEFINED as this doesn't requires adding ifs
139                                  * in every segment usage */
140
141   /* position of src/sink */
142   GstClockTimeDiff sinktime, srctime;
143   /* cached input value, used for interleave */
144   GstClockTimeDiff cached_sinktime;
145   /* TRUE if either position needs to be recalculated */
146   gboolean sink_tainted, src_tainted;
147
148   /* queue of data */
149   GstDataQueue *queue;
150   GstDataQueueSize max_size, extra_size;
151   GstClockTime cur_time;
152   gboolean is_eos;
153   gboolean is_sparse;
154   gboolean flushing;
155   gboolean active;
156
157   /* Protected by global lock */
158   guint32 nextid;               /* ID of the next object waiting to be pushed */
159   guint32 oldid;                /* ID of the last object pushed (last in a series) */
160   guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
161   GstClockTimeDiff next_time;   /* End running time of next buffer to be pushed */
162   GstClockTimeDiff last_time;   /* Start running time of last pushed buffer */
163   GCond turn;                   /* SingleQueue turn waiting conditional */
164
165   /* for serialized queries */
166   GCond query_handled;
167   gboolean last_query;
168   GstQuery *last_handled_query;
169
170   /* For interleave calculation */
171   GThread *thread;
172 };
173
174
175 /* Extension of GstDataQueueItem structure for our usage */
176 typedef struct _GstMultiQueueItem GstMultiQueueItem;
177
178 struct _GstMultiQueueItem
179 {
180   GstMiniObject *object;
181   guint size;
182   guint64 duration;
183   gboolean visible;
184
185   GDestroyNotify destroy;
186   guint32 posid;
187
188   gboolean is_query;
189 };
190
191 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
192 static void gst_single_queue_free (GstSingleQueue * squeue);
193
194 static void wake_up_next_non_linked (GstMultiQueue * mq);
195 static void compute_high_id (GstMultiQueue * mq);
196 static void compute_high_time (GstMultiQueue * mq, guint groupid);
197 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
198 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
199
200 static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
201 static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
202 static void recheck_buffering_status (GstMultiQueue * mq);
203
204 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
205
206 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
207     GST_PAD_SINK,
208     GST_PAD_REQUEST,
209     GST_STATIC_CAPS_ANY);
210
211 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
212     GST_PAD_SRC,
213     GST_PAD_SOMETIMES,
214     GST_STATIC_CAPS_ANY);
215
216 GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
217 #define GST_CAT_DEFAULT (multi_queue_debug)
218
219 /* Signals and args */
220 enum
221 {
222   SIGNAL_UNDERRUN,
223   SIGNAL_OVERRUN,
224   LAST_SIGNAL
225 };
226
227 /* default limits, we try to keep up to 2 seconds of data and if there is not
228  * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
229  * there is data in the queues. Normally, the byte and time limits are not hit
230  * in theses conditions. */
231 #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
232 #define DEFAULT_MAX_SIZE_BUFFERS 5
233 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
234
235 /* second limits. When we hit one of the above limits we are probably dealing
236  * with a badly muxed file and we scale the limits to these emergency values.
237  * This is currently not yet implemented.
238  * Since we dynamically scale the queue buffer size up to the limits but avoid
239  * going above the max-size-buffers when we can, we don't really need this
240  * aditional extra size. */
241 #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
242 #define DEFAULT_EXTRA_SIZE_BUFFERS 5
243 #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
244
245 #define DEFAULT_USE_BUFFERING FALSE
246 #define DEFAULT_LOW_WATERMARK  0.01
247 #define DEFAULT_HIGH_WATERMARK 0.99
248 #define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
249 #define DEFAULT_USE_INTERLEAVE FALSE
250 #define DEFAULT_UNLINKED_CACHE_TIME 250 * GST_MSECOND
251
252 enum
253 {
254   PROP_0,
255   PROP_EXTRA_SIZE_BYTES,
256   PROP_EXTRA_SIZE_BUFFERS,
257   PROP_EXTRA_SIZE_TIME,
258   PROP_MAX_SIZE_BYTES,
259   PROP_MAX_SIZE_BUFFERS,
260   PROP_MAX_SIZE_TIME,
261   PROP_USE_BUFFERING,
262   PROP_LOW_PERCENT,
263   PROP_HIGH_PERCENT,
264   PROP_LOW_WATERMARK,
265   PROP_HIGH_WATERMARK,
266   PROP_SYNC_BY_RUNNING_TIME,
267   PROP_USE_INTERLEAVE,
268   PROP_UNLINKED_CACHE_TIME,
269   PROP_LAST
270 };
271
272 /* Explanation for buffer levels and percentages:
273  *
274  * The buffering_level functions here return a value in a normalized range
275  * that specifies the current fill level of a queue. The range goes from 0 to
276  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
277  *
278  * This is not to be confused with the buffering_percent value, which is
279  * a *relative* quantity - relative to the low/high watermarks.
280  * buffering_percent = 0% means overall buffering_level is at the low watermark.
281  * buffering_percent = 100% means overall buffering_level is at the high watermark.
282  * buffering_percent is used for determining if the fill level has reached
283  * the high watermark, and for producing BUFFERING messages. This value
284  * always uses a 0..100 range (since it is a percentage).
285  *
286  * To avoid future confusions, whenever "buffering level" is mentioned, it
287  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
288  * range. Whenever "buffering_percent" is mentioned, it refers to the
289  * percentage value that is relative to the low/high watermark. */
290
291 /* Using a buffering level range of 0..1000000 to allow for a
292  * resolution in ppm (1 ppm = 0.0001%) */
293 #define MAX_BUFFERING_LEVEL 1000000
294
295 /* How much 1% makes up in the buffer level range */
296 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
297
298 /* GstMultiQueuePad */
299
300 #define DEFAULT_PAD_GROUP_ID 0
301
302 enum
303 {
304   PROP_PAD_0,
305   PROP_PAD_GROUP_ID,
306 };
307
308 #define GST_TYPE_MULTIQUEUE_PAD            (gst_multiqueue_pad_get_type())
309 #define GST_MULTIQUEUE_PAD(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePad))
310 #define GST_IS_MULTIQUEUE_PAD(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTIQUEUE_PAD))
311 #define GST_MULTIQUEUE_PAD_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
312 #define GST_IS_MULTIQUEUE_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_MULTIQUEUE_PAD))
313 #define GST_MULTIQUEUE_PAD_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
314
315 struct _GstMultiQueuePad
316 {
317   GstPad parent;
318
319   GstSingleQueue *sq;
320 };
321
322 struct _GstMultiQueuePadClass
323 {
324   GstPadClass parent_class;
325 };
326
327 GType gst_multiqueue_pad_get_type (void);
328
329 G_DEFINE_TYPE (GstMultiQueuePad, gst_multiqueue_pad, GST_TYPE_PAD);
330 static void
331 gst_multiqueue_pad_get_property (GObject * object, guint prop_id,
332     GValue * value, GParamSpec * pspec)
333 {
334   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
335
336   switch (prop_id) {
337     case PROP_PAD_GROUP_ID:
338       if (pad->sq)
339         g_value_set_uint (value, pad->sq->groupid);
340       break;
341     default:
342       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
343       break;
344   }
345 }
346
347 static void
348 gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
349     const GValue * value, GParamSpec * pspec)
350 {
351   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
352
353   switch (prop_id) {
354     case PROP_PAD_GROUP_ID:
355       GST_OBJECT_LOCK (pad);
356       if (pad->sq)
357         pad->sq->groupid = g_value_get_uint (value);
358       GST_OBJECT_UNLOCK (pad);
359       break;
360     default:
361       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
362       break;
363   }
364 }
365
366 static void
367 gst_multiqueue_pad_class_init (GstMultiQueuePadClass * klass)
368 {
369   GObjectClass *gobject_class = (GObjectClass *) klass;
370
371   gobject_class->set_property = gst_multiqueue_pad_set_property;
372   gobject_class->get_property = gst_multiqueue_pad_get_property;
373
374   /**
375    * GstMultiQueuePad:group-id:
376    *
377    * Group to which this pad belongs.
378    *
379    * Since: 1.10
380    */
381   g_object_class_install_property (gobject_class, PROP_PAD_GROUP_ID,
382       g_param_spec_uint ("group-id", "Group ID",
383           "Group to which this pad belongs", 0, G_MAXUINT32,
384           DEFAULT_PAD_GROUP_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
385 }
386
387 static void
388 gst_multiqueue_pad_init (GstMultiQueuePad * pad)
389 {
390
391 }
392
393
394 #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
395   g_mutex_lock (&q->qlock);                                              \
396 } G_STMT_END
397
398 #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
399   g_mutex_unlock (&q->qlock);                                            \
400 } G_STMT_END
401
402 #define SET_PERCENT(mq, perc) G_STMT_START {                             \
403   if (perc != mq->buffering_percent) {                                   \
404     mq->buffering_percent = perc;                                        \
405     mq->buffering_percent_changed = TRUE;                                \
406     GST_DEBUG_OBJECT (mq, "buffering %d percent", perc);                 \
407   }                                                                      \
408 } G_STMT_END
409
410 /* Convenience function */
411 static inline GstClockTimeDiff
412 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
413 {
414   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
415
416   if (GST_CLOCK_TIME_IS_VALID (val)) {
417     gboolean sign =
418         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
419     if (sign > 0)
420       res = val;
421     else if (sign < 0)
422       res = -val;
423   }
424   return res;
425 }
426
427 static void gst_multi_queue_finalize (GObject * object);
428 static void gst_multi_queue_set_property (GObject * object,
429     guint prop_id, const GValue * value, GParamSpec * pspec);
430 static void gst_multi_queue_get_property (GObject * object,
431     guint prop_id, GValue * value, GParamSpec * pspec);
432
433 static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
434     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
435 static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
436 static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
437     element, GstStateChange transition);
438
439 static void gst_multi_queue_loop (GstPad * pad);
440
441 #define _do_init \
442   GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
443 #define gst_multi_queue_parent_class parent_class
444 G_DEFINE_TYPE_WITH_CODE (GstMultiQueue, gst_multi_queue, GST_TYPE_ELEMENT,
445     _do_init);
446
447 static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
448
449 static void
450 gst_multi_queue_class_init (GstMultiQueueClass * klass)
451 {
452   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
453   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
454
455   gobject_class->set_property = gst_multi_queue_set_property;
456   gobject_class->get_property = gst_multi_queue_get_property;
457
458   /* SIGNALS */
459
460   /**
461    * GstMultiQueue::underrun:
462    * @multiqueue: the multiqueue instance
463    *
464    * This signal is emitted from the streaming thread when there is
465    * no data in any of the queues inside the multiqueue instance (underrun).
466    *
467    * This indicates either starvation or EOS from the upstream data sources.
468    */
469   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
470       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
471       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
472       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
473
474   /**
475    * GstMultiQueue::overrun:
476    * @multiqueue: the multiqueue instance
477    *
478    * Reports that one of the queues in the multiqueue is full (overrun).
479    * A queue is full if the total amount of data inside it (num-buffers, time,
480    * size) is higher than the boundary values which can be set through the
481    * GObject properties.
482    *
483    * This can be used as an indicator of pre-roll.
484    */
485   gst_multi_queue_signals[SIGNAL_OVERRUN] =
486       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
487       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
488       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
489
490   /* PROPERTIES */
491
492   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
493       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
494           "Max. amount of data in the queue (bytes, 0=disable)",
495           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
496           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
497           G_PARAM_STATIC_STRINGS));
498   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
499       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
500           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
501           DEFAULT_MAX_SIZE_BUFFERS,
502           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
503           G_PARAM_STATIC_STRINGS));
504   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
505       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
506           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
507           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
508           G_PARAM_STATIC_STRINGS));
509
510   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
511       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
512           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
513           " (NOT IMPLEMENTED)",
514           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
515           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
516   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
517       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
518           "Amount of buffers the queues can grow if one of them is empty (0=disable)"
519           " (NOT IMPLEMENTED)",
520           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
521           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
522   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
523       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
524           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
525           " (NOT IMPLEMENTED)",
526           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
527           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
528
529   /**
530    * GstMultiQueue:use-buffering:
531    *
532    * Enable the buffering option in multiqueue so that BUFFERING messages are
533    * emitted based on low-/high-percent thresholds.
534    */
535   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
536       g_param_spec_boolean ("use-buffering", "Use buffering",
537           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
538           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
539           G_PARAM_STATIC_STRINGS));
540   /**
541    * GstMultiQueue:low-percent:
542    *
543    * Low threshold percent for buffering to start.
544    */
545   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
546       g_param_spec_int ("low-percent", "Low percent",
547           "Low threshold for buffering to start. Only used if use-buffering is True "
548           "(Deprecated: use low-watermark instead)",
549           0, 100, DEFAULT_LOW_WATERMARK * 100,
550           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551   /**
552    * GstMultiQueue:high-percent:
553    *
554    * High threshold percent for buffering to finish.
555    */
556   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
557       g_param_spec_int ("high-percent", "High percent",
558           "High threshold for buffering to finish. Only used if use-buffering is True "
559           "(Deprecated: use high-watermark instead)",
560           0, 100, DEFAULT_HIGH_WATERMARK * 100,
561           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
562   /**
563    * GstMultiQueue:low-watermark:
564    *
565    * Low threshold watermark for buffering to start.
566    *
567    * Since: 1.10
568    */
569   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
570       g_param_spec_double ("low-watermark", "Low watermark",
571           "Low threshold for buffering to start. Only used if use-buffering is True",
572           0.0, 1.0, DEFAULT_LOW_WATERMARK,
573           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
574   /**
575    * GstMultiQueue:high-watermark:
576    *
577    * High threshold watermark for buffering to finish.
578    *
579    * Since: 1.10
580    */
581   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
582       g_param_spec_double ("high-watermark", "High watermark",
583           "High threshold for buffering to finish. Only used if use-buffering is True",
584           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
585           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
586
587   /**
588    * GstMultiQueue:sync-by-running-time:
589    *
590    * If enabled multiqueue will synchronize deactivated or not-linked streams
591    * to the activated and linked streams by taking the running time.
592    * Otherwise multiqueue will synchronize the deactivated or not-linked
593    * streams by keeping the order in which buffers and events arrived compared
594    * to active and linked streams.
595    */
596   g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
597       g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
598           "Synchronize deactivated or not-linked streams by running time",
599           DEFAULT_SYNC_BY_RUNNING_TIME,
600           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601
602   g_object_class_install_property (gobject_class, PROP_USE_INTERLEAVE,
603       g_param_spec_boolean ("use-interleave", "Use interleave",
604           "Adjust time limits based on input interleave",
605           DEFAULT_USE_INTERLEAVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
606
607   g_object_class_install_property (gobject_class, PROP_UNLINKED_CACHE_TIME,
608       g_param_spec_uint64 ("unlinked-cache-time", "Unlinked cache time (ns)",
609           "Extra buffering in time for unlinked streams (if 'sync-by-running-time')",
610           0, G_MAXUINT64, DEFAULT_UNLINKED_CACHE_TIME,
611           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
612           G_PARAM_STATIC_STRINGS));
613
614
615   gobject_class->finalize = gst_multi_queue_finalize;
616
617   gst_element_class_set_static_metadata (gstelement_class,
618       "MultiQueue",
619       "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
620   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
621   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
622
623   gstelement_class->request_new_pad =
624       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
625   gstelement_class->release_pad =
626       GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
627   gstelement_class->change_state =
628       GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
629 }
630
631 static void
632 gst_multi_queue_init (GstMultiQueue * mqueue)
633 {
634   mqueue->nbqueues = 0;
635   mqueue->queues = NULL;
636
637   mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
638   mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
639   mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
640
641   mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
642   mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
643   mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
644
645   mqueue->use_buffering = DEFAULT_USE_BUFFERING;
646   mqueue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
647   mqueue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
648
649   mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
650   mqueue->use_interleave = DEFAULT_USE_INTERLEAVE;
651   mqueue->unlinked_cache_time = DEFAULT_UNLINKED_CACHE_TIME;
652
653   mqueue->counter = 1;
654   mqueue->highid = -1;
655   mqueue->high_time = GST_CLOCK_STIME_NONE;
656
657   g_mutex_init (&mqueue->qlock);
658   g_mutex_init (&mqueue->buffering_post_lock);
659 }
660
661 static void
662 gst_multi_queue_finalize (GObject * object)
663 {
664   GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
665
666   g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
667   g_list_free (mqueue->queues);
668   mqueue->queues = NULL;
669   mqueue->queues_cookie++;
670
671   /* free/unref instance data */
672   g_mutex_clear (&mqueue->qlock);
673   g_mutex_clear (&mqueue->buffering_post_lock);
674
675   G_OBJECT_CLASS (parent_class)->finalize (object);
676 }
677
678 #define SET_CHILD_PROPERTY(mq,format) G_STMT_START {            \
679     GList * tmp = mq->queues;                                   \
680     while (tmp) {                                               \
681       GstSingleQueue *q = (GstSingleQueue*)tmp->data;           \
682       q->max_size.format = mq->max_size.format;                 \
683       update_buffering (mq, q);                                 \
684       gst_data_queue_limits_changed (q->queue);                 \
685       tmp = g_list_next(tmp);                                   \
686     };                                                          \
687 } G_STMT_END
688
689 static void
690 gst_multi_queue_set_property (GObject * object, guint prop_id,
691     const GValue * value, GParamSpec * pspec)
692 {
693   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
694
695   switch (prop_id) {
696     case PROP_MAX_SIZE_BYTES:
697       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
698       mq->max_size.bytes = g_value_get_uint (value);
699       SET_CHILD_PROPERTY (mq, bytes);
700       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
701       gst_multi_queue_post_buffering (mq);
702       break;
703     case PROP_MAX_SIZE_BUFFERS:
704     {
705       GList *tmp;
706       gint new_size = g_value_get_uint (value);
707
708       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
709
710       mq->max_size.visible = new_size;
711
712       tmp = mq->queues;
713       while (tmp) {
714         GstDataQueueSize size;
715         GstSingleQueue *q = (GstSingleQueue *) tmp->data;
716         gst_data_queue_get_level (q->queue, &size);
717
718         GST_DEBUG_OBJECT (mq, "Queue %d: Requested buffers size: %d,"
719             " current: %d, current max %d", q->id, new_size, size.visible,
720             q->max_size.visible);
721
722         /* do not reduce max size below current level if the single queue
723          * has grown because of empty queue */
724         if (new_size == 0) {
725           q->max_size.visible = new_size;
726         } else if (q->max_size.visible == 0) {
727           q->max_size.visible = MAX (new_size, size.visible);
728         } else if (new_size > size.visible) {
729           q->max_size.visible = new_size;
730         }
731         update_buffering (mq, q);
732         gst_data_queue_limits_changed (q->queue);
733         tmp = g_list_next (tmp);
734       }
735
736       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
737       gst_multi_queue_post_buffering (mq);
738
739       break;
740     }
741     case PROP_MAX_SIZE_TIME:
742       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
743       mq->max_size.time = g_value_get_uint64 (value);
744       SET_CHILD_PROPERTY (mq, time);
745       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
746       gst_multi_queue_post_buffering (mq);
747       break;
748     case PROP_EXTRA_SIZE_BYTES:
749       mq->extra_size.bytes = g_value_get_uint (value);
750       break;
751     case PROP_EXTRA_SIZE_BUFFERS:
752       mq->extra_size.visible = g_value_get_uint (value);
753       break;
754     case PROP_EXTRA_SIZE_TIME:
755       mq->extra_size.time = g_value_get_uint64 (value);
756       break;
757     case PROP_USE_BUFFERING:
758       mq->use_buffering = g_value_get_boolean (value);
759       recheck_buffering_status (mq);
760       break;
761     case PROP_LOW_PERCENT:
762       mq->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
763       /* Recheck buffering status - the new low_watermark value might
764        * be above the current fill level. If the old low_watermark one
765        * was below the current level, this means that mq->buffering is
766        * disabled and needs to be re-enabled. */
767       recheck_buffering_status (mq);
768       break;
769     case PROP_HIGH_PERCENT:
770       mq->high_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
771       recheck_buffering_status (mq);
772       break;
773     case PROP_LOW_WATERMARK:
774       mq->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
775       recheck_buffering_status (mq);
776       break;
777     case PROP_HIGH_WATERMARK:
778       mq->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
779       recheck_buffering_status (mq);
780       break;
781     case PROP_SYNC_BY_RUNNING_TIME:
782       mq->sync_by_running_time = g_value_get_boolean (value);
783       break;
784     case PROP_USE_INTERLEAVE:
785       mq->use_interleave = g_value_get_boolean (value);
786       break;
787     case PROP_UNLINKED_CACHE_TIME:
788       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
789       mq->unlinked_cache_time = g_value_get_uint64 (value);
790       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
791       gst_multi_queue_post_buffering (mq);
792       break;
793     default:
794       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
795       break;
796   }
797 }
798
799 static void
800 gst_multi_queue_get_property (GObject * object, guint prop_id,
801     GValue * value, GParamSpec * pspec)
802 {
803   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
804
805   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
806
807   switch (prop_id) {
808     case PROP_EXTRA_SIZE_BYTES:
809       g_value_set_uint (value, mq->extra_size.bytes);
810       break;
811     case PROP_EXTRA_SIZE_BUFFERS:
812       g_value_set_uint (value, mq->extra_size.visible);
813       break;
814     case PROP_EXTRA_SIZE_TIME:
815       g_value_set_uint64 (value, mq->extra_size.time);
816       break;
817     case PROP_MAX_SIZE_BYTES:
818       g_value_set_uint (value, mq->max_size.bytes);
819       break;
820     case PROP_MAX_SIZE_BUFFERS:
821       g_value_set_uint (value, mq->max_size.visible);
822       break;
823     case PROP_MAX_SIZE_TIME:
824       g_value_set_uint64 (value, mq->max_size.time);
825       break;
826     case PROP_USE_BUFFERING:
827       g_value_set_boolean (value, mq->use_buffering);
828       break;
829     case PROP_LOW_PERCENT:
830       g_value_set_int (value, mq->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
831       break;
832     case PROP_HIGH_PERCENT:
833       g_value_set_int (value, mq->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
834       break;
835     case PROP_LOW_WATERMARK:
836       g_value_set_double (value, mq->low_watermark /
837           (gdouble) MAX_BUFFERING_LEVEL);
838       break;
839     case PROP_HIGH_WATERMARK:
840       g_value_set_double (value, mq->high_watermark /
841           (gdouble) MAX_BUFFERING_LEVEL);
842       break;
843     case PROP_SYNC_BY_RUNNING_TIME:
844       g_value_set_boolean (value, mq->sync_by_running_time);
845       break;
846     case PROP_USE_INTERLEAVE:
847       g_value_set_boolean (value, mq->use_interleave);
848       break;
849     case PROP_UNLINKED_CACHE_TIME:
850       g_value_set_uint64 (value, mq->unlinked_cache_time);
851       break;
852     default:
853       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
854       break;
855   }
856
857   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
858 }
859
860 static GstIterator *
861 gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
862 {
863   GstIterator *it = NULL;
864   GstPad *opad;
865   GstSingleQueue *squeue;
866   GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
867   GValue val = { 0, };
868
869   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
870   squeue = gst_pad_get_element_private (pad);
871   if (!squeue)
872     goto out;
873
874   if (squeue->sinkpad == pad)
875     opad = gst_object_ref (squeue->srcpad);
876   else if (squeue->srcpad == pad)
877     opad = gst_object_ref (squeue->sinkpad);
878   else
879     goto out;
880
881   g_value_init (&val, GST_TYPE_PAD);
882   g_value_set_object (&val, opad);
883   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
884   g_value_unset (&val);
885
886   gst_object_unref (opad);
887
888 out:
889   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
890
891   return it;
892 }
893
894
895 /*
896  * GstElement methods
897  */
898
899 static GstPad *
900 gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
901     const gchar * name, const GstCaps * caps)
902 {
903   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
904   GstSingleQueue *squeue;
905   GstPad *new_pad;
906   guint temp_id = -1;
907
908   if (name) {
909     sscanf (name + 4, "_%u", &temp_id);
910     GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
911   }
912
913   /* Create a new single queue, add the sink and source pad and return the sink pad */
914   squeue = gst_single_queue_new (mqueue, temp_id);
915
916   new_pad = squeue ? squeue->sinkpad : NULL;
917
918   GST_DEBUG_OBJECT (mqueue, "Returning pad %" GST_PTR_FORMAT, new_pad);
919
920   return new_pad;
921 }
922
923 static void
924 gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
925 {
926   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
927   GstSingleQueue *sq = NULL;
928   GList *tmp;
929
930   GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
931
932   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
933   /* Find which single queue it belongs to, knowing that it should be a sinkpad */
934   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
935     sq = (GstSingleQueue *) tmp->data;
936
937     if (sq->sinkpad == pad)
938       break;
939   }
940
941   if (!tmp) {
942     GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
943     GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
944     return;
945   }
946
947   /* FIXME: The removal of the singlequeue should probably not happen until it
948    * finishes draining */
949
950   /* remove it from the list */
951   mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
952   mqueue->queues_cookie++;
953
954   /* FIXME : recompute next-non-linked */
955   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
956
957   /* delete SingleQueue */
958   gst_data_queue_set_flushing (sq->queue, TRUE);
959
960   gst_pad_set_active (sq->srcpad, FALSE);
961   gst_pad_set_active (sq->sinkpad, FALSE);
962   gst_pad_set_element_private (sq->srcpad, NULL);
963   gst_pad_set_element_private (sq->sinkpad, NULL);
964   gst_element_remove_pad (element, sq->srcpad);
965   gst_element_remove_pad (element, sq->sinkpad);
966   gst_single_queue_free (sq);
967 }
968
969 static GstStateChangeReturn
970 gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
971 {
972   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
973   GstSingleQueue *sq = NULL;
974   GstStateChangeReturn result;
975
976   switch (transition) {
977     case GST_STATE_CHANGE_READY_TO_PAUSED:{
978       GList *tmp;
979
980       /* Set all pads to non-flushing */
981       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
982       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
983         sq = (GstSingleQueue *) tmp->data;
984         sq->flushing = FALSE;
985       }
986
987       /* the visible limit might not have been set on single queues that have grown because of other queueus were empty */
988       SET_CHILD_PROPERTY (mqueue, visible);
989
990       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
991       gst_multi_queue_post_buffering (mqueue);
992
993       break;
994     }
995     case GST_STATE_CHANGE_PAUSED_TO_READY:{
996       GList *tmp;
997
998       /* Un-wait all waiting pads */
999       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1000       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
1001         sq = (GstSingleQueue *) tmp->data;
1002         sq->flushing = TRUE;
1003         g_cond_signal (&sq->turn);
1004
1005         sq->last_query = FALSE;
1006         g_cond_signal (&sq->query_handled);
1007       }
1008       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1009       break;
1010     }
1011     default:
1012       break;
1013   }
1014
1015   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1016
1017   switch (transition) {
1018     default:
1019       break;
1020   }
1021
1022   return result;
1023 }
1024
1025 static gboolean
1026 gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
1027     gboolean full)
1028 {
1029   gboolean result;
1030
1031   GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
1032       sq->id);
1033
1034   if (flush) {
1035     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1036     sq->srcresult = GST_FLOW_FLUSHING;
1037     gst_data_queue_set_flushing (sq->queue, TRUE);
1038
1039     sq->flushing = TRUE;
1040
1041     /* wake up non-linked task */
1042     GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
1043         sq->id);
1044     g_cond_signal (&sq->turn);
1045     sq->last_query = FALSE;
1046     g_cond_signal (&sq->query_handled);
1047     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1048
1049     GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
1050     result = gst_pad_pause_task (sq->srcpad);
1051     sq->sink_tainted = sq->src_tainted = TRUE;
1052   } else {
1053     gst_single_queue_flush_queue (sq, full);
1054
1055     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1056     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1057     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1058     sq->has_src_segment = FALSE;
1059     /* All pads start off not-linked for a smooth kick-off */
1060     sq->srcresult = GST_FLOW_OK;
1061     sq->pushed = FALSE;
1062     sq->cur_time = 0;
1063     sq->max_size.visible = mq->max_size.visible;
1064     sq->is_eos = FALSE;
1065     sq->nextid = 0;
1066     sq->oldid = 0;
1067     sq->last_oldid = G_MAXUINT32;
1068     sq->next_time = GST_CLOCK_STIME_NONE;
1069     sq->last_time = GST_CLOCK_STIME_NONE;
1070     sq->cached_sinktime = GST_CLOCK_STIME_NONE;
1071     sq->group_high_time = GST_CLOCK_STIME_NONE;
1072     gst_data_queue_set_flushing (sq->queue, FALSE);
1073
1074     /* We will become active again on the next buffer/gap */
1075     sq->active = FALSE;
1076
1077     /* Reset high time to be recomputed next */
1078     mq->high_time = GST_CLOCK_STIME_NONE;
1079
1080     sq->flushing = FALSE;
1081     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1082
1083     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
1084     result =
1085         gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
1086         sq->srcpad, NULL);
1087   }
1088   return result;
1089 }
1090
1091 /* WITH LOCK TAKEN */
1092 static gint
1093 get_buffering_level (GstSingleQueue * sq)
1094 {
1095   GstDataQueueSize size;
1096   gint buffering_level, tmp;
1097
1098   gst_data_queue_get_level (sq->queue, &size);
1099
1100   GST_DEBUG_OBJECT (sq->mqueue,
1101       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
1102       G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
1103       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1104
1105   /* get bytes and time buffer levels and take the max */
1106   if (sq->is_eos || sq->srcresult == GST_FLOW_NOT_LINKED || sq->is_sparse) {
1107     buffering_level = MAX_BUFFERING_LEVEL;
1108   } else {
1109     buffering_level = 0;
1110     if (sq->max_size.time > 0) {
1111       tmp =
1112           gst_util_uint64_scale (sq->cur_time,
1113           MAX_BUFFERING_LEVEL, sq->max_size.time);
1114       buffering_level = MAX (buffering_level, tmp);
1115     }
1116     if (sq->max_size.bytes > 0) {
1117       tmp =
1118           gst_util_uint64_scale_int (size.bytes,
1119           MAX_BUFFERING_LEVEL, sq->max_size.bytes);
1120       buffering_level = MAX (buffering_level, tmp);
1121     }
1122   }
1123
1124   return buffering_level;
1125 }
1126
1127 /* WITH LOCK TAKEN */
1128 static void
1129 update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
1130 {
1131   gint buffering_level, percent;
1132
1133   /* nothing to dowhen we are not in buffering mode */
1134   if (!mq->use_buffering)
1135     return;
1136
1137   buffering_level = get_buffering_level (sq);
1138
1139   /* scale so that if buffering_level equals the high watermark,
1140    * the percentage is 100% */
1141   percent = gst_util_uint64_scale (buffering_level, 100, mq->high_watermark);
1142   /* clip */
1143   if (percent > 100)
1144     percent = 100;
1145
1146   if (mq->buffering) {
1147     if (buffering_level >= mq->high_watermark) {
1148       mq->buffering = FALSE;
1149     }
1150     /* make sure it increases */
1151     percent = MAX (mq->buffering_percent, percent);
1152
1153     SET_PERCENT (mq, percent);
1154   } else {
1155     GList *iter;
1156     gboolean is_buffering = TRUE;
1157
1158     for (iter = mq->queues; iter; iter = g_list_next (iter)) {
1159       GstSingleQueue *oq = (GstSingleQueue *) iter->data;
1160
1161       if (get_buffering_level (oq) >= mq->high_watermark) {
1162         is_buffering = FALSE;
1163
1164         break;
1165       }
1166     }
1167
1168     if (is_buffering && buffering_level < mq->low_watermark) {
1169       mq->buffering = TRUE;
1170       SET_PERCENT (mq, percent);
1171     }
1172   }
1173 }
1174
1175 static void
1176 gst_multi_queue_post_buffering (GstMultiQueue * mq)
1177 {
1178   GstMessage *msg = NULL;
1179
1180   g_mutex_lock (&mq->buffering_post_lock);
1181   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1182   if (mq->buffering_percent_changed) {
1183     gint percent = mq->buffering_percent;
1184
1185     mq->buffering_percent_changed = FALSE;
1186
1187     GST_DEBUG_OBJECT (mq, "Going to post buffering: %d%%", percent);
1188     msg = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
1189   }
1190   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1191
1192   if (msg != NULL)
1193     gst_element_post_message (GST_ELEMENT_CAST (mq), msg);
1194
1195   g_mutex_unlock (&mq->buffering_post_lock);
1196 }
1197
1198 static void
1199 recheck_buffering_status (GstMultiQueue * mq)
1200 {
1201   if (!mq->use_buffering && mq->buffering) {
1202     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1203     mq->buffering = FALSE;
1204     GST_DEBUG_OBJECT (mq,
1205         "Buffering property disabled, but queue was still buffering; "
1206         "setting buffering percentage to 100%%");
1207     SET_PERCENT (mq, 100);
1208     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1209   }
1210
1211   if (mq->use_buffering) {
1212     GList *tmp;
1213     gint old_perc;
1214
1215     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1216
1217     /* force buffering percentage to be recalculated */
1218     old_perc = mq->buffering_percent;
1219     mq->buffering_percent = 0;
1220
1221     tmp = mq->queues;
1222     while (tmp) {
1223       GstSingleQueue *q = (GstSingleQueue *) tmp->data;
1224       update_buffering (mq, q);
1225       gst_data_queue_limits_changed (q->queue);
1226       tmp = g_list_next (tmp);
1227     }
1228
1229     GST_DEBUG_OBJECT (mq,
1230         "Recalculated buffering percentage: old: %d%% new: %d%%",
1231         old_perc, mq->buffering_percent);
1232
1233     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1234   }
1235
1236   gst_multi_queue_post_buffering (mq);
1237 }
1238
1239 static void
1240 calculate_interleave (GstMultiQueue * mq)
1241 {
1242   GstClockTimeDiff low, high;
1243   GstClockTime interleave;
1244   GList *tmp;
1245
1246   low = high = GST_CLOCK_STIME_NONE;
1247   interleave = mq->interleave;
1248   /* Go over all single queues and calculate lowest/highest value */
1249   for (tmp = mq->queues; tmp; tmp = tmp->next) {
1250     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1251     /* Ignore sparse streams for interleave calculation */
1252     if (sq->is_sparse)
1253       continue;
1254     /* If a stream is not active yet (hasn't received any buffers), set
1255      * a maximum interleave to allow it to receive more data */
1256     if (!sq->active) {
1257       GST_LOG_OBJECT (mq,
1258           "queue %d is not active yet, forcing interleave to 5s", sq->id);
1259       mq->interleave = 5 * GST_SECOND;
1260       /* Update max-size time */
1261       mq->max_size.time = mq->interleave;
1262       SET_CHILD_PROPERTY (mq, time);
1263       goto beach;
1264     }
1265     if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
1266       if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
1267         low = sq->cached_sinktime;
1268       if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
1269         high = sq->cached_sinktime;
1270     }
1271     GST_LOG_OBJECT (mq,
1272         "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
1273         " high:%" GST_STIME_FORMAT, sq->id,
1274         GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
1275         GST_STIME_ARGS (high));
1276   }
1277
1278   if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
1279     interleave = high - low;
1280     /* Padding of interleave and minimum value */
1281     /* FIXME : Make the minimum time interleave a property */
1282     interleave = (150 * interleave / 100) + 250 * GST_MSECOND;
1283
1284     /* Update the stored interleave if:
1285      * * No data has arrived yet (high == low)
1286      * * Or it went higher
1287      * * Or it went lower and we've gone past the previous interleave needed */
1288     if (high == low || interleave > mq->interleave ||
1289         ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
1290                         mq->interleave)) < low)
1291             && interleave < (mq->interleave * 3 / 4))) {
1292       /* Update the interleave */
1293       mq->interleave = interleave;
1294       mq->last_interleave_update = high;
1295       /* Update max-size time */
1296       mq->max_size.time = mq->interleave;
1297       SET_CHILD_PROPERTY (mq, time);
1298     }
1299   }
1300
1301 beach:
1302   GST_DEBUG_OBJECT (mq,
1303       "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
1304       GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
1305       " last_interleave_update:%" GST_STIME_FORMAT, GST_STIME_ARGS (low),
1306       GST_STIME_ARGS (high), GST_TIME_ARGS (interleave),
1307       GST_TIME_ARGS (mq->interleave),
1308       GST_STIME_ARGS (mq->last_interleave_update));
1309 }
1310
1311
1312 /* calculate the diff between running time on the sink and src of the queue.
1313  * This is the total amount of time in the queue.
1314  * WITH LOCK TAKEN */
1315 static void
1316 update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
1317 {
1318   GstClockTimeDiff sink_time, src_time;
1319
1320   if (sq->sink_tainted) {
1321     sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment,
1322         sq->sink_segment.position);
1323
1324     GST_DEBUG_OBJECT (mq,
1325         "queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
1326         GST_STIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
1327         GST_STIME_ARGS (sink_time));
1328
1329     if (G_UNLIKELY (sq->last_time == GST_CLOCK_STIME_NONE)) {
1330       /* If the single queue still doesn't have a last_time set, this means
1331        * that nothing has been pushed out yet.
1332        * In order for the high_time computation to be as efficient as possible,
1333        * we set the last_time */
1334       sq->last_time = sink_time;
1335     }
1336     if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) {
1337       /* if we have a time, we become untainted and use the time */
1338       sq->sink_tainted = FALSE;
1339       if (mq->use_interleave) {
1340         sq->cached_sinktime = sink_time;
1341         calculate_interleave (mq);
1342       }
1343     }
1344   } else
1345     sink_time = sq->sinktime;
1346
1347   if (sq->src_tainted) {
1348     GstSegment *segment;
1349     gint64 position;
1350
1351     if (sq->has_src_segment) {
1352       segment = &sq->src_segment;
1353       position = sq->src_segment.position;
1354     } else {
1355       /*
1356        * If the src pad had no segment yet, use the sink segment
1357        * to avoid signalling overrun if the received sink segment has a
1358        * a position > max-size-time while the src pad time would be the default=0
1359        *
1360        * This can happen when switching pads on chained/adaptive streams and the
1361        * new chain has a segment with a much larger position
1362        */
1363       segment = &sq->sink_segment;
1364       position = sq->sink_segment.position;
1365     }
1366
1367     src_time = sq->srctime = my_segment_to_running_time (segment, position);
1368     /* if we have a time, we become untainted and use the time */
1369     if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) {
1370       sq->src_tainted = FALSE;
1371     }
1372   } else
1373     src_time = sq->srctime;
1374
1375   GST_DEBUG_OBJECT (mq,
1376       "queue %d, sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, sq->id,
1377       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
1378
1379   /* This allows for streams with out of order timestamping - sometimes the
1380    * emerging timestamp is later than the arriving one(s) */
1381   if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) &&
1382           GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time))
1383     sq->cur_time = sink_time - src_time;
1384   else
1385     sq->cur_time = 0;
1386
1387   /* updating the time level can change the buffering state */
1388   update_buffering (mq, sq);
1389
1390   return;
1391 }
1392
1393 /* take a SEGMENT event and apply the values to segment, updating the time
1394  * level of queue. */
1395 static void
1396 apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1397     GstSegment * segment)
1398 {
1399   gst_event_copy_segment (event, segment);
1400
1401   /* now configure the values, we use these to track timestamps on the
1402    * sinkpad. */
1403   if (segment->format != GST_FORMAT_TIME) {
1404     /* non-time format, pretent the current time segment is closed with a
1405      * 0 start and unknown stop time. */
1406     segment->format = GST_FORMAT_TIME;
1407     segment->start = 0;
1408     segment->stop = -1;
1409     segment->time = 0;
1410   }
1411   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1412
1413   /* Make sure we have a valid initial segment position (and not garbage
1414    * from upstream) */
1415   if (segment->rate > 0.0)
1416     segment->position = segment->start;
1417   else
1418     segment->position = segment->stop;
1419   if (segment == &sq->sink_segment)
1420     sq->sink_tainted = TRUE;
1421   else {
1422     sq->has_src_segment = TRUE;
1423     sq->src_tainted = TRUE;
1424   }
1425
1426   GST_DEBUG_OBJECT (mq,
1427       "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
1428
1429   /* segment can update the time level of the queue */
1430   update_time_level (mq, sq);
1431
1432   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1433   gst_multi_queue_post_buffering (mq);
1434 }
1435
1436 /* take a buffer and update segment, updating the time level of the queue. */
1437 static void
1438 apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
1439     GstClockTime duration, GstSegment * segment)
1440 {
1441   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1442
1443   /* if no timestamp is set, assume it's continuous with the previous
1444    * time */
1445   if (timestamp == GST_CLOCK_TIME_NONE)
1446     timestamp = segment->position;
1447
1448   /* add duration */
1449   if (duration != GST_CLOCK_TIME_NONE)
1450     timestamp += duration;
1451
1452   GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
1453       sq->id, segment == &sq->sink_segment ? "sink" : "src",
1454       GST_TIME_ARGS (timestamp));
1455
1456   segment->position = timestamp;
1457
1458   if (segment == &sq->sink_segment)
1459     sq->sink_tainted = TRUE;
1460   else
1461     sq->src_tainted = TRUE;
1462
1463   /* calc diff with other end */
1464   update_time_level (mq, sq);
1465   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1466   gst_multi_queue_post_buffering (mq);
1467 }
1468
1469 static void
1470 apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1471     GstSegment * segment)
1472 {
1473   GstClockTime timestamp;
1474   GstClockTime duration;
1475
1476   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1477
1478   gst_event_parse_gap (event, &timestamp, &duration);
1479
1480   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1481
1482     if (GST_CLOCK_TIME_IS_VALID (duration)) {
1483       timestamp += duration;
1484     }
1485
1486     segment->position = timestamp;
1487
1488     if (segment == &sq->sink_segment)
1489       sq->sink_tainted = TRUE;
1490     else
1491       sq->src_tainted = TRUE;
1492
1493     /* calc diff with other end */
1494     update_time_level (mq, sq);
1495   }
1496
1497   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1498   gst_multi_queue_post_buffering (mq);
1499 }
1500
1501 static GstClockTimeDiff
1502 get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
1503 {
1504   GstClockTimeDiff time = GST_CLOCK_STIME_NONE;
1505
1506   if (GST_IS_BUFFER (object)) {
1507     GstBuffer *buf = GST_BUFFER_CAST (object);
1508     GstClockTime btime = GST_BUFFER_DTS_OR_PTS (buf);
1509
1510     if (GST_CLOCK_TIME_IS_VALID (btime)) {
1511       if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1512         btime += GST_BUFFER_DURATION (buf);
1513       if (btime > segment->stop)
1514         btime = segment->stop;
1515       time = my_segment_to_running_time (segment, btime);
1516     }
1517   } else if (GST_IS_BUFFER_LIST (object)) {
1518     GstBufferList *list = GST_BUFFER_LIST_CAST (object);
1519     gint i, n;
1520     GstBuffer *buf;
1521
1522     n = gst_buffer_list_length (list);
1523     for (i = 0; i < n; i++) {
1524       GstClockTime btime;
1525       buf = gst_buffer_list_get (list, i);
1526       btime = GST_BUFFER_DTS_OR_PTS (buf);
1527       if (GST_CLOCK_TIME_IS_VALID (btime)) {
1528         if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1529           btime += GST_BUFFER_DURATION (buf);
1530         if (btime > segment->stop)
1531           btime = segment->stop;
1532         time = my_segment_to_running_time (segment, btime);
1533         if (!end)
1534           goto done;
1535       } else if (!end) {
1536         goto done;
1537       }
1538     }
1539   } else if (GST_IS_EVENT (object)) {
1540     GstEvent *event = GST_EVENT_CAST (object);
1541
1542     /* For newsegment events return the running time of the start position */
1543     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1544       const GstSegment *new_segment;
1545
1546       gst_event_parse_segment (event, &new_segment);
1547       if (new_segment->format == GST_FORMAT_TIME) {
1548         time =
1549             my_segment_to_running_time ((GstSegment *) new_segment,
1550             new_segment->start);
1551       }
1552     }
1553   }
1554
1555 done:
1556   return time;
1557 }
1558
1559 static GstFlowReturn
1560 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
1561     GstMiniObject * object, gboolean * allow_drop)
1562 {
1563   GstFlowReturn result = sq->srcresult;
1564
1565   if (GST_IS_BUFFER (object)) {
1566     GstBuffer *buffer;
1567     GstClockTime timestamp, duration;
1568
1569     buffer = GST_BUFFER_CAST (object);
1570     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1571     duration = GST_BUFFER_DURATION (buffer);
1572
1573     apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1574
1575     /* Applying the buffer may have made the queue non-full again, unblock it if needed */
1576     gst_data_queue_limits_changed (sq->queue);
1577
1578     if (G_UNLIKELY (*allow_drop)) {
1579       GST_DEBUG_OBJECT (mq,
1580           "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
1581           sq->id, buffer, GST_TIME_ARGS (timestamp));
1582       gst_buffer_unref (buffer);
1583     } else {
1584       GST_DEBUG_OBJECT (mq,
1585           "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
1586           sq->id, buffer, GST_TIME_ARGS (timestamp));
1587       result = gst_pad_push (sq->srcpad, buffer);
1588     }
1589   } else if (GST_IS_EVENT (object)) {
1590     GstEvent *event;
1591
1592     event = GST_EVENT_CAST (object);
1593
1594     switch (GST_EVENT_TYPE (event)) {
1595       case GST_EVENT_EOS:
1596         result = GST_FLOW_EOS;
1597         if (G_UNLIKELY (*allow_drop))
1598           *allow_drop = FALSE;
1599         break;
1600       case GST_EVENT_SEGMENT:
1601         apply_segment (mq, sq, event, &sq->src_segment);
1602         /* Applying the segment may have made the queue non-full again, unblock it if needed */
1603         gst_data_queue_limits_changed (sq->queue);
1604         if (G_UNLIKELY (*allow_drop)) {
1605           result = GST_FLOW_OK;
1606           *allow_drop = FALSE;
1607         }
1608         break;
1609       case GST_EVENT_GAP:
1610         apply_gap (mq, sq, event, &sq->src_segment);
1611         /* Applying the gap may have made the queue non-full again, unblock it if needed */
1612         gst_data_queue_limits_changed (sq->queue);
1613         break;
1614       default:
1615         break;
1616     }
1617
1618     if (G_UNLIKELY (*allow_drop)) {
1619       GST_DEBUG_OBJECT (mq,
1620           "SingleQueue %d : Dropping EOS event %p of type %s",
1621           sq->id, event, GST_EVENT_TYPE_NAME (event));
1622       gst_event_unref (event);
1623     } else {
1624       GST_DEBUG_OBJECT (mq,
1625           "SingleQueue %d : Pushing event %p of type %s",
1626           sq->id, event, GST_EVENT_TYPE_NAME (event));
1627
1628       gst_pad_push_event (sq->srcpad, event);
1629     }
1630   } else if (GST_IS_QUERY (object)) {
1631     GstQuery *query;
1632     gboolean res;
1633
1634     query = GST_QUERY_CAST (object);
1635
1636     if (G_UNLIKELY (*allow_drop)) {
1637       GST_DEBUG_OBJECT (mq,
1638           "SingleQueue %d : Dropping EOS query %p", sq->id, query);
1639       gst_query_unref (query);
1640       res = FALSE;
1641     } else {
1642       res = gst_pad_peer_query (sq->srcpad, query);
1643     }
1644
1645     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1646     sq->last_query = res;
1647     sq->last_handled_query = query;
1648     g_cond_signal (&sq->query_handled);
1649     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1650   } else {
1651     g_warning ("Unexpected object in singlequeue %u (refcounting problem?)",
1652         sq->id);
1653   }
1654   return result;
1655
1656   /* ERRORS */
1657 }
1658
1659 static GstMiniObject *
1660 gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
1661 {
1662   GstMiniObject *res;
1663
1664   res = item->object;
1665   item->object = NULL;
1666
1667   return res;
1668 }
1669
1670 static void
1671 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
1672 {
1673   if (!item->is_query && item->object)
1674     gst_mini_object_unref (item->object);
1675   g_slice_free (GstMultiQueueItem, item);
1676 }
1677
1678 /* takes ownership of passed mini object! */
1679 static GstMultiQueueItem *
1680 gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
1681 {
1682   GstMultiQueueItem *item;
1683
1684   item = g_slice_new (GstMultiQueueItem);
1685   item->object = object;
1686   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1687   item->posid = curid;
1688   item->is_query = GST_IS_QUERY (object);
1689
1690   item->size = gst_buffer_get_size (GST_BUFFER_CAST (object));
1691   item->duration = GST_BUFFER_DURATION (object);
1692   if (item->duration == GST_CLOCK_TIME_NONE)
1693     item->duration = 0;
1694   item->visible = TRUE;
1695   return item;
1696 }
1697
1698 static GstMultiQueueItem *
1699 gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
1700 {
1701   GstMultiQueueItem *item;
1702
1703   item = g_slice_new (GstMultiQueueItem);
1704   item->object = object;
1705   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1706   item->posid = curid;
1707   item->is_query = GST_IS_QUERY (object);
1708
1709   item->size = 0;
1710   item->duration = 0;
1711   item->visible = FALSE;
1712   return item;
1713 }
1714
1715 /* Each main loop attempts to push buffers until the return value
1716  * is not-linked. not-linked pads are not allowed to push data beyond
1717  * any linked pads, so they don't 'rush ahead of the pack'.
1718  */
1719 static void
1720 gst_multi_queue_loop (GstPad * pad)
1721 {
1722   GstSingleQueue *sq;
1723   GstMultiQueueItem *item;
1724   GstDataQueueItem *sitem;
1725   GstMultiQueue *mq;
1726   GstMiniObject *object = NULL;
1727   guint32 newid;
1728   GstFlowReturn result;
1729   GstClockTimeDiff next_time;
1730   gboolean is_buffer;
1731   gboolean do_update_buffering = FALSE;
1732   gboolean dropping = FALSE;
1733
1734   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1735   mq = sq->mqueue;
1736
1737 next:
1738   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
1739
1740   if (sq->flushing)
1741     goto out_flushing;
1742
1743   /* Get something from the queue, blocking until that happens, or we get
1744    * flushed */
1745   if (!(gst_data_queue_pop (sq->queue, &sitem)))
1746     goto out_flushing;
1747
1748   item = (GstMultiQueueItem *) sitem;
1749   newid = item->posid;
1750
1751   /* steal the object and destroy the item */
1752   object = gst_multi_queue_item_steal_object (item);
1753   gst_multi_queue_item_destroy (item);
1754
1755   is_buffer = GST_IS_BUFFER (object);
1756
1757   /* Get running time of the item. Events will have GST_CLOCK_STIME_NONE */
1758   next_time = get_running_time (&sq->src_segment, object, FALSE);
1759
1760   GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
1761       sq->id, newid, sq->last_oldid);
1762
1763   /* If we're not-linked, we do some extra work because we might need to
1764    * wait before pushing. If we're linked but there's a gap in the IDs,
1765    * or it's the first loop, or we just passed the previous highid,
1766    * we might need to wake some sleeping pad up, so there's extra work
1767    * there too */
1768   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1769   if (sq->srcresult == GST_FLOW_NOT_LINKED
1770       || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
1771       || sq->last_oldid > mq->highid) {
1772     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
1773         gst_flow_get_name (sq->srcresult));
1774
1775     /* Check again if we're flushing after the lock is taken,
1776      * the flush flag might have been changed in the meantime */
1777     if (sq->flushing) {
1778       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1779       goto out_flushing;
1780     }
1781
1782     /* Update the nextid so other threads know when to wake us up */
1783     sq->nextid = newid;
1784     /* Take into account the extra cache time since we're unlinked */
1785     if (GST_CLOCK_STIME_IS_VALID (next_time))
1786       next_time += mq->unlinked_cache_time;
1787     sq->next_time = next_time;
1788
1789     /* Update the oldid (the last ID we output) for highid tracking */
1790     if (sq->last_oldid != G_MAXUINT32)
1791       sq->oldid = sq->last_oldid;
1792
1793     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1794       gboolean should_wait;
1795       /* Go to sleep until it's time to push this buffer */
1796
1797       /* Recompute the highid */
1798       compute_high_id (mq);
1799       /* Recompute the high time */
1800       compute_high_time (mq, sq->groupid);
1801
1802       GST_DEBUG_OBJECT (mq,
1803           "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
1804           GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time),
1805           GST_STIME_ARGS (next_time));
1806
1807       if (mq->sync_by_running_time) {
1808         if (sq->group_high_time == GST_CLOCK_STIME_NONE) {
1809           should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
1810               (mq->high_time == GST_CLOCK_STIME_NONE
1811               || next_time > mq->high_time);
1812         } else {
1813           should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
1814               next_time > sq->group_high_time;
1815         }
1816       } else
1817         should_wait = newid > mq->highid;
1818
1819       while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) {
1820
1821         GST_DEBUG_OBJECT (mq,
1822             "queue %d sleeping for not-linked wakeup with "
1823             "newid %u, highid %u, next_time %" GST_STIME_FORMAT
1824             ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
1825             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
1826
1827         /* Wake up all non-linked pads before we sleep */
1828         wake_up_next_non_linked (mq);
1829
1830         mq->numwaiting++;
1831         g_cond_wait (&sq->turn, &mq->qlock);
1832         mq->numwaiting--;
1833
1834         if (sq->flushing) {
1835           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1836           goto out_flushing;
1837         }
1838
1839         /* Recompute the high time and ID */
1840         compute_high_time (mq, sq->groupid);
1841         compute_high_id (mq);
1842
1843         GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
1844             "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
1845             ", high_time %" GST_STIME_FORMAT " mq high_time %" GST_STIME_FORMAT,
1846             sq->id, newid, mq->highid,
1847             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time),
1848             GST_STIME_ARGS (mq->high_time));
1849
1850         if (mq->sync_by_running_time) {
1851           if (sq->group_high_time == GST_CLOCK_STIME_NONE) {
1852             should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
1853                 (mq->high_time == GST_CLOCK_STIME_NONE
1854                 || next_time > mq->high_time);
1855           } else {
1856             should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
1857                 next_time > sq->group_high_time;
1858           }
1859         } else
1860           should_wait = newid > mq->highid;
1861       }
1862
1863       /* Re-compute the high_id in case someone else pushed */
1864       compute_high_id (mq);
1865       compute_high_time (mq, sq->groupid);
1866     } else {
1867       compute_high_id (mq);
1868       compute_high_time (mq, sq->groupid);
1869       /* Wake up all non-linked pads */
1870       wake_up_next_non_linked (mq);
1871     }
1872     /* We're done waiting, we can clear the nextid and nexttime */
1873     sq->nextid = 0;
1874     sq->next_time = GST_CLOCK_STIME_NONE;
1875   }
1876   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1877
1878   if (sq->flushing)
1879     goto out_flushing;
1880
1881   GST_LOG_OBJECT (mq, "sq:%d BEFORE PUSHING sq->srcresult: %s", sq->id,
1882       gst_flow_get_name (sq->srcresult));
1883
1884   /* Update time stats */
1885   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1886   next_time = get_running_time (&sq->src_segment, object, TRUE);
1887   if (GST_CLOCK_STIME_IS_VALID (next_time)) {
1888     if (sq->last_time == GST_CLOCK_STIME_NONE || sq->last_time < next_time)
1889       sq->last_time = next_time;
1890     if (mq->high_time == GST_CLOCK_STIME_NONE || mq->high_time <= next_time) {
1891       /* Wake up all non-linked pads now that we advanced the high time */
1892       mq->high_time = next_time;
1893       wake_up_next_non_linked (mq);
1894     }
1895   }
1896   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1897
1898   /* Try to push out the new object */
1899   result = gst_single_queue_push_one (mq, sq, object, &dropping);
1900   object = NULL;
1901
1902   /* Check if we pushed something already and if this is
1903    * now a switch from an active to a non-active stream.
1904    *
1905    * If it is, we reset all the waiting streams, let them
1906    * push another buffer to see if they're now active again.
1907    * This allows faster switching between streams and prevents
1908    * deadlocks if downstream does any waiting too.
1909    */
1910   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1911   if (sq->pushed && sq->srcresult == GST_FLOW_OK
1912       && result == GST_FLOW_NOT_LINKED) {
1913     GList *tmp;
1914
1915     GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
1916         sq->id);
1917
1918     compute_high_id (mq);
1919     compute_high_time (mq, sq->groupid);
1920     do_update_buffering = TRUE;
1921
1922     /* maybe no-one is waiting */
1923     if (mq->numwaiting > 0) {
1924       /* Else figure out which singlequeue(s) need waking up */
1925       for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1926         GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
1927
1928         if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
1929           GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
1930           sq2->pushed = FALSE;
1931           sq2->srcresult = GST_FLOW_OK;
1932           g_cond_signal (&sq2->turn);
1933         }
1934       }
1935     }
1936   }
1937
1938   if (is_buffer)
1939     sq->pushed = TRUE;
1940
1941   /* now hold on a bit;
1942    * can not simply throw this result to upstream, because
1943    * that might already be onto another segment, so we have to make
1944    * sure we are relaying the correct info wrt proper segment */
1945   if (result == GST_FLOW_EOS && !dropping &&
1946       sq->srcresult != GST_FLOW_NOT_LINKED) {
1947     GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
1948     dropping = TRUE;
1949     /* pretend we have not seen EOS yet for upstream's sake */
1950     result = sq->srcresult;
1951   } else if (dropping && gst_data_queue_is_empty (sq->queue)) {
1952     /* queue empty, so stop dropping
1953      * we can commit the result we have now,
1954      * which is either OK after a segment, or EOS */
1955     GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
1956     dropping = FALSE;
1957     result = GST_FLOW_EOS;
1958   }
1959   sq->srcresult = result;
1960   sq->last_oldid = newid;
1961
1962   if (do_update_buffering)
1963     update_buffering (mq, sq);
1964
1965   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1966   gst_multi_queue_post_buffering (mq);
1967
1968   GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)",
1969       sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (sq->srcpad));
1970
1971   /* Need to make sure wake up any sleeping pads when we exit */
1972   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1973   if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (sq->srcpad)
1974           || sq->srcresult == GST_FLOW_EOS)) {
1975     compute_high_time (mq, sq->groupid);
1976     compute_high_id (mq);
1977     wake_up_next_non_linked (mq);
1978   }
1979   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1980
1981   if (dropping)
1982     goto next;
1983
1984   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
1985       && result != GST_FLOW_EOS)
1986     goto out_flushing;
1987
1988   return;
1989
1990 out_flushing:
1991   {
1992     if (object)
1993       gst_mini_object_unref (object);
1994
1995     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1996     sq->last_query = FALSE;
1997     g_cond_signal (&sq->query_handled);
1998
1999     /* Post an error message if we got EOS while downstream
2000      * has returned an error flow return. After EOS there
2001      * will be no further buffer which could propagate the
2002      * error upstream */
2003     if (sq->is_eos && sq->srcresult < GST_FLOW_EOS) {
2004       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2005       GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
2006     } else {
2007       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2008     }
2009
2010     /* upstream needs to see fatal result ASAP to shut things down,
2011      * but might be stuck in one of our other full queues;
2012      * so empty this one and trigger dynamic queue growth. At
2013      * this point the srcresult is not OK, NOT_LINKED
2014      * or EOS, i.e. a real failure */
2015     gst_single_queue_flush_queue (sq, FALSE);
2016     single_queue_underrun_cb (sq->queue, sq);
2017     gst_data_queue_set_flushing (sq->queue, TRUE);
2018     gst_pad_pause_task (sq->srcpad);
2019     GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
2020         "SingleQueue[%d] task paused, reason:%s",
2021         sq->id, gst_flow_get_name (sq->srcresult));
2022     return;
2023   }
2024 }
2025
2026 /**
2027  * gst_multi_queue_chain:
2028  *
2029  * This is similar to GstQueue's chain function, except:
2030  * _ we don't have leak behaviours,
2031  * _ we push with a unique id (curid)
2032  */
2033 static GstFlowReturn
2034 gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2035 {
2036   GstSingleQueue *sq;
2037   GstMultiQueue *mq;
2038   GstMultiQueueItem *item;
2039   guint32 curid;
2040   GstClockTime timestamp, duration;
2041
2042   sq = gst_pad_get_element_private (pad);
2043   mq = sq->mqueue;
2044
2045   /* if eos, we are always full, so avoid hanging incoming indefinitely */
2046   if (sq->is_eos)
2047     goto was_eos;
2048
2049   sq->active = TRUE;
2050
2051   /* Get a unique incrementing id */
2052   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2053
2054   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
2055   duration = GST_BUFFER_DURATION (buffer);
2056
2057   GST_LOG_OBJECT (mq,
2058       "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%"
2059       GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")",
2060       sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
2061       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration));
2062
2063   item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
2064
2065   /* Update interleave before pushing data into queue */
2066   if (mq->use_interleave) {
2067     GstClockTime val = timestamp;
2068     GstClockTimeDiff dval;
2069
2070     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2071     if (val == GST_CLOCK_TIME_NONE)
2072       val = sq->sink_segment.position;
2073     if (duration != GST_CLOCK_TIME_NONE)
2074       val += duration;
2075
2076     dval = my_segment_to_running_time (&sq->sink_segment, val);
2077     if (GST_CLOCK_STIME_IS_VALID (dval)) {
2078       sq->cached_sinktime = dval;
2079       GST_DEBUG_OBJECT (mq,
2080           "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
2081           GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
2082           GST_STIME_ARGS (sq->cached_sinktime));
2083       calculate_interleave (mq);
2084     }
2085     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2086   }
2087
2088   if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
2089     goto flushing;
2090
2091   /* update time level, we must do this after pushing the data in the queue so
2092    * that we never end up filling the queue first. */
2093   apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
2094
2095 done:
2096   return sq->srcresult;
2097
2098   /* ERRORS */
2099 flushing:
2100   {
2101     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
2102         sq->id, gst_flow_get_name (sq->srcresult));
2103     gst_multi_queue_item_destroy (item);
2104     goto done;
2105   }
2106 was_eos:
2107   {
2108     GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS");
2109     gst_buffer_unref (buffer);
2110     return GST_FLOW_EOS;
2111   }
2112 }
2113
2114 static gboolean
2115 gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
2116     GstPadMode mode, gboolean active)
2117 {
2118   gboolean res;
2119   GstSingleQueue *sq;
2120   GstMultiQueue *mq;
2121
2122   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2123   mq = (GstMultiQueue *) gst_pad_get_parent (pad);
2124
2125   /* mq is NULL if the pad is activated/deactivated before being
2126    * added to the multiqueue */
2127   if (mq)
2128     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2129
2130   switch (mode) {
2131     case GST_PAD_MODE_PUSH:
2132       if (active) {
2133         /* All pads start off linked until they push one buffer */
2134         sq->srcresult = GST_FLOW_OK;
2135         sq->pushed = FALSE;
2136         gst_data_queue_set_flushing (sq->queue, FALSE);
2137       } else {
2138         sq->srcresult = GST_FLOW_FLUSHING;
2139         sq->last_query = FALSE;
2140         g_cond_signal (&sq->query_handled);
2141         gst_data_queue_set_flushing (sq->queue, TRUE);
2142
2143         /* Wait until streaming thread has finished */
2144         if (mq)
2145           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2146         GST_PAD_STREAM_LOCK (pad);
2147         if (mq)
2148           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2149         gst_data_queue_flush (sq->queue);
2150         if (mq)
2151           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2152         GST_PAD_STREAM_UNLOCK (pad);
2153         if (mq)
2154           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2155       }
2156       res = TRUE;
2157       break;
2158     default:
2159       res = FALSE;
2160       break;
2161   }
2162
2163   if (mq) {
2164     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2165     gst_object_unref (mq);
2166   }
2167
2168   return res;
2169 }
2170
2171 static GstFlowReturn
2172 gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
2173 {
2174   GstSingleQueue *sq;
2175   GstMultiQueue *mq;
2176   guint32 curid;
2177   GstMultiQueueItem *item;
2178   gboolean res = TRUE;
2179   GstFlowReturn flowret = GST_FLOW_OK;
2180   GstEventType type;
2181   GstEvent *sref = NULL;
2182
2183   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2184   mq = (GstMultiQueue *) parent;
2185
2186   type = GST_EVENT_TYPE (event);
2187
2188   switch (type) {
2189     case GST_EVENT_STREAM_START:
2190     {
2191       if (mq->sync_by_running_time) {
2192         GstStreamFlags stream_flags;
2193         gst_event_parse_stream_flags (event, &stream_flags);
2194         if ((stream_flags & GST_STREAM_FLAG_SPARSE)) {
2195           GST_INFO_OBJECT (mq, "SingleQueue %d is a sparse stream", sq->id);
2196           sq->is_sparse = TRUE;
2197         }
2198       }
2199
2200       sq->thread = g_thread_self ();
2201
2202       /* Remove EOS flag */
2203       sq->is_eos = FALSE;
2204       break;
2205     }
2206     case GST_EVENT_FLUSH_START:
2207       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
2208           sq->id);
2209
2210       res = gst_pad_push_event (sq->srcpad, event);
2211
2212       gst_single_queue_flush (mq, sq, TRUE, FALSE);
2213       goto done;
2214
2215     case GST_EVENT_FLUSH_STOP:
2216       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
2217           sq->id);
2218
2219       res = gst_pad_push_event (sq->srcpad, event);
2220
2221       gst_single_queue_flush (mq, sq, FALSE, FALSE);
2222       goto done;
2223
2224     case GST_EVENT_SEGMENT:
2225       sref = gst_event_ref (event);
2226       break;
2227     case GST_EVENT_GAP:
2228       /* take ref because the queue will take ownership and we need the event
2229        * afterwards to update the segment */
2230       sref = gst_event_ref (event);
2231       if (mq->use_interleave) {
2232         GstClockTime val, dur;
2233         GstClockTime stime;
2234         gst_event_parse_gap (event, &val, &dur);
2235         if (GST_CLOCK_TIME_IS_VALID (val)) {
2236           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2237           if (GST_CLOCK_TIME_IS_VALID (dur))
2238             val += dur;
2239           stime = my_segment_to_running_time (&sq->sink_segment, val);
2240           if (GST_CLOCK_STIME_IS_VALID (stime)) {
2241             sq->cached_sinktime = stime;
2242             calculate_interleave (mq);
2243           }
2244           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2245         }
2246       }
2247       break;
2248
2249     default:
2250       if (!(GST_EVENT_IS_SERIALIZED (event))) {
2251         res = gst_pad_push_event (sq->srcpad, event);
2252         goto done;
2253       }
2254       break;
2255   }
2256
2257   /* if eos, we are always full, so avoid hanging incoming indefinitely */
2258   if (sq->is_eos)
2259     goto was_eos;
2260
2261   /* Get an unique incrementing id. */
2262   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2263
2264   item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
2265
2266   GST_DEBUG_OBJECT (mq,
2267       "SingleQueue %d : Enqueuing event %p of type %s with id %d",
2268       sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
2269
2270   if (!gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))
2271     goto flushing;
2272
2273   /* mark EOS when we received one, we must do that after putting the
2274    * buffer in the queue because EOS marks the buffer as filled. */
2275   switch (type) {
2276     case GST_EVENT_EOS:
2277       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2278       sq->is_eos = TRUE;
2279
2280       /* Post an error message if we got EOS while downstream
2281        * has returned an error flow return. After EOS there
2282        * will be no further buffer which could propagate the
2283        * error upstream */
2284       if (sq->srcresult < GST_FLOW_EOS) {
2285         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2286         GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
2287       } else {
2288         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2289       }
2290
2291       /* EOS affects the buffering state */
2292       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2293       update_buffering (mq, sq);
2294       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2295       single_queue_overrun_cb (sq->queue, sq);
2296       gst_multi_queue_post_buffering (mq);
2297       break;
2298     case GST_EVENT_SEGMENT:
2299       apply_segment (mq, sq, sref, &sq->sink_segment);
2300       gst_event_unref (sref);
2301       /* a new segment allows us to accept more buffers if we got EOS
2302        * from downstream */
2303       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2304       if (sq->srcresult == GST_FLOW_EOS)
2305         sq->srcresult = GST_FLOW_OK;
2306       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2307       break;
2308     case GST_EVENT_GAP:
2309       sq->active = TRUE;
2310       apply_gap (mq, sq, sref, &sq->sink_segment);
2311       gst_event_unref (sref);
2312     default:
2313       break;
2314   }
2315
2316 done:
2317   if (res == FALSE)
2318     flowret = GST_FLOW_ERROR;
2319   GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id,
2320       gst_flow_get_name (flowret));
2321   return flowret;
2322
2323 flushing:
2324   {
2325     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
2326         sq->id, gst_flow_get_name (sq->srcresult));
2327     if (sref)
2328       gst_event_unref (sref);
2329     gst_multi_queue_item_destroy (item);
2330     return sq->srcresult;
2331   }
2332 was_eos:
2333   {
2334     GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return GST_FLOW_EOS");
2335     gst_event_unref (event);
2336     return GST_FLOW_EOS;
2337   }
2338 }
2339
2340 static gboolean
2341 gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
2342 {
2343   gboolean res;
2344   GstSingleQueue *sq;
2345   GstMultiQueue *mq;
2346
2347   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2348   mq = (GstMultiQueue *) parent;
2349
2350   switch (GST_QUERY_TYPE (query)) {
2351     default:
2352       if (GST_QUERY_IS_SERIALIZED (query)) {
2353         guint32 curid;
2354         GstMultiQueueItem *item;
2355
2356         GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2357         if (sq->srcresult != GST_FLOW_OK)
2358           goto out_flushing;
2359
2360         /* serialized events go in the queue. We need to be certain that we
2361          * don't cause deadlocks waiting for the query return value. We check if
2362          * the queue is empty (nothing is blocking downstream and the query can
2363          * be pushed for sure) or we are not buffering. If we are buffering,
2364          * the pipeline waits to unblock downstream until our queue fills up
2365          * completely, which can not happen if we block on the query..
2366          * Therefore we only potentially block when we are not buffering. */
2367         if (!mq->use_buffering || gst_data_queue_is_empty (sq->queue)) {
2368           /* Get an unique incrementing id. */
2369           curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2370
2371           item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
2372
2373           GST_DEBUG_OBJECT (mq,
2374               "SingleQueue %d : Enqueuing query %p of type %s with id %d",
2375               sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
2376           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2377           res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
2378           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2379           if (!res || sq->flushing)
2380             goto out_flushing;
2381           /* it might be that the query has been taken out of the queue
2382            * while we were unlocked. So, we need to check if the last
2383            * handled query is the same one than the one we just
2384            * pushed. If it is, we don't need to wait for the condition
2385            * variable, otherwise we wait for the condition variable to
2386            * be signaled. */
2387           while (!sq->flushing && sq->srcresult == GST_FLOW_OK
2388               && sq->last_handled_query != query)
2389             g_cond_wait (&sq->query_handled, &mq->qlock);
2390           res = sq->last_query;
2391           sq->last_handled_query = NULL;
2392         } else {
2393           GST_DEBUG_OBJECT (mq, "refusing query, we are buffering and the "
2394               "queue is not empty");
2395           res = FALSE;
2396         }
2397         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2398       } else {
2399         /* default handling */
2400         res = gst_pad_query_default (pad, parent, query);
2401       }
2402       break;
2403   }
2404   return res;
2405
2406 out_flushing:
2407   {
2408     GST_DEBUG_OBJECT (mq, "Flushing");
2409     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2410     return FALSE;
2411   }
2412 }
2413
2414 static gboolean
2415 gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
2416     GstPadMode mode, gboolean active)
2417 {
2418   GstMultiQueue *mq;
2419   GstSingleQueue *sq;
2420   gboolean result;
2421
2422   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2423   mq = sq->mqueue;
2424
2425   GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
2426
2427   switch (mode) {
2428     case GST_PAD_MODE_PUSH:
2429       if (active) {
2430         result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
2431       } else {
2432         result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
2433         /* make sure streaming finishes */
2434         result |= gst_pad_stop_task (pad);
2435       }
2436       break;
2437     default:
2438       result = FALSE;
2439       break;
2440   }
2441   return result;
2442 }
2443
2444 static gboolean
2445 gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2446 {
2447   GstSingleQueue *sq = gst_pad_get_element_private (pad);
2448   GstMultiQueue *mq = sq->mqueue;
2449   gboolean ret;
2450
2451   switch (GST_EVENT_TYPE (event)) {
2452     case GST_EVENT_RECONFIGURE:
2453       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2454       if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2455         sq->srcresult = GST_FLOW_OK;
2456         g_cond_signal (&sq->turn);
2457       }
2458       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2459
2460       ret = gst_pad_push_event (sq->sinkpad, event);
2461       break;
2462     default:
2463       ret = gst_pad_push_event (sq->sinkpad, event);
2464       break;
2465   }
2466
2467   return ret;
2468 }
2469
2470 static gboolean
2471 gst_multi_queue_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2472 {
2473   gboolean res;
2474
2475   /* FIXME, Handle position offset depending on queue size */
2476   switch (GST_QUERY_TYPE (query)) {
2477     default:
2478       /* default handling */
2479       res = gst_pad_query_default (pad, parent, query);
2480       break;
2481   }
2482   return res;
2483 }
2484
2485 /*
2486  * Next-non-linked functions
2487  */
2488
2489 /* WITH LOCK TAKEN */
2490 static void
2491 wake_up_next_non_linked (GstMultiQueue * mq)
2492 {
2493   GList *tmp;
2494
2495   /* maybe no-one is waiting */
2496   if (mq->numwaiting < 1)
2497     return;
2498
2499   if (mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (mq->high_time)) {
2500     /* Else figure out which singlequeue(s) need waking up */
2501     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2502       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2503       if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2504         GstClockTimeDiff high_time;
2505
2506         if (GST_CLOCK_STIME_IS_VALID (sq->group_high_time))
2507           high_time = sq->group_high_time;
2508         else
2509           high_time = mq->high_time;
2510
2511         if (GST_CLOCK_STIME_IS_VALID (sq->next_time) &&
2512             GST_CLOCK_STIME_IS_VALID (high_time)
2513             && sq->next_time <= high_time) {
2514           GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2515           g_cond_signal (&sq->turn);
2516         }
2517       }
2518     }
2519   } else {
2520     /* Else figure out which singlequeue(s) need waking up */
2521     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2522       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2523       if (sq->srcresult == GST_FLOW_NOT_LINKED &&
2524           sq->nextid != 0 && sq->nextid <= mq->highid) {
2525         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2526         g_cond_signal (&sq->turn);
2527       }
2528     }
2529   }
2530 }
2531
2532 /* WITH LOCK TAKEN */
2533 static void
2534 compute_high_id (GstMultiQueue * mq)
2535 {
2536   /* The high-id is either the highest id among the linked pads, or if all
2537    * pads are not-linked, it's the lowest not-linked pad */
2538   GList *tmp;
2539   guint32 lowest = G_MAXUINT32;
2540   guint32 highid = G_MAXUINT32;
2541
2542   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2543     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2544
2545     GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
2546         sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
2547
2548     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2549       /* No need to consider queues which are not waiting */
2550       if (sq->nextid == 0) {
2551         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2552         continue;
2553       }
2554
2555       if (sq->nextid < lowest)
2556         lowest = sq->nextid;
2557     } else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) {
2558       /* If we don't have a global highid, or the global highid is lower than
2559        * this single queue's last outputted id, store the queue's one,
2560        * unless the singlequeue output is at EOS */
2561       if ((highid == G_MAXUINT32) || (sq->oldid > highid))
2562         highid = sq->oldid;
2563     }
2564   }
2565
2566   if (highid == G_MAXUINT32 || lowest < highid)
2567     mq->highid = lowest;
2568   else
2569     mq->highid = highid;
2570
2571   GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
2572       lowest);
2573 }
2574
2575 /* WITH LOCK TAKEN */
2576 static void
2577 compute_high_time (GstMultiQueue * mq, guint groupid)
2578 {
2579   /* The high-time is either the highest last time among the linked
2580    * pads, or if all pads are not-linked, it's the lowest nex time of
2581    * not-linked pad */
2582   GList *tmp;
2583   GstClockTimeDiff highest = GST_CLOCK_STIME_NONE;
2584   GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
2585   GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE;
2586   GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE;
2587   GstClockTimeDiff res;
2588   /* Number of streams which belong to groupid */
2589   guint group_count = 0;
2590
2591   if (!mq->sync_by_running_time)
2592     /* return GST_CLOCK_STIME_NONE; */
2593     return;
2594
2595   for (tmp = mq->queues; tmp; tmp = tmp->next) {
2596     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2597
2598     GST_LOG_OBJECT (mq,
2599         "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT
2600         ", last_time:%" GST_STIME_FORMAT ", srcresult:%s", sq->id, sq->groupid,
2601         GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time),
2602         gst_flow_get_name (sq->srcresult));
2603
2604     if (sq->groupid == groupid)
2605       group_count++;
2606
2607     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2608       /* No need to consider queues which are not waiting */
2609       if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
2610         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2611         continue;
2612       }
2613
2614       if (lowest == GST_CLOCK_STIME_NONE || sq->next_time < lowest)
2615         lowest = sq->next_time;
2616       if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE
2617               || sq->next_time < group_low))
2618         group_low = sq->next_time;
2619     } else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) {
2620       /* If we don't have a global high time, or the global high time
2621        * is lower than this single queue's last outputted time, store
2622        * the queue's one, unless the singlequeue output is at EOS. */
2623       if (highest == GST_CLOCK_STIME_NONE
2624           || (sq->last_time != GST_CLOCK_STIME_NONE && sq->last_time > highest))
2625         highest = sq->last_time;
2626       if (sq->groupid == groupid && (group_high == GST_CLOCK_STIME_NONE
2627               || (sq->last_time != GST_CLOCK_STIME_NONE
2628                   && sq->last_time > group_high)))
2629         group_high = sq->last_time;
2630     }
2631     GST_LOG_OBJECT (mq,
2632         "highest now %" GST_STIME_FORMAT " lowest %" GST_STIME_FORMAT,
2633         GST_STIME_ARGS (highest), GST_STIME_ARGS (lowest));
2634     if (sq->groupid == groupid)
2635       GST_LOG_OBJECT (mq,
2636           "grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT,
2637           GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low));
2638   }
2639
2640   if (highest == GST_CLOCK_STIME_NONE)
2641     mq->high_time = lowest;
2642   else
2643     mq->high_time = highest;
2644
2645   /* If there's only one stream of a given type, use the global high */
2646   if (group_count < 2)
2647     res = GST_CLOCK_STIME_NONE;
2648   else if (group_high == GST_CLOCK_STIME_NONE)
2649     res = group_low;
2650   else
2651     res = group_high;
2652
2653   GST_LOG_OBJECT (mq, "group count %d for groupid %u", group_count, groupid);
2654   GST_LOG_OBJECT (mq,
2655       "MQ High time is now : %" GST_STIME_FORMAT ", group %d high time %"
2656       GST_STIME_FORMAT ", lowest non-linked %" GST_STIME_FORMAT,
2657       GST_STIME_ARGS (mq->high_time), groupid, GST_STIME_ARGS (mq->high_time),
2658       GST_STIME_ARGS (lowest));
2659
2660   for (tmp = mq->queues; tmp; tmp = tmp->next) {
2661     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2662     if (groupid == sq->groupid)
2663       sq->group_high_time = res;
2664   }
2665 }
2666
2667 #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
2668      ((q)->max_size.format) <= (value))
2669
2670 /*
2671  * GstSingleQueue functions
2672  */
2673 static void
2674 single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
2675 {
2676   GstMultiQueue *mq = sq->mqueue;
2677   GList *tmp;
2678   GstDataQueueSize size;
2679   gboolean filled = TRUE;
2680   gboolean empty_found = FALSE;
2681
2682   gst_data_queue_get_level (sq->queue, &size);
2683
2684   GST_LOG_OBJECT (mq,
2685       "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %"
2686       G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible,
2687       sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time,
2688       sq->max_size.time);
2689
2690   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2691
2692   /* check if we reached the hard time/bytes limits;
2693      time limit is only taken into account for non-sparse streams */
2694   if (sq->is_eos || IS_FILLED (sq, bytes, size.bytes) ||
2695       (!sq->is_sparse && IS_FILLED (sq, time, sq->cur_time))) {
2696     goto done;
2697   }
2698
2699   /* Search for empty queues */
2700   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2701     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
2702
2703     if (oq == sq)
2704       continue;
2705
2706     if (oq->srcresult == GST_FLOW_NOT_LINKED) {
2707       GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id);
2708       continue;
2709     }
2710
2711     GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
2712     if (gst_data_queue_is_empty (oq->queue) && !oq->is_sparse) {
2713       GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
2714       empty_found = TRUE;
2715       break;
2716     }
2717   }
2718
2719   /* if hard limits are not reached then we allow one more buffer in the full
2720    * queue, but only if any of the other singelqueues are empty */
2721   if (empty_found) {
2722     if (IS_FILLED (sq, visible, size.visible)) {
2723       sq->max_size.visible = size.visible + 1;
2724       GST_DEBUG_OBJECT (mq,
2725           "Bumping single queue %d max visible to %d",
2726           sq->id, sq->max_size.visible);
2727       filled = FALSE;
2728     }
2729   }
2730
2731 done:
2732   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2733
2734   /* Overrun is always forwarded, since this is blocking the upstream element */
2735   if (filled) {
2736     GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id);
2737     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
2738   }
2739 }
2740
2741 static void
2742 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
2743 {
2744   gboolean empty = TRUE;
2745   GstMultiQueue *mq = sq->mqueue;
2746   GList *tmp;
2747
2748   if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2749     GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
2750     return;
2751   } else {
2752     GST_LOG_OBJECT (mq,
2753         "Single Queue %d is empty, Checking other single queues", sq->id);
2754   }
2755
2756   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2757   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2758     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
2759
2760     if (gst_data_queue_is_full (oq->queue)) {
2761       GstDataQueueSize size;
2762
2763       gst_data_queue_get_level (oq->queue, &size);
2764       if (IS_FILLED (oq, visible, size.visible)) {
2765         oq->max_size.visible = size.visible + 1;
2766         GST_DEBUG_OBJECT (mq,
2767             "queue %d is filled, bumping its max visible to %d", oq->id,
2768             oq->max_size.visible);
2769         gst_data_queue_limits_changed (oq->queue);
2770       }
2771     }
2772     if (!gst_data_queue_is_empty (oq->queue) || oq->is_sparse)
2773       empty = FALSE;
2774   }
2775   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2776
2777   if (empty) {
2778     GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
2779     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
2780   }
2781 }
2782
2783 static gboolean
2784 single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
2785     guint64 time, GstSingleQueue * sq)
2786 {
2787   gboolean res;
2788   GstMultiQueue *mq = sq->mqueue;
2789
2790   GST_DEBUG_OBJECT (mq,
2791       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
2792       G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
2793       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
2794
2795   /* we are always filled on EOS */
2796   if (sq->is_eos)
2797     return TRUE;
2798
2799   /* we never go past the max visible items unless we are in buffering mode */
2800   if (!mq->use_buffering && IS_FILLED (sq, visible, visible))
2801     return TRUE;
2802
2803   /* check time or bytes */
2804   res = IS_FILLED (sq, bytes, bytes);
2805   /* We only care about limits in time if we're not a sparse stream or
2806    * we're not syncing by running time */
2807   if (!sq->is_sparse || !mq->sync_by_running_time) {
2808     /* If unlinked, take into account the extra unlinked cache time */
2809     if (mq->sync_by_running_time && sq->srcresult == GST_FLOW_NOT_LINKED) {
2810       if (sq->cur_time > mq->unlinked_cache_time)
2811         res |= IS_FILLED (sq, time, sq->cur_time - mq->unlinked_cache_time);
2812       else
2813         res = FALSE;
2814     } else
2815       res |= IS_FILLED (sq, time, sq->cur_time);
2816   }
2817
2818   return res;
2819 }
2820
2821 static void
2822 gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
2823 {
2824   GstDataQueueItem *sitem;
2825   GstMultiQueueItem *mitem;
2826   gboolean was_flushing = FALSE;
2827
2828   while (!gst_data_queue_is_empty (sq->queue)) {
2829     GstMiniObject *data;
2830
2831     /* FIXME: If this fails here although the queue is not empty,
2832      * we're flushing... but we want to rescue all sticky
2833      * events nonetheless.
2834      */
2835     if (!gst_data_queue_pop (sq->queue, &sitem)) {
2836       was_flushing = TRUE;
2837       gst_data_queue_set_flushing (sq->queue, FALSE);
2838       continue;
2839     }
2840
2841     mitem = (GstMultiQueueItem *) sitem;
2842
2843     data = sitem->object;
2844
2845     if (!full && !mitem->is_query && GST_IS_EVENT (data)
2846         && GST_EVENT_IS_STICKY (data)
2847         && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
2848         && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
2849       gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
2850     }
2851
2852     sitem->destroy (sitem);
2853   }
2854
2855   gst_data_queue_flush (sq->queue);
2856   if (was_flushing)
2857     gst_data_queue_set_flushing (sq->queue, TRUE);
2858
2859   GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue);
2860   update_buffering (sq->mqueue, sq);
2861   GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue);
2862   gst_multi_queue_post_buffering (sq->mqueue);
2863 }
2864
2865 static void
2866 gst_single_queue_free (GstSingleQueue * sq)
2867 {
2868   /* DRAIN QUEUE */
2869   gst_data_queue_flush (sq->queue);
2870   g_object_unref (sq->queue);
2871   g_cond_clear (&sq->turn);
2872   g_cond_clear (&sq->query_handled);
2873   g_free (sq);
2874 }
2875
2876 static GstSingleQueue *
2877 gst_single_queue_new (GstMultiQueue * mqueue, guint id)
2878 {
2879   GstSingleQueue *sq;
2880   GstMultiQueuePad *mqpad;
2881   GstPadTemplate *templ;
2882   gchar *name;
2883   GList *tmp;
2884   guint temp_id = (id == -1) ? 0 : id;
2885
2886   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
2887
2888   /* Find an unused queue ID, if possible the passed one */
2889   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
2890     GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
2891     /* This works because the IDs are sorted in ascending order */
2892     if (sq2->id == temp_id) {
2893       /* If this ID was requested by the caller return NULL,
2894        * otherwise just get us the next one */
2895       if (id == -1) {
2896         temp_id = sq2->id + 1;
2897       } else {
2898         GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
2899         return NULL;
2900       }
2901     } else if (sq2->id > temp_id) {
2902       break;
2903     }
2904   }
2905
2906   sq = g_new0 (GstSingleQueue, 1);
2907   mqueue->nbqueues++;
2908   sq->id = temp_id;
2909   sq->groupid = DEFAULT_PAD_GROUP_ID;
2910   sq->group_high_time = GST_CLOCK_STIME_NONE;
2911
2912   mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
2913   mqueue->queues_cookie++;
2914
2915   /* copy over max_size and extra_size so we don't need to take the lock
2916    * any longer when checking if the queue is full. */
2917   sq->max_size.visible = mqueue->max_size.visible;
2918   sq->max_size.bytes = mqueue->max_size.bytes;
2919   sq->max_size.time = mqueue->max_size.time;
2920
2921   sq->extra_size.visible = mqueue->extra_size.visible;
2922   sq->extra_size.bytes = mqueue->extra_size.bytes;
2923   sq->extra_size.time = mqueue->extra_size.time;
2924
2925   GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
2926
2927   sq->mqueue = mqueue;
2928   sq->srcresult = GST_FLOW_FLUSHING;
2929   sq->pushed = FALSE;
2930   sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
2931       single_queue_check_full,
2932       (GstDataQueueFullCallback) single_queue_overrun_cb,
2933       (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
2934   sq->is_eos = FALSE;
2935   sq->is_sparse = FALSE;
2936   sq->flushing = FALSE;
2937   sq->active = FALSE;
2938   gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
2939   gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
2940
2941   sq->nextid = 0;
2942   sq->oldid = 0;
2943   sq->next_time = GST_CLOCK_STIME_NONE;
2944   sq->last_time = GST_CLOCK_STIME_NONE;
2945   g_cond_init (&sq->turn);
2946   g_cond_init (&sq->query_handled);
2947
2948   sq->sinktime = GST_CLOCK_STIME_NONE;
2949   sq->srctime = GST_CLOCK_STIME_NONE;
2950   sq->sink_tainted = TRUE;
2951   sq->src_tainted = TRUE;
2952
2953   name = g_strdup_printf ("sink_%u", sq->id);
2954   templ = gst_static_pad_template_get (&sinktemplate);
2955   sq->sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
2956       "direction", templ->direction, "template", templ, NULL);
2957   gst_object_unref (templ);
2958   g_free (name);
2959
2960   mqpad = (GstMultiQueuePad *) sq->sinkpad;
2961   mqpad->sq = sq;
2962
2963   gst_pad_set_chain_function (sq->sinkpad,
2964       GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
2965   gst_pad_set_activatemode_function (sq->sinkpad,
2966       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode));
2967   gst_pad_set_event_full_function (sq->sinkpad,
2968       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
2969   gst_pad_set_query_function (sq->sinkpad,
2970       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query));
2971   gst_pad_set_iterate_internal_links_function (sq->sinkpad,
2972       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
2973   GST_OBJECT_FLAG_SET (sq->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
2974
2975   name = g_strdup_printf ("src_%u", sq->id);
2976   sq->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
2977   g_free (name);
2978
2979   gst_pad_set_activatemode_function (sq->srcpad,
2980       GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode));
2981   gst_pad_set_event_function (sq->srcpad,
2982       GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
2983   gst_pad_set_query_function (sq->srcpad,
2984       GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
2985   gst_pad_set_iterate_internal_links_function (sq->srcpad,
2986       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
2987   GST_OBJECT_FLAG_SET (sq->srcpad, GST_PAD_FLAG_PROXY_CAPS);
2988
2989   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
2990   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
2991
2992   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
2993
2994   /* only activate the pads when we are not in the NULL state
2995    * and add the pad under the state_lock to prevend state changes
2996    * between activating and adding */
2997   g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
2998   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
2999     gst_pad_set_active (sq->srcpad, TRUE);
3000     gst_pad_set_active (sq->sinkpad, TRUE);
3001   }
3002   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
3003   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
3004   g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
3005
3006   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
3007       sq->id);
3008
3009   return sq;
3010 }