gst/elements/gstfilesrc.c: compute mapsize correctly
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
34
35 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
36     "Generic",
37     "Simple data queue",
38     "Erik Walthinsen <omega@cse.ogi.edu>");
39
40
41 /* Queue signals and args */
42 enum
43 {
44   SIGNAL_UNDERRUN,
45   SIGNAL_RUNNING,
46   SIGNAL_OVERRUN,
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   ARG_0,
53   /* FIXME: don't we have another way of doing this
54    * "Gstreamer format" (frame/byte/time) queries? */
55   ARG_CUR_LEVEL_BUFFERS,
56   ARG_CUR_LEVEL_BYTES,
57   ARG_CUR_LEVEL_TIME,
58   ARG_MAX_SIZE_BUFFERS,
59   ARG_MAX_SIZE_BYTES,
60   ARG_MAX_SIZE_TIME,
61   ARG_MIN_THRESHOLD_BUFFERS,
62   ARG_MIN_THRESHOLD_BYTES,
63   ARG_MIN_THRESHOLD_TIME,
64   ARG_LEAKY,
65   ARG_MAY_DEADLOCK,
66   ARG_BLOCK_TIMEOUT
67       /* FILL ME */
68 };
69
70 typedef struct _GstQueueEventResponse
71 {
72   GstEvent *event;
73   gboolean ret, handled;
74 }
75 GstQueueEventResponse;
76
77 static void gst_queue_base_init (GstQueueClass * klass);
78 static void gst_queue_class_init (GstQueueClass * klass);
79 static void gst_queue_init (GstQueue * queue);
80 static void gst_queue_dispose (GObject * object);
81
82 static void gst_queue_set_property (GObject * object,
83     guint prop_id, const GValue * value, GParamSpec * pspec);
84 static void gst_queue_get_property (GObject * object,
85     guint prop_id, GValue * value, GParamSpec * pspec);
86
87 static void gst_queue_chain (GstPad * pad, GstData * data);
88 static GstData *gst_queue_get (GstPad * pad);
89
90 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
91
92 static GstCaps *gst_queue_getcaps (GstPad * pad);
93 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
94 static void gst_queue_locked_flush (GstQueue * queue);
95
96 static GstElementStateReturn gst_queue_change_state (GstElement * element);
97 static gboolean gst_queue_release_locks (GstElement * element);
98
99
100 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
101
102 static GType
103 queue_leaky_get_type (void)
104 {
105   static GType queue_leaky_type = 0;
106   static GEnumValue queue_leaky[] = {
107     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
108     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
109     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
110     {0, NULL, NULL},
111   };
112
113   if (!queue_leaky_type) {
114     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
115   }
116   return queue_leaky_type;
117 }
118
119 static GstElementClass *parent_class = NULL;
120 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
121
122 GType
123 gst_queue_get_type (void)
124 {
125   static GType queue_type = 0;
126
127   if (!queue_type) {
128     static const GTypeInfo queue_info = {
129       sizeof (GstQueueClass),
130       (GBaseInitFunc) gst_queue_base_init,
131       NULL,
132       (GClassInitFunc) gst_queue_class_init,
133       NULL,
134       NULL,
135       sizeof (GstQueue),
136       0,
137       (GInstanceInitFunc) gst_queue_init,
138       NULL
139     };
140
141     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
142         "GstQueue", &queue_info, 0);
143     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
144         "dataflow inside the queue element");
145   }
146
147   return queue_type;
148 }
149
150 static void
151 gst_queue_base_init (GstQueueClass * klass)
152 {
153   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
154
155   gst_element_class_set_details (gstelement_class, &gst_queue_details);
156 }
157
158 static void
159 gst_queue_class_init (GstQueueClass * klass)
160 {
161   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
162   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
163
164   parent_class = g_type_class_peek_parent (klass);
165
166   /* signals */
167   gst_queue_signals[SIGNAL_UNDERRUN] =
168       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171   gst_queue_signals[SIGNAL_RUNNING] =
172       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
173       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
174       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
175   gst_queue_signals[SIGNAL_OVERRUN] =
176       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
177       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
178       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
179
180   /* properties */
181   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
182       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
183           "Current amount of data in the queue (bytes)",
184           0, G_MAXUINT, 0, G_PARAM_READABLE));
185   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
186       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
187           "Current number of buffers in the queue",
188           0, G_MAXUINT, 0, G_PARAM_READABLE));
189   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
190       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
191           "Current amount of data in the queue (in ns)",
192           0, G_MAXUINT64, 0, G_PARAM_READABLE));
193
194   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
195       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
196           "Max. amount of data in the queue (bytes, 0=disable)",
197           0, G_MAXUINT, 0, G_PARAM_READWRITE));
198   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
199       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
200           "Max. number of buffers in the queue (0=disable)",
201           0, G_MAXUINT, 0, G_PARAM_READWRITE));
202   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of data in the queue (in ns, 0=disable)",
205           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
206
207   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
208       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
209           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
210           0, G_MAXUINT, 0, G_PARAM_READWRITE));
211   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
212       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
213           "Min. number of buffers in the queue to allow reading (0=disable)",
214           0, G_MAXUINT, 0, G_PARAM_READWRITE));
215   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
216       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
217           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
218           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
219
220   g_object_class_install_property (gobject_class, ARG_LEAKY,
221       g_param_spec_enum ("leaky", "Leaky",
222           "Where the queue leaks, if at all",
223           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
224   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
225       g_param_spec_boolean ("may_deadlock", "May Deadlock",
226           "The queue may deadlock if it's full and not PLAYING",
227           TRUE, G_PARAM_READWRITE));
228   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
229       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
230           "Nanoseconds until blocked queue times out and returns filler event. "
231           "Value of -1 disables timeout",
232           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
233
234   /* set several parent class virtual functions */
235   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
236   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
237   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
238
239   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
240   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
241 }
242
243 static void
244 gst_queue_init (GstQueue * queue)
245 {
246   /* scheduling on this kind of element is, well, interesting */
247   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
248   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
249
250   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
251   gst_pad_set_chain_function (queue->sinkpad,
252       GST_DEBUG_FUNCPTR (gst_queue_chain));
253   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
254   gst_pad_set_link_function (queue->sinkpad,
255       GST_DEBUG_FUNCPTR (gst_queue_link));
256   gst_pad_set_getcaps_function (queue->sinkpad,
257       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
258   gst_pad_set_active (queue->sinkpad, TRUE);
259
260   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
261   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
262   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
263   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
264   gst_pad_set_getcaps_function (queue->srcpad,
265       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
266   gst_pad_set_event_function (queue->srcpad,
267       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
268   gst_pad_set_active (queue->srcpad, TRUE);
269
270   queue->cur_level.buffers = 0; /* no content */
271   queue->cur_level.bytes = 0;   /* no content */
272   queue->cur_level.time = 0;    /* no content */
273   queue->max_size.buffers = 100;        /* 100 buffers */
274   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
275   queue->max_size.time = GST_SECOND;    /* 1 s. */
276   queue->min_threshold.buffers = 0;     /* no threshold */
277   queue->min_threshold.bytes = 0;       /* no threshold */
278   queue->min_threshold.time = 0;        /* no threshold */
279
280   queue->leaky = GST_QUEUE_NO_LEAK;
281   queue->may_deadlock = TRUE;
282   queue->block_timeout = GST_CLOCK_TIME_NONE;
283   queue->interrupt = FALSE;
284   queue->flush = FALSE;
285
286   queue->qlock = g_mutex_new ();
287   queue->item_add = g_cond_new ();
288   queue->item_del = g_cond_new ();
289   queue->event_done = g_cond_new ();
290   queue->events = g_queue_new ();
291   queue->queue = g_queue_new ();
292
293   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
294       "initialized queue's not_empty & not_full conditions");
295 }
296
297 static void
298 gst_queue_dispose (GObject * object)
299 {
300   GstQueue *queue = GST_QUEUE (object);
301
302   gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
303
304   while (!g_queue_is_empty (queue->queue)) {
305     GstData *data = g_queue_pop_head (queue->queue);
306
307     gst_data_unref (data);
308   }
309   g_queue_free (queue->queue);
310   g_mutex_free (queue->qlock);
311   g_cond_free (queue->item_add);
312   g_cond_free (queue->item_del);
313   g_cond_free (queue->event_done);
314   while (!g_queue_is_empty (queue->events)) {
315     GstEvent *event = g_queue_pop_head (queue->events);
316
317     gst_event_unref (event);
318   }
319
320   if (G_OBJECT_CLASS (parent_class)->dispose)
321     G_OBJECT_CLASS (parent_class)->dispose (object);
322 }
323
324 static GstCaps *
325 gst_queue_getcaps (GstPad * pad)
326 {
327   GstQueue *queue;
328
329   queue = GST_QUEUE (gst_pad_get_parent (pad));
330
331   if (queue->cur_level.bytes > 0) {
332     return gst_caps_copy (queue->negotiated_caps);
333   }
334
335   return gst_pad_proxy_getcaps (pad);
336 }
337
338 static GstPadLinkReturn
339 gst_queue_link (GstPad * pad, const GstCaps * caps)
340 {
341   GstQueue *queue;
342   GstPadLinkReturn link_ret;
343
344   queue = GST_QUEUE (gst_pad_get_parent (pad));
345
346   if (queue->cur_level.bytes > 0) {
347     if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
348       return GST_PAD_LINK_OK;
349     }
350     return GST_PAD_LINK_REFUSED;
351   }
352
353   link_ret = gst_pad_proxy_pad_link (pad, caps);
354
355   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
356     /* we store an extra copy of the negotiated caps, just in case
357      * the pads become unnegotiated while we have buffers */
358     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
359   }
360
361   return link_ret;
362 }
363
364 static void
365 gst_queue_locked_flush (GstQueue * queue)
366 {
367   while (!g_queue_is_empty (queue->queue)) {
368     GstData *data = g_queue_pop_head (queue->queue);
369
370     /* First loose the reference we added when putting that data in the queue */
371     gst_data_unref (data);
372     /* Then loose another reference because we are supposed to destroy that
373        data when flushing */
374     gst_data_unref (data);
375   }
376   queue->timeval = NULL;
377   queue->cur_level.buffers = 0;
378   queue->cur_level.bytes = 0;
379   queue->cur_level.time = 0;
380
381   /* make sure any pending buffers to be added are flushed too */
382   queue->flush = TRUE;
383
384   /* we deleted something... */
385   g_cond_signal (queue->item_del);
386 }
387
388 static void
389 gst_queue_handle_pending_events (GstQueue * queue)
390 {
391   /* check for events to send upstream */
392   while (!g_queue_is_empty (queue->events)) {
393     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
394
395     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream");
396     er->ret = gst_pad_event_default (queue->srcpad, er->event);
397     er->handled = TRUE;
398     g_cond_signal (queue->event_done);
399     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
400   }
401 }
402
403 #define STATUS(queue, msg) \
404   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
405                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
406                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
407                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
408                       GST_DEBUG_PAD_NAME (pad), \
409                       queue->cur_level.buffers, \
410                       queue->min_threshold.buffers, \
411                       queue->max_size.buffers, \
412                       queue->cur_level.bytes, \
413                       queue->min_threshold.bytes, \
414                       queue->max_size.bytes, \
415                       queue->cur_level.time, \
416                       queue->min_threshold.time, \
417                       queue->max_size.time, \
418                       queue->queue->length)
419
420 static void
421 gst_queue_chain (GstPad * pad, GstData * data)
422 {
423   GstQueue *queue;
424
425   g_return_if_fail (pad != NULL);
426   g_return_if_fail (GST_IS_PAD (pad));
427   g_return_if_fail (data != NULL);
428
429   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
430
431 restart:
432   /* we have to lock the queue since we span threads */
433   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locking t:%p", g_thread_self ());
434   g_mutex_lock (queue->qlock);
435   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locked t:%p", g_thread_self ());
436
437   gst_queue_handle_pending_events (queue);
438
439   /* assume don't need to flush this buffer when the queue is filled */
440   queue->flush = FALSE;
441
442   if (GST_IS_EVENT (data)) {
443     switch (GST_EVENT_TYPE (data)) {
444       case GST_EVENT_FLUSH:
445         STATUS (queue, "received flush event");
446         gst_queue_locked_flush (queue);
447         STATUS (queue, "after flush");
448         break;
449       case GST_EVENT_EOS:
450         STATUS (queue, "received EOS");
451         break;
452       default:
453         /* we put the event in the queue, we don't have to act ourselves */
454         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
455             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
456         break;
457     }
458   }
459
460   if (GST_IS_BUFFER (data))
461     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
462         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
463
464   /* We make space available if we're "full" according to whatever
465    * the user defined as "full". Note that this only applies to buffers.
466    * We always handle events and they don't count in our statistics. */
467   if (GST_IS_BUFFER (data) &&
468       ((queue->max_size.buffers > 0 &&
469               queue->cur_level.buffers >= queue->max_size.buffers) ||
470           (queue->max_size.bytes > 0 &&
471               queue->cur_level.bytes >= queue->max_size.bytes) ||
472           (queue->max_size.time > 0 &&
473               queue->cur_level.time >= queue->max_size.time))) {
474     g_mutex_unlock (queue->qlock);
475     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
476     g_mutex_lock (queue->qlock);
477
478     /* how are we going to make space for this buffer? */
479     switch (queue->leaky) {
480         /* leak current buffer */
481       case GST_QUEUE_LEAK_UPSTREAM:
482         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
483             "queue is full, leaking buffer on upstream end");
484         /* now we can clean up and exit right away */
485         g_mutex_unlock (queue->qlock);
486         goto out_unref;
487
488         /* leak first buffer in the queue */
489       case GST_QUEUE_LEAK_DOWNSTREAM:{
490         /* this is a bit hacky. We'll manually iterate the list
491          * and find the first buffer from the head on. We'll
492          * unref that and "fix up" the GQueue object... */
493         GList *item;
494         GstData *leak = NULL;
495
496         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
497             "queue is full, leaking buffer on downstream end");
498
499         for (item = queue->queue->head; item != NULL; item = item->next) {
500           if (GST_IS_BUFFER (item->data)) {
501             leak = item->data;
502             break;
503           }
504         }
505
506         /* if we didn't find anything, it means we have no buffers
507          * in here. That cannot happen, since we had >= 1 bufs */
508         g_assert (leak);
509
510         /* Now remove it from the list, fixing up the GQueue
511          * CHECKME: is a queue->head the first or the last item? */
512         item = g_list_delete_link (queue->queue->head, item);
513         queue->queue->head = g_list_first (item);
514         queue->queue->tail = g_list_last (item);
515         queue->queue->length--;
516
517         /* and unref the data at the end. Twice, because we keep a ref
518          * to make things read-only. Also keep our list uptodate. */
519         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
520         queue->cur_level.buffers--;
521         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
522           queue->cur_level.time -= GST_BUFFER_DURATION (data);
523
524         gst_data_unref (data);
525         gst_data_unref (data);
526         break;
527       }
528
529       default:
530         g_warning ("Unknown leaky type, using default");
531         /* fall-through */
532
533         /* don't leak. Instead, wait for space to be available */
534       case GST_QUEUE_NO_LEAK:
535         STATUS (queue, "pre-full wait");
536
537         while ((queue->max_size.buffers > 0 &&
538                 queue->cur_level.buffers >= queue->max_size.buffers) ||
539             (queue->max_size.bytes > 0 &&
540                 queue->cur_level.bytes >= queue->max_size.bytes) ||
541             (queue->max_size.time > 0 &&
542                 queue->cur_level.time >= queue->max_size.time)) {
543           /* if there's a pending state change for this queue
544            * or its manager, switch back to iterator so bottom
545            * half of state change executes */
546           if (queue->interrupt) {
547             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
548             g_mutex_unlock (queue->qlock);
549             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
550                     GST_ELEMENT (queue))) {
551               goto out_unref;
552             }
553             /* if we got here because we were unlocked after a
554              * flush, we don't need to add the buffer to the
555              * queue again */
556             if (queue->flush) {
557               GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
558                   "not adding pending buffer after flush");
559               goto out_unref;
560             }
561             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
562                 "adding pending buffer after interrupt");
563             goto restart;
564           }
565
566           if (GST_STATE (queue) != GST_STATE_PLAYING) {
567             /* this means the other end is shut down. Try to
568              * signal to resolve the error */
569             if (!queue->may_deadlock) {
570               g_mutex_unlock (queue->qlock);
571               gst_data_unref (data);
572               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
573                   ("deadlock found, shutting down source pad elements"));
574               /* we don't go to out_unref here, since we want to
575                * unref the buffer *before* calling GST_ELEMENT_ERROR */
576               return;
577             } else {
578               GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
579                   "%s: waiting for the app to restart "
580                   "source pad elements", GST_ELEMENT_NAME (queue));
581             }
582           }
583
584           /* OK, we've got a serious issue here. Imagine the situation
585            * where the puller (next element) is sending an event here,
586            * so it cannot pull events from the queue, and we cannot
587            * push data further because the queue is 'full' and therefore,
588            * we wait here (and do not handle events): deadlock! to solve
589            * that, we handle pending upstream events here, too. */
590           gst_queue_handle_pending_events (queue);
591
592           STATUS (queue, "waiting for item_del signal");
593           g_cond_wait (queue->item_del, queue->qlock);
594           STATUS (queue, "received item_del signal");
595         }
596
597         STATUS (queue, "post-full wait");
598         g_mutex_unlock (queue->qlock);
599         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
600         g_mutex_lock (queue->qlock);
601         break;
602     }
603   }
604
605   /* put the buffer on the tail of the list. We keep a reference,
606    * so that the data is read-only while in here. There's a good
607    * reason to do so: we have a size and time counter, and any
608    * modification to the content could change any of the two. */
609   gst_data_ref (data);
610   g_queue_push_tail (queue->queue, data);
611
612   /* Note that we only add buffers (not events) to the statistics */
613   if (GST_IS_BUFFER (data)) {
614     queue->cur_level.buffers++;
615     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
616     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
617       queue->cur_level.time += GST_BUFFER_DURATION (data);
618   }
619
620   STATUS (queue, "+ level");
621
622   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
623   g_cond_signal (queue->item_add);
624   g_mutex_unlock (queue->qlock);
625
626   return;
627
628 out_unref:
629   gst_data_unref (data);
630   return;
631 }
632
633 static GstData *
634 gst_queue_get (GstPad * pad)
635 {
636   GstQueue *queue;
637   GstData *data;
638
639   g_return_val_if_fail (pad != NULL, NULL);
640   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
641
642   queue = GST_QUEUE (gst_pad_get_parent (pad));
643
644 restart:
645   /* have to lock for thread-safety */
646   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locking t:%p", g_thread_self ());
647   g_mutex_lock (queue->qlock);
648   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locked t:%p", g_thread_self ());
649
650   if (queue->queue->length == 0 ||
651       (queue->min_threshold.buffers > 0 &&
652           queue->cur_level.buffers < queue->min_threshold.buffers) ||
653       (queue->min_threshold.bytes > 0 &&
654           queue->cur_level.bytes < queue->min_threshold.bytes) ||
655       (queue->min_threshold.time > 0 &&
656           queue->cur_level.time < queue->min_threshold.time)) {
657     g_mutex_unlock (queue->qlock);
658     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
659     g_mutex_lock (queue->qlock);
660
661     STATUS (queue, "pre-empty wait");
662     while (queue->queue->length == 0 ||
663         (queue->min_threshold.buffers > 0 &&
664             queue->cur_level.buffers < queue->min_threshold.buffers) ||
665         (queue->min_threshold.bytes > 0 &&
666             queue->cur_level.bytes < queue->min_threshold.bytes) ||
667         (queue->min_threshold.time > 0 &&
668             queue->cur_level.time < queue->min_threshold.time)) {
669       /* if there's a pending state change for this queue or its
670        * manager, switch back to iterator so bottom half of state
671        * change executes. */
672       if (queue->interrupt) {
673         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
674         g_mutex_unlock (queue->qlock);
675         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
676                 GST_ELEMENT (queue)))
677           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
678         goto restart;
679       }
680       if (GST_STATE (queue) != GST_STATE_PLAYING) {
681         /* this means the other end is shut down */
682         if (!queue->may_deadlock) {
683           g_mutex_unlock (queue->qlock);
684           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
685               ("deadlock found, shutting down sink pad elements"));
686           goto restart;
687         } else {
688           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
689               "%s: waiting for the app to restart "
690               "source pad elements", GST_ELEMENT_NAME (queue));
691         }
692       }
693
694       STATUS (queue, "waiting for item_add");
695
696       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
697         GTimeVal timeout;
698
699         g_get_current_time (&timeout);
700         g_time_val_add (&timeout, queue->block_timeout / 1000);
701         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
702           g_mutex_unlock (queue->qlock);
703           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
704               "Sending filler event");
705           return GST_DATA (gst_event_new_filler ());
706         }
707       } else {
708         g_cond_wait (queue->item_add, queue->qlock);
709       }
710       STATUS (queue, "got item_add signal");
711     }
712
713     STATUS (queue, "post-empty wait");
714     g_mutex_unlock (queue->qlock);
715     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
716     g_mutex_lock (queue->qlock);
717   }
718
719   /* There's something in the list now, whatever it is */
720   data = g_queue_pop_head (queue->queue);
721   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
722       "retrieved data %p from queue", data);
723
724   if (data == NULL)
725     return NULL;
726
727   if (GST_IS_BUFFER (data)) {
728     /* Update statistics */
729     queue->cur_level.buffers--;
730     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
731     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
732       queue->cur_level.time -= GST_BUFFER_DURATION (data);
733   }
734
735   /* Now that we're done, we can lose our own reference to
736    * the item, since we're no longer in danger. */
737   gst_data_unref (data);
738
739   STATUS (queue, "after _get()");
740
741   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
742   g_cond_signal (queue->item_del);
743   g_mutex_unlock (queue->qlock);
744
745   /* FIXME: I suppose this needs to be locked, since the EOS
746    * bit affects the pipeline state. However, that bit is
747    * locked too so it'd cause a deadlock. */
748   if (GST_IS_EVENT (data)) {
749     GstEvent *event = GST_EVENT (data);
750
751     switch (GST_EVENT_TYPE (event)) {
752       case GST_EVENT_EOS:
753         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
754             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
755         gst_element_set_eos (GST_ELEMENT (queue));
756         break;
757       default:
758         break;
759     }
760   }
761
762   return data;
763 }
764
765
766 static gboolean
767 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
768 {
769   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
770   gboolean res;
771
772   g_mutex_lock (queue->qlock);
773
774   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
775     GstQueueEventResponse er;
776
777     /* push the event to the queue and wait for upstream consumption */
778     er.event = event;
779     er.handled = FALSE;
780     g_queue_push_tail (queue->events, &er);
781     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
782         "Preparing for loop for event handler");
783     /* see the chain function on why this is here - it prevents a deadlock */
784     g_cond_signal (queue->item_del);
785     while (!er.handled) {
786       GTimeVal timeout;
787
788       g_get_current_time (&timeout);
789       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
790       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
791           !er.handled) {
792         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
793             "timeout in upstream event handling");
794         /* remove ourselves from the pending list. Since we're
795          * locked, others cannot reference this anymore. */
796         queue->queue->head = g_list_remove (queue->queue->head, &er);
797         queue->queue->head = g_list_first (queue->queue->head);
798         queue->queue->tail = g_list_last (queue->queue->head);
799         queue->queue->length--;
800         res = FALSE;
801         goto handled;
802       }
803     }
804     GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
805     res = er.ret;
806   } else {
807     res = gst_pad_event_default (pad, event);
808
809     switch (GST_EVENT_TYPE (event)) {
810       case GST_EVENT_FLUSH:
811         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
812             "FLUSH event, flushing queue\n");
813         gst_queue_locked_flush (queue);
814         break;
815       case GST_EVENT_SEEK:
816         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
817           gst_queue_locked_flush (queue);
818         }
819       default:
820         break;
821     }
822   }
823 handled:
824   g_mutex_unlock (queue->qlock);
825
826   return res;
827 }
828
829 static gboolean
830 gst_queue_release_locks (GstElement * element)
831 {
832   GstQueue *queue;
833
834   queue = GST_QUEUE (element);
835
836   g_mutex_lock (queue->qlock);
837   queue->interrupt = TRUE;
838   g_cond_signal (queue->item_add);
839   g_cond_signal (queue->item_del);
840   g_mutex_unlock (queue->qlock);
841
842   return TRUE;
843 }
844
845 static GstElementStateReturn
846 gst_queue_change_state (GstElement * element)
847 {
848   GstQueue *queue;
849   GstElementStateReturn ret = GST_STATE_SUCCESS;
850
851   queue = GST_QUEUE (element);
852
853   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
854
855   /* lock the queue so another thread (not in sync with this thread's state)
856    * can't call this queue's _get (or whatever)
857    */
858   g_mutex_lock (queue->qlock);
859
860   switch (GST_STATE_TRANSITION (element)) {
861     case GST_STATE_NULL_TO_READY:
862       gst_queue_locked_flush (queue);
863       break;
864     case GST_STATE_PAUSED_TO_PLAYING:
865       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
866         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
867             "queue %s is not linked", GST_ELEMENT_NAME (queue));
868         /* FIXME can this be? */
869         g_cond_signal (queue->item_add);
870
871         ret = GST_STATE_FAILURE;
872         goto error;
873       } else {
874         GstScheduler *src_sched, *sink_sched;
875
876         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
877         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
878
879         if (src_sched == sink_sched) {
880           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
881               "queue %s does not connect different schedulers",
882               GST_ELEMENT_NAME (queue));
883
884           g_warning ("queue %s does not connect different schedulers",
885               GST_ELEMENT_NAME (queue));
886
887           ret = GST_STATE_FAILURE;
888           goto error;
889         }
890       }
891       queue->interrupt = FALSE;
892       break;
893     case GST_STATE_PAUSED_TO_READY:
894       gst_queue_locked_flush (queue);
895       break;
896     default:
897       break;
898   }
899
900   if (GST_ELEMENT_CLASS (parent_class)->change_state)
901     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
902
903   /* this is an ugly hack to make sure our pads are always active.
904    * Reason for this is that pad activation for the queue element
905    * depends on 2 schedulers (ugh) */
906   gst_pad_set_active (queue->sinkpad, TRUE);
907   gst_pad_set_active (queue->srcpad, TRUE);
908
909 error:
910   g_mutex_unlock (queue->qlock);
911
912   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
913
914   return ret;
915 }
916
917
918 static void
919 gst_queue_set_property (GObject * object,
920     guint prop_id, const GValue * value, GParamSpec * pspec)
921 {
922   GstQueue *queue = GST_QUEUE (object);
923
924   /* someone could change levels here, and since this
925    * affects the get/put funcs, we need to lock for safety. */
926   g_mutex_lock (queue->qlock);
927
928   switch (prop_id) {
929     case ARG_MAX_SIZE_BYTES:
930       queue->max_size.bytes = g_value_get_uint (value);
931       break;
932     case ARG_MAX_SIZE_BUFFERS:
933       queue->max_size.buffers = g_value_get_uint (value);
934       break;
935     case ARG_MAX_SIZE_TIME:
936       queue->max_size.time = g_value_get_uint64 (value);
937       break;
938     case ARG_MIN_THRESHOLD_BYTES:
939       queue->min_threshold.bytes = g_value_get_uint (value);
940       break;
941     case ARG_MIN_THRESHOLD_BUFFERS:
942       queue->min_threshold.buffers = g_value_get_uint (value);
943       break;
944     case ARG_MIN_THRESHOLD_TIME:
945       queue->min_threshold.time = g_value_get_uint64 (value);
946       break;
947     case ARG_LEAKY:
948       queue->leaky = g_value_get_enum (value);
949       break;
950     case ARG_MAY_DEADLOCK:
951       queue->may_deadlock = g_value_get_boolean (value);
952       break;
953     case ARG_BLOCK_TIMEOUT:
954       queue->block_timeout = g_value_get_uint64 (value);
955       break;
956     default:
957       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
958       break;
959   }
960
961   g_mutex_unlock (queue->qlock);
962 }
963
964 static void
965 gst_queue_get_property (GObject * object,
966     guint prop_id, GValue * value, GParamSpec * pspec)
967 {
968   GstQueue *queue = GST_QUEUE (object);
969
970   switch (prop_id) {
971     case ARG_CUR_LEVEL_BYTES:
972       g_value_set_uint (value, queue->cur_level.bytes);
973       break;
974     case ARG_CUR_LEVEL_BUFFERS:
975       g_value_set_uint (value, queue->cur_level.buffers);
976       break;
977     case ARG_CUR_LEVEL_TIME:
978       g_value_set_uint64 (value, queue->cur_level.time);
979       break;
980     case ARG_MAX_SIZE_BYTES:
981       g_value_set_uint (value, queue->max_size.bytes);
982       break;
983     case ARG_MAX_SIZE_BUFFERS:
984       g_value_set_uint (value, queue->max_size.buffers);
985       break;
986     case ARG_MAX_SIZE_TIME:
987       g_value_set_uint64 (value, queue->max_size.time);
988       break;
989     case ARG_MIN_THRESHOLD_BYTES:
990       g_value_set_uint (value, queue->min_threshold.bytes);
991       break;
992     case ARG_MIN_THRESHOLD_BUFFERS:
993       g_value_set_uint (value, queue->min_threshold.buffers);
994       break;
995     case ARG_MIN_THRESHOLD_TIME:
996       g_value_set_uint64 (value, queue->min_threshold.time);
997       break;
998     case ARG_LEAKY:
999       g_value_set_enum (value, queue->leaky);
1000       break;
1001     case ARG_MAY_DEADLOCK:
1002       g_value_set_boolean (value, queue->may_deadlock);
1003       break;
1004     case ARG_BLOCK_TIMEOUT:
1005       g_value_set_uint64 (value, queue->block_timeout);
1006       break;
1007     default:
1008       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1009       break;
1010   }
1011 }