multiqueue: use new stream-start event sparse flag to avoid overreading subtitles
[platform/upstream/gstreamer.git] / plugins / elements / gstmultiqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
4  * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
5  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * gstmultiqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 /**
26  * SECTION:element-multiqueue
27  * @see_also: #GstQueue
28  *
29  * <refsect2>
30  * <para>
31  * Multiqueue is similar to a normal #GstQueue with the following additional
32  * features:
33  * <orderedlist>
34  * <listitem>
35  *   <itemizedlist><title>Multiple streamhandling</title>
36  *   <listitem><para>
37  *     The element handles queueing data on more than one stream at once. To
38  *     achieve such a feature it has request sink pads (sink&percnt;u) and
39  *     'sometimes' src pads (src&percnt;u).
40  *   </para><para>
41  *     When requesting a given sinkpad with gst_element_request_pad(),
42  *     the associated srcpad for that stream will be created.
43  *     Example: requesting sink1 will generate src1.
44  *   </para></listitem>
45  *   </itemizedlist>
46  * </listitem>
47  * <listitem>
48  *   <itemizedlist><title>Non-starvation on multiple streams</title>
49  *   <listitem><para>
50  *     If more than one stream is used with the element, the streams' queues
51  *     will be dynamically grown (up to a limit), in order to ensure that no
52  *     stream is risking data starvation. This guarantees that at any given
53  *     time there are at least N bytes queued and available for each individual
54  *     stream.
55  *   </para><para>
56  *     If an EOS event comes through a srcpad, the associated queue will be
57  *     considered as 'not-empty' in the queue-size-growing algorithm.
58  *   </para></listitem>
59  *   </itemizedlist>
60  * </listitem>
61  * <listitem>
62  *   <itemizedlist><title>Non-linked srcpads graceful handling</title>
63  *   <listitem><para>
64  *     In order to better support dynamic switching between streams, the multiqueue
65  *     (unlike the current GStreamer queue) continues to push buffers on non-linked
66  *     pads rather than shutting down.
67  *   </para><para>
68  *     In addition, to prevent a non-linked stream from very quickly consuming all
69  *     available buffers and thus 'racing ahead' of the other streams, the element
70  *     must ensure that buffers and inlined events for a non-linked stream are pushed
71  *     in the same order as they were received, relative to the other streams
72  *     controlled by the element. This means that a buffer cannot be pushed to a
73  *     non-linked pad any sooner than buffers in any other stream which were received
74  *     before it.
75  *   </para></listitem>
76  *   </itemizedlist>
77  * </listitem>
78  * </orderedlist>
79  * </para>
80  * <para>
81  *   Data is queued until one of the limits specified by the
82  *   #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
83  *   #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
84  *   more buffers into the queue will block the pushing thread until more space
85  *   becomes available. #GstMultiQueue:extra-size-buffers,
86  * </para>
87  * <para>
88  *   #GstMultiQueue:extra-size-bytes and #GstMultiQueue:extra-size-time are
89  *   currently unused.
90  * </para>
91  * <para>
92  *   The default queue size limits are 5 buffers, 10MB of data, or
93  *   two second worth of data, whichever is reached first. Note that the number
94  *   of buffers will dynamically grow depending on the fill level of 
95  *   other queues.
96  * </para>
97  * <para>
98  *   The #GstMultiQueue::underrun signal is emitted when all of the queues
99  *   are empty. The #GstMultiQueue::overrun signal is emitted when one of the
100  *   queues is filled.
101  *   Both signals are emitted from the context of the streaming thread.
102  * </para>
103  * </refsect2>
104  */
105
106 #ifdef HAVE_CONFIG_H
107 #  include "config.h"
108 #endif
109
110 #include <gst/gst.h>
111 #include <stdio.h>
112 #include "gstmultiqueue.h"
113 #include <gst/glib-compat-private.h>
114
115 /**
116  * GstSingleQueue:
117  * @sinkpad: associated sink #GstPad
118  * @srcpad: associated source #GstPad
119  *
120  * Structure containing all information and properties about
121  * a single queue.
122  */
123 typedef struct _GstSingleQueue GstSingleQueue;
124
125 struct _GstSingleQueue
126 {
127   /* unique identifier of the queue */
128   guint id;
129
130   GstMultiQueue *mqueue;
131
132   GstPad *sinkpad;
133   GstPad *srcpad;
134
135   /* flowreturn of previous srcpad push */
136   GstFlowReturn srcresult;
137   /* If something was actually pushed on
138    * this pad after flushing/pad activation
139    * and the srcresult corresponds to something
140    * real
141    */
142   gboolean pushed;
143
144   /* segments */
145   GstSegment sink_segment;
146   GstSegment src_segment;
147   gboolean has_src_segment;     /* preferred over initializing the src_segment to
148                                  * UNDEFINED as this doesn't requires adding ifs
149                                  * in every segment usage */
150
151   /* position of src/sink */
152   GstClockTime sinktime, srctime;
153   /* TRUE if either position needs to be recalculated */
154   gboolean sink_tainted, src_tainted;
155
156   /* queue of data */
157   GstDataQueue *queue;
158   GstDataQueueSize max_size, extra_size;
159   GstClockTime cur_time;
160   gboolean is_eos;
161   gboolean is_sparse;
162   gboolean flushing;
163
164   /* Protected by global lock */
165   guint32 nextid;               /* ID of the next object waiting to be pushed */
166   guint32 oldid;                /* ID of the last object pushed (last in a series) */
167   guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
168   GstClockTime next_time;       /* End running time of next buffer to be pushed */
169   GstClockTime last_time;       /* Start running time of last pushed buffer */
170   GCond turn;                   /* SingleQueue turn waiting conditional */
171
172   /* for serialized queries */
173   GCond query_handled;
174   gboolean last_query;
175   GstQuery *last_handled_query;
176 };
177
178
179 /* Extension of GstDataQueueItem structure for our usage */
180 typedef struct _GstMultiQueueItem GstMultiQueueItem;
181
182 struct _GstMultiQueueItem
183 {
184   GstMiniObject *object;
185   guint size;
186   guint64 duration;
187   gboolean visible;
188
189   GDestroyNotify destroy;
190   guint32 posid;
191
192   gboolean is_query;
193 };
194
195 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
196 static void gst_single_queue_free (GstSingleQueue * squeue);
197
198 static void wake_up_next_non_linked (GstMultiQueue * mq);
199 static void compute_high_id (GstMultiQueue * mq);
200 static void compute_high_time (GstMultiQueue * mq);
201 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
202 static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
203
204 static void update_buffering (GstMultiQueue * mq, GstSingleQueue * sq);
205 static void gst_multi_queue_post_buffering (GstMultiQueue * mq);
206
207 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
208
209 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
210     GST_PAD_SINK,
211     GST_PAD_REQUEST,
212     GST_STATIC_CAPS_ANY);
213
214 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
215     GST_PAD_SRC,
216     GST_PAD_SOMETIMES,
217     GST_STATIC_CAPS_ANY);
218
219 GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
220 #define GST_CAT_DEFAULT (multi_queue_debug)
221
222 /* Signals and args */
223 enum
224 {
225   SIGNAL_UNDERRUN,
226   SIGNAL_OVERRUN,
227   LAST_SIGNAL
228 };
229
230 /* default limits, we try to keep up to 2 seconds of data and if there is not
231  * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
232  * there is data in the queues. Normally, the byte and time limits are not hit
233  * in theses conditions. */
234 #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
235 #define DEFAULT_MAX_SIZE_BUFFERS 5
236 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
237
238 /* second limits. When we hit one of the above limits we are probably dealing
239  * with a badly muxed file and we scale the limits to these emergency values.
240  * This is currently not yet implemented.
241  * Since we dynamically scale the queue buffer size up to the limits but avoid
242  * going above the max-size-buffers when we can, we don't really need this
243  * aditional extra size. */
244 #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
245 #define DEFAULT_EXTRA_SIZE_BUFFERS 5
246 #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
247
248 #define DEFAULT_USE_BUFFERING FALSE
249 #define DEFAULT_LOW_PERCENT   10
250 #define DEFAULT_HIGH_PERCENT  99
251 #define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
252
253 enum
254 {
255   PROP_0,
256   PROP_EXTRA_SIZE_BYTES,
257   PROP_EXTRA_SIZE_BUFFERS,
258   PROP_EXTRA_SIZE_TIME,
259   PROP_MAX_SIZE_BYTES,
260   PROP_MAX_SIZE_BUFFERS,
261   PROP_MAX_SIZE_TIME,
262   PROP_USE_BUFFERING,
263   PROP_LOW_PERCENT,
264   PROP_HIGH_PERCENT,
265   PROP_SYNC_BY_RUNNING_TIME,
266   PROP_LAST
267 };
268
269 #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
270   g_mutex_lock (&q->qlock);                                              \
271 } G_STMT_END
272
273 #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
274   g_mutex_unlock (&q->qlock);                                            \
275 } G_STMT_END
276
277 #define SET_PERCENT(mq, perc) G_STMT_START {                             \
278   if (perc != mq->percent) {                                             \
279     mq->percent = perc;                                                  \
280     mq->percent_changed = TRUE;                                          \
281     GST_DEBUG_OBJECT (mq, "buffering %d percent", perc);                 \
282   }                                                                      \
283 } G_STMT_END
284
285 static void gst_multi_queue_finalize (GObject * object);
286 static void gst_multi_queue_set_property (GObject * object,
287     guint prop_id, const GValue * value, GParamSpec * pspec);
288 static void gst_multi_queue_get_property (GObject * object,
289     guint prop_id, GValue * value, GParamSpec * pspec);
290
291 static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
292     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
293 static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
294 static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
295     element, GstStateChange transition);
296
297 static void gst_multi_queue_loop (GstPad * pad);
298
299 #define _do_init \
300   GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
301 #define gst_multi_queue_parent_class parent_class
302 G_DEFINE_TYPE_WITH_CODE (GstMultiQueue, gst_multi_queue, GST_TYPE_ELEMENT,
303     _do_init);
304
305 static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
306
307 static void
308 gst_multi_queue_class_init (GstMultiQueueClass * klass)
309 {
310   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
311   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
312
313   gobject_class->set_property = gst_multi_queue_set_property;
314   gobject_class->get_property = gst_multi_queue_get_property;
315
316   /* SIGNALS */
317
318   /**
319    * GstMultiQueue::underrun:
320    * @multiqueue: the multiqueue instance
321    *
322    * This signal is emitted from the streaming thread when there is
323    * no data in any of the queues inside the multiqueue instance (underrun).
324    *
325    * This indicates either starvation or EOS from the upstream data sources.
326    */
327   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
328       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
329       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
330       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
331
332   /**
333    * GstMultiQueue::overrun:
334    * @multiqueue: the multiqueue instance
335    *
336    * Reports that one of the queues in the multiqueue is full (overrun).
337    * A queue is full if the total amount of data inside it (num-buffers, time,
338    * size) is higher than the boundary values which can be set through the
339    * GObject properties.
340    *
341    * This can be used as an indicator of pre-roll. 
342    */
343   gst_multi_queue_signals[SIGNAL_OVERRUN] =
344       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
345       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
346       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
347
348   /* PROPERTIES */
349
350   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
351       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
352           "Max. amount of data in the queue (bytes, 0=disable)",
353           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
354           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
355           G_PARAM_STATIC_STRINGS));
356   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
357       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
358           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
359           DEFAULT_MAX_SIZE_BUFFERS,
360           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
361           G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
363       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
364           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
365           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
366           G_PARAM_STATIC_STRINGS));
367
368   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
369       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
370           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
371           " (NOT IMPLEMENTED)",
372           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
375       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
376           "Amount of buffers the queues can grow if one of them is empty (0=disable)"
377           " (NOT IMPLEMENTED)",
378           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
379           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380   g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
381       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
382           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
383           " (NOT IMPLEMENTED)",
384           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386
387   /**
388    * GstMultiQueue:use-buffering
389    * 
390    * Enable the buffering option in multiqueue so that BUFFERING messages are
391    * emitted based on low-/high-percent thresholds.
392    */
393   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
394       g_param_spec_boolean ("use-buffering", "Use buffering",
395           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
396           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
397           G_PARAM_STATIC_STRINGS));
398   /**
399    * GstMultiQueue:low-percent
400    * 
401    * Low threshold percent for buffering to start.
402    */
403   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
404       g_param_spec_int ("low-percent", "Low percent",
405           "Low threshold for buffering to start", 0, 100,
406           DEFAULT_LOW_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407   /**
408    * GstMultiQueue:high-percent
409    * 
410    * High threshold percent for buffering to finish.
411    */
412   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
413       g_param_spec_int ("high-percent", "High percent",
414           "High threshold for buffering to finish", 0, 100,
415           DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
416
417   /**
418    * GstMultiQueue:sync-by-running-time
419    * 
420    * If enabled multiqueue will synchronize deactivated or not-linked streams
421    * to the activated and linked streams by taking the running time.
422    * Otherwise multiqueue will synchronize the deactivated or not-linked
423    * streams by keeping the order in which buffers and events arrived compared
424    * to active and linked streams.
425    */
426   g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
427       g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
428           "Synchronize deactivated or not-linked streams by running time",
429           DEFAULT_SYNC_BY_RUNNING_TIME,
430           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431
432   gobject_class->finalize = gst_multi_queue_finalize;
433
434   gst_element_class_set_static_metadata (gstelement_class,
435       "MultiQueue",
436       "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
437   gst_element_class_add_pad_template (gstelement_class,
438       gst_static_pad_template_get (&sinktemplate));
439   gst_element_class_add_pad_template (gstelement_class,
440       gst_static_pad_template_get (&srctemplate));
441
442   gstelement_class->request_new_pad =
443       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
444   gstelement_class->release_pad =
445       GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
446   gstelement_class->change_state =
447       GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
448 }
449
450 static void
451 gst_multi_queue_init (GstMultiQueue * mqueue)
452 {
453   mqueue->nbqueues = 0;
454   mqueue->queues = NULL;
455
456   mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
457   mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
458   mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
459
460   mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
461   mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
462   mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
463
464   mqueue->use_buffering = DEFAULT_USE_BUFFERING;
465   mqueue->low_percent = DEFAULT_LOW_PERCENT;
466   mqueue->high_percent = DEFAULT_HIGH_PERCENT;
467
468   mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
469
470   mqueue->counter = 1;
471   mqueue->highid = -1;
472   mqueue->high_time = GST_CLOCK_TIME_NONE;
473
474   g_mutex_init (&mqueue->qlock);
475   g_mutex_init (&mqueue->buffering_post_lock);
476 }
477
478 static void
479 gst_multi_queue_finalize (GObject * object)
480 {
481   GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
482
483   g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
484   g_list_free (mqueue->queues);
485   mqueue->queues = NULL;
486   mqueue->queues_cookie++;
487
488   /* free/unref instance data */
489   g_mutex_clear (&mqueue->qlock);
490   g_mutex_clear (&mqueue->buffering_post_lock);
491
492   G_OBJECT_CLASS (parent_class)->finalize (object);
493 }
494
495 #define SET_CHILD_PROPERTY(mq,format) G_STMT_START {            \
496     GList * tmp = mq->queues;                                   \
497     while (tmp) {                                               \
498       GstSingleQueue *q = (GstSingleQueue*)tmp->data;           \
499       q->max_size.format = mq->max_size.format;                 \
500       update_buffering (mq, q);                                 \
501       gst_data_queue_limits_changed (q->queue);                 \
502       tmp = g_list_next(tmp);                                   \
503     };                                                          \
504 } G_STMT_END
505
506 static void
507 gst_multi_queue_set_property (GObject * object, guint prop_id,
508     const GValue * value, GParamSpec * pspec)
509 {
510   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
511
512   switch (prop_id) {
513     case PROP_MAX_SIZE_BYTES:
514       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
515       mq->max_size.bytes = g_value_get_uint (value);
516       SET_CHILD_PROPERTY (mq, bytes);
517       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
518       gst_multi_queue_post_buffering (mq);
519       break;
520     case PROP_MAX_SIZE_BUFFERS:
521     {
522       GList *tmp;
523       gint new_size = g_value_get_uint (value);
524
525       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
526
527       mq->max_size.visible = new_size;
528
529       tmp = mq->queues;
530       while (tmp) {
531         GstDataQueueSize size;
532         GstSingleQueue *q = (GstSingleQueue *) tmp->data;
533         gst_data_queue_get_level (q->queue, &size);
534
535         GST_DEBUG_OBJECT (mq, "Queue %d: Requested buffers size: %d,"
536             " current: %d, current max %d", q->id, new_size, size.visible,
537             q->max_size.visible);
538
539         /* do not reduce max size below current level if the single queue
540          * has grown because of empty queue */
541         if (new_size == 0) {
542           q->max_size.visible = new_size;
543         } else if (q->max_size.visible == 0) {
544           q->max_size.visible = MAX (new_size, size.visible);
545         } else if (new_size > size.visible) {
546           q->max_size.visible = new_size;
547         }
548         update_buffering (mq, q);
549         gst_data_queue_limits_changed (q->queue);
550         tmp = g_list_next (tmp);
551       }
552
553       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
554       gst_multi_queue_post_buffering (mq);
555
556       break;
557     }
558     case PROP_MAX_SIZE_TIME:
559       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
560       mq->max_size.time = g_value_get_uint64 (value);
561       SET_CHILD_PROPERTY (mq, time);
562       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
563       gst_multi_queue_post_buffering (mq);
564       break;
565     case PROP_EXTRA_SIZE_BYTES:
566       mq->extra_size.bytes = g_value_get_uint (value);
567       break;
568     case PROP_EXTRA_SIZE_BUFFERS:
569       mq->extra_size.visible = g_value_get_uint (value);
570       break;
571     case PROP_EXTRA_SIZE_TIME:
572       mq->extra_size.time = g_value_get_uint64 (value);
573       break;
574     case PROP_USE_BUFFERING:
575       mq->use_buffering = g_value_get_boolean (value);
576       if (!mq->use_buffering && mq->buffering) {
577         GST_MULTI_QUEUE_MUTEX_LOCK (mq);
578         mq->buffering = FALSE;
579         GST_DEBUG_OBJECT (mq, "buffering 100 percent");
580         SET_PERCENT (mq, 100);
581         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
582       }
583
584       if (mq->use_buffering) {
585         GList *tmp;
586
587         GST_MULTI_QUEUE_MUTEX_LOCK (mq);
588
589         tmp = mq->queues;
590         while (tmp) {
591           GstSingleQueue *q = (GstSingleQueue *) tmp->data;
592           update_buffering (mq, q);
593           gst_data_queue_limits_changed (q->queue);
594           tmp = g_list_next (tmp);
595         }
596
597         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
598       }
599       gst_multi_queue_post_buffering (mq);
600       break;
601     case PROP_LOW_PERCENT:
602       mq->low_percent = g_value_get_int (value);
603       break;
604     case PROP_HIGH_PERCENT:
605       mq->high_percent = g_value_get_int (value);
606       break;
607     case PROP_SYNC_BY_RUNNING_TIME:
608       mq->sync_by_running_time = g_value_get_boolean (value);
609       break;
610     default:
611       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
612       break;
613   }
614 }
615
616 static void
617 gst_multi_queue_get_property (GObject * object, guint prop_id,
618     GValue * value, GParamSpec * pspec)
619 {
620   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
621
622   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
623
624   switch (prop_id) {
625     case PROP_EXTRA_SIZE_BYTES:
626       g_value_set_uint (value, mq->extra_size.bytes);
627       break;
628     case PROP_EXTRA_SIZE_BUFFERS:
629       g_value_set_uint (value, mq->extra_size.visible);
630       break;
631     case PROP_EXTRA_SIZE_TIME:
632       g_value_set_uint64 (value, mq->extra_size.time);
633       break;
634     case PROP_MAX_SIZE_BYTES:
635       g_value_set_uint (value, mq->max_size.bytes);
636       break;
637     case PROP_MAX_SIZE_BUFFERS:
638       g_value_set_uint (value, mq->max_size.visible);
639       break;
640     case PROP_MAX_SIZE_TIME:
641       g_value_set_uint64 (value, mq->max_size.time);
642       break;
643     case PROP_USE_BUFFERING:
644       g_value_set_boolean (value, mq->use_buffering);
645       break;
646     case PROP_LOW_PERCENT:
647       g_value_set_int (value, mq->low_percent);
648       break;
649     case PROP_HIGH_PERCENT:
650       g_value_set_int (value, mq->high_percent);
651       break;
652     case PROP_SYNC_BY_RUNNING_TIME:
653       g_value_set_boolean (value, mq->sync_by_running_time);
654       break;
655     default:
656       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
657       break;
658   }
659
660   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
661 }
662
663 static GstIterator *
664 gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
665 {
666   GstIterator *it = NULL;
667   GstPad *opad;
668   GstSingleQueue *squeue;
669   GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
670   GValue val = { 0, };
671
672   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
673   squeue = gst_pad_get_element_private (pad);
674   if (!squeue)
675     goto out;
676
677   if (squeue->sinkpad == pad)
678     opad = gst_object_ref (squeue->srcpad);
679   else if (squeue->srcpad == pad)
680     opad = gst_object_ref (squeue->sinkpad);
681   else
682     goto out;
683
684   g_value_init (&val, GST_TYPE_PAD);
685   g_value_set_object (&val, opad);
686   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
687   g_value_unset (&val);
688
689   gst_object_unref (opad);
690
691 out:
692   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
693
694   return it;
695 }
696
697
698 /*
699  * GstElement methods
700  */
701
702 static GstPad *
703 gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
704     const gchar * name, const GstCaps * caps)
705 {
706   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
707   GstSingleQueue *squeue;
708   guint temp_id = -1;
709
710   if (name) {
711     sscanf (name + 4, "_%u", &temp_id);
712     GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
713   }
714
715   /* Create a new single queue, add the sink and source pad and return the sink pad */
716   squeue = gst_single_queue_new (mqueue, temp_id);
717
718   GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
719       GST_DEBUG_PAD_NAME (squeue->sinkpad));
720
721   return squeue ? squeue->sinkpad : NULL;
722 }
723
724 static void
725 gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
726 {
727   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
728   GstSingleQueue *sq = NULL;
729   GList *tmp;
730
731   GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
732
733   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
734   /* Find which single queue it belongs to, knowing that it should be a sinkpad */
735   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
736     sq = (GstSingleQueue *) tmp->data;
737
738     if (sq->sinkpad == pad)
739       break;
740   }
741
742   if (!tmp) {
743     GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
744     GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
745     return;
746   }
747
748   /* FIXME: The removal of the singlequeue should probably not happen until it
749    * finishes draining */
750
751   /* remove it from the list */
752   mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
753   mqueue->queues_cookie++;
754
755   /* FIXME : recompute next-non-linked */
756   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
757
758   /* delete SingleQueue */
759   gst_data_queue_set_flushing (sq->queue, TRUE);
760
761   gst_pad_set_active (sq->srcpad, FALSE);
762   gst_pad_set_active (sq->sinkpad, FALSE);
763   gst_pad_set_element_private (sq->srcpad, NULL);
764   gst_pad_set_element_private (sq->sinkpad, NULL);
765   gst_element_remove_pad (element, sq->srcpad);
766   gst_element_remove_pad (element, sq->sinkpad);
767   gst_single_queue_free (sq);
768 }
769
770 static GstStateChangeReturn
771 gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
772 {
773   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
774   GstSingleQueue *sq = NULL;
775   GstStateChangeReturn result;
776
777   switch (transition) {
778     case GST_STATE_CHANGE_READY_TO_PAUSED:{
779       GList *tmp;
780
781       /* Set all pads to non-flushing */
782       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
783       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
784         sq = (GstSingleQueue *) tmp->data;
785         sq->flushing = FALSE;
786       }
787
788       /* the visible limit might not have been set on single queues that have grown because of other queueus were empty */
789       SET_CHILD_PROPERTY (mqueue, visible);
790
791       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
792       gst_multi_queue_post_buffering (mqueue);
793
794       break;
795     }
796     case GST_STATE_CHANGE_PAUSED_TO_READY:{
797       GList *tmp;
798
799       /* Un-wait all waiting pads */
800       GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
801       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
802         sq = (GstSingleQueue *) tmp->data;
803         sq->flushing = TRUE;
804         g_cond_signal (&sq->turn);
805
806         sq->last_query = FALSE;
807         g_cond_signal (&sq->query_handled);
808       }
809       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
810       break;
811     }
812     default:
813       break;
814   }
815
816   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
817
818   switch (transition) {
819     default:
820       break;
821   }
822
823   return result;
824 }
825
826 static gboolean
827 gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
828     gboolean full)
829 {
830   gboolean result;
831
832   GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
833       sq->id);
834
835   if (flush) {
836     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
837     sq->srcresult = GST_FLOW_FLUSHING;
838     gst_data_queue_set_flushing (sq->queue, TRUE);
839
840     sq->flushing = TRUE;
841
842     /* wake up non-linked task */
843     GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
844         sq->id);
845     g_cond_signal (&sq->turn);
846     sq->last_query = FALSE;
847     g_cond_signal (&sq->query_handled);
848     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
849
850     GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
851     result = gst_pad_pause_task (sq->srcpad);
852     sq->sink_tainted = sq->src_tainted = TRUE;
853   } else {
854     gst_single_queue_flush_queue (sq, full);
855
856     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
857     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
858     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
859     sq->has_src_segment = FALSE;
860     /* All pads start off not-linked for a smooth kick-off */
861     sq->srcresult = GST_FLOW_OK;
862     sq->pushed = FALSE;
863     sq->cur_time = 0;
864     sq->max_size.visible = mq->max_size.visible;
865     sq->is_eos = FALSE;
866     sq->nextid = 0;
867     sq->oldid = 0;
868     sq->last_oldid = G_MAXUINT32;
869     sq->next_time = GST_CLOCK_TIME_NONE;
870     sq->last_time = GST_CLOCK_TIME_NONE;
871     gst_data_queue_set_flushing (sq->queue, FALSE);
872
873     /* Reset high time to be recomputed next */
874     mq->high_time = GST_CLOCK_TIME_NONE;
875
876     sq->flushing = FALSE;
877     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
878
879     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
880     result =
881         gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
882         sq->srcpad, NULL);
883   }
884   return result;
885 }
886
887 /* WITH LOCK TAKEN */
888 static gint
889 get_percentage (GstSingleQueue * sq)
890 {
891   GstDataQueueSize size;
892   gint percent, tmp;
893
894   gst_data_queue_get_level (sq->queue, &size);
895
896   GST_DEBUG_OBJECT (sq->mqueue,
897       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
898       G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
899       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
900
901   /* get bytes and time percentages and take the max */
902   if (sq->is_eos || sq->srcresult == GST_FLOW_NOT_LINKED || sq->is_sparse) {
903     percent = 100;
904   } else {
905     percent = 0;
906     if (sq->max_size.time > 0) {
907       tmp = (sq->cur_time * 100) / sq->max_size.time;
908       percent = MAX (percent, tmp);
909     }
910     if (sq->max_size.bytes > 0) {
911       tmp = (size.bytes * 100) / sq->max_size.bytes;
912       percent = MAX (percent, tmp);
913     }
914   }
915
916   return percent;
917 }
918
919 /* WITH LOCK TAKEN */
920 static void
921 update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
922 {
923   gint percent;
924
925   /* nothing to dowhen we are not in buffering mode */
926   if (!mq->use_buffering)
927     return;
928
929   percent = get_percentage (sq);
930
931   if (mq->buffering) {
932     if (percent >= mq->high_percent) {
933       mq->buffering = FALSE;
934     }
935     /* make sure it increases */
936     percent = MAX (mq->percent, percent);
937
938     SET_PERCENT (mq, percent);
939   } else {
940     GList *iter;
941     gboolean is_buffering = TRUE;
942
943     for (iter = mq->queues; iter; iter = g_list_next (iter)) {
944       GstSingleQueue *oq = (GstSingleQueue *) iter->data;
945
946       if (get_percentage (oq) >= mq->high_percent) {
947         is_buffering = FALSE;
948
949         break;
950       }
951     }
952
953     if (is_buffering && percent < mq->low_percent) {
954       mq->buffering = TRUE;
955       SET_PERCENT (mq, percent);
956     }
957   }
958 }
959
960 static void
961 gst_multi_queue_post_buffering (GstMultiQueue * mq)
962 {
963   GstMessage *msg = NULL;
964
965   g_mutex_lock (&mq->buffering_post_lock);
966   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
967   if (mq->percent_changed) {
968     gint percent = mq->percent;
969
970     mq->percent_changed = FALSE;
971
972     percent = percent * 100 / mq->high_percent;
973     /* clip */
974     if (percent > 100)
975       percent = 100;
976
977     GST_DEBUG_OBJECT (mq, "Going to post buffering: %d%%", percent);
978     msg = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
979   }
980   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
981
982   if (msg != NULL)
983     gst_element_post_message (GST_ELEMENT_CAST (mq), msg);
984
985   g_mutex_unlock (&mq->buffering_post_lock);
986 }
987
988 /* calculate the diff between running time on the sink and src of the queue.
989  * This is the total amount of time in the queue. 
990  * WITH LOCK TAKEN */
991 static void
992 update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
993 {
994   gint64 sink_time, src_time;
995
996   if (sq->sink_tainted) {
997     sink_time = sq->sinktime =
998         gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
999         sq->sink_segment.position);
1000
1001     GST_DEBUG_OBJECT (mq,
1002         "queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
1003         GST_TIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
1004         GST_TIME_ARGS (sink_time));
1005
1006     if (G_UNLIKELY (sq->last_time == GST_CLOCK_TIME_NONE)) {
1007       /* If the single queue still doesn't have a last_time set, this means
1008        * that nothing has been pushed out yet.
1009        * In order for the high_time computation to be as efficient as possible,
1010        * we set the last_time */
1011       sq->last_time = sink_time;
1012     }
1013     if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE)) {
1014       /* if we have a time, we become untainted and use the time */
1015       sq->sink_tainted = FALSE;
1016   } else
1017     sink_time = sq->sinktime;
1018
1019   if (sq->src_tainted) {
1020     GstSegment *segment;
1021     gint64 position;
1022
1023     if (sq->has_src_segment) {
1024       segment = &sq->src_segment;
1025       position = sq->src_segment.position;
1026     } else {
1027       /*
1028        * If the src pad had no segment yet, use the sink segment
1029        * to avoid signalling overrun if the received sink segment has a
1030        * a position > max-size-time while the src pad time would be the default=0
1031        *
1032        * This can happen when switching pads on chained/adaptive streams and the
1033        * new chain has a segment with a much larger position
1034        */
1035       segment = &sq->sink_segment;
1036       position = sq->sink_segment.position;
1037     }
1038
1039     src_time = sq->srctime =
1040         gst_segment_to_running_time (segment, GST_FORMAT_TIME, position);
1041     /* if we have a time, we become untainted and use the time */
1042     if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
1043       sq->src_tainted = FALSE;
1044   } else
1045     src_time = sq->srctime;
1046
1047   GST_DEBUG_OBJECT (mq,
1048       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
1049       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
1050
1051   /* This allows for streams with out of order timestamping - sometimes the
1052    * emerging timestamp is later than the arriving one(s) */
1053   if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
1054     sq->cur_time = sink_time - src_time;
1055   else
1056     sq->cur_time = 0;
1057
1058   /* updating the time level can change the buffering state */
1059   update_buffering (mq, sq);
1060
1061   return;
1062 }
1063
1064 /* take a SEGMENT event and apply the values to segment, updating the time
1065  * level of queue. */
1066 static void
1067 apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1068     GstSegment * segment)
1069 {
1070   gst_event_copy_segment (event, segment);
1071
1072   /* now configure the values, we use these to track timestamps on the
1073    * sinkpad. */
1074   if (segment->format != GST_FORMAT_TIME) {
1075     /* non-time format, pretent the current time segment is closed with a
1076      * 0 start and unknown stop time. */
1077     segment->format = GST_FORMAT_TIME;
1078     segment->start = 0;
1079     segment->stop = -1;
1080     segment->time = 0;
1081   }
1082   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1083
1084   if (segment == &sq->sink_segment)
1085     sq->sink_tainted = TRUE;
1086   else {
1087     sq->has_src_segment = TRUE;
1088     sq->src_tainted = TRUE;
1089   }
1090
1091   GST_DEBUG_OBJECT (mq,
1092       "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
1093
1094   /* segment can update the time level of the queue */
1095   update_time_level (mq, sq);
1096
1097   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1098   gst_multi_queue_post_buffering (mq);
1099 }
1100
1101 /* take a buffer and update segment, updating the time level of the queue. */
1102 static void
1103 apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
1104     GstClockTime duration, GstSegment * segment)
1105 {
1106   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1107
1108   /* if no timestamp is set, assume it's continuous with the previous 
1109    * time */
1110   if (timestamp == GST_CLOCK_TIME_NONE)
1111     timestamp = segment->position;
1112
1113   /* add duration */
1114   if (duration != GST_CLOCK_TIME_NONE)
1115     timestamp += duration;
1116
1117   GST_DEBUG_OBJECT (mq, "queue %d, position updated to %" GST_TIME_FORMAT,
1118       sq->id, GST_TIME_ARGS (timestamp));
1119
1120   segment->position = timestamp;
1121
1122   if (segment == &sq->sink_segment)
1123     sq->sink_tainted = TRUE;
1124   else
1125     sq->src_tainted = TRUE;
1126
1127   /* calc diff with other end */
1128   update_time_level (mq, sq);
1129   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1130   gst_multi_queue_post_buffering (mq);
1131 }
1132
1133 static void
1134 apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
1135     GstSegment * segment)
1136 {
1137   GstClockTime timestamp;
1138   GstClockTime duration;
1139
1140   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1141
1142   gst_event_parse_gap (event, &timestamp, &duration);
1143
1144   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1145
1146     if (GST_CLOCK_TIME_IS_VALID (duration)) {
1147       timestamp += duration;
1148     }
1149
1150     segment->position = timestamp;
1151
1152     if (segment == &sq->sink_segment)
1153       sq->sink_tainted = TRUE;
1154     else
1155       sq->src_tainted = TRUE;
1156
1157     /* calc diff with other end */
1158     update_time_level (mq, sq);
1159   }
1160
1161   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1162   gst_multi_queue_post_buffering (mq);
1163 }
1164
1165 static GstClockTime
1166 get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
1167 {
1168   GstClockTime time = GST_CLOCK_TIME_NONE;
1169
1170   if (GST_IS_BUFFER (object)) {
1171     GstBuffer *buf = GST_BUFFER_CAST (object);
1172     GstClockTime btime = GST_BUFFER_DTS_OR_PTS (buf);
1173
1174     if (GST_CLOCK_TIME_IS_VALID (btime)) {
1175       if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1176         btime += GST_BUFFER_DURATION (buf);
1177       if (btime > segment->stop)
1178         btime = segment->stop;
1179       time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, btime);
1180     }
1181   } else if (GST_IS_BUFFER_LIST (object)) {
1182     GstBufferList *list = GST_BUFFER_LIST_CAST (object);
1183     gint i, n;
1184     GstBuffer *buf;
1185
1186     n = gst_buffer_list_length (list);
1187     for (i = 0; i < n; i++) {
1188       GstClockTime btime;
1189       buf = gst_buffer_list_get (list, i);
1190       btime = GST_BUFFER_DTS_OR_PTS (buf);
1191       if (GST_CLOCK_TIME_IS_VALID (btime)) {
1192         if (end && GST_BUFFER_DURATION_IS_VALID (buf))
1193           btime += GST_BUFFER_DURATION (buf);
1194         if (time > segment->stop)
1195           btime = segment->stop;
1196         time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, btime);
1197         if (!end)
1198           goto done;
1199       } else if (!end) {
1200         goto done;
1201       }
1202     }
1203   } else if (GST_IS_EVENT (object)) {
1204     GstEvent *event = GST_EVENT_CAST (object);
1205
1206     /* For newsegment events return the running time of the start position */
1207     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1208       const GstSegment *new_segment;
1209
1210       gst_event_parse_segment (event, &new_segment);
1211       if (new_segment->format == GST_FORMAT_TIME) {
1212         time =
1213             gst_segment_to_running_time (new_segment, GST_FORMAT_TIME,
1214             new_segment->start);
1215       }
1216     }
1217   }
1218
1219 done:
1220   return time;
1221 }
1222
1223 static GstFlowReturn
1224 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
1225     GstMiniObject * object, gboolean * allow_drop)
1226 {
1227   GstFlowReturn result = sq->srcresult;
1228
1229   if (GST_IS_BUFFER (object)) {
1230     GstBuffer *buffer;
1231     GstClockTime timestamp, duration;
1232
1233     buffer = GST_BUFFER_CAST (object);
1234     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1235     duration = GST_BUFFER_DURATION (buffer);
1236
1237     apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1238
1239     /* Applying the buffer may have made the queue non-full again, unblock it if needed */
1240     gst_data_queue_limits_changed (sq->queue);
1241
1242     if (G_UNLIKELY (*allow_drop)) {
1243       GST_DEBUG_OBJECT (mq,
1244           "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
1245           sq->id, buffer, GST_TIME_ARGS (timestamp));
1246       gst_buffer_unref (buffer);
1247     } else {
1248       GST_DEBUG_OBJECT (mq,
1249           "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
1250           sq->id, buffer, GST_TIME_ARGS (timestamp));
1251       result = gst_pad_push (sq->srcpad, buffer);
1252     }
1253   } else if (GST_IS_EVENT (object)) {
1254     GstEvent *event;
1255
1256     event = GST_EVENT_CAST (object);
1257
1258     switch (GST_EVENT_TYPE (event)) {
1259       case GST_EVENT_EOS:
1260         result = GST_FLOW_EOS;
1261         if (G_UNLIKELY (*allow_drop))
1262           *allow_drop = FALSE;
1263         break;
1264       case GST_EVENT_SEGMENT:
1265         apply_segment (mq, sq, event, &sq->src_segment);
1266         /* Applying the segment may have made the queue non-full again, unblock it if needed */
1267         gst_data_queue_limits_changed (sq->queue);
1268         if (G_UNLIKELY (*allow_drop)) {
1269           result = GST_FLOW_OK;
1270           *allow_drop = FALSE;
1271         }
1272         break;
1273       case GST_EVENT_GAP:
1274         apply_gap (mq, sq, event, &sq->src_segment);
1275         /* Applying the gap may have made the queue non-full again, unblock it if needed */
1276         gst_data_queue_limits_changed (sq->queue);
1277         break;
1278       default:
1279         break;
1280     }
1281
1282     if (G_UNLIKELY (*allow_drop)) {
1283       GST_DEBUG_OBJECT (mq,
1284           "SingleQueue %d : Dropping EOS event %p of type %s",
1285           sq->id, event, GST_EVENT_TYPE_NAME (event));
1286       gst_event_unref (event);
1287     } else {
1288       GST_DEBUG_OBJECT (mq,
1289           "SingleQueue %d : Pushing event %p of type %s",
1290           sq->id, event, GST_EVENT_TYPE_NAME (event));
1291
1292       gst_pad_push_event (sq->srcpad, event);
1293     }
1294   } else if (GST_IS_QUERY (object)) {
1295     GstQuery *query;
1296     gboolean res;
1297
1298     query = GST_QUERY_CAST (object);
1299
1300     if (G_UNLIKELY (*allow_drop)) {
1301       GST_DEBUG_OBJECT (mq,
1302           "SingleQueue %d : Dropping EOS query %p", sq->id, query);
1303       gst_query_unref (query);
1304       res = FALSE;
1305     } else {
1306       res = gst_pad_peer_query (sq->srcpad, query);
1307     }
1308
1309     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1310     sq->last_query = res;
1311     sq->last_handled_query = query;
1312     g_cond_signal (&sq->query_handled);
1313     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1314   } else {
1315     g_warning ("Unexpected object in singlequeue %u (refcounting problem?)",
1316         sq->id);
1317   }
1318   return result;
1319
1320   /* ERRORS */
1321 }
1322
1323 static GstMiniObject *
1324 gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
1325 {
1326   GstMiniObject *res;
1327
1328   res = item->object;
1329   item->object = NULL;
1330
1331   return res;
1332 }
1333
1334 static void
1335 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
1336 {
1337   if (!item->is_query && item->object)
1338     gst_mini_object_unref (item->object);
1339   g_slice_free (GstMultiQueueItem, item);
1340 }
1341
1342 /* takes ownership of passed mini object! */
1343 static GstMultiQueueItem *
1344 gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
1345 {
1346   GstMultiQueueItem *item;
1347
1348   item = g_slice_new (GstMultiQueueItem);
1349   item->object = object;
1350   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1351   item->posid = curid;
1352   item->is_query = GST_IS_QUERY (object);
1353
1354   item->size = gst_buffer_get_size (GST_BUFFER_CAST (object));
1355   item->duration = GST_BUFFER_DURATION (object);
1356   if (item->duration == GST_CLOCK_TIME_NONE)
1357     item->duration = 0;
1358   item->visible = TRUE;
1359   return item;
1360 }
1361
1362 static GstMultiQueueItem *
1363 gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
1364 {
1365   GstMultiQueueItem *item;
1366
1367   item = g_slice_new (GstMultiQueueItem);
1368   item->object = object;
1369   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1370   item->posid = curid;
1371   item->is_query = GST_IS_QUERY (object);
1372
1373   item->size = 0;
1374   item->duration = 0;
1375   item->visible = FALSE;
1376   return item;
1377 }
1378
1379 /* Each main loop attempts to push buffers until the return value
1380  * is not-linked. not-linked pads are not allowed to push data beyond
1381  * any linked pads, so they don't 'rush ahead of the pack'.
1382  */
1383 static void
1384 gst_multi_queue_loop (GstPad * pad)
1385 {
1386   GstSingleQueue *sq;
1387   GstMultiQueueItem *item;
1388   GstDataQueueItem *sitem;
1389   GstMultiQueue *mq;
1390   GstMiniObject *object = NULL;
1391   guint32 newid;
1392   GstFlowReturn result;
1393   GstClockTime next_time;
1394   gboolean is_buffer;
1395   gboolean do_update_buffering = FALSE;
1396   gboolean dropping = FALSE;
1397
1398   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1399   mq = sq->mqueue;
1400
1401 next:
1402   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
1403
1404   if (sq->flushing)
1405     goto out_flushing;
1406
1407   /* Get something from the queue, blocking until that happens, or we get
1408    * flushed */
1409   if (!(gst_data_queue_pop (sq->queue, &sitem)))
1410     goto out_flushing;
1411
1412   item = (GstMultiQueueItem *) sitem;
1413   newid = item->posid;
1414
1415   /* steal the object and destroy the item */
1416   object = gst_multi_queue_item_steal_object (item);
1417   gst_multi_queue_item_destroy (item);
1418
1419   is_buffer = GST_IS_BUFFER (object);
1420
1421   /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
1422   next_time = get_running_time (&sq->src_segment, object, FALSE);
1423
1424   GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
1425       sq->id, newid, sq->last_oldid);
1426
1427   /* If we're not-linked, we do some extra work because we might need to
1428    * wait before pushing. If we're linked but there's a gap in the IDs,
1429    * or it's the first loop, or we just passed the previous highid,
1430    * we might need to wake some sleeping pad up, so there's extra work
1431    * there too */
1432   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1433   if (sq->srcresult == GST_FLOW_NOT_LINKED
1434       || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
1435       || sq->last_oldid > mq->highid) {
1436     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
1437         gst_flow_get_name (sq->srcresult));
1438
1439     /* Check again if we're flushing after the lock is taken,
1440      * the flush flag might have been changed in the meantime */
1441     if (sq->flushing) {
1442       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1443       goto out_flushing;
1444     }
1445
1446     /* Update the nextid so other threads know when to wake us up */
1447     sq->nextid = newid;
1448     sq->next_time = next_time;
1449
1450     /* Update the oldid (the last ID we output) for highid tracking */
1451     if (sq->last_oldid != G_MAXUINT32)
1452       sq->oldid = sq->last_oldid;
1453
1454     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1455       /* Go to sleep until it's time to push this buffer */
1456
1457       /* Recompute the highid */
1458       compute_high_id (mq);
1459       /* Recompute the high time */
1460       compute_high_time (mq);
1461
1462       while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
1463                   (mq->high_time == GST_CLOCK_TIME_NONE
1464                       || next_time > mq->high_time))
1465               || (!mq->sync_by_running_time && newid > mq->highid))
1466           && sq->srcresult == GST_FLOW_NOT_LINKED) {
1467
1468         GST_DEBUG_OBJECT (mq,
1469             "queue %d sleeping for not-linked wakeup with "
1470             "newid %u, highid %u, next_time %" GST_TIME_FORMAT
1471             ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
1472             GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
1473
1474         /* Wake up all non-linked pads before we sleep */
1475         wake_up_next_non_linked (mq);
1476
1477         mq->numwaiting++;
1478         g_cond_wait (&sq->turn, &mq->qlock);
1479         mq->numwaiting--;
1480
1481         if (sq->flushing) {
1482           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1483           goto out_flushing;
1484         }
1485
1486         /* Recompute the high time */
1487         compute_high_time (mq);
1488
1489         GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
1490             "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
1491             ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
1492             GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
1493       }
1494
1495       /* Re-compute the high_id in case someone else pushed */
1496       compute_high_id (mq);
1497       compute_high_time (mq);
1498     } else {
1499       compute_high_id (mq);
1500       compute_high_time (mq);
1501       /* Wake up all non-linked pads */
1502       wake_up_next_non_linked (mq);
1503     }
1504     /* We're done waiting, we can clear the nextid and nexttime */
1505     sq->nextid = 0;
1506     sq->next_time = GST_CLOCK_TIME_NONE;
1507   }
1508   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1509
1510   if (sq->flushing)
1511     goto out_flushing;
1512
1513   GST_LOG_OBJECT (mq, "sq:%d BEFORE PUSHING sq->srcresult: %s", sq->id,
1514       gst_flow_get_name (sq->srcresult));
1515
1516   /* Update time stats */
1517   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1518   next_time = get_running_time (&sq->src_segment, object, TRUE);
1519   if (next_time != GST_CLOCK_TIME_NONE) {
1520     if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
1521       sq->last_time = next_time;
1522     if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
1523       /* Wake up all non-linked pads now that we advanced the high time */
1524       mq->high_time = next_time;
1525       wake_up_next_non_linked (mq);
1526     }
1527   }
1528   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1529
1530   /* Try to push out the new object */
1531   result = gst_single_queue_push_one (mq, sq, object, &dropping);
1532   object = NULL;
1533
1534   /* Check if we pushed something already and if this is
1535    * now a switch from an active to a non-active stream.
1536    *
1537    * If it is, we reset all the waiting streams, let them
1538    * push another buffer to see if they're now active again.
1539    * This allows faster switching between streams and prevents
1540    * deadlocks if downstream does any waiting too.
1541    */
1542   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1543   if (sq->pushed && sq->srcresult == GST_FLOW_OK
1544       && result == GST_FLOW_NOT_LINKED) {
1545     GList *tmp;
1546
1547     GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
1548         sq->id);
1549
1550     compute_high_id (mq);
1551     do_update_buffering = TRUE;
1552
1553     /* maybe no-one is waiting */
1554     if (mq->numwaiting > 0) {
1555       /* Else figure out which singlequeue(s) need waking up */
1556       for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1557         GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
1558
1559         if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
1560           GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
1561           sq2->pushed = FALSE;
1562           sq2->srcresult = GST_FLOW_OK;
1563           g_cond_signal (&sq2->turn);
1564         }
1565       }
1566     }
1567   }
1568
1569   if (is_buffer)
1570     sq->pushed = TRUE;
1571
1572   /* now hold on a bit;
1573    * can not simply throw this result to upstream, because
1574    * that might already be onto another segment, so we have to make
1575    * sure we are relaying the correct info wrt proper segment */
1576   if (result == GST_FLOW_EOS && !dropping &&
1577       sq->srcresult != GST_FLOW_NOT_LINKED) {
1578     GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
1579     dropping = TRUE;
1580     /* pretend we have not seen EOS yet for upstream's sake */
1581     result = sq->srcresult;
1582   } else if (dropping && gst_data_queue_is_empty (sq->queue)) {
1583     /* queue empty, so stop dropping
1584      * we can commit the result we have now,
1585      * which is either OK after a segment, or EOS */
1586     GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
1587     dropping = FALSE;
1588     result = GST_FLOW_EOS;
1589   }
1590   sq->srcresult = result;
1591   sq->last_oldid = newid;
1592
1593   if (do_update_buffering)
1594     update_buffering (mq, sq);
1595
1596   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1597   gst_multi_queue_post_buffering (mq);
1598
1599   if (dropping)
1600     goto next;
1601
1602   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
1603       && result != GST_FLOW_EOS)
1604     goto out_flushing;
1605
1606   GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s", sq->id,
1607       gst_flow_get_name (sq->srcresult));
1608
1609   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1610   if (mq->numwaiting > 0 && sq->srcresult == GST_FLOW_EOS) {
1611     compute_high_time (mq);
1612     compute_high_id (mq);
1613     wake_up_next_non_linked (mq);
1614   }
1615   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1616
1617   return;
1618
1619 out_flushing:
1620   {
1621     if (object)
1622       gst_mini_object_unref (object);
1623
1624     /* Need to make sure wake up any sleeping pads when we exit */
1625     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1626     compute_high_time (mq);
1627     compute_high_id (mq);
1628     wake_up_next_non_linked (mq);
1629     sq->last_query = FALSE;
1630     g_cond_signal (&sq->query_handled);
1631
1632     /* Post an error message if we got EOS while downstream
1633      * has returned an error flow return. After EOS there
1634      * will be no further buffer which could propagate the
1635      * error upstream */
1636     if (sq->is_eos && sq->srcresult < GST_FLOW_EOS) {
1637       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1638       GST_ELEMENT_ERROR (mq, STREAM, FAILED,
1639           ("Internal data stream error."),
1640           ("streaming stopped, reason %s", gst_flow_get_name (sq->srcresult)));
1641     } else {
1642       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1643     }
1644
1645     /* upstream needs to see fatal result ASAP to shut things down,
1646      * but might be stuck in one of our other full queues;
1647      * so empty this one and trigger dynamic queue growth. At
1648      * this point the srcresult is not OK, NOT_LINKED
1649      * or EOS, i.e. a real failure */
1650     gst_single_queue_flush_queue (sq, FALSE);
1651     single_queue_underrun_cb (sq->queue, sq);
1652     gst_data_queue_set_flushing (sq->queue, TRUE);
1653     gst_pad_pause_task (sq->srcpad);
1654     GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
1655         "SingleQueue[%d] task paused, reason:%s",
1656         sq->id, gst_flow_get_name (sq->srcresult));
1657     return;
1658   }
1659 }
1660
1661 /**
1662  * gst_multi_queue_chain:
1663  *
1664  * This is similar to GstQueue's chain function, except:
1665  * _ we don't have leak behaviours,
1666  * _ we push with a unique id (curid)
1667  */
1668 static GstFlowReturn
1669 gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1670 {
1671   GstSingleQueue *sq;
1672   GstMultiQueue *mq;
1673   GstMultiQueueItem *item;
1674   guint32 curid;
1675   GstClockTime timestamp, duration;
1676
1677   sq = gst_pad_get_element_private (pad);
1678   mq = sq->mqueue;
1679
1680   /* if eos, we are always full, so avoid hanging incoming indefinitely */
1681   if (sq->is_eos)
1682     goto was_eos;
1683
1684   /* Get a unique incrementing id */
1685   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
1686
1687   GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
1688       sq->id, buffer, curid);
1689
1690   item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
1691
1692   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1693   duration = GST_BUFFER_DURATION (buffer);
1694
1695   if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
1696     goto flushing;
1697
1698   /* update time level, we must do this after pushing the data in the queue so
1699    * that we never end up filling the queue first. */
1700   apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
1701
1702 done:
1703   return sq->srcresult;
1704
1705   /* ERRORS */
1706 flushing:
1707   {
1708     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
1709         sq->id, gst_flow_get_name (sq->srcresult));
1710     gst_multi_queue_item_destroy (item);
1711     goto done;
1712   }
1713 was_eos:
1714   {
1715     GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return EOS");
1716     gst_buffer_unref (buffer);
1717     return GST_FLOW_EOS;
1718   }
1719 }
1720
1721 static gboolean
1722 gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
1723     GstPadMode mode, gboolean active)
1724 {
1725   gboolean res;
1726   GstSingleQueue *sq;
1727   GstMultiQueue *mq;
1728
1729   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1730   mq = (GstMultiQueue *) gst_pad_get_parent (pad);
1731
1732   /* mq is NULL if the pad is activated/deactivated before being
1733    * added to the multiqueue */
1734   if (mq)
1735     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1736
1737   switch (mode) {
1738     case GST_PAD_MODE_PUSH:
1739       if (active) {
1740         /* All pads start off linked until they push one buffer */
1741         sq->srcresult = GST_FLOW_OK;
1742         sq->pushed = FALSE;
1743         gst_data_queue_set_flushing (sq->queue, FALSE);
1744       } else {
1745         sq->srcresult = GST_FLOW_FLUSHING;
1746         sq->last_query = FALSE;
1747         g_cond_signal (&sq->query_handled);
1748         gst_data_queue_set_flushing (sq->queue, TRUE);
1749
1750         /* Wait until streaming thread has finished */
1751         if (mq)
1752           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1753         GST_PAD_STREAM_LOCK (pad);
1754         if (mq)
1755           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1756         gst_data_queue_flush (sq->queue);
1757         if (mq)
1758           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1759         GST_PAD_STREAM_UNLOCK (pad);
1760         if (mq)
1761           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1762       }
1763       res = TRUE;
1764       break;
1765     default:
1766       res = FALSE;
1767       break;
1768   }
1769
1770   if (mq) {
1771     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1772     gst_object_unref (mq);
1773   }
1774
1775   return res;
1776 }
1777
1778 static gboolean
1779 gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
1780 {
1781   GstSingleQueue *sq;
1782   GstMultiQueue *mq;
1783   guint32 curid;
1784   GstMultiQueueItem *item;
1785   gboolean res;
1786   GstEventType type;
1787   GstEvent *sref = NULL;
1788
1789   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1790   mq = (GstMultiQueue *) parent;
1791
1792   type = GST_EVENT_TYPE (event);
1793
1794   switch (type) {
1795     case GST_EVENT_STREAM_START:
1796     {
1797       if (mq->sync_by_running_time) {
1798         GstStreamFlags stream_flags;
1799         gst_event_parse_stream_flags (event, &stream_flags);
1800         if ((stream_flags & GST_STREAM_FLAG_SPARSE)) {
1801           GST_INFO_OBJECT (mq, "SingleQueue %d is a sparse stream", sq->id);
1802           sq->is_sparse = TRUE;
1803         }
1804         sq->thread = g_thread_self ();
1805       }
1806
1807       /* Remove EOS flag */
1808       sq->is_eos = FALSE;
1809     }
1810       break;
1811     case GST_EVENT_FLUSH_START:
1812       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
1813           sq->id);
1814
1815       res = gst_pad_push_event (sq->srcpad, event);
1816
1817       gst_single_queue_flush (mq, sq, TRUE, FALSE);
1818       goto done;
1819
1820     case GST_EVENT_FLUSH_STOP:
1821       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
1822           sq->id);
1823
1824       res = gst_pad_push_event (sq->srcpad, event);
1825
1826       gst_single_queue_flush (mq, sq, FALSE, FALSE);
1827       goto done;
1828
1829     case GST_EVENT_SEGMENT:
1830     case GST_EVENT_GAP:
1831       /* take ref because the queue will take ownership and we need the event
1832        * afterwards to update the segment */
1833       sref = gst_event_ref (event);
1834       break;
1835
1836     default:
1837       if (!(GST_EVENT_IS_SERIALIZED (event))) {
1838         res = gst_pad_push_event (sq->srcpad, event);
1839         goto done;
1840       }
1841       break;
1842   }
1843
1844   /* if eos, we are always full, so avoid hanging incoming indefinitely */
1845   if (sq->is_eos)
1846     goto was_eos;
1847
1848   /* Get an unique incrementing id. */
1849   curid = g_atomic_int_add ((gint *) & mq->counter, 1);
1850
1851   item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
1852
1853   GST_DEBUG_OBJECT (mq,
1854       "SingleQueue %d : Enqueuing event %p of type %s with id %d",
1855       sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
1856
1857   if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
1858     goto flushing;
1859
1860   /* mark EOS when we received one, we must do that after putting the
1861    * buffer in the queue because EOS marks the buffer as filled. */
1862   switch (type) {
1863     case GST_EVENT_EOS:
1864       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1865       sq->is_eos = TRUE;
1866
1867       /* Post an error message if we got EOS while downstream
1868        * has returned an error flow return. After EOS there
1869        * will be no further buffer which could propagate the
1870        * error upstream */
1871       if (sq->srcresult < GST_FLOW_EOS) {
1872         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1873         GST_ELEMENT_ERROR (mq, STREAM, FAILED,
1874             ("Internal data stream error."),
1875             ("streaming stopped, reason %s",
1876                 gst_flow_get_name (sq->srcresult)));
1877       } else {
1878         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1879       }
1880
1881       /* EOS affects the buffering state */
1882       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1883       update_buffering (mq, sq);
1884       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1885       single_queue_overrun_cb (sq->queue, sq);
1886       gst_multi_queue_post_buffering (mq);
1887       break;
1888     case GST_EVENT_SEGMENT:
1889       apply_segment (mq, sq, sref, &sq->sink_segment);
1890       gst_event_unref (sref);
1891       /* a new segment allows us to accept more buffers if we got EOS
1892        * from downstream */
1893       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1894       if (sq->srcresult == GST_FLOW_EOS)
1895         sq->srcresult = GST_FLOW_OK;
1896       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1897       break;
1898     case GST_EVENT_GAP:
1899       apply_gap (mq, sq, sref, &sq->sink_segment);
1900       gst_event_unref (sref);
1901     default:
1902       break;
1903   }
1904 done:
1905   return res;
1906
1907 flushing:
1908   {
1909     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
1910         sq->id, gst_flow_get_name (sq->srcresult));
1911     if (sref)
1912       gst_event_unref (sref);
1913     gst_multi_queue_item_destroy (item);
1914     goto done;
1915   }
1916 was_eos:
1917   {
1918     GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return FALSE");
1919     gst_event_unref (event);
1920     res = FALSE;
1921     goto done;
1922   }
1923 }
1924
1925 static gboolean
1926 gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
1927 {
1928   gboolean res;
1929   GstSingleQueue *sq;
1930   GstMultiQueue *mq;
1931
1932   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1933   mq = (GstMultiQueue *) parent;
1934
1935   switch (GST_QUERY_TYPE (query)) {
1936     default:
1937       if (GST_QUERY_IS_SERIALIZED (query)) {
1938         guint32 curid;
1939         GstMultiQueueItem *item;
1940
1941         GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1942         if (sq->srcresult != GST_FLOW_OK)
1943           goto out_flushing;
1944
1945         /* serialized events go in the queue. We need to be certain that we
1946          * don't cause deadlocks waiting for the query return value. We check if
1947          * the queue is empty (nothing is blocking downstream and the query can
1948          * be pushed for sure) or we are not buffering. If we are buffering,
1949          * the pipeline waits to unblock downstream until our queue fills up
1950          * completely, which can not happen if we block on the query..
1951          * Therefore we only potentially block when we are not buffering. */
1952         if (!mq->use_buffering || gst_data_queue_is_empty (sq->queue)) {
1953           /* Get an unique incrementing id. */
1954           curid = g_atomic_int_add ((gint *) & mq->counter, 1);
1955
1956           item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
1957
1958           GST_DEBUG_OBJECT (mq,
1959               "SingleQueue %d : Enqueuing query %p of type %s with id %d",
1960               sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
1961           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1962           res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
1963           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1964           /* it might be that the query has been taken out of the queue
1965            * while we were unlocked. So, we need to check if the last
1966            * handled query is the same one than the one we just
1967            * pushed. If it is, we don't need to wait for the condition
1968            * variable, otherwise we wait for the condition variable to
1969            * be signaled. */
1970           if (sq->last_handled_query != query)
1971             g_cond_wait (&sq->query_handled, &mq->qlock);
1972           res = sq->last_query;
1973           sq->last_handled_query = NULL;
1974         } else {
1975           GST_DEBUG_OBJECT (mq, "refusing query, we are buffering and the "
1976               "queue is not empty");
1977           res = FALSE;
1978         }
1979         GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1980       } else {
1981         /* default handling */
1982         res = gst_pad_query_default (pad, parent, query);
1983       }
1984       break;
1985   }
1986   return res;
1987
1988 out_flushing:
1989   {
1990     GST_DEBUG_OBJECT (mq, "Flushing");
1991     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1992     return FALSE;
1993   }
1994 }
1995
1996 static gboolean
1997 gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
1998     GstPadMode mode, gboolean active)
1999 {
2000   GstMultiQueue *mq;
2001   GstSingleQueue *sq;
2002   gboolean result;
2003
2004   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
2005   mq = sq->mqueue;
2006
2007   GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
2008
2009   switch (mode) {
2010     case GST_PAD_MODE_PUSH:
2011       if (active) {
2012         result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
2013       } else {
2014         result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
2015         /* make sure streaming finishes */
2016         result |= gst_pad_stop_task (pad);
2017       }
2018       break;
2019     default:
2020       result = FALSE;
2021       break;
2022   }
2023   return result;
2024 }
2025
2026 static gboolean
2027 gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2028 {
2029   GstSingleQueue *sq = gst_pad_get_element_private (pad);
2030   GstMultiQueue *mq = sq->mqueue;
2031   gboolean ret;
2032
2033   switch (GST_EVENT_TYPE (event)) {
2034     case GST_EVENT_RECONFIGURE:
2035       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2036       if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2037         sq->srcresult = GST_FLOW_OK;
2038         g_cond_signal (&sq->turn);
2039       }
2040       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2041
2042       ret = gst_pad_push_event (sq->sinkpad, event);
2043       break;
2044     default:
2045       ret = gst_pad_push_event (sq->sinkpad, event);
2046       break;
2047   }
2048
2049   return ret;
2050 }
2051
2052 static gboolean
2053 gst_multi_queue_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2054 {
2055   gboolean res;
2056
2057   /* FIXME, Handle position offset depending on queue size */
2058   switch (GST_QUERY_TYPE (query)) {
2059     default:
2060       /* default handling */
2061       res = gst_pad_query_default (pad, parent, query);
2062       break;
2063   }
2064   return res;
2065 }
2066
2067 /*
2068  * Next-non-linked functions
2069  */
2070
2071 /* WITH LOCK TAKEN */
2072 static void
2073 wake_up_next_non_linked (GstMultiQueue * mq)
2074 {
2075   GList *tmp;
2076
2077   /* maybe no-one is waiting */
2078   if (mq->numwaiting < 1)
2079     return;
2080
2081   if (mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE) {
2082     /* Else figure out which singlequeue(s) need waking up */
2083     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2084       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2085       if (sq->srcresult == GST_FLOW_NOT_LINKED
2086           && sq->next_time != GST_CLOCK_TIME_NONE
2087           && sq->next_time <= mq->high_time) {
2088         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2089         g_cond_signal (&sq->turn);
2090       }
2091     }
2092   } else {
2093     /* Else figure out which singlequeue(s) need waking up */
2094     for (tmp = mq->queues; tmp; tmp = tmp->next) {
2095       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2096       if (sq->srcresult == GST_FLOW_NOT_LINKED &&
2097           sq->nextid != 0 && sq->nextid <= mq->highid) {
2098         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
2099         g_cond_signal (&sq->turn);
2100       }
2101     }
2102   }
2103 }
2104
2105 /* WITH LOCK TAKEN */
2106 static void
2107 compute_high_id (GstMultiQueue * mq)
2108 {
2109   /* The high-id is either the highest id among the linked pads, or if all
2110    * pads are not-linked, it's the lowest not-linked pad */
2111   GList *tmp;
2112   guint32 lowest = G_MAXUINT32;
2113   guint32 highid = G_MAXUINT32;
2114
2115   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2116     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2117
2118     GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
2119         sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
2120
2121     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2122       /* No need to consider queues which are not waiting */
2123       if (sq->nextid == 0) {
2124         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2125         continue;
2126       }
2127
2128       if (sq->nextid < lowest)
2129         lowest = sq->nextid;
2130     } else if (sq->srcresult != GST_FLOW_EOS) {
2131       /* If we don't have a global highid, or the global highid is lower than
2132        * this single queue's last outputted id, store the queue's one, 
2133        * unless the singlequeue is at EOS (srcresult = EOS) */
2134       if ((highid == G_MAXUINT32) || (sq->oldid > highid))
2135         highid = sq->oldid;
2136     }
2137   }
2138
2139   if (highid == G_MAXUINT32 || lowest < highid)
2140     mq->highid = lowest;
2141   else
2142     mq->highid = highid;
2143
2144   GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
2145       lowest);
2146 }
2147
2148 /* WITH LOCK TAKEN */
2149 static void
2150 compute_high_time (GstMultiQueue * mq)
2151 {
2152   /* The high-time is either the highest last time among the linked
2153    * pads, or if all pads are not-linked, it's the lowest nex time of
2154    * not-linked pad */
2155   GList *tmp;
2156   GstClockTime highest = GST_CLOCK_TIME_NONE;
2157   GstClockTime lowest = GST_CLOCK_TIME_NONE;
2158
2159   if (!mq->sync_by_running_time)
2160     return;
2161
2162   for (tmp = mq->queues; tmp; tmp = tmp->next) {
2163     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
2164
2165     GST_LOG_OBJECT (mq,
2166         "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
2167         GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
2168         GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
2169
2170     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2171       /* No need to consider queues which are not waiting */
2172       if (sq->next_time == GST_CLOCK_TIME_NONE) {
2173         GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
2174         continue;
2175       }
2176
2177       if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
2178         lowest = sq->next_time;
2179     } else if (sq->srcresult != GST_FLOW_EOS) {
2180       /* If we don't have a global high time, or the global high time
2181        * is lower than this single queue's last outputted time, store
2182        * the queue's one, unless the singlequeue is at EOS (srcresult
2183        * = EOS) */
2184       if (highest == GST_CLOCK_TIME_NONE
2185           || (sq->last_time != GST_CLOCK_TIME_NONE && sq->last_time > highest))
2186         highest = sq->last_time;
2187     }
2188     GST_LOG_OBJECT (mq,
2189         "highest now %" GST_TIME_FORMAT " lowest %" GST_TIME_FORMAT,
2190         GST_TIME_ARGS (highest), GST_TIME_ARGS (lowest));
2191   }
2192
2193   if (highest == GST_CLOCK_TIME_NONE)
2194     mq->high_time = lowest;
2195   else
2196     mq->high_time = highest;
2197
2198   GST_LOG_OBJECT (mq,
2199       "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
2200       GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
2201 }
2202
2203 #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
2204      ((q)->max_size.format) <= (value))
2205
2206 /*
2207  * GstSingleQueue functions
2208  */
2209 static void
2210 single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
2211 {
2212   GstMultiQueue *mq = sq->mqueue;
2213   GList *tmp;
2214   GstDataQueueSize size;
2215   gboolean filled = TRUE;
2216   gboolean empty_found = FALSE;
2217
2218   gst_data_queue_get_level (sq->queue, &size);
2219
2220   GST_LOG_OBJECT (mq,
2221       "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %"
2222       G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible,
2223       sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time,
2224       sq->max_size.time);
2225
2226   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2227
2228   /* check if we reached the hard time/bytes limits */
2229   if (sq->is_eos || sq->is_sparse || IS_FILLED (sq, bytes, size.bytes) ||
2230       IS_FILLED (sq, time, sq->cur_time)) {
2231     goto done;
2232   }
2233
2234   /* Search for empty queues */
2235   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2236     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
2237
2238     if (oq == sq)
2239       continue;
2240
2241     if (oq->srcresult == GST_FLOW_NOT_LINKED) {
2242       GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id);
2243       continue;
2244     }
2245
2246     GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
2247     if (gst_data_queue_is_empty (oq->queue) && !oq->is_sparse) {
2248       GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
2249       empty_found = TRUE;
2250       break;
2251     }
2252   }
2253
2254   /* if hard limits are not reached then we allow one more buffer in the full
2255    * queue, but only if any of the other singelqueues are empty */
2256   if (empty_found) {
2257     if (IS_FILLED (sq, visible, size.visible)) {
2258       sq->max_size.visible = size.visible + 1;
2259       GST_DEBUG_OBJECT (mq,
2260           "Bumping single queue %d max visible to %d",
2261           sq->id, sq->max_size.visible);
2262       filled = FALSE;
2263     }
2264   }
2265
2266 done:
2267   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2268
2269   /* Overrun is always forwarded, since this is blocking the upstream element */
2270   if (filled) {
2271     GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id);
2272     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
2273   }
2274 }
2275
2276 static void
2277 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
2278 {
2279   gboolean empty = TRUE;
2280   GstMultiQueue *mq = sq->mqueue;
2281   GList *tmp;
2282
2283   if (sq->srcresult == GST_FLOW_NOT_LINKED) {
2284     GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
2285     return;
2286   } else {
2287     GST_LOG_OBJECT (mq,
2288         "Single Queue %d is empty, Checking other single queues", sq->id);
2289   }
2290
2291   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
2292   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
2293     GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
2294
2295     if (gst_data_queue_is_full (oq->queue)) {
2296       GstDataQueueSize size;
2297
2298       gst_data_queue_get_level (oq->queue, &size);
2299       if (IS_FILLED (oq, visible, size.visible)) {
2300         oq->max_size.visible = size.visible + 1;
2301         GST_DEBUG_OBJECT (mq,
2302             "queue %d is filled, bumping its max visible to %d", oq->id,
2303             oq->max_size.visible);
2304         gst_data_queue_limits_changed (oq->queue);
2305       }
2306     }
2307     if (!gst_data_queue_is_empty (oq->queue) || oq->is_sparse)
2308       empty = FALSE;
2309   }
2310   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
2311
2312   if (empty) {
2313     GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
2314     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
2315   }
2316 }
2317
2318 static gboolean
2319 single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
2320     guint64 time, GstSingleQueue * sq)
2321 {
2322   gboolean res;
2323   GstMultiQueue *mq = sq->mqueue;
2324
2325   GST_DEBUG_OBJECT (mq,
2326       "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
2327       G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
2328       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
2329
2330   /* we are always filled on EOS */
2331   if (sq->is_eos)
2332     return TRUE;
2333
2334   /* we never go past the max visible items unless we are in buffering mode */
2335   if (!mq->use_buffering && IS_FILLED (sq, visible, visible))
2336     return TRUE;
2337
2338   /* check time or bytes */
2339   res = IS_FILLED (sq, bytes, bytes);
2340   /* We only care about limits in time if we're not a sparse stream or
2341    * we're not syncing by running time */
2342   if (!sq->is_sparse || !mq->sync_by_running_time)
2343     res |= IS_FILLED (sq, time, sq->cur_time);
2344
2345   return res;
2346 }
2347
2348 static void
2349 gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
2350 {
2351   GstDataQueueItem *sitem;
2352   GstMultiQueueItem *mitem;
2353   gboolean was_flushing = FALSE;
2354
2355   while (!gst_data_queue_is_empty (sq->queue)) {
2356     GstMiniObject *data;
2357
2358     /* FIXME: If this fails here although the queue is not empty,
2359      * we're flushing... but we want to rescue all sticky
2360      * events nonetheless.
2361      */
2362     if (!gst_data_queue_pop (sq->queue, &sitem)) {
2363       was_flushing = TRUE;
2364       gst_data_queue_set_flushing (sq->queue, FALSE);
2365       continue;
2366     }
2367
2368     mitem = (GstMultiQueueItem *) sitem;
2369
2370     data = sitem->object;
2371
2372     if (!full && !mitem->is_query && GST_IS_EVENT (data)
2373         && GST_EVENT_IS_STICKY (data)
2374         && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
2375         && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
2376       gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
2377     }
2378
2379     sitem->destroy (sitem);
2380   }
2381
2382   gst_data_queue_flush (sq->queue);
2383   if (was_flushing)
2384     gst_data_queue_set_flushing (sq->queue, TRUE);
2385
2386   GST_MULTI_QUEUE_MUTEX_LOCK (sq->mqueue);
2387   update_buffering (sq->mqueue, sq);
2388   GST_MULTI_QUEUE_MUTEX_UNLOCK (sq->mqueue);
2389   gst_multi_queue_post_buffering (sq->mqueue);
2390 }
2391
2392 static void
2393 gst_single_queue_free (GstSingleQueue * sq)
2394 {
2395   /* DRAIN QUEUE */
2396   gst_data_queue_flush (sq->queue);
2397   g_object_unref (sq->queue);
2398   g_cond_clear (&sq->turn);
2399   g_cond_clear (&sq->query_handled);
2400   g_free (sq);
2401 }
2402
2403 static GstSingleQueue *
2404 gst_single_queue_new (GstMultiQueue * mqueue, guint id)
2405 {
2406   GstSingleQueue *sq;
2407   gchar *name;
2408   GList *tmp;
2409   guint temp_id = (id == -1) ? 0 : id;
2410
2411   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
2412
2413   /* Find an unused queue ID, if possible the passed one */
2414   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
2415     GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
2416     /* This works because the IDs are sorted in ascending order */
2417     if (sq2->id == temp_id) {
2418       /* If this ID was requested by the caller return NULL,
2419        * otherwise just get us the next one */
2420       if (id == -1)
2421         temp_id = sq2->id + 1;
2422       else
2423         return NULL;
2424     } else if (sq2->id > temp_id) {
2425       break;
2426     }
2427   }
2428
2429   sq = g_new0 (GstSingleQueue, 1);
2430   mqueue->nbqueues++;
2431   sq->id = temp_id;
2432
2433   mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
2434   mqueue->queues_cookie++;
2435
2436   /* copy over max_size and extra_size so we don't need to take the lock
2437    * any longer when checking if the queue is full. */
2438   sq->max_size.visible = mqueue->max_size.visible;
2439   sq->max_size.bytes = mqueue->max_size.bytes;
2440   sq->max_size.time = mqueue->max_size.time;
2441
2442   sq->extra_size.visible = mqueue->extra_size.visible;
2443   sq->extra_size.bytes = mqueue->extra_size.bytes;
2444   sq->extra_size.time = mqueue->extra_size.time;
2445
2446   GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
2447
2448   sq->mqueue = mqueue;
2449   sq->srcresult = GST_FLOW_FLUSHING;
2450   sq->pushed = FALSE;
2451   sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
2452       single_queue_check_full,
2453       (GstDataQueueFullCallback) single_queue_overrun_cb,
2454       (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
2455   sq->is_eos = FALSE;
2456   sq->is_sparse = FALSE;
2457   sq->flushing = FALSE;
2458   gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
2459   gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
2460
2461   sq->nextid = 0;
2462   sq->oldid = 0;
2463   sq->next_time = GST_CLOCK_TIME_NONE;
2464   sq->last_time = GST_CLOCK_TIME_NONE;
2465   g_cond_init (&sq->turn);
2466   g_cond_init (&sq->query_handled);
2467
2468   sq->sinktime = GST_CLOCK_TIME_NONE;
2469   sq->srctime = GST_CLOCK_TIME_NONE;
2470   sq->sink_tainted = TRUE;
2471   sq->src_tainted = TRUE;
2472
2473   name = g_strdup_printf ("sink_%u", sq->id);
2474   sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, name);
2475   g_free (name);
2476
2477   gst_pad_set_chain_function (sq->sinkpad,
2478       GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
2479   gst_pad_set_activatemode_function (sq->sinkpad,
2480       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_mode));
2481   gst_pad_set_event_function (sq->sinkpad,
2482       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
2483   gst_pad_set_query_function (sq->sinkpad,
2484       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_query));
2485   gst_pad_set_iterate_internal_links_function (sq->sinkpad,
2486       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
2487   GST_OBJECT_FLAG_SET (sq->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
2488
2489   name = g_strdup_printf ("src_%u", sq->id);
2490   sq->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
2491   g_free (name);
2492
2493   gst_pad_set_activatemode_function (sq->srcpad,
2494       GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_mode));
2495   gst_pad_set_event_function (sq->srcpad,
2496       GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
2497   gst_pad_set_query_function (sq->srcpad,
2498       GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
2499   gst_pad_set_iterate_internal_links_function (sq->srcpad,
2500       GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
2501   GST_OBJECT_FLAG_SET (sq->srcpad, GST_PAD_FLAG_PROXY_CAPS);
2502
2503   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
2504   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
2505
2506   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
2507
2508   /* only activate the pads when we are not in the NULL state
2509    * and add the pad under the state_lock to prevend state changes
2510    * between activating and adding */
2511   g_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
2512   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
2513     gst_pad_set_active (sq->srcpad, TRUE);
2514     gst_pad_set_active (sq->sinkpad, TRUE);
2515   }
2516   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
2517   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
2518   g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
2519
2520   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
2521       sq->id);
2522
2523   return sq;
2524 }