Next big merge.
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *                    2005 Wim Taymans <wim@fluendo.com>
6  *
7  * gstqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  */
24
25
26 #include "gst_private.h"
27
28 #include "gstqueue.h"
29 #include "gstscheduler.h"
30 #include "gstpipeline.h"
31 #include "gstevent.h"
32 #include "gstinfo.h"
33 #include "gsterror.h"
34
35 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
36     GST_PAD_SINK,
37     GST_PAD_ALWAYS,
38     GST_STATIC_CAPS_ANY);
39
40 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
41     GST_PAD_SRC,
42     GST_PAD_ALWAYS,
43     GST_STATIC_CAPS_ANY);
44
45 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
46 #define GST_CAT_DEFAULT (queue_dataflow)
47
48 #define STATUS(queue, msg) \
49   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
50                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
51                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
52                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
53                       GST_DEBUG_PAD_NAME (pad), \
54                       queue->cur_level.buffers, \
55                       queue->min_threshold.buffers, \
56                       queue->max_size.buffers, \
57                       queue->cur_level.bytes, \
58                       queue->min_threshold.bytes, \
59                       queue->max_size.bytes, \
60                       queue->cur_level.time, \
61                       queue->min_threshold.time, \
62                       queue->max_size.time, \
63                       queue->queue->length)
64
65 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
66     "Generic",
67     "Simple data queue",
68     "Erik Walthinsen <omega@cse.ogi.edu>");
69
70
71 /* Queue signals and args */
72 enum
73 {
74   SIGNAL_UNDERRUN,
75   SIGNAL_RUNNING,
76   SIGNAL_OVERRUN,
77   LAST_SIGNAL
78 };
79
80 enum
81 {
82   ARG_0,
83   /* FIXME: don't we have another way of doing this
84    * "Gstreamer format" (frame/byte/time) queries? */
85   ARG_CUR_LEVEL_BUFFERS,
86   ARG_CUR_LEVEL_BYTES,
87   ARG_CUR_LEVEL_TIME,
88   ARG_MAX_SIZE_BUFFERS,
89   ARG_MAX_SIZE_BYTES,
90   ARG_MAX_SIZE_TIME,
91   ARG_MIN_THRESHOLD_BUFFERS,
92   ARG_MIN_THRESHOLD_BYTES,
93   ARG_MIN_THRESHOLD_TIME,
94   ARG_LEAKY,
95   ARG_MAY_DEADLOCK,
96   ARG_BLOCK_TIMEOUT
97       /* FILL ME */
98 };
99
100 #define GST_QUEUE_MUTEX_LOCK G_STMT_START {                             \
101   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
102       "locking qlock from thread %p",                                   \
103       g_thread_self ());                                                \
104   g_mutex_lock (queue->qlock);                                          \
105   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
106       "locked qlock from thread %p",                                    \
107       g_thread_self ());                                                \
108 } G_STMT_END
109
110 #define GST_QUEUE_MUTEX_UNLOCK G_STMT_START {                           \
111   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
112       "unlocking qlock from thread %p",                                 \
113       g_thread_self ());                                                \
114   g_mutex_unlock (queue->qlock);                                        \
115 } G_STMT_END
116
117
118 static void gst_queue_base_init (GstQueueClass * klass);
119 static void gst_queue_class_init (GstQueueClass * klass);
120 static void gst_queue_init (GstQueue * queue);
121 static void gst_queue_finalize (GObject * object);
122
123 static void gst_queue_set_property (GObject * object,
124     guint prop_id, const GValue * value, GParamSpec * pspec);
125 static void gst_queue_get_property (GObject * object,
126     guint prop_id, GValue * value, GParamSpec * pspec);
127
128 static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
129 static GstBuffer *gst_queue_bufferalloc (GstPad * pad, guint64 offset,
130     guint size, GstCaps * caps);
131 static void gst_queue_loop (GstPad * pad);
132
133 static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
134
135 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
136 static gboolean gst_queue_handle_src_query (GstPad * pad,
137     GstQueryType type, GstFormat * fmt, gint64 * value);
138
139 static GstCaps *gst_queue_getcaps (GstPad * pad);
140 static GstPadLinkReturn gst_queue_link_sink (GstPad * pad, GstPad * peer);
141 static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
142 static void gst_queue_locked_flush (GstQueue * queue);
143
144 static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode);
145 static GstElementStateReturn gst_queue_change_state (GstElement * element);
146
147
148 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
149
150 static GType
151 queue_leaky_get_type (void)
152 {
153   static GType queue_leaky_type = 0;
154   static GEnumValue queue_leaky[] = {
155     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
156     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
157     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
158     {0, NULL, NULL},
159   };
160
161   if (!queue_leaky_type) {
162     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
163   }
164   return queue_leaky_type;
165 }
166
167 static GstElementClass *parent_class = NULL;
168 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
169
170 GType
171 gst_queue_get_type (void)
172 {
173   static GType queue_type = 0;
174
175   if (!queue_type) {
176     static const GTypeInfo queue_info = {
177       sizeof (GstQueueClass),
178       (GBaseInitFunc) gst_queue_base_init,
179       NULL,
180       (GClassInitFunc) gst_queue_class_init,
181       NULL,
182       NULL,
183       sizeof (GstQueue),
184       0,
185       (GInstanceInitFunc) gst_queue_init,
186       NULL
187     };
188
189     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
190         "GstQueue", &queue_info, 0);
191     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
192         "dataflow inside the queue element");
193   }
194
195   return queue_type;
196 }
197
198 static void
199 gst_queue_base_init (GstQueueClass * klass)
200 {
201   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
202
203   gst_element_class_add_pad_template (gstelement_class,
204       gst_static_pad_template_get (&srctemplate));
205   gst_element_class_add_pad_template (gstelement_class,
206       gst_static_pad_template_get (&sinktemplate));
207   gst_element_class_set_details (gstelement_class, &gst_queue_details);
208 }
209
210 static void
211 gst_queue_class_init (GstQueueClass * klass)
212 {
213   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
214   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
215
216   parent_class = g_type_class_peek_parent (klass);
217
218   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
219   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
220
221   /* signals */
222   gst_queue_signals[SIGNAL_UNDERRUN] =
223       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
224       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
225       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
226   gst_queue_signals[SIGNAL_RUNNING] =
227       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
228       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
229       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
230   gst_queue_signals[SIGNAL_OVERRUN] =
231       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
232       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
233       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
234
235   /* properties */
236   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
237       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
238           "Current amount of data in the queue (bytes)",
239           0, G_MAXUINT, 0, G_PARAM_READABLE));
240   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
241       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
242           "Current number of buffers in the queue",
243           0, G_MAXUINT, 0, G_PARAM_READABLE));
244   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
245       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
246           "Current amount of data in the queue (in ns)",
247           0, G_MAXUINT64, 0, G_PARAM_READABLE));
248
249   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
250       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
251           "Max. amount of data in the queue (bytes, 0=disable)",
252           0, G_MAXUINT, 0, G_PARAM_READWRITE));
253   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
254       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
255           "Max. number of buffers in the queue (0=disable)",
256           0, G_MAXUINT, 0, G_PARAM_READWRITE));
257   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
258       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
259           "Max. amount of data in the queue (in ns, 0=disable)",
260           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
261
262   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
263       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
264           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
265           0, G_MAXUINT, 0, G_PARAM_READWRITE));
266   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
267       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
268           "Min. number of buffers in the queue to allow reading (0=disable)",
269           0, G_MAXUINT, 0, G_PARAM_READWRITE));
270   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
271       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
272           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
273           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
274
275   g_object_class_install_property (gobject_class, ARG_LEAKY,
276       g_param_spec_enum ("leaky", "Leaky",
277           "Where the queue leaks, if at all",
278           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
279   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
280       g_param_spec_boolean ("may_deadlock", "May Deadlock",
281           "The queue may deadlock if it's full and not PLAYING",
282           TRUE, G_PARAM_READWRITE));
283   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
284       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
285           "Nanoseconds until blocked queue times out and returns filler event. "
286           "Value of -1 disables timeout",
287           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
288
289   /* set several parent class virtual functions */
290   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
291
292   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
293 }
294
295 static void
296 gst_queue_init (GstQueue * queue)
297 {
298   queue->sinkpad =
299       gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate),
300       "sink");
301   gst_pad_set_chain_function (queue->sinkpad,
302       GST_DEBUG_FUNCPTR (gst_queue_chain));
303   gst_pad_set_event_function (queue->sinkpad,
304       GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
305   gst_pad_set_link_function (queue->sinkpad,
306       GST_DEBUG_FUNCPTR (gst_queue_link_sink));
307   gst_pad_set_getcaps_function (queue->sinkpad,
308       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
309   gst_pad_set_bufferalloc_function (queue->sinkpad,
310       GST_DEBUG_FUNCPTR (gst_queue_bufferalloc));
311   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
312
313   queue->srcpad =
314       gst_pad_new_from_template (gst_static_pad_template_get (&srctemplate),
315       "src");
316   gst_pad_set_loop_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_loop));
317   gst_pad_set_activate_function (queue->srcpad,
318       GST_DEBUG_FUNCPTR (gst_queue_src_activate));
319   gst_pad_set_link_function (queue->srcpad,
320       GST_DEBUG_FUNCPTR (gst_queue_link_src));
321   gst_pad_set_getcaps_function (queue->srcpad,
322       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
323   gst_pad_set_event_function (queue->srcpad,
324       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
325   gst_pad_set_query_function (queue->srcpad,
326       GST_DEBUG_FUNCPTR (gst_queue_handle_src_query));
327   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
328
329   queue->cur_level.buffers = 0; /* no content */
330   queue->cur_level.bytes = 0;   /* no content */
331   queue->cur_level.time = 0;    /* no content */
332   queue->max_size.buffers = 200;        /* 200 buffers */
333   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
334   queue->max_size.time = GST_SECOND;    /* 1 s. */
335   queue->min_threshold.buffers = 0;     /* no threshold */
336   queue->min_threshold.bytes = 0;       /* no threshold */
337   queue->min_threshold.time = 0;        /* no threshold */
338
339   queue->leaky = GST_QUEUE_NO_LEAK;
340   queue->may_deadlock = TRUE;
341   queue->block_timeout = GST_CLOCK_TIME_NONE;
342   queue->interrupt = FALSE;
343   queue->flush = FALSE;
344
345   queue->qlock = g_mutex_new ();
346   queue->item_add = g_cond_new ();
347   queue->item_del = g_cond_new ();
348   queue->queue = g_queue_new ();
349
350   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
351       "initialized queue's not_empty & not_full conditions");
352 }
353
354 /* called only once, as opposed to dispose */
355 static void
356 gst_queue_finalize (GObject * object)
357 {
358   GstQueue *queue = GST_QUEUE (object);
359
360   GST_DEBUG_OBJECT (queue, "finalizing queue");
361
362   while (!g_queue_is_empty (queue->queue)) {
363     GstData *data = g_queue_pop_head (queue->queue);
364
365     gst_data_unref (data);
366   }
367   g_queue_free (queue->queue);
368   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, "free mutex");
369   g_mutex_free (queue->qlock);
370   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, "done free mutex");
371   g_cond_free (queue->item_add);
372   g_cond_free (queue->item_del);
373
374   if (G_OBJECT_CLASS (parent_class)->finalize)
375     G_OBJECT_CLASS (parent_class)->finalize (object);
376 }
377
378 static GstCaps *
379 gst_queue_getcaps (GstPad * pad)
380 {
381   GstQueue *queue;
382   GstPad *otherpad;
383   GstCaps *result;
384
385   queue = GST_QUEUE (GST_PAD_PARENT (pad));
386
387   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
388   result = gst_pad_peer_get_caps (otherpad);
389
390   return result;
391 }
392
393 static GstPadLinkReturn
394 gst_queue_link_sink (GstPad * pad, GstPad * peer)
395 {
396   return GST_PAD_LINK_OK;
397 }
398
399 static GstPadLinkReturn
400 gst_queue_link_src (GstPad * pad, GstPad * peer)
401 {
402   GstPadLinkReturn result = GST_PAD_LINK_OK;
403
404   /* FIXME, see if we need to push or get pulled */
405   if (GST_RPAD_LINKFUNC (peer))
406     result = GST_RPAD_LINKFUNC (peer) (peer, pad);
407
408   return result;
409 }
410
411 static GstBuffer *
412 gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps)
413 {
414   GstQueue *queue;
415   GstPad *otherpeer;
416   GstBuffer *result = NULL;
417
418   queue = GST_QUEUE (GST_PAD_PARENT (pad));
419
420   otherpeer = gst_pad_get_peer (queue->srcpad);
421   if (otherpeer == NULL || GST_RPAD_BUFFERALLOCFUNC (otherpeer) == NULL) {
422     /* let the default aloc function do the work */
423     result = NULL;
424   } else {
425     result =
426         GST_RPAD_BUFFERALLOCFUNC (otherpeer) (otherpeer, offset, size, caps);
427   }
428   if (otherpeer)
429     gst_object_unref (GST_OBJECT (otherpeer));
430
431   return result;
432 }
433
434
435 static void
436 gst_queue_locked_flush (GstQueue * queue)
437 {
438   while (!g_queue_is_empty (queue->queue)) {
439     GstData *data = g_queue_pop_head (queue->queue);
440
441     /* Then loose another reference because we are supposed to destroy that
442        data when flushing */
443     gst_data_unref (data);
444   }
445   queue->cur_level.buffers = 0;
446   queue->cur_level.bytes = 0;
447   queue->cur_level.time = 0;
448
449   /* make sure any pending buffers to be added are flushed too */
450   queue->flush = TRUE;
451
452   /* we deleted something... */
453   g_cond_signal (queue->item_del);
454 }
455
456 #define STATUS(queue, msg) \
457   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
458                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
459                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
460                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
461                       GST_DEBUG_PAD_NAME (pad), \
462                       queue->cur_level.buffers, \
463                       queue->min_threshold.buffers, \
464                       queue->max_size.buffers, \
465                       queue->cur_level.bytes, \
466                       queue->min_threshold.bytes, \
467                       queue->max_size.bytes, \
468                       queue->cur_level.time, \
469                       queue->min_threshold.time, \
470                       queue->max_size.time, \
471                       queue->queue->length)
472
473 static gboolean
474 gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
475 {
476   GstQueue *queue;
477
478   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
479
480   switch (GST_EVENT_TYPE (event)) {
481     case GST_EVENT_FLUSH:
482       STATUS (queue, "received flush event");
483       /* forward event */
484       gst_pad_event_default (pad, event);
485       if (GST_EVENT_FLUSH_DONE (event)) {
486         GST_STREAM_LOCK (queue->srcpad);
487         gst_task_start (GST_RPAD_TASK (queue->srcpad));
488         GST_STREAM_UNLOCK (queue->srcpad);
489       } else {
490         /* now unblock the chain function */
491         GST_QUEUE_MUTEX_LOCK;
492         gst_queue_locked_flush (queue);
493         GST_QUEUE_MUTEX_UNLOCK;
494
495         STATUS (queue, "after flush");
496
497         /* unblock the loop function */
498         g_cond_signal (queue->item_add);
499
500         /* make sure it stops */
501         GST_STREAM_LOCK (queue->srcpad);
502         gst_task_pause (GST_RPAD_TASK (queue->srcpad));
503         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
504         GST_STREAM_UNLOCK (queue->srcpad);
505       }
506       goto done;
507     case GST_EVENT_EOS:
508       STATUS (queue, "received EOS");
509       break;
510     default:
511       /* we put the event in the queue, we don't have to act ourselves */
512       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
513           "adding event %p of type %d", event, GST_EVENT_TYPE (event));
514       break;
515   }
516
517   GST_QUEUE_MUTEX_LOCK;
518   g_queue_push_tail (queue->queue, event);
519   g_cond_signal (queue->item_add);
520
521   GST_QUEUE_MUTEX_UNLOCK;
522 done:
523
524   return TRUE;
525 }
526
527 static gboolean
528 gst_queue_is_empty (GstQueue * queue)
529 {
530   return (queue->queue->length == 0 ||
531       (queue->min_threshold.buffers > 0 &&
532           queue->cur_level.buffers < queue->min_threshold.buffers) ||
533       (queue->min_threshold.bytes > 0 &&
534           queue->cur_level.bytes < queue->min_threshold.bytes) ||
535       (queue->min_threshold.time > 0 &&
536           queue->cur_level.time < queue->min_threshold.time));
537 }
538
539 static gboolean
540 gst_queue_is_filled (GstQueue * queue)
541 {
542   return (((queue->max_size.buffers > 0 &&
543               queue->cur_level.buffers >= queue->max_size.buffers) ||
544           (queue->max_size.bytes > 0 &&
545               queue->cur_level.bytes >= queue->max_size.bytes) ||
546           (queue->max_size.time > 0 &&
547               queue->cur_level.time >= queue->max_size.time)));
548 }
549
550
551 static GstFlowReturn
552 gst_queue_chain (GstPad * pad, GstBuffer * buffer)
553 {
554   GstQueue *queue;
555
556   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
557
558   GST_STREAM_LOCK (pad);
559
560   /* we have to lock the queue since we span threads */
561   GST_QUEUE_MUTEX_LOCK;
562
563   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
564       "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
565
566   /* We make space available if we're "full" according to whatever
567    * the user defined as "full". Note that this only applies to buffers.
568    * We always handle events and they don't count in our statistics. */
569   while (gst_queue_is_filled (queue)) {
570     GST_QUEUE_MUTEX_UNLOCK;
571     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
572     GST_QUEUE_MUTEX_LOCK;
573
574     /* how are we going to make space for this buffer? */
575     switch (queue->leaky) {
576         /* leak current buffer */
577       case GST_QUEUE_LEAK_UPSTREAM:
578         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
579             "queue is full, leaking buffer on upstream end");
580         /* now we can clean up and exit right away */
581         goto out_unref;
582
583         /* leak first buffer in the queue */
584       case GST_QUEUE_LEAK_DOWNSTREAM:{
585         /* this is a bit hacky. We'll manually iterate the list
586          * and find the first buffer from the head on. We'll
587          * unref that and "fix up" the GQueue object... */
588         GList *item;
589         GstData *leak = NULL;
590
591         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
592             "queue is full, leaking buffer on downstream end");
593
594         for (item = queue->queue->head; item != NULL; item = item->next) {
595           if (GST_IS_BUFFER (item->data)) {
596             leak = item->data;
597             break;
598           }
599         }
600
601         /* if we didn't find anything, it means we have no buffers
602          * in here. That cannot happen, since we had >= 1 bufs */
603         g_assert (leak);
604
605         /* Now remove it from the list, fixing up the GQueue
606          * CHECKME: is a queue->head the first or the last item? */
607         item = g_list_delete_link (queue->queue->head, item);
608         queue->queue->head = g_list_first (item);
609         queue->queue->tail = g_list_last (item);
610         queue->queue->length--;
611
612         /* and unref the buffer at the end. Twice, because we keep a ref
613          * to make things read-only. Also keep our list uptodate. */
614         queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
615         queue->cur_level.buffers--;
616         if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
617           queue->cur_level.time -= GST_BUFFER_DURATION (buffer);
618
619         gst_buffer_unref (buffer);
620         gst_buffer_unref (buffer);
621         break;
622       }
623
624       default:
625         g_warning ("Unknown leaky type, using default");
626         /* fall-through */
627
628         /* don't leak. Instead, wait for space to be available */
629       case GST_QUEUE_NO_LEAK:
630         STATUS (queue, "pre-full wait");
631
632         while (gst_queue_is_filled (queue)) {
633           STATUS (queue, "waiting for item_del signal from thread using qlock");
634           g_cond_wait (queue->item_del, queue->qlock);
635
636           /* if there's a pending state change for this queue
637            * or its manager, switch back to iterator so bottom
638            * half of state change executes */
639           STATUS (queue, "received item_del signal from thread using qlock");
640         }
641
642         STATUS (queue, "post-full wait");
643         GST_QUEUE_MUTEX_UNLOCK;
644         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
645         GST_QUEUE_MUTEX_LOCK;
646         break;
647     }
648   }
649   /* we are flushing */
650   if (GST_RPAD_IS_FLUSHING (pad))
651     goto out_flushing;
652
653   g_queue_push_tail (queue->queue, buffer);
654
655   /* add buffer to the statistics */
656   queue->cur_level.buffers++;
657   queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
658   if (GST_BUFFER_DURATION (buffer) != GST_CLOCK_TIME_NONE)
659     queue->cur_level.time += GST_BUFFER_DURATION (buffer);
660
661   STATUS (queue, "+ level");
662
663   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
664   g_cond_signal (queue->item_add);
665   GST_QUEUE_MUTEX_UNLOCK;
666   GST_STREAM_UNLOCK (pad);
667
668   return GST_FLOW_OK;
669
670 out_unref:
671   GST_QUEUE_MUTEX_UNLOCK;
672   GST_STREAM_UNLOCK (pad);
673
674   gst_buffer_unref (buffer);
675
676   return GST_FLOW_OK;
677
678 out_flushing:
679   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
680   GST_QUEUE_MUTEX_UNLOCK;
681   gst_task_pause (GST_RPAD_TASK (queue->srcpad));
682   GST_STREAM_UNLOCK (pad);
683
684   gst_buffer_unref (buffer);
685
686   return GST_FLOW_UNEXPECTED;
687 }
688
689 static void
690 gst_queue_loop (GstPad * pad)
691 {
692   GstQueue *queue;
693   GstData *data;
694   gboolean restart = TRUE;
695
696   queue = GST_QUEUE (GST_PAD_PARENT (pad));
697
698   GST_STREAM_LOCK (pad);
699
700   /* have to lock for thread-safety */
701   GST_QUEUE_MUTEX_LOCK;
702
703 restart:
704   while (gst_queue_is_empty (queue)) {
705     GST_QUEUE_MUTEX_UNLOCK;
706     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
707     GST_QUEUE_MUTEX_LOCK;
708
709     STATUS (queue, "pre-empty wait");
710     while (gst_queue_is_empty (queue)) {
711       STATUS (queue, "waiting for item_add");
712
713       /* we are flushing */
714       if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
715         goto out_flushing;
716
717       GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
718           g_thread_self ());
719       g_cond_wait (queue->item_add, queue->qlock);
720
721       /* we got unlocked because we are flushing */
722       if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
723         goto out_flushing;
724
725       GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
726           g_thread_self ());
727       STATUS (queue, "got item_add signal");
728     }
729
730     STATUS (queue, "post-empty wait");
731     GST_QUEUE_MUTEX_UNLOCK;
732     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
733     GST_QUEUE_MUTEX_LOCK;
734   }
735
736   /* There's something in the list now, whatever it is */
737   data = g_queue_pop_head (queue->queue);
738   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
739       "retrieved data %p from queue", data);
740
741   if (GST_IS_BUFFER (data)) {
742     GstFlowReturn result;
743
744     /* Update statistics */
745     queue->cur_level.buffers--;
746     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
747     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
748       queue->cur_level.time -= GST_BUFFER_DURATION (data);
749
750     GST_QUEUE_MUTEX_UNLOCK;
751     result = gst_pad_push (pad, GST_BUFFER (data));
752     GST_QUEUE_MUTEX_LOCK;
753     if (result != GST_FLOW_OK) {
754       gst_task_pause (GST_RPAD_TASK (queue->srcpad));
755     }
756   } else {
757     if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
758       gst_task_pause (GST_RPAD_TASK (queue->srcpad));
759       restart = FALSE;
760     }
761     GST_QUEUE_MUTEX_UNLOCK;
762     gst_pad_push_event (queue->srcpad, GST_EVENT (data));
763     GST_QUEUE_MUTEX_LOCK;
764     if (restart == TRUE)
765       goto restart;
766   }
767
768   STATUS (queue, "after _get()");
769
770   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
771   g_cond_signal (queue->item_del);
772   GST_QUEUE_MUTEX_UNLOCK;
773   GST_STREAM_UNLOCK (pad);
774   return;
775
776 out_flushing:
777   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
778   gst_task_pause (GST_RPAD_TASK (pad));
779   GST_QUEUE_MUTEX_UNLOCK;
780   GST_STREAM_UNLOCK (pad);
781   return;
782 }
783
784
785 static gboolean
786 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
787 {
788   GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
789   gboolean res = TRUE;
790
791   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
792       event, GST_EVENT_TYPE (event));
793
794   gst_event_ref (event);
795   res = gst_pad_event_default (pad, event);
796   GST_QUEUE_MUTEX_LOCK;
797
798   switch (GST_EVENT_TYPE (event)) {
799     case GST_EVENT_SEEK:
800       if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
801         gst_queue_locked_flush (queue);
802       }
803     default:
804       break;
805   }
806   GST_QUEUE_MUTEX_UNLOCK;
807   gst_event_unref (event);
808
809   return res;
810 }
811
812 static gboolean
813 gst_queue_handle_src_query (GstPad * pad,
814     GstQueryType type, GstFormat * fmt, gint64 * value)
815 {
816   GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
817
818   if (!GST_PAD_PEER (queue->sinkpad))
819     return FALSE;
820   if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), type, fmt, value))
821     return FALSE;
822
823   if (type == GST_QUERY_POSITION) {
824     /* FIXME: this code assumes that there's no discont in the queue */
825     switch (*fmt) {
826       case GST_FORMAT_BYTES:
827         *value -= queue->cur_level.bytes;
828         break;
829       case GST_FORMAT_TIME:
830         *value -= queue->cur_level.time;
831         break;
832       default:
833         /* FIXME */
834         break;
835     }
836   }
837
838   return TRUE;
839 }
840
841 static gboolean
842 gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
843 {
844   gboolean result = FALSE;
845   GstQueue *queue;
846
847   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
848
849   if (mode == GST_ACTIVATE_PUSH) {
850     /* if we have a scheduler we can start the task */
851     if (GST_ELEMENT_SCHEDULER (queue)) {
852       GST_STREAM_LOCK (pad);
853       GST_RPAD_TASK (pad) =
854           gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
855           (GstTaskFunction) gst_queue_loop, pad);
856
857       gst_task_start (GST_RPAD_TASK (pad));
858       GST_STREAM_UNLOCK (pad);
859       result = TRUE;
860     }
861   } else {
862     /* step 1, unblock chain and loop functions */
863     queue->interrupt = TRUE;
864     g_cond_signal (queue->item_add);
865     g_cond_signal (queue->item_del);
866
867     /* step 2, make sure streaming finishes */
868     GST_STREAM_LOCK (pad);
869     /* step 3, stop the task */
870     gst_task_stop (GST_RPAD_TASK (pad));
871     gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
872     GST_STREAM_UNLOCK (pad);
873
874     result = TRUE;
875   }
876   return result;
877 }
878
879
880 static GstElementStateReturn
881 gst_queue_change_state (GstElement * element)
882 {
883   GstQueue *queue;
884   GstElementStateReturn ret = GST_STATE_SUCCESS;
885
886   queue = GST_QUEUE (element);
887
888   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
889
890   /* lock the queue so another thread (not in sync with this thread's state)
891    * can't call this queue's _loop (or whatever) */
892   GST_QUEUE_MUTEX_LOCK;
893
894   switch (GST_STATE_TRANSITION (element)) {
895     case GST_STATE_NULL_TO_READY:
896       gst_queue_locked_flush (queue);
897       break;
898     case GST_STATE_READY_TO_PAUSED:
899       break;
900     case GST_STATE_PAUSED_TO_PLAYING:
901       queue->interrupt = FALSE;
902       break;
903     default:
904       break;
905   }
906
907   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
908
909   switch (GST_STATE_TRANSITION (element)) {
910     case GST_STATE_PLAYING_TO_PAUSED:
911       break;
912     case GST_STATE_PAUSED_TO_READY:
913       gst_queue_locked_flush (queue);
914       break;
915     case GST_STATE_READY_TO_NULL:
916       break;
917     default:
918       break;
919   }
920   GST_QUEUE_MUTEX_UNLOCK;
921
922   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
923
924   return ret;
925 }
926
927 static void
928 gst_queue_set_property (GObject * object,
929     guint prop_id, const GValue * value, GParamSpec * pspec)
930 {
931   GstQueue *queue = GST_QUEUE (object);
932
933   /* someone could change levels here, and since this
934    * affects the get/put funcs, we need to lock for safety. */
935   GST_QUEUE_MUTEX_LOCK;
936
937   switch (prop_id) {
938     case ARG_MAX_SIZE_BYTES:
939       queue->max_size.bytes = g_value_get_uint (value);
940       break;
941     case ARG_MAX_SIZE_BUFFERS:
942       queue->max_size.buffers = g_value_get_uint (value);
943       break;
944     case ARG_MAX_SIZE_TIME:
945       queue->max_size.time = g_value_get_uint64 (value);
946       break;
947     case ARG_MIN_THRESHOLD_BYTES:
948       queue->min_threshold.bytes = g_value_get_uint (value);
949       break;
950     case ARG_MIN_THRESHOLD_BUFFERS:
951       queue->min_threshold.buffers = g_value_get_uint (value);
952       break;
953     case ARG_MIN_THRESHOLD_TIME:
954       queue->min_threshold.time = g_value_get_uint64 (value);
955       break;
956     case ARG_LEAKY:
957       queue->leaky = g_value_get_enum (value);
958       break;
959     case ARG_MAY_DEADLOCK:
960       queue->may_deadlock = g_value_get_boolean (value);
961       break;
962     case ARG_BLOCK_TIMEOUT:
963       queue->block_timeout = g_value_get_uint64 (value);
964       break;
965     default:
966       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
967       break;
968   }
969
970   GST_QUEUE_MUTEX_UNLOCK;
971 }
972
973 static void
974 gst_queue_get_property (GObject * object,
975     guint prop_id, GValue * value, GParamSpec * pspec)
976 {
977   GstQueue *queue = GST_QUEUE (object);
978
979   switch (prop_id) {
980     case ARG_CUR_LEVEL_BYTES:
981       g_value_set_uint (value, queue->cur_level.bytes);
982       break;
983     case ARG_CUR_LEVEL_BUFFERS:
984       g_value_set_uint (value, queue->cur_level.buffers);
985       break;
986     case ARG_CUR_LEVEL_TIME:
987       g_value_set_uint64 (value, queue->cur_level.time);
988       break;
989     case ARG_MAX_SIZE_BYTES:
990       g_value_set_uint (value, queue->max_size.bytes);
991       break;
992     case ARG_MAX_SIZE_BUFFERS:
993       g_value_set_uint (value, queue->max_size.buffers);
994       break;
995     case ARG_MAX_SIZE_TIME:
996       g_value_set_uint64 (value, queue->max_size.time);
997       break;
998     case ARG_MIN_THRESHOLD_BYTES:
999       g_value_set_uint (value, queue->min_threshold.bytes);
1000       break;
1001     case ARG_MIN_THRESHOLD_BUFFERS:
1002       g_value_set_uint (value, queue->min_threshold.buffers);
1003       break;
1004     case ARG_MIN_THRESHOLD_TIME:
1005       g_value_set_uint64 (value, queue->min_threshold.time);
1006       break;
1007     case ARG_LEAKY:
1008       g_value_set_enum (value, queue->leaky);
1009       break;
1010     case ARG_MAY_DEADLOCK:
1011       g_value_set_boolean (value, queue->may_deadlock);
1012       break;
1013     case ARG_BLOCK_TIMEOUT:
1014       g_value_set_uint64 (value, queue->block_timeout);
1015       break;
1016     default:
1017       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1018       break;
1019   }
1020 }