* @short_description: Asynchronous data queue.
*
* Data is queued until one of the limits specified by the
- * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
- * #GstQueue2:max-size-time properties has been reached. Any attempt to push
+ * #GstQueue22:max-size-buffers, #GstQueue22:max-size-bytes and/or
+ * #GstQueue22:max-size-time properties has been reached. Any attempt to push
* more buffers into the queue will block the pushing thread until more space
* becomes available.
*
* processing on sink and source pad.
*
* You can query how many buffers are queued by reading the
- * #GstQueue2:current-level-buffers property.
+ * #GstQueue22:current-level-buffers property.
*
* The default queue size limits are 100 buffers, 2MB of data, or
* two seconds worth of data, whichever is reached first.
#include <glib/gstdio.h>
-#include <gst/gst-i18n-plugin.h>
+#include "gst/gst-i18n-lib.h"
#ifdef G_OS_WIN32
#include <io.h> /* lseek, open, close, read */
#include <unistd.h>
#endif
-static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
+static const GstElementDetails gst_queue2_details =
+GST_ELEMENT_DETAILS ("Queue",
"Generic",
"Simple data queue",
"Erik Walthinsen <omega@cse.ogi.edu>, "
PROP_TEMP_LOCATION
};
-/* used to keep track of sizes (current and max) */
-struct _GstQueueSize
-{
- guint buffers;
- guint bytes;
- guint64 time;
- guint64 rate_time;
-};
-
-#define GST_QUEUE_CLEAR_LEVEL(l) G_STMT_START { \
+#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \
l.buffers = 0; \
l.bytes = 0; \
l.time = 0; \
queue->writing_pos - queue->max_reading_pos : \
queue->queue->length))
-#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
+#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (q->qlock); \
} G_STMT_END
-#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
- GST_QUEUE_MUTEX_LOCK (q); \
+#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
+ GST_QUEUE2_MUTEX_LOCK (q); \
if (q->srcresult != GST_FLOW_OK) \
goto label; \
} G_STMT_END
-#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
+#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \
g_mutex_unlock (q->qlock); \
} G_STMT_END
-#define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \
+#define GST_QUEUE2_WAIT_DEL_CHECK(q, label) G_STMT_START { \
STATUS (queue, q->sinkpad, "wait for DEL"); \
q->waiting_del = TRUE; \
g_cond_wait (q->item_del, queue->qlock); \
STATUS (queue, q->sinkpad, "received DEL"); \
} G_STMT_END
-#define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START { \
+#define GST_QUEUE2_WAIT_ADD_CHECK(q, label) G_STMT_START { \
STATUS (queue, q->srcpad, "wait for ADD"); \
q->waiting_add = TRUE; \
g_cond_wait (q->item_add, q->qlock); \
STATUS (queue, q->srcpad, "received ADD"); \
} G_STMT_END
-#define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START { \
+#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \
if (q->waiting_del) { \
STATUS (q, q->srcpad, "signal DEL"); \
g_cond_signal (q->item_del); \
} \
} G_STMT_END
-#define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START { \
+#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \
if (q->waiting_add) { \
STATUS (q, q->sinkpad, "signal ADD"); \
g_cond_signal (q->item_add); \
/* can't use boilerplate as we need to register with Queue2 to avoid conflicts
* with queue in core elements */
-static void gst_queue_class_init (GstQueueClass * klass);
-static void gst_queue_init (GstQueue * queue, GstQueueClass * g_class);
+static void gst_queue2_class_init (GstQueue2Class * klass);
+static void gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class);
static GstElementClass *parent_class;
-static GType
-gst_queue_get_type (void)
+GType
+gst_queue2_get_type (void)
{
- static GType gst_queue_type = 0;
+ static GType gst_queue2_type = 0;
- if (!gst_queue_type) {
- static const GTypeInfo gst_queue_info = {
- sizeof (GstQueueClass),
+ if (!gst_queue2_type) {
+ static const GTypeInfo gst_queue2_info = {
+ sizeof (GstQueue2Class),
NULL,
NULL,
- (GClassInitFunc) gst_queue_class_init,
+ (GClassInitFunc) gst_queue2_class_init,
NULL,
NULL,
- sizeof (GstQueue),
+ sizeof (GstQueue2),
0,
- (GInstanceInitFunc) gst_queue_init,
+ (GInstanceInitFunc) gst_queue2_init,
NULL
};
- gst_queue_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstQueue2",
- &gst_queue_info, 0);
+ gst_queue2_type =
+ g_type_register_static (GST_TYPE_ELEMENT, "GstQueue22",
+ &gst_queue2_info, 0);
}
- return gst_queue_type;
+ return gst_queue2_type;
}
-static void gst_queue_finalize (GObject * object);
+static void gst_queue2_finalize (GObject * object);
-static void gst_queue_set_property (GObject * object,
+static void gst_queue2_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
-static void gst_queue_get_property (GObject * object,
+static void gst_queue2_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
-static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
-static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset,
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
guint size, GstCaps * caps, GstBuffer ** buf);
-static GstFlowReturn gst_queue_push_one (GstQueue * queue);
-static void gst_queue_loop (GstPad * pad);
+static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
+static void gst_queue2_loop (GstPad * pad);
-static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
+static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
-static GstCaps *gst_queue_getcaps (GstPad * pad);
-static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps);
+static GstCaps *gst_queue2_getcaps (GstPad * pad);
+static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
-static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset,
+static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
guint length, GstBuffer ** buffer);
-static gboolean gst_queue_src_checkgetrange_function (GstPad * pad);
+static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
-static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active);
-static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
-static GstStateChangeReturn gst_queue_change_state (GstElement * element,
+static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
+static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
+static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
+static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
GstStateChange transition);
-static gboolean gst_queue_is_empty (GstQueue * queue);
-static gboolean gst_queue_is_filled (GstQueue * queue);
+static gboolean gst_queue2_is_empty (GstQueue2 * queue);
+static gboolean gst_queue2_is_filled (GstQueue2 * queue);
-/* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
+/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
static void
-gst_queue_class_init (GstQueueClass * klass)
+gst_queue2_class_init (GstQueue2Class * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
parent_class = g_type_class_peek_parent (klass);
- gobject_class->set_property = gst_queue_set_property;
- gobject_class->get_property = gst_queue_get_property;
+ gobject_class->set_property = gst_queue2_set_property;
+ gobject_class->get_property = gst_queue2_get_property;
/* properties */
g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
- gst_element_class_set_details (gstelement_class, &gst_queue_details);
+ gst_element_class_set_details (gstelement_class, &gst_queue2_details);
/* set several parent class virtual functions */
- gobject_class->finalize = gst_queue_finalize;
+ gobject_class->finalize = gst_queue2_finalize;
- gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
+ gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
}
static void
-gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
+gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
{
queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
gst_pad_set_chain_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_chain));
+ GST_DEBUG_FUNCPTR (gst_queue2_chain));
gst_pad_set_activatepush_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_sink_activate_push));
+ GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
gst_pad_set_event_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
gst_pad_set_getcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_getcaps));
+ GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
gst_pad_set_acceptcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
+ GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
gst_pad_set_bufferalloc_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_bufferalloc));
+ GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
gst_pad_set_activatepull_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull));
+ GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
gst_pad_set_activatepush_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
+ GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
gst_pad_set_getrange_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_get_range));
+ GST_DEBUG_FUNCPTR (gst_queue2_get_range));
gst_pad_set_checkgetrange_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function));
+ GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
gst_pad_set_getcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_getcaps));
+ GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
gst_pad_set_acceptcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
+ GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
gst_pad_set_event_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
gst_pad_set_query_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue_handle_src_query));
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
/* levels */
- GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
+ GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
/* called only once, as opposed to dispose */
static void
-gst_queue_finalize (GObject * object)
+gst_queue2_finalize (GObject * object)
{
- GstQueue *queue = GST_QUEUE (object);
+ GstQueue2 *queue = GST_QUEUE2 (object);
GST_DEBUG_OBJECT (queue, "finalizing queue");
}
static gboolean
-gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
+gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
{
- GstQueue *queue;
+ GstQueue2 *queue;
GstPad *otherpad;
gboolean result;
- queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
result = gst_pad_peer_accept_caps (otherpad, caps);
}
static GstCaps *
-gst_queue_getcaps (GstPad * pad)
+gst_queue2_getcaps (GstPad * pad)
{
- GstQueue *queue;
+ GstQueue2 *queue;
GstPad *otherpad;
GstCaps *result;
- queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
result = gst_pad_peer_get_caps (otherpad);
}
static GstFlowReturn
-gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps,
- GstBuffer ** buf)
+gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
+ GstCaps * caps, GstBuffer ** buf)
{
- GstQueue *queue;
+ GstQueue2 *queue;
GstFlowReturn result;
- queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
/* Forward to src pad, without setting caps on the src pad */
result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue. */
static void
-update_time_level (GstQueue * queue)
+update_time_level (GstQueue2 * queue)
{
gint64 sink_time, src_time;
/* take a NEWSEGMENT event and apply the values to segment, updating the time
* level of queue. */
static void
-apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment)
+apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
{
gboolean update;
GstFormat format;
/* take a buffer and update segment, updating the time level of the queue. */
static void
-apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment)
+apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
{
GstClockTime duration, timestamp;
}
static void
-update_buffering (GstQueue * queue)
+update_buffering (GstQueue2 * queue)
{
gint percent;
gboolean post = FALSE;
}
static void
-reset_rate_timer (GstQueue * queue)
+reset_rate_timer (GstQueue2 * queue)
{
queue->bytes_in = 0;
queue->bytes_out = 0;
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
static void
-update_in_rates (GstQueue * queue)
+update_in_rates (GstQueue2 * queue)
{
gdouble elapsed, period;
gdouble byte_in_rate;
}
static void
-update_out_rates (GstQueue * queue)
+update_out_rates (GstQueue2 * queue)
{
gdouble elapsed, period;
gdouble byte_out_rate;
}
static void
-gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer)
+gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
{
guint size;
guint8 *data;
/* see if there is enough data in the file to read a full buffer */
static gboolean
-gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
+gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
{
GST_DEBUG_OBJECT (queue,
"offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
}
static GstFlowReturn
-gst_queue_create_read (GstQueue * queue, guint64 offset, guint length,
+gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GstBuffer ** buffer)
{
size_t res;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
- while (!gst_queue_have_data (queue, offset, length)) {
- GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+ while (!gst_queue2_have_data (queue, offset, length)) {
+ GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing);
}
#ifdef HAVE_FSEEKO
/* should be called with QUEUE_LOCK */
static GstMiniObject *
-gst_queue_read_item_from_file (GstQueue * queue)
+gst_queue2_read_item_from_file (GstQueue2 * queue)
{
GstMiniObject *item;
GstBuffer *buffer;
ret =
- gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
+ gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
&buffer);
switch (ret) {
case GST_FLOW_OK:
}
static gboolean
-gst_queue_open_temp_location_file (GstQueue * queue)
+gst_queue2_open_temp_location_file (GstQueue2 * queue)
{
gint fd = -1;
gchar *name = NULL;
}
static void
-gst_queue_close_temp_location_file (GstQueue * queue)
+gst_queue2_close_temp_location_file (GstQueue2 * queue)
{
/* nothing to do */
if (queue->temp_file == NULL)
}
static void
-gst_queue_flush_temp_file (GstQueue * queue)
+gst_queue2_flush_temp_file (GstQueue2 * queue)
{
if (queue->temp_file == NULL)
return;
}
static void
-gst_queue_locked_flush (GstQueue * queue)
+gst_queue2_locked_flush (GstQueue2 * queue)
{
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- gst_queue_flush_temp_file (queue);
+ gst_queue2_flush_temp_file (queue);
} else {
while (!g_queue_is_empty (queue->queue)) {
GstMiniObject *data = g_queue_pop_head (queue->queue);
gst_mini_object_unref (data);
}
}
- GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
+ GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
if (queue->starting_segment != NULL)
queue->segment_event_received = FALSE;
/* we deleted a lot of something */
- GST_QUEUE_SIGNAL_DEL (queue);
+ GST_QUEUE2_SIGNAL_DEL (queue);
}
/* enqueue an item an update the level stats */
static void
-gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
+gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
{
if (GST_IS_BUFFER (item)) {
GstBuffer *buffer;
update_in_rates (queue);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- gst_queue_write_buffer_to_file (queue, buffer);
+ gst_queue2_write_buffer_to_file (queue, buffer);
}
} else if (GST_IS_EVENT (item)) {
else
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
- GST_QUEUE_SIGNAL_ADD (queue);
+ GST_QUEUE2_SIGNAL_ADD (queue);
}
return;
/* dequeue an item from the queue and update level stats */
static GstMiniObject *
-gst_queue_locked_dequeue (GstQueue * queue)
+gst_queue2_locked_dequeue (GstQueue2 * queue)
{
GstMiniObject *item;
if (QUEUE_IS_USING_TEMP_FILE (queue))
- item = gst_queue_read_item_from_file (queue);
+ item = gst_queue2_read_item_from_file (queue);
else
item = g_queue_pop_head (queue->queue);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
/* queue is empty now that we dequeued the EOS */
- GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
+ GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
break;
case GST_EVENT_NEWSEGMENT:
apply_segment (queue, event, &queue->src_segment);
item, GST_OBJECT_NAME (queue));
item = NULL;
}
- GST_QUEUE_SIGNAL_DEL (queue);
+ GST_QUEUE2_SIGNAL_DEL (queue);
return item;
}
static gboolean
-gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
{
- GstQueue *queue;
+ GstQueue2 *queue;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
gst_pad_push_event (queue->srcpad, event);
/* now unblock the chain function */
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
/* unblock the loop and chain functions */
g_cond_signal (queue->item_add);
g_cond_signal (queue->item_del);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
/* make sure it pauses, this should happen since we sent
* flush_start downstream. */
/* forward event */
gst_pad_push_event (queue->srcpad, event);
- GST_QUEUE_MUTEX_LOCK (queue);
- gst_queue_locked_flush (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ gst_queue2_locked_flush (queue);
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
/* reset rate counters */
reset_rate_timer (queue);
- gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+ gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
queue->srcpad);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
goto done;
}
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
/* refuse more events on EOS */
if (queue->is_eos)
goto out_eos;
- gst_queue_locked_enqueue (queue, event);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ gst_queue2_locked_enqueue (queue, event);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* non-serialized events are passed upstream. */
gst_pad_push_event (queue->srcpad, event);
out_flushing:
{
GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
return FALSE;
}
out_eos:
{
GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
return FALSE;
}
}
static gboolean
-gst_queue_is_empty (GstQueue * queue)
+gst_queue2_is_empty (GstQueue2 * queue)
{
/* never empty on EOS */
if (queue->is_eos)
}
static gboolean
-gst_queue_is_filled (GstQueue * queue)
+gst_queue2_is_filled (GstQueue2 * queue)
{
gboolean res;
}
static GstFlowReturn
-gst_queue_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
{
- GstQueue *queue;
+ GstQueue2 *queue;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
/* we have to lock the queue since we span threads */
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
/* when we received EOS, we refuse more data */
if (queue->is_eos)
goto out_eos;
/* We make space available if we're "full" according to whatever
* the user defined as "full". */
- if (gst_queue_is_filled (queue)) {
+ if (gst_queue2_is_filled (queue)) {
gboolean started;
/* pause the timer while we wait. The fact that we are waiting does not mean
"queue is full, waiting for free space");
do {
/* Wait for space to be available, we could be unlocked because of a flush. */
- GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
+ GST_QUEUE2_WAIT_DEL_CHECK (queue, out_flushing);
}
- while (gst_queue_is_filled (queue));
+ while (gst_queue2_is_filled (queue));
/* and continue if we were running before */
if (started)
}
/* put buffer in queue now */
- gst_queue_locked_enqueue (queue, buffer);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ gst_queue2_locked_enqueue (queue, buffer);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
return GST_FLOW_OK;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
return ret;
out_eos:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
return GST_FLOW_UNEXPECTED;
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because we received UNEXPECTED");
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
return GST_FLOW_UNEXPECTED;
/* dequeue an item from the queue an push it downstream. This functions returns
* the result of the push. */
static GstFlowReturn
-gst_queue_push_one (GstQueue * queue)
+gst_queue2_push_one (GstQueue2 * queue)
{
GstFlowReturn result = GST_FLOW_OK;
GstMiniObject *data;
- data = gst_queue_locked_dequeue (queue);
+ data = gst_queue2_locked_dequeue (queue);
if (data == NULL)
goto no_item;
buffer = GST_BUFFER_CAST (data);
caps = GST_BUFFER_CAPS (buffer);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
/* set caps before pushing the buffer so that core does not try to do
* something fancy to check if this is possible. */
result = gst_pad_push (queue->srcpad, buffer);
/* need to check for srcresult here as well */
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
if (result == GST_FLOW_UNEXPECTED) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"got UNEXPECTED from downstream");
* queue we can push, we set a flag to make the sinkpad refuse more
* buffers with an UNEXPECTED return value until we receive something
* pushable again or we get flushed. */
- while ((data = gst_queue_locked_dequeue (queue))) {
+ while ((data = gst_queue2_locked_dequeue (queue))) {
if (GST_IS_BUFFER (data)) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping UNEXPECTED buffer %p", data);
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_pad_push_event (queue->srcpad, event);
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
/* if we're EOS, return UNEXPECTED so that the task pauses. */
if (type == GST_EVENT_EOS) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
/* called repeadedly with @pad as the source pad. This function should push out
* data to the peer element. */
static void
-gst_queue_loop (GstPad * pad)
+gst_queue2_loop (GstPad * pad)
{
- GstQueue *queue;
+ GstQueue2 *queue;
GstFlowReturn ret;
- queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
/* have to lock for thread-safety */
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
- if (gst_queue_is_empty (queue)) {
+ if (gst_queue2_is_empty (queue)) {
gboolean started;
/* pause the timer while we wait. The fact that we are waiting does not mean
"queue is empty, waiting for new data");
do {
/* Wait for data to be available, we could be unlocked because of a flush. */
- GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+ GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing);
}
- while (gst_queue_is_empty (queue));
+ while (gst_queue2_is_empty (queue));
/* and continue if we were running before */
if (started)
g_timer_continue (queue->out_timer);
}
- ret = gst_queue_push_one (queue);
+ ret = gst_queue2_push_one (queue);
queue->srcresult = ret;
if (ret != GST_FLOW_OK)
goto out_flushing;
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
return;
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (queue->srcresult));
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
/* let app know about us giving up if upstream is not expected to do so */
/* UNEXPECTED is already taken care of elsewhere */
if (eos && (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) &&
}
static gboolean
-gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
{
gboolean res = TRUE;
- GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
#ifndef GST_DISABLE_GST_DEBUG
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
}
static gboolean
-gst_queue_peer_query (GstQueue * queue, GstPad * pad, GstQuery * query)
+gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
{
gboolean ret = FALSE;
GstPad *peer;
}
static gboolean
-gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
{
- GstQueue *queue;
+ GstQueue2 *queue;
- queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:
gint64 peer_pos;
GstFormat format;
- if (!gst_queue_peer_query (queue, queue->sinkpad, query))
+ if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed;
/* get peer position */
{
GST_DEBUG_OBJECT (queue, "doing peer query");
- if (!gst_queue_peer_query (queue, queue->sinkpad, query))
+ if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "peer query success");
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
/* no temp file, just forward to the peer */
- if (!gst_queue_peer_query (queue, queue->sinkpad, query))
+ if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
} else {
}
default:
/* peer handled other queries */
- if (!gst_queue_peer_query (queue, queue->sinkpad, query))
+ if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed;
break;
}
}
static GstFlowReturn
-gst_queue_get_range (GstPad * pad, guint64 offset, guint length,
+gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** buffer)
{
- GstQueue *queue;
+ GstQueue2 *queue;
GstFlowReturn ret;
- queue = GST_QUEUE_CAST (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
- GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
offset = (offset == -1) ? queue->reading_pos : offset;
/* function will block when the range is not yet available */
- ret = gst_queue_create_read (queue, offset, length, buffer);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ ret = gst_queue2_create_read (queue, offset, length, buffer);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_object_unref (queue);
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
return GST_FLOW_WRONG_STATE;
}
}
static gboolean
-gst_queue_src_checkgetrange_function (GstPad * pad)
+gst_queue2_src_checkgetrange_function (GstPad * pad)
{
- GstQueue *queue;
+ GstQueue2 *queue;
gboolean ret;
- queue = GST_QUEUE (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (gst_pad_get_parent (pad));
/* we can operate in pull mode when we are using a tempfile */
ret = QUEUE_IS_USING_TEMP_FILE (queue);
/* sink currently only operates in push mode */
static gboolean
-gst_queue_sink_activate_push (GstPad * pad, gboolean active)
+gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
{
gboolean result = TRUE;
- GstQueue *queue;
+ GstQueue2 *queue;
- queue = GST_QUEUE (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (gst_pad_get_parent (pad));
if (active) {
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
reset_rate_timer (queue);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* unblock chain function */
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
- gst_queue_locked_flush (queue);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ gst_queue2_locked_flush (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
gst_object_unref (queue);
/* src operating in push mode, we start a task on the source pad that pushes out
* buffers from the queue */
static gboolean
-gst_queue_src_activate_push (GstPad * pad, gboolean active)
+gst_queue2_src_activate_push (GstPad * pad, gboolean active)
{
gboolean result = FALSE;
- GstQueue *queue;
+ GstQueue2 *queue;
- queue = GST_QUEUE (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (gst_pad_get_parent (pad));
if (active) {
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating push mode");
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
- result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* unblock loop function */
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating push mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
/* the item add signal will unblock */
g_cond_signal (queue->item_add);
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
/* pull mode, downstream will call our getrange function */
static gboolean
-gst_queue_src_activate_pull (GstPad * pad, gboolean active)
+gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
{
gboolean result;
- GstQueue *queue;
+ GstQueue2 *queue;
- queue = GST_QUEUE (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (gst_pad_get_parent (pad));
if (active) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
result = TRUE;
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
/* this is not allowed, we cannot operate in pull mode without a temp
* file. */
queue->srcresult = GST_FLOW_WRONG_STATE;
result = FALSE;
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
} else {
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating pull mode");
queue->srcresult = GST_FLOW_WRONG_STATE;
/* this will unlock getrange */
g_cond_signal (queue->item_add);
result = TRUE;
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
gst_object_unref (queue);
}
static GstStateChangeReturn
-gst_queue_change_state (GstElement * element, GstStateChange transition)
+gst_queue2_change_state (GstElement * element, GstStateChange transition)
{
- GstQueue *queue;
+ GstQueue2 *queue;
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
- queue = GST_QUEUE (element);
+ queue = GST_QUEUE2 (element);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- if (!gst_queue_open_temp_location_file (queue))
+ if (!gst_queue2_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE;
}
queue->segment_event_received = FALSE;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (QUEUE_IS_USING_TEMP_FILE (queue))
- gst_queue_close_temp_location_file (queue);
+ gst_queue2_close_temp_location_file (queue);
if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
g_cond_signal (queue->item_add);
static void
-gst_queue_set_temp_template (GstQueue * queue, const gchar * template)
+gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
{
GstState state;
}
static void
-gst_queue_set_property (GObject * object,
+gst_queue2_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
- GstQueue *queue = GST_QUEUE (object);
+ GstQueue2 *queue = GST_QUEUE2 (object);
/* someone could change levels here, and since this
* affects the get/put funcs, we need to lock for safety. */
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
switch (prop_id) {
case PROP_MAX_SIZE_BYTES:
queue->high_percent = g_value_get_int (value);
break;
case PROP_TEMP_TEMPLATE:
- gst_queue_set_temp_template (queue, g_value_get_string (value));
+ gst_queue2_set_temp_template (queue, g_value_get_string (value));
break;
case PROP_TEMP_LOCATION:
g_free (queue->temp_location);
break;
}
- GST_QUEUE_MUTEX_UNLOCK (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
static void
-gst_queue_get_property (GObject * object,
+gst_queue2_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
- GstQueue *queue = GST_QUEUE (object);
+ GstQueue2 *queue = GST_QUEUE2 (object);
- GST_QUEUE_MUTEX_LOCK (queue);
+ GST_QUEUE2_MUTEX_LOCK (queue);
switch (prop_id) {
case PROP_CUR_LEVEL_BYTES:
break;
}
- GST_QUEUE_MUTEX_UNLOCK (queue);
-}
-
-static gboolean
-plugin_init (GstPlugin * plugin)
-{
- GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element");
- GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0,
- "dataflow inside the queue element");
-
-#ifdef ENABLE_NLS
- GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE,
- LOCALEDIR);
- bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR);
- bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8");
-#endif /* ENABLE_NLS */
-
- return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
-
-GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
- GST_VERSION_MINOR,
- "queue2",
- "Queue newer version", plugin_init, VERSION, GST_LICENSE,
- GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)