multiqueue: Fix high_time wakeup logic
[platform/upstream/gstreamer.git] / plugins / elements / gstmultiqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
4  * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
5  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * gstmultiqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 /**
26  * SECTION:element-multiqueue
27  * @see_also: #GstQueue
28  *
29  * <refsect2>
30  * <para>
31  * Multiqueue is similar to a normal #GstQueue with the following additional
32  * features:
33  * <orderedlist>
34  * <listitem>
35  *   <itemizedlist><title>Multiple streamhandling</title>
36  *   <listitem><para>
37  *     The element handles queueing data on more than one stream at once. To
38  *     achieve such a feature it has request sink pads (sink&percnt;u) and
39  *     'sometimes' src pads (src&percnt;u).
40  *   </para><para>
41  *     When requesting a given sinkpad with gst_element_request_pad(),
42  *     the associated srcpad for that stream will be created.
43  *     Example: requesting sink1 will generate src1.
44  *   </para></listitem>
45  *   </itemizedlist>
46  * </listitem>
47  * <listitem>
48  *   <itemizedlist><title>Non-starvation on multiple streams</title>
49  *   <listitem><para>
50  *     If more than one stream is used with the element, the streams' queues
51  *     will be dynamically grown (up to a limit), in order to ensure that no
52  *     stream is risking data starvation. This guarantees that at any given
53  *     time there are at least N bytes queued and available for each individual
54  *     stream.
55  *   </para><para>
56  *     If an EOS event comes through a srcpad, the associated queue will be
57  *     considered as 'not-empty' in the queue-size-growing algorithm.
58  *   </para></listitem>
59  *   </itemizedlist>
60  * </listitem>
61  * <listitem>
62  *   <itemizedlist><title>Non-linked srcpads graceful handling</title>
63  *   <listitem><para>
64  *     In order to better support dynamic switching between streams, the multiqueue
65  *     (unlike the current GStreamer queue) continues to push buffers on non-linked
66  *     pads rather than shutting down.
67  *   </para><para>
68  *     In addition, to prevent a non-linked stream from very quickly consuming all
69  *     available buffers and thus 'racing ahead' of the other streams, the element
70  *     must ensure that buffers and inlined events for a non-linked stream are pushed
71  *     in the same order as they were received, relative to the other streams
72  *     controlled by the element. This means that a buffer cannot be pushed to a
73  *     non-linked pad any sooner than buffers in any other stream which were received
74  *     before it.
75  *   </para></listitem>
76  *   </itemizedlist>
77  * </listitem>
78  * </orderedlist>
79  * </para>
80  * <para>
81  *   Data is queued until one of the limits specified by the
82  *   #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
83  *   #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
84  *   more buffers into the queue will block the pushing thread until more space
85  *   becomes available. #GstMultiQueue:extra-size-buffers,
86  * </para>
87  * <para>
88  *   #GstMultiQueue:extra-size-bytes and #GstMultiQueue:extra-size-time are
89  *   currently unused.
90  * </para>
91  * <para>
92  *   The default queue size limits are 5 buffers, 10MB of data, or
93  *   two second worth of data, whichever is reached first. Note that the number
94  *   of buffers will dynamically grow depending on the fill level of 
95  *   other queues.
96  * </para>
97  * <para>
98  *   The #GstMultiQueue::underrun signal is emitted when all of the queues
99  *   are empty. The #GstMultiQueue::overrun signal is emitted when one of the
100  *   queues is filled.
101  *   Both signals are emitted from the context of the streaming thread.
102  * </para>
103  * <para>
104  *   When using #GstMultiQueue:sync-by-running-time the unlinked streams will
105  *   be throttled by the highest running-time of linked streams. This allows
106  *   further relinking of those unlinked streams without them being in the
107  *   future (i.e. to achieve gapless playback).
108  *   When dealing with streams which have got different consumption requirements
109  *   downstream (ex: video decoders which will consume more buffer (in time) than
110  *   audio decoders), it is recommended to group streams of the same type
111  *   by using the pad "group-id" property. This will further throttle streams
112  *   in time within that group.
113  * </para>
114  * </refsect2>
115  */
116
117 #ifdef HAVE_CONFIG_H
118 #  include "config.h"
119 #endif
120
121 #include <gst/gst.h>
122 #include <stdio.h>
123 #include "gstmultiqueue.h"
124 #include <gst/glib-compat-private.h>
125
126 /**
127  * GstSingleQueue:
128  * @sinkpad: associated sink #GstPad
129  * @srcpad: associated source #GstPad
130  *
131  * Structure containing all information and properties about
132  * a single queue.
133  */
134 typedef struct _GstSingleQueue GstSingleQueue;
135
136 struct _GstSingleQueue
137 {
138   /* unique identifier of the queue */
139   guint id;
140   /* group of streams to which this queue belongs to */
141   guint groupid;
142   GstClockTimeDiff group_high_time;
143
144   GstMultiQueue *mqueue;
145
146   GstPad *sinkpad;
147   GstPad *srcpad;
148
149   /* flowreturn of previous srcpad push */
150   GstFlowReturn srcresult;
151   /* If something was actually pushed on
152    * this pad after flushing/pad activation
153    * and the srcresult corresponds to something
154    * real
155    */
156   gboolean pushed;
157
158   /* segments */
159   GstSegment sink_segment;
160   GstSegment src_segment;
161   gboolean has_src_segment;     /* preferred over initializing the src_segment to
162                                  * UNDEFINED as this doesn't requires adding ifs
163                                  * in every segment usage */
164
165   /* position of src/sink */
166   GstClockTimeDiff sinktime, srctime;
167   /* cached input value, used for interleave */
168   GstClockTimeDiff cached_sinktime;
169   /* TRUE if either position needs to be recalculated */
170   gboolean sink_tainted, src_tainted;
171
172   /* queue of data */
173   GstDataQueue *queue;
174   GstDataQueueSize max_size, extra_size;
175   GstClockTime cur_time;
176   gboolean is_eos;
177   gboolean is_sparse;
178   gboolean flushing;
179   gboolean active;
180
181   /* Protected by global lock */
182   guint32 nextid;               /* ID of the next object waiting to be pushed */
183   guint32 oldid;                /* ID of the last object pushed (last in a series) */
184   guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
185   GstClockTimeDiff next_time;   /* End running time of next buffer to be pushed */
186   GstClockTimeDiff last_time;   /* Start running time of last pushed buffer */
187   GCond turn;                   /* SingleQueue turn waiting conditional */
188
189   /* for serialized queries */
190   GCond query_handled;
191   gboolean last_query;
192   GstQuery *last_handled_query;
193
194   /* For interleave calculation */
195   GThread *thread;
196 };
197
198
199 /* Extension of GstDataQueueItem structure for our usage */
200 typedef struct _GstMultiQueueItem GstMultiQueueItem;
201
202 struct _GstMultiQueueItem
203 {
204   GstMiniObject *object;
205   guint size;
206   guint64 duration;
207   gboolean visible;
208
209   GDestroyNotify destroy;
210   guint32 posid;
211
212   gboolean is_query;
213 };
214
215 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
216 static void gst_single_queue_free (GstSingleQueue * squeue);
217
218 static void wake_up_next_non_linked (GstMultiQueue * mq);
219 static void compute_high_id (GstMultiQueue * mq);
220 static void compute_high_time (GstMultiQueue * mq, guint groupid);
221 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
222 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
223
224 static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
225 static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
226 static void recheck_buffering_status (GstMultiQueue * mq);
227
228 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
229
230 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
231     GST_PAD_SINK,
232     GST_PAD_REQUEST,
233     GST_STATIC_CAPS_ANY);
234
235 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
236     GST_PAD_SRC,
237     GST_PAD_SOMETIMES,
238     GST_STATIC_CAPS_ANY);
239
240 GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
241 #define GST_CAT_DEFAULT (multi_queue_debug)
242
243 /* Signals and args */
244 enum
245 {
246   SIGNAL_UNDERRUN,
247   SIGNAL_OVERRUN,
248   LAST_SIGNAL
249 };
250
251 /* default limits, we try to keep up to 2 seconds of data and if there is not
252  * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
253  * there is data in the queues. Normally, the byte and time limits are not hit
254  * in theses conditions. */
255 #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
256 #define DEFAULT_MAX_SIZE_BUFFERS 5
257 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
258
259 /* second limits. When we hit one of the above limits we are probably dealing
260  * with a badly muxed file and we scale the limits to these emergency values.
261  * This is currently not yet implemented.
262  * Since we dynamically scale the queue buffer size up to the limits but avoid
263  * going above the max-size-buffers when we can, we don't really need this
264  * aditional extra size. */
265 #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
266 #define DEFAULT_EXTRA_SIZE_BUFFERS 5
267 #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
268
269 #define DEFAULT_USE_BUFFERING FALSE
270 #define DEFAULT_LOW_PERCENT   10
271 #define DEFAULT_HIGH_PERCENT  99
272 #define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
273 #define DEFAULT_USE_INTERLEAVE FALSE
274 #define DEFAULT_UNLINKED_CACHE_TIME 250 * GST_MSECOND
275
276 enum
277 {
278   PROP_0,
279   PROP_EXTRA_SIZE_BYTES,
280   PROP_EXTRA_SIZE_BUFFERS,
281   PROP_EXTRA_SIZE_TIME,
282   PROP_MAX_SIZE_BYTES,
283   PROP_MAX_SIZE_BUFFERS,
284   PROP_MAX_SIZE_TIME,
285   PROP_USE_BUFFERING,
286   PROP_LOW_PERCENT,
287   PROP_HIGH_PERCENT,
288   PROP_SYNC_BY_RUNNING_TIME,
289   PROP_USE_INTERLEAVE,
290   PROP_UNLINKED_CACHE_TIME,
291   PROP_LAST
292 };
293
294 /* GstMultiQueuePad */
295
296 #define DEFAULT_PAD_GROUP_ID 0
297
298 enum
299 {
300   PROP_PAD_0,
301   PROP_PAD_GROUP_ID,
302 };
303
304 #define GST_TYPE_MULTIQUEUE_PAD            (gst_multiqueue_pad_get_type())
305 #define GST_MULTIQUEUE_PAD(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePad))
306 #define GST_IS_MULTIQUEUE_PAD(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTIQUEUE_PAD))
307 #define GST_MULTIQUEUE_PAD_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
308 #define GST_IS_MULTIQUEUE_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_MULTIQUEUE_PAD))
309 #define GST_MULTIQUEUE_PAD_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_MULTIQUEUE_PAD,GstMultiQueuePadClass))
310
311 struct _GstMultiQueuePad
312 {
313   GstPad parent;
314
315   GstSingleQueue *sq;
316 };
317
318 struct _GstMultiQueuePadClass
319 {
320   GstPadClass parent_class;
321 };
322
323 GType gst_multiqueue_pad_get_type (void);
324
325 G_DEFINE_TYPE (GstMultiQueuePad, gst_multiqueue_pad, GST_TYPE_PAD);
326 static void
327 gst_multiqueue_pad_get_property (GObject * object, guint prop_id,
328     GValue * value, GParamSpec * pspec)
329 {
330   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
331
332   switch (prop_id) {
333     case PROP_PAD_GROUP_ID:
334       if (pad->sq)
335         g_value_set_uint (value, pad->sq->groupid);
336       break;
337     default:
338       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
339       break;
340   }
341 }
342
343 static void
344 gst_multiqueue_pad_set_property (GObject * object, guint prop_id,
345     const GValue * value, GParamSpec * pspec)
346 {
347   GstMultiQueuePad *pad = GST_MULTIQUEUE_PAD (object);
348
349   switch (prop_id) {
350     case PROP_PAD_GROUP_ID:
351       GST_OBJECT_LOCK (pad);
352       if (pad->sq)
353         pad->sq->groupid = g_value_get_uint (value);
354       GST_OBJECT_UNLOCK (pad);
355       break;
356     default:
357       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
358       break;
359   }
360 }
361
362 static void
363 gst_multiqueue_pad_class_init (GstMultiQueuePadClass * klass)
364 {
365   GObjectClass *gobject_class = (GObjectClass *) klass;
366
367   gobject_class->set_property = gst_multiqueue_pad_set_property;
368   gobject_class->get_property = gst_multiqueue_pad_get_property;
369
370   /**
371    * GstMultiQueuePad:group-id:
372    *
373    * Group to which this pad belongs.
374    *
375    * Since: 1.10
376    */
377   g_object_class_install_property (gobject_class, PROP_PAD_GROUP_ID,
378       g_param_spec_uint ("group-id", "Group ID",
379           "Group to which this pad belongs", 0, G_MAXUINT32,
380           DEFAULT_PAD_GROUP_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381 }
382
383 static void
384 gst_multiqueue_pad_init (GstMultiQueuePad * pad)
385 {
386
387 }
388
389
390 #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
391   g_mutex_lock (&q->qlock);                                              \
392 } G_STMT_END
393
394 #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
395   g_mutex_unlock (&q->qlock);                                            \
396 } G_STMT_END
397
398 #define SET_PERCENT(mq, perc) G_STMT_START {                             \
399   if (perc != mq->percent) {                                             \
400     mq->percent = perc;                                                  \
401     mq->percent_changed = TRUE;                                          \
402     GST_DEBUG_OBJECT (mq, "buffering %d percent", perc);                 \
403   }                                                                      \
404 } G_STMT_END
405
406 /* Convenience function */
407 static inline GstClockTimeDiff
408 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
409 {
410   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
411
412   if (GST_CLOCK_TIME_IS_VALID (val)) {
413     gboolean sign =
414         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
415     if (sign > 0)
416       res = val;
417     else if (sign < 0)
418       res = -val;
419   }
420   return res;
421 }
422
423 static void gst_multi_queue_finalize (GObject * object);
424 static void gst_multi_queue_set_property (GObject * object,
425     guint prop_id, const GValue * value, GParamSpec * pspec);
426 static void gst_multi_queue_get_property (GObject * object,
427     guint prop_id, GValue * value, GParamSpec * pspec);
428
429 static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
430     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
431 static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
432 static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
433     element, GstStateChange transition);
434
435 static void gst_multi_queue_loop (GstPad * pad);
436
437 #define _do_init \
438   GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
439 #define gst_multi_queue_parent_class parent_class
440 G_DEFINE_TYPE_WITH_CODE (GstMultiQueue, gst_multi_queue, GST_TYPE_ELEMENT,
441     _do_init);
442
443 static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
444
445 static void
446 gst_multi_queue_class_init (GstMultiQueueClass * klass)
447 {
448   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
449   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
450
451   gobject_class->set_property = gst_multi_queue_set_property;
452   gobject_class->get_property = gst_multi_queue_get_property;
453
454   /* SIGNALS */
455
456   /**
457    * GstMultiQueue::underrun:
458    * @multiqueue: the multiqueue instance
459    *
460    * This signal is emitted from the streaming thread when there is
461    * no data in any of the queues inside the multiqueue instance (underrun).
462    *
463    * This indicates either starvation or EOS from the upstream data sources.
464    */
465   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
466       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
467       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
468       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
469
470   /**
471    * GstMultiQueue::overrun:
472    * @multiqueue: the multiqueue instance
473    *
474    * Reports that one of the queues in the multiqueue is full (overrun).
475    * A queue is full if the total amount of data inside it (num-buffers, time,
476    * size) is higher than the boundary values which can be set through the
477    * GObject properties.
478    *
479    * This can be used as an indicator of pre-roll. 
480    */
481   gst_multi_queue_signals[SIGNAL_OVERRUN] =
482       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
483       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
484       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
485
486   /* PROPERTIES */
487
488   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
489       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
490           "Max. amount of data in the queue (bytes, 0=disable)",
491           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
492           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
493           G_PARAM_STATIC_STRINGS));
494   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
495       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
496           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
497           DEFAULT_MAX_SIZE_BUFFERS,
498           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
499           G_PARAM_STATIC_STRINGS));
500   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
501       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
502           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
503           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
504           G_PARAM_STATIC_STRINGS));
505
506   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
507       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
508           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
509           " (NOT IMPLEMENTED)",
510           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
511           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
512   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
513       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
514           "Amount of buffers the queues can grow if one of them is empty (0=disable)"
515           " (NOT IMPLEMENTED)",
516           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
517           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
518   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
519       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
520           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
521           " (NOT IMPLEMENTED)",
522           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
523           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
524
525   /**
526    * GstMultiQueue:use-buffering
527    * 
528    * Enable the buffering option in multiqueue so that BUFFERING messages are
529    * emitted based on low-/high-percent thresholds.
530    */
531   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
532       g_param_spec_boolean ("use-buffering", "Use buffering",
533           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
534           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
535           G_PARAM_STATIC_STRINGS));
536   /**
537    * GstMultiQueue:low-percent
538    * 
539    * Low threshold percent for buffering to start.
540    */
541   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
542       g_param_spec_int ("low-percent", "Low percent",
543           "Low threshold for buffering to start", 0, 100,
544           DEFAULT_LOW_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
545   /**
546    * GstMultiQueue:high-percent
547    * 
548    * High threshold percent for buffering to finish.
549    */
550   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
551       g_param_spec_int ("high-percent", "High percent",
552           "High threshold for buffering to finish", 0, 100,
553           DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
554
555   /**
556    * GstMultiQueue:sync-by-running-time
557    * 
558    * If enabled multiqueue will synchronize deactivated or not-linked streams
559    * to the activated and linked streams by taking the running time.
560    * Otherwise multiqueue will synchronize the deactivated or not-linked
561    * streams by keeping the order in which buffers and events arrived compared
562    * to active and linked streams.
563    */
564   g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
565       g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
566           "Synchronize deactivated or not-linked streams by running time",
567           DEFAULT_SYNC_BY_RUNNING_TIME,
568           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
569
570   g_object_class_install_property (gobject_class, PROP_USE_INTERLEAVE,
571       g_param_spec_boolean ("use-interleave", "Use interleave",
572           "Adjust time limits based on input interleave",
573           DEFAULT_USE_INTERLEAVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
574
575   g_object_class_install_property (gobject_class, PROP_UNLINKED_CACHE_TIME,
576       g_param_spec_uint64 ("unlinked-cache-time", "Unlinked cache time (ns)",
577           "Extra buffering in time for unlinked streams (if 'sync-by-running-time')",
578           0, G_MAXUINT64, DEFAULT_UNLINKED_CACHE_TIME,
579           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
580           G_PARAM_STATIC_STRINGS));
581
582
583   gobject_class->finalize = gst_multi_queue_finalize;
584
585   gst_element_class_set_static_metadata (gstelement_class,
586       "MultiQueue",
587       "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
588   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
589   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
590
591   gstelement_class->request_new_pad =
592       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
593   gstelement_class->release_pad =
594       GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
595   gstelement_class->change_state =
596       GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
597 }
598
599 static void
600 gst_multi_queue_init (GstMultiQueue * mqueue)
601 {
602   mqueue->nbqueues = 0;
603   mqueue->queues = NULL;
604
605   mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
606   mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
607   mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
608
609   mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
610   mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
611   mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
612
613   mqueue->use_buffering = DEFAULT_USE_BUFFERING;
614   mqueue->low_percent = DEFAULT_LOW_PERCENT;
615   mqueue->high_percent = DEFAULT_HIGH_PERCENT;
616
617   mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
618   mqueue->use_interleave = DEFAULT_USE_INTERLEAVE;
619   mqueue->unlinked_cache_time = DEFAULT_UNLINKED_CACHE_TIME;
620
621   mqueue->counter = 1;
622   mqueue->highid = -1;
623   mqueue->high_time = GST_CLOCK_STIME_NONE;
624
625   g_mutex_init (&mqueue->qlock);
626   g_mutex_init (&mqueue->buffering_post_lock);
627 }
628
629 static void
630 gst_multi_queue_finalize (GObject * object)
631 {
632   GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
633
634   g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
635   g_list_free (mqueue->queues);
636   mqueue->queues = NULL;
637   mqueue->queues_cookie++;
638
639   /* free/unref instance data */
640   g_mutex_clear (&mqueue->qlock);
641   g_mutex_clear (&mqueue->buffering_post_lock);
642
643   G_OBJECT_CLASS (parent_class)->finalize (object);
644 }
645
646 #define SET_CHILD_PROPERTY(mq,format) G_STMT_START {            \
647     GList * tmp = mq->queues;                                   \
648     while (tmp) {                                               \
649       GstSingleQueue *q = (GstSingleQueue*)tmp->data;           \
650       q->max_size.format = mq->max_size.format;                 \
651       update_buffering (mq, q);                                 \
652       gst_data_queue_limits_changed (q->queue);                 \
653       tmp = g_list_next(tmp);                                   \
654     };                                                          \
655 } G_STMT_END
656
657 static void
658 gst_multi_queue_set_property (GObject * object, guint prop_id,
659     const GValue * value, GParamSpec * pspec)
660 {
661   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
662
663   switch (prop_id) {
664     case PROP_MAX_SIZE_BYTES:
665       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
666       mq->max_size.bytes = g_value_get_uint (value);
667       SET_CHILD_PROPERTY (mq, bytes);
668       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
669       gst_multi_queue_post_buffering (mq);
670       break;
671     case PROP_MAX_SIZE_BUFFERS:
672     {
673       GList *tmp;
674       gint new_size = g_value_get_uint (value);
675
676       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
677
678       mq->max_size.visible = new_size;
679
680       tmp = mq->queues;
681       while (tmp) {
682         GstDataQueueSize size;
683         GstSingleQueue *q = (GstSingleQueue *) tmp->data;
684         gst_data_queue_get_level (q->queue, &size);
685
686         GST_DEBUG_OBJECT (mq, "Queue %d: Requested buffers size: %d,"
687             " current: %d, current max %d", q->id, new_size, size.visible,
688             q->max_size.visible);
689
690         /* do not reduce max size below current level if the single queue
691          * has grown because of empty queue */
692         if (new_size == 0) {
693           q->max_size.visible = new_size;
694         } else if (q->max_size.visible == 0) {
695           q->max_size.visible = MAX (new_size, size.visible);
696         } else if (new_size > size.visible) {
697           q->max_size.visible = new_size;
698         }
699         update_buffering (mq, q);
700         gst_data_queue_limits_changed (q->queue);
701         tmp = g_list_next (tmp);
702       }
703
704       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
705       gst_multi_queue_post_buffering (mq);
706
707       break;
708     }
709     case PROP_MAX_SIZE_TIME:
710       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
711       mq->max_size.time = g_value_get_uint64 (value);
712       SET_CHILD_PROPERTY (mq, time);
713       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
714       gst_multi_queue_post_buffering (mq);
715       break;
716     case PROP_EXTRA_SIZE_BYTES:
717       mq->extra_size.bytes = g_value_get_uint (value);
718       break;
719     case PROP_EXTRA_SIZE_BUFFERS:
720       mq->extra_size.visible = g_value_get_uint (value);
721       break;
722     case PROP_EXTRA_SIZE_TIME:
723       mq->extra_size.time = g_value_get_uint64 (value);
724       break;
725     case PROP_USE_BUFFERING:
726       mq->use_buffering = g_value_get_boolean (value);
727       recheck_buffering_status (mq);
728       break;
729     case PROP_LOW_PERCENT:
730       mq->low_percent = g_value_get_int (value);
731       /* Recheck buffering status - the new low-percent value might
732        * be above the current fill level. If the old low-percent one
733        * was below the current level, this means that mq->buffering is
734        * disabled and needs to be re-enabled. */
735       recheck_buffering_status (mq);
736       break;
737     case PROP_HIGH_PERCENT:
738       mq->high_percent = g_value_get_int (value);
739       recheck_buffering_status (mq);
740       break;
741     case PROP_SYNC_BY_RUNNING_TIME:
742       mq->sync_by_running_time = g_value_get_boolean (value);
743       break;
744     case PROP_USE_INTERLEAVE:
745       mq->use_interleave = g_value_get_boolean (value);
746       break;
747     case PROP_UNLINKED_CACHE_TIME:
748       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
749       mq->unlinked_cache_time = g_value_get_uint64 (value);
750       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
751       gst_multi_queue_post_buffering (mq);
752       break;
753     default:
754       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
755       break;
756   }
757 }
758
759 static void
760 gst_multi_queue_get_property (GObject * object, guint prop_id,
761     GValue * value, GParamSpec * pspec)
762 {
763   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
764
765   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
766
767   switch (prop_id) {
768     case PROP_EXTRA_SIZE_BYTES:
769       g_value_set_uint (value, mq->extra_size.bytes);
770       break;
771     case PROP_EXTRA_SIZE_BUFFERS:
772       g_value_set_uint (value, mq->extra_size.visible);
773       break;
774     case PROP_EXTRA_SIZE_TIME:
775       g_value_set_uint64 (value, mq->extra_size.time);
776       break;
777     case PROP_MAX_SIZE_BYTES:
778       g_value_set_uint (value, mq->max_size.bytes);
779       break;
780     case PROP_MAX_SIZE_BUFFERS:
781       g_value_set_uint (value, mq->max_size.visible);
782       break;
783     case PROP_MAX_SIZE_TIME:
784       g_value_set_uint64 (value, mq->max_size.time);
785       break;
786     case PROP_USE_BUFFERING:
787       g_value_set_boolean (value, mq->use_buffering);
788       break;
789     case PROP_LOW_PERCENT:
790       g_value_set_int (value, mq->low_percent);
791       break;
792     case PROP_HIGH_PERCENT:
793       g_value_set_int (value, mq->high_percent);
794       break;
795     case PROP_SYNC_BY_RUNNING_TIME:
796       g_value_set_boolean (value, mq->sync_by_running_time);
797       break;
798     case PROP_USE_INTERLEAVE:
799       g_value_set_boolean (value, mq->use_interleave);
800       break;
801     case PROP_UNLINKED_CACHE_TIME:
802       g_value_set_uint64 (value, mq->unlinked_cache_time);
803       break;
804     default:
805       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
806       break;
807   }
808
809   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
810 }
811
812 static GstIterator *
813 gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
814 {
815   GstIterator *it = NULL;
816   GstPad *opad;
817   GstSingleQueue *squeue;
818   GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
819   GValue val = { 0, };
820
821   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
822   squeue = gst_pad_get_element_private (pad);
823   if (!squeue)
824     goto out;
825
826   if (squeue->sinkpad == pad)
827     opad = gst_object_ref (squeue->srcpad);
828   else if (squeue->srcpad == pad)
829     opad = gst_object_ref (squeue->sinkpad);
830   else
831     goto out;
832
833   g_value_init (&val, GST_TYPE_PAD);
834   g_value_set_object (&val, opad);
835   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
836   g_value_unset (&val);
837
838   gst_object_unref (opad);
839
840 out:
841   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
842
843   return it;
844 }
845
846
847 /*
848  * GstElement methods
849  */
850
851 static GstPad *
852 gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
853     const gchar * name, const GstCaps * caps)
854 {
855   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
856   GstSingleQueue *squeue;
857   GstPad *new_pad;
858   guint temp_id = -1;
859
860   if (name) {
861     sscanf (name + 4, "_%u", &temp_id);
862     GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
863   }
864
865   /* Create a new single queue, add the sink and source pad and return the sink pad */
866   squeue = gst_single_queue_new (mqueue, temp_id);
867
868   new_pad = squeue ? squeue->sinkpad : NULL;
869
870   GST_DEBUG_OBJECT (mqueue, "Returning pad %" GST_PTR_FORMAT, new_pad);
871
872   return new_pad;
873 }
874
875 static void
876 gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
877 {
878   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
879   GstSingleQueue *sq = NULL;
880   GList *tmp;
881
882   GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
883
884   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
885   /* Find which single queue it belongs to, knowing that it should be a sinkpad */
886   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
887     sq = (GstSingleQueue *) tmp->data;
888
889     if (sq->sinkpad == pad)
890       break;
891   }
892
893   if (!tmp) {
894     GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
895     GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
896     return;
897   }
898
899   /* FIXME: The removal of the singlequeue should probably not happen until it
900    * finishes draining */
901
902   /* remove it from the list */
903   mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
904   mqueue->queues_cookie++;
905
906   /* FIXME : recompute next-non-linked */
907   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
908
909   /* delete SingleQueue */
910   gst_data_queue_set_flushing (sq->queue, TRUE);
911
912   gst_pad_set_active (sq->srcpad, FALSE);
913   gst_pad_set_active (sq->sinkpad, FALSE);
914   gst_pad_set_element_private (sq->srcpad, NULL);
915   gst_pad_set_element_private (sq->sinkpad, NULL);
916   gst_element_remove_pad (element, sq->srcpad);
917   gst_element_remove_pad (element, sq->sinkpad);
918   gst_single_queue_free (sq);
919 }
920
921 static GstStateChangeReturn
922 gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
923 {
924   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
925   GstSingleQueue *sq = NULL;
926   GstStateChangeReturn result;
927
928   switch (transition) {
929     case GST_STATE_CHANGE_READY_TO_PAUSED:{
930       GList *tmp;
931
932       /* Set all pads to non-flushing */
933       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
934       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
935         sq = (GstSingleQueue *) tmp->data;
936         sq->flushing = FALSE;
937       }
938
939       /* the visible limit might not have been set on single queues that have grown because of other queueus were empty */
940       SET_CHILD_PROPERTY (mqueue, visible);
941
942       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
943       gst_multi_queue_post_buffering (mqueue);
944
945       break;
946     }
947     case GST_STATE_CHANGE_PAUSED_TO_READY:{
948       GList *tmp;
949
950       /* Un-wait all waiting pads */
951       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
952       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
953         sq = (GstSingleQueue *) tmp->data;
954         sq->flushing = TRUE;
955         g_cond_signal (&sq->turn);
956
957         sq->last_query = FALSE;
958         g_cond_signal (&sq->query_handled);
959       }
960       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
961       break;
962     }
963     default:
964       break;
965   }
966
967   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
968
969   switch (transition) {
970     default:
971       break;
972   }
973
974   return result;
975 }
976
977 static gboolean
978 gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
979     gboolean full)
980 {
981   gboolean result;
982
983   GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
984       sq->id);
985
986   if (flush) {
987     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
988     sq->srcresult = GST_FLOW_FLUSHING;
989     gst_data_queue_set_flushing (sq->queue, TRUE);
990
991     sq->flushing = TRUE;
992
993     /* wake up non-linked task */
994     GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
995         sq->id);
996     g_cond_signal (&sq->turn);
997     sq->last_query = FALSE;
998     g_cond_signal (&sq->query_handled);
999     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1000
1001     GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
1002     result = gst_pad_pause_task (sq->srcpad);
1003     sq->sink_tainted = sq->src_tainted = TRUE;
1004   } else {
1005     gst_single_queue_flush_queue (sq, full);
1006
1007     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1008     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1009     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1010     sq->has_src_segment = FALSE;
1011     /* All pads start off not-linked for a smooth kick-off */
1012     sq->srcresult = GST_FLOW_OK;
1013     sq->pushed = FALSE;
1014     sq->cur_time = 0;
1015     sq->max_size.visible = mq->max_size.visible;
1016     sq->is_eos = FALSE;
1017     sq->nextid = 0;
1018     sq->oldid = 0;
1019     sq->last_oldid = G_MAXUINT32;
1020     sq->next_time = GST_CLOCK_STIME_NONE;
1021     sq->last_time = GST_CLOCK_STIME_NONE;
1022     sq->cached_sinktime = GST_CLOCK_STIME_NONE;
1023     sq->group_high_time = GST_CLOCK_STIME_NONE;
1024     gst_data_queue_set_flushing (sq->queue, FALSE);
1025
1026     /* We will become active again on the next buffer/gap */
1027     sq->active = FALSE;
1028
1029     /* Reset high time to be recomputed next */
1030     mq->high_time = GST_CLOCK_STIME_NONE;
1031
1032     sq->flushing = FALSE;
1033     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1034
1035     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
1036     result =
1037         gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
1038         sq->srcpad, NULL);
1039   }
1040   return result;
1041 }
1042
1043 /* WITH LOCK TAKEN */
1044 static gint
1045 get_percentage (GstSingleQueue * sq)
1046 {
1047   GstDataQueueSize size;
1048   gint percent, tmp;
1049
1050   gst_data_queue_get_level (sq->queue, &size);
1051
1052   GST_DEBUG_OBJECT (sq->mqueue,
1053       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
1054       G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
1055       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1056
1057   /* get bytes and time percentages and take the max */
1058   if (sq->is_eos || sq->srcresult == GST_FLOW_NOT_LINKED || sq->is_sparse) {
1059     percent = 100;
1060   } else {
1061     percent = 0;
1062     if (sq->max_size.time > 0) {
1063       tmp = (sq->cur_time * 100) / sq->max_size.time;
1064       percent = MAX (percent, tmp);
1065     }
1066     if (sq->max_size.bytes > 0) {
1067       tmp = (size.bytes * 100) / sq->max_size.bytes;
1068       percent = MAX (percent, tmp);
1069     }
1070   }
1071
1072   return percent;
1073 }
1074
1075 /* WITH LOCK TAKEN */
1076 static void
1077 update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
1078 {
1079   gint percent;
1080
1081   /* nothing to dowhen we are not in buffering mode */
1082   if (!mq->use_buffering)
1083     return;
1084
1085   percent = get_percentage (sq);
1086
1087   if (mq->buffering) {
1088     if (percent >= mq->high_percent) {
1089       mq->buffering = FALSE;
1090     }
1091     /* make sure it increases */
1092     percent = MAX (mq->percent, percent);
1093
1094     SET_PERCENT (mq, percent);
1095   } else {
1096     GList *iter;
1097     gboolean is_buffering = TRUE;
1098
1099     for (iter = mq->queues; iter; iter = g_list_next (iter)) {
1100       GstSingleQueue *oq = (GstSingleQueue *) iter->data;
1101
1102       if (get_percentage (oq) >= mq->high_percent) {
1103         is_buffering = FALSE;
1104
1105         break;
1106       }
1107     }
1108
1109     if (is_buffering && percent < mq->low_percent) {
1110       mq->buffering = TRUE;
1111       SET_PERCENT (mq, percent);
1112     }
1113   }
1114 }
1115
1116 static void
1117 gst_multi_queue_post_buffering (GstMultiQueue * mq)
1118 {
1119   GstMessage *msg = NULL;
1120
1121   g_mutex_lock (&mq->buffering_post_lock);
1122   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1123   if (mq->percent_changed) {
1124     gint percent = mq->percent;
1125
1126     mq->percent_changed = FALSE;
1127
1128     percent = percent * 100 / mq->high_percent;
1129     /* clip */
1130     if (percent > 100)
1131       percent = 100;
1132
1133     GST_DEBUG_OBJECT (mq, "Going to post buffering: %d%%", percent);
1134     msg = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
1135   }
1136   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1137
1138   if (msg != NULL)
1139     gst_element_post_message (GST_ELEMENT_CAST (mq), msg);
1140
1141   g_mutex_unlock (&mq->buffering_post_lock);
1142 }
1143
1144 static void
1145 recheck_buffering_status (GstMultiQueue * mq)
1146 {
1147   if (!mq->use_buffering && mq->buffering) {
1148     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1149     mq->buffering = FALSE;
1150     GST_DEBUG_OBJECT (mq,
1151         "Buffering property disabled, but queue was still buffering; setting percentage to 100%%");
1152     SET_PERCENT (mq, 100);
1153     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1154   }
1155
1156   if (mq->use_buffering) {
1157     GList *tmp;
1158     gint old_perc;
1159
1160     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1161
1162     /* force fill level percentage to be recalculated */
1163     old_perc = mq->percent;
1164     mq->percent = 0;
1165
1166     tmp = mq->queues;
1167     while (tmp) {
1168       GstSingleQueue *q = (GstSingleQueue *) tmp->data;
1169       update_buffering (mq, q);
1170       gst_data_queue_limits_changed (q->queue);
1171       tmp = g_list_next (tmp);
1172     }
1173
1174     GST_DEBUG_OBJECT (mq, "Recalculated fill level: old: %d%% new: %d%%",
1175         old_perc, mq->percent);
1176
1177     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1178   }
1179
1180   gst_multi_queue_post_buffering (mq);
1181 }
1182
1183 static void
1184 calculate_interleave (GstMultiQueue * mq)
1185 {
1186   GstClockTimeDiff low, high;
1187   GstClockTime interleave;
1188   GList *tmp;
1189
1190   low = high = GST_CLOCK_STIME_NONE;
1191   interleave = mq->interleave;
1192   /* Go over all single queues and calculate lowest/highest value */
1193   for (tmp = mq->queues; tmp; tmp = tmp->next) {
1194     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1195     /* Ignore sparse streams for interleave calculation */
1196     if (sq->is_sparse)
1197       continue;
1198     /* If a stream is not active yet (hasn't received any buffers), set
1199      * a maximum interleave to allow it to receive more data */
1200     if (!sq->active) {
1201       GST_LOG_OBJECT (mq,
1202           "queue %d is not active yet, forcing interleave to 5s", sq->id);
1203       mq->interleave = 5 * GST_SECOND;
1204       /* Update max-size time */
1205       mq->max_size.time = mq->interleave;
1206       SET_CHILD_PROPERTY (mq, time);
1207       goto beach;
1208     }
1209     if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
1210       if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
1211         low = sq->cached_sinktime;
1212       if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
1213         high = sq->cached_sinktime;
1214     }
1215     GST_LOG_OBJECT (mq,
1216         "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
1217         " high:%" GST_STIME_FORMAT, sq->id,
1218         GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
1219         GST_STIME_ARGS (high));
1220   }
1221
1222   if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
1223     interleave = high - low;
1224     /* Padding of interleave and minimum value */
1225     /* FIXME : Make the minimum time interleave a property */
1226     interleave = (150 * interleave / 100) + 250 * GST_MSECOND;
1227
1228     /* Update the stored interleave if:
1229      * * No data has arrived yet (high == low)
1230      * * Or it went higher
1231      * * Or it went lower and we've gone past the previous interleave needed */
1232     if (high == low || interleave > mq->interleave ||
1233         ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
1234                         mq->interleave)) < low)
1235             && interleave < (mq->interleave * 3 / 4))) {
1236       /* Update the interleave */
1237       mq->interleave = interleave;
1238       mq->last_interleave_update = high;
1239       /* Update max-size time */
1240       mq->max_size.time = mq->interleave;
1241       SET_CHILD_PROPERTY (mq, time);
1242     }
1243   }
1244
1245 beach:
1246   GST_DEBUG_OBJECT (mq,
1247       "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
1248       GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
1249       " last_interleave_update:%" GST_STIME_FORMAT, GST_STIME_ARGS (low),
1250       GST_STIME_ARGS (high), GST_TIME_ARGS (interleave),
1251       GST_TIME_ARGS (mq->interleave),
1252       GST_STIME_ARGS (mq->last_interleave_update));
1253 }
1254
1255
1256 /* calculate the diff between running time on the sink and src of the queue.
1257  * This is the total amount of time in the queue. 
1258  * WITH LOCK TAKEN */
1259 static void
1260 update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
1261 {
1262   GstClockTimeDiff sink_time, src_time;
1263
1264   if (sq->sink_tainted) {
1265     sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment,
1266         sq->sink_segment.position);
1267
1268     GST_DEBUG_OBJECT (mq,
1269         "queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
1270         GST_STIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
1271         GST_STIME_ARGS (sink_time));
1272
1273     if (G_UNLIKELY (sq->last_time == GST_CLOCK_STIME_NONE)) {
1274       /* If the single queue still doesn't have a last_time set, this means
1275        * that nothing has been pushed out yet.
1276        * In order for the high_time computation to be as efficient as possible,
1277        * we set the last_time */
1278       sq->last_time = sink_time;
1279     }
1280     if (G_UNLIKELY (sink_time != GST_CLOCK_STIME_NONE)) {
1281       /* if we have a time, we become untainted and use the time */
1282       sq->sink_tainted = FALSE;
1283       if (mq->use_interleave) {
1284         sq->cached_sinktime = sink_time;
1285         calculate_interleave (mq);
1286       }
1287     }
1288   } else
1289     sink_time = sq->sinktime;
1290
1291   if (sq->src_tainted) {
1292     GstSegment *segment;
1293     gint64 position;
1294
1295     if (sq->has_src_segment) {
1296       segment = &sq->src_segment;
1297       position = sq->src_segment.position;
1298     } else {
1299       /*
1300        * If the src pad had no segment yet, use the sink segment
1301        * to avoid signalling overrun if the received sink segment has a
1302        * a position > max-size-time while the src pad time would be the default=0
1303        *
1304        * This can happen when switching pads on chained/adaptive streams and the
1305        * new chain has a segment with a much larger position
1306        */
1307       segment = &sq->sink_segment;
1308       position = sq->sink_segment.position;
1309     }
1310
1311     src_time = sq->srctime = my_segment_to_running_time (segment, position);
1312     /* if we have a time, we become untainted and use the time */
1313     if (G_UNLIKELY (src_time != GST_CLOCK_STIME_NONE)) {
1314       sq->src_tainted = FALSE;
1315     }
1316   } else
1317     src_time = sq->srctime;
1318
1319   GST_DEBUG_OBJECT (mq,
1320       "queue %d, sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, sq->id,
1321       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
1322
1323   /* This allows for streams with out of order timestamping - sometimes the
1324    * emerging timestamp is later than the arriving one(s) */
1325   if (G_LIKELY (GST_CLOCK_STIME_IS_VALID (sink_time) &&
1326           GST_CLOCK_STIME_IS_VALID (src_time) && sink_time > src_time))
1327     sq->cur_time = sink_time - src_time;
1328   else
1329     sq->cur_time = 0;
1330
1331   /* updating the time level can change the buffering state */
1332   update_buffering (mq, sq);
1333
1334   return;
1335 }
1336
1337 /* take a SEGMENT event and apply the values to segment, updating the time
1338  * level of queue. */
1339 static void
1340 apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1341     GstSegment * segment)
1342 {
1343   gst_event_copy_segment (event, segment);
1344
1345   /* now configure the values, we use these to track timestamps on the
1346    * sinkpad. */
1347   if (segment->format != GST_FORMAT_TIME) {
1348     /* non-time format, pretent the current time segment is closed with a
1349      * 0 start and unknown stop time. */
1350     segment->format = GST_FORMAT_TIME;
1351     segment->start = 0;
1352     segment->stop = -1;
1353     segment->time = 0;
1354   }
1355   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1356
1357   /* Make sure we have a valid initial segment position (and not garbage
1358    * from upstream) */
1359   if (segment->rate > 0.0)
1360     segment->position = segment->start;
1361   else
1362     segment->position = segment->stop;
1363   if (segment == &sq->sink_segment)
1364     sq->sink_tainted = TRUE;
1365   else {
1366     sq->has_src_segment = TRUE;
1367     sq->src_tainted = TRUE;
1368   }
1369
1370   GST_DEBUG_OBJECT (mq,
1371       "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
1372
1373   /* segment can update the time level of the queue */
1374   update_time_level (mq, sq);
1375
1376   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1377   gst_multi_queue_post_buffering (mq);
1378 }
1379
1380 /* take a buffer and update segment, updating the time level of the queue. */
1381 static void
1382 apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
1383     GstClockTime duration, GstSegment * segment)
1384 {
1385   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1386
1387   /* if no timestamp is set, assume it's continuous with the previous 
1388    * time */
1389   if (timestamp == GST_CLOCK_TIME_NONE)
1390     timestamp = segment->position;
1391
1392   /* add duration */
1393   if (duration != GST_CLOCK_TIME_NONE)
1394     timestamp += duration;
1395
1396   GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
1397       sq->id, segment == &sq->sink_segment ? "sink" : "src",
1398       GST_TIME_ARGS (timestamp));
1399
1400   segment->position = timestamp;
1401
1402   if (segment == &sq->sink_segment)
1403     sq->sink_tainted = TRUE;
1404   else
1405     sq->src_tainted = TRUE;
1406
1407   /* calc diff with other end */
1408   update_time_level (mq, sq);
1409   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1410   gst_multi_queue_post_buffering (mq);
1411 }
1412
1413 static void
1414 apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1415     GstSegment * segment)
1416 {
1417   GstClockTime timestamp;
1418   GstClockTime duration;
1419
1420   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1421
1422   gst_event_parse_gap (event, &timestamp, &duration);
1423
1424   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1425
1426     if (GST_CLOCK_TIME_IS_VALID (duration)) {
1427       timestamp += duration;
1428     }
1429
1430     segment->position = timestamp;
1431
1432     if (segment == &sq->sink_segment)
1433       sq->sink_tainted = TRUE;
1434     else
1435       sq->src_tainted = TRUE;
1436
1437     /* calc diff with other end */
1438     update_time_level (mq, sq);
1439   }
1440
1441   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1442   gst_multi_queue_post_buffering (mq);
1443 }
1444
1445 static GstClockTimeDiff
1446 get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
1447 {
1448   GstClockTimeDiff time = GST_CLOCK_STIME_NONE;
1449
1450   if (GST_IS_BUFFER (object)) {
1451     GstBuffer *buf = GST_BUFFER_CAST (object);
1452     GstClockTime btime = GST_BUFFER_DTS_OR_PTS (buf);
1453
1454     if (GST_CLOCK_TIME_IS_VALID (btime)) {
1455       if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1456         btime += GST_BUFFER_DURATION (buf);
1457       if (btime > segment->stop)
1458         btime = segment->stop;
1459       time = my_segment_to_running_time (segment, btime);
1460     }
1461   } else if (GST_IS_BUFFER_LIST (object)) {
1462     GstBufferList *list = GST_BUFFER_LIST_CAST (object);
1463     gint i, n;
1464     GstBuffer *buf;
1465
1466     n = gst_buffer_list_length (list);
1467     for (i = 0; i < n; i++) {
1468       GstClockTime btime;
1469       buf = gst_buffer_list_get (list, i);
1470       btime = GST_BUFFER_DTS_OR_PTS (buf);
1471       if (GST_CLOCK_TIME_IS_VALID (btime)) {
1472         if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1473           btime += GST_BUFFER_DURATION (buf);
1474         if (btime > segment->stop)
1475           btime = segment->stop;
1476         time = my_segment_to_running_time (segment, btime);
1477         if (!end)
1478           goto done;
1479       } else if (!end) {
1480         goto done;
1481       }
1482     }
1483   } else if (GST_IS_EVENT (object)) {
1484     GstEvent *event = GST_EVENT_CAST (object);
1485
1486     /* For newsegment events return the running time of the start position */
1487     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1488       const GstSegment *new_segment;
1489
1490       gst_event_parse_segment (event, &new_segment);
1491       if (new_segment->format == GST_FORMAT_TIME) {
1492         time =
1493             my_segment_to_running_time ((GstSegment *) new_segment,
1494             new_segment->start);
1495       }
1496     }
1497   }
1498
1499 done:
1500   return time;
1501 }
1502
1503 static GstFlowReturn
1504 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
1505     GstMiniObject * object, gboolean * allow_drop)
1506 {
1507   GstFlowReturn result = sq->srcresult;
1508
1509   if (GST_IS_BUFFER (object)) {
1510     GstBuffer *buffer;
1511     GstClockTime timestamp, duration;
1512
1513     buffer = GST_BUFFER_CAST (object);
1514     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1515     duration = GST_BUFFER_DURATION (buffer);
1516
1517     apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1518
1519     /* Applying the buffer may have made the queue non-full again, unblock it if needed */
1520     gst_data_queue_limits_changed (sq->queue);
1521
1522     if (G_UNLIKELY (*allow_drop)) {
1523       GST_DEBUG_OBJECT (mq,
1524           "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
1525           sq->id, buffer, GST_TIME_ARGS (timestamp));
1526       gst_buffer_unref (buffer);
1527     } else {
1528       GST_DEBUG_OBJECT (mq,
1529           "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
1530           sq->id, buffer, GST_TIME_ARGS (timestamp));
1531       result = gst_pad_push (sq->srcpad, buffer);
1532     }
1533   } else if (GST_IS_EVENT (object)) {
1534     GstEvent *event;
1535
1536     event = GST_EVENT_CAST (object);
1537
1538     switch (GST_EVENT_TYPE (event)) {
1539       case GST_EVENT_EOS:
1540         result = GST_FLOW_EOS;
1541         if (G_UNLIKELY (*allow_drop))
1542           *allow_drop = FALSE;
1543         break;
1544       case GST_EVENT_SEGMENT:
1545         apply_segment (mq, sq, event, &sq->src_segment);
1546         /* Applying the segment may have made the queue non-full again, unblock it if needed */
1547         gst_data_queue_limits_changed (sq->queue);
1548         if (G_UNLIKELY (*allow_drop)) {
1549           result = GST_FLOW_OK;
1550           *allow_drop = FALSE;
1551         }
1552         break;
1553       case GST_EVENT_GAP:
1554         apply_gap (mq, sq, event, &sq->src_segment);
1555         /* Applying the gap may have made the queue non-full again, unblock it if needed */
1556         gst_data_queue_limits_changed (sq->queue);
1557         break;
1558       default:
1559         break;
1560     }
1561
1562     if (G_UNLIKELY (*allow_drop)) {
1563       GST_DEBUG_OBJECT (mq,
1564           "SingleQueue %d : Dropping EOS event %p of type %s",
1565           sq->id, event, GST_EVENT_TYPE_NAME (event));
1566       gst_event_unref (event);
1567     } else {
1568       GST_DEBUG_OBJECT (mq,
1569           "SingleQueue %d : Pushing event %p of type %s",
1570           sq->id, event, GST_EVENT_TYPE_NAME (event));
1571
1572       gst_pad_push_event (sq->srcpad, event);
1573     }
1574   } else if (GST_IS_QUERY (object)) {
1575     GstQuery *query;
1576     gboolean res;
1577
1578     query = GST_QUERY_CAST (object);
1579
1580     if (G_UNLIKELY (*allow_drop)) {
1581       GST_DEBUG_OBJECT (mq,
1582           "SingleQueue %d : Dropping EOS query %p", sq->id, query);
1583       gst_query_unref (query);
1584       res = FALSE;
1585     } else {
1586       res = gst_pad_peer_query (sq->srcpad, query);
1587     }
1588
1589     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1590     sq->last_query = res;
1591     sq->last_handled_query = query;
1592     g_cond_signal (&sq->query_handled);
1593     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1594   } else {
1595     g_warning ("Unexpected object in singlequeue %u (refcounting problem?)",
1596         sq->id);
1597   }
1598   return result;
1599
1600   /* ERRORS */
1601 }
1602
1603 static GstMiniObject *
1604 gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
1605 {
1606   GstMiniObject *res;
1607
1608   res = item->object;
1609   item->object = NULL;
1610
1611   return res;
1612 }
1613
1614 static void
1615 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
1616 {
1617   if (!item->is_query && item->object)
1618     gst_mini_object_unref (item->object);
1619   g_slice_free (GstMultiQueueItem, item);
1620 }
1621
1622 /* takes ownership of passed mini object! */
1623 static GstMultiQueueItem *
1624 gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
1625 {
1626   GstMultiQueueItem *item;
1627
1628   item = g_slice_new (GstMultiQueueItem);
1629   item->object = object;
1630   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1631   item->posid = curid;
1632   item->is_query = GST_IS_QUERY (object);
1633
1634   item->size = gst_buffer_get_size (GST_BUFFER_CAST (object));
1635   item->duration = GST_BUFFER_DURATION (object);
1636   if (item->duration == GST_CLOCK_TIME_NONE)
1637     item->duration = 0;
1638   item->visible = TRUE;
1639   return item;
1640 }
1641
1642 static GstMultiQueueItem *
1643 gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
1644 {
1645   GstMultiQueueItem *item;
1646
1647   item = g_slice_new (GstMultiQueueItem);
1648   item->object = object;
1649   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1650   item->posid = curid;
1651   item->is_query = GST_IS_QUERY (object);
1652
1653   item->size = 0;
1654   item->duration = 0;
1655   item->visible = FALSE;
1656   return item;
1657 }
1658
1659 /* Each main loop attempts to push buffers until the return value
1660  * is not-linked. not-linked pads are not allowed to push data beyond
1661  * any linked pads, so they don't 'rush ahead of the pack'.
1662  */
1663 static void
1664 gst_multi_queue_loop (GstPad * pad)
1665 {
1666   GstSingleQueue *sq;
1667   GstMultiQueueItem *item;
1668   GstDataQueueItem *sitem;
1669   GstMultiQueue *mq;
1670   GstMiniObject *object = NULL;
1671   guint32 newid;
1672   GstFlowReturn result;
1673   GstClockTimeDiff next_time;
1674   gboolean is_buffer;
1675   gboolean do_update_buffering = FALSE;
1676   gboolean dropping = FALSE;
1677
1678   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1679   mq = sq->mqueue;
1680
1681 next:
1682   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
1683
1684   if (sq->flushing)
1685     goto out_flushing;
1686
1687   /* Get something from the queue, blocking until that happens, or we get
1688    * flushed */
1689   if (!(gst_data_queue_pop (sq->queue, &sitem)))
1690     goto out_flushing;
1691
1692   item = (GstMultiQueueItem *) sitem;
1693   newid = item->posid;
1694
1695   /* steal the object and destroy the item */
1696   object = gst_multi_queue_item_steal_object (item);
1697   gst_multi_queue_item_destroy (item);
1698
1699   is_buffer = GST_IS_BUFFER (object);
1700
1701   /* Get running time of the item. Events will have GST_CLOCK_STIME_NONE */
1702   next_time = get_running_time (&sq->src_segment, object, FALSE);
1703
1704   GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
1705       sq->id, newid, sq->last_oldid);
1706
1707   /* If we're not-linked, we do some extra work because we might need to
1708    * wait before pushing. If we're linked but there's a gap in the IDs,
1709    * or it's the first loop, or we just passed the previous highid,
1710    * we might need to wake some sleeping pad up, so there's extra work
1711    * there too */
1712   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1713   if (sq->srcresult == GST_FLOW_NOT_LINKED
1714       || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
1715       || sq->last_oldid > mq->highid) {
1716     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
1717         gst_flow_get_name (sq->srcresult));
1718
1719     /* Check again if we're flushing after the lock is taken,
1720      * the flush flag might have been changed in the meantime */
1721     if (sq->flushing) {
1722       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1723       goto out_flushing;
1724     }
1725
1726     /* Update the nextid so other threads know when to wake us up */
1727     sq->nextid = newid;
1728     /* Take into account the extra cache time since we're unlinked */
1729     if (GST_CLOCK_STIME_IS_VALID (next_time))
1730       next_time += mq->unlinked_cache_time;
1731     sq->next_time = next_time;
1732
1733     /* Update the oldid (the last ID we output) for highid tracking */
1734     if (sq->last_oldid != G_MAXUINT32)
1735       sq->oldid = sq->last_oldid;
1736
1737     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1738       gboolean should_wait;
1739       /* Go to sleep until it's time to push this buffer */
1740
1741       /* Recompute the highid */
1742       compute_high_id (mq);
1743       /* Recompute the high time */
1744       compute_high_time (mq, sq->groupid);
1745
1746       GST_DEBUG_OBJECT (mq,
1747           "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
1748           GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time),
1749           GST_STIME_ARGS (next_time));
1750
1751       if (mq->sync_by_running_time)
1752         /* In this case we only need to wait if:
1753          * 1) there is a time against which to wait
1754          * 2) and either we have gone over the high_time or there is no
1755          *   high_time */
1756         should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
1757             (sq->group_high_time == GST_CLOCK_STIME_NONE
1758             || next_time > sq->group_high_time);
1759       else
1760         should_wait = newid > mq->highid;
1761
1762       while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) {
1763
1764         GST_DEBUG_OBJECT (mq,
1765             "queue %d sleeping for not-linked wakeup with "
1766             "newid %u, highid %u, next_time %" GST_STIME_FORMAT
1767             ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
1768             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
1769
1770         /* Wake up all non-linked pads before we sleep */
1771         wake_up_next_non_linked (mq);
1772
1773         mq->numwaiting++;
1774         g_cond_wait (&sq->turn, &mq->qlock);
1775         mq->numwaiting--;
1776
1777         if (sq->flushing) {
1778           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1779           goto out_flushing;
1780         }
1781
1782         /* Recompute the high time and ID */
1783         compute_high_time (mq, sq->groupid);
1784         compute_high_id (mq);
1785
1786         GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
1787             "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
1788             ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
1789             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
1790
1791         if (mq->sync_by_running_time)
1792           should_wait = GST_CLOCK_STIME_IS_VALID (next_time) &&
1793               (sq->group_high_time == GST_CLOCK_STIME_NONE
1794               || next_time > sq->group_high_time);
1795         else
1796           should_wait = newid > mq->highid;
1797       }
1798
1799       /* Re-compute the high_id in case someone else pushed */
1800       compute_high_id (mq);
1801       compute_high_time (mq, sq->groupid);
1802     } else {
1803       compute_high_id (mq);
1804       compute_high_time (mq, sq->groupid);
1805       /* Wake up all non-linked pads */
1806       wake_up_next_non_linked (mq);
1807     }
1808     /* We're done waiting, we can clear the nextid and nexttime */
1809     sq->nextid = 0;
1810     sq->next_time = GST_CLOCK_STIME_NONE;
1811   }
1812   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1813
1814   if (sq->flushing)
1815     goto out_flushing;
1816
1817   GST_LOG_OBJECT (mq, "sq:%d BEFORE PUSHING sq->srcresult: %s", sq->id,
1818       gst_flow_get_name (sq->srcresult));
1819
1820   /* Update time stats */
1821   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1822   next_time = get_running_time (&sq->src_segment, object, TRUE);
1823   if (GST_CLOCK_STIME_IS_VALID (next_time)) {
1824     if (sq->last_time == GST_CLOCK_STIME_NONE || sq->last_time < next_time)
1825       sq->last_time = next_time;
1826     if (mq->high_time == GST_CLOCK_STIME_NONE || mq->high_time <= next_time) {
1827       /* Wake up all non-linked pads now that we advanced the high time */
1828       mq->high_time = next_time;
1829       wake_up_next_non_linked (mq);
1830     }
1831   }
1832   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1833
1834   /* Try to push out the new object */
1835   result = gst_single_queue_push_one (mq, sq, object, &dropping);
1836   object = NULL;
1837
1838   /* Check if we pushed something already and if this is
1839    * now a switch from an active to a non-active stream.
1840    *
1841    * If it is, we reset all the waiting streams, let them
1842    * push another buffer to see if they're now active again.
1843    * This allows faster switching between streams and prevents
1844    * deadlocks if downstream does any waiting too.
1845    */
1846   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1847   if (sq->pushed && sq->srcresult == GST_FLOW_OK
1848       && result == GST_FLOW_NOT_LINKED) {
1849     GList *tmp;
1850
1851     GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
1852         sq->id);
1853
1854     compute_high_id (mq);
1855     compute_high_time (mq, sq->groupid);
1856     do_update_buffering = TRUE;
1857
1858     /* maybe no-one is waiting */
1859     if (mq->numwaiting > 0) {
1860       /* Else figure out which singlequeue(s) need waking up */
1861       for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1862         GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
1863
1864         if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
1865           GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
1866           sq2->pushed = FALSE;
1867           sq2->srcresult = GST_FLOW_OK;
1868           g_cond_signal (&sq2->turn);
1869         }
1870       }
1871     }
1872   }
1873
1874   if (is_buffer)
1875     sq->pushed = TRUE;
1876
1877   /* now hold on a bit;
1878    * can not simply throw this result to upstream, because
1879    * that might already be onto another segment, so we have to make
1880    * sure we are relaying the correct info wrt proper segment */
1881   if (result == GST_FLOW_EOS && !dropping &&
1882       sq->srcresult != GST_FLOW_NOT_LINKED) {
1883     GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
1884     dropping = TRUE;
1885     /* pretend we have not seen EOS yet for upstream's sake */
1886     result = sq->srcresult;
1887   } else if (dropping && gst_data_queue_is_empty (sq->queue)) {
1888     /* queue empty, so stop dropping
1889      * we can commit the result we have now,
1890      * which is either OK after a segment, or EOS */
1891     GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
1892     dropping = FALSE;
1893     result = GST_FLOW_EOS;
1894   }
1895   sq->srcresult = result;
1896   sq->last_oldid = newid;
1897
1898   if (do_update_buffering)
1899     update_buffering (mq, sq);
1900
1901   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1902   gst_multi_queue_post_buffering (mq);
1903
1904   GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)",
1905       sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (sq->srcpad));
1906
1907   /* Need to make sure wake up any sleeping pads when we exit */
1908   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1909   if (mq->numwaiting > 0 && (GST_PAD_IS_EOS (sq->srcpad)
1910           || sq->srcresult == GST_FLOW_EOS)) {
1911     compute_high_time (mq, sq->groupid);
1912     compute_high_id (mq);
1913     wake_up_next_non_linked (mq);
1914   }
1915   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1916
1917   if (dropping)
1918     goto next;
1919
1920   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
1921       && result != GST_FLOW_EOS)
1922     goto out_flushing;
1923
1924   return;
1925
1926 out_flushing:
1927   {
1928     if (object)
1929       gst_mini_object_unref (object);
1930
1931     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1932     sq->last_query = FALSE;
1933     g_cond_signal (&sq->query_handled);
1934
1935     /* Post an error message if we got EOS while downstream
1936      * has returned an error flow return. After EOS there
1937      * will be no further buffer which could propagate the
1938      * error upstream */
1939     if (sq->is_eos && sq->srcresult < GST_FLOW_EOS) {
1940       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1941       GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
1942     } else {
1943       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1944     }
1945
1946     /* upstream needs to see fatal result ASAP to shut things down,
1947      * but might be stuck in one of our other full queues;
1948      * so empty this one and trigger dynamic queue growth. At
1949      * this point the srcresult is not OK, NOT_LINKED
1950      * or EOS, i.e. a real failure */
1951     gst_single_queue_flush_queue (sq, FALSE);
1952     single_queue_underrun_cb (sq->queue, sq);
1953     gst_data_queue_set_flushing (sq->queue, TRUE);
1954     gst_pad_pause_task (sq->srcpad);
1955     GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
1956         "SingleQueue[%d] task paused, reason:%s",
1957         sq->id, gst_flow_get_name (sq->srcresult));
1958     return;
1959   }
1960 }
1961
1962 /**
1963  * gst_multi_queue_chain:
1964  *
1965  * This is similar to GstQueue's chain function, except:
1966  * _ we don't have leak behaviours,
1967  * _ we push with a unique id (curid)
1968  */
1969 static GstFlowReturn
1970 gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1971 {
1972   GstSingleQueue *sq;
1973   GstMultiQueue *mq;
1974   GstMultiQueueItem *item;
1975   guint32 curid;
1976   GstClockTime timestamp, duration;
1977
1978   sq = gst_pad_get_element_private (pad);
1979   mq = sq->mqueue;
1980
1981   /* if eos, we are always full, so avoid hanging incoming indefinitely */
1982   if (sq->is_eos)
1983     goto was_eos;
1984
1985   sq->active = TRUE;
1986
1987   /* Get a unique incrementing id */
1988   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
1989
1990   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1991   duration = GST_BUFFER_DURATION (buffer);
1992
1993   GST_LOG_OBJECT (mq,
1994       "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%"
1995       GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")",
1996       sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
1997       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration));
1998
1999   item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
2000
2001   /* Update interleave before pushing data into queue */
2002   if (mq->use_interleave) {
2003     GstClockTime val = timestamp;
2004     GstClockTimeDiff dval;
2005
2006     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2007     if (val == GST_CLOCK_TIME_NONE)
2008       val = sq->sink_segment.position;
2009     if (duration != GST_CLOCK_TIME_NONE)
2010       val += duration;
2011
2012     dval = my_segment_to_running_time (&sq->sink_segment, val);
2013     if (GST_CLOCK_STIME_IS_VALID (dval)) {
2014       sq->cached_sinktime = dval;
2015       GST_DEBUG_OBJECT (mq,
2016           "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
2017           GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
2018           GST_STIME_ARGS (sq->cached_sinktime));
2019       calculate_interleave (mq);
2020     }
2021     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2022   }
2023
2024   if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
2025     goto flushing;
2026
2027   /* update time level, we must do this after pushing the data in the queue so
2028    * that we never end up filling the queue first. */
2029   apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
2030
2031 done:
2032   return sq->srcresult;
2033
2034   /* ERRORS */
2035 flushing:
2036   {
2037     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
2038         sq->id, gst_flow_get_name (sq->srcresult));
2039     gst_multi_queue_item_destroy (item);
2040     goto done;
2041   }
2042 was_eos:
2043   {
2044     GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS");
2045     gst_buffer_unref (buffer);
2046     return GST_FLOW_EOS;
2047   }
2048 }
2049
2050 static gboolean
2051 gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
2052     GstPadMode mode, gboolean active)
2053 {
2054   gboolean res;
2055   GstSingleQueue *sq;
2056   GstMultiQueue *mq;
2057
2058   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2059   mq = (GstMultiQueue *) gst_pad_get_parent (pad);
2060
2061   /* mq is NULL if the pad is activated/deactivated before being
2062    * added to the multiqueue */
2063   if (mq)
2064     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2065
2066   switch (mode) {
2067     case GST_PAD_MODE_PUSH:
2068       if (active) {
2069         /* All pads start off linked until they push one buffer */
2070         sq->srcresult = GST_FLOW_OK;
2071         sq->pushed = FALSE;
2072         gst_data_queue_set_flushing (sq->queue, FALSE);
2073       } else {
2074         sq->srcresult = GST_FLOW_FLUSHING;
2075         sq->last_query = FALSE;
2076         g_cond_signal (&sq->query_handled);
2077         gst_data_queue_set_flushing (sq->queue, TRUE);
2078
2079         /* Wait until streaming thread has finished */
2080         if (mq)
2081           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2082         GST_PAD_STREAM_LOCK (pad);
2083         if (mq)
2084           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2085         gst_data_queue_flush (sq->queue);
2086         if (mq)
2087           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2088         GST_PAD_STREAM_UNLOCK (pad);
2089         if (mq)
2090           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2091       }
2092       res = TRUE;
2093       break;
2094     default:
2095       res = FALSE;
2096       break;
2097   }
2098
2099   if (mq) {
2100     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2101     gst_object_unref (mq);
2102   }
2103
2104   return res;
2105 }
2106
2107 static GstFlowReturn
2108 gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
2109 {
2110   GstSingleQueue *sq;
2111   GstMultiQueue *mq;
2112   guint32 curid;
2113   GstMultiQueueItem *item;
2114   gboolean res = TRUE;
2115   GstFlowReturn flowret = GST_FLOW_OK;
2116   GstEventType type;
2117   GstEvent *sref = NULL;
2118
2119   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2120   mq = (GstMultiQueue *) parent;
2121
2122   type = GST_EVENT_TYPE (event);
2123
2124   switch (type) {
2125     case GST_EVENT_STREAM_START:
2126     {
2127       if (mq->sync_by_running_time) {
2128         GstStreamFlags stream_flags;
2129         gst_event_parse_stream_flags (event, &stream_flags);
2130         if ((stream_flags & GST_STREAM_FLAG_SPARSE)) {
2131           GST_INFO_OBJECT (mq, "SingleQueue %d is a sparse stream", sq->id);
2132           sq->is_sparse = TRUE;
2133         }
2134       }
2135
2136       sq->thread = g_thread_self ();
2137
2138       /* Remove EOS flag */
2139       sq->is_eos = FALSE;
2140       break;
2141     }
2142     case GST_EVENT_FLUSH_START:
2143       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
2144           sq->id);
2145
2146       res = gst_pad_push_event (sq->srcpad, event);
2147
2148       gst_single_queue_flush (mq, sq, TRUE, FALSE);
2149       goto done;
2150
2151     case GST_EVENT_FLUSH_STOP:
2152       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
2153           sq->id);
2154
2155       res = gst_pad_push_event (sq->srcpad, event);
2156
2157       gst_single_queue_flush (mq, sq, FALSE, FALSE);
2158       goto done;
2159
2160     case GST_EVENT_SEGMENT:
2161       sref = gst_event_ref (event);
2162       break;
2163     case GST_EVENT_GAP:
2164       /* take ref because the queue will take ownership and we need the event
2165        * afterwards to update the segment */
2166       sref = gst_event_ref (event);
2167       if (mq->use_interleave) {
2168         GstClockTime val, dur;
2169         GstClockTime stime;
2170         gst_event_parse_gap (event, &val, &dur);
2171         if (GST_CLOCK_TIME_IS_VALID (val)) {
2172           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2173           if (GST_CLOCK_TIME_IS_VALID (dur))
2174             val += dur;
2175           stime = my_segment_to_running_time (&sq->sink_segment, val);
2176           if (GST_CLOCK_STIME_IS_VALID (stime)) {
2177             sq->cached_sinktime = stime;
2178             calculate_interleave (mq);
2179           }
2180           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2181         }
2182       }
2183       break;
2184
2185     default:
2186       if (!(GST_EVENT_IS_SERIALIZED (event))) {
2187         res = gst_pad_push_event (sq->srcpad, event);
2188         goto done;
2189       }
2190       break;
2191   }
2192
2193   /* if eos, we are always full, so avoid hanging incoming indefinitely */
2194   if (sq->is_eos)
2195     goto was_eos;
2196
2197   /* Get an unique incrementing id. */
2198   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2199
2200   item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
2201
2202   GST_DEBUG_OBJECT (mq,
2203       "SingleQueue %d : Enqueuing event %p of type %s with id %d",
2204       sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
2205
2206   if (!gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))
2207     goto flushing;
2208
2209   /* mark EOS when we received one, we must do that after putting the
2210    * buffer in the queue because EOS marks the buffer as filled. */
2211   switch (type) {
2212     case GST_EVENT_EOS:
2213       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2214       sq->is_eos = TRUE;
2215
2216       /* Post an error message if we got EOS while downstream
2217        * has returned an error flow return. After EOS there
2218        * will be no further buffer which could propagate the
2219        * error upstream */
2220       if (sq->srcresult < GST_FLOW_EOS) {
2221         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2222         GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
2223       } else {
2224         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2225       }
2226
2227       /* EOS affects the buffering state */
2228       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2229       update_buffering (mq, sq);
2230       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2231       single_queue_overrun_cb (sq->queue, sq);
2232       gst_multi_queue_post_buffering (mq);
2233       break;
2234     case GST_EVENT_SEGMENT:
2235       apply_segment (mq, sq, sref, &sq->sink_segment);
2236       gst_event_unref (sref);
2237       /* a new segment allows us to accept more buffers if we got EOS
2238        * from downstream */
2239       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2240       if (sq->srcresult == GST_FLOW_EOS)
2241         sq->srcresult = GST_FLOW_OK;
2242       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2243       break;
2244     case GST_EVENT_GAP:
2245       sq->active = TRUE;
2246       apply_gap (mq, sq, sref, &sq->sink_segment);
2247       gst_event_unref (sref);
2248     default:
2249       break;
2250   }
2251
2252 done:
2253   if (res == FALSE)
2254     flowret = GST_FLOW_ERROR;
2255   GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id,
2256       gst_flow_get_name (flowret));
2257   return flowret;
2258
2259 flushing:
2260   {
2261     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
2262         sq->id, gst_flow_get_name (sq->srcresult));
2263     if (sref)
2264       gst_event_unref (sref);
2265     gst_multi_queue_item_destroy (item);
2266     return sq->srcresult;
2267   }
2268 was_eos:
2269   {
2270     GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return GST_FLOW_EOS");
2271     gst_event_unref (event);
2272     return GST_FLOW_EOS;
2273   }
2274 }
2275
2276 static gboolean
2277 gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
2278 {
2279   gboolean res;
2280   GstSingleQueue *sq;
2281   GstMultiQueue *mq;
2282
2283   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2284   mq = (GstMultiQueue *) parent;
2285
2286   switch (GST_QUERY_TYPE (query)) {
2287     default:
2288       if (GST_QUERY_IS_SERIALIZED (query)) {
2289         guint32 curid;
2290         GstMultiQueueItem *item;
2291
2292         GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2293         if (sq->srcresult != GST_FLOW_OK)
2294           goto out_flushing;
2295
2296         /* serialized events go in the queue. We need to be certain that we
2297          * don't cause deadlocks waiting for the query return value. We check if
2298          * the queue is empty (nothing is blocking downstream and the query can
2299          * be pushed for sure) or we are not buffering. If we are buffering,
2300          * the pipeline waits to unblock downstream until our queue fills up
2301          * completely, which can not happen if we block on the query..
2302          * Therefore we only potentially block when we are not buffering. */
2303         if (!mq->use_buffering || gst_data_queue_is_empty (sq->queue)) {
2304           /* Get an unique incrementing id. */
2305           curid = g_atomic_int_add ((gint *) & mq->counter, 1);
2306
2307           item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
2308
2309           GST_DEBUG_OBJECT (mq,
2310               "SingleQueue %d : Enqueuing query %p of type %s with id %d",
2311               sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
2312           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2313           res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
2314           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2315           if (!res || sq->flushing)
2316             goto out_flushing;
2317           /* it might be that the query has been taken out of the queue
2318            * while we were unlocked. So, we need to check if the last
2319            * handled query is the same one than the one we just
2320            * pushed. If it is, we don't need to wait for the condition
2321            * variable, otherwise we wait for the condition variable to
2322            * be signaled. */
2323           if (sq->last_handled_query != query)
2324             g_cond_wait (&sq->query_handled, &mq->qlock);
2325           res = sq->last_query;
2326           sq->last_handled_query = NULL;
2327         } else {
2328           GST_DEBUG_OBJECT (mq, "refusing query, we are buffering and the "
2329               "queue is not empty");
2330           res = FALSE;
2331         }
2332         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2333       } else {
2334         /* default handling */
2335         res = gst_pad_query_default (pad, parent, query);
2336       }
2337       break;
2338   }
2339   return res;
2340
2341 out_flushing:
2342   {
2343     GST_DEBUG_OBJECT (mq, "Flushing");
2344     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2345     return FALSE;
2346   }
2347 }
2348
2349 static gboolean
2350 gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
2351     GstPadMode mode, gboolean active)
2352 {
2353   GstMultiQueue *mq;
2354   GstSingleQueue *sq;
2355   gboolean result;
2356
2357   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2358   mq = sq->mqueue;
2359
2360   GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
2361
2362   switch (mode) {
2363     case GST_PAD_MODE_PUSH:
2364       if (active) {
2365         result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
2366       } else {
2367         result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
2368         /* make sure streaming finishes */
2369         result |= gst_pad_stop_task (pad);
2370       }
2371       break;
2372     default:
2373       result = FALSE;
2374       break;
2375   }
2376   return result;
2377 }
2378
2379 static gboolean
2380 gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2381 {
2382   GstSingleQueue *sq = gst_pad_get_element_private (pad);
2383   GstMultiQueue *mq = sq->mqueue;
2384   gboolean ret;
2385
2386   switch (GST_EVENT_TYPE (event)) {
2387     case GST_EVENT_RECONFIGURE:
2388       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2389       if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2390         sq->srcresult = GST_FLOW_OK;
2391         g_cond_signal (&sq->turn);
2392       }
2393       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2394
2395       ret = gst_pad_push_event (sq->sinkpad, event);
2396       break;
2397     default:
2398       ret = gst_pad_push_event (sq->sinkpad, event);
2399       break;
2400   }
2401
2402   return ret;
2403 }
2404
2405 static gboolean
2406 gst_multi_queue_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2407 {
2408   gboolean res;
2409
2410   /* FIXME, Handle position offset depending on queue size */
2411   switch (GST_QUERY_TYPE (query)) {
2412     default:
2413       /* default handling */
2414       res = gst_pad_query_default (pad, parent, query);
2415       break;
2416   }
2417   return res;
2418 }
2419
2420 /*
2421  * Next-non-linked functions
2422  */
2423
2424 /* WITH LOCK TAKEN */
2425 static void
2426 wake_up_next_non_linked (GstMultiQueue * mq)
2427 {
2428   GList *tmp;
2429
2430   /* maybe no-one is waiting */
2431   if (mq->numwaiting < 1)
2432     return;
2433
2434   if (mq->sync_by_running_time && GST_CLOCK_STIME_IS_VALID (mq->high_time)) {
2435     /* Else figure out which singlequeue(s) need waking up */
2436     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2437       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2438       if (sq->srcresult == GST_FLOW_NOT_LINKED
2439           && GST_CLOCK_STIME_IS_VALID (sq->group_high_time)
2440           && GST_CLOCK_STIME_IS_VALID (sq->next_time)
2441           && sq->next_time <= sq->group_high_time) {
2442         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2443         g_cond_signal (&sq->turn);
2444       }
2445     }
2446   } else {
2447     /* Else figure out which singlequeue(s) need waking up */
2448     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2449       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2450       if (sq->srcresult == GST_FLOW_NOT_LINKED &&
2451           sq->nextid != 0 && sq->nextid <= mq->highid) {
2452         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2453         g_cond_signal (&sq->turn);
2454       }
2455     }
2456   }
2457 }
2458
2459 /* WITH LOCK TAKEN */
2460 static void
2461 compute_high_id (GstMultiQueue * mq)
2462 {
2463   /* The high-id is either the highest id among the linked pads, or if all
2464    * pads are not-linked, it's the lowest not-linked pad */
2465   GList *tmp;
2466   guint32 lowest = G_MAXUINT32;
2467   guint32 highid = G_MAXUINT32;
2468
2469   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2470     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2471
2472     GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
2473         sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
2474
2475     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2476       /* No need to consider queues which are not waiting */
2477       if (sq->nextid == 0) {
2478         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2479         continue;
2480       }
2481
2482       if (sq->nextid < lowest)
2483         lowest = sq->nextid;
2484     } else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) {
2485       /* If we don't have a global highid, or the global highid is lower than
2486        * this single queue's last outputted id, store the queue's one, 
2487        * unless the singlequeue output is at EOS */
2488       if ((highid == G_MAXUINT32) || (sq->oldid > highid))
2489         highid = sq->oldid;
2490     }
2491   }
2492
2493   if (highid == G_MAXUINT32 || lowest < highid)
2494     mq->highid = lowest;
2495   else
2496     mq->highid = highid;
2497
2498   GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
2499       lowest);
2500 }
2501
2502 /* WITH LOCK TAKEN */
2503 static void
2504 compute_high_time (GstMultiQueue * mq, guint groupid)
2505 {
2506   /* The high-time is either the highest last time among the linked
2507    * pads, or if all pads are not-linked, it's the lowest nex time of
2508    * not-linked pad */
2509   GList *tmp;
2510   GstClockTimeDiff highest = GST_CLOCK_STIME_NONE;
2511   GstClockTimeDiff lowest = GST_CLOCK_STIME_NONE;
2512   GstClockTimeDiff group_high = GST_CLOCK_STIME_NONE;
2513   GstClockTimeDiff group_low = GST_CLOCK_STIME_NONE;
2514   GstClockTimeDiff res;
2515   /* Number of streams which belong to groupid */
2516   guint group_count = 0;
2517
2518   if (!mq->sync_by_running_time)
2519     /* return GST_CLOCK_STIME_NONE; */
2520     return;
2521
2522   for (tmp = mq->queues; tmp; tmp = tmp->next) {
2523     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2524
2525     GST_LOG_OBJECT (mq,
2526         "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT
2527         ", last_time:%" GST_STIME_FORMAT ", srcresult:%s", sq->id, sq->groupid,
2528         GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time),
2529         gst_flow_get_name (sq->srcresult));
2530
2531     if (sq->groupid == groupid)
2532       group_count++;
2533
2534     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2535       /* No need to consider queues which are not waiting */
2536       if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
2537         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2538         continue;
2539       }
2540
2541       if (lowest == GST_CLOCK_STIME_NONE || sq->next_time < lowest)
2542         lowest = sq->next_time;
2543       if (sq->groupid == groupid && (group_low == GST_CLOCK_STIME_NONE
2544               || sq->next_time < group_low))
2545         group_low = sq->next_time;
2546     } else if (!GST_PAD_IS_EOS (sq->srcpad) && sq->srcresult != GST_FLOW_EOS) {
2547       /* If we don't have a global high time, or the global high time
2548        * is lower than this single queue's last outputted time, store
2549        * the queue's one, unless the singlequeue output is at EOS. */
2550       if (highest == GST_CLOCK_STIME_NONE
2551           || (sq->last_time != GST_CLOCK_STIME_NONE && sq->last_time > highest))
2552         highest = sq->last_time;
2553       if (sq->groupid == groupid && (group_high == GST_CLOCK_STIME_NONE
2554               || (sq->last_time != GST_CLOCK_STIME_NONE
2555                   && sq->last_time > group_high)))
2556         group_high = sq->last_time;
2557     }
2558     GST_LOG_OBJECT (mq,
2559         "highest now %" GST_STIME_FORMAT " lowest %" GST_STIME_FORMAT,
2560         GST_STIME_ARGS (highest), GST_STIME_ARGS (lowest));
2561     if (sq->groupid == groupid)
2562       GST_LOG_OBJECT (mq,
2563           "grouphigh %" GST_STIME_FORMAT " grouplow %" GST_STIME_FORMAT,
2564           GST_STIME_ARGS (group_high), GST_STIME_ARGS (group_low));
2565   }
2566
2567   if (highest == GST_CLOCK_STIME_NONE)
2568     mq->high_time = lowest;
2569   else
2570     mq->high_time = highest;
2571
2572   GST_LOG_OBJECT (mq, "group count %d for groupid %u", group_count, groupid);
2573   GST_LOG_OBJECT (mq,
2574       "High time is now : %" GST_STIME_FORMAT ", lowest non-linked %"
2575       GST_STIME_FORMAT, GST_STIME_ARGS (mq->high_time),
2576       GST_STIME_ARGS (lowest));
2577
2578   /* If there's only one stream of a given type, use the global high */
2579   if (group_count < 2)
2580     res = mq->high_time;
2581   else if (group_high == GST_CLOCK_STIME_NONE)
2582     res = group_low;
2583   else
2584     res = group_high;
2585
2586   for (tmp = mq->queues; tmp; tmp = tmp->next) {
2587     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2588     if (groupid == sq->groupid)
2589       sq->group_high_time = res;
2590   }
2591 }
2592
2593 #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
2594      ((q)->max_size.format) <= (value))
2595
2596 /*
2597  * GstSingleQueue functions
2598  */
2599 static void
2600 single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
2601 {
2602   GstMultiQueue *mq = sq->mqueue;
2603   GList *tmp;
2604   GstDataQueueSize size;
2605   gboolean filled = TRUE;
2606   gboolean empty_found = FALSE;
2607
2608   gst_data_queue_get_level (sq->queue, &size);
2609
2610   GST_LOG_OBJECT (mq,
2611       "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %"
2612       G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible,
2613       sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time,
2614       sq->max_size.time);
2615
2616   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2617
2618   /* check if we reached the hard time/bytes limits;
2619      time limit is only taken into account for non-sparse streams */
2620   if (sq->is_eos || IS_FILLED (sq, bytes, size.bytes) ||
2621       (!sq->is_sparse && IS_FILLED (sq, time, sq->cur_time))) {
2622     goto done;
2623   }
2624
2625   /* Search for empty queues */
2626   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2627     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
2628
2629     if (oq == sq)
2630       continue;
2631
2632     if (oq->srcresult == GST_FLOW_NOT_LINKED) {
2633       GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id);
2634       continue;
2635     }
2636
2637     GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
2638     if (gst_data_queue_is_empty (oq->queue) && !oq->is_sparse) {
2639       GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
2640       empty_found = TRUE;
2641       break;
2642     }
2643   }
2644
2645   /* if hard limits are not reached then we allow one more buffer in the full
2646    * queue, but only if any of the other singelqueues are empty */
2647   if (empty_found) {
2648     if (IS_FILLED (sq, visible, size.visible)) {
2649       sq->max_size.visible = size.visible + 1;
2650       GST_DEBUG_OBJECT (mq,
2651           "Bumping single queue %d max visible to %d",
2652           sq->id, sq->max_size.visible);
2653       filled = FALSE;
2654     }
2655   }
2656
2657 done:
2658   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2659
2660   /* Overrun is always forwarded, since this is blocking the upstream element */
2661   if (filled) {
2662     GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id);
2663     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
2664   }
2665 }
2666
2667 static void
2668 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
2669 {
2670   gboolean empty = TRUE;
2671   GstMultiQueue *mq = sq->mqueue;
2672   GList *tmp;
2673
2674   if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2675     GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
2676     return;
2677   } else {
2678     GST_LOG_OBJECT (mq,
2679         "Single Queue %d is empty, Checking other single queues", sq->id);
2680   }
2681
2682   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2683   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2684     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
2685
2686     if (gst_data_queue_is_full (oq->queue)) {
2687       GstDataQueueSize size;
2688
2689       gst_data_queue_get_level (oq->queue, &size);
2690       if (IS_FILLED (oq, visible, size.visible)) {
2691         oq->max_size.visible = size.visible + 1;
2692         GST_DEBUG_OBJECT (mq,
2693             "queue %d is filled, bumping its max visible to %d", oq->id,
2694             oq->max_size.visible);
2695         gst_data_queue_limits_changed (oq->queue);
2696       }
2697     }
2698     if (!gst_data_queue_is_empty (oq->queue) || oq->is_sparse)
2699       empty = FALSE;
2700   }
2701   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2702
2703   if (empty) {
2704     GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
2705     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
2706   }
2707 }
2708
2709 static gboolean
2710 single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
2711     guint64 time, GstSingleQueue * sq)
2712 {
2713   gboolean res;
2714   GstMultiQueue *mq = sq->mqueue;
2715
2716   GST_DEBUG_OBJECT (mq,
2717       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
2718       G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
2719       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
2720
2721   /* we are always filled on EOS */
2722   if (sq->is_eos)
2723     return TRUE;
2724
2725   /* we never go past the max visible items unless we are in buffering mode */
2726   if (!mq->use_buffering && IS_FILLED (sq, visible, visible))
2727     return TRUE;
2728
2729   /* check time or bytes */
2730   res = IS_FILLED (sq, bytes, bytes);
2731   /* We only care about limits in time if we're not a sparse stream or
2732    * we're not syncing by running time */
2733   if (!sq->is_sparse || !mq->sync_by_running_time) {
2734     /* If unlinked, take into account the extra unlinked cache time */
2735     if (mq->sync_by_running_time && sq->srcresult == GST_FLOW_NOT_LINKED) {
2736       if (sq->cur_time > mq->unlinked_cache_time)
2737         res |= IS_FILLED (sq, time, sq->cur_time - mq->unlinked_cache_time);
2738       else
2739         res = FALSE;
2740     } else
2741       res |= IS_FILLED (sq, time, sq->cur_time);
2742   }
2743
2744   return res;
2745 }
2746
2747 static void
2748 gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
2749 {
2750   GstDataQueueItem *sitem;
2751   GstMultiQueueItem *mitem;
2752   gboolean was_flushing = FALSE;
2753
2754   while (!gst_data_queue_is_empty (sq->queue)) {
2755     GstMiniObject *data;
2756
2757     /* FIXME: If this fails here although the queue is not empty,
2758      * we're flushing... but we want to rescue all sticky
2759      * events nonetheless.
2760      */
2761     if (!gst_data_queue_pop (sq->queue, &sitem)) {
2762       was_flushing = TRUE;
2763       gst_data_queue_set_flushing (sq->queue, FALSE);
2764       continue;
2765     }
2766
2767     mitem = (GstMultiQueueItem *) sitem;
2768
2769     data = sitem->object;
2770
2771     if (!full && !mitem->is_query && GST_IS_EVENT (data)
2772         && GST_EVENT_IS_STICKY (data)
2773         && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
2774         && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
2775       gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
2776     }
2777
2778     sitem->destroy (sitem);
2779   }
2780
2781   gst_data_queue_flush (sq->queue);
2782   if (was_flushing)
2783     gst_data_queue_set_flushing (sq->queue, TRUE);
2784
2785   GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue);
2786   update_buffering (sq->mqueue, sq);
2787   GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue);
2788   gst_multi_queue_post_buffering (sq->mqueue);
2789 }
2790
2791 static void
2792 gst_single_queue_free (GstSingleQueue * sq)
2793 {
2794   /* DRAIN QUEUE */
2795   gst_data_queue_flush (sq->queue);
2796   g_object_unref (sq->queue);
2797   g_cond_clear (&sq->turn);
2798   g_cond_clear (&sq->query_handled);
2799   g_free (sq);
2800 }
2801
2802 static GstSingleQueue *
2803 gst_single_queue_new (GstMultiQueue * mqueue, guint id)
2804 {
2805   GstSingleQueue *sq;
2806   GstMultiQueuePad *mqpad;
2807   GstPadTemplate *templ;
2808   gchar *name;
2809   GList *tmp;
2810   guint temp_id = (id == -1) ? 0 : id;
2811
2812   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
2813
2814   /* Find an unused queue ID, if possible the passed one */
2815   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
2816     GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
2817     /* This works because the IDs are sorted in ascending order */
2818     if (sq2->id == temp_id) {
2819       /* If this ID was requested by the caller return NULL,
2820        * otherwise just get us the next one */
2821       if (id == -1) {
2822         temp_id = sq2->id + 1;
2823       } else {
2824         GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
2825         return NULL;
2826       }
2827     } else if (sq2->id > temp_id) {
2828       break;
2829     }
2830   }
2831
2832   sq = g_new0 (GstSingleQueue, 1);
2833   mqueue->nbqueues++;
2834   sq->id = temp_id;
2835   sq->groupid = DEFAULT_PAD_GROUP_ID;
2836   sq->group_high_time = GST_CLOCK_STIME_NONE;
2837
2838   mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
2839   mqueue->queues_cookie++;
2840
2841   /* copy over max_size and extra_size so we don't need to take the lock
2842    * any longer when checking if the queue is full. */
2843   sq->max_size.visible = mqueue->max_size.visible;
2844   sq->max_size.bytes = mqueue->max_size.bytes;
2845   sq->max_size.time = mqueue->max_size.time;
2846
2847   sq->extra_size.visible = mqueue->extra_size.visible;
2848   sq->extra_size.bytes = mqueue->extra_size.bytes;
2849   sq->extra_size.time = mqueue->extra_size.time;
2850
2851   GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
2852
2853   sq->mqueue = mqueue;
2854   sq->srcresult = GST_FLOW_FLUSHING;
2855   sq->pushed = FALSE;
2856   sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
2857       single_queue_check_full,
2858       (GstDataQueueFullCallback) single_queue_overrun_cb,
2859       (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
2860   sq->is_eos = FALSE;
2861   sq->is_sparse = FALSE;
2862   sq->flushing = FALSE;
2863   sq->active = FALSE;
2864   gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
2865   gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
2866
2867   sq->nextid = 0;
2868   sq->oldid = 0;
2869   sq->next_time = GST_CLOCK_STIME_NONE;
2870   sq->last_time = GST_CLOCK_STIME_NONE;
2871   g_cond_init (&sq->turn);
2872   g_cond_init (&sq->query_handled);
2873
2874   sq->sinktime = GST_CLOCK_STIME_NONE;
2875   sq->srctime = GST_CLOCK_STIME_NONE;
2876   sq->sink_tainted = TRUE;
2877   sq->src_tainted = TRUE;
2878
2879   name = g_strdup_printf ("sink_%u", sq->id);
2880   templ = gst_static_pad_template_get (&sinktemplate);
2881   sq->sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
2882       "direction", templ->direction, "template", templ, NULL);
2883   gst_object_unref (templ);
2884   g_free (name);
2885
2886   mqpad = (GstMultiQueuePad *) sq->sinkpad;
2887   mqpad->sq = sq;
2888
2889   gst_pad_set_chain_function (sq->sinkpad,
2890       GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
2891   gst_pad_set_activatemode_function (sq->sinkpad,
2892       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode));
2893   gst_pad_set_event_full_function (sq->sinkpad,
2894       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
2895   gst_pad_set_query_function (sq->sinkpad,
2896       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query));
2897   gst_pad_set_iterate_internal_links_function (sq->sinkpad,
2898       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
2899   GST_OBJECT_FLAG_SET (sq->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
2900
2901   name = g_strdup_printf ("src_%u", sq->id);
2902   sq->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
2903   g_free (name);
2904
2905   gst_pad_set_activatemode_function (sq->srcpad,
2906       GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode));
2907   gst_pad_set_event_function (sq->srcpad,
2908       GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
2909   gst_pad_set_query_function (sq->srcpad,
2910       GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
2911   gst_pad_set_iterate_internal_links_function (sq->srcpad,
2912       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
2913   GST_OBJECT_FLAG_SET (sq->srcpad, GST_PAD_FLAG_PROXY_CAPS);
2914
2915   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
2916   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
2917
2918   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
2919
2920   /* only activate the pads when we are not in the NULL state
2921    * and add the pad under the state_lock to prevend state changes
2922    * between activating and adding */
2923   g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
2924   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
2925     gst_pad_set_active (sq->srcpad, TRUE);
2926     gst_pad_set_active (sq->sinkpad, TRUE);
2927   }
2928   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
2929   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
2930   g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
2931
2932   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
2933       sq->id);
2934
2935   return sq;
2936 }