queue->max_level.time, \
(guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \
- queue->queue->length))
+ queue->queue.length))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (q->qlock); \
static void gst_queue2_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
-static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
-static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
static gboolean gst_queue2_handle_query (GstElement * element,
GstQuery * query);
-static GstCaps *gst_queue2_getcaps (GstPad * pad, GstCaps * filter);
-static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
+static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
+ guint64 offset, guint length, GstBuffer ** buffer);
-static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
- guint length, GstBuffer ** buffer);
-
-static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
-static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
+static gboolean gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent,
+ gboolean active);
+static gboolean gst_queue2_src_activate_push (GstPad * pad, GstObject * parent,
+ gboolean active);
+static gboolean gst_queue2_sink_activate_push (GstPad * pad, GstObject * parent,
+ gboolean active);
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
GstStateChange transition);
GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
- gst_pad_set_getcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
+ gst_pad_set_query_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
+ GST_OBJECT_FLAG_SET (queue->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
gst_pad_set_getrange_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_get_range));
- gst_pad_set_getcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
gst_pad_set_event_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
gst_pad_set_query_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
+ GST_OBJECT_FLAG_SET (queue->srcpad, GST_PAD_FLAG_PROXY_CAPS);
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
/* levels */
queue->item_add = g_cond_new ();
queue->waiting_del = FALSE;
queue->item_del = g_cond_new ();
- queue->queue = g_queue_new ();
+ g_queue_init (&queue->queue);
queue->buffering_percent = 100;
GST_DEBUG_OBJECT (queue, "finalizing queue");
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
gst_mini_object_unref (data);
}
- g_queue_free (queue->queue);
+ g_queue_clear (&queue->queue);
g_mutex_free (queue->qlock);
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
queue->current = add_range (queue, 0);
}
-static gboolean
-gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
-{
- GstQueue2 *queue;
- GstPad *otherpad;
- gboolean result;
-
- queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
-
- otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
- result = gst_pad_peer_accept_caps (otherpad, caps);
-
- return result;
-}
-
-static GstCaps *
-gst_queue2_getcaps (GstPad * pad, GstCaps * filter)
-{
- GstQueue2 *queue;
- GstPad *otherpad;
- GstCaps *result;
-
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
- if (G_UNLIKELY (queue == NULL))
- return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
-
- otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
- result = gst_pad_peer_get_caps (otherpad, filter);
- if (result == NULL)
- result = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
-
- gst_object_unref (queue);
-
- return result;
-}
-
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue. */
static void
mode = GST_BUFFERING_DOWNLOAD;
if (queue->byte_in_rate > 0) {
- if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES,
+ if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
&duration)) {
buffering_left =
(gdouble) ((duration -
gst_queue2_flush_temp_file (queue);
init_ranges (queue);
} else {
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
update_buffering (queue);
if (QUEUE_IS_USING_QUEUE (queue)) {
- g_queue_push_tail (queue->queue, item);
+ g_queue_push_tail (&queue->queue, item);
} else {
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
}
if (!QUEUE_IS_USING_QUEUE (queue))
item = gst_queue2_read_item_from_file (queue);
else
- item = g_queue_pop_head (queue->queue);
+ item = g_queue_pop_head (&queue->queue);
if (item == NULL)
goto no_item;
}
static gboolean
-gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
}
static gboolean
+gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query)
+{
+ gboolean res;
+
+ switch (GST_QUERY_TYPE (query)) {
+ default:
+ res = gst_pad_query_default (pad, parent, query);
+ break;
+ }
+ return res;
+}
+
+static gboolean
gst_queue2_is_empty (GstQueue2 * queue)
{
/* never empty on EOS */
if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
- if (queue->queue->length == 0)
+ if (queue->queue.length == 0)
return TRUE;
}
}
static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (parent);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
}
static gboolean
-gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
gboolean res = TRUE;
- GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ GstQueue2 *queue = GST_QUEUE2 (parent);
- if (G_UNLIKELY (queue == NULL)) {
- gst_event_unref (event);
- return FALSE;
- }
#ifndef GST_DISABLE_GST_DEBUG
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
event, GST_EVENT_TYPE_NAME (event));
break;
}
- gst_object_unref (queue);
return res;
}
static gboolean
-gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
-{
- gboolean ret = FALSE;
- GstPad *peer;
-
- if ((peer = gst_pad_get_peer (pad))) {
- ret = gst_pad_query (peer, query);
- gst_object_unref (peer);
- }
- return ret;
-}
-
-static gboolean
-gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
- if (G_UNLIKELY (queue == NULL))
- return FALSE;
+ queue = GST_QUEUE2 (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:
gint64 peer_pos;
GstFormat format;
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
/* get peer position */
{
GST_DEBUG_OBJECT (queue, "doing peer query");
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "peer query success");
/* FIXME - is this condition correct? what should ring buffer do? */
if (QUEUE_IS_USING_QUEUE (queue)) {
/* no temp file, just forward to the peer */
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
} else {
duration = writing_pos;
} else {
/* get duration of upstream in bytes */
- peer_res = gst_pad_query_peer_duration (queue->sinkpad,
+ peer_res = gst_pad_peer_query_duration (queue->sinkpad,
GST_FORMAT_BYTES, &duration);
}
case GST_QUERY_SCHEDULING:
{
gboolean pull_mode;
+ GstSchedulingFlags flags = 0;
/* we can operate in pull mode when we are using a tempfile */
pull_mode = !QUEUE_IS_USING_QUEUE (queue);
- gst_query_set_scheduling (query, pull_mode, pull_mode, FALSE, 0, -1, 1);
+ if (pull_mode)
+ flags |= GST_SCHEDULING_FLAG_SEEKABLE;
+ gst_query_set_scheduling (query, flags, 0, -1, 0);
+ if (pull_mode)
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
break;
}
default:
/* peer handled other queries */
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_query_default (pad, parent, query))
goto peer_failed;
break;
}
- gst_object_unref (queue);
return TRUE;
/* ERRORS */
peer_failed:
{
GST_DEBUG_OBJECT (queue, "failed peer query");
- gst_object_unref (queue);
return FALSE;
}
}
static gboolean
gst_queue2_handle_query (GstElement * element, GstQuery * query)
{
+ GstQueue2 *queue = GST_QUEUE2 (element);
+
/* simply forward to the srcpad query function */
- return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
+ return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
+ query);
}
static void
{
gint64 upstream_size = -1;
- if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES,
+ if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
&upstream_size)) {
GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
queue->upstream_size = upstream_size;
}
static GstFlowReturn
-gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
- GstBuffer ** buffer)
+gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
+ guint length, GstBuffer ** buffer)
{
GstQueue2 *queue;
GstFlowReturn ret;
- queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2_CAST (parent);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
ret = gst_queue2_create_read (queue, offset, length, buffer);
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
-
return ret;
/* ERRORS */
GST_DEBUG_OBJECT (queue, "we are flushing");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
return ret;
}
out_unexpected:
{
GST_DEBUG_OBJECT (queue, "read beyond end of file");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
return GST_FLOW_EOS;
}
}
/* sink currently only operates in push mode */
static gboolean
-gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
+gst_queue2_sink_activate_push (GstPad * pad, GstObject * parent,
+ gboolean active)
{
gboolean result = TRUE;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
- gst_object_unref (queue);
-
return result;
}
/* src operating in push mode, we start a task on the source pad that pushes out
* buffers from the queue */
static gboolean
-gst_queue2_src_activate_push (GstPad * pad, gboolean active)
+gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
{
gboolean result = FALSE;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
result = gst_pad_stop_task (pad);
}
- gst_object_unref (queue);
-
return result;
}
/* pull mode, downstream will call our getrange function */
static gboolean
-gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
+gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
{
gboolean result;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
result = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
- gst_object_unref (queue);
return result;
}