* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2003 Colin Walters <cwalters@gnome.org>
* 2000,2005,2007 Wim Taymans <wim@fluendo.com>
- * 2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
+ * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
*
* gstqueue2.c:
*
*
* The default queue size limits are 100 buffers, 2MB of data, or
* two seconds worth of data, whichever is reached first.
- *
+ *
* If you set temp-location, the element will buffer data on the file
* specified by it. By using this, it will buffer the entire
* stream data on the file independently of the queue size limits, they
#include "config.h"
#endif
-#include <stdio.h>
#include <glib/gstdio.h>
#include <gst/gst.h>
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE))
#define GST_IS_QUEUE_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE))
+#define GST_QUEUE_CAST(obj) \
+ ((GstQueue *)(obj))
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueSize GstQueueSize;
* because we can't save it on the file */
gboolean segment_event_received;
GstEvent *starting_segment;
+
};
struct _GstQueueClass
static GstCaps *gst_queue_getcaps (GstPad * pad);
+static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset,
+ guint length, GstBuffer ** buffer);
+static gboolean gst_queue_src_checkgetrange_function (GstPad * pad);
+
+static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active);
static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
static GstStateChangeReturn gst_queue_change_state (GstElement * element,
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
+ gst_pad_set_activatepull_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull));
gst_pad_set_activatepush_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
+ gst_pad_set_getrange_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_get_range));
+ gst_pad_set_checkgetrange_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function));
gst_pad_set_acceptcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
gst_pad_set_getcaps_function (queue->srcpad,
}
}
+/* should be called with QUEUE_LOCK */
static GstMiniObject *
gst_queue_read_item_from_file (GstQueue * queue)
{
/* ERRORS */
out_flushing:
{
+ GST_DEBUG_OBJECT (queue, "we are flushing");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+
gst_buffer_unref (event);
return FALSE;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE_MUTEX_UNLOCK (queue);
+
gst_buffer_unref (buffer);
return ret;
}
}
+/* called repeadedly with @pad as the source pad. This function should push out
+ * data to the peer element. */
static void
gst_queue_loop (GstPad * pad)
{
return TRUE;
}
+static GstFlowReturn
+gst_queue_get_range (GstPad * pad, guint64 offset, guint length,
+ GstBuffer ** buffer)
+{
+ GstQueue *queue;
+ GstFlowReturn ret;
+
+ queue = GST_QUEUE_CAST (gst_pad_get_parent (pad));
+
+ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
+ offset = (offset == -1) ? queue->reading_pos : offset;
+
+ /* function will block when the range is not yet available */
+ ret = gst_queue_create_read (queue, offset, length, buffer);
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+
+ gst_object_unref (queue);
+
+ return ret;
+
+ /* ERRORS */
+out_flushing:
+ {
+ GST_DEBUG_OBJECT (queue, "we are flushing");
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ return GST_FLOW_WRONG_STATE;
+ }
+}
+
+static gboolean
+gst_queue_src_checkgetrange_function (GstPad * pad)
+{
+ GstQueue *queue;
+ gboolean ret;
+
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
+ /* we can operate in pull mode when we are using a tempfile */
+ ret = QUEUE_IS_USING_TEMP_FILE (queue);
+ gst_object_unref (GST_OBJECT (queue));
+
+ return ret;
+}
+
+/* sink currently only operates in push mode */
static gboolean
gst_queue_sink_activate_push (GstPad * pad, gboolean active)
{
if (active) {
GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
reset_rate_timer (queue);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* unblock chain function */
GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
GST_QUEUE_MUTEX_UNLOCK (queue);
return result;
}
+/* src operating in push mode, we start a task on the source pad that pushes out
+ * buffers from the queue */
static gboolean
gst_queue_src_activate_push (GstPad * pad, gboolean active)
{
if (active) {
GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* unblock loop function */
GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
/* the item add signal will unblock */
g_cond_signal (queue->item_add);
return result;
}
+/* pull mode, downstream will call our getrange function */
+static gboolean
+gst_queue_src_activate_pull (GstPad * pad, gboolean active)
+{
+ gboolean result;
+ GstQueue *queue;
+
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+ if (active) {
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "activating pull mode");
+ queue->srcresult = GST_FLOW_OK;
+ result = TRUE;
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ } else {
+ GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
+ /* this is not allowed, we cannot operate in pull mode without a temp
+ * file. */
+ queue->srcresult = GST_FLOW_WRONG_STATE;
+ result = FALSE;
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ }
+ } else {
+ GST_QUEUE_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "deactivating pull mode");
+ queue->srcresult = GST_FLOW_WRONG_STATE;
+ /* this will unlock getrange */
+ g_cond_signal (queue->item_add);
+ result = TRUE;
+ GST_QUEUE_MUTEX_UNLOCK (queue);
+ }
+ gst_object_unref (queue);
+
+ return result;
+}
+
static GstStateChangeReturn
gst_queue_change_state (GstElement * element, GstStateChange transition)
{