plugins/elements/gstmultiqueue.c: Fix event leak.
[platform/upstream/gstreamer.git] / plugins / elements / gstmultiqueue.c
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  *
4  * gstmultiqueue.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #  include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include "gstmultiqueue.h"
28
29 /**
30  * GstSingleQueue:
31  * @sinkpad: associated sink #GstPad
32  * @srcpad: associated source #GstPad
33  *
34  * Structure containing all information and properties about
35  * a single queue.
36  */
37 typedef struct _GstSingleQueue GstSingleQueue;
38
39 struct _GstSingleQueue
40 {
41   /* unique identifier of the queue */
42   guint id;
43
44   GstMultiQueue *mqueue;
45
46   GstPad *sinkpad;
47   GstPad *srcpad;
48
49   /* flowreturn of previous srcpad push */
50   GstFlowReturn srcresult;
51   GstSegment sink_segment;
52   GstSegment src_segment;
53
54   /* queue of data */
55   GstDataQueue *queue;
56   GstDataQueueSize max_size, extra_size;
57   GstClockTime cur_time;
58   gboolean is_eos;
59   gboolean inextra;             /* TRUE if the queue is currently in extradata mode */
60
61   /* Protected by global lock */
62   guint32 nextid;               /* ID of the next object waiting to be pushed */
63   guint32 oldid;                /* ID of the last object pushed (last in a series) */
64   GCond *turn;                  /* SingleQueue turn waiting conditional */
65 };
66
67
68 /* Extension of GstDataQueueItem structure for our usage */
69 typedef struct _GstMultiQueueItem GstMultiQueueItem;
70
71 struct _GstMultiQueueItem
72 {
73   GstMiniObject *object;
74   guint size;
75   guint64 duration;
76   gboolean visible;
77
78   GDestroyNotify destroy;
79   guint32 posid;
80 };
81
82 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
83 static void gst_single_queue_free (GstSingleQueue * squeue);
84
85 static void wake_up_next_non_linked (GstMultiQueue * mq);
86 static void compute_next_non_linked (GstMultiQueue * mq);
87
88 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
89     GST_PAD_SINK,
90     GST_PAD_REQUEST,
91     GST_STATIC_CAPS_ANY);
92
93 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
94     GST_PAD_SRC,
95     GST_PAD_SOMETIMES,
96     GST_STATIC_CAPS_ANY);
97
98 GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
99 #define GST_CAT_DEFAULT (multi_queue_debug)
100
101 static const GstElementDetails gst_multi_queue_details =
102 GST_ELEMENT_DETAILS ("MultiQueue",
103     "Generic",
104     "Multiple data queue",
105     "Edward Hervey <edward@fluendo.com>");
106
107 /* default limits, we try to keep up to 2 seconds of data and if there is not
108  * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
109  * there is data in the queues. Normally, the byte and time limits are not hit
110  * in theses conditions. */
111 #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
112 #define DEFAULT_MAX_SIZE_BUFFERS 5
113 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
114
115 /* second limits. When we hit one of the above limits we are probably dealing
116  * with a badly muxed file and we scale the limits to these emergency values.
117  * This is currently not yet implemented. */
118 #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024       /* 10 MB */
119 #define DEFAULT_EXTRA_SIZE_BUFFERS 5
120 #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
121
122 /* Signals and args */
123 enum
124 {
125   SIGNAL_UNDERRUN,
126   SIGNAL_OVERRUN,
127   LAST_SIGNAL
128 };
129
130 enum
131 {
132   ARG_0,
133   ARG_EXTRA_SIZE_BYTES,
134   ARG_EXTRA_SIZE_BUFFERS,
135   ARG_EXTRA_SIZE_TIME,
136   ARG_MAX_SIZE_BYTES,
137   ARG_MAX_SIZE_BUFFERS,
138   ARG_MAX_SIZE_TIME,
139 };
140
141 #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
142   g_mutex_lock (q->qlock);                                              \
143 } G_STMT_END
144
145 #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
146   g_mutex_unlock (q->qlock);                                            \
147 } G_STMT_END
148
149 static void gst_multi_queue_finalize (GObject * object);
150 static void gst_multi_queue_set_property (GObject * object,
151     guint prop_id, const GValue * value, GParamSpec * pspec);
152 static void gst_multi_queue_get_property (GObject * object,
153     guint prop_id, GValue * value, GParamSpec * pspec);
154
155 static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
156     GstPadTemplate * temp, const gchar * name);
157 static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
158
159 static void gst_multi_queue_loop (GstPad * pad);
160
161 #define _do_init(bla) \
162   GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
163
164 GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
165     GST_TYPE_ELEMENT, _do_init);
166
167 static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
168
169 static void
170 gst_multi_queue_base_init (gpointer g_class)
171 {
172   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
173
174   gst_element_class_add_pad_template (gstelement_class,
175       gst_static_pad_template_get (&sinktemplate));
176   gst_element_class_add_pad_template (gstelement_class,
177       gst_static_pad_template_get (&srctemplate));
178   gst_element_class_set_details (gstelement_class, &gst_multi_queue_details);
179 }
180
181 static void
182 gst_multi_queue_class_init (GstMultiQueueClass * klass)
183 {
184   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
185   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
186
187   gobject_class->set_property =
188       GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
189   gobject_class->get_property =
190       GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
191
192   /* SIGNALS */
193   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
194       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
195       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
196       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
197
198   gst_multi_queue_signals[SIGNAL_OVERRUN] =
199       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
200       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
201       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
202
203   /* PROPERTIES */
204
205   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
206       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
207           "Max. amount of data in the queue (bytes, 0=disable)",
208           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
209   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
210       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
211           "Max. number of buffers in the queue (0=disable)",
212           0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
213   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
214       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
215           "Max. amount of data in the queue (in ns, 0=disable)",
216           0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
217
218   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
219       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
220           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
221           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
222   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
223       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
224           "Amount of buffers the queues can grow if one of them is empty (0=disable)",
225           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
226   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
227       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
228           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
229           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
230
231   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
232
233   gstelement_class->request_new_pad =
234       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
235   gstelement_class->release_pad =
236       GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
237 }
238
239 static void
240 gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
241 {
242   mqueue->nbqueues = 0;
243   mqueue->queues = NULL;
244
245   mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
246   mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
247   mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
248
249   mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
250   mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
251   mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
252
253   mqueue->counter = 0;
254   mqueue->highid = -1;
255   mqueue->nextnotlinked = -1;
256
257   mqueue->qlock = g_mutex_new ();
258 }
259
260 static void
261 gst_multi_queue_finalize (GObject * object)
262 {
263   GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
264
265   g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
266   g_list_free (mqueue->queues);
267   mqueue->queues = NULL;
268
269   /* free/unref instance data */
270   g_mutex_free (mqueue->qlock);
271
272   G_OBJECT_CLASS (parent_class)->finalize (object);
273 }
274
275 #define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START {        \
276     GList * tmp = mq->queues;                                   \
277     while (tmp) {                                               \
278       GstSingleQueue *q = (GstSingleQueue*)tmp->data;           \
279       g_object_set_property ((GObject*) q->queue, name, value); \
280       tmp = g_list_next(tmp);                                   \
281     };                                                          \
282 } G_STMT_END
283
284 static void
285 gst_multi_queue_set_property (GObject * object, guint prop_id,
286     const GValue * value, GParamSpec * pspec)
287 {
288   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
289
290   switch (prop_id) {
291     case ARG_MAX_SIZE_BYTES:
292       mq->max_size.bytes = g_value_get_uint (value);
293       SET_CHILD_PROPERTY (mq, "max-size-bytes", value);
294       break;
295     case ARG_MAX_SIZE_BUFFERS:
296       mq->max_size.visible = g_value_get_uint (value);
297       SET_CHILD_PROPERTY (mq, "max-size-visible", value);
298       break;
299     case ARG_MAX_SIZE_TIME:
300       mq->max_size.time = g_value_get_uint64 (value);
301       SET_CHILD_PROPERTY (mq, "max-size-time", value);
302       break;
303     case ARG_EXTRA_SIZE_BYTES:
304       mq->extra_size.bytes = g_value_get_uint (value);
305       break;
306     case ARG_EXTRA_SIZE_BUFFERS:
307       mq->extra_size.visible = g_value_get_uint (value);
308       break;
309     case ARG_EXTRA_SIZE_TIME:
310       mq->extra_size.time = g_value_get_uint64 (value);
311       break;
312     default:
313       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
314       break;
315   }
316 }
317
318 static void
319 gst_multi_queue_get_property (GObject * object, guint prop_id,
320     GValue * value, GParamSpec * pspec)
321 {
322   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
323
324   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
325
326   switch (prop_id) {
327     case ARG_EXTRA_SIZE_BYTES:
328       g_value_set_uint (value, mq->extra_size.bytes);
329       break;
330     case ARG_EXTRA_SIZE_BUFFERS:
331       g_value_set_uint (value, mq->extra_size.visible);
332       break;
333     case ARG_EXTRA_SIZE_TIME:
334       g_value_set_uint64 (value, mq->extra_size.time);
335       break;
336     case ARG_MAX_SIZE_BYTES:
337       g_value_set_uint (value, mq->max_size.bytes);
338       break;
339     case ARG_MAX_SIZE_BUFFERS:
340       g_value_set_uint (value, mq->max_size.visible);
341       break;
342     case ARG_MAX_SIZE_TIME:
343       g_value_set_uint64 (value, mq->max_size.time);
344       break;
345     default:
346       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
347       break;
348   }
349
350   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
351 }
352
353
354 /*
355  * GstElement methods
356  */
357
358 static GstPad *
359 gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
360     const gchar * name)
361 {
362   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
363   GstSingleQueue *squeue;
364
365   GST_LOG_OBJECT (element, "name : %s", name);
366
367   /* Create a new single queue, add the sink and source pad and return the sink pad */
368   squeue = gst_single_queue_new (mqueue);
369
370   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
371   mqueue->queues = g_list_append (mqueue->queues, squeue);
372   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
373
374   GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
375       GST_DEBUG_PAD_NAME (squeue->sinkpad));
376
377   return squeue->sinkpad;
378 }
379
380 static void
381 gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
382 {
383   GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
384   GstSingleQueue *sq = NULL;
385   GList *tmp;
386
387   GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
388
389   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
390   /* Find which single queue it belongs to, knowing that it should be a sinkpad */
391   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
392     sq = (GstSingleQueue *) tmp->data;
393
394     if (sq->sinkpad == pad)
395       break;
396   }
397
398   if (!tmp) {
399     GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
400     GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
401     return;
402   }
403
404   /* FIXME: The removal of the singlequeue should probably not happen until it
405    * finishes draining */
406
407   /* remove it from the list */
408   mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
409
410   /* FIXME : recompute next-non-linked */
411   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
412
413   /* delete SingleQueue */
414   gst_data_queue_set_flushing (sq->queue, TRUE);
415
416   gst_pad_set_active (sq->srcpad, FALSE);
417   gst_pad_set_active (sq->sinkpad, FALSE);
418   gst_element_remove_pad (element, sq->srcpad);
419   gst_element_remove_pad (element, sq->sinkpad);
420   gst_single_queue_free (sq);
421 }
422
423 static gboolean
424 gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
425 {
426   gboolean result;
427
428   GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
429       sq->id);
430
431   if (flush) {
432     sq->srcresult = GST_FLOW_WRONG_STATE;
433     gst_data_queue_set_flushing (sq->queue, TRUE);
434
435     /* wake up non-linked task */
436     GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
437         sq->id);
438     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
439     g_cond_signal (sq->turn);
440     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
441
442     GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
443     result = gst_pad_pause_task (sq->srcpad);
444   } else {
445     gst_data_queue_flush (sq->queue);
446     gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
447     gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
448     sq->srcresult = GST_FLOW_OK;
449     sq->cur_time = 0;
450     sq->max_size.visible = mq->max_size.visible;
451     sq->is_eos = FALSE;
452     sq->inextra = FALSE;
453     sq->nextid = -1;
454     sq->oldid = -1;
455     gst_data_queue_set_flushing (sq->queue, FALSE);
456
457     GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
458     result =
459         gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
460         sq->srcpad);
461   }
462   return result;
463 }
464
465 /* calculate the diff between running time on the sink and src of the queue.
466  * This is the total amount of time in the queue. */
467 static void
468 update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
469 {
470   gint64 sink_time, src_time;
471
472   sink_time =
473       gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
474       sq->sink_segment.last_stop);
475
476   src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
477       sq->src_segment.last_stop);
478
479   GST_DEBUG_OBJECT (mq,
480       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
481       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
482
483   if (sink_time >= src_time)
484     sq->cur_time = sink_time - src_time;
485   else
486     sq->cur_time = 0;
487 }
488
489 /* take a NEWSEGMENT event and apply the values to segment, updating the time
490  * level of queue. */
491 static void
492 apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
493     GstSegment * segment)
494 {
495   gboolean update;
496   GstFormat format;
497   gdouble rate, arate;
498   gint64 start, stop, time;
499
500   gst_event_parse_new_segment_full (event, &update, &rate, &arate,
501       &format, &start, &stop, &time);
502
503   /* now configure the values, we use these to track timestamps on the
504    * sinkpad. */
505   if (format != GST_FORMAT_TIME) {
506     /* non-time format, pretent the current time segment is closed with a
507      * 0 start and unknown stop time. */
508     update = FALSE;
509     format = GST_FORMAT_TIME;
510     start = 0;
511     stop = -1;
512     time = 0;
513   }
514   gst_segment_set_newsegment_full (segment, update,
515       rate, arate, format, start, stop, time);
516
517   GST_DEBUG_OBJECT (mq,
518       "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
519
520   /* segment can update the time level of the queue */
521   update_time_level (mq, sq);
522 }
523
524 /* take a buffer and update segment, updating the time level of the queue. */
525 static void
526 apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
527     GstClockTime duration, GstSegment * segment)
528 {
529   /* if no timestamp is set, assume it's continuous with the previous 
530    * time */
531   if (timestamp == GST_CLOCK_TIME_NONE)
532     timestamp = segment->last_stop;
533
534   /* add duration */
535   if (duration != GST_CLOCK_TIME_NONE)
536     timestamp += duration;
537
538   GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
539       sq->id, GST_TIME_ARGS (timestamp));
540
541   gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
542
543   /* calc diff with other end */
544   update_time_level (mq, sq);
545 }
546
547 static GstFlowReturn
548 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
549     GstMiniObject * object)
550 {
551   GstFlowReturn result = GST_FLOW_OK;
552
553   if (GST_IS_BUFFER (object)) {
554     GstBuffer *buffer;
555     GstClockTime timestamp, duration;
556
557     buffer = GST_BUFFER_CAST (object);
558     timestamp = GST_BUFFER_TIMESTAMP (buffer);
559     duration = GST_BUFFER_DURATION (buffer);
560
561     apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
562
563     result = gst_pad_push (sq->srcpad, buffer);
564   } else if (GST_IS_EVENT (object)) {
565     GstEvent *event;
566
567     event = GST_EVENT_CAST (object);
568
569     switch (GST_EVENT_TYPE (event)) {
570       case GST_EVENT_EOS:
571         result = GST_FLOW_UNEXPECTED;
572         break;
573       case GST_EVENT_NEWSEGMENT:
574         apply_segment (mq, sq, event, &sq->src_segment);
575         break;
576       default:
577         break;
578     }
579
580     gst_pad_push_event (sq->srcpad, event);
581   } else {
582     g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
583         sq->id);
584   }
585   return result;
586
587   /* ERRORS */
588 }
589
590 static GstMiniObject *
591 gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
592 {
593   GstMiniObject *res;
594
595   res = item->object;
596   item->object = NULL;
597
598   return res;
599 }
600
601 static void
602 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
603 {
604   if (item->object)
605     gst_mini_object_unref (item->object);
606   g_free (item);
607 }
608
609 /* takes ownership of passed mini object! */
610 static GstMultiQueueItem *
611 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
612 {
613   GstMultiQueueItem *item;
614
615   item = g_new (GstMultiQueueItem, 1);
616   item->object = object;
617   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
618   item->posid = curid;
619
620   if (GST_IS_BUFFER (object)) {
621     item->size = GST_BUFFER_SIZE (object);
622     item->duration = GST_BUFFER_DURATION (object);
623     if (item->duration == GST_CLOCK_TIME_NONE)
624       item->duration = 0;
625     item->visible = TRUE;
626   } else {
627     item->size = 0;
628     item->duration = 0;
629     item->visible = FALSE;
630   }
631   return item;
632 }
633
634 static void
635 gst_multi_queue_loop (GstPad * pad)
636 {
637   GstSingleQueue *sq;
638   GstMultiQueueItem *item;
639   GstDataQueueItem *sitem;
640   GstMultiQueue *mq;
641   GstMiniObject *object;
642   guint32 newid;
643   guint32 oldid = -1;
644   GstFlowReturn result;
645
646   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
647   mq = sq->mqueue;
648
649 restart:
650   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
651
652   if (!(gst_data_queue_pop (sq->queue, &sitem)))
653     goto out_flushing;
654
655   item = (GstMultiQueueItem *) sitem;
656   newid = item->posid;
657   /* steal the object and destroy the item */
658   object = gst_multi_queue_item_steal_object (item);
659   gst_multi_queue_item_destroy (item);
660
661   GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
662       sq->id, newid, oldid);
663
664   /* 1. Only check turn if :
665    * _ We haven't pushed anything yet 
666    * _ OR the new id isn't the follower of the previous one (continuous segment) */
667   if ((oldid == -1) || (newid != (oldid + 1))) {
668     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
669
670     GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
671         gst_flow_get_name (sq->srcresult));
672
673     /* preamble : if we're not linked, set the newid as the next one we want */
674     if (sq->srcresult == GST_FLOW_NOT_LINKED)
675       sq->nextid = newid;
676
677     /* store the last id we outputted */
678     if (oldid != -1)
679       sq->oldid = oldid;
680
681     /* 2. If there's a queue waiting to push, wake it up. If it's us the */
682     /*    check below (3.) will avoid us waiting. */
683     wake_up_next_non_linked (mq);
684
685     /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted
686      * _ Update global next-not-linked
687      * _ Wait on our conditional 
688      */
689     while ((sq->srcresult == GST_FLOW_NOT_LINKED)
690         && (mq->nextnotlinked != sq->id)) {
691       compute_next_non_linked (mq);
692       g_cond_wait (sq->turn, mq->qlock);
693     }
694     /* 4. Check again status, maybe we're flushing */
695     if ((sq->srcresult != GST_FLOW_OK)) {
696       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
697       gst_mini_object_unref (object);
698       goto out_flushing;
699     }
700     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
701   }
702
703   GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
704       gst_flow_get_name (sq->srcresult));
705
706   /* 4. Try to push out the new object */
707   result = gst_single_queue_push_one (mq, sq, object);
708   sq->srcresult = result;
709
710   if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
711     goto out_flushing;
712
713   GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
714       gst_flow_get_name (sq->srcresult));
715
716   oldid = newid;
717
718   /* restart to get the next element */
719   goto restart;
720
721   /* ERRORS */
722 out_flushing:
723   {
724     gst_data_queue_set_flushing (sq->queue, TRUE);
725     gst_pad_pause_task (sq->srcpad);
726     GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
727         "SingleQueue[%d] task paused, reason:%s",
728         sq->id, gst_flow_get_name (sq->srcresult));
729     return;
730   }
731 }
732
733 /**
734  * gst_multi_queue_chain:
735  *
736  * This is similar to GstQueue's chain function, except:
737  * _ we don't have leak behavioures,
738  * _ we push with a unique id (curid)
739  */
740 static GstFlowReturn
741 gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
742 {
743   GstSingleQueue *sq;
744   GstMultiQueue *mq;
745   GstMultiQueueItem *item;
746   GstFlowReturn ret = GST_FLOW_OK;
747   guint32 curid;
748   GstClockTime timestamp, duration;
749
750   sq = gst_pad_get_element_private (pad);
751   mq = (GstMultiQueue *) gst_pad_get_parent (pad);
752
753   /* Get a unique incrementing id */
754   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
755   curid = mq->counter++;
756   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
757
758   GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d",
759       sq->id, curid);
760
761   item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
762
763   timestamp = GST_BUFFER_TIMESTAMP (buffer);
764   duration = GST_BUFFER_DURATION (buffer);
765
766   if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
767     goto flushing;
768
769   /* update time level, we must do this after pushing the data in the queue so
770    * that we never end up filling the queue first. */
771   apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
772
773 done:
774   gst_object_unref (mq);
775
776   return ret;
777
778   /* ERRORS */
779 flushing:
780   {
781     ret = sq->srcresult;
782     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
783         sq->id, gst_flow_get_name (ret));
784     gst_multi_queue_item_destroy (item);
785     goto done;
786   }
787 }
788
789 static gboolean
790 gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
791 {
792   GstSingleQueue *sq;
793
794   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
795
796   if (active) {
797     sq->srcresult = GST_FLOW_OK;
798   } else {
799     sq->srcresult = GST_FLOW_WRONG_STATE;
800     gst_data_queue_flush (sq->queue);
801   }
802   return TRUE;
803 }
804
805 static gboolean
806 gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
807 {
808   GstSingleQueue *sq;
809   GstMultiQueue *mq;
810   guint32 curid;
811   GstMultiQueueItem *item;
812   gboolean res;
813   GstEventType type;
814   GstEvent *sref = NULL;
815
816   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
817   mq = (GstMultiQueue *) gst_pad_get_parent (pad);
818
819   type = GST_EVENT_TYPE (event);
820
821   switch (type) {
822     case GST_EVENT_FLUSH_START:
823       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
824           sq->id);
825
826       res = gst_pad_push_event (sq->srcpad, event);
827
828       gst_single_queue_flush (mq, sq, TRUE);
829       goto done;
830
831     case GST_EVENT_FLUSH_STOP:
832       GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
833           sq->id);
834
835       res = gst_pad_push_event (sq->srcpad, event);
836
837       gst_single_queue_flush (mq, sq, FALSE);
838       goto done;
839     case GST_EVENT_NEWSEGMENT:
840       /* take ref because the queue will take ownership and we need the event
841        * afterwards to update the segment */
842       sref = gst_event_ref (event);
843       break;
844
845     default:
846       if (!(GST_EVENT_IS_SERIALIZED (event))) {
847         res = gst_pad_push_event (sq->srcpad, event);
848         goto done;
849       }
850       break;
851   }
852
853   /* Get an unique incrementing id */
854   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
855   curid = mq->counter++;
856   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
857
858   item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
859
860   GST_DEBUG_OBJECT (mq,
861       "SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event,
862       GST_EVENT_TYPE_NAME (event), curid);
863
864   if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
865     goto flushing;
866
867   /* mark EOS when we received one, we must do that after putting the
868    * buffer in the queue because EOS marks the buffer as filled. No need to take
869    * a lock, the _check_full happens from this thread only, right before pushing
870    * into dataqueue. */
871   switch (type) {
872     case GST_EVENT_EOS:
873       sq->is_eos = TRUE;
874       break;
875     case GST_EVENT_NEWSEGMENT:
876       apply_segment (mq, sq, sref, &sq->sink_segment);
877       gst_event_unref (sref);
878       break;
879     default:
880       break;
881   }
882 done:
883   gst_object_unref (mq);
884   return res;
885
886 flushing:
887   {
888     GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
889         sq->id, gst_flow_get_name (sq->srcresult));
890     if (sref)
891       gst_event_unref (sref);
892     gst_multi_queue_item_destroy (item);
893     goto done;
894   }
895 }
896
897 static GstCaps *
898 gst_multi_queue_getcaps (GstPad * pad)
899 {
900   GstSingleQueue *sq = gst_pad_get_element_private (pad);
901   GstPad *otherpad;
902   GstCaps *result;
903
904   otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
905
906   GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
907
908   result = gst_pad_peer_get_caps (otherpad);
909   if (result == NULL)
910     result = gst_caps_new_any ();
911
912   return result;
913 }
914
915 static GstFlowReturn
916 gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
917     GstCaps * caps, GstBuffer ** buf)
918 {
919   GstSingleQueue *sq = gst_pad_get_element_private (pad);
920
921   return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf);
922 }
923
924 static gboolean
925 gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
926 {
927   GstMultiQueue *mq;
928   GstSingleQueue *sq;
929   gboolean result = FALSE;
930
931   sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
932   mq = sq->mqueue;
933
934   GST_LOG ("SingleQueue %d", sq->id);
935
936   if (active) {
937     result = gst_single_queue_flush (mq, sq, FALSE);
938   } else {
939     result = gst_single_queue_flush (mq, sq, TRUE);
940     /* make sure streaming finishes */
941     result |= gst_pad_stop_task (pad);
942   }
943   return result;
944 }
945
946 static gboolean
947 gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
948 {
949   return TRUE;
950 }
951
952 static gboolean
953 gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
954 {
955   GstSingleQueue *sq = gst_pad_get_element_private (pad);
956
957   return gst_pad_push_event (sq->sinkpad, event);
958 }
959
960 static gboolean
961 gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
962 {
963   GstSingleQueue *sq = gst_pad_get_element_private (pad);
964   GstPad *peerpad;
965   gboolean res;
966
967   /* FIXME, Handle position offset depending on queue size */
968
969   /* default handling */
970   if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
971     goto no_peer;
972
973   res = gst_pad_query (peerpad, query);
974
975   gst_object_unref (peerpad);
976
977   return res;
978
979   /* ERRORS */
980 no_peer:
981   {
982     GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
983     return FALSE;
984   }
985 }
986
987 /*
988  * Next-non-linked functions
989  */
990
991 /* WITH LOCK TAKEN */
992 static void
993 wake_up_next_non_linked (GstMultiQueue * mq)
994 {
995   GList *tmp;
996
997   GST_LOG ("mq->nextnotlinked:%d", mq->nextnotlinked);
998
999   /* maybe no-one is waiting */
1000   if (mq->nextnotlinked == -1)
1001     return;
1002
1003   /* Else figure out which singlequeue it is and wake it up */
1004   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1005     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1006
1007     if (sq->srcresult == GST_FLOW_NOT_LINKED)
1008       if (sq->id == mq->nextnotlinked) {
1009         GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
1010         g_cond_signal (sq->turn);
1011         return;
1012       }
1013   }
1014 }
1015
1016 /* WITH LOCK TAKEN */
1017 static void
1018 compute_next_non_linked (GstMultiQueue * mq)
1019 {
1020   GList *tmp;
1021   guint32 lowest = G_MAXUINT32;
1022   gint nextid = -1;
1023
1024   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1025     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1026
1027     GST_LOG ("inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
1028         sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
1029
1030     if (sq->srcresult == GST_FLOW_NOT_LINKED)
1031       if (lowest > sq->nextid) {
1032         lowest = sq->nextid;
1033         nextid = sq->id;
1034       }
1035
1036     /* If we don't have a global highid, or the global highid is lower than */
1037     /* this single queue's last outputted id, store the queue's one */
1038     if ((mq->highid == -1) || (mq->highid < sq->oldid))
1039       mq->highid = sq->oldid;
1040   }
1041
1042   mq->nextnotlinked = nextid;
1043   GST_LOG_OBJECT (mq,
1044       "Next-non-linked is sq #%d with nextid : %d. Highid is now : %d", nextid,
1045       lowest, mq->highid);
1046 }
1047
1048 /*
1049  * GstSingleQueue functions
1050  */
1051 static void
1052 single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1053 {
1054   GstMultiQueue *mq = sq->mqueue;
1055   GList *tmp;
1056   GstDataQueueSize size;
1057
1058   gst_data_queue_get_level (sq->queue, &size);
1059
1060   GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id);
1061
1062   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1063   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1064     GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
1065
1066     if (gst_data_queue_is_empty (ssq->queue)) {
1067       if (size.visible == sq->max_size.visible) {
1068         sq->max_size.visible++;
1069         GST_DEBUG_OBJECT (mq,
1070             "Another queue is empty, bumping single queue %d max visible to %d",
1071             sq->id, sq->max_size.visible);
1072       }
1073       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1074       goto beach;
1075     }
1076   }
1077   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1078
1079   /* Overrun is always forwarded, since this is blocking the upstream element */
1080   g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN],
1081       0);
1082
1083 beach:
1084   return;
1085 }
1086
1087 static void
1088 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1089 {
1090   gboolean empty = TRUE;
1091   GstMultiQueue *mq = sq->mqueue;
1092   GList *tmp;
1093
1094   GST_LOG_OBJECT (mq,
1095       "Single Queue %d is empty, Checking other single queues", sq->id);
1096
1097   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1098   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1099     GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1100
1101     if (gst_data_queue_is_full (sq->queue)) {
1102       GstDataQueueSize size;
1103
1104       gst_data_queue_get_level (sq->queue, &size);
1105       if (size.visible == sq->max_size.visible) {
1106         sq->max_size.visible++;
1107         GST_DEBUG_OBJECT (mq,
1108             "queue %d is filled, bumping its max visible to %d", sq->id,
1109             sq->max_size.visible);
1110         gst_data_queue_limits_changed (sq->queue);
1111       }
1112     }
1113     if (!gst_data_queue_is_empty (sq->queue))
1114       empty = FALSE;
1115   }
1116   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1117
1118   if (empty) {
1119     GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
1120     g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
1121   }
1122 }
1123
1124 static gboolean
1125 single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
1126     guint64 time, GstSingleQueue * sq)
1127 {
1128   gboolean res;
1129
1130   GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
1131       "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
1132       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1133
1134   /* we are always filled on EOS */
1135   if (sq->is_eos)
1136     return TRUE;
1137
1138 #define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
1139      (sq->max_size.format) <= (value))
1140
1141   /* we never go past the max visible items */
1142   if (IS_FILLED (visible, visible))
1143     return TRUE;
1144
1145   if (sq->cur_time != 0) {
1146     /* if we have valid time in the queue, check */
1147     res = IS_FILLED (time, sq->cur_time);
1148   } else {
1149     /* no valid time, check bytes */
1150     res = IS_FILLED (bytes, bytes);
1151   }
1152   return res;
1153 }
1154
1155 static void
1156 gst_single_queue_free (GstSingleQueue * sq)
1157 {
1158   /* DRAIN QUEUE */
1159   gst_data_queue_flush (sq->queue);
1160   g_object_unref (sq->queue);
1161   g_cond_free (sq->turn);
1162   g_free (sq);
1163 }
1164
1165 static GstSingleQueue *
1166 gst_single_queue_new (GstMultiQueue * mqueue)
1167 {
1168   GstSingleQueue *sq;
1169   gchar *tmp;
1170
1171   sq = g_new0 (GstSingleQueue, 1);
1172
1173   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1174   sq->id = mqueue->nbqueues++;
1175
1176   /* copy over max_size and extra_size so we don't need to take the lock
1177    * any longer when checking if the queue is full. */
1178   sq->max_size.visible = mqueue->max_size.visible;
1179   sq->max_size.bytes = mqueue->max_size.bytes;
1180   sq->max_size.time = mqueue->max_size.time;
1181
1182   sq->extra_size.visible = mqueue->extra_size.visible;
1183   sq->extra_size.bytes = mqueue->extra_size.bytes;
1184   sq->extra_size.time = mqueue->extra_size.time;
1185
1186   GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1187
1188   GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
1189
1190   sq->mqueue = mqueue;
1191   sq->srcresult = GST_FLOW_WRONG_STATE;
1192   sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
1193       single_queue_check_full, sq);
1194   sq->is_eos = FALSE;
1195   gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1196   gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1197
1198   sq->nextid = -1;
1199   sq->oldid = -1;
1200   sq->turn = g_cond_new ();
1201
1202   /* attach to underrun/overrun signals to handle non-starvation  */
1203   g_signal_connect (G_OBJECT (sq->queue), "full",
1204       G_CALLBACK (single_queue_overrun_cb), sq);
1205   g_signal_connect (G_OBJECT (sq->queue), "empty",
1206       G_CALLBACK (single_queue_underrun_cb), sq);
1207
1208   tmp = g_strdup_printf ("sink%d", sq->id);
1209   sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
1210   g_free (tmp);
1211
1212   gst_pad_set_chain_function (sq->sinkpad,
1213       GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
1214   gst_pad_set_activatepush_function (sq->sinkpad,
1215       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
1216   gst_pad_set_event_function (sq->sinkpad,
1217       GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
1218   gst_pad_set_getcaps_function (sq->sinkpad,
1219       GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1220   gst_pad_set_bufferalloc_function (sq->sinkpad,
1221       GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
1222
1223   tmp = g_strdup_printf ("src%d", sq->id);
1224   sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
1225   g_free (tmp);
1226
1227   gst_pad_set_activatepush_function (sq->srcpad,
1228       GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
1229   gst_pad_set_acceptcaps_function (sq->srcpad,
1230       GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
1231   gst_pad_set_getcaps_function (sq->srcpad,
1232       GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1233   gst_pad_set_event_function (sq->srcpad,
1234       GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
1235   gst_pad_set_query_function (sq->srcpad,
1236       GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
1237
1238   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
1239   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
1240
1241   gst_pad_set_active (sq->srcpad, TRUE);
1242   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
1243
1244   gst_pad_set_active (sq->sinkpad, TRUE);
1245   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
1246
1247   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
1248       sq->id);
1249
1250   return sq;
1251 }