/**
* SECTION:element-queue2
+ * @title: queue2
*
* Data is queued until one of the limits specified by the
* #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
#endif
#ifdef __BIONIC__ /* Android */
-#undef lseek
-#define lseek lseek64
-#undef off_t
-#define off_t guint64
#include <fcntl.h>
#endif
#define DEFAULT_HIGH_WATERMARK 0.99
#define DEFAULT_TEMP_REMOVE TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
+#define DEFAULT_USE_BITRATE_QUERY TRUE
enum
{
PROP_TEMP_REMOVE,
PROP_RING_BUFFER_MAX_SIZE,
PROP_AVG_IN_RATE,
+ PROP_USE_BITRATE_QUERY,
+ PROP_BITRATE,
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ PROP_BUFFER_MODE,
+#endif
PROP_LAST
};
queue->max_level.time, \
(guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \
- queue->queue.length))
+ gst_queue_array_get_length(queue->queue)))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (&q->qlock); \
static void update_in_rates (GstQueue2 * queue, gboolean force);
static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue);
static void gst_queue2_post_buffering (GstQueue2 * queue);
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+static gboolean change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length);
+#endif
typedef enum
{
"Location to store temporary files in (Only read this property, "
"use temp-template to configure the name template)",
NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_USE_BITRATE_QUERY,
+ g_param_spec_boolean ("use-bitrate-query",
+ "Use bitrate from downstream query",
+ "Use a bitrate from a downstream query to estimate buffer duration if not provided",
+ DEFAULT_USE_BITRATE_QUERY,
+ G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+ G_PARAM_STATIC_STRINGS));
/**
* GstQueue2:temp-remove
"Average input data rate (bytes/s)",
0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstQueue2:bitrate
+ *
+ * The value used to convert between byte and time values for limiting
+ * the size of the queue. Values are taken from either the upstream tags
+ * or from the downstream bitrate query.
+ */
+ g_object_class_install_property (gobject_class, PROP_BITRATE,
+ g_param_spec_uint64 ("bitrate", "Bitrate (bits/s)",
+ "Conversion value between data size and time",
+ 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ /**
+ * GstQueue2:buffer-mode:
+ *
+ * Control the buffering mode used by the queue2.
+ */
+ g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
+ g_param_spec_enum ("buffer-mode", "Buffer Mode",
+ "Control the buffering algorithm in use",
+ GST_TYPE_BUFFERING_MODE, GST_BUFFERING_STREAM,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+#endif
+
/* set several parent class virtual functions */
gobject_class->finalize = gst_queue2_finalize;
g_cond_init (&queue->item_add);
queue->waiting_del = FALSE;
g_cond_init (&queue->item_del);
- g_queue_init (&queue->queue);
+ queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
g_cond_init (&queue->query_handled);
queue->last_query = FALSE;
g_mutex_init (&queue->buffering_post_lock);
queue->buffering_percent = 100;
+ queue->last_posted_buffering_percent = -1;
/* tempfile related */
queue->temp_template = NULL;
queue->ring_buffer = NULL;
queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
+ queue->use_bitrate_query = DEFAULT_USE_BITRATE_QUERY;
+
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ queue->mode = -1;
+#endif
GST_DEBUG_OBJECT (queue,
"initialized queue's not_empty & not_full conditions");
}
gst_queue2_finalize (GObject * object)
{
GstQueue2 *queue = GST_QUEUE2 (object);
+ GstQueue2Item *qitem;
GST_DEBUG_OBJECT (queue, "finalizing queue");
- while (!g_queue_is_empty (&queue->queue)) {
- GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
-
+ while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
gst_mini_object_unref (qitem->item);
- g_slice_free (GstQueue2Item, qitem);
}
+ gst_queue_array_free (queue->queue);
queue->last_query = FALSE;
- g_queue_clear (&queue->queue);
g_mutex_clear (&queue->qlock);
g_mutex_clear (&queue->buffering_post_lock);
g_cond_clear (&queue->item_add);
g_slice_free_chain (GstQueue2Range, queue->ranges, next);
queue->ranges = NULL;
queue->current = NULL;
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->read = NULL;
+#endif
}
/* find a range that contains @offset or NULL when nothing does */
/* get rid of all the current ranges */
clean_ranges (queue);
/* make a range for offset 0 */
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->current = queue->read = add_range (queue, 0, TRUE);
+#else
queue->current = add_range (queue, 0, TRUE);
+#endif
}
/* calculate the diff between running time on the sink and src of the queue.
if (segment->format == GST_FORMAT_BYTES) {
if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
/* start is where we'll be getting from and as such writing next */
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->current = queue->read = add_range (queue, segment->start, TRUE);
+#else
queue->current = add_range (queue, segment->start, TRUE);
+#endif
}
}
}
}
+static void
+query_downstream_bitrate (GstQueue2 * queue)
+{
+ GstQuery *query = gst_query_new_bitrate ();
+ guint downstream_bitrate = 0;
+
+ if (gst_pad_peer_query (queue->srcpad, query)) {
+ gst_query_parse_bitrate (query, &downstream_bitrate);
+ GST_DEBUG_OBJECT (queue, "Got bitrate of %u from downstream",
+ downstream_bitrate);
+ } else {
+ GST_DEBUG_OBJECT (queue, "Failed to query bitrate from downstream");
+ }
+
+ gst_query_unref (query);
+
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->downstream_bitrate = downstream_bitrate;
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+ g_object_notify (G_OBJECT (queue), "bitrate");
+}
+
/* take a buffer and update segment, updating the time level of the queue. */
static void
apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
duration = GST_BUFFER_DURATION (buffer);
/* If we have no duration, pick one from the bitrate if we can */
- if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
- guint bitrate =
- is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
- if (bitrate)
- duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
+ if (duration == GST_CLOCK_TIME_NONE) {
+ if (queue->use_tags_bitrate) {
+ guint bitrate =
+ is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
+ if (bitrate)
+ duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
+ }
+ if (duration == GST_CLOCK_TIME_NONE && !is_sink && queue->use_bitrate_query) {
+ if (queue->downstream_bitrate > 0) {
+ duration =
+ gst_util_uint64_scale (size, 8 * GST_SECOND,
+ queue->downstream_bitrate);
+
+ GST_LOG_OBJECT (queue, "got bitrate %u resulting in estimated "
+ "duration %" GST_TIME_FORMAT, queue->downstream_bitrate,
+ GST_TIME_ARGS (duration));
+ }
+ }
}
/* if no timestamp is set, assume it's continuous with the previous
/* if no timestamp is set, assume it's continuous with the previous time */
bld.timestamp = segment->position;
+ bld.bitrate = 0;
if (queue->use_tags_bitrate) {
if (is_sink)
bld.bitrate = queue->sink_tags_bitrate;
else
bld.bitrate = queue->src_tags_bitrate;
- } else
- bld.bitrate = 0;
+ }
+ if (!is_sink && bld.bitrate == 0 && queue->use_bitrate_query) {
+ bld.bitrate = queue->downstream_bitrate;
+ }
gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
#define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
- if (queue->is_eos) {
- /* on EOS we are always 100% full, we set the var here so that it we can
- * reuse the logic below to stop buffering */
+ if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
+ /* on EOS and NOT_LINKED we are always 100% full, we set the var
+ * here so that we can reuse the logic below to stop buffering */
buflevel = MAX_BUFFERING_LEVEL;
- GST_LOG_OBJECT (queue, "we are EOS");
+ GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
} else {
GST_LOG_OBJECT (queue,
- "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
- queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
- queue->cur_level.buffers);
+ "Cur level bytes/time/rate-time/buffers %u/%" GST_TIME_FORMAT "/%"
+ GST_TIME_FORMAT "/%u", queue->cur_level.bytes,
+ GST_TIME_ARGS (queue->cur_level.time),
+ GST_TIME_ARGS (queue->cur_level.rate_time), queue->cur_level.buffers);
/* figure out the buffering level we are filled, we take the max of all formats. */
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
gint * avg_in, gint * avg_out, gint64 * buffering_left)
{
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ if (queue->mode != -1) {
+ *mode = queue->mode;
+ } else {
+ if (mode) {
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
+ if (QUEUE_IS_USING_RING_BUFFER (queue))
+ *mode = GST_BUFFERING_TIMESHIFT;
+ else
+ *mode = GST_BUFFERING_DOWNLOAD;
+ } else {
+ *mode = GST_BUFFERING_STREAM;
+ }
+ }
+ }
+#else
if (mode) {
if (!QUEUE_IS_USING_QUEUE (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue))
*mode = GST_BUFFERING_STREAM;
}
}
+#endif
if (avg_in)
*avg_in = queue->byte_in_rate;
gst_queue2_get_buffering_message (GstQueue2 * queue)
{
GstMessage *msg = NULL;
-
if (queue->percent_changed) {
- gint percent = queue->buffering_percent;
-
+ /* Don't change the buffering level if the sinkpad is waiting for
+ * space to become available. This prevents the situation where,
+ * upstream is pushing buffers larger than our limits so only 1 buffer
+ * is ever in the queue at a time.
+ * Changing the level causes a buffering message to be posted saying that
+ * we are buffering which the application may pause to wait for another
+ * 100% buffering message which would be posted very soon after the
+ * waiting sink thread adds it's buffer to the queue */
+ /* FIXME: This situation above can still occur later if
+ * the sink pad is waiting to push a serialized event into the queue and
+ * the queue becomes empty for a short period of time. */
+ if (!queue->waiting_del
+ && queue->last_posted_buffering_percent != queue->buffering_percent) {
+ gint percent = queue->buffering_percent;
+
+ GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
+ msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
+
+ gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
+ queue->avg_out, queue->buffering_left);
+
+ queue->last_posted_buffering_percent = percent;
+ }
queue->percent_changed = FALSE;
-
- GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
- msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
-
- gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
- queue->avg_out, queue->buffering_left);
}
return msg;
update_buffering (GstQueue2 * queue)
{
gint buffering_level, percent;
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ GstQueue2Range *range;
+
+ if (queue->read)
+ range = queue->read;
+ else
+ range = queue->current;
+#endif
/* Ensure the variables used to calculate buffering state are up-to-date. */
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (range)
+ update_cur_level (queue, range);
+#else
if (queue->current)
update_cur_level (queue, queue->current);
+#endif
update_in_rates (queue, FALSE);
if (!get_buffering_level (queue, NULL, &buffering_level))
queue->bytes_in = 0;
}
- if (queue->byte_in_rate > 0.0) {
+ if (queue->use_bitrate_query && queue->downstream_bitrate > 0) {
+ queue->cur_level.rate_time =
+ gst_util_uint64_scale (8 * queue->cur_level.bytes, GST_SECOND,
+ queue->downstream_bitrate);
+ GST_LOG_OBJECT (queue,
+ "got bitrate %u with byte level %u resulting in time %"
+ GST_TIME_FORMAT, queue->downstream_bitrate, queue->cur_level.bytes,
+ GST_TIME_ARGS (queue->cur_level.rate_time));
+ } else if (queue->byte_in_rate > 0.0) {
queue->cur_level.rate_time =
queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
}
* cause data to be written to the wrong offset in the file or ring buffer.
* We still do the add_range call to switch the current range to the
* requested range, or create one if one doesn't exist yet. */
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->current = queue->read = add_range (queue, offset, FALSE);
+#else
queue->current = add_range (queue, offset, FALSE);
+#endif
}
return res;
return threshold;
}
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+/* check the buffered data size, not to change range repeatedly.
+ * changing range cause the new http connection and it makes buffering state frequently. */
+static gboolean
+change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length)
+{
+ guint64 curr_level = 0;
+ guint64 curr_min_level = 0;
+
+ if (queue->current->writing_pos > queue->current->reading_pos)
+ curr_level = queue->current->writing_pos - queue->current->reading_pos;
+
+ /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
+ curr_min_level = MIN((queue->max_level.bytes*0.05), get_seek_threshold(queue));
+
+ GST_DEBUG_OBJECT(queue, " curr[w%"G_GUINT64_FORMAT", r%"G_GUINT64_FORMAT
+ ", d%"G_GUINT64_FORMAT", m%"G_GUINT64_FORMAT
+ "] u[%"G_GUINT64_FORMAT"][EOS:%d]",
+ queue->current->writing_pos, queue->current->reading_pos,
+ curr_level, curr_min_level,
+ queue->upstream_size, queue->is_eos);
+
+ /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
+ if ((queue->is_eos) ||
+ (queue->upstream_size > 0 && queue->current->writing_pos >= queue->upstream_size) ||
+ (curr_level >= curr_min_level)) {
+ GST_DEBUG_OBJECT (queue, "Range Switching");
+ return TRUE;
+ } else {
+ GST_DEBUG_OBJECT (queue, "keep receiving data without range switching.");
+ return FALSE;
+ }
+}
+#endif
+
/* see if there is enough data in the file to read a full buffer */
static gboolean
gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
{
GstQueue2Range *range;
-
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ gboolean need_to_seek = TRUE;
+#endif
GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
offset, length);
if ((range = find_range (queue, offset))) {
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->read = range;
+#endif
if (queue->current != range) {
GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER(queue))
+ need_to_seek = change_current_range(queue, range, offset, length);
+
+ if (need_to_seek == TRUE)
+ perform_seek_to_offset (queue, range->writing_pos);
+#else
perform_seek_to_offset (queue, range->writing_pos);
+#endif
}
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
} else {
GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
" len %u", offset, length);
+
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->read = queue->current;
+#endif
+
/* we don't have the range, see how far away we are */
if (!queue->is_eos && queue->current) {
guint64 threshold = get_seek_threshold (queue);
guint64 level;
/* calculate how far away the offset is */
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (queue->read->writing_pos > rpos)
+ level = queue->read->writing_pos - rpos;
+#else
if (queue->current->writing_pos > rpos)
level = queue->current->writing_pos - rpos;
+#endif
else
level = 0;
GST_DEBUG_OBJECT (queue,
"reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ rpos, queue->read->writing_pos, level, max_size);
+#else
rpos, queue->current->writing_pos, level, max_size);
+#endif
if (level >= max_size) {
/* we don't have the data but if we have a ring buffer that is full, we
if (read_length == 0) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ GST_DEBUG_OBJECT (queue,
+ "update current position [%" G_GUINT64_FORMAT "-%"
+ G_GUINT64_FORMAT "]", rpos, queue->read->max_reading_pos);
+ update_cur_pos (queue, queue->read, rpos);
+#else
GST_DEBUG_OBJECT (queue,
"update current position [%" G_GUINT64_FORMAT "-%"
G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
update_cur_pos (queue, queue->current, rpos);
+#endif
GST_QUEUE2_SIGNAL_DEL (queue);
}
}
/* set range reading_pos to actual reading position for this read */
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ queue->read->reading_pos = rpos;
+#else
queue->current->reading_pos = rpos;
+#endif
/* configure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ (queue->read->rb_offset + (rpos -
+ queue->read->offset)) % rb_size;
+#else
(queue->current->rb_offset + (rpos -
queue->current->offset)) % rb_size;
+#endif
if (file_offset + read_length > rb_size) {
block_length = rb_size - file_offset;
} else {
block_length = read_length;
remaining -= read_return;
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ rpos = (queue->read->reading_pos += read_return);
+ update_cur_pos (queue, queue->read, queue->read->reading_pos);
+#else
rpos = (queue->current->reading_pos += read_return);
update_cur_pos (queue, queue->current, queue->current->reading_pos);
+#endif
}
GST_QUEUE2_SIGNAL_DEL (queue);
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
GstBuffer *buffer = NULL;
guint64 reading_pos;
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ reading_pos = queue->read->reading_pos;
+#else
reading_pos = queue->current->reading_pos;
+#endif
ret =
gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
gst_queue2_flush_temp_file (queue);
init_ranges (queue);
} else {
- while (!g_queue_is_empty (&queue->queue)) {
- GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
+ GstQueue2Item *qitem;
+ while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
&& GST_EVENT_IS_STICKY (qitem->item)
&& GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
data when flushing */
if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
gst_mini_object_unref (qitem->item);
- g_slice_free (GstQueue2Item, qitem);
}
}
queue->last_query = FALSE;
guint64 range_data_start, range_data_end;
GstQueue2Range *range_to_destroy = NULL;
+#ifndef TIZEN_FEATURE_QUEUE2_MODIFICATION
if (range == queue->current)
goto next_range;
+#endif
range_data_start = range->rb_offset;
range_data_end = range->rb_writing_pos;
goto next_range;
if (new_writing_pos > range_data_start) {
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (new_writing_pos >= range_data_end && range != queue->current) {
+#else
if (new_writing_pos >= range_data_end) {
+#endif
GST_DEBUG_OBJECT (queue,
"Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
G_GUINT64_FORMAT, range->offset, range->writing_pos);
if (new_wpos_virt <= range_data_start)
goto next_range;
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end
+ && range != queue->current) {
+#else
if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
+#endif
GST_DEBUG_OBJECT (queue,
"Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
G_GUINT64_FORMAT, range->offset, range->writing_pos);
queue->current->rb_writing_pos = writing_pos = new_writing_pos;
} else {
queue->current->writing_pos = writing_pos = new_writing_pos;
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (do_seek) {
+ if (queue->upstream_size == 0 || new_writing_pos < queue->upstream_size)
+ perform_seek_to_offset (queue, new_writing_pos);
+ else
+ queue->is_eos = TRUE;
+ }
+#endif
}
+
+#ifndef TIZEN_FEATURE_QUEUE2_MODIFICATION
if (do_seek)
perform_seek_to_offset (queue, new_writing_pos);
+#endif
update_cur_level (queue, queue->current);
return TRUE;
}
-static gboolean
-buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
-{
- guint *p_size = data;
- gsize buf_size;
-
- buf_size = gst_buffer_get_size (*buf);
- GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
- *p_size += buf_size;
- return TRUE;
-}
-
/* enqueue an item an update the level stats */
static void
gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
}
} else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
GstBufferList *buffer_list;
- guint size = 0;
+ guint size;
buffer_list = GST_BUFFER_LIST_CAST (item);
- gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+ size = gst_buffer_list_calculate_size (buffer_list);
GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
/* add buffer to the statistics */
update_buffering (queue);
if (QUEUE_IS_USING_QUEUE (queue)) {
- GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
- qitem->type = item_type;
- qitem->item = item;
- g_queue_push_tail (&queue->queue, qitem);
+ GstQueue2Item qitem;
+
+ qitem.type = item_type;
+ qitem.item = item;
+ gst_queue_array_push_tail_struct (queue->queue, &qitem);
} else {
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
}
if (!QUEUE_IS_USING_QUEUE (queue)) {
item = gst_queue2_read_item_from_file (queue);
} else {
- GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
+ GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
if (qitem == NULL)
goto no_item;
item = qitem->item;
- g_slice_free (GstQueue2Item, qitem);
}
if (item == NULL)
}
} else if (GST_IS_BUFFER_LIST (item)) {
GstBufferList *buffer_list;
- guint size = 0;
+ guint size;
buffer_list = GST_BUFFER_LIST_CAST (item);
- gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+ size = gst_buffer_list_calculate_size (buffer_list);
*item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
queue = GST_QUEUE2 (parent);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+ GST_EVENT_TYPE_NAME (event));
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
/* forward event */
ret = gst_pad_push_event (queue->srcpad, event);
}
case GST_EVENT_FLUSH_STOP:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
-
if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
/* forward event */
ret = gst_pad_push_event (queue->srcpad, event);
gst_event_unref (event);
}
+ g_object_notify (G_OBJECT (queue), "bitrate");
break;
}
case GST_EVENT_TAG:{
queue->sink_tags_bitrate = bitrate;
GST_QUEUE2_MUTEX_UNLOCK (queue);
GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
+ g_object_notify (G_OBJECT (queue), "bitrate");
}
}
/* Fall-through */
}
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
+ gboolean bitrate_changed = TRUE;
/* serialized events go in the queue */
+
+ /* STREAM_START and SEGMENT reset the EOS status of a
+ * pad. Change the cached sinkpad flow result accordingly */
+ if (queue->sinkresult == GST_FLOW_EOS
+ && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+ || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+ queue->sinkresult = GST_FLOW_OK;
+
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
if (queue->srcresult != GST_FLOW_OK) {
/* Errors in sticky event pushing are no problem and ignored here
goto out_flow_error;
}
}
- /* refuse more events on EOS */
- if (queue->is_eos)
- goto out_eos;
+
+ /* refuse more events on EOS unless they unset the EOS status */
+ if (queue->is_eos) {
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_STREAM_START:
+ case GST_EVENT_SEGMENT:
+ /* Restart the loop */
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+ queue->srcresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ queue->seeking = FALSE;
+ queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
+ /* reset rate counters */
+ reset_rate_timer (queue);
+ gst_pad_start_task (queue->srcpad,
+ (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
+ } else {
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ queue->seeking = FALSE;
+ queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
+ }
+ bitrate_changed = TRUE;
+
+ break;
+ default:
+ goto out_eos;
+ }
+ }
+
gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_queue2_post_buffering (queue);
+ if (bitrate_changed)
+ g_object_notify (G_OBJECT (queue), "bitrate");
} else {
- /* non-serialized events are passed upstream. */
+ /* non-serialized events are passed downstream. */
ret = gst_pad_push_event (queue->srcpad, event);
}
break;
/* ERRORS */
out_flushing:
{
- GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
+ GstFlowReturn ret = queue->sinkresult;
+ GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
+ gst_flow_get_name (ret));
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
- return GST_FLOW_FLUSHING;
+ return ret;
}
out_eos:
{
switch (GST_QUERY_TYPE (query)) {
default:
if (GST_QUERY_IS_SERIALIZED (query)) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "received query %" GST_PTR_FORMAT, query);
/* serialized events go in the queue. We need to be certain that we
* don't cause deadlocks waiting for the query return value. We check if
* the queue is empty (nothing is blocking downstream and the query can
GST_QUEUE2_ITEM_TYPE_QUERY);
STATUS (queue, queue->sinkpad, "wait for QUERY");
- g_cond_wait (&queue->query_handled, &queue->qlock);
+ while (queue->sinkresult == GST_FLOW_OK &&
+ queue->last_handled_query != query)
+ g_cond_wait (&queue->query_handled, &queue->qlock);
+ queue->last_handled_query = NULL;
if (queue->sinkresult != GST_FLOW_OK)
goto out_flushing;
res = queue->last_query;
/* ERRORS */
out_flushing:
{
- GST_DEBUG_OBJECT (queue, "refusing query, we are flushing");
+ GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
+ gst_flow_get_name (queue->sinkresult));
GST_QUEUE2_MUTEX_UNLOCK (queue);
return FALSE;
}
if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
- if (queue->queue.length == 0)
+ if (gst_queue_array_get_length (queue->queue) == 0)
return TRUE;
}
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
- if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+ if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+ || type == GST_EVENT_STREAM_START) {
/* we found a pushable item in the queue, push it out */
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
static GstFlowReturn
gst_queue2_push_one (GstQueue2 * queue)
{
- GstFlowReturn result = queue->srcresult;
+ GstFlowReturn result;
GstMiniObject *data;
GstQueue2ItemType item_type;
goto no_item;
next:
+ result = queue->srcresult;
STATUS (queue, queue->srcpad, "We have something dequeud");
g_atomic_int_set (&queue->downstream_may_block,
item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
queue->src_tags_bitrate = bitrate;
GST_QUEUE2_MUTEX_UNLOCK (queue);
GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
+ g_object_notify (G_OBJECT (queue), "bitrate");
}
}
}
GstQuery *query = GST_QUERY_CAST (data);
GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
+ queue->last_handled_query = query;
queue->last_query = gst_pad_peer_query (queue->srcpad, query);
GST_LOG_OBJECT (queue->srcpad, "Peered query");
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
}
out_flushing:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
- return GST_FLOW_FLUSHING;
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
+ gst_flow_get_name (queue->srcresult));
+ return queue->srcresult;
}
}
gst_queue2_loop (GstPad * pad)
{
GstQueue2 *queue;
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ GstFlowReturn ret = GST_FLOW_OK;
+#else
GstFlowReturn ret;
+#endif
queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
if (started)
g_timer_continue (queue->out_timer);
}
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ /* if buffering mode is GST_BUFFERING_LIVE, it is rtsp streaming */
+ if (!((queue->mode == GST_BUFFERING_LIVE) && queue->is_buffering)) {
+ ret = gst_queue2_push_one (queue);
+ }
+#else
ret = gst_queue2_push_one (queue);
+#endif
queue->srcresult = ret;
queue->sinkresult = ret;
if (ret != GST_FLOW_OK)
GST_QUEUE2_MUTEX_UNLOCK (queue);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (queue->srcresult));
+ /* Recalculate buffering levels before stopping since the source flow
+ * might cause a different buffering level (like NOT_LINKED making
+ * the queue appear as full) */
+ if (queue->use_buffering)
+ update_buffering (queue);
+ gst_queue2_post_buffering (queue);
/* let app know about us giving up if upstream is not expected to do so */
/* EOS is already taken care of elsewhere */
if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
NULL);
}
+
}
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ /* force a new bitrate query to be performed */
+ query_downstream_bitrate (queue);
+
res = gst_pad_push_event (queue->sinkpad, event);
break;
default:
range_stop = 0;
break;
}
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ range_start =
+ gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+ queued_ranges->reading_pos, duration);
+#else
range_start =
gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
queued_ranges->offset, duration);
+#endif
range_stop =
gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
queued_ranges->writing_pos, duration);
break;
case GST_FORMAT_BYTES:
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ range_start = queued_ranges->reading_pos;
+#else
range_start = queued_ranges->offset;
+#endif
range_stop = queued_ranges->writing_pos;
break;
default:
{
gboolean pull_mode;
GstSchedulingFlags flags = 0;
-
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if ((!QUEUE_IS_USING_QUEUE (queue)) && !gst_pad_peer_query (queue->sinkpad, query))
+#else
if (!gst_pad_peer_query (queue->sinkpad, query))
+#endif
goto peer_failed;
gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ if (!(flags & GST_SCHEDULING_FLAG_SEEKABLE)) {
+ GST_DEBUG_OBJECT(queue, "peer can support only push mode");
+ gst_query_set_scheduling (query, flags, 0, -1, 0);
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
+ break;
+ }
+#endif
+ GST_DEBUG_OBJECT(queue, "peer can support pull mode");
+
/* we can operate in pull mode when we are using a tempfile */
pull_mode = !QUEUE_IS_USING_QUEUE (queue);
{
ret = queue->srcresult;
- GST_DEBUG_OBJECT (queue, "we are flushing");
+ GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
GST_QUEUE2_MUTEX_UNLOCK (queue);
return ret;
}
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
+
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ gst_queue2_locked_flush (queue, FALSE, FALSE);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
}
return result;
queue->starting_segment = NULL;
gst_event_replace (&queue->stream_start_event, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
+ query_downstream_bitrate (queue);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+#ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ queue->is_buffering = FALSE;
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+#endif
break;
default:
break;
case PROP_RING_BUFFER_MAX_SIZE:
queue->ring_buffer_max_size = g_value_get_uint64 (value);
break;
+ case PROP_USE_BITRATE_QUERY:
+ queue->use_bitrate_query = g_value_get_boolean (value);
+ break;
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ case PROP_BUFFER_MODE:
+ queue->mode = g_value_get_enum (value);
+ break;
+#endif
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
g_value_set_int64 (value, (gint64) in_rate);
break;
}
+ case PROP_USE_BITRATE_QUERY:
+ g_value_set_boolean (value, queue->use_bitrate_query);
+ break;
+ case PROP_BITRATE:{
+ guint64 bitrate = 0;
+ if (bitrate == 0 && queue->use_tags_bitrate) {
+ if (queue->sink_tags_bitrate > 0)
+ bitrate = queue->sink_tags_bitrate;
+ else if (queue->src_tags_bitrate)
+ bitrate = queue->src_tags_bitrate;
+ }
+ if (bitrate == 0 && queue->use_bitrate_query) {
+ bitrate = queue->downstream_bitrate;
+ }
+ g_value_set_uint64 (value, (guint64) bitrate);
+ break;
+ }
+#ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
+ case PROP_BUFFER_MODE:
+ g_value_set_enum (value, queue->mode);
+ break;
+#endif
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;