} \
} G_STMT_END
+#define SET_PERCENT(q, perc) G_STMT_START { \
+ if (perc != q->buffering_percent) { \
+ q->buffering_percent = perc; \
+ q->percent_changed = TRUE; \
+ GST_DEBUG_OBJECT (q, "buffering %d percent", perc); \
+ get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out, \
+ &q->buffering_left); \
+ } \
+} G_STMT_END
+
#define _do_init \
GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
static void update_in_rates (GstQueue2 * queue);
+static void gst_queue2_post_buffering (GstQueue2 * queue);
+#ifdef GST_QUEUE2_MODIFICATION
+static gboolean change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length);
+#endif
typedef enum
{
g_cond_init (&queue->query_handled);
queue->last_query = FALSE;
+ g_mutex_init (&queue->buffering_post_lock);
queue->buffering_percent = 100;
/* tempfile related */
queue->last_query = FALSE;
g_queue_clear (&queue->queue);
g_mutex_clear (&queue->qlock);
+ g_mutex_clear (&queue->buffering_post_lock);
g_cond_clear (&queue->item_add);
g_cond_clear (&queue->item_del);
g_cond_clear (&queue->query_handled);
g_slice_free_chain (GstQueue2Range, queue->ranges, next);
queue->ranges = NULL;
queue->current = NULL;
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->read = NULL;
+#endif
}
/* find a range that contains @offset or NULL when nothing does */
/* get rid of all the current ranges */
clean_ranges (queue);
/* make a range for offset 0 */
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->current = queue->read = add_range (queue, 0, TRUE);
+#else
queue->current = add_range (queue, 0, TRUE);
+#endif
}
/* calculate the diff between running time on the sink and src of the queue.
if (segment->format == GST_FORMAT_BYTES) {
if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
/* start is where we'll be getting from and as such writing next */
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->current = queue->read = add_range (queue, segment->start, TRUE);
+#else
queue->current = add_range (queue, segment->start, TRUE);
+#endif
}
}
update_time_level (queue);
}
+static void
+apply_gap (GstQueue2 * queue, GstEvent * event,
+ GstSegment * segment, gboolean is_sink)
+{
+ GstClockTime timestamp;
+ GstClockTime duration;
+
+ gst_event_parse_gap (event, ×tamp, &duration);
+
+ if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+
+ if (GST_CLOCK_TIME_IS_VALID (duration)) {
+ timestamp += duration;
+ }
+
+ segment->position = timestamp;
+
+ if (is_sink)
+ queue->sink_tainted = TRUE;
+ else
+ queue->src_tainted = TRUE;
+
+ /* calc diff with other end */
+ update_time_level (queue);
+ }
+}
+
/* take a buffer and update segment, updating the time level of the queue. */
static void
apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
}
}
+static void
+gst_queue2_post_buffering (GstQueue2 * queue)
+{
+ GstMessage *msg = NULL;
+
+ g_mutex_lock (&queue->buffering_post_lock);
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ if (queue->percent_changed) {
+ gint percent = queue->buffering_percent;
+
+ queue->percent_changed = FALSE;
+
+ GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
+ msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
+
+ gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
+ queue->avg_out, queue->buffering_left);
+ }
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+ if (msg != NULL)
+ gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
+
+ g_mutex_unlock (&queue->buffering_post_lock);
+}
+
static void
update_buffering (GstQueue2 * queue)
{
gint percent;
- gboolean post = FALSE;
+#ifdef GST_QUEUE2_MODIFICATION
+ GstQueue2Range *range;
+
+ if (queue->read)
+ range = queue->read;
+ else
+ range = queue->current;
+#endif
/* Ensure the variables used to calculate buffering state are up-to-date. */
+#ifdef GST_QUEUE2_MODIFICATION
+ if (range)
+ update_cur_level (queue, range);
+#else
if (queue->current)
update_cur_level (queue, queue->current);
+#endif
update_in_rates (queue);
if (!get_buffering_percent (queue, NULL, &percent))
return;
if (queue->is_buffering) {
- post = TRUE;
/* if we were buffering see if we reached the high watermark */
- if (percent >= queue->high_percent)
+ if (percent >= 100)
queue->is_buffering = FALSE;
+
+ SET_PERCENT (queue, percent);
} else {
/* we were not buffering, check if we need to start buffering if we drop
* below the low threshold */
if (percent < queue->low_percent) {
queue->is_buffering = TRUE;
- post = TRUE;
+ SET_PERCENT (queue, percent);
}
}
-
- if (post) {
- if (percent == queue->buffering_percent)
- post = FALSE;
- else
- queue->buffering_percent = percent;
- }
-
- if (post) {
- GstMessage *message;
- GstBufferingMode mode;
- gint avg_in, avg_out;
- gint64 buffering_left;
-
- get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
- &buffering_left);
-
- message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
- (gint) percent);
- gst_message_set_buffering_stats (message, mode,
- avg_in, avg_out, buffering_left);
-
- gst_element_post_message (GST_ELEMENT_CAST (queue), message);
- }
}
static void
* cause data to be written to the wrong offset in the file or ring buffer.
* We still do the add_range call to switch the current range to the
* requested range, or create one if one doesn't exist yet. */
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->current = queue->read = add_range (queue, offset, FALSE);
+#else
queue->current = add_range (queue, offset, FALSE);
+#endif
}
return res;
return threshold;
}
+#ifdef GST_QUEUE2_MODIFICATION
+/* check the buffered data size, not to change range repeatedly.
+ * changing range cause the new http connection and it makes buffering state frequently. */
+static gboolean
+change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length)
+{
+ guint64 curr_level = 0;
+ guint64 curr_min_level = 0;
+
+ if (queue->current->writing_pos > queue->current->reading_pos)
+ curr_level = queue->current->writing_pos - queue->current->reading_pos;
+
+ /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
+ curr_min_level = MIN((queue->max_level.bytes*0.05), get_seek_threshold(queue));
+
+ GST_DEBUG_OBJECT(queue, " curr[w%"G_GUINT64_FORMAT", r%"G_GUINT64_FORMAT
+ ", d%"G_GUINT64_FORMAT", m%"G_GUINT64_FORMAT
+ "] u[%"G_GUINT64_FORMAT"][EOS:%d]",
+ queue->current->writing_pos, queue->current->reading_pos,
+ curr_level, curr_min_level,
+ queue->upstream_size, queue->is_eos);
+
+ /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
+ if ((queue->is_eos) ||
+ (queue->upstream_size > 0 && queue->current->writing_pos >= queue->upstream_size) ||
+ (curr_level >= curr_min_level)) {
+ GST_DEBUG_OBJECT (queue, "Range Switching");
+ return TRUE;
+ } else {
+ GST_DEBUG_OBJECT (queue, "keep receiving data without range switching.");
+ return FALSE;
+ }
+}
+#endif
+
/* see if there is enough data in the file to read a full buffer */
static gboolean
gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
{
GstQueue2Range *range;
-
+#ifdef GST_QUEUE2_MODIFICATION
+ gboolean need_to_seek = TRUE;
+#endif
GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
offset, length);
if ((range = find_range (queue, offset))) {
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->read = range;
+#endif
if (queue->current != range) {
GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
+#ifdef GST_QUEUE2_MODIFICATION
+ if (QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER(queue))
+ need_to_seek = change_current_range(queue, range, offset, length);
+
+ if (need_to_seek == TRUE)
+ perform_seek_to_offset (queue, range->writing_pos);
+#else
perform_seek_to_offset (queue, range->writing_pos);
+#endif
}
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
} else {
GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
" len %u", offset, length);
+
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->read = queue->current;
+#endif
+
/* we don't have the range, see how far away we are */
if (!queue->is_eos && queue->current) {
guint64 threshold = get_seek_threshold (queue);
guint64 level;
/* calculate how far away the offset is */
+#ifdef GST_QUEUE2_MODIFICATION
+ if (queue->read->writing_pos > rpos)
+ level = queue->read->writing_pos - rpos;
+#else
if (queue->current->writing_pos > rpos)
level = queue->current->writing_pos - rpos;
+#endif
else
level = 0;
GST_DEBUG_OBJECT (queue,
"reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+#ifdef GST_QUEUE2_MODIFICATION
+ rpos, queue->read->writing_pos, level, max_size);
+#else
rpos, queue->current->writing_pos, level, max_size);
+#endif
if (level >= max_size) {
/* we don't have the data but if we have a ring buffer that is full, we
if (read_length == 0) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+#ifdef GST_QUEUE2_MODIFICATION
+ GST_DEBUG_OBJECT (queue,
+ "update current position [%" G_GUINT64_FORMAT "-%"
+ G_GUINT64_FORMAT "]", rpos, queue->read->max_reading_pos);
+ update_cur_pos (queue, queue->read, rpos);
+#else
GST_DEBUG_OBJECT (queue,
"update current position [%" G_GUINT64_FORMAT "-%"
G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
update_cur_pos (queue, queue->current, rpos);
+#endif
GST_QUEUE2_SIGNAL_DEL (queue);
}
}
/* set range reading_pos to actual reading position for this read */
+#ifdef GST_QUEUE2_MODIFICATION
+ queue->read->reading_pos = rpos;
+#else
queue->current->reading_pos = rpos;
+#endif
/* configure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
+#ifdef GST_QUEUE2_MODIFICATION
+ (queue->read->rb_offset + (rpos -
+ queue->read->offset)) % rb_size;
+#else
(queue->current->rb_offset + (rpos -
queue->current->offset)) % rb_size;
+#endif
if (file_offset + read_length > rb_size) {
block_length = rb_size - file_offset;
} else {
block_length = read_length;
remaining -= read_return;
+#ifdef GST_QUEUE2_MODIFICATION
+ rpos = (queue->read->reading_pos += read_return);
+ update_cur_pos (queue, queue->read, queue->read->reading_pos);
+#else
rpos = (queue->current->reading_pos += read_return);
update_cur_pos (queue, queue->current, queue->current->reading_pos);
+#endif
}
GST_QUEUE2_SIGNAL_DEL (queue);
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
GstBuffer *buffer = NULL;
guint64 reading_pos;
+#ifdef GST_QUEUE2_MODIFICATION
+ reading_pos = queue->read->reading_pos;
+#else
reading_pos = queue->current->reading_pos;
+#endif
ret =
gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
queue->current->rb_writing_pos = writing_pos = new_writing_pos;
} else {
queue->current->writing_pos = writing_pos = new_writing_pos;
+#ifdef GST_QUEUE2_MODIFICATION
+ if (do_seek) {
+ if (queue->upstream_size == 0 || new_writing_pos < queue->upstream_size)
+ perform_seek_to_offset (queue, new_writing_pos);
+ else
+ queue->is_eos = TRUE;
+ }
+#endif
}
+
+#ifndef GST_QUEUE2_MODIFICATION
if (do_seek)
perform_seek_to_offset (queue, new_writing_pos);
+#endif
update_cur_level (queue, queue->current);
/* add buffer to the statistics */
if (QUEUE_IS_USING_QUEUE (queue)) {
- queue->cur_level.buffers++;
+ queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
queue->cur_level.bytes += size;
}
queue->bytes_in += size;
* from downstream */
queue->unexpected = FALSE;
break;
+ case GST_EVENT_GAP:
+ apply_gap (queue, event, &queue->sink_segment, TRUE);
+ break;
case GST_EVENT_STREAM_START:
if (!QUEUE_IS_USING_QUEUE (queue)) {
gst_event_replace (&queue->stream_start_event, event);
case GST_EVENT_SEGMENT:
apply_segment (queue, event, &queue->src_segment, FALSE);
break;
+ case GST_EVENT_GAP:
+ apply_gap (queue, event, &queue->src_segment, FALSE);
+ break;
default:
break;
}
"retrieved buffer list %p from queue", buffer_list);
if (QUEUE_IS_USING_QUEUE (queue)) {
- queue->cur_level.buffers--;
+ queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
queue->cur_level.bytes -= size;
}
queue->bytes_out += size;
goto out_eos;
gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
} else {
/* non-serialized events are passed upstream. */
ret = gst_pad_push_event (queue->srcpad, event);
res = FALSE;
}
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
} else {
res = gst_pad_query_default (pad, parent, query);
}
/* put buffer in queue now */
gst_queue2_locked_enqueue (queue, item, item_type);
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
return GST_FLOW_OK;
item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
goto out_flushing;
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
return;
range_stop = 0;
break;
}
+#ifdef GST_QUEUE2_MODIFICATION
+ range_start =
+ gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+ queued_ranges->reading_pos, duration);
+#else
range_start =
gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
queued_ranges->offset, duration);
+#endif
range_stop =
gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
queued_ranges->writing_pos, duration);
break;
case GST_FORMAT_BYTES:
+#ifdef GST_QUEUE2_MODIFICATION
+ range_start = queued_ranges->reading_pos;
+#else
range_start = queued_ranges->offset;
+#endif
range_stop = queued_ranges->writing_pos;
break;
default:
gboolean pull_mode;
GstSchedulingFlags flags = 0;
- if (!gst_pad_peer_query (queue->sinkpad, query))
+ if ((!QUEUE_IS_USING_QUEUE (queue)) && !gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
&upstream_size)) {
GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
- queue->upstream_size = upstream_size;
+
+ /* upstream_size can be negative but queue->upstream_size is unsigned.
+ * Prevent setting negative values to it (the query can return -1) */
+ if (upstream_size >= 0)
+ queue->upstream_size = upstream_size;
+ else
+ queue->upstream_size = 0;
}
}
/* FIXME - function will block when the range is not yet available */
ret = gst_queue2_create_read (queue, offset, length, buffer);
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
return ret;
case PROP_USE_BUFFERING:
queue->use_buffering = g_value_get_boolean (value);
if (!queue->use_buffering && queue->is_buffering) {
- GstMessage *msg = gst_message_new_buffering (GST_OBJECT_CAST (queue),
- 100);
-
GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
"posting 100%% message");
+ SET_PERCENT (queue, 100);
queue->is_buffering = FALSE;
- gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
}
if (queue->use_buffering) {
}
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_queue2_post_buffering (queue);
}
static void