- Elements can now send events to their parent.
[platform/upstream/gstreamer.git] / gst / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstqueue.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /* #define DEBUG_ENABLED */
24 /* #define STATUS_ENABLED */
25 #ifdef STATUS_ENABLED
26 #define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
27 #else
28 #define STATUS(A)
29 #endif
30
31 #include <pthread.h>
32
33 #include "config.h"
34 #include "gst_private.h"
35
36 #include "gstqueue.h"
37 #include "gstscheduler.h"
38 #include "gstevent.h"
39
40 GstElementDetails gst_queue_details = {
41   "Queue",
42   "Connection",
43   "Simple data queue",
44   VERSION,
45   "Erik Walthinsen <omega@cse.ogi.edu>",
46   "(C) 1999",
47 };
48
49
50 /* Queue signals and args */
51 enum {
52   LOW_WATERMARK,
53   HIGH_WATERMARK,
54   LAST_SIGNAL
55 };
56
57 enum {
58   ARG_0,
59   ARG_LEVEL_BUFFERS,
60   ARG_LEVEL_BYTES,
61   ARG_LEVEL_TIME,
62   ARG_SIZE_BUFFERS,
63   ARG_SIZE_BYTES,
64   ARG_SIZE_TIME,
65   ARG_LEAKY,
66   ARG_LEVEL,
67   ARG_MAX_LEVEL,
68 };
69
70
71 static void                     gst_queue_class_init            (GstQueueClass *klass);
72 static void                     gst_queue_init                  (GstQueue *queue);
73
74 static void                     gst_queue_set_property          (GObject *object, guint prop_id, 
75                                                                  const GValue *value, GParamSpec *pspec);
76 static void                     gst_queue_get_property          (GObject *object, guint prop_id, 
77                                                                  GValue *value, GParamSpec *pspec);
78
79 static GstPadNegotiateReturn    gst_queue_handle_negotiate_src  (GstPad *pad, GstCaps **caps, gpointer *data);
80 static GstPadNegotiateReturn    gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data);
81 static void                     gst_queue_chain                 (GstPad *pad, GstBuffer *buf);
82 static GstBuffer *              gst_queue_get                   (GstPad *pad);
83 static GstBufferPool*           gst_queue_get_bufferpool        (GstPad *pad);
84         
85 static void                     gst_queue_locked_flush                  (GstQueue *queue);
86 static void                     gst_queue_flush                 (GstQueue *queue);
87
88 static GstElementStateReturn    gst_queue_change_state          (GstElement *element);
89
90   
91 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
92 static GType
93 queue_leaky_get_type(void) {
94   static GType queue_leaky_type = 0;
95   static GEnumValue queue_leaky[] = {
96     { GST_QUEUE_NO_LEAK,                "0", "Not Leaky" },
97     { GST_QUEUE_LEAK_UPSTREAM,          "1", "Leaky on Upstream" },
98     { GST_QUEUE_LEAK_DOWNSTREAM,        "2", "Leaky on Downstream" },
99     { 0, NULL, NULL },
100   };
101   if (!queue_leaky_type) {
102     queue_leaky_type = g_enum_register_static("GstQueueLeaky", queue_leaky);
103   }
104   return queue_leaky_type;
105 }
106
107 static GstElementClass *parent_class = NULL;
108 /* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
109
110 GType
111 gst_queue_get_type(void) 
112 {
113   static GType queue_type = 0;
114
115   if (!queue_type) {
116     static const GTypeInfo queue_info = {
117       sizeof(GstQueueClass),
118       NULL,
119       NULL,
120       (GClassInitFunc)gst_queue_class_init,
121       NULL,
122       NULL,
123       sizeof(GstQueue),
124       4,
125       (GInstanceInitFunc)gst_queue_init,
126       NULL
127     };
128     queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
129   }
130   return queue_type;
131 }
132
133 static void
134 gst_queue_class_init (GstQueueClass *klass)
135 {
136   GObjectClass *gobject_class;
137   GstElementClass *gstelement_class;
138
139   gobject_class = (GObjectClass*)klass;
140   gstelement_class = (GstElementClass*)klass;
141
142   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
143
144   g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
145     g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
146                       GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
147   g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
148     g_param_spec_int("level","Level","How many buffers are in the queue.",
149                      0,G_MAXINT,0,G_PARAM_READABLE));
150   g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
151     g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
152                      0,G_MAXINT,100,G_PARAM_READWRITE));
153
154   gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
155   gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
156
157   gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
158 }
159
160 static void
161 gst_queue_init (GstQueue *queue)
162 {
163   /* scheduling on this kind of element is, well, interesting */
164   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
165   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
166
167   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
168   gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
169   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
170   gst_pad_set_negotiate_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_handle_negotiate_sink));
171   gst_pad_set_bufferpool_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_get_bufferpool));
172
173   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
174   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
175   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
176   gst_pad_set_negotiate_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_handle_negotiate_src));
177
178   queue->queue = NULL;
179   queue->level_buffers = 0;
180   queue->level_bytes = 0;
181   queue->level_time = 0LL;
182   queue->size_buffers = 100;            /* 100 buffers */
183   queue->size_bytes = 100 * 1024;       /* 100KB */
184   queue->size_time = 1000000000LL;      /* 1sec */
185
186   queue->qlock = g_mutex_new ();
187   queue->reader = FALSE;
188   queue->writer = FALSE;
189   queue->not_empty = g_cond_new ();
190   queue->not_full = g_cond_new ();
191   GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
192 }
193
194 static GstBufferPool*
195 gst_queue_get_bufferpool (GstPad *pad)
196 {
197   GstQueue *queue;
198
199   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
200
201   return gst_pad_get_bufferpool (queue->srcpad);
202 }
203
204 static GstPadNegotiateReturn
205 gst_queue_handle_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data)
206 {
207   GstQueue *queue;
208
209   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
210
211   return gst_pad_negotiate_proxy (pad, queue->sinkpad, caps);
212 }
213
214 static GstPadNegotiateReturn
215 gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
216 {
217   GstQueue *queue;
218
219   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
220
221   return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
222 }
223
224 static void
225 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
226 {
227   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
228
229   if (GST_IS_BUFFER (data)) {
230     gst_buffer_unref (GST_BUFFER (data));
231   }
232 }
233
234 static void
235 gst_queue_locked_flush (GstQueue *queue)
236 {
237   g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
238                   (gpointer) queue);
239   g_slist_free (queue->queue);
240
241   queue->queue = NULL;
242   queue->level_buffers = 0;
243   queue->timeval = NULL;
244 }
245
246 static void
247 gst_queue_flush (GstQueue *queue)
248 {
249   g_mutex_lock (queue->qlock);
250   gst_queue_locked_flush (queue);
251   g_mutex_unlock (queue->qlock);
252 }
253
254
255 static void
256 gst_queue_chain (GstPad *pad, GstBuffer *buf)
257 {
258   GstQueue *queue;
259   gboolean reader;
260
261   g_return_if_fail (pad != NULL);
262   g_return_if_fail (GST_IS_PAD (pad));
263   g_return_if_fail (buf != NULL);
264
265   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
266
267   reader = FALSE;
268
269 restart:
270   /* we have to lock the queue since we span threads */
271   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
272   g_mutex_lock (queue->qlock);
273   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
274
275   if (GST_IS_EVENT (buf)) {
276     switch (GST_EVENT_TYPE (buf)) {
277       case GST_EVENT_FLUSH:
278         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
279         gst_queue_locked_flush (queue);
280         break;
281       case GST_EVENT_EOS:
282         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", 
283                            GST_ELEMENT_NAME (queue), queue->level_buffers);
284         break;
285       default:
286         gst_pad_event_default (pad, GST_EVENT (buf));
287         break;
288     }
289   }
290
291   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
292
293   if (queue->level_buffers == queue->size_buffers) {
294     /* if this is a leaky queue... */
295     if (queue->leaky) {
296       /* FIXME don't want to leak events! */
297       /* if we leak on the upstream side, drop the current buffer */
298       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
299         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
300         if (GST_IS_EVENT (buf))
301           fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
302               GST_ELEMENT_NAME(GST_ELEMENT(queue)),
303               GST_EVENT_TYPE(GST_EVENT(buf)));
304           GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
305         gst_buffer_unref(buf);
306         /* now we have to clean up and exit right away */
307         g_mutex_unlock (queue->qlock);
308         return;
309       }
310       /* otherwise we have to push a buffer off the other end */
311       else {
312         GSList *front;
313         GstBuffer *leakbuf;
314         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
315         front = queue->queue;
316         leakbuf = (GstBuffer *)(front->data);
317         if (GST_IS_EVENT (leakbuf))
318           fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
319               GST_ELEMENT_NAME(GST_ELEMENT(queue)),
320               GST_EVENT_TYPE(GST_EVENT(leakbuf)));
321         queue->level_buffers--;
322         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
323         gst_buffer_unref(leakbuf);
324         queue->queue = g_slist_remove_link (queue->queue, front);
325         g_slist_free (front);
326       }
327     }
328
329     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
330         queue->level_buffers, queue->size_buffers);
331     while (queue->level_buffers == queue->size_buffers) {
332       /* if there's a pending state change for this queue or its manager, switch */
333       /* back to iterator so bottom half of state change executes */
334       while (GST_STATE (queue) != GST_STATE_PLAYING) {
335         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
336         g_mutex_unlock (queue->qlock);
337         cothread_switch(cothread_current_main());
338         goto restart;
339       }
340       g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
341
342       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
343       if (queue->writer)
344         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
345       queue->writer = TRUE;
346       g_cond_wait (queue->not_full, queue->qlock);
347       queue->writer = FALSE;
348       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
349     }
350     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
351         queue->level_buffers, queue->size_buffers);
352   }
353
354   /* put the buffer on the tail of the list */
355   queue->queue = g_slist_append (queue->queue, buf);
356   queue->level_buffers++;
357   queue->level_bytes += GST_BUFFER_SIZE(buf);
358   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
359       GST_DEBUG_PAD_NAME(pad),
360       queue->level_buffers, queue->size_buffers);
361
362   /* reader waiting on an empty queue */
363   reader = queue->reader;
364
365   g_mutex_unlock (queue->qlock);
366
367   if (reader)
368   {
369     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
370     g_cond_signal (queue->not_empty);
371   }
372 }
373
374 static GstBuffer *
375 gst_queue_get (GstPad *pad)
376 {
377   GstQueue *queue;
378   GstBuffer *buf = NULL;
379   GSList *front;
380   gboolean writer;
381
382   g_assert(pad != NULL);
383   g_assert(GST_IS_PAD(pad));
384   g_return_val_if_fail (pad != NULL, NULL);
385   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
386
387   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
388
389   writer = FALSE;
390
391 restart:
392   /* have to lock for thread-safety */
393   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
394   g_mutex_lock (queue->qlock);
395   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
396
397   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
398   while (queue->level_buffers == 0) {
399     /* if there's a pending state change for this queue or its manager, switch
400      * back to iterator so bottom half of state change executes
401      */ 
402     while (GST_STATE (queue) != GST_STATE_PLAYING) {
403       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
404       g_mutex_unlock (queue->qlock);
405       cothread_switch(cothread_current_main());
406       goto restart;
407     }
408     g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
409
410     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
411     if (queue->reader)
412       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
413     queue->reader = TRUE;
414     g_cond_wait (queue->not_empty, queue->qlock);
415     queue->reader = FALSE;
416     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
417   }
418   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
419
420   front = queue->queue;
421   buf = (GstBuffer *)(front->data);
422   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
423   queue->queue = g_slist_remove_link (queue->queue, front);
424   g_slist_free (front);
425
426   queue->level_buffers--;
427   queue->level_bytes -= GST_BUFFER_SIZE(buf);
428   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
429       GST_DEBUG_PAD_NAME(pad),
430       queue->level_buffers, queue->size_buffers);
431
432   /* writer waiting on a full queue */
433   writer = queue->writer;
434
435   g_mutex_unlock (queue->qlock);
436
437   if (writer)
438   {
439     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
440     g_cond_signal (queue->not_full);
441   }
442
443   /* FIXME where should this be? locked? */
444   if (GST_IS_EVENT(buf)) {
445     GstEvent *event = GST_EVENT(buf);
446     switch (GST_EVENT_TYPE(event)) {
447       case GST_EVENT_EOS:
448         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
449         gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
450         break;
451       default:
452         break;
453     }
454   }
455
456   return buf;
457 }
458
459 static GstElementStateReturn
460 gst_queue_change_state (GstElement *element)
461 {
462   GstQueue *queue;
463   GstElementStateReturn ret;
464   GstElementState new_state;
465   g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
466
467   queue = GST_QUEUE (element);
468
469   GST_DEBUG_ENTER("('%s')", GST_ELEMENT_NAME (element));
470
471   /* lock the queue so another thread (not in sync with this thread's state)
472    * can't call this queue's _get (or whatever)
473    */
474   g_mutex_lock (queue->qlock);
475
476   new_state = GST_STATE_PENDING (element);
477
478   if (new_state == GST_STATE_PAUSED) {
479     g_cond_signal (queue->not_full);
480     g_cond_signal (queue->not_empty);
481   }
482   else if (new_state == GST_STATE_READY) {
483     gst_queue_locked_flush (queue);
484   }
485   else if (new_state == GST_STATE_PLAYING) {
486     if (!GST_PAD_CONNECTED (queue->sinkpad)) {
487       /* FIXME can this be? */
488       if (queue->reader)
489         g_cond_signal (queue->not_empty);
490       g_mutex_unlock (queue->qlock);
491
492       return GST_STATE_FAILURE;
493     }
494   }
495
496   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
497   g_mutex_unlock (queue->qlock);
498
499   GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
500   return ret;
501 }
502
503
504 static void
505 gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
506 {
507   GstQueue *queue;
508
509   /* it's not null if we got it, but it might not be ours */
510   g_return_if_fail (GST_IS_QUEUE (object));
511
512   queue = GST_QUEUE (object);
513
514   switch (prop_id) {
515     case ARG_LEAKY:
516       queue->leaky = g_value_get_int(value);
517       break;
518     case ARG_MAX_LEVEL:
519       queue->size_buffers = g_value_get_int(value);
520       break;
521     default:
522       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
523       break;
524   }
525 }
526
527 static void
528 gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
529 {
530   GstQueue *queue;
531
532   /* it's not null if we got it, but it might not be ours */
533   g_return_if_fail (GST_IS_QUEUE (object));
534
535   queue = GST_QUEUE (object);
536
537   switch (prop_id) {
538     case ARG_LEAKY:
539       g_value_set_int(value, queue->leaky);
540       break;
541     case ARG_LEVEL:
542       g_value_set_int(value, queue->level_buffers);
543       break;
544     case ARG_MAX_LEVEL:
545       g_value_set_int(value, queue->size_buffers);
546       break;
547     default:
548       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
549       break;
550   }
551 }