X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fgstqueue.c;h=d0139e8f4ee1c8495aa1c8df88827ebb0bd6df4d;hb=8c9cd079d4f9a516ca52b2c44432ab8f3b19ec71;hp=dac68ea11ffc4db9b8a195a522213567e624aa23;hpb=a8b13468572d06fe32c9a08e3b583c160a9fea2a;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/gstqueue.c b/gst/gstqueue.c index dac68ea..d0139e8 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -1,6 +1,7 @@ /* GStreamer * Copyright (C) 1999,2000 Erik Walthinsen * 2000 Wim Taymans + * 2003 Colin Walters * * gstqueue.c: * @@ -20,77 +21,91 @@ * Boston, MA 02111-1307, USA. */ -/* #define DEBUG_ENABLED */ -/* #define STATUS_ENABLED */ -#ifdef STATUS_ENABLED -#define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue)) -#else -#define STATUS(A) -#endif -#include - -#include "config.h" #include "gst_private.h" #include "gstqueue.h" #include "gstscheduler.h" #include "gstevent.h" +#include "gstinfo.h" -GstElementDetails gst_queue_details = { +static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ( "Queue", - "Connection", + "Generic", "Simple data queue", - VERSION, - "Erik Walthinsen ", - "(C) 1999", -}; + "Erik Walthinsen " +); /* Queue signals and args */ enum { - LOW_WATERMARK, - HIGH_WATERMARK, + SIGNAL_UNDERRUN, + SIGNAL_RUNNING, + SIGNAL_OVERRUN, LAST_SIGNAL }; enum { ARG_0, - ARG_LEVEL_BUFFERS, - ARG_LEVEL_BYTES, - ARG_LEVEL_TIME, - ARG_SIZE_BUFFERS, - ARG_SIZE_BYTES, - ARG_SIZE_TIME, + /* FIXME: don't we have another way of doing this + * "Gstreamer format" (frame/byte/time) queries? */ + ARG_CUR_LEVEL_BUFFERS, + ARG_CUR_LEVEL_BYTES, + ARG_CUR_LEVEL_TIME, + ARG_MAX_SIZE_BUFFERS, + ARG_MAX_SIZE_BYTES, + ARG_MAX_SIZE_TIME, + ARG_MIN_TRESHOLD_BUFFERS, + ARG_MIN_TRESHOLD_BYTES, + ARG_MIN_TRESHOLD_TIME, ARG_LEAKY, - ARG_LEVEL, - ARG_MAX_LEVEL, + ARG_MAY_DEADLOCK, + ARG_BLOCK_TIMEOUT + /* FILL ME */ }; - -static void gst_queue_class_init (GstQueueClass *klass); -static void gst_queue_init (GstQueue *queue); - -static void gst_queue_set_property (GObject *object, guint prop_id, - const GValue *value, GParamSpec *pspec); -static void gst_queue_get_property (GObject *object, guint prop_id, - GValue *value, GParamSpec *pspec); - -static GstPadNegotiateReturn gst_queue_handle_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data); -static GstPadNegotiateReturn gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data); -static void gst_queue_chain (GstPad *pad, GstBuffer *buf); -static GstBuffer * gst_queue_get (GstPad *pad); -static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +typedef struct _GstQueueEventResponse { + GstEvent *event; + gboolean ret, handled; +} GstQueueEventResponse; + +static void gst_queue_base_init (GstQueueClass *klass); +static void gst_queue_class_init (GstQueueClass *klass); +static void gst_queue_init (GstQueue *queue); +static void gst_queue_dispose (GObject *object); + +static void gst_queue_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec); +static void gst_queue_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec); + +static GstCaps *gst_queue_getcaps (GstPad *pad); +static GstPadLinkReturn + gst_queue_link (GstPad *pad, + const GstCaps *caps); +static void gst_queue_chain (GstPad *pad, + GstData *data); +static GstData *gst_queue_get (GstPad *pad); -static void gst_queue_locked_flush (GstQueue *queue); -static void gst_queue_flush (GstQueue *queue); +static gboolean gst_queue_handle_src_event (GstPad *pad, + GstEvent *event); -static GstElementStateReturn gst_queue_change_state (GstElement *element); +static void gst_queue_locked_flush (GstQueue *queue); + +static GstElementStateReturn + gst_queue_change_state (GstElement *element); +static gboolean gst_queue_release_locks (GstElement *element); -#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type()) +#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ()) + static GType -queue_leaky_get_type(void) { +queue_leaky_get_type (void) +{ static GType queue_leaky_type = 0; static GEnumValue queue_leaky[] = { { GST_QUEUE_NO_LEAK, "0", "Not Leaky" }, @@ -105,56 +120,125 @@ queue_leaky_get_type(void) { } static GstElementClass *parent_class = NULL; -/* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */ +static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; GType -gst_queue_get_type(void) +gst_queue_get_type (void) { static GType queue_type = 0; if (!queue_type) { static const GTypeInfo queue_info = { - sizeof(GstQueueClass), + sizeof (GstQueueClass), + (GBaseInitFunc) gst_queue_base_init, NULL, + (GClassInitFunc) gst_queue_class_init, NULL, - (GClassInitFunc)gst_queue_class_init, NULL, - NULL, - sizeof(GstQueue), + sizeof (GstQueue), 4, - (GInstanceInitFunc)gst_queue_init, + (GInstanceInitFunc) gst_queue_init, NULL }; - queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0); + + queue_type = g_type_register_static (GST_TYPE_ELEMENT, + "GstQueue", &queue_info, 0); } + return queue_type; } static void -gst_queue_class_init (GstQueueClass *klass) +gst_queue_base_init (GstQueueClass *klass) { - GObjectClass *gobject_class; - GstElementClass *gstelement_class; - - gobject_class = (GObjectClass*)klass; - gstelement_class = (GstElementClass*)klass; + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); - - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY, - g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.", - GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE)); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL, - g_param_spec_int("level","Level","How many buffers are in the queue.", - 0,G_MAXINT,0,G_PARAM_READABLE)); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL, - g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.", - 0,G_MAXINT,100,G_PARAM_READWRITE)); - - gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property); + gst_element_class_set_details (gstelement_class, &gst_queue_details); +} - gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state); +static void +gst_queue_class_init (GstQueueClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + parent_class = g_type_class_peek_parent (klass); + + /* signals */ + gst_queue_signals[SIGNAL_UNDERRUN] = + g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gst_queue_signals[SIGNAL_RUNNING] = + g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + gst_queue_signals[SIGNAL_OVERRUN] = + g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, + G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL, + g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + /* properties */ + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, + g_param_spec_uint ("current-level-bytes", "Current level (kB)", + "Current amount of data in the queue (bytes)", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS, + g_param_spec_uint ("current-level-buffers", "Current level (buffers)", + "Current number of buffers in the queue", + 0, G_MAXUINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current level (ns)", + "Current amount of data in the queue (in ns)", + 0, G_MAXUINT64, 0, G_PARAM_READABLE)); + + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, + g_param_spec_uint ("max-size-bytes", "Max. size (kB)", + "Max. amount of data in the queue (bytes, 0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, + g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", + "Max. number of buffers in the queue (0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, + g_param_spec_uint64 ("max-size-time", "Max. size (ns)", + "Max. amount of data in the queue (in ns, 0=disable)", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BYTES, + g_param_spec_uint ("min-treshold-bytes", "Min. treshold (kB)", + "Min. amount of data in the queue to allow reading (bytes, 0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BUFFERS, + g_param_spec_uint ("min-treshold-buffers", "Min. treshold (buffers)", + "Min. number of buffers in the queue to allow reading (0=disable)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_TIME, + g_param_spec_uint64 ("min-treshold-time", "Min. treshold (ns)", + "Min. amount of data in the queue to allow reading (in ns, 0=disable)", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_class, ARG_LEAKY, + g_param_spec_enum ("leaky", "Leaky", + "Where the queue leaks, if at all", + GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK, + g_param_spec_boolean ("may_deadlock", "May Deadlock", + "The queue may deadlock if it's full and not PLAYING", + TRUE, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT, + g_param_spec_uint64 ("block_timeout", "Timeout for Block", + "Nanoseconds until blocked queue times out and returns filler event. " + "Value of -1 disables timeout", + 0, G_MAXUINT64, -1, G_PARAM_READWRITE)); + + /* set several parent class virtual functions */ + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); + + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state); + gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks); } static void @@ -165,386 +249,748 @@ gst_queue_init (GstQueue *queue) GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE); queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain)); + gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); - gst_pad_set_negotiate_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_handle_negotiate_sink)); - gst_pad_set_bufferpool_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_get_bufferpool)); + gst_pad_set_link_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_link)); + gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); + gst_pad_set_active (queue->sinkpad, TRUE); queue->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get)); + gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); - gst_pad_set_negotiate_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_handle_negotiate_src)); - - queue->queue = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = 0LL; - queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = 1000000000LL; /* 1sec */ + gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link)); + gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); + gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event)); + gst_pad_set_active (queue->srcpad, TRUE); + + queue->cur_level.buffers = 0; /* no content */ + queue->cur_level.bytes = 0; /* no content */ + queue->cur_level.time = 0; /* no content */ + queue->max_size.buffers = 100; /* max. 100 buffers */ + queue->max_size.bytes = 1024 * 1024; /* max. 1 MB */ + queue->max_size.time = GST_SECOND; /* max. 1 sec. */ + queue->min_treshold.buffers = 0; /* no treshold */ + queue->min_treshold.bytes = 0; /* no treshold */ + queue->min_treshold.time = 0; /* no treshold */ + + queue->leaky = GST_QUEUE_NO_LEAK; + queue->may_deadlock = TRUE; + queue->block_timeout = GST_CLOCK_TIME_NONE; + queue->interrupt = FALSE; + queue->flush = FALSE; queue->qlock = g_mutex_new (); - queue->reader = FALSE; - queue->writer = FALSE; - queue->not_empty = g_cond_new (); - queue->not_full = g_cond_new (); - GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); + queue->item_add = g_cond_new (); + queue->item_del = g_cond_new (); + queue->event_done = g_cond_new (); + queue->events = g_queue_new (); + queue->queue = g_queue_new (); + + GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, + "initialized queue's not_empty & not_full conditions"); } -static GstBufferPool* -gst_queue_get_bufferpool (GstPad *pad) +static void +gst_queue_dispose (GObject *object) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (object); - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL); + + while (!g_queue_is_empty (queue->queue)) { + GstData *data = g_queue_pop_head (queue->queue); + gst_data_unref (data); + } + g_queue_free (queue->queue); + g_mutex_free (queue->qlock); + g_cond_free (queue->item_add); + g_cond_free (queue->item_del); + g_cond_free (queue->event_done); + while (!g_queue_is_empty (queue->events)) { + GstEvent *event = g_queue_pop_head (queue->events); + gst_event_unref (event); + } - return gst_pad_get_bufferpool (queue->srcpad); + if (G_OBJECT_CLASS (parent_class)->dispose) + G_OBJECT_CLASS (parent_class)->dispose (object); } -static GstPadNegotiateReturn -gst_queue_handle_negotiate_src (GstPad *pad, GstCaps **caps, gpointer *data) +static GstPad * +gst_queue_otherpad (GstPad *pad) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); + GstPad *otherpad; - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); + if (pad == queue->srcpad) + otherpad = queue->sinkpad; + else + otherpad = queue->srcpad; - return gst_pad_negotiate_proxy (pad, queue->sinkpad, caps); + return otherpad; } -static GstPadNegotiateReturn -gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data) +static GstPadLinkReturn +gst_queue_link (GstPad *pad, const GstCaps *caps) { - GstQueue *queue; - - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - return gst_pad_negotiate_proxy (pad, queue->srcpad, caps); + return gst_pad_proxy_link (gst_queue_otherpad (pad), caps); } -static void -gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) +static GstCaps * +gst_queue_getcaps (GstPad *pad) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data); + GstPad *otherpad = GST_PAD_PEER (gst_queue_otherpad (pad)); - if (GST_IS_BUFFER (data)) { - gst_buffer_unref (GST_BUFFER (data)); - } + if (otherpad) + return gst_pad_get_caps (otherpad); + + return gst_caps_new_any (); } static void gst_queue_locked_flush (GstQueue *queue) { - g_slist_foreach (queue->queue, gst_queue_cleanup_buffers, - (gpointer) queue); - g_slist_free (queue->queue); - - queue->queue = NULL; - queue->level_buffers = 0; + while (!g_queue_is_empty (queue->queue)) { + GstData *data = g_queue_pop_head (queue->queue); + gst_data_unref (data); + } queue->timeval = NULL; + queue->cur_level.buffers = 0; + queue->cur_level.bytes = 0; + queue->cur_level.time = 0; + + /* make sure any pending buffers to be added are flushed too */ + queue->flush = TRUE; + + /* we deleted something... */ + g_cond_signal (queue->item_del); } static void -gst_queue_flush (GstQueue *queue) +gst_queue_handle_pending_events (GstQueue *queue) { - g_mutex_lock (queue->qlock); - gst_queue_locked_flush (queue); - g_mutex_unlock (queue->qlock); + /* check for events to send upstream */ + while (!g_queue_is_empty (queue->events)){ + GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream"); + er->ret = gst_pad_event_default (queue->srcpad, er->event); + er->handled = TRUE; + g_cond_signal (queue->event_done); + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent"); + } } +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_treshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_treshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_treshold.time, \ + queue->max_size.time, \ + queue->queue->length) static void -gst_queue_chain (GstPad *pad, GstBuffer *buf) +gst_queue_chain (GstPad *pad, + GstData *data) { GstQueue *queue; - gboolean reader; g_return_if_fail (pad != NULL); g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (buf != NULL); + g_return_if_fail (data != NULL); queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - reader = FALSE; - + restart: /* we have to lock the queue since we span threads */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ()); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); - if (GST_IS_EVENT (buf)) { - switch (GST_EVENT_TYPE (buf)) { + gst_queue_handle_pending_events (queue); + + /* assume don't need to flush this buffer when the queue is filled */ + queue->flush = FALSE; + + if (GST_IS_EVENT (data)) { + switch (GST_EVENT_TYPE (data)) { case GST_EVENT_FLUSH: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); + STATUS (queue, "received flush event"); gst_queue_locked_flush (queue); + STATUS (queue, "after flush"); break; case GST_EVENT_EOS: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", - GST_ELEMENT_NAME (queue), queue->level_buffers); + STATUS (queue, "received EOS"); break; default: - gst_pad_event_default (pad, GST_EVENT (buf)); + /* we put the event in the queue, we don't have to act ourselves */ + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding event %p of type %d", + data, GST_EVENT_TYPE (data)); break; } } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf)); - - if (queue->level_buffers == queue->size_buffers) { - /* if this is a leaky queue... */ - if (queue->leaky) { - /* FIXME don't want to leak events! */ - /* if we leak on the upstream side, drop the current buffer */ - if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); - if (GST_IS_EVENT (buf)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_EVENT_TYPE(GST_EVENT(buf))); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); - gst_buffer_unref(buf); - /* now we have to clean up and exit right away */ + + if (GST_IS_BUFFER (data)) + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding buffer %p of size %d", + data, GST_BUFFER_SIZE (data)); + + /* We make space available if we're "full" according to whatever + * the user defined as "full". Note that this only applies to buffers. + * We always handle events and they don't count in our statistics. */ + if (GST_IS_BUFFER (data) && + ((queue->max_size.buffers > 0 && + queue->cur_level.buffers >= queue->max_size.buffers) || + (queue->max_size.bytes > 0 && + queue->cur_level.bytes >= queue->max_size.bytes) || + (queue->max_size.time > 0 && + queue->cur_level.time >= queue->max_size.time))) { + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0); + g_mutex_lock (queue->qlock); + + /* how are we going to make space for this buffer? */ + switch (queue->leaky) { + /* leak current buffer */ + case GST_QUEUE_LEAK_UPSTREAM: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue is full, leaking buffer on upstream end"); + /* now we can clean up and exit right away */ g_mutex_unlock (queue->qlock); - return; - } - /* otherwise we have to push a buffer off the other end */ - else { - GSList *front; - GstBuffer *leakbuf; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); - front = queue->queue; - leakbuf = (GstBuffer *)(front->data); - if (GST_IS_EVENT (leakbuf)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_EVENT_TYPE(GST_EVENT(leakbuf))); - queue->level_buffers--; - queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); - gst_buffer_unref(leakbuf); - queue->queue = g_slist_remove_link (queue->queue, front); - g_slist_free (front); + goto out_unref; + + /* leak first buffer in the queue */ + case GST_QUEUE_LEAK_DOWNSTREAM: { + /* this is a bit hacky. We'll manually iterate the list + * and find the first buffer from the head on. We'll + * unref that and "fix up" the GQueue object... */ + GList *item; + GstData *leak = NULL; + + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue is full, leaking buffer on downstream end"); + + for (item = queue->queue->head; item != NULL; item = item->next) { + if (GST_IS_BUFFER (item->data)) { + leak = item->data; + break; + } + } + + /* if we didn't find anything, it means we have no buffers + * in here. That cannot happen, since we had >= 1 bufs */ + g_assert (leak); + + /* Now remove it from the list, fixing up the GQueue + * CHECKME: is a queue->head the first or the last item? */ + item = g_list_delete_link (queue->queue->head, item); + queue->queue->head = g_list_first (item); + queue->queue->tail = g_list_last (item); + queue->queue->length--; + + /* and unref the data at the end. Twice, because we keep a ref + * to make things read-only. Also keep our list uptodate. */ + queue->cur_level.bytes -= GST_BUFFER_SIZE (data); + queue->cur_level.buffers --; + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time -= GST_BUFFER_DURATION (data); + + gst_data_unref (data); + gst_data_unref (data); + break; } - } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", - queue->level_buffers, queue->size_buffers); - while (queue->level_buffers == queue->size_buffers) { - /* if there's a pending state change for this queue or its manager, switch */ - /* back to iterator so bottom half of state change executes */ - while (GST_STATE (queue) != GST_STATE_PLAYING) { - //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); + default: + g_warning ("Unknown leaky type, using default"); + /* fall-through */ + + /* don't leak. Instead, wait for space to be available */ + case GST_QUEUE_NO_LEAK: + STATUS (queue, "pre-full wait"); + + while ((queue->max_size.buffers > 0 && + queue->cur_level.buffers >= queue->max_size.buffers) || + (queue->max_size.bytes > 0 && + queue->cur_level.bytes >= queue->max_size.bytes) || + (queue->max_size.time > 0 && + queue->cur_level.time >= queue->max_size.time)) { + /* if there's a pending state change for this queue + * or its manager, switch back to iterator so bottom + * half of state change executes */ + if (queue->interrupt) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted"); + g_mutex_unlock (queue->qlock); + if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), + GST_ELEMENT (queue))) { + goto out_unref; + } + /* if we got here because we were unlocked after a + * flush, we don't need to add the buffer to the + * queue again */ + if (queue->flush) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "not adding pending buffer after flush"); + goto out_unref; + } + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "adding pending buffer after interrupt"); + goto restart; + } + + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down. Try to + * signal to resolve the error */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_data_unref (data); + gst_element_error (GST_ELEMENT (queue), + "deadlock found, source pad elements are shut down"); + /* we don't go to out_unref here, since we want to + * unref the buffer *before* calling gst_element_error */ + return; + } else { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "%s: waiting for the app to restart " + "source pad elements", + GST_ELEMENT_NAME (queue)); + } + } + + /* OK, we've got a serious issue here. Imagine the situation + * where the puller (next element) is sending an event here, + * so it cannot pull events from the queue, and we cannot + * push data further because the queue is 'full' and therefore, + * we wait here (and do not handle events): deadlock! to solve + * that, we handle pending upstream events here, too. */ + gst_queue_handle_pending_events (queue); + + STATUS (queue, "waiting for item_del signal"); + g_cond_wait (queue->item_del, queue->qlock); + STATUS (queue, "received item_del signal"); + } + + STATUS (queue, "post-full wait"); g_mutex_unlock (queue->qlock); - cothread_switch(cothread_current_main()); - goto restart; - } - g_assert (GST_STATE (queue) == GST_STATE_PLAYING); - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - if (queue->writer) - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n"); - queue->writer = TRUE; - g_cond_wait (queue->not_full, queue->qlock); - queue->writer = FALSE; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_mutex_lock (queue->qlock); + break; } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", - queue->level_buffers, queue->size_buffers); } - /* put the buffer on the tail of the list */ - queue->queue = g_slist_append (queue->queue, buf); - queue->level_buffers++; - queue->level_bytes += GST_BUFFER_SIZE(buf); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + /* put the buffer on the tail of the list. We keep a reference, + * so that the data is read-only while in here. There's a good + * reason to do so: we have a size and time counter, and any + * modification to the content could change any of the two. */ + gst_data_ref (data); + g_queue_push_tail (queue->queue, data); - /* reader waiting on an empty queue */ - reader = queue->reader; + /* Note that we only add buffers (not events) to the statistics */ + if (GST_IS_BUFFER (data)) { + queue->cur_level.buffers++; + queue->cur_level.bytes += GST_BUFFER_SIZE (data); + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time += GST_BUFFER_DURATION (data); + } + + STATUS (queue, "+ level"); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add"); + g_cond_signal (queue->item_add); g_mutex_unlock (queue->qlock); - if (reader) - { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n"); - g_cond_signal (queue->not_empty); - } + return; + +out_unref: + gst_data_unref (data); + return; } -static GstBuffer * +static GstData * gst_queue_get (GstPad *pad) { GstQueue *queue; - GstBuffer *buf = NULL; - GSList *front; - gboolean writer; + GstData *data; - g_assert(pad != NULL); - g_assert(GST_IS_PAD(pad)); g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); - queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); - - writer = FALSE; + queue = GST_QUEUE (gst_pad_get_parent (pad)); restart: /* have to lock for thread-safety */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty); - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - while (queue->level_buffers == 0) { - /* if there's a pending state change for this queue or its manager, switch - * back to iterator so bottom half of state change executes - */ - while (GST_STATE (queue) != GST_STATE_PLAYING) { - //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); - g_mutex_unlock (queue->qlock); - cothread_switch(cothread_current_main()); - goto restart; + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "locked t:%p", g_thread_self ()); + + if (queue->queue->length == 0 || + (queue->min_treshold.buffers > 0 && + queue->cur_level.buffers < queue->min_treshold.buffers) || + (queue->min_treshold.bytes > 0 && + queue->cur_level.bytes < queue->min_treshold.bytes) || + (queue->min_treshold.time > 0 && + queue->cur_level.time < queue->min_treshold.time)) { + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0); + g_mutex_lock (queue->qlock); + + STATUS (queue, "pre-empty wait"); + while (queue->queue->length == 0 || + (queue->min_treshold.buffers > 0 && + queue->cur_level.buffers < queue->min_treshold.buffers) || + (queue->min_treshold.bytes > 0 && + queue->cur_level.bytes < queue->min_treshold.bytes) || + (queue->min_treshold.time > 0 && + queue->cur_level.time < queue->min_treshold.time)) { + /* if there's a pending state change for this queue or its + * manager, switch back to iterator so bottom half of state + * change executes. */ + if (queue->interrupt) { + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted"); + g_mutex_unlock (queue->qlock); + if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), + GST_ELEMENT (queue))) + return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)); + goto restart; + } + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), + "deadlock found, sink pad elements are shut down"); + goto restart; + } else { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "%s: waiting for the app to restart " + "source pad elements", + GST_ELEMENT_NAME (queue)); + } + } + + STATUS (queue, "waiting for item_add"); + + if (queue->block_timeout != GST_CLOCK_TIME_NONE) { + GTimeVal timeout; + g_get_current_time (&timeout); + g_time_val_add (&timeout, queue->block_timeout / 1000); + if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)){ + g_mutex_unlock (queue->qlock); + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Sending filler event"); + return GST_DATA (gst_event_new_filler ()); + } + } else { + g_cond_wait (queue->item_add, queue->qlock); + } + STATUS (queue, "got item_add signal"); } - g_assert (GST_STATE (queue) == GST_STATE_PLAYING); - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - if (queue->reader) - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n"); - queue->reader = TRUE; - g_cond_wait (queue->not_empty, queue->qlock); - queue->reader = FALSE; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n"); + + STATUS (queue, "post-empty wait"); + g_mutex_unlock (queue->qlock); + g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0); + g_mutex_lock (queue->qlock); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - front = queue->queue; - buf = (GstBuffer *)(front->data); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf); - queue->queue = g_slist_remove_link (queue->queue, front); - g_slist_free (front); + /* There's something in the list now, whatever it is */ + data = g_queue_pop_head (queue->queue); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, + "retrieved data %p from queue", data); - queue->level_buffers--; - queue->level_bytes -= GST_BUFFER_SIZE(buf); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + if (GST_IS_BUFFER (data)) { + /* Update statistics */ + queue->cur_level.buffers--; + queue->cur_level.bytes -= GST_BUFFER_SIZE (data); + if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE) + queue->cur_level.time -= GST_BUFFER_DURATION (data); + } + + /* Now that we're done, we can lose our own reference to + * the item, since we're no longer in danger. */ + gst_data_unref (data); - /* writer waiting on a full queue */ - writer = queue->writer; + STATUS (queue, "after _get()"); + GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del"); + g_cond_signal (queue->item_del); g_mutex_unlock (queue->qlock); - if (writer) - { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n"); - g_cond_signal (queue->not_full); + /* FIXME: I suppose this needs to be locked, since the EOS + * bit affects the pipeline state. However, that bit is + * locked too so it'd cause a deadlock. */ + if (GST_IS_EVENT (data)) { + GstEvent *event = GST_EVENT (data); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "queue \"%s\" eos", + GST_ELEMENT_NAME (queue)); + gst_element_set_eos (GST_ELEMENT (queue)); + break; + default: + break; + } } - /* FIXME where should this be? locked? */ - if (GST_IS_EVENT(buf)) { - GstEvent *event = GST_EVENT(buf); - switch (GST_EVENT_TYPE(event)) { - case GST_EVENT_EOS: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue)); - gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED); + return data; +} + + +static gboolean +gst_queue_handle_src_event (GstPad *pad, + GstEvent *event) +{ + GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); + gboolean res; + + g_mutex_lock (queue->qlock); + + if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { + GstQueueEventResponse er; + + /* push the event to the queue and wait for upstream consumption */ + er.event = event; + er.handled = FALSE; + g_queue_push_tail (queue->events, &er); + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Preparing for loop for event handler"); + /* see the chain function on why this is here - it prevents a deadlock */ + g_cond_signal (queue->item_del); + while (!er.handled) { + GTimeVal timeout; + g_get_current_time (&timeout); + g_time_val_add (&timeout, 500 * 1000); /* half a second */ + if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && + !er.handled) { + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "timeout in upstream event handling"); + /* remove ourselves from the pending list. Since we're + * locked, others cannot reference this anymore. */ + queue->queue->head = g_list_remove (queue->queue->head, &er); + queue->queue->head = g_list_first (queue->queue->head); + queue->queue->tail = g_list_last (queue->queue->head); + queue->queue->length--; + res = FALSE; + goto handled; + } + } + GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, + "Event handled"); + res = er.ret; + } else { + res = gst_pad_event_default (pad, event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH: + GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, + "FLUSH event, flushing queue\n"); + gst_queue_locked_flush (queue); break; + case GST_EVENT_SEEK: + if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) { + gst_queue_locked_flush (queue); + } default: break; } } +handled: + g_mutex_unlock (queue->qlock); - return buf; + return res; +} + +static gboolean +gst_queue_release_locks (GstElement *element) +{ + GstQueue *queue; + + queue = GST_QUEUE (element); + + g_mutex_lock (queue->qlock); + queue->interrupt = TRUE; + g_cond_signal (queue->item_add); + g_cond_signal (queue->item_del); + g_mutex_unlock (queue->qlock); + + return TRUE; } static GstElementStateReturn gst_queue_change_state (GstElement *element) { GstQueue *queue; - GstElementStateReturn ret; - GstElementState new_state; - g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE); + GstElementStateReturn ret = GST_STATE_SUCCESS; queue = GST_QUEUE (element); - GST_DEBUG_ENTER("('%s')", GST_ELEMENT_NAME (element)); + GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change"); /* lock the queue so another thread (not in sync with this thread's state) * can't call this queue's _get (or whatever) */ g_mutex_lock (queue->qlock); - new_state = GST_STATE_PENDING (element); - - if (new_state == GST_STATE_PAUSED) { - g_cond_signal (queue->not_full); - g_cond_signal (queue->not_empty); - } - else if (new_state == GST_STATE_READY) { - gst_queue_locked_flush (queue); - } - else if (new_state == GST_STATE_PLAYING) { - if (!GST_PAD_CONNECTED (queue->sinkpad)) { - /* FIXME can this be? */ - if (queue->reader) - g_cond_signal (queue->not_empty); - g_mutex_unlock (queue->qlock); - - return GST_STATE_FAILURE; - } + switch (GST_STATE_TRANSITION (element)) { + case GST_STATE_NULL_TO_READY: + gst_queue_locked_flush (queue); + break; + case GST_STATE_PAUSED_TO_PLAYING: + if (!GST_PAD_IS_LINKED (queue->sinkpad)) { + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, + "queue %s is not linked", + GST_ELEMENT_NAME (queue)); + /* FIXME can this be? */ + g_cond_signal (queue->item_add); + + ret = GST_STATE_FAILURE; + goto error; + } else { + GstScheduler *src_sched, *sink_sched; + + src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad)); + sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad)); + + if (src_sched == sink_sched) { + GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, + "queue %s does not connect different schedulers", + GST_ELEMENT_NAME (queue)); + + g_warning ("queue %s does not connect different schedulers", + GST_ELEMENT_NAME (queue)); + + ret = GST_STATE_FAILURE; + goto error; + } + } + queue->interrupt = FALSE; + break; + case GST_STATE_PAUSED_TO_READY: + gst_queue_locked_flush (queue); + break; + default: + break; } - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + if (GST_ELEMENT_CLASS (parent_class)->change_state) + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + /* this is an ugly hack to make sure our pads are always active. + * Reason for this is that pad activation for the queue element + * depends on 2 schedulers (ugh) */ + gst_pad_set_active (queue->sinkpad, TRUE); + gst_pad_set_active (queue->srcpad, TRUE); + +error: g_mutex_unlock (queue->qlock); - GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element)); + GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); + return ret; } static void -gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +gst_queue_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) { - GstQueue *queue; + GstQueue *queue = GST_QUEUE (object); - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_QUEUE (object)); - - queue = GST_QUEUE (object); + /* someone could change levels here, and since this + * affects the get/put funcs, we need to lock for safety. */ + g_mutex_lock (queue->qlock); switch (prop_id) { + case ARG_MAX_SIZE_BYTES: + queue->max_size.bytes = g_value_get_uint (value); + break; + case ARG_MAX_SIZE_BUFFERS: + queue->max_size.buffers = g_value_get_uint (value); + break; + case ARG_MAX_SIZE_TIME: + queue->max_size.time = g_value_get_uint64 (value); + break; + case ARG_MIN_TRESHOLD_BYTES: + queue->max_size.bytes = g_value_get_uint (value); + break; + case ARG_MIN_TRESHOLD_BUFFERS: + queue->max_size.buffers = g_value_get_uint (value); + break; + case ARG_MIN_TRESHOLD_TIME: + queue->max_size.time = g_value_get_uint64 (value); + break; case ARG_LEAKY: - queue->leaky = g_value_get_int(value); + queue->leaky = g_value_get_enum (value); + break; + case ARG_MAY_DEADLOCK: + queue->may_deadlock = g_value_get_boolean (value); break; - case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int(value); + case ARG_BLOCK_TIMEOUT: + queue->block_timeout = g_value_get_uint64 (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (queue->qlock); } static void -gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) +gst_queue_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) { - GstQueue *queue; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_QUEUE (object)); - - queue = GST_QUEUE (object); + GstQueue *queue = GST_QUEUE (object); switch (prop_id) { + case ARG_CUR_LEVEL_BYTES: + g_value_set_uint (value, queue->cur_level.bytes); + break; + case ARG_CUR_LEVEL_BUFFERS: + g_value_set_uint (value, queue->cur_level.buffers); + break; + case ARG_CUR_LEVEL_TIME: + g_value_set_uint64 (value, queue->cur_level.time); + break; + case ARG_MAX_SIZE_BYTES: + g_value_set_uint (value, queue->max_size.bytes); + break; + case ARG_MAX_SIZE_BUFFERS: + g_value_set_uint (value, queue->max_size.buffers); + break; + case ARG_MAX_SIZE_TIME: + g_value_set_uint64 (value, queue->max_size.time); + break; + case ARG_MIN_TRESHOLD_BYTES: + g_value_set_uint (value, queue->min_treshold.bytes); + break; + case ARG_MIN_TRESHOLD_BUFFERS: + g_value_set_uint (value, queue->min_treshold.buffers); + break; + case ARG_MIN_TRESHOLD_TIME: + g_value_set_uint64 (value, queue->min_treshold.time); + break; case ARG_LEAKY: - g_value_set_int(value, queue->leaky); + g_value_set_enum (value, queue->leaky); break; - case ARG_LEVEL: - g_value_set_int(value, queue->level_buffers); + case ARG_MAY_DEADLOCK: + g_value_set_boolean (value, queue->may_deadlock); break; - case ARG_MAX_LEVEL: - g_value_set_int(value, queue->size_buffers); + case ARG_BLOCK_TIMEOUT: + g_value_set_uint64 (value, queue->block_timeout); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);