Merged from INCSCHED on 200505251!!!
[platform/upstream/gstreamer.git] / gst / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstqueue.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 //#define DEBUG_ENABLED
24 //#define STATUS_ENABLED
25 #ifdef STATUS_ENABLED
26 #define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
27 #else
28 #define STATUS(A)
29 #endif
30
31 #include <pthread.h>
32
33 #include "config.h"
34 #include "gst_private.h"
35
36 #include "gstqueue.h"
37 #include "gstscheduler.h"
38
39 GstElementDetails gst_queue_details = {
40   "Queue",
41   "Connection",
42   "Simple data queue",
43   VERSION,
44   "Erik Walthinsen <omega@cse.ogi.edu>",
45   "(C) 1999",
46 };
47
48
49 /* Queue signals and args */
50 enum {
51   LOW_WATERMARK,
52   HIGH_WATERMARK,
53   LAST_SIGNAL
54 };
55
56 enum {
57   ARG_0,
58   ARG_LEVEL_BUFFERS,
59   ARG_LEVEL_BYTES,
60   ARG_LEVEL_TIME,
61   ARG_SIZE_BUFFERS,
62   ARG_SIZE_BYTES,
63   ARG_SIZE_TIME,
64   ARG_LEAKY,
65   ARG_LEVEL,
66   ARG_MAX_LEVEL,
67 };
68
69
70 static void                     gst_queue_class_init    (GstQueueClass *klass);
71 static void                     gst_queue_init          (GstQueue *queue);
72
73 static void                     gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
74 static void                     gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
75
76 static gboolean                 gst_queue_handle_eos    (GstPad *pad);
77 static GstPadNegotiateReturn    gst_queue_handle_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data);
78 static GstPadNegotiateReturn    gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data);
79 static void                     gst_queue_chain         (GstPad *pad, GstBuffer *buf);
80 static GstBuffer *              gst_queue_get           (GstPad *pad);
81 static GstBufferPool*           gst_queue_get_bufferpool (GstPad *pad);
82         
83 static void                     gst_queue_flush         (GstQueue *queue);
84
85 static GstElementStateReturn    gst_queue_change_state  (GstElement *element);
86
87   
88 static GtkType
89 queue_leaky_get_type(void) {
90   static GtkType queue_leaky_type = 0;
91   static GtkEnumValue queue_leaky[] = {
92     { GST_QUEUE_NO_LEAK, "0", "Not Leaky" },
93     { GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream" },
94     { GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream" },
95     { 0, NULL, NULL },
96   };
97   if (!queue_leaky_type) {
98     queue_leaky_type = gtk_type_register_enum("GstQueueLeaky", queue_leaky);
99   }
100   return queue_leaky_type;
101 }
102 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
103
104
105 static GstElementClass *parent_class = NULL;
106 //static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
107
108 GtkType
109 gst_queue_get_type(void) {
110   static GtkType queue_type = 0;
111
112   if (!queue_type) {
113     static const GtkTypeInfo queue_info = {
114       "GstQueue",
115       sizeof(GstQueue),
116       sizeof(GstQueueClass),
117       (GtkClassInitFunc)gst_queue_class_init,
118       (GtkObjectInitFunc)gst_queue_init,
119       (GtkArgSetFunc)gst_queue_set_arg,
120       (GtkArgGetFunc)gst_queue_get_arg,
121       (GtkClassInitFunc)NULL,
122     };
123     queue_type = gtk_type_unique (GST_TYPE_ELEMENT, &queue_info);
124   }
125   return queue_type;
126 }
127
128 static void
129 gst_queue_class_init (GstQueueClass *klass)
130 {
131   GtkObjectClass *gtkobject_class;
132   GstElementClass *gstelement_class;
133
134   gtkobject_class = (GtkObjectClass*)klass;
135   gstelement_class = (GstElementClass*)klass;
136
137   parent_class = gtk_type_class (GST_TYPE_ELEMENT);
138
139   gtk_object_add_arg_type ("GstQueue::leaky", GST_TYPE_QUEUE_LEAKY,
140                            GTK_ARG_READWRITE, ARG_LEAKY);
141   gtk_object_add_arg_type ("GstQueue::level", GTK_TYPE_INT,
142                            GTK_ARG_READABLE, ARG_LEVEL);
143   gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
144                            GTK_ARG_READWRITE, ARG_MAX_LEVEL);
145
146   gtkobject_class->set_arg = gst_queue_set_arg;
147   gtkobject_class->get_arg = gst_queue_get_arg;
148
149   gstelement_class->change_state = gst_queue_change_state;
150 }
151
152 static void
153 gst_queue_init (GstQueue *queue)
154 {
155   // scheduling on this kind of element is, well, interesting
156   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
157
158   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
159   gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
160   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
161   gst_pad_set_eos_function (queue->sinkpad, gst_queue_handle_eos);
162   gst_pad_set_negotiate_function (queue->sinkpad, gst_queue_handle_negotiate_sink);
163   gst_pad_set_bufferpool_function (queue->sinkpad, gst_queue_get_bufferpool);
164
165   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
166   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
167   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
168   gst_pad_set_negotiate_function (queue->srcpad, gst_queue_handle_negotiate_src);
169
170   queue->queue = NULL;
171   queue->level_buffers = 0;
172   queue->level_bytes = 0;
173   queue->level_time = 0LL;
174   queue->size_buffers = 100;            // 100 buffers
175   queue->size_bytes = 100 * 1024;       // 100KB
176   queue->size_time = 1000000000LL;      // 1sec
177
178   queue->emptycond = g_cond_new ();
179   queue->fullcond = g_cond_new ();
180   GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
181 }
182
183 static GstBufferPool*
184 gst_queue_get_bufferpool (GstPad *pad)
185 {
186   GstQueue *queue;
187
188   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
189
190   return gst_pad_get_bufferpool (queue->srcpad);
191 }
192
193 static GstPadNegotiateReturn
194 gst_queue_handle_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data)
195 {
196   GstQueue *queue;
197
198   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
199
200   return gst_pad_negotiate_proxy (pad, queue->sinkpad, caps);
201   
202
203   //return GST_PAD_NEGOTIATE_FAIL;
204 }
205
206 static GstPadNegotiateReturn
207 gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
208 {
209   GstQueue *queue;
210
211   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
212
213   /*
214   if (counter == 0) {
215      *caps = NULL;
216      return GST_PAD_NEGOTIATE_TRY;
217   }
218   if (*caps) {
219   */
220     return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
221     /*
222   }
223
224   return GST_PAD_NEGOTIATE_FAIL;
225   */
226 }
227
228 static gboolean
229 gst_queue_handle_eos (GstPad *pad)
230 {
231   GstQueue *queue;
232
233   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
234
235   GST_DEBUG (GST_CAT_DATAFLOW,"%s received eos\n", GST_ELEMENT_NAME (queue));
236
237   GST_LOCK (queue);
238   GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
239                   queue->level_buffers);
240
241   GST_FLAG_SET (pad, GST_PAD_EOS);
242
243   g_cond_signal (queue->emptycond);
244
245   GST_UNLOCK (queue);
246
247   return TRUE;
248 }
249
250 static void
251 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
252 {
253   GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
254
255   gst_buffer_unref (GST_BUFFER (data));
256 }
257
258 static void
259 gst_queue_flush (GstQueue *queue)
260 {
261   g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
262                   (char *) GST_ELEMENT_NAME (queue));
263   g_slist_free (queue->queue);
264
265   queue->queue = NULL;
266   queue->level_buffers = 0;
267   queue->timeval = NULL;
268 }
269
270 static void
271 gst_queue_chain (GstPad *pad, GstBuffer *buf)
272 {
273   GstQueue *queue;
274   gboolean tosignal = FALSE;
275   const guchar *name;
276
277   g_return_if_fail (pad != NULL);
278   g_return_if_fail (GST_IS_PAD (pad));
279   g_return_if_fail (buf != NULL);
280
281   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
282   name = GST_ELEMENT_NAME (queue);
283
284   /* we have to lock the queue since we span threads */
285
286 //  GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
287   GST_LOCK (queue);
288
289   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
290     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
291     gst_queue_flush (queue);
292   }
293
294   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
295
296   if (queue->level_buffers >= queue->size_buffers) {
297     // if this is a leaky queue...
298     if (queue->leaky) {
299       // if we leak on the upstream side, drop the current buffer
300       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
301         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
302         gst_buffer_unref(buf);
303         // now we have to clean up and exit right away
304         GST_UNLOCK (queue);
305         return;
306       }
307       // otherwise we have to push a buffer off the other end
308       else {
309         GSList *front;
310         GstBuffer *leakbuf;
311         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
312         front = queue->queue;
313         leakbuf = (GstBuffer *)(front->data);
314         queue->level_buffers--;
315         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
316         gst_buffer_unref(leakbuf);
317         queue->queue = g_slist_remove_link (queue->queue, front);
318         g_slist_free (front);
319       }
320     }
321
322     while (queue->level_buffers >= queue->size_buffers) {
323       // if there's a pending state change for this queue or its manager, switch
324       // back to iterator so bottom half of state change executes
325       if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
326 //          GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
327 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != 
328 GST_STATE_NONE_PENDING)
329       {
330         GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
331         if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
332           GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
333         if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
334           GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
335         GST_UNLOCK(queue);
336         cothread_switch(cothread_current_main());
337       }
338
339       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
340       g_cond_signal (queue->emptycond);
341       g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
342       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
343     }
344   }
345
346   /* put the buffer on the tail of the list */
347   queue->queue = g_slist_append (queue->queue, buf);
348   queue->level_buffers++;
349   queue->level_bytes += GST_BUFFER_SIZE(buf);
350 //  GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
351
352   /* if we were empty, but aren't any more, signal a condition */
353   if (queue->level_buffers == 1)
354   {
355     GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
356     g_cond_signal (queue->emptycond);
357   }
358
359   GST_UNLOCK (queue);
360 }
361
362 static GstBuffer *
363 gst_queue_get (GstPad *pad)
364 {
365   GstQueue *queue;
366   GstBuffer *buf = NULL;
367   GSList *front;
368   const guchar *name;
369
370   g_assert(pad != NULL);
371   g_assert(GST_IS_PAD(pad));
372   g_return_val_if_fail (pad != NULL, NULL);
373   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
374
375   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
376   name = GST_ELEMENT_NAME (queue);
377
378   /* have to lock for thread-safety */
379   GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
380   GST_LOCK (queue);
381   GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
382   GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
383
384   while (!queue->level_buffers) {
385     if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
386       GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
387       GST_UNLOCK(queue);
388       gst_pad_set_eos (queue->srcpad);
389       // this return NULL shouldn't hurt anything...
390       return NULL;
391     }
392
393     // if there's a pending state change for this queue or its manager, switch
394     // back to iterator so bottom half of state change executes
395     if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
396 //        GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
397 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != 
398 GST_STATE_NONE_PENDING)
399     {
400       GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
401       if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
402         GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
403       if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
404         GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
405       GST_UNLOCK(queue);
406       cothread_switch(cothread_current_main());
407     }
408
409     g_cond_signal (queue->fullcond);
410     g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
411   }
412
413   front = queue->queue;
414   buf = (GstBuffer *)(front->data);
415   GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
416   queue->queue = g_slist_remove_link (queue->queue, front);
417   g_slist_free (front);
418
419 //  if (queue->level_buffers < queue->size_buffers)
420   if (queue->level_buffers == queue->size_buffers)
421   {
422     GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
423     g_cond_signal (queue->fullcond);
424   }
425
426   queue->level_buffers--;
427   queue->level_bytes -= GST_BUFFER_SIZE(buf);
428   GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
429
430   GST_UNLOCK(queue);
431
432   return buf;
433 }
434
435 static GstElementStateReturn
436 gst_queue_change_state (GstElement *element)
437 {
438   GstQueue *queue;
439   GstElementStateReturn ret;
440   g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
441
442   queue = GST_QUEUE (element);
443
444   // lock the queue so another thread (not in sync with this thread's state)
445   // can't call this queue's _get (or whatever)
446   GST_LOCK (queue);
447
448   /* if going down into NULL state, clear out buffers*/
449   if (GST_STATE_PENDING (element) == GST_STATE_READY) {
450     /* otherwise (READY or higher) we need to open the file */
451     gst_queue_flush (queue);
452   }
453
454   /* if we haven't failed already, give the parent class a chance to ;-) */
455   if (GST_ELEMENT_CLASS (parent_class)->change_state)
456   {
457     gboolean valid_handler = FALSE;
458     guint state_change_id = gtk_signal_lookup("state_change", GTK_OBJECT_TYPE(element));
459
460     // determine whether we need to block the parent (element) class'
461     // STATE_CHANGE signal so we can UNLOCK before returning.  we block
462     // it if we could find the state_change signal AND there's a signal
463     // handler attached to it.
464     //
465     // note: this assumes that change_state() *only* emits state_change signal.
466     // if element change_state() emits other signals, they need to be blocked
467     // as well.
468     if (state_change_id &&
469         gtk_signal_handler_pending(GTK_OBJECT(element), state_change_id, FALSE))
470       valid_handler = TRUE;
471     if (valid_handler)
472       gtk_signal_handler_block(GTK_OBJECT(element), state_change_id);
473
474     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
475
476     if (valid_handler)
477       gtk_signal_handler_unblock(GTK_OBJECT(element), state_change_id);
478
479     // UNLOCK, *then* emit signal (if there's one there)
480     GST_UNLOCK(queue);
481     if (valid_handler)
482       gtk_signal_emit(GTK_OBJECT (element), state_change_id, GST_STATE(element));
483   }
484   else
485   {
486     ret = GST_STATE_SUCCESS;
487     GST_UNLOCK(queue);
488   }
489
490   return ret;
491 }
492
493
494 static void
495 gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
496 {
497   GstQueue *queue;
498
499   /* it's not null if we got it, but it might not be ours */
500   g_return_if_fail (GST_IS_QUEUE (object));
501
502   queue = GST_QUEUE (object);
503
504   switch(id) {
505     case ARG_LEAKY:
506       queue->leaky = GTK_VALUE_INT (*arg);
507       break;
508     case ARG_MAX_LEVEL:
509       queue->size_buffers = GTK_VALUE_INT (*arg);
510       break;
511     default:
512       break;
513   }
514 }
515
516 static void
517 gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
518 {
519   GstQueue *queue;
520
521   /* it's not null if we got it, but it might not be ours */
522   g_return_if_fail (GST_IS_QUEUE (object));
523
524   queue = GST_QUEUE (object);
525
526   switch (id) {
527     case ARG_LEAKY:
528       GTK_VALUE_INT (*arg) = queue->leaky;
529       break;
530     case ARG_LEVEL:
531       GTK_VALUE_INT (*arg) = queue->level_buffers;
532       break;
533     case ARG_MAX_LEVEL:
534       GTK_VALUE_INT (*arg) = queue->size_buffers;
535       break;
536     default:
537       arg->type = GTK_TYPE_INVALID;
538       break;
539   }
540 }