* The default queue size limits are 100 buffers, 2MB of data, or
* two seconds worth of data, whichever is reached first.
*
- * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
+ * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
* will allocate a random free filename and buffer data in the file.
* By using this, it will buffer the entire stream data on the file independently
* of the queue size limits, they will only be used for buffering statistics.
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
- gst_element_class_set_details_simple (gstelement_class, "Queue 2",
+ gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
"Generic",
"Simple data queue",
"Erik Walthinsen <omega@cse.ogi.edu>, "
gst_pad_set_query_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
- GST_PAD_SET_PROXY_ALLOCATION (queue->sinkpad);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
gst_event_copy_segment (event, segment);
if (segment->format == GST_FORMAT_BYTES) {
- if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
/* start is where we'll be getting from and as such writing next */
queue->current = add_range (queue, segment->start);
- /* update the stats for this range */
- update_cur_level (queue, queue->current);
}
}
GstEvent *event;
gboolean res;
+ /* until we receive the FLUSH_STOP from this seek, we skip data */
+ queue->seeking = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
(offset + length) - range->writing_pos);
} else {
- GST_INFO_OBJECT (queue, "not found in any range");
- /* we don't have the range, see how far away we are, FIXME, find a good
- * threshold based on the incoming rate. */
+ GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
+ " len %u", offset, length);
+ /* we don't have the range, see how far away we are */
if (!queue->is_eos && queue->current) {
+ /* FIXME, find a good threshold based on the incoming rate. */
+ guint64 threshold = 1024 * 512;
+
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
- if (offset < queue->current->offset || offset >
- queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
- queue->cur_level.bytes) {
- perform_seek_to_offset (queue, offset);
- } else {
+ guint64 distance;
+
+ distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+ /* don't wait for the complete buffer to fill */
+ distance = MIN (distance, threshold);
+
+ if (offset >= queue->current->offset && offset <=
+ queue->current->writing_pos + distance) {
GST_INFO_OBJECT (queue,
"requested data is within range, wait for data");
+ return FALSE;
}
- } else if (offset < queue->current->writing_pos + 200000) {
+ } else if (offset < queue->current->writing_pos + threshold) {
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "wait for data");
return FALSE;
guint64 file_offset;
guint block_length, remaining, read_length;
guint64 rb_size;
+ guint64 max_size;
guint64 rpos;
GstFlowReturn ret = GST_FLOW_OK;
/* allocate the output buffer of the requested size */
- buf = gst_buffer_new_allocate (NULL, length, 0);
+ if (*buffer == NULL)
+ buf = gst_buffer_new_allocate (NULL, length, NULL);
+ else
+ buf = *buffer;
+
gst_buffer_map (buf, &info, GST_MAP_WRITE);
data = info.data;
rpos = offset;
rb_size = queue->ring_buffer_max_size;
+ max_size = QUEUE_MAX_BYTES (queue);
remaining = length;
while (remaining > 0) {
GST_DEBUG_OBJECT (queue,
"reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
- ", level %" G_GUINT64_FORMAT,
- rpos, queue->current->writing_pos, level);
+ ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+ rpos, queue->current->writing_pos, level, max_size);
- if (level >= rb_size) {
+ if (level >= max_size) {
/* we don't have the data but if we have a ring buffer that is full, we
* need to read */
GST_DEBUG_OBJECT (queue,
- "ring buffer full, reading ring-buffer-max-size %"
- G_GUINT64_FORMAT " bytes", rb_size);
- read_length = rb_size;
+ "ring buffer full, reading QUEUE_MAX_BYTES %"
+ G_GUINT64_FORMAT " bytes", max_size);
+ read_length = max_size;
} else if (queue->is_eos) {
/* won't get any more data so read any data we have */
if (level) {
"EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
level);
read_length = level;
+ remaining = level;
+ length = level;
} else
goto hit_eos;
}
}
if (read_length == 0) {
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- && queue->current->max_reading_pos > rpos) {
- /* protect cached data (data between offset and max_reading_pos)
- * and update current level */
+ if (QUEUE_IS_USING_RING_BUFFER (queue)) {
GST_DEBUG_OBJECT (queue,
- "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
- "]", rpos, queue->current->max_reading_pos);
- queue->current->max_reading_pos = rpos;
- update_cur_level (queue, queue->current);
+ "update current position [%" G_GUINT64_FORMAT "-%"
+ G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
+ update_cur_pos (queue, queue->current, rpos);
+ GST_QUEUE2_SIGNAL_DEL (queue);
}
GST_DEBUG_OBJECT (queue, "waiting for add");
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
{
GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
gst_buffer_unmap (buf, &info);
- gst_buffer_unref (buf);
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
return GST_FLOW_EOS;
}
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
gst_buffer_unmap (buf, &info);
- gst_buffer_unref (buf);
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
return GST_FLOW_FLUSHING;
}
read_error:
{
GST_DEBUG_OBJECT (queue, "we have a read error");
gst_buffer_unmap (buf, &info);
- gst_buffer_unref (buf);
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
return ret;
}
}
queue->starting_segment = NULL;
} else {
GstFlowReturn ret;
- GstBuffer *buffer;
+ GstBuffer *buffer = NULL;
guint64 reading_pos;
reading_pos = queue->current->reading_pos;
data = info.data;
GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
- GST_BUFFER_OFFSET (buffer));
+ writing_pos);
while (size > 0) {
guint to_write;
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
- if (QUEUE_IS_USING_QUEUE (queue)) {
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
- if (QUEUE_IS_USING_QUEUE (queue)) {
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
+ queue->seeking = FALSE;
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
- queue->srcpad);
+ queue->srcpad, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
queue->is_eos = FALSE;
queue->unexpected = FALSE;
queue->sinkresult = GST_FLOW_OK;
+ queue->seeking = FALSE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
switch (GST_QUERY_TYPE (query)) {
default:
- res = gst_pad_query_default (pad, parent, query);
+ if (GST_QUERY_IS_SERIALIZED (query)) {
+ GST_WARNING_OBJECT (pad, "unhandled serialized query");
+ res = FALSE;
+ } else {
+ res = gst_pad_query_default (pad, parent, query);
+ }
break;
}
return res;
if (queue->unexpected)
goto out_unexpected;
+ /* while we didn't receive the newsegment, we're seeking and we skip data */
+ if (queue->seeking)
+ goto out_seeking;
+
if (!gst_queue2_wait_free_space (queue))
goto out_flushing;
return GST_FLOW_EOS;
}
+out_seeking:
+ {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_mini_object_unref (item);
+
+ return GST_FLOW_OK;
+ }
out_unexpected:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
-#if 0
- GstCaps *caps;
-#endif
buffer = GST_BUFFER_CAST (data);
-#if 0
- caps = GST_BUFFER_CAPS (buffer);
-#endif
-
-#if 0
- /* set caps before pushing the buffer so that core does not try to do
- * something fancy to check if this is possible. */
- if (caps && caps != GST_PAD_CAPS (queue->srcpad))
- gst_pad_set_caps (queue->srcpad, caps);
-#endif
result = gst_pad_push (queue->srcpad, buffer);
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
} else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
GstBufferList *buffer_list;
-#if 0
- GstBuffer *first_buf;
- GstCaps *caps;
-#endif
buffer_list = GST_BUFFER_LIST_CAST (data);
-#if 0
- first_buf = gst_buffer_list_get (buffer_list, 0);
- caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
-
- /* set caps before pushing the buffer so that core does not try to do
- * something fancy to check if this is possible. */
- if (caps && caps != GST_PAD_CAPS (queue->srcpad))
- gst_pad_set_caps (queue->srcpad, caps);
-#endif
-
result = gst_pad_push_list (queue->srcpad, buffer_list);
/* need to check for srcresult here as well */
/* now unblock the getrange function */
GST_QUEUE2_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
- if (queue->current) {
- /* forget the highest read offset, we'll calculate a new one when we
- * get the next getrange request. We need to do this in order to reset
- * the buffering percentage */
- queue->current->max_reading_pos = 0;
- }
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* when using a temp file, we eat the event */
start = 0;
/* get our available data relative to the duration */
if (duration != -1)
- stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
+ stop =
+ gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
+ duration);
else
stop = -1;
break;
range_stop = 0;
break;
}
- range_start = 100 * queued_ranges->offset / duration;
- range_stop = 100 * queued_ranges->writing_pos / duration;
+ range_start =
+ gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+ queued_ranges->offset, duration);
+ range_stop =
+ gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+ queued_ranges->writing_pos, duration);
break;
case GST_FORMAT_BYTES:
range_start = queued_ranges->offset;
queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
- result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
+ result =
+ gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* unblock loop function */
case PROP_TEMP_LOCATION:
g_free (queue->temp_location);
queue->temp_location = g_value_dup_string (value);
- /* you can set the property back to NULL to make it use the temp-tmpl
+ /* you can set the property back to NULL to make it use the temp-template
* property. */
queue->temp_location_set = queue->temp_location != NULL;
break;