queue->max_level.time, \
(guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \
- queue->queue->length))
+ queue->queue.length))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (q->qlock); \
} \
} G_STMT_END
-#define _do_init(bla) \
+#define _do_init \
GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
"dataflow inside the queue element");
-
-GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
- _do_init);
+#define gst_queue2_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
static void gst_queue2_finalize (GObject * object);
static void gst_queue2_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
-static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
-static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
- guint size, GstCaps * caps, GstBuffer ** buf);
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
-static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
static gboolean gst_queue2_handle_query (GstElement * element,
GstQuery * query);
-static GstCaps *gst_queue2_getcaps (GstPad * pad);
-static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
+static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
+ guint64 offset, guint length, GstBuffer ** buffer);
-static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
- guint length, GstBuffer ** buffer);
-static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
-
-static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
-static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
+static gboolean gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent,
+ gboolean active);
+static gboolean gst_queue2_src_activate_push (GstPad * pad, GstObject * parent,
+ gboolean active);
+static gboolean gst_queue2_sink_activate_push (GstPad * pad, GstObject * parent,
+ gboolean active);
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
GstStateChange transition);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
-
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
static void
-gst_queue2_base_init (gpointer g_class)
-{
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
-
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&srctemplate));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
-
- gst_element_class_set_details_simple (gstelement_class, "Queue 2",
- "Generic",
- "Simple data queue",
- "Erik Walthinsen <omega@cse.ogi.edu>, "
- "Wim Taymans <wim.taymans@gmail.com>");
-}
-
-static void
gst_queue2_class_init (GstQueue2Class * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint64 ("ring-buffer-max-size",
"Max. ring buffer size (bytes)",
- "Max. amount of data in the ring buffer (bytes, 0 = disabled",
+ "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* set several parent class virtual functions */
gobject_class->finalize = gst_queue2_finalize;
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&srctemplate));
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&sinktemplate));
+
+ gst_element_class_set_details_simple (gstelement_class, "Queue 2",
+ "Generic",
+ "Simple data queue",
+ "Erik Walthinsen <omega@cse.ogi.edu>, "
+ "Wim Taymans <wim.taymans@gmail.com>");
+
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
}
static void
-gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
+gst_queue2_init (GstQueue2 * queue)
{
queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
- gst_pad_set_getcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
- gst_pad_set_bufferalloc_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
+ gst_pad_set_query_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
+ GST_OBJECT_FLAG_SET (queue->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
gst_pad_set_getrange_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_get_range));
- gst_pad_set_checkgetrange_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
- gst_pad_set_getcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
gst_pad_set_event_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
gst_pad_set_query_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
+ GST_OBJECT_FLAG_SET (queue->srcpad, GST_PAD_FLAG_PROXY_CAPS);
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
/* levels */
queue->item_add = g_cond_new ();
queue->waiting_del = FALSE;
queue->item_del = g_cond_new ();
- queue->queue = g_queue_new ();
+ g_queue_init (&queue->queue);
queue->buffering_percent = 100;
GST_DEBUG_OBJECT (queue, "finalizing queue");
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
gst_mini_object_unref (data);
}
- g_queue_free (queue->queue);
+ g_queue_clear (&queue->queue);
g_mutex_free (queue->qlock);
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
queue->current = add_range (queue, 0);
}
-static gboolean
-gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
-{
- GstQueue2 *queue;
- GstPad *otherpad;
- gboolean result;
-
- queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
-
- otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
- result = gst_pad_peer_accept_caps (otherpad, caps);
-
- return result;
-}
-
-static GstCaps *
-gst_queue2_getcaps (GstPad * pad)
-{
- GstQueue2 *queue;
- GstPad *otherpad;
- GstCaps *result;
-
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
- if (G_UNLIKELY (queue == NULL))
- return gst_caps_new_any ();
-
- otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
- result = gst_pad_peer_get_caps (otherpad);
- if (result == NULL)
- result = gst_caps_new_any ();
-
- gst_object_unref (queue);
-
- return result;
-}
-
-static GstFlowReturn
-gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
- GstCaps * caps, GstBuffer ** buf)
-{
- GstQueue2 *queue;
- GstFlowReturn result;
-
- queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
-
- /* Forward to src pad, without setting caps on the src pad */
- result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
-
- return result;
-}
-
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue. */
static void
if (queue->sink_tainted) {
queue->sinktime =
gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
- queue->sink_segment.last_stop);
+ queue->sink_segment.position);
queue->sink_tainted = FALSE;
}
if (queue->src_tainted) {
queue->srctime =
gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
- queue->src_segment.last_stop);
+ queue->src_segment.position);
queue->src_tainted = FALSE;
}
queue->cur_level.time = 0;
}
-/* take a NEWSEGMENT event and apply the values to segment, updating the time
+/* take a SEGMENT event and apply the values to segment, updating the time
* level of queue. */
static void
apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
gboolean is_sink)
{
- gboolean update;
- GstFormat format;
- gdouble rate, arate;
- gint64 start, stop, time;
-
- gst_event_parse_new_segment_full (event, &update, &rate, &arate,
- &format, &start, &stop, &time);
-
- GST_DEBUG_OBJECT (queue,
- "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
- "format %d, "
- "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
- G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
+ gst_event_copy_segment (event, segment);
- if (format == GST_FORMAT_BYTES) {
+ if (segment->format == GST_FORMAT_BYTES) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
/* start is where we'll be getting from and as such writing next */
- queue->current = add_range (queue, start);
+ queue->current = add_range (queue, segment->start);
/* update the stats for this range */
update_cur_level (queue, queue->current);
}
/* now configure the values, we use these to track timestamps on the
* sinkpad. */
- if (format != GST_FORMAT_TIME) {
+ if (segment->format != GST_FORMAT_TIME) {
/* non-time format, pretent the current time segment is closed with a
* 0 start and unknown stop time. */
- update = FALSE;
- format = GST_FORMAT_TIME;
- start = 0;
- stop = -1;
- time = 0;
+ segment->format = GST_FORMAT_TIME;
+ segment->start = 0;
+ segment->stop = -1;
+ segment->time = 0;
}
- gst_segment_set_newsegment_full (segment, update,
- rate, arate, format, start, stop, time);
- GST_DEBUG_OBJECT (queue,
- "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
+ GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
if (is_sink)
queue->sink_tainted = TRUE;
/* if no timestamp is set, assume it's continuous with the previous
* time */
if (timestamp == GST_CLOCK_TIME_NONE)
- timestamp = segment->last_stop;
+ timestamp = segment->position;
/* add duration */
if (duration != GST_CLOCK_TIME_NONE)
timestamp += duration;
- GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+ GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (timestamp));
- gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+ segment->position = timestamp;
if (is_sink)
queue->sink_tainted = TRUE;
queue->buffering_percent = percent;
if (!QUEUE_IS_USING_QUEUE (queue)) {
- GstFormat fmt = GST_FORMAT_BYTES;
gint64 duration;
if (QUEUE_IS_USING_RING_BUFFER (queue))
mode = GST_BUFFERING_DOWNLOAD;
if (queue->byte_in_rate > 0) {
- if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
+ if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
+ &duration)) {
buffering_left =
(gdouble) ((duration -
queue->current->writing_pos) * 1000) / queue->byte_in_rate;
+ }
} else {
buffering_left = G_MAXINT64;
}
queue->bytes_in = 0;
queue->bytes_out = 0;
queue->byte_in_rate = 0.0;
+ queue->byte_in_period = 0;
queue->byte_out_rate = 0.0;
queue->last_in_elapsed = 0.0;
queue->last_out_elapsed = 0.0;
/* Tuning for rate estimation. We use a large window for the input rate because
* it should be stable when connected to a network. The output rate is less
* stable (the elements preroll, queues behind a demuxer fill, ...) and should
- * therefore adapt more quickly. */
-#define AVG_IN(avg,val) ((avg) * 15.0 + (val)) / 16.0
+ * therefore adapt more quickly.
+ * However, initial input rate may be subject to a burst, and should therefore
+ * initially also adapt more quickly to changes, and only later on give higher
+ * weight to previous values. */
+#define AVG_IN(avg,val,w1,w2) ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
static void
period = elapsed - queue->last_in_elapsed;
GST_DEBUG_OBJECT (queue,
- "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
+ "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
+ period, queue->bytes_in, queue->byte_in_period);
byte_in_rate = queue->bytes_in / period;
if (queue->byte_in_rate == 0.0)
queue->byte_in_rate = byte_in_rate;
else
- queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
+ queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
+ (double) queue->byte_in_period, period);
+
+ /* another data point, cap at 16 for long time running average */
+ if (queue->byte_in_period < 16 * RATE_INTERVAL)
+ queue->byte_in_period += period;
/* reset the values to calculate rate over the next interval */
queue->last_in_elapsed = elapsed;
} else {
GST_INFO_OBJECT (queue, "not found in any range");
/* we don't have the range, see how far away we are, FIXME, find a good
- * threshold based on the incomming rate. */
+ * threshold based on the incoming rate. */
if (!queue->is_eos && queue->current) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
if (offset < queue->current->offset || offset >
#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
#endif
-static gint64
+static GstFlowReturn
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
- guint8 * dst)
+ guint8 * dst, gint64 * read_return)
{
guint8 *ring_buffer;
size_t res;
goto eos;
}
- return res;
+ *read_return = res;
+
+ return GST_FLOW_OK;
seek_failed:
{
eos:
{
GST_DEBUG ("non-regular file hits EOS");
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
}
guint8 *data;
guint64 file_offset;
guint block_length, remaining, read_length;
- gint64 read_return;
guint64 rb_size;
guint64 rpos;
+ GstFlowReturn ret = GST_FLOW_OK;
/* allocate the output buffer of the requested size */
- buf = gst_buffer_new_and_alloc (length);
- data = GST_BUFFER_DATA (buf);
+ buf = gst_buffer_new_allocate (NULL, length, 0);
+ data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
offset);
"EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
level);
read_length = level;
- } else {
- GST_DEBUG_OBJECT (queue,
- "EOS hit and we don't have any requested data");
- gst_buffer_unref (buf);
- return GST_FLOW_UNEXPECTED;
- }
+ } else
+ goto hit_eos;
}
}
/* set range reading_pos to actual reading position for this read */
queue->current->reading_pos = rpos;
- /* congfigure how much and from where to read */
+ /* configure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
(queue->current->rb_offset + (rpos -
/* while we still have data to read, we loop */
while (read_length > 0) {
- read_return =
+ gint64 read_return;
+
+ ret =
gst_queue2_read_data_at_offset (queue, file_offset, block_length,
- data);
- if (read_return < 0)
+ data, &read_return);
+ if (ret != GST_FLOW_OK)
goto read_error;
file_offset += read_return;
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
}
- GST_BUFFER_SIZE (buf) = length;
+ gst_buffer_unmap (buf, data, length);
+
GST_BUFFER_OFFSET (buf) = offset;
GST_BUFFER_OFFSET_END (buf) = offset + length;
*buffer = buf;
- return GST_FLOW_OK;
+ return ret;
/* ERRORS */
+hit_eos:
+ {
+ GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
+ gst_buffer_unref (buf);
+ return GST_FLOW_EOS;
+ }
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
read_error:
{
GST_DEBUG_OBJECT (queue, "we have a read error");
+ gst_buffer_unmap (buf, data, 0);
gst_buffer_unref (buf);
- return read_return;
+ return ret;
}
}
case GST_FLOW_OK:
item = GST_MINI_OBJECT_CAST (buffer);
break;
- case GST_FLOW_UNEXPECTED:
+ case GST_FLOW_EOS:
item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
break;
default:
gst_queue2_flush_temp_file (queue);
init_ranges (queue);
} else {
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
static gboolean
gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
{
- guint8 *data, *ring_buffer;
+ guint8 *odata, *data, *ring_buffer;
guint size, rb_size;
+ gsize osize;
guint64 writing_pos, new_writing_pos;
GstQueue2Range *range, *prev, *next;
ring_buffer = queue->ring_buffer;
rb_size = queue->ring_buffer_max_size;
- size = GST_BUFFER_SIZE (buffer);
- data = GST_BUFFER_DATA (buffer);
+ odata = gst_buffer_map (buffer, &osize, NULL, GST_MAP_READ);
+
+ size = osize;
+ data = odata;
GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
GST_BUFFER_OFFSET (buffer));
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
GST_QUEUE2_SIGNAL_ADD (queue);
- };
+ }
+
+ gst_buffer_unmap (buffer, odata, osize);
return TRUE;
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
- /* FIXME - GST_FLOW_UNEXPECTED ? */
+ gst_buffer_unmap (buffer, odata, osize);
+ /* FIXME - GST_FLOW_EOS ? */
return FALSE;
}
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
+ gst_buffer_unmap (buffer, odata, osize);
return FALSE;
}
handle_error:
("%s", g_strerror (errno)));
}
}
+ gst_buffer_unmap (buffer, odata, osize);
return FALSE;
}
}
guint size;
buffer = GST_BUFFER_CAST (item);
- size = GST_BUFFER_SIZE (buffer);
+ size = gst_buffer_get_size (buffer);
/* add buffer to the statistics */
if (QUEUE_IS_USING_QUEUE (queue)) {
GST_DEBUG_OBJECT (queue, "we have EOS");
queue->is_eos = TRUE;
break;
- case GST_EVENT_NEWSEGMENT:
+ case GST_EVENT_SEGMENT:
apply_segment (queue, event, &queue->sink_segment, TRUE);
/* This is our first new segment, we hold it
* as we can't save it on the temp file */
queue->starting_segment = event;
item = NULL;
}
- /* a new segment allows us to accept more buffers if we got UNEXPECTED
+ /* a new segment allows us to accept more buffers if we got EOS
* from downstream */
queue->unexpected = FALSE;
break;
update_buffering (queue);
if (QUEUE_IS_USING_QUEUE (queue)) {
- g_queue_push_tail (queue->queue, item);
+ g_queue_push_tail (&queue->queue, item);
} else {
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
}
if (!QUEUE_IS_USING_QUEUE (queue))
item = gst_queue2_read_item_from_file (queue);
else
- item = g_queue_pop_head (queue->queue);
+ item = g_queue_pop_head (&queue->queue);
if (item == NULL)
goto no_item;
guint size;
buffer = GST_BUFFER_CAST (item);
- size = GST_BUFFER_SIZE (buffer);
+ size = gst_buffer_get_size (buffer);
*is_buffer = TRUE;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
/* queue is empty now that we dequeued the EOS */
GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
break;
- case GST_EVENT_NEWSEGMENT:
+ case GST_EVENT_SEGMENT:
apply_segment (queue, event, &queue->src_segment, FALSE);
break;
default:
}
static gboolean
-gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
}
static gboolean
+gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query)
+{
+ gboolean res;
+
+ switch (GST_QUERY_TYPE (query)) {
+ default:
+ res = gst_pad_query_default (pad, parent, query);
+ break;
+ }
+ return res;
+}
+
+static gboolean
gst_queue2_is_empty (GstQueue2 * queue)
{
/* never empty on EOS */
if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
- if (queue->queue->length == 0)
+ if (queue->queue.length == 0)
return TRUE;
}
}
static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (parent);
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
- GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+ G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+ GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
out_unexpected:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "exit because we received UNEXPECTED");
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
}
if (is_buffer) {
GstBuffer *buffer;
+#if 0
GstCaps *caps;
+#endif
buffer = GST_BUFFER_CAST (data);
+#if 0
caps = GST_BUFFER_CAPS (buffer);
+#endif
+#if 0
/* set caps before pushing the buffer so that core does not try to do
* something fancy to check if this is possible. */
if (caps && caps != GST_PAD_CAPS (queue->srcpad))
gst_pad_set_caps (queue->srcpad, caps);
+#endif
result = gst_pad_push (queue->srcpad, buffer);
/* need to check for srcresult here as well */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
- if (result == GST_FLOW_UNEXPECTED) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "got UNEXPECTED from downstream");
+ if (result == GST_FLOW_EOS) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
/* stop pushing buffers, we dequeue all items until we see an item that we
- * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+ * can push again, which is EOS or SEGMENT. If there is nothing in the
* queue we can push, we set a flag to make the sinkpad refuse more
- * buffers with an UNEXPECTED return value until we receive something
+ * buffers with an EOS return value until we receive something
* pushable again or we get flushed. */
while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
if (is_buffer) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED buffer %p", data);
+ "dropping EOS buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
} else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
- if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+ if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
/* we found a pushable item in the queue, push it out */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "pushing pushable event %s after UNEXPECTED",
+ "pushing pushable event %s after EOS",
GST_EVENT_TYPE_NAME (event));
goto next;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED event %p", event);
+ "dropping EOS event %p", event);
gst_event_unref (event);
}
}
/* no more items in the queue. Set the unexpected flag so that upstream
* make us refuse any more buffers on the sinkpad. Since we will still
- * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+ * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
* task function does not shut down. */
queue->unexpected = TRUE;
result = GST_FLOW_OK;
gst_pad_push_event (queue->srcpad, event);
- /* if we're EOS, return UNEXPECTED so that the task pauses. */
+ /* if we're EOS, return EOS so that the task pauses. */
if (type == GST_EVENT_EOS) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "pushed EOS event %p, return UNEXPECTED", event);
- result = GST_FLOW_UNEXPECTED;
+ "pushed EOS event %p, return EOS", event);
+ result = GST_FLOW_EOS;
}
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
}
}
-/* called repeadedly with @pad as the source pad. This function should push out
+/* called repeatedly with @pad as the source pad. This function should push out
* data to the peer element. */
static void
gst_queue2_loop (GstPad * pad)
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (queue->srcresult));
/* let app know about us giving up if upstream is not expected to do so */
- /* UNEXPECTED is already taken care of elsewhere */
- if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
+ /* EOS is already taken care of elsewhere */
+ if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
GST_ELEMENT_ERROR (queue, STREAM, FAILED,
(_("Internal data flow error.")),
("streaming task paused, reason %s (%d)",
}
static gboolean
-gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
gboolean res = TRUE;
- GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ GstQueue2 *queue = GST_QUEUE2 (parent);
- if (G_UNLIKELY (queue == NULL)) {
- gst_event_unref (event);
- return FALSE;
- }
#ifndef GST_DISABLE_GST_DEBUG
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
event, GST_EVENT_TYPE_NAME (event));
break;
}
- gst_object_unref (queue);
return res;
}
static gboolean
-gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
-{
- gboolean ret = FALSE;
- GstPad *peer;
-
- if ((peer = gst_pad_get_peer (pad))) {
- ret = gst_pad_query (peer, query);
- gst_object_unref (peer);
- }
- return ret;
-}
-
-static gboolean
-gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
- if (G_UNLIKELY (queue == NULL))
- return FALSE;
+ queue = GST_QUEUE2 (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:
gint64 peer_pos;
GstFormat format;
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
/* get peer position */
{
GST_DEBUG_OBJECT (queue, "doing peer query");
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "peer query success");
/* FIXME - is this condition correct? what should ring buffer do? */
if (QUEUE_IS_USING_QUEUE (queue)) {
/* no temp file, just forward to the peer */
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
} else {
guint64 writing_pos;
gint percent;
gint64 estimated_total, buffering_left;
- GstFormat peer_fmt;
gint64 duration;
gboolean peer_res, is_buffering, is_eos;
gdouble byte_in_rate, byte_out_rate;
duration = writing_pos;
} else {
/* get duration of upstream in bytes */
- peer_fmt = GST_FORMAT_BYTES;
- peer_res = gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
- &duration);
+ peer_res = gst_pad_peer_query_duration (queue->sinkpad,
+ GST_FORMAT_BYTES, &duration);
}
/* calculate remaining and total download time */
}
break;
}
+ case GST_QUERY_SCHEDULING:
+ {
+ gboolean pull_mode;
+ GstSchedulingFlags flags = 0;
+
+ /* we can operate in pull mode when we are using a tempfile */
+ pull_mode = !QUEUE_IS_USING_QUEUE (queue);
+
+ if (pull_mode)
+ flags |= GST_SCHEDULING_FLAG_SEEKABLE;
+ gst_query_set_scheduling (query, flags, 0, -1, 0);
+ if (pull_mode)
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
+ break;
+ }
default:
/* peer handled other queries */
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_query_default (pad, parent, query))
goto peer_failed;
break;
}
- gst_object_unref (queue);
return TRUE;
/* ERRORS */
peer_failed:
{
GST_DEBUG_OBJECT (queue, "failed peer query");
- gst_object_unref (queue);
return FALSE;
}
}
static gboolean
gst_queue2_handle_query (GstElement * element, GstQuery * query)
{
+ GstQueue2 *queue = GST_QUEUE2 (element);
+
/* simply forward to the srcpad query function */
- return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
+ return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
+ query);
}
static void
gst_queue2_update_upstream_size (GstQueue2 * queue)
{
- GstFormat fmt = GST_FORMAT_BYTES;
gint64 upstream_size = -1;
- if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &upstream_size)) {
+ if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
+ &upstream_size)) {
GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
queue->upstream_size = upstream_size;
}
}
static GstFlowReturn
-gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
- GstBuffer ** buffer)
+gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
+ guint length, GstBuffer ** buffer)
{
GstQueue2 *queue;
GstFlowReturn ret;
- queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2_CAST (parent);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
ret = gst_queue2_create_read (queue, offset, length, buffer);
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
-
return ret;
/* ERRORS */
GST_DEBUG_OBJECT (queue, "we are flushing");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
return ret;
}
out_unexpected:
{
GST_DEBUG_OBJECT (queue, "read beyond end of file");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
}
-static gboolean
-gst_queue2_src_checkgetrange_function (GstPad * pad)
-{
- GstQueue2 *queue;
- gboolean ret;
-
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
-
- /* we can operate in pull mode when we are using a tempfile */
- ret = !QUEUE_IS_USING_QUEUE (queue);
-
- gst_object_unref (GST_OBJECT (queue));
-
- return ret;
-}
-
/* sink currently only operates in push mode */
static gboolean
-gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
+gst_queue2_sink_activate_push (GstPad * pad, GstObject * parent,
+ gboolean active)
{
gboolean result = TRUE;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
- gst_object_unref (queue);
-
return result;
}
/* src operating in push mode, we start a task on the source pad that pushes out
* buffers from the queue */
static gboolean
-gst_queue2_src_activate_push (GstPad * pad, gboolean active)
+gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
{
gboolean result = FALSE;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
result = gst_pad_stop_task (pad);
}
- gst_object_unref (queue);
-
return result;
}
/* pull mode, downstream will call our getrange function */
static gboolean
-gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
+gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
{
gboolean result;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
queue->is_eos = FALSE;
queue->unexpected = FALSE;
queue->upstream_size = 0;
- GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
- GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
/* this is not allowed, we cannot operate in pull mode without a temp
* file. */
queue->srcresult = GST_FLOW_WRONG_STATE;
queue->sinkresult = GST_FLOW_WRONG_STATE;
result = FALSE;
- GST_QUEUE2_MUTEX_UNLOCK (queue);
}
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating pull mode");
result = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
- gst_object_unref (queue);
return result;
}