/* Queue signals and args */
enum {
- FULL,
+ SIGNAL_UNDERRUN,
+ SIGNAL_RUNNING,
+ SIGNAL_OVERRUN,
LAST_SIGNAL
};
enum {
ARG_0,
- ARG_LEVEL_BUFFERS,
- ARG_LEVEL_BYTES,
- ARG_LEVEL_TIME,
- ARG_SIZE_BUFFERS,
- ARG_SIZE_BYTES,
- ARG_SIZE_TIME,
+ /* FIXME: don't we have another way of doing this
+ * "Gstreamer format" (frame/byte/time) queries? */
+ ARG_CUR_LEVEL_BUFFERS,
+ ARG_CUR_LEVEL_BYTES,
+ ARG_CUR_LEVEL_TIME,
+ ARG_MAX_SIZE_BUFFERS,
+ ARG_MAX_SIZE_BYTES,
+ ARG_MAX_SIZE_TIME,
+ ARG_MIN_TRESHOLD_BUFFERS,
+ ARG_MIN_TRESHOLD_BYTES,
+ ARG_MIN_TRESHOLD_TIME,
ARG_LEAKY,
- ARG_LEVEL,
- ARG_MAX_LEVEL,
- ARG_MIN_THRESHOLD_BYTES,
ARG_MAY_DEADLOCK,
- ARG_BLOCK_TIMEOUT,
+ ARG_BLOCK_TIMEOUT
+ /* FILL ME */
};
-static void gst_queue_base_init (gpointer g_class);
-static void gst_queue_class_init (gpointer g_class,
- gpointer class_data);
-static void gst_queue_init (GTypeInstance *instance,
- gpointer g_class);
-static void gst_queue_dispose (GObject *object);
-
-static void gst_queue_set_property (GObject *object, guint prop_id,
- const GValue *value, GParamSpec *pspec);
-static void gst_queue_get_property (GObject *object, guint prop_id,
- GValue *value, GParamSpec *pspec);
-
-static void gst_queue_chain (GstPad *pad, GstData *data);
-static GstData * gst_queue_get (GstPad *pad);
-static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+typedef struct _GstQueueEventResponse {
+ GstEvent *event;
+ gboolean ret, handled;
+} GstQueueEventResponse;
+
+static void gst_queue_base_init (GstQueueClass *klass);
+static void gst_queue_class_init (GstQueueClass *klass);
+static void gst_queue_init (GstQueue *queue);
+static void gst_queue_dispose (GObject *object);
+
+static void gst_queue_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec);
+static void gst_queue_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec);
+
+static GstCaps *gst_queue_getcaps (GstPad *pad,
+ GstCaps *caps);
+static GstPadLinkReturn
+ gst_queue_link (GstPad *pad,
+ GstCaps *caps);
+static void gst_queue_chain (GstPad *pad,
+ GstData *data);
+static GstData *gst_queue_get (GstPad *pad);
+static GstBufferPool *
+ gst_queue_get_bufferpool (GstPad *pad);
-static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event);
-
+static gboolean gst_queue_handle_src_event (GstPad *pad,
+ GstEvent *event);
-static void gst_queue_locked_flush (GstQueue *queue);
+static void gst_queue_locked_flush (GstQueue *queue);
-static GstElementStateReturn gst_queue_change_state (GstElement *element);
-static gboolean gst_queue_release_locks (GstElement *element);
+static GstElementStateReturn
+ gst_queue_change_state (GstElement *element);
+static gboolean gst_queue_release_locks (GstElement *element);
-#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
+#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
+
static GType
-queue_leaky_get_type(void) {
+queue_leaky_get_type (void)
+{
static GType queue_leaky_type = 0;
static GEnumValue queue_leaky[] = {
{ GST_QUEUE_NO_LEAK, "0", "Not Leaky" },
static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
GType
-gst_queue_get_type(void)
+gst_queue_get_type (void)
{
static GType queue_type = 0;
if (!queue_type) {
static const GTypeInfo queue_info = {
- sizeof(GstQueueClass),
- gst_queue_base_init,
+ sizeof (GstQueueClass),
+ (GBaseInitFunc) gst_queue_base_init,
NULL,
- gst_queue_class_init,
+ (GClassInitFunc) gst_queue_class_init,
NULL,
NULL,
- sizeof(GstQueue),
+ sizeof (GstQueue),
4,
- gst_queue_init,
+ (GInstanceInitFunc) gst_queue_init,
NULL
};
- queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
+
+ queue_type = g_type_register_static (GST_TYPE_ELEMENT,
+ "GstQueue", &queue_info, 0);
}
+
return queue_type;
}
static void
-gst_queue_base_init (gpointer g_class)
+gst_queue_base_init (GstQueueClass *klass)
{
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
gst_element_class_set_details (gstelement_class, &gst_queue_details);
}
static void
-gst_queue_class_init (gpointer g_class, gpointer class_data)
+gst_queue_class_init (GstQueueClass *klass)
{
- GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
- GstQueueClass *gstqueue_class = GST_QUEUE_CLASS (g_class);
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
- parent_class = g_type_class_peek_parent (g_class);
+ parent_class = g_type_class_peek_parent (klass);
- gst_queue_signals[FULL] =
- g_signal_new ("full", G_TYPE_FROM_CLASS (gstqueue_class), G_SIGNAL_RUN_FIRST,
- G_STRUCT_OFFSET (GstQueueClass, full), NULL, NULL,
+ /* signals */
+ gst_queue_signals[SIGNAL_UNDERRUN] =
+ g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ gst_queue_signals[SIGNAL_RUNNING] =
+ g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ gst_queue_signals[SIGNAL_OVERRUN] =
+ g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEAKY,
- g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
- GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEVEL,
- g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
- 0, G_MAXINT, 0, G_PARAM_READABLE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAX_LEVEL,
- g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
- 0, G_MAXINT, 100, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MIN_THRESHOLD_BYTES,
- g_param_spec_int ("min_threshold_bytes", "Minimum Threshold",
- "Minimum bytes required before signalling not_empty to reader.",
- 0, G_MAXINT, 0, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAY_DEADLOCK,
- g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
- TRUE, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_BLOCK_TIMEOUT,
- g_param_spec_int ("block_timeout", "Timeout for Block",
- "Microseconds until blocked queue times out and returns filler event. "
- "Value of -1 disables timeout",
- -1, G_MAXINT, -1, G_PARAM_READWRITE));
-
- gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
- gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
- gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
-
- gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
- gstelement_class->release_locks = GST_DEBUG_FUNCPTR(gst_queue_release_locks);
-}
-
-static GstPadLinkReturn
-gst_queue_link (GstPad *pad, GstCaps *caps)
-{
- GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
- GstPad *otherpad;
-
- if (pad == queue->srcpad)
- otherpad = queue->sinkpad;
- else
- otherpad = queue->srcpad;
-
- return gst_pad_proxy_link (otherpad, caps);
-}
-
-static GstCaps*
-gst_queue_getcaps (GstPad *pad, GstCaps *caps)
-{
- GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
- GstPad *otherpad;
-
- if (pad == queue->srcpad)
- otherpad = GST_PAD_PEER (queue->sinkpad);
- else
- otherpad = GST_PAD_PEER (queue->srcpad);
-
- if (otherpad)
- return gst_pad_get_caps (otherpad);
- return NULL;
+ /* properties */
+ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
+ g_param_spec_uint ("current-level-bytes", "Current level (kB)",
+ "Current amount of data in the queue (bytes)",
+ 0, G_MAXUINT, 0, G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
+ g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
+ "Current number of buffers in the queue",
+ 0, G_MAXUINT, 0, G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
+ g_param_spec_uint64 ("current-level-time", "Current level (ns)",
+ "Current amount of data in the queue (in ns)",
+ 0, G_MAXUINT64, 0, G_PARAM_READABLE));
+
+ g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
+ g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
+ "Max. amount of data in the queue (bytes, 0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
+ g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
+ "Max. number of buffers in the queue (0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
+ g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
+ "Max. amount of data in the queue (in ns, 0=disable)",
+ 0, G_MAXUINT64, 0, G_PARAM_READWRITE));
+
+ g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BYTES,
+ g_param_spec_uint ("min-treshold-bytes", "Min. treshold (kB)",
+ "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BUFFERS,
+ g_param_spec_uint ("min-treshold-buffers", "Min. treshold (buffers)",
+ "Min. number of buffers in the queue to allow reading (0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_TIME,
+ g_param_spec_uint64 ("min-treshold-time", "Min. treshold (ns)",
+ "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
+ 0, G_MAXUINT64, 0, G_PARAM_READWRITE));
+
+ g_object_class_install_property (gobject_class, ARG_LEAKY,
+ g_param_spec_enum ("leaky", "Leaky",
+ "Where the queue leaks, if at all",
+ GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
+ g_param_spec_boolean ("may_deadlock", "May Deadlock",
+ "The queue may deadlock if it's full and not PLAYING",
+ TRUE, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
+ g_param_spec_uint64 ("block_timeout", "Timeout for Block",
+ "Nanoseconds until blocked queue times out and returns filler event. "
+ "Value of -1 disables timeout",
+ 0, G_MAXUINT64, -1, G_PARAM_READWRITE));
+
+ /* set several parent class virtual functions */
+ gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
+ gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
+ gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
+
+ gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
+ gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
}
static void
-gst_queue_init (GTypeInstance *instance, gpointer g_class)
+gst_queue_init (GstQueue *queue)
{
- GstQueue *queue = GST_QUEUE (instance);
-
/* scheduling on this kind of element is, well, interesting */
GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
gst_pad_set_active (queue->srcpad, TRUE);
+ queue->cur_level.buffers = 0; /* no content */
+ queue->cur_level.bytes = 0; /* no content */
+ queue->cur_level.time = 0; /* no content */
+ queue->max_size.buffers = 100; /* max. 100 buffers */
+ queue->max_size.bytes = 1024 * 1024; /* max. 1 MB */
+ queue->max_size.time = GST_SECOND; /* max. 1 sec. */
+ queue->min_treshold.buffers = 0; /* no treshold */
+ queue->min_treshold.bytes = 0; /* no treshold */
+ queue->min_treshold.time = 0; /* no treshold */
+
queue->leaky = GST_QUEUE_NO_LEAK;
- queue->queue = NULL;
- queue->level_buffers = 0;
- queue->level_bytes = 0;
- queue->level_time = G_GINT64_CONSTANT (0);
- queue->size_buffers = 100; /* 100 buffers */
- queue->size_bytes = 100 * 1024; /* 100KB */
- queue->size_time = GST_SECOND; /* 1sec */
- queue->min_threshold_bytes = 0;
queue->may_deadlock = TRUE;
- queue->block_timeout = -1;
+ queue->block_timeout = GST_CLOCK_TIME_NONE;
queue->interrupt = FALSE;
queue->flush = FALSE;
queue->qlock = g_mutex_new ();
- queue->not_empty = g_cond_new ();
- queue->not_full = g_cond_new ();
- queue->events = g_async_queue_new();
+ queue->item_add = g_cond_new ();
+ queue->item_del = g_cond_new ();
+ queue->event_done = g_cond_new ();
+ queue->events = g_queue_new ();
queue->queue = g_queue_new ();
- GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions");
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
+ "initialized queue's not_empty & not_full conditions");
}
static void
gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
- g_mutex_free (queue->qlock);
- g_cond_free (queue->not_empty);
- g_cond_free (queue->not_full);
+ while (!g_queue_is_empty (queue->queue)) {
+ GstData *data = g_queue_pop_head (queue->queue);
+ gst_data_unref (data);
+ }
g_queue_free (queue->queue);
+ g_mutex_free (queue->qlock);
+ g_cond_free (queue->item_add);
+ g_cond_free (queue->item_del);
+ g_cond_free (queue->event_done);
+ while (!g_queue_is_empty (queue->events)) {
+ GstEvent *event = g_queue_pop_head (queue->events);
+ gst_event_unref (event);
+ }
- g_async_queue_unref(queue->events);
-
- G_OBJECT_CLASS (parent_class)->dispose (object);
+ if (G_OBJECT_CLASS (parent_class)->dispose)
+ G_OBJECT_CLASS (parent_class)->dispose (object);
}
-static GstBufferPool*
-gst_queue_get_bufferpool (GstPad *pad)
+static GstPad *
+gst_queue_otherpad (GstPad *pad)
{
- GstQueue *queue;
+ GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
+ GstPad *otherpad;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ if (pad == queue->srcpad)
+ otherpad = queue->sinkpad;
+ else
+ otherpad = queue->srcpad;
- return gst_pad_get_bufferpool (queue->srcpad);
+ return otherpad;
}
-static void
-gst_queue_cleanup_data (gpointer data, const gpointer user_data)
+static GstPadLinkReturn
+gst_queue_link (GstPad *pad,
+ GstCaps *caps)
{
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data);
+ return gst_pad_proxy_link (gst_queue_otherpad (pad), caps);
+}
- gst_data_unref (GST_DATA (data));
+static GstCaps *
+gst_queue_getcaps (GstPad *pad,
+ GstCaps *caps)
+{
+ GstPad *otherpad = GST_PAD_PEER (gst_queue_otherpad (pad));
+
+ if (otherpad)
+ return gst_pad_get_caps (otherpad);
+
+ return NULL;
+}
+
+static GstBufferPool *
+gst_queue_get_bufferpool (GstPad *pad)
+{
+ return gst_pad_get_bufferpool (gst_queue_otherpad (pad));
}
static void
gst_queue_locked_flush (GstQueue *queue)
{
- gpointer data;
-
- while ((data = g_queue_pop_head (queue->queue))) {
- gst_queue_cleanup_data (data, (gpointer) queue);
+ while (!g_queue_is_empty (queue->queue)) {
+ GstData *data = g_queue_pop_head (queue->queue);
+ gst_data_unref (data);
}
queue->timeval = NULL;
- queue->level_buffers = 0;
- queue->level_bytes = 0;
- queue->level_time = G_GINT64_CONSTANT (0);
+ queue->cur_level.buffers = 0;
+ queue->cur_level.bytes = 0;
+ queue->cur_level.time = 0;
+
/* make sure any pending buffers to be added are flushed too */
queue->flush = TRUE;
- /* signal not_full, since we apparently aren't full anymore */
- g_cond_signal (queue->not_full);
+
+ /* we deleted something... */
+ g_cond_signal (queue->item_del);
}
+#define STATUS(queue, msg) \
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
+ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+ "-%" G_GUINT64_FORMAT " ns, %u elements", \
+ GST_DEBUG_PAD_NAME (pad), \
+ queue->cur_level.buffers, \
+ queue->min_treshold.buffers, \
+ queue->max_size.buffers, \
+ queue->cur_level.bytes, \
+ queue->min_treshold.bytes, \
+ queue->max_size.bytes, \
+ queue->cur_level.time, \
+ queue->min_treshold.time, \
+ queue->max_size.time, \
+ queue->queue->length)
+
static void
-gst_queue_chain (GstPad *pad, GstData *data)
+gst_queue_chain (GstPad *pad,
+ GstData *data)
{
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- /* check for events to send upstream */
- g_async_queue_lock(queue->events);
- while (g_async_queue_length_unlocked(queue->events) > 0){
- GstEvent *event = (GstEvent*)g_async_queue_pop_unlocked(queue->events);
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream\n");
- gst_pad_event_default (pad, event);
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n");
- }
- g_async_queue_unlock(queue->events);
-
restart:
/* we have to lock the queue since we span threads */
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
-
+
+ /* check for events to send upstream */
+ while (!g_queue_is_empty (queue->events)){
+ GstQueueEventResponse *er = g_queue_pop_head (queue->events);
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
+ er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event);
+ er->handled = TRUE;
+ g_cond_signal (queue->event_done);
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
+ }
+
/* assume don't need to flush this buffer when the queue is filled */
queue->flush = FALSE;
if (GST_IS_EVENT (data)) {
switch (GST_EVENT_TYPE (data)) {
case GST_EVENT_FLUSH:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
+ STATUS (queue, "received flush event");
gst_queue_locked_flush (queue);
break;
case GST_EVENT_EOS:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
- GST_ELEMENT_NAME (queue), queue->level_buffers);
+ STATUS (queue, "received EOS");
break;
default:
/* we put the event in the queue, we don't have to act ourselves */
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "adding event %p of type %d",
+ data, GST_EVENT_TYPE (data));
break;
}
}
if (GST_IS_BUFFER (data))
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
- "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
-
- if (queue->level_buffers == queue->size_buffers) {
+ "adding buffer %p of size %d",
+ data, GST_BUFFER_SIZE (data));
+
+ /* We make space available if we're "full" according to whatever
+ * the user defined as "full". Note that this only applies to buffers.
+ * We always handle events and they don't count in our statistics. */
+ if (GST_IS_BUFFER (data) &&
+ ((queue->max_size.buffers > 0 &&
+ queue->cur_level.buffers >= queue->max_size.buffers) ||
+ (queue->max_size.bytes > 0 &&
+ queue->cur_level.bytes >= queue->max_size.bytes) ||
+ (queue->max_size.time > 0 &&
+ queue->cur_level.time >= queue->max_size.time))) {
g_mutex_unlock (queue->qlock);
- g_signal_emit (G_OBJECT (queue), gst_queue_signals[FULL], 0);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
g_mutex_lock (queue->qlock);
- /* if this is a leaky queue... */
- if (queue->leaky) {
- /* FIXME don't want to leak events! */
- /* if we leak on the upstream side, drop the current buffer */
- if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
- if (GST_IS_EVENT (data))
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_EVENT_TYPE(GST_EVENT(data)));
- /* now we have to clean up and exit right away */
+ /* how are we going to make space for this buffer? */
+ switch (queue->leaky) {
+ /* leak current buffer */
+ case GST_QUEUE_LEAK_UPSTREAM:
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "queue is full, leaking buffer on upstream end");
+ /* now we can clean up and exit right away */
g_mutex_unlock (queue->qlock);
goto out_unref;
- }
- /* otherwise we have to push a buffer off the other end */
- else {
- gpointer front;
- GstData *leak;
-
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
-
- front = g_queue_pop_head (queue->queue);
- leak = GST_DATA (front);
-
- queue->level_buffers--;
- if (GST_IS_EVENT (leak)) {
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_EVENT_TYPE(GST_EVENT(leak)));
- } else {
- queue->level_bytes -= GST_BUFFER_SIZE(leak);
+
+ /* leak first buffer in the queue */
+ case GST_QUEUE_LEAK_DOWNSTREAM: {
+ /* this is a bit hacky. We'll manually iterate the list
+ * and find the first buffer from the head on. We'll
+ * unref that and "fix up" the GQueue object... */
+ GList *item;
+ GstData *leak = NULL;
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "queue is full, leaking buffer on downstream end");
+
+ for (item = queue->queue->head; item != NULL; item = item->next) {
+ if (GST_IS_BUFFER (item->data)) {
+ leak = item->data;
+ break;
+ }
}
- gst_data_unref (leak);
+ /* if we didn't find anything, it means we have no buffers
+ * in here. That cannot happen, since we had >= 1 bufs */
+ g_assert (leak);
+
+ /* Now remove it from the list, fixing up the GQueue
+ * CHECKME: is a queue->head the first or the last item? */
+ item = g_list_delete_link (queue->queue->head, item);
+ queue->queue->head = g_list_first (item);
+ queue->queue->tail = g_list_last (item);
+ queue->queue->length--;
+
+ /* and unref the data at the end. Twice, because we keep a ref
+ * to make things read-only. Also keep our list uptodate. */
+ queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
+ queue->cur_level.buffers --;
+ if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
+ queue->cur_level.time -= GST_BUFFER_DURATION (data);
+
+ gst_data_unref (data);
+ gst_data_unref (data);
+ break;
}
- }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ default:
+ g_warning ("Unknown leaky type, using default");
+ /* fall-through */
+
+ /* don't leak. Instead, wait for space to be available */
+ case GST_QUEUE_NO_LEAK:
+ STATUS (queue, "pre-full wait");
+
+ while ((queue->max_size.buffers > 0 &&
+ queue->cur_level.buffers >= queue->max_size.buffers) ||
+ (queue->max_size.bytes > 0 &&
+ queue->cur_level.bytes >= queue->max_size.bytes) ||
+ (queue->max_size.time > 0 &&
+ queue->cur_level.time >= queue->max_size.time)) {
+ /* if there's a pending state change for this queue
+ * or its manager, switch back to iterator so bottom
+ * half of state change executes */
+ if (queue->interrupt) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
+ g_mutex_unlock (queue->qlock);
+ if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
+ GST_ELEMENT (queue))) {
+ goto out_unref;
+ }
+ /* if we got here because we were unlocked after a
+ * flush, we don't need to add the buffer to the
+ * queue again */
+ if (queue->flush) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "not adding pending buffer after flush");
+ goto out_unref;
+ }
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "adding pending buffer after interrupt");
+ goto restart;
+ }
+
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down. Try to
+ * signal to resolve the error */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_data_unref (data);
+ gst_element_error (GST_ELEMENT (queue),
+ "deadlock found, source pad elements are shut down");
+ /* we don't go to out_unref here, since we want to
+ * unref the buffer *before* calling gst_element_error */
+ return;
+ } else {
+ GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+ "%s: waiting for the app to restart "
+ "source pad elements",
+ GST_ELEMENT_NAME (queue));
+ }
+ }
+
+ STATUS (queue, "waiting for item_del signal");
+ g_cond_wait (queue->item_del, queue->qlock);
+ STATUS (queue, "received item_del signal");
+ }
- while (queue->level_buffers == queue->size_buffers) {
- /* if there's a pending state change for this queue or its manager, switch */
- /* back to iterator so bottom half of state change executes */
- if (queue->interrupt) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!");
+ STATUS (queue, "post-full wait");
g_mutex_unlock (queue->qlock);
- if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue)))
- goto out_unref;
- /* if we got here because we were unlocked after a flush, we don't need
- * to add the buffer to the queue again */
- if (queue->flush) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush");
- goto out_unref;
- }
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "adding pending buffer after interrupt");
- goto restart;
- }
- if (GST_STATE (queue) != GST_STATE_PLAYING) {
- /* this means the other end is shut down */
- /* try to signal to resolve the error */
- if (!queue->may_deadlock) {
- g_mutex_unlock (queue->qlock);
- gst_data_unref (data);
- gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
- /* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */
- return;
- }
- else {
- g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
- }
- }
-
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
- g_cond_wait (queue->not_full, queue->qlock);
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal");
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+ g_mutex_lock (queue->qlock);
+ break;
}
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
}
- /* put the buffer on the tail of the list */
+ /* put the buffer on the tail of the list. We keep a reference,
+ * so that the data is read-only while in here. There's a good
+ * reason to do so: we have a size and time counter, and any
+ * modification to the content could change any of the two. */
+ gst_data_ref (data);
g_queue_push_tail (queue->queue, data);
- queue->level_buffers++;
- if (GST_IS_BUFFER (data))
- queue->level_bytes += GST_BUFFER_SIZE (data);
-
- /* this assertion _has_ to hold */
- g_assert (queue->queue->length == queue->level_buffers);
+ /* Note that we only add buffers (not events) to the statistics */
+ if (GST_IS_BUFFER (data)) {
+ queue->cur_level.buffers++;
+ queue->cur_level.bytes += GST_BUFFER_SIZE (data);
+ if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
+ queue->cur_level.time += GST_BUFFER_DURATION (data);
+ }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes",
- GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ STATUS (queue, "+ level");
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
- g_cond_signal (queue->not_empty);
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add");
+ g_cond_signal (queue->item_add);
g_mutex_unlock (queue->qlock);
return;
gst_queue_get (GstPad *pad)
{
GstQueue *queue;
- GstData *data = NULL;
- gpointer front;
+ GstData *data;
- g_assert(pad != NULL);
- g_assert(GST_IS_PAD(pad));
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
restart:
/* have to lock for thread-safety */
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
-
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
- while (queue->level_buffers == 0) {
- /* if there's a pending state change for this queue or its manager, switch
- * back to iterator so bottom half of state change executes
- */
- //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
- if (queue->interrupt) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!");
- g_mutex_unlock (queue->qlock);
- if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
- return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
- goto restart;
- }
- if (GST_STATE (queue) != GST_STATE_PLAYING) {
- /* this means the other end is shut down */
- if (!queue->may_deadlock) {
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "locked t:%p", g_thread_self ());
+
+ if (queue->queue->length == 0 ||
+ (queue->min_treshold.buffers > 0 &&
+ queue->cur_level.buffers < queue->min_treshold.buffers) ||
+ (queue->min_treshold.bytes > 0 &&
+ queue->cur_level.bytes < queue->min_treshold.bytes) ||
+ (queue->min_treshold.time > 0 &&
+ queue->cur_level.time < queue->min_treshold.time)) {
+ g_mutex_unlock (queue->qlock);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
+ g_mutex_lock (queue->qlock);
+
+ STATUS (queue, "pre-empty wait");
+ while (queue->queue->length == 0 ||
+ (queue->min_treshold.buffers > 0 &&
+ queue->cur_level.buffers < queue->min_treshold.buffers) ||
+ (queue->min_treshold.bytes > 0 &&
+ queue->cur_level.bytes < queue->min_treshold.bytes) ||
+ (queue->min_treshold.time > 0 &&
+ queue->cur_level.time < queue->min_treshold.time)) {
+ /* if there's a pending state change for this queue or its
+ * manager, switch back to iterator so bottom half of state
+ * change executes. */
+ if (queue->interrupt) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
g_mutex_unlock (queue->qlock);
- gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
+ if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
+ GST_ELEMENT (queue)))
+ return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
goto restart;
}
- else {
- g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_element_error (GST_ELEMENT (queue),
+ "deadlock found, sink pad elements are shut down");
+ goto restart;
+ } else {
+ GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+ "%s: waiting for the app to restart "
+ "source pad elements",
+ GST_ELEMENT_NAME (queue));
+ }
}
- }
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ STATUS (queue, "waiting for item_add");
- /* if (queue->block_timeout > -1){ */
- if (FALSE) {
- GTimeVal timeout;
- g_get_current_time(&timeout);
- g_time_val_add(&timeout, queue->block_timeout);
- if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
- g_mutex_unlock (queue->qlock);
- g_warning ("filler");
- return GST_DATA (gst_event_new_filler());
+ if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
+ GTimeVal timeout;
+ g_get_current_time (&timeout);
+ g_time_val_add (&timeout, queue->block_timeout / 1000);
+ if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)){
+ g_mutex_unlock (queue->qlock);
+ GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+ "Sending filler event");
+ return GST_DATA (gst_event_new_filler ());
+ }
+ } else {
+ g_cond_wait (queue->item_add, queue->qlock);
}
+ STATUS (queue, "got item_add signal");
}
- else {
- g_cond_wait (queue->not_empty, queue->qlock);
- }
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
- }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
- front = g_queue_pop_head (queue->queue);
- data = GST_DATA (front);
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved data %p from queue", data);
+ STATUS (queue, "post-empty wait");
+ g_mutex_unlock (queue->qlock);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+ g_mutex_lock (queue->qlock);
+ }
- queue->level_buffers--;
- if (GST_IS_BUFFER (data))
- queue->level_bytes -= GST_BUFFER_SIZE (data);
+ /* There's something in the list now, whatever it is */
+ data = g_queue_pop_head (queue->queue);
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "retrieved data %p from queue", data);
+
+ if (GST_IS_BUFFER (data)) {
+ /* Update statistics */
+ queue->cur_level.buffers--;
+ queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
+ if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
+ queue->cur_level.time -= GST_BUFFER_DURATION (data);
+ }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes",
- GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ /* Now that we're done, we can lose our own reference to
+ * the item, since we're no longer in danger. */
+ gst_data_unref (data);
- /* this assertion _has_ to hold */
- g_assert (queue->queue->length == queue->level_buffers);
+ STATUS (queue, "after _get()");
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_full");
- g_cond_signal (queue->not_full);
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
+ g_cond_signal (queue->item_del);
g_mutex_unlock (queue->qlock);
- /* FIXME where should this be? locked? */
+ /* FIXME: I suppose this needs to be locked, since the EOS
+ * bit affects the pipeline state. However, that bit is
+ * locked too so it'd cause a deadlock. */
if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT (data);
- switch (GST_EVENT_TYPE(event)) {
+ switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "queue \"%s\" eos",
+ GST_ELEMENT_NAME (queue));
gst_element_set_eos (GST_ELEMENT (queue));
break;
default:
static gboolean
-gst_queue_handle_src_event (GstPad *pad, GstEvent *event)
+gst_queue_handle_src_event (GstPad *pad,
+ GstEvent *event)
{
- GstQueue *queue;
+ GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res;
- gint event_type;
- gint flag_flush = 0;
-
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
g_mutex_lock (queue->qlock);
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
- /* push the event to the queue for upstream consumption */
- g_async_queue_push(queue->events, event);
- g_warning ("FIXME: sending event in a running queue");
- /* FIXME wait for delivery of the event here, then return the result
- * instead of FALSE */
- res = FALSE;
- goto done;
- }
-
- event_type = GST_EVENT_TYPE (event);
- if (event_type == GST_EVENT_SEEK)
- flag_flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
-
- res = gst_pad_event_default (pad, event);
+ GstQueueEventResponse er;
+
+ /* push the event to the queue and wait for upstream consumption */
+ er.event = event;
+ er.handled = FALSE;
+ g_queue_push_tail (queue->events, &er);
+ while (!er.handled) {
+ g_cond_wait (queue->event_done, queue->qlock);
+ }
+ res = er.ret;
+ } else {
+ res = gst_pad_event_default (pad, event);
- switch (event_type) {
- case GST_EVENT_FLUSH:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
- gst_queue_locked_flush (queue);
- break;
- case GST_EVENT_SEEK:
- if (flag_flush) {
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH:
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "FLUSH event, flushing queue\n");
gst_queue_locked_flush (queue);
- }
- default:
- break;
+ break;
+ case GST_EVENT_SEEK:
+ if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
+ gst_queue_locked_flush (queue);
+ }
+ default:
+ break;
+ }
}
-done:
g_mutex_unlock (queue->qlock);
/* we have to claim success, but we don't really know */
g_mutex_lock (queue->qlock);
queue->interrupt = TRUE;
- g_cond_signal (queue->not_full);
- g_cond_signal (queue->not_empty);
+ g_cond_signal (queue->item_add);
+ g_cond_signal (queue->item_del);
g_mutex_unlock (queue->qlock);
return TRUE;
gst_queue_change_state (GstElement *element)
{
GstQueue *queue;
- GstElementStateReturn ret;
+ GstElementStateReturn ret = GST_STATE_SUCCESS;
queue = GST_QUEUE (element);
case GST_STATE_NULL_TO_READY:
gst_queue_locked_flush (queue);
break;
- case GST_STATE_READY_TO_PAUSED:
- break;
case GST_STATE_PAUSED_TO_PLAYING:
if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
+ "queue %s is not linked",
+ GST_ELEMENT_NAME (queue));
/* FIXME can this be? */
- g_cond_signal (queue->not_empty);
+ g_cond_signal (queue->item_add);
ret = GST_STATE_FAILURE;
goto error;
- }
- else {
+ } else {
GstScheduler *src_sched, *sink_sched;
src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
if (src_sched == sink_sched) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s does not connect different schedulers",
- GST_ELEMENT_NAME (queue));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
+ "queue %s does not connect different schedulers",
+ GST_ELEMENT_NAME (queue));
g_warning ("queue %s does not connect different schedulers",
- GST_ELEMENT_NAME (queue));
+ GST_ELEMENT_NAME (queue));
ret = GST_STATE_FAILURE;
goto error;
}
queue->interrupt = FALSE;
break;
- case GST_STATE_PLAYING_TO_PAUSED:
- break;
case GST_STATE_PAUSED_TO_READY:
gst_queue_locked_flush (queue);
break;
- case GST_STATE_READY_TO_NULL:
+ default:
break;
}
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
- /* this is an ugly hack to make sure our pads are always active. Reason for this is that
- * pad activation for the queue element depends on 2 schedulers (ugh) */
+ if (GST_ELEMENT_CLASS (parent_class)->change_state)
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+ /* this is an ugly hack to make sure our pads are always active.
+ * Reason for this is that pad activation for the queue element
+ * depends on 2 schedulers (ugh) */
gst_pad_set_active (queue->sinkpad, TRUE);
gst_pad_set_active (queue->srcpad, TRUE);
g_mutex_unlock (queue->qlock);
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
+
return ret;
}
static void
-gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
+gst_queue_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
{
- GstQueue *queue;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_QUEUE (object));
+ GstQueue *queue = GST_QUEUE (object);
- queue = GST_QUEUE (object);
+ /* someone could change levels here, and since this
+ * affects the get/put funcs, we need to lock for safety. */
+ g_mutex_lock (queue->qlock);
switch (prop_id) {
- case ARG_LEAKY:
- queue->leaky = g_value_get_enum (value);
+ case ARG_MAX_SIZE_BYTES:
+ queue->max_size.bytes = g_value_get_uint (value);
+ break;
+ case ARG_MAX_SIZE_BUFFERS:
+ queue->max_size.buffers = g_value_get_uint (value);
+ break;
+ case ARG_MAX_SIZE_TIME:
+ queue->max_size.time = g_value_get_uint64 (value);
break;
- case ARG_MAX_LEVEL:
- queue->size_buffers = g_value_get_int (value);
+ case ARG_MIN_TRESHOLD_BYTES:
+ queue->max_size.bytes = g_value_get_uint (value);
break;
- case ARG_MIN_THRESHOLD_BYTES:
- queue->min_threshold_bytes = g_value_get_int (value);
+ case ARG_MIN_TRESHOLD_BUFFERS:
+ queue->max_size.buffers = g_value_get_uint (value);
+ break;
+ case ARG_MIN_TRESHOLD_TIME:
+ queue->max_size.time = g_value_get_uint64 (value);
+ break;
+ case ARG_LEAKY:
+ queue->leaky = g_value_get_enum (value);
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
break;
case ARG_BLOCK_TIMEOUT:
- queue->block_timeout = g_value_get_int (value);
+ queue->block_timeout = g_value_get_uint64 (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
+
+ g_mutex_unlock (queue->qlock);
}
static void
-gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
+gst_queue_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
{
- GstQueue *queue;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_QUEUE (object));
-
- queue = GST_QUEUE (object);
+ GstQueue *queue = GST_QUEUE (object);
switch (prop_id) {
- case ARG_LEAKY:
- g_value_set_enum (value, queue->leaky);
+ case ARG_CUR_LEVEL_BYTES:
+ g_value_set_uint (value, queue->cur_level.bytes);
+ break;
+ case ARG_CUR_LEVEL_BUFFERS:
+ g_value_set_uint (value, queue->cur_level.buffers);
+ break;
+ case ARG_CUR_LEVEL_TIME:
+ g_value_set_uint64 (value, queue->cur_level.time);
+ break;
+ case ARG_MAX_SIZE_BYTES:
+ g_value_set_uint (value, queue->max_size.bytes);
break;
- case ARG_LEVEL:
- g_value_set_int (value, queue->level_buffers);
+ case ARG_MAX_SIZE_BUFFERS:
+ g_value_set_uint (value, queue->max_size.buffers);
break;
- case ARG_MAX_LEVEL:
- g_value_set_int (value, queue->size_buffers);
+ case ARG_MAX_SIZE_TIME:
+ g_value_set_uint64 (value, queue->max_size.time);
break;
- case ARG_MIN_THRESHOLD_BYTES:
- g_value_set_int (value, queue->min_threshold_bytes);
+ case ARG_MIN_TRESHOLD_BYTES:
+ g_value_set_uint (value, queue->min_treshold.bytes);
+ break;
+ case ARG_MIN_TRESHOLD_BUFFERS:
+ g_value_set_uint (value, queue->min_treshold.buffers);
+ break;
+ case ARG_MIN_TRESHOLD_TIME:
+ g_value_set_uint64 (value, queue->min_treshold.time);
+ break;
+ case ARG_LEAKY:
+ g_value_set_enum (value, queue->leaky);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;
case ARG_BLOCK_TIMEOUT:
- g_value_set_int (value, queue->block_timeout);
+ g_value_set_uint64 (value, queue->block_timeout);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstPad *sinkpad;
GstPad *srcpad;
- /* the queue of buffers we're keeping our grubby hands on */
+ /* the queue of data we're keeping our grubby hands on */
GQueue *queue;
- guint level_buffers; /* number of buffers queued here */
- guint level_bytes; /* number of bytes queued here */
- guint64 level_time; /* amount of time queued here */
+ struct {
+ guint buffers; /* no. of buffers */
+ guint bytes; /* no. of bytes */
+ guint64 time; /* amount of time */
+ } cur_level, /* currently in the queue */
+ max_size, /* max. amount of data allowed in the queue */
+ min_treshold; /* min. amount of data required to wake reader */
- guint size_buffers; /* size of queue in buffers */
- guint size_bytes; /* size of queue in bytes */
- guint64 size_time; /* size of queue in time */
+ /* whether we leak data, and at which end */
+ gint leaky;
+
+ /* number of nanoseconds until a blocked queue 'times out'
+ * to receive data and returns a filler event. -1 = disable */
+ guint64 block_timeout;
+
+ /* it the queue should fail on possible deadlocks */
+ gboolean may_deadlock;
- gint leaky; /* whether the queue is leaky, and if so at which end */
- gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER.
- * A value of -1 will block forever. */
- guint min_threshold_bytes; /* the minimum number of bytes required before
- * waking up the reader thread */
- gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
gboolean interrupt;
gboolean flush;
GMutex *qlock; /* lock for queue (vs object lock) */
- GCond *not_empty; /* signals buffers now available for reading */
- GCond *not_full; /* signals space now available for writing */
+ GCond *item_add; /* signals buffers now available for reading */
+ GCond *item_del; /* signals space now available for writing */
+ GCond *event_done; /* upstream event signaller */
GTimeVal *timeval; /* the timeout for the queue locking */
- GAsyncQueue *events; /* upstream events get decoupled here */
+ GQueue *events; /* upstream events get decoupled here */
gpointer _gst_reserved[GST_PADDING];
};
struct _GstQueueClass {
GstElementClass parent_class;
- /* signal callbacks */
- void (*full) (GstQueue *queue);
+ /* signals - 'running' is called from both sides
+ * which might make it sort of non-useful... */
+ void (*underrun) (GstQueue *queue);
+ void (*running) (GstQueue *queue);
+ void (*overrun) (GstQueue *queue);
gpointer _gst_reserved[GST_PADDING];
};
/* Queue signals and args */
enum {
- FULL,
+ SIGNAL_UNDERRUN,
+ SIGNAL_RUNNING,
+ SIGNAL_OVERRUN,
LAST_SIGNAL
};
enum {
ARG_0,
- ARG_LEVEL_BUFFERS,
- ARG_LEVEL_BYTES,
- ARG_LEVEL_TIME,
- ARG_SIZE_BUFFERS,
- ARG_SIZE_BYTES,
- ARG_SIZE_TIME,
+ /* FIXME: don't we have another way of doing this
+ * "Gstreamer format" (frame/byte/time) queries? */
+ ARG_CUR_LEVEL_BUFFERS,
+ ARG_CUR_LEVEL_BYTES,
+ ARG_CUR_LEVEL_TIME,
+ ARG_MAX_SIZE_BUFFERS,
+ ARG_MAX_SIZE_BYTES,
+ ARG_MAX_SIZE_TIME,
+ ARG_MIN_TRESHOLD_BUFFERS,
+ ARG_MIN_TRESHOLD_BYTES,
+ ARG_MIN_TRESHOLD_TIME,
ARG_LEAKY,
- ARG_LEVEL,
- ARG_MAX_LEVEL,
- ARG_MIN_THRESHOLD_BYTES,
ARG_MAY_DEADLOCK,
- ARG_BLOCK_TIMEOUT,
+ ARG_BLOCK_TIMEOUT
+ /* FILL ME */
};
-static void gst_queue_base_init (gpointer g_class);
-static void gst_queue_class_init (gpointer g_class,
- gpointer class_data);
-static void gst_queue_init (GTypeInstance *instance,
- gpointer g_class);
-static void gst_queue_dispose (GObject *object);
-
-static void gst_queue_set_property (GObject *object, guint prop_id,
- const GValue *value, GParamSpec *pspec);
-static void gst_queue_get_property (GObject *object, guint prop_id,
- GValue *value, GParamSpec *pspec);
-
-static void gst_queue_chain (GstPad *pad, GstData *data);
-static GstData * gst_queue_get (GstPad *pad);
-static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+typedef struct _GstQueueEventResponse {
+ GstEvent *event;
+ gboolean ret, handled;
+} GstQueueEventResponse;
+
+static void gst_queue_base_init (GstQueueClass *klass);
+static void gst_queue_class_init (GstQueueClass *klass);
+static void gst_queue_init (GstQueue *queue);
+static void gst_queue_dispose (GObject *object);
+
+static void gst_queue_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec);
+static void gst_queue_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec);
+
+static GstCaps *gst_queue_getcaps (GstPad *pad,
+ GstCaps *caps);
+static GstPadLinkReturn
+ gst_queue_link (GstPad *pad,
+ GstCaps *caps);
+static void gst_queue_chain (GstPad *pad,
+ GstData *data);
+static GstData *gst_queue_get (GstPad *pad);
+static GstBufferPool *
+ gst_queue_get_bufferpool (GstPad *pad);
-static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event);
-
+static gboolean gst_queue_handle_src_event (GstPad *pad,
+ GstEvent *event);
-static void gst_queue_locked_flush (GstQueue *queue);
+static void gst_queue_locked_flush (GstQueue *queue);
-static GstElementStateReturn gst_queue_change_state (GstElement *element);
-static gboolean gst_queue_release_locks (GstElement *element);
+static GstElementStateReturn
+ gst_queue_change_state (GstElement *element);
+static gboolean gst_queue_release_locks (GstElement *element);
-#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
+#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
+
static GType
-queue_leaky_get_type(void) {
+queue_leaky_get_type (void)
+{
static GType queue_leaky_type = 0;
static GEnumValue queue_leaky[] = {
{ GST_QUEUE_NO_LEAK, "0", "Not Leaky" },
static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
GType
-gst_queue_get_type(void)
+gst_queue_get_type (void)
{
static GType queue_type = 0;
if (!queue_type) {
static const GTypeInfo queue_info = {
- sizeof(GstQueueClass),
- gst_queue_base_init,
+ sizeof (GstQueueClass),
+ (GBaseInitFunc) gst_queue_base_init,
NULL,
- gst_queue_class_init,
+ (GClassInitFunc) gst_queue_class_init,
NULL,
NULL,
- sizeof(GstQueue),
+ sizeof (GstQueue),
4,
- gst_queue_init,
+ (GInstanceInitFunc) gst_queue_init,
NULL
};
- queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
+
+ queue_type = g_type_register_static (GST_TYPE_ELEMENT,
+ "GstQueue", &queue_info, 0);
}
+
return queue_type;
}
static void
-gst_queue_base_init (gpointer g_class)
+gst_queue_base_init (GstQueueClass *klass)
{
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
gst_element_class_set_details (gstelement_class, &gst_queue_details);
}
static void
-gst_queue_class_init (gpointer g_class, gpointer class_data)
+gst_queue_class_init (GstQueueClass *klass)
{
- GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
- GstQueueClass *gstqueue_class = GST_QUEUE_CLASS (g_class);
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
- parent_class = g_type_class_peek_parent (g_class);
+ parent_class = g_type_class_peek_parent (klass);
- gst_queue_signals[FULL] =
- g_signal_new ("full", G_TYPE_FROM_CLASS (gstqueue_class), G_SIGNAL_RUN_FIRST,
- G_STRUCT_OFFSET (GstQueueClass, full), NULL, NULL,
+ /* signals */
+ gst_queue_signals[SIGNAL_UNDERRUN] =
+ g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ gst_queue_signals[SIGNAL_RUNNING] =
+ g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ gst_queue_signals[SIGNAL_OVERRUN] =
+ g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEAKY,
- g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
- GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_LEVEL,
- g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
- 0, G_MAXINT, 0, G_PARAM_READABLE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAX_LEVEL,
- g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
- 0, G_MAXINT, 100, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MIN_THRESHOLD_BYTES,
- g_param_spec_int ("min_threshold_bytes", "Minimum Threshold",
- "Minimum bytes required before signalling not_empty to reader.",
- 0, G_MAXINT, 0, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_MAY_DEADLOCK,
- g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
- TRUE, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (gstqueue_class), ARG_BLOCK_TIMEOUT,
- g_param_spec_int ("block_timeout", "Timeout for Block",
- "Microseconds until blocked queue times out and returns filler event. "
- "Value of -1 disables timeout",
- -1, G_MAXINT, -1, G_PARAM_READWRITE));
-
- gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
- gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
- gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
-
- gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
- gstelement_class->release_locks = GST_DEBUG_FUNCPTR(gst_queue_release_locks);
-}
-
-static GstPadLinkReturn
-gst_queue_link (GstPad *pad, GstCaps *caps)
-{
- GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
- GstPad *otherpad;
-
- if (pad == queue->srcpad)
- otherpad = queue->sinkpad;
- else
- otherpad = queue->srcpad;
-
- return gst_pad_proxy_link (otherpad, caps);
-}
-
-static GstCaps*
-gst_queue_getcaps (GstPad *pad, GstCaps *caps)
-{
- GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
- GstPad *otherpad;
-
- if (pad == queue->srcpad)
- otherpad = GST_PAD_PEER (queue->sinkpad);
- else
- otherpad = GST_PAD_PEER (queue->srcpad);
-
- if (otherpad)
- return gst_pad_get_caps (otherpad);
- return NULL;
+ /* properties */
+ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
+ g_param_spec_uint ("current-level-bytes", "Current level (kB)",
+ "Current amount of data in the queue (bytes)",
+ 0, G_MAXUINT, 0, G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
+ g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
+ "Current number of buffers in the queue",
+ 0, G_MAXUINT, 0, G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
+ g_param_spec_uint64 ("current-level-time", "Current level (ns)",
+ "Current amount of data in the queue (in ns)",
+ 0, G_MAXUINT64, 0, G_PARAM_READABLE));
+
+ g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
+ g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
+ "Max. amount of data in the queue (bytes, 0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
+ g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
+ "Max. number of buffers in the queue (0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
+ g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
+ "Max. amount of data in the queue (in ns, 0=disable)",
+ 0, G_MAXUINT64, 0, G_PARAM_READWRITE));
+
+ g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BYTES,
+ g_param_spec_uint ("min-treshold-bytes", "Min. treshold (kB)",
+ "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_BUFFERS,
+ g_param_spec_uint ("min-treshold-buffers", "Min. treshold (buffers)",
+ "Min. number of buffers in the queue to allow reading (0=disable)",
+ 0, G_MAXUINT, 0, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MIN_TRESHOLD_TIME,
+ g_param_spec_uint64 ("min-treshold-time", "Min. treshold (ns)",
+ "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
+ 0, G_MAXUINT64, 0, G_PARAM_READWRITE));
+
+ g_object_class_install_property (gobject_class, ARG_LEAKY,
+ g_param_spec_enum ("leaky", "Leaky",
+ "Where the queue leaks, if at all",
+ GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
+ g_param_spec_boolean ("may_deadlock", "May Deadlock",
+ "The queue may deadlock if it's full and not PLAYING",
+ TRUE, G_PARAM_READWRITE));
+ g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
+ g_param_spec_uint64 ("block_timeout", "Timeout for Block",
+ "Nanoseconds until blocked queue times out and returns filler event. "
+ "Value of -1 disables timeout",
+ 0, G_MAXUINT64, -1, G_PARAM_READWRITE));
+
+ /* set several parent class virtual functions */
+ gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
+ gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
+ gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
+
+ gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
+ gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
}
static void
-gst_queue_init (GTypeInstance *instance, gpointer g_class)
+gst_queue_init (GstQueue *queue)
{
- GstQueue *queue = GST_QUEUE (instance);
-
/* scheduling on this kind of element is, well, interesting */
GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
gst_pad_set_active (queue->srcpad, TRUE);
+ queue->cur_level.buffers = 0; /* no content */
+ queue->cur_level.bytes = 0; /* no content */
+ queue->cur_level.time = 0; /* no content */
+ queue->max_size.buffers = 100; /* max. 100 buffers */
+ queue->max_size.bytes = 1024 * 1024; /* max. 1 MB */
+ queue->max_size.time = GST_SECOND; /* max. 1 sec. */
+ queue->min_treshold.buffers = 0; /* no treshold */
+ queue->min_treshold.bytes = 0; /* no treshold */
+ queue->min_treshold.time = 0; /* no treshold */
+
queue->leaky = GST_QUEUE_NO_LEAK;
- queue->queue = NULL;
- queue->level_buffers = 0;
- queue->level_bytes = 0;
- queue->level_time = G_GINT64_CONSTANT (0);
- queue->size_buffers = 100; /* 100 buffers */
- queue->size_bytes = 100 * 1024; /* 100KB */
- queue->size_time = GST_SECOND; /* 1sec */
- queue->min_threshold_bytes = 0;
queue->may_deadlock = TRUE;
- queue->block_timeout = -1;
+ queue->block_timeout = GST_CLOCK_TIME_NONE;
queue->interrupt = FALSE;
queue->flush = FALSE;
queue->qlock = g_mutex_new ();
- queue->not_empty = g_cond_new ();
- queue->not_full = g_cond_new ();
- queue->events = g_async_queue_new();
+ queue->item_add = g_cond_new ();
+ queue->item_del = g_cond_new ();
+ queue->event_done = g_cond_new ();
+ queue->events = g_queue_new ();
queue->queue = g_queue_new ();
- GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions");
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
+ "initialized queue's not_empty & not_full conditions");
}
static void
gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
- g_mutex_free (queue->qlock);
- g_cond_free (queue->not_empty);
- g_cond_free (queue->not_full);
+ while (!g_queue_is_empty (queue->queue)) {
+ GstData *data = g_queue_pop_head (queue->queue);
+ gst_data_unref (data);
+ }
g_queue_free (queue->queue);
+ g_mutex_free (queue->qlock);
+ g_cond_free (queue->item_add);
+ g_cond_free (queue->item_del);
+ g_cond_free (queue->event_done);
+ while (!g_queue_is_empty (queue->events)) {
+ GstEvent *event = g_queue_pop_head (queue->events);
+ gst_event_unref (event);
+ }
- g_async_queue_unref(queue->events);
-
- G_OBJECT_CLASS (parent_class)->dispose (object);
+ if (G_OBJECT_CLASS (parent_class)->dispose)
+ G_OBJECT_CLASS (parent_class)->dispose (object);
}
-static GstBufferPool*
-gst_queue_get_bufferpool (GstPad *pad)
+static GstPad *
+gst_queue_otherpad (GstPad *pad)
{
- GstQueue *queue;
+ GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
+ GstPad *otherpad;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ if (pad == queue->srcpad)
+ otherpad = queue->sinkpad;
+ else
+ otherpad = queue->srcpad;
- return gst_pad_get_bufferpool (queue->srcpad);
+ return otherpad;
}
-static void
-gst_queue_cleanup_data (gpointer data, const gpointer user_data)
+static GstPadLinkReturn
+gst_queue_link (GstPad *pad,
+ GstCaps *caps)
{
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data);
+ return gst_pad_proxy_link (gst_queue_otherpad (pad), caps);
+}
- gst_data_unref (GST_DATA (data));
+static GstCaps *
+gst_queue_getcaps (GstPad *pad,
+ GstCaps *caps)
+{
+ GstPad *otherpad = GST_PAD_PEER (gst_queue_otherpad (pad));
+
+ if (otherpad)
+ return gst_pad_get_caps (otherpad);
+
+ return NULL;
+}
+
+static GstBufferPool *
+gst_queue_get_bufferpool (GstPad *pad)
+{
+ return gst_pad_get_bufferpool (gst_queue_otherpad (pad));
}
static void
gst_queue_locked_flush (GstQueue *queue)
{
- gpointer data;
-
- while ((data = g_queue_pop_head (queue->queue))) {
- gst_queue_cleanup_data (data, (gpointer) queue);
+ while (!g_queue_is_empty (queue->queue)) {
+ GstData *data = g_queue_pop_head (queue->queue);
+ gst_data_unref (data);
}
queue->timeval = NULL;
- queue->level_buffers = 0;
- queue->level_bytes = 0;
- queue->level_time = G_GINT64_CONSTANT (0);
+ queue->cur_level.buffers = 0;
+ queue->cur_level.bytes = 0;
+ queue->cur_level.time = 0;
+
/* make sure any pending buffers to be added are flushed too */
queue->flush = TRUE;
- /* signal not_full, since we apparently aren't full anymore */
- g_cond_signal (queue->not_full);
+
+ /* we deleted something... */
+ g_cond_signal (queue->item_del);
}
+#define STATUS(queue, msg) \
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
+ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+ "-%" G_GUINT64_FORMAT " ns, %u elements", \
+ GST_DEBUG_PAD_NAME (pad), \
+ queue->cur_level.buffers, \
+ queue->min_treshold.buffers, \
+ queue->max_size.buffers, \
+ queue->cur_level.bytes, \
+ queue->min_treshold.bytes, \
+ queue->max_size.bytes, \
+ queue->cur_level.time, \
+ queue->min_treshold.time, \
+ queue->max_size.time, \
+ queue->queue->length)
+
static void
-gst_queue_chain (GstPad *pad, GstData *data)
+gst_queue_chain (GstPad *pad,
+ GstData *data)
{
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- /* check for events to send upstream */
- g_async_queue_lock(queue->events);
- while (g_async_queue_length_unlocked(queue->events) > 0){
- GstEvent *event = (GstEvent*)g_async_queue_pop_unlocked(queue->events);
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream\n");
- gst_pad_event_default (pad, event);
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n");
- }
- g_async_queue_unlock(queue->events);
-
restart:
/* we have to lock the queue since we span threads */
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
-
+
+ /* check for events to send upstream */
+ while (!g_queue_is_empty (queue->events)){
+ GstQueueEventResponse *er = g_queue_pop_head (queue->events);
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
+ er->ret = gst_pad_event_default (GST_PAD_PEER (pad), er->event);
+ er->handled = TRUE;
+ g_cond_signal (queue->event_done);
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
+ }
+
/* assume don't need to flush this buffer when the queue is filled */
queue->flush = FALSE;
if (GST_IS_EVENT (data)) {
switch (GST_EVENT_TYPE (data)) {
case GST_EVENT_FLUSH:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
+ STATUS (queue, "received flush event");
gst_queue_locked_flush (queue);
break;
case GST_EVENT_EOS:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
- GST_ELEMENT_NAME (queue), queue->level_buffers);
+ STATUS (queue, "received EOS");
break;
default:
/* we put the event in the queue, we don't have to act ourselves */
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "adding event %p of type %d",
+ data, GST_EVENT_TYPE (data));
break;
}
}
if (GST_IS_BUFFER (data))
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
- "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
-
- if (queue->level_buffers == queue->size_buffers) {
+ "adding buffer %p of size %d",
+ data, GST_BUFFER_SIZE (data));
+
+ /* We make space available if we're "full" according to whatever
+ * the user defined as "full". Note that this only applies to buffers.
+ * We always handle events and they don't count in our statistics. */
+ if (GST_IS_BUFFER (data) &&
+ ((queue->max_size.buffers > 0 &&
+ queue->cur_level.buffers >= queue->max_size.buffers) ||
+ (queue->max_size.bytes > 0 &&
+ queue->cur_level.bytes >= queue->max_size.bytes) ||
+ (queue->max_size.time > 0 &&
+ queue->cur_level.time >= queue->max_size.time))) {
g_mutex_unlock (queue->qlock);
- g_signal_emit (G_OBJECT (queue), gst_queue_signals[FULL], 0);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
g_mutex_lock (queue->qlock);
- /* if this is a leaky queue... */
- if (queue->leaky) {
- /* FIXME don't want to leak events! */
- /* if we leak on the upstream side, drop the current buffer */
- if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
- if (GST_IS_EVENT (data))
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_EVENT_TYPE(GST_EVENT(data)));
- /* now we have to clean up and exit right away */
+ /* how are we going to make space for this buffer? */
+ switch (queue->leaky) {
+ /* leak current buffer */
+ case GST_QUEUE_LEAK_UPSTREAM:
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "queue is full, leaking buffer on upstream end");
+ /* now we can clean up and exit right away */
g_mutex_unlock (queue->qlock);
goto out_unref;
- }
- /* otherwise we have to push a buffer off the other end */
- else {
- gpointer front;
- GstData *leak;
-
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
-
- front = g_queue_pop_head (queue->queue);
- leak = GST_DATA (front);
-
- queue->level_buffers--;
- if (GST_IS_EVENT (leak)) {
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_EVENT_TYPE(GST_EVENT(leak)));
- } else {
- queue->level_bytes -= GST_BUFFER_SIZE(leak);
+
+ /* leak first buffer in the queue */
+ case GST_QUEUE_LEAK_DOWNSTREAM: {
+ /* this is a bit hacky. We'll manually iterate the list
+ * and find the first buffer from the head on. We'll
+ * unref that and "fix up" the GQueue object... */
+ GList *item;
+ GstData *leak = NULL;
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "queue is full, leaking buffer on downstream end");
+
+ for (item = queue->queue->head; item != NULL; item = item->next) {
+ if (GST_IS_BUFFER (item->data)) {
+ leak = item->data;
+ break;
+ }
}
- gst_data_unref (leak);
+ /* if we didn't find anything, it means we have no buffers
+ * in here. That cannot happen, since we had >= 1 bufs */
+ g_assert (leak);
+
+ /* Now remove it from the list, fixing up the GQueue
+ * CHECKME: is a queue->head the first or the last item? */
+ item = g_list_delete_link (queue->queue->head, item);
+ queue->queue->head = g_list_first (item);
+ queue->queue->tail = g_list_last (item);
+ queue->queue->length--;
+
+ /* and unref the data at the end. Twice, because we keep a ref
+ * to make things read-only. Also keep our list uptodate. */
+ queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
+ queue->cur_level.buffers --;
+ if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
+ queue->cur_level.time -= GST_BUFFER_DURATION (data);
+
+ gst_data_unref (data);
+ gst_data_unref (data);
+ break;
}
- }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ default:
+ g_warning ("Unknown leaky type, using default");
+ /* fall-through */
+
+ /* don't leak. Instead, wait for space to be available */
+ case GST_QUEUE_NO_LEAK:
+ STATUS (queue, "pre-full wait");
+
+ while ((queue->max_size.buffers > 0 &&
+ queue->cur_level.buffers >= queue->max_size.buffers) ||
+ (queue->max_size.bytes > 0 &&
+ queue->cur_level.bytes >= queue->max_size.bytes) ||
+ (queue->max_size.time > 0 &&
+ queue->cur_level.time >= queue->max_size.time)) {
+ /* if there's a pending state change for this queue
+ * or its manager, switch back to iterator so bottom
+ * half of state change executes */
+ if (queue->interrupt) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
+ g_mutex_unlock (queue->qlock);
+ if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
+ GST_ELEMENT (queue))) {
+ goto out_unref;
+ }
+ /* if we got here because we were unlocked after a
+ * flush, we don't need to add the buffer to the
+ * queue again */
+ if (queue->flush) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "not adding pending buffer after flush");
+ goto out_unref;
+ }
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "adding pending buffer after interrupt");
+ goto restart;
+ }
+
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down. Try to
+ * signal to resolve the error */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_data_unref (data);
+ gst_element_error (GST_ELEMENT (queue),
+ "deadlock found, source pad elements are shut down");
+ /* we don't go to out_unref here, since we want to
+ * unref the buffer *before* calling gst_element_error */
+ return;
+ } else {
+ GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+ "%s: waiting for the app to restart "
+ "source pad elements",
+ GST_ELEMENT_NAME (queue));
+ }
+ }
+
+ STATUS (queue, "waiting for item_del signal");
+ g_cond_wait (queue->item_del, queue->qlock);
+ STATUS (queue, "received item_del signal");
+ }
- while (queue->level_buffers == queue->size_buffers) {
- /* if there's a pending state change for this queue or its manager, switch */
- /* back to iterator so bottom half of state change executes */
- if (queue->interrupt) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!");
+ STATUS (queue, "post-full wait");
g_mutex_unlock (queue->qlock);
- if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue)))
- goto out_unref;
- /* if we got here because we were unlocked after a flush, we don't need
- * to add the buffer to the queue again */
- if (queue->flush) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush");
- goto out_unref;
- }
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "adding pending buffer after interrupt");
- goto restart;
- }
- if (GST_STATE (queue) != GST_STATE_PLAYING) {
- /* this means the other end is shut down */
- /* try to signal to resolve the error */
- if (!queue->may_deadlock) {
- g_mutex_unlock (queue->qlock);
- gst_data_unref (data);
- gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
- /* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */
- return;
- }
- else {
- g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
- }
- }
-
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
- g_cond_wait (queue->not_full, queue->qlock);
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal");
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+ g_mutex_lock (queue->qlock);
+ break;
}
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
}
- /* put the buffer on the tail of the list */
+ /* put the buffer on the tail of the list. We keep a reference,
+ * so that the data is read-only while in here. There's a good
+ * reason to do so: we have a size and time counter, and any
+ * modification to the content could change any of the two. */
+ gst_data_ref (data);
g_queue_push_tail (queue->queue, data);
- queue->level_buffers++;
- if (GST_IS_BUFFER (data))
- queue->level_bytes += GST_BUFFER_SIZE (data);
-
- /* this assertion _has_ to hold */
- g_assert (queue->queue->length == queue->level_buffers);
+ /* Note that we only add buffers (not events) to the statistics */
+ if (GST_IS_BUFFER (data)) {
+ queue->cur_level.buffers++;
+ queue->cur_level.bytes += GST_BUFFER_SIZE (data);
+ if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
+ queue->cur_level.time += GST_BUFFER_DURATION (data);
+ }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes",
- GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ STATUS (queue, "+ level");
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
- g_cond_signal (queue->not_empty);
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add");
+ g_cond_signal (queue->item_add);
g_mutex_unlock (queue->qlock);
return;
gst_queue_get (GstPad *pad)
{
GstQueue *queue;
- GstData *data = NULL;
- gpointer front;
+ GstData *data;
- g_assert(pad != NULL);
- g_assert(GST_IS_PAD(pad));
g_return_val_if_fail (pad != NULL, NULL);
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
restart:
/* have to lock for thread-safety */
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock);
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
-
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
- while (queue->level_buffers == 0) {
- /* if there's a pending state change for this queue or its manager, switch
- * back to iterator so bottom half of state change executes
- */
- //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
- if (queue->interrupt) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!");
- g_mutex_unlock (queue->qlock);
- if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
- return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
- goto restart;
- }
- if (GST_STATE (queue) != GST_STATE_PLAYING) {
- /* this means the other end is shut down */
- if (!queue->may_deadlock) {
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "locked t:%p", g_thread_self ());
+
+ if (queue->queue->length == 0 ||
+ (queue->min_treshold.buffers > 0 &&
+ queue->cur_level.buffers < queue->min_treshold.buffers) ||
+ (queue->min_treshold.bytes > 0 &&
+ queue->cur_level.bytes < queue->min_treshold.bytes) ||
+ (queue->min_treshold.time > 0 &&
+ queue->cur_level.time < queue->min_treshold.time)) {
+ g_mutex_unlock (queue->qlock);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
+ g_mutex_lock (queue->qlock);
+
+ STATUS (queue, "pre-empty wait");
+ while (queue->queue->length == 0 ||
+ (queue->min_treshold.buffers > 0 &&
+ queue->cur_level.buffers < queue->min_treshold.buffers) ||
+ (queue->min_treshold.bytes > 0 &&
+ queue->cur_level.bytes < queue->min_treshold.bytes) ||
+ (queue->min_treshold.time > 0 &&
+ queue->cur_level.time < queue->min_treshold.time)) {
+ /* if there's a pending state change for this queue or its
+ * manager, switch back to iterator so bottom half of state
+ * change executes. */
+ if (queue->interrupt) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
g_mutex_unlock (queue->qlock);
- gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
+ if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
+ GST_ELEMENT (queue)))
+ return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
goto restart;
}
- else {
- g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_element_error (GST_ELEMENT (queue),
+ "deadlock found, sink pad elements are shut down");
+ goto restart;
+ } else {
+ GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+ "%s: waiting for the app to restart "
+ "source pad elements",
+ GST_ELEMENT_NAME (queue));
+ }
}
- }
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ STATUS (queue, "waiting for item_add");
- /* if (queue->block_timeout > -1){ */
- if (FALSE) {
- GTimeVal timeout;
- g_get_current_time(&timeout);
- g_time_val_add(&timeout, queue->block_timeout);
- if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
- g_mutex_unlock (queue->qlock);
- g_warning ("filler");
- return GST_DATA (gst_event_new_filler());
+ if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
+ GTimeVal timeout;
+ g_get_current_time (&timeout);
+ g_time_val_add (&timeout, queue->block_timeout / 1000);
+ if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)){
+ g_mutex_unlock (queue->qlock);
+ GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
+ "Sending filler event");
+ return GST_DATA (gst_event_new_filler ());
+ }
+ } else {
+ g_cond_wait (queue->item_add, queue->qlock);
}
+ STATUS (queue, "got item_add signal");
}
- else {
- g_cond_wait (queue->not_empty, queue->qlock);
- }
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
- }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes",
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
- front = g_queue_pop_head (queue->queue);
- data = GST_DATA (front);
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved data %p from queue", data);
+ STATUS (queue, "post-empty wait");
+ g_mutex_unlock (queue->qlock);
+ g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
+ g_mutex_lock (queue->qlock);
+ }
- queue->level_buffers--;
- if (GST_IS_BUFFER (data))
- queue->level_bytes -= GST_BUFFER_SIZE (data);
+ /* There's something in the list now, whatever it is */
+ data = g_queue_pop_head (queue->queue);
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "retrieved data %p from queue", data);
+
+ if (GST_IS_BUFFER (data)) {
+ /* Update statistics */
+ queue->cur_level.buffers--;
+ queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
+ if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
+ queue->cur_level.time -= GST_BUFFER_DURATION (data);
+ }
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes",
- GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers, queue->level_bytes);
+ /* Now that we're done, we can lose our own reference to
+ * the item, since we're no longer in danger. */
+ gst_data_unref (data);
- /* this assertion _has_ to hold */
- g_assert (queue->queue->length == queue->level_buffers);
+ STATUS (queue, "after _get()");
- GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling not_full");
- g_cond_signal (queue->not_full);
+ GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
+ g_cond_signal (queue->item_del);
g_mutex_unlock (queue->qlock);
- /* FIXME where should this be? locked? */
+ /* FIXME: I suppose this needs to be locked, since the EOS
+ * bit affects the pipeline state. However, that bit is
+ * locked too so it'd cause a deadlock. */
if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT (data);
- switch (GST_EVENT_TYPE(event)) {
+ switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "queue \"%s\" eos",
+ GST_ELEMENT_NAME (queue));
gst_element_set_eos (GST_ELEMENT (queue));
break;
default:
static gboolean
-gst_queue_handle_src_event (GstPad *pad, GstEvent *event)
+gst_queue_handle_src_event (GstPad *pad,
+ GstEvent *event)
{
- GstQueue *queue;
+ GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res;
- gint event_type;
- gint flag_flush = 0;
-
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
g_mutex_lock (queue->qlock);
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
- /* push the event to the queue for upstream consumption */
- g_async_queue_push(queue->events, event);
- g_warning ("FIXME: sending event in a running queue");
- /* FIXME wait for delivery of the event here, then return the result
- * instead of FALSE */
- res = FALSE;
- goto done;
- }
-
- event_type = GST_EVENT_TYPE (event);
- if (event_type == GST_EVENT_SEEK)
- flag_flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
-
- res = gst_pad_event_default (pad, event);
+ GstQueueEventResponse er;
+
+ /* push the event to the queue and wait for upstream consumption */
+ er.event = event;
+ er.handled = FALSE;
+ g_queue_push_tail (queue->events, &er);
+ while (!er.handled) {
+ g_cond_wait (queue->event_done, queue->qlock);
+ }
+ res = er.ret;
+ } else {
+ res = gst_pad_event_default (pad, event);
- switch (event_type) {
- case GST_EVENT_FLUSH:
- GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
- gst_queue_locked_flush (queue);
- break;
- case GST_EVENT_SEEK:
- if (flag_flush) {
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH:
+ GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
+ "FLUSH event, flushing queue\n");
gst_queue_locked_flush (queue);
- }
- default:
- break;
+ break;
+ case GST_EVENT_SEEK:
+ if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
+ gst_queue_locked_flush (queue);
+ }
+ default:
+ break;
+ }
}
-done:
g_mutex_unlock (queue->qlock);
/* we have to claim success, but we don't really know */
g_mutex_lock (queue->qlock);
queue->interrupt = TRUE;
- g_cond_signal (queue->not_full);
- g_cond_signal (queue->not_empty);
+ g_cond_signal (queue->item_add);
+ g_cond_signal (queue->item_del);
g_mutex_unlock (queue->qlock);
return TRUE;
gst_queue_change_state (GstElement *element)
{
GstQueue *queue;
- GstElementStateReturn ret;
+ GstElementStateReturn ret = GST_STATE_SUCCESS;
queue = GST_QUEUE (element);
case GST_STATE_NULL_TO_READY:
gst_queue_locked_flush (queue);
break;
- case GST_STATE_READY_TO_PAUSED:
- break;
case GST_STATE_PAUSED_TO_PLAYING:
if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
+ "queue %s is not linked",
+ GST_ELEMENT_NAME (queue));
/* FIXME can this be? */
- g_cond_signal (queue->not_empty);
+ g_cond_signal (queue->item_add);
ret = GST_STATE_FAILURE;
goto error;
- }
- else {
+ } else {
GstScheduler *src_sched, *sink_sched;
src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
if (src_sched == sink_sched) {
- GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue, "queue %s does not connect different schedulers",
- GST_ELEMENT_NAME (queue));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
+ "queue %s does not connect different schedulers",
+ GST_ELEMENT_NAME (queue));
g_warning ("queue %s does not connect different schedulers",
- GST_ELEMENT_NAME (queue));
+ GST_ELEMENT_NAME (queue));
ret = GST_STATE_FAILURE;
goto error;
}
queue->interrupt = FALSE;
break;
- case GST_STATE_PLAYING_TO_PAUSED:
- break;
case GST_STATE_PAUSED_TO_READY:
gst_queue_locked_flush (queue);
break;
- case GST_STATE_READY_TO_NULL:
+ default:
break;
}
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
- /* this is an ugly hack to make sure our pads are always active. Reason for this is that
- * pad activation for the queue element depends on 2 schedulers (ugh) */
+ if (GST_ELEMENT_CLASS (parent_class)->change_state)
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+ /* this is an ugly hack to make sure our pads are always active.
+ * Reason for this is that pad activation for the queue element
+ * depends on 2 schedulers (ugh) */
gst_pad_set_active (queue->sinkpad, TRUE);
gst_pad_set_active (queue->srcpad, TRUE);
g_mutex_unlock (queue->qlock);
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
+
return ret;
}
static void
-gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
+gst_queue_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
{
- GstQueue *queue;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_QUEUE (object));
+ GstQueue *queue = GST_QUEUE (object);
- queue = GST_QUEUE (object);
+ /* someone could change levels here, and since this
+ * affects the get/put funcs, we need to lock for safety. */
+ g_mutex_lock (queue->qlock);
switch (prop_id) {
- case ARG_LEAKY:
- queue->leaky = g_value_get_enum (value);
+ case ARG_MAX_SIZE_BYTES:
+ queue->max_size.bytes = g_value_get_uint (value);
+ break;
+ case ARG_MAX_SIZE_BUFFERS:
+ queue->max_size.buffers = g_value_get_uint (value);
+ break;
+ case ARG_MAX_SIZE_TIME:
+ queue->max_size.time = g_value_get_uint64 (value);
break;
- case ARG_MAX_LEVEL:
- queue->size_buffers = g_value_get_int (value);
+ case ARG_MIN_TRESHOLD_BYTES:
+ queue->max_size.bytes = g_value_get_uint (value);
break;
- case ARG_MIN_THRESHOLD_BYTES:
- queue->min_threshold_bytes = g_value_get_int (value);
+ case ARG_MIN_TRESHOLD_BUFFERS:
+ queue->max_size.buffers = g_value_get_uint (value);
+ break;
+ case ARG_MIN_TRESHOLD_TIME:
+ queue->max_size.time = g_value_get_uint64 (value);
+ break;
+ case ARG_LEAKY:
+ queue->leaky = g_value_get_enum (value);
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
break;
case ARG_BLOCK_TIMEOUT:
- queue->block_timeout = g_value_get_int (value);
+ queue->block_timeout = g_value_get_uint64 (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
+
+ g_mutex_unlock (queue->qlock);
}
static void
-gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
+gst_queue_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
{
- GstQueue *queue;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_QUEUE (object));
-
- queue = GST_QUEUE (object);
+ GstQueue *queue = GST_QUEUE (object);
switch (prop_id) {
- case ARG_LEAKY:
- g_value_set_enum (value, queue->leaky);
+ case ARG_CUR_LEVEL_BYTES:
+ g_value_set_uint (value, queue->cur_level.bytes);
+ break;
+ case ARG_CUR_LEVEL_BUFFERS:
+ g_value_set_uint (value, queue->cur_level.buffers);
+ break;
+ case ARG_CUR_LEVEL_TIME:
+ g_value_set_uint64 (value, queue->cur_level.time);
+ break;
+ case ARG_MAX_SIZE_BYTES:
+ g_value_set_uint (value, queue->max_size.bytes);
break;
- case ARG_LEVEL:
- g_value_set_int (value, queue->level_buffers);
+ case ARG_MAX_SIZE_BUFFERS:
+ g_value_set_uint (value, queue->max_size.buffers);
break;
- case ARG_MAX_LEVEL:
- g_value_set_int (value, queue->size_buffers);
+ case ARG_MAX_SIZE_TIME:
+ g_value_set_uint64 (value, queue->max_size.time);
break;
- case ARG_MIN_THRESHOLD_BYTES:
- g_value_set_int (value, queue->min_threshold_bytes);
+ case ARG_MIN_TRESHOLD_BYTES:
+ g_value_set_uint (value, queue->min_treshold.bytes);
+ break;
+ case ARG_MIN_TRESHOLD_BUFFERS:
+ g_value_set_uint (value, queue->min_treshold.buffers);
+ break;
+ case ARG_MIN_TRESHOLD_TIME:
+ g_value_set_uint64 (value, queue->min_treshold.time);
+ break;
+ case ARG_LEAKY:
+ g_value_set_enum (value, queue->leaky);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;
case ARG_BLOCK_TIMEOUT:
- g_value_set_int (value, queue->block_timeout);
+ g_value_set_uint64 (value, queue->block_timeout);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstPad *sinkpad;
GstPad *srcpad;
- /* the queue of buffers we're keeping our grubby hands on */
+ /* the queue of data we're keeping our grubby hands on */
GQueue *queue;
- guint level_buffers; /* number of buffers queued here */
- guint level_bytes; /* number of bytes queued here */
- guint64 level_time; /* amount of time queued here */
+ struct {
+ guint buffers; /* no. of buffers */
+ guint bytes; /* no. of bytes */
+ guint64 time; /* amount of time */
+ } cur_level, /* currently in the queue */
+ max_size, /* max. amount of data allowed in the queue */
+ min_treshold; /* min. amount of data required to wake reader */
- guint size_buffers; /* size of queue in buffers */
- guint size_bytes; /* size of queue in bytes */
- guint64 size_time; /* size of queue in time */
+ /* whether we leak data, and at which end */
+ gint leaky;
+
+ /* number of nanoseconds until a blocked queue 'times out'
+ * to receive data and returns a filler event. -1 = disable */
+ guint64 block_timeout;
+
+ /* it the queue should fail on possible deadlocks */
+ gboolean may_deadlock;
- gint leaky; /* whether the queue is leaky, and if so at which end */
- gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER.
- * A value of -1 will block forever. */
- guint min_threshold_bytes; /* the minimum number of bytes required before
- * waking up the reader thread */
- gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
gboolean interrupt;
gboolean flush;
GMutex *qlock; /* lock for queue (vs object lock) */
- GCond *not_empty; /* signals buffers now available for reading */
- GCond *not_full; /* signals space now available for writing */
+ GCond *item_add; /* signals buffers now available for reading */
+ GCond *item_del; /* signals space now available for writing */
+ GCond *event_done; /* upstream event signaller */
GTimeVal *timeval; /* the timeout for the queue locking */
- GAsyncQueue *events; /* upstream events get decoupled here */
+ GQueue *events; /* upstream events get decoupled here */
gpointer _gst_reserved[GST_PADDING];
};
struct _GstQueueClass {
GstElementClass parent_class;
- /* signal callbacks */
- void (*full) (GstQueue *queue);
+ /* signals - 'running' is called from both sides
+ * which might make it sort of non-useful... */
+ void (*underrun) (GstQueue *queue);
+ void (*running) (GstQueue *queue);
+ void (*overrun) (GstQueue *queue);
gpointer _gst_reserved[GST_PADDING];
};