X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=plugins%2Felements%2Fgstqueue2.c;h=51db8285ba4ba01976d2303c154310815e4ef247;hb=a87b4551a6090663a1714f263d4e20fe75eb46ca;hp=dd862a315b3aca7221ed29665619955d2babedfe;hpb=8d835ec400819140e5fc8532a90010c694e1212b;p=platform%2Fupstream%2Fgstreamer.git diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index dd862a3..51db828 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -25,6 +25,7 @@ /** * SECTION:element-queue2 + * @title: queue2 * * Data is queued until one of the limits specified by the * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or @@ -73,6 +74,14 @@ #include #endif +#ifdef __BIONIC__ /* Android */ +#undef lseek +#define lseek lseek64 +#undef off_t +#define off_t guint64 +#include +#endif + static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, @@ -105,9 +114,12 @@ enum #define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */ #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */ #define DEFAULT_USE_BUFFERING FALSE +#define DEFAULT_USE_TAGS_BITRATE FALSE #define DEFAULT_USE_RATE_ESTIMATE TRUE #define DEFAULT_LOW_PERCENT 10 #define DEFAULT_HIGH_PERCENT 99 +#define DEFAULT_LOW_WATERMARK 0.01 +#define DEFAULT_HIGH_WATERMARK 0.99 #define DEFAULT_TEMP_REMOVE TRUE #define DEFAULT_RING_BUFFER_MAX_SIZE 0 @@ -121,16 +133,46 @@ enum PROP_MAX_SIZE_BYTES, PROP_MAX_SIZE_TIME, PROP_USE_BUFFERING, + PROP_USE_TAGS_BITRATE, PROP_USE_RATE_ESTIMATE, PROP_LOW_PERCENT, PROP_HIGH_PERCENT, + PROP_LOW_WATERMARK, + PROP_HIGH_WATERMARK, PROP_TEMP_TEMPLATE, PROP_TEMP_LOCATION, PROP_TEMP_REMOVE, PROP_RING_BUFFER_MAX_SIZE, + PROP_AVG_IN_RATE, PROP_LAST }; +/* Explanation for buffer levels and percentages: + * + * The buffering_level functions here return a value in a normalized range + * that specifies the queue's current fill level. The range goes from 0 to + * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range. + * + * This is not to be confused with the buffering_percent value, which is + * a *relative* quantity - relative to the low/high watermarks. + * buffering_percent = 0% means buffering_level is at the low watermark. + * buffering_percent = 100% means buffering_level is at the high watermark. + * buffering_percent is used for determining if the fill level has reached + * the high watermark, and for producing BUFFERING messages. This value + * always uses a 0..100 range (since it is a percentage). + * + * To avoid future confusions, whenever "buffering level" is mentioned, it + * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL + * range. Whenever "buffering_percent" is mentioned, it refers to the + * percentage value that is relative to the low/high watermark. */ + +/* Using a buffering level range of 0..1000000 to allow for a + * resolution in ppm (1 ppm = 0.0001%) */ +#define MAX_BUFFERING_LEVEL 1000000 + +/* How much 1% makes up in the buffer level range */ +#define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100) + #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \ l.buffers = 0; \ l.bytes = 0; \ @@ -237,8 +279,8 @@ static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent, static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue); static void gst_queue2_loop (GstPad * pad); -static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, - GstEvent * event); +static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad, + GstObject * parent, GstEvent * event); static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query); @@ -263,7 +305,8 @@ static gboolean gst_queue2_is_empty (GstQueue2 * queue); static gboolean gst_queue2_is_filled (GstQueue2 * queue); static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range); -static void update_in_rates (GstQueue2 * queue); +static void update_in_rates (GstQueue2 * queue, gboolean force); +static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue); static void gst_queue2_post_buffering (GstQueue2 * queue); typedef enum @@ -329,6 +372,12 @@ gst_queue2_class_init (GstQueue2Class * klass) "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE, + g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags", + "Use a bitrate from upstream tags to estimate buffer duration if not provided", + DEFAULT_USE_TAGS_BITRATE, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | + G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", "Estimate the bitrate of the stream to calculate time level", @@ -336,13 +385,25 @@ gst_queue2_class_init (GstQueue2Class * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, g_param_spec_int ("low-percent", "Low percent", - "Low threshold for buffering to start. Only used if use-buffering is True", - 0, 100, DEFAULT_LOW_PERCENT, + "Low threshold for buffering to start. Only used if use-buffering is True " + "(Deprecated: use low-watermark instead)", + 0, 100, DEFAULT_LOW_WATERMARK * 100, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, g_param_spec_int ("high-percent", "High percent", + "High threshold for buffering to finish. Only used if use-buffering is True " + "(Deprecated: use high-watermark instead)", + 0, 100, DEFAULT_HIGH_WATERMARK * 100, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK, + g_param_spec_double ("low-watermark", "Low watermark", + "Low threshold for buffering to start. Only used if use-buffering is True", + 0.0, 1.0, DEFAULT_LOW_WATERMARK, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK, + g_param_spec_double ("high-watermark", "High watermark", "High threshold for buffering to finish. Only used if use-buffering is True", - 0, 100, DEFAULT_HIGH_PERCENT, + 0.0, 1.0, DEFAULT_HIGH_WATERMARK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE, @@ -380,13 +441,21 @@ gst_queue2_class_init (GstQueue2Class * klass) 0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstQueue2:avg-in-rate + * + * The average input data rate. + */ + g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE, + g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)", + "Average input data rate (bytes/s)", + 0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /* set several parent class virtual functions */ gobject_class->finalize = gst_queue2_finalize; - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&srctemplate)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); + gst_element_class_add_static_pad_template (gstelement_class, &srctemplate); + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); gst_element_class_set_static_metadata (gstelement_class, "Queue 2", "Generic", @@ -409,7 +478,7 @@ gst_queue2_init (GstQueue2 * queue) GST_DEBUG_FUNCPTR (gst_queue2_chain_list)); gst_pad_set_activatemode_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode)); - gst_pad_set_event_function (queue->sinkpad, + gst_pad_set_event_full_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event)); gst_pad_set_query_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query)); @@ -437,8 +506,8 @@ gst_queue2_init (GstQueue2 * queue) queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME; queue->use_buffering = DEFAULT_USE_BUFFERING; queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE; - queue->low_percent = DEFAULT_LOW_PERCENT; - queue->high_percent = DEFAULT_HIGH_PERCENT; + queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL; + queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL; gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); @@ -697,7 +766,7 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, /* now configure the values, we use these to track timestamps on the * sinkpad. */ if (segment->format != GST_FORMAT_TIME) { - /* non-time format, pretent the current time segment is closed with a + /* non-time format, pretend the current time segment is closed with a * 0 start and unknown stop time. */ segment->format = GST_FORMAT_TIME; segment->start = 0; @@ -746,13 +815,21 @@ apply_gap (GstQueue2 * queue, GstEvent * event, /* take a buffer and update segment, updating the time level of the queue. */ static void apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, - gboolean is_sink) + guint64 size, gboolean is_sink) { GstClockTime duration, timestamp; - timestamp = GST_BUFFER_TIMESTAMP (buffer); + timestamp = GST_BUFFER_DTS_OR_PTS (buffer); duration = GST_BUFFER_DURATION (buffer); + /* If we have no duration, pick one from the bitrate if we can */ + if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) { + guint bitrate = + is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate; + if (bitrate) + duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate); + } + /* if no timestamp is set, assume it's continuous with the previous * time */ if (timestamp == GST_CLOCK_TIME_NONE) @@ -776,21 +853,38 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, update_time_level (queue); } +struct BufListData +{ + GstClockTime timestamp; + guint bitrate; +}; + static gboolean buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data) { - GstClockTime *timestamp = data; + struct BufListData *bld = data; + GstClockTime *timestamp = &bld->timestamp; + GstClockTime btime; - GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT + GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT " duration %" GST_TIME_FORMAT, idx, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)), + GST_TIME_ARGS (GST_BUFFER_PTS (*buf)), + GST_TIME_ARGS (GST_BUFFER_DTS (*buf)), GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); - if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf)) - *timestamp = GST_BUFFER_TIMESTAMP (*buf); + btime = GST_BUFFER_DTS_OR_PTS (*buf); + if (GST_CLOCK_TIME_IS_VALID (btime)) + *timestamp = btime; if (GST_BUFFER_DURATION_IS_VALID (*buf)) *timestamp += GST_BUFFER_DURATION (*buf); + else if (bld->bitrate != 0) { + guint64 size = gst_buffer_get_size (*buf); + + /* If we have no duration, pick one from the bitrate if we can */ + *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size); + } + GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp)); return TRUE; @@ -801,17 +895,25 @@ static void apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, GstSegment * segment, gboolean is_sink) { - GstClockTime timestamp; + struct BufListData bld; /* if no timestamp is set, assume it's continuous with the previous time */ - timestamp = segment->position; + bld.timestamp = segment->position; - gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp); + if (queue->use_tags_bitrate) { + if (is_sink) + bld.bitrate = queue->sink_tags_bitrate; + else + bld.bitrate = queue->src_tags_bitrate; + } else + bld.bitrate = 0; + + gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld); GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, - GST_TIME_ARGS (timestamp)); + GST_TIME_ARGS (bld.timestamp)); - segment->position = timestamp; + segment->position = bld.timestamp; if (is_sink) queue->sink_tainted = TRUE; @@ -822,61 +924,104 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, update_time_level (queue); } +static inline gint +normalize_to_buffering_level (guint64 cur_level, guint64 max_level, + guint64 alt_max) +{ + guint64 p; + + if (max_level == 0) + return 0; + + if (alt_max > 0) + p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, + MIN (max_level, alt_max)); + else + p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level); + + return MIN (p, MAX_BUFFERING_LEVEL); +} + static gboolean -get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering, - gint * percent) +get_buffering_level (GstQueue2 * queue, gboolean * is_buffering, + gint * buffering_level) { - gint perc; + gint buflevel, buflevel2; - if (queue->high_percent <= 0) { - if (percent) - *percent = 100; + if (queue->high_watermark <= 0) { + if (buffering_level) + *buffering_level = MAX_BUFFERING_LEVEL; if (is_buffering) *is_buffering = FALSE; return FALSE; } -#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0) +#define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \ + normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max)) if (queue->is_eos) { /* on EOS we are always 100% full, we set the var here so that it we can * reuse the logic below to stop buffering */ - perc = 100; + buflevel = MAX_BUFFERING_LEVEL; GST_LOG_OBJECT (queue, "we are EOS"); } else { - /* figure out the percent we are filled, we take the max of all formats. */ + GST_LOG_OBJECT (queue, + "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u", + queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time), + queue->cur_level.buffers); + + /* figure out the buffering level we are filled, we take the max of all formats. */ if (!QUEUE_IS_USING_RING_BUFFER (queue)) { - perc = GET_PERCENT (bytes, 0); + buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0); } else { guint64 rb_size = queue->ring_buffer_max_size; - perc = GET_PERCENT (bytes, rb_size); + buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size); } - perc = MAX (perc, GET_PERCENT (time, 0)); - perc = MAX (perc, GET_PERCENT (buffers, 0)); + + buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0); + buflevel = MAX (buflevel, buflevel2); + + buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0); + buflevel = MAX (buflevel, buflevel2); /* also apply the rate estimate when we need to */ - if (queue->use_rate_estimate) - perc = MAX (perc, GET_PERCENT (rate_time, 0)); + if (queue->use_rate_estimate) { + buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0); + buflevel = MAX (buflevel, buflevel2); + } + + /* Don't get to 0% unless we're really empty */ + if (queue->cur_level.bytes > 0) + buflevel = MAX (1, buflevel); } -#undef GET_PERCENT +#undef GET_BUFFER_LEVEL_FOR_QUANTITY if (is_buffering) *is_buffering = queue->is_buffering; - /* scale to high percent so that it becomes the 100% mark */ - perc = perc * 100 / queue->high_percent; - /* clip */ - if (perc > 100) - perc = 100; + if (buffering_level) + *buffering_level = buflevel; - if (percent) - *percent = perc; - - GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering, - perc); + GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering, + buflevel); return TRUE; } +static gint +convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level) +{ + int percent; + + /* scale so that if buffering_level equals the high watermark, + * the percentage is 100% */ + percent = buffering_level * 100 / queue->high_watermark; + /* clip */ + if (percent > 100) + percent = 100; + + return percent; +} + static void get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode, gint * avg_in, gint * avg_out, gint64 * buffering_left) @@ -912,13 +1057,12 @@ get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode, } } -static void -gst_queue2_post_buffering (GstQueue2 * queue) +/* Called with the lock taken */ +static GstMessage * +gst_queue2_get_buffering_message (GstQueue2 * queue) { GstMessage *msg = NULL; - g_mutex_lock (&queue->buffering_post_lock); - GST_QUEUE2_MUTEX_LOCK (queue); if (queue->percent_changed) { gint percent = queue->buffering_percent; @@ -930,6 +1074,18 @@ gst_queue2_post_buffering (GstQueue2 * queue) gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in, queue->avg_out, queue->buffering_left); } + + return msg; +} + +static void +gst_queue2_post_buffering (GstQueue2 * queue) +{ + GstMessage *msg = NULL; + + g_mutex_lock (&queue->buffering_post_lock); + GST_QUEUE2_MUTEX_LOCK (queue); + msg = gst_queue2_get_buffering_message (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); if (msg != NULL) @@ -941,16 +1097,18 @@ gst_queue2_post_buffering (GstQueue2 * queue) static void update_buffering (GstQueue2 * queue) { - gint percent; + gint buffering_level, percent; /* Ensure the variables used to calculate buffering state are up-to-date. */ if (queue->current) update_cur_level (queue, queue->current); - update_in_rates (queue); + update_in_rates (queue, FALSE); - if (!get_buffering_percent (queue, NULL, &percent)) + if (!get_buffering_level (queue, NULL, &buffering_level)) return; + percent = convert_to_buffering_percent (queue, buffering_level); + if (queue->is_buffering) { /* if we were buffering see if we reached the high watermark */ if (percent >= 100) @@ -960,7 +1118,7 @@ update_buffering (GstQueue2 * queue) } else { /* we were not buffering, check if we need to start buffering if we drop * below the low threshold */ - if (percent < queue->low_percent) { + if (buffering_level < queue->low_watermark) { queue->is_buffering = TRUE; SET_PERCENT (queue, percent); } @@ -975,6 +1133,7 @@ reset_rate_timer (GstQueue2 * queue) queue->byte_in_rate = 0.0; queue->byte_in_period = 0; queue->byte_out_rate = 0.0; + queue->last_update_in_rates_elapsed = 0.0; queue->last_in_elapsed = 0.0; queue->last_out_elapsed = 0.0; queue->in_timer_started = FALSE; @@ -994,7 +1153,7 @@ reset_rate_timer (GstQueue2 * queue) #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0 static void -update_in_rates (GstQueue2 * queue) +update_in_rates (GstQueue2 * queue, gboolean force) { gdouble elapsed, period; gdouble byte_in_rate; @@ -1005,10 +1164,11 @@ update_in_rates (GstQueue2 * queue) return; } - elapsed = g_timer_elapsed (queue->in_timer, NULL); + queue->last_update_in_rates_elapsed = elapsed = + g_timer_elapsed (queue->in_timer, NULL); /* recalc after each interval. */ - if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) { + if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) { period = elapsed - queue->last_in_elapsed; GST_DEBUG_OBJECT (queue, @@ -1288,7 +1448,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, else buf = *buffer; - gst_buffer_map (buf, &info, GST_MAP_WRITE); + if (!gst_buffer_map (buf, &info, GST_MAP_WRITE)) + goto buffer_write_fail; data = info.data; GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length, @@ -1439,6 +1600,14 @@ read_error: gst_buffer_unref (buf); return ret; } +buffer_write_fail: + { + GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL), + ("Can't write to buffer")); + if (*buffer == NULL) + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } } /* should be called with QUEUE_LOCK */ @@ -1492,7 +1661,7 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template); - /* If temp_template was set, allocate a filename and open that filen */ + /* If temp_template was set, allocate a filename and open that file */ /* nothing to do */ if (queue->temp_template == NULL) @@ -1500,7 +1669,13 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) /* make copy of the template, we don't want to change this */ name = g_strdup (queue->temp_template); + +#ifdef __BIONIC__ + fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR); +#else fd = g_mkstemp (name); +#endif + if (fd == -1) goto mkstemp_failed; @@ -1684,7 +1859,8 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) ring_buffer = queue->ring_buffer; rb_size = queue->ring_buffer_max_size; - gst_buffer_map (buffer, &info, GST_MAP_READ); + if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) + goto buffer_read_error; size = info.size; data = info.data; @@ -1736,6 +1912,9 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) guint64 range_data_start, range_data_end; GstQueue2Range *range_to_destroy = NULL; + if (range == queue->current) + goto next_range; + range_data_start = range->rb_offset; range_data_end = range->rb_writing_pos; @@ -1917,8 +2096,18 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) update_cur_level (queue, queue->current); /* update the buffering status */ - if (queue->use_buffering) + if (queue->use_buffering) { + GstMessage *msg; update_buffering (queue); + msg = gst_queue2_get_buffering_message (queue); + if (msg) { + GST_QUEUE2_MUTEX_UNLOCK (queue); + g_mutex_lock (&queue->buffering_post_lock); + gst_element_post_message (GST_ELEMENT_CAST (queue), msg); + g_mutex_unlock (&queue->buffering_post_lock); + GST_QUEUE2_MUTEX_LOCK (queue); + } + } GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")", queue->cur_level.bytes, QUEUE_MAX_BYTES (queue)); @@ -1960,6 +2149,12 @@ handle_error: gst_buffer_unmap (buffer, &info); return FALSE; } +buffer_read_error: + { + GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), + ("Can't read from buffer")); + return FALSE; + } } static gboolean @@ -2010,9 +2205,9 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, queue->bytes_in += size; /* apply new buffer to segment stats */ - apply_buffer (queue, buffer, &queue->sink_segment, TRUE); + apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE); /* update the byterate stats */ - update_in_rates (queue); + update_in_rates (queue, FALSE); if (!QUEUE_IS_USING_QUEUE (queue)) { /* FIXME - check return value? */ @@ -2029,7 +2224,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, /* add buffer to the statistics */ if (QUEUE_IS_USING_QUEUE (queue)) { - queue->cur_level.buffers++; + queue->cur_level.buffers += gst_buffer_list_length (buffer_list); queue->cur_level.bytes += size; } queue->bytes_in += size; @@ -2038,7 +2233,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE); /* update the byterate stats */ - update_in_rates (queue); + update_in_rates (queue, FALSE); if (!QUEUE_IS_USING_QUEUE (queue)) { gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue); @@ -2054,6 +2249,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, * filled and we can read all data from the queue. */ GST_DEBUG_OBJECT (queue, "we have EOS"); queue->is_eos = TRUE; + /* Force updating the input bitrate */ + update_in_rates (queue, TRUE); break; case GST_EVENT_SEGMENT: apply_segment (queue, event, &queue->sink_segment, TRUE); @@ -2134,10 +2331,11 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, /* ERRORS */ unexpected_event: { - g_warning - ("Unexpected event of kind %s can't be added in temp file of queue %s ", - gst_event_type_get_name (GST_EVENT_TYPE (item)), - GST_OBJECT_NAME (queue)); + gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM; + + GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: " + "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "", + GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item)); gst_event_unref (GST_EVENT_CAST (item)); return; } @@ -2181,7 +2379,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) } queue->bytes_out += size; - apply_buffer (queue, buffer, &queue->src_segment, FALSE); + apply_buffer (queue, buffer, &queue->src_segment, size, FALSE); /* update the byterate stats */ update_out_rates (queue); /* update the buffering */ @@ -2222,7 +2420,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) "retrieved buffer list %p from queue", buffer_list); if (QUEUE_IS_USING_QUEUE (queue)) { - queue->cur_level.buffers--; + queue->cur_level.buffers -= gst_buffer_list_length (buffer_list); queue->cur_level.bytes -= size; } queue->bytes_out += size; @@ -2256,7 +2454,7 @@ no_item: } } -static gboolean +static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { @@ -2280,14 +2478,17 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, /* unblock the loop and chain functions */ GST_QUEUE2_SIGNAL_ADD (queue); GST_QUEUE2_SIGNAL_DEL (queue); - queue->last_query = FALSE; - g_cond_signal (&queue->query_handled); GST_QUEUE2_MUTEX_UNLOCK (queue); /* make sure it pauses, this should happen since we sent * flush_start downstream. */ gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); + + GST_QUEUE2_MUTEX_LOCK (queue); + queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); + GST_QUEUE2_MUTEX_UNLOCK (queue); } else { GST_QUEUE2_MUTEX_LOCK (queue); /* flush the sink pad */ @@ -2316,6 +2517,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, queue->is_eos = FALSE; queue->unexpected = FALSE; queue->seeking = FALSE; + queue->src_tags_bitrate = queue->sink_tags_bitrate = 0; /* reset rate counters */ reset_rate_timer (queue); gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, @@ -2328,12 +2530,29 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, queue->unexpected = FALSE; queue->sinkresult = GST_FLOW_OK; queue->seeking = FALSE; + queue->src_tags_bitrate = queue->sink_tags_bitrate = 0; GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); } break; } + case GST_EVENT_TAG:{ + if (queue->use_tags_bitrate) { + GstTagList *tags; + guint bitrate; + + gst_event_parse_tag (event, &tags); + if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) || + gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) { + GST_QUEUE2_MUTEX_LOCK (queue); + queue->sink_tags_bitrate = bitrate; + GST_QUEUE2_MUTEX_UNLOCK (queue); + GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate); + } + } + /* Fall-through */ + } default: if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ @@ -2349,10 +2568,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { if (queue->srcresult == GST_FLOW_NOT_LINKED || queue->srcresult < GST_FLOW_EOS) { - GST_ELEMENT_ERROR (queue, STREAM, FAILED, - (_("Internal data flow error.")), - ("streaming task paused, reason %s (%d)", - gst_flow_get_name (queue->srcresult), queue->srcresult)); + GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult); } goto out_flow_error; } @@ -2369,7 +2585,9 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, } break; } - return ret; + if (ret == FALSE) + return GST_FLOW_ERROR; + return GST_FLOW_OK; /* ERRORS */ out_flushing: @@ -2377,14 +2595,14 @@ out_flushing: GST_DEBUG_OBJECT (queue, "refusing event, we are flushing"); GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); - return FALSE; + return GST_FLOW_FLUSHING; } out_eos: { GST_DEBUG_OBJECT (queue, "refusing event, we are EOS"); GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); - return FALSE; + return GST_FLOW_EOS; } out_flow_error: { @@ -2393,7 +2611,7 @@ out_flow_error: gst_flow_get_name (queue->srcresult)); GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); - return FALSE; + return queue->srcresult; } } @@ -2425,7 +2643,10 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, GST_QUEUE2_ITEM_TYPE_QUERY); STATUS (queue, queue->sinkpad, "wait for QUERY"); - g_cond_wait (&queue->query_handled, &queue->qlock); + while (queue->sinkresult == GST_FLOW_OK && + queue->last_handled_query != query) + g_cond_wait (&queue->query_handled, &queue->qlock); + queue->last_handled_query = NULL; if (queue->sinkresult != GST_FLOW_OK) goto out_flushing; res = queue->last_query; @@ -2705,6 +2926,22 @@ next: GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); + if (type == GST_EVENT_TAG) { + if (queue->use_tags_bitrate) { + GstTagList *tags; + guint bitrate; + + gst_event_parse_tag (event, &tags); + if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) || + gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) { + GST_QUEUE2_MUTEX_LOCK (queue); + queue->src_tags_bitrate = bitrate; + GST_QUEUE2_MUTEX_UNLOCK (queue); + GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate); + } + } + } + gst_pad_push_event (queue->srcpad, event); /* if we're EOS, return EOS so that the task pauses. */ @@ -2737,6 +2974,7 @@ next: GstQuery *query = GST_QUERY_CAST (data); GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query); + queue->last_handled_query = query; queue->last_query = gst_pad_peer_query (queue->srcpad, query); GST_LOG_OBJECT (queue->srcpad, "Peered query"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, @@ -2812,16 +3050,20 @@ out_flushing: GstFlowReturn ret = queue->srcresult; gst_pad_pause_task (queue->srcpad); + if (ret == GST_FLOW_FLUSHING) { + gst_queue2_locked_flush (queue, FALSE, FALSE); + } else { + GST_QUEUE2_SIGNAL_DEL (queue); + queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); + } GST_QUEUE2_MUTEX_UNLOCK (queue); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (queue->srcresult)); /* let app know about us giving up if upstream is not expected to do so */ /* EOS is already taken care of elsewhere */ if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { - GST_ELEMENT_ERROR (queue, STREAM, FAILED, - (_("Internal data flow error.")), - ("streaming task paused, reason %s (%d)", - gst_flow_get_name (ret), ret)); + GST_ELEMENT_FLOW_ERROR (queue, ret); gst_pad_push_event (queue->srcpad, gst_event_new_eos ()); } return; @@ -2918,9 +3160,13 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) switch (format) { case GST_FORMAT_BYTES: peer_pos -= queue->cur_level.bytes; + if (peer_pos < 0) /* Clamp result to 0 */ + peer_pos = 0; break; case GST_FORMAT_TIME: peer_pos -= queue->cur_level.time; + if (peer_pos < 0) /* Clamp result to 0 */ + peer_pos = 0; break; default: GST_WARNING_OBJECT (queue, "dropping query in %s format, don't " @@ -2951,7 +3197,8 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_DEBUG_OBJECT (queue, "query buffering"); - get_buffering_percent (queue, &is_buffering, &percent); + get_buffering_level (queue, &is_buffering, &percent); + percent = convert_to_buffering_percent (queue, percent); gst_query_set_buffering_percent (query, is_buffering, percent); get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out, @@ -3120,7 +3367,13 @@ gst_queue2_update_upstream_size (GstQueue2 * queue) if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES, &upstream_size)) { GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size); - queue->upstream_size = upstream_size; + + /* upstream_size can be negative but queue->upstream_size is unsigned. + * Prevent setting negative values to it (the query can return -1) */ + if (upstream_size >= 0) + queue->upstream_size = upstream_size; + else + queue->upstream_size = 0; } } @@ -3211,9 +3464,6 @@ gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent, queue->srcresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING; GST_QUEUE2_SIGNAL_DEL (queue); - /* Unblock query handler */ - queue->last_query = FALSE; - g_cond_signal (&queue->query_handled); GST_QUEUE2_MUTEX_UNLOCK (queue); /* wait until it is unblocked and clean up */ @@ -3264,6 +3514,10 @@ gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); + + GST_QUEUE2_MUTEX_LOCK (queue); + gst_queue2_locked_flush (queue, FALSE, FALSE); + GST_QUEUE2_MUTEX_UNLOCK (queue); } return result; @@ -3499,14 +3753,32 @@ gst_queue2_set_property (GObject * object, update_buffering (queue); } break; + case PROP_USE_TAGS_BITRATE: + queue->use_tags_bitrate = g_value_get_boolean (value); + break; case PROP_USE_RATE_ESTIMATE: queue->use_rate_estimate = g_value_get_boolean (value); break; case PROP_LOW_PERCENT: - queue->low_percent = g_value_get_int (value); + queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR; + if (queue->is_buffering) + update_buffering (queue); break; case PROP_HIGH_PERCENT: - queue->high_percent = g_value_get_int (value); + queue->high_watermark = + g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR; + if (queue->is_buffering) + update_buffering (queue); + break; + case PROP_LOW_WATERMARK: + queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL; + if (queue->is_buffering) + update_buffering (queue); + break; + case PROP_HIGH_WATERMARK: + queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL; + if (queue->is_buffering) + update_buffering (queue); break; case PROP_TEMP_TEMPLATE: gst_queue2_set_temp_template (queue, g_value_get_string (value)); @@ -3556,14 +3828,25 @@ gst_queue2_get_property (GObject * object, case PROP_USE_BUFFERING: g_value_set_boolean (value, queue->use_buffering); break; + case PROP_USE_TAGS_BITRATE: + g_value_set_boolean (value, queue->use_tags_bitrate); + break; case PROP_USE_RATE_ESTIMATE: g_value_set_boolean (value, queue->use_rate_estimate); break; case PROP_LOW_PERCENT: - g_value_set_int (value, queue->low_percent); + g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR); break; case PROP_HIGH_PERCENT: - g_value_set_int (value, queue->high_percent); + g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR); + break; + case PROP_LOW_WATERMARK: + g_value_set_double (value, queue->low_watermark / + (gdouble) MAX_BUFFERING_LEVEL); + break; + case PROP_HIGH_WATERMARK: + g_value_set_double (value, queue->high_watermark / + (gdouble) MAX_BUFFERING_LEVEL); break; case PROP_TEMP_TEMPLATE: g_value_set_string (value, queue->temp_template); @@ -3577,6 +3860,19 @@ gst_queue2_get_property (GObject * object, case PROP_RING_BUFFER_MAX_SIZE: g_value_set_uint64 (value, queue->ring_buffer_max_size); break; + case PROP_AVG_IN_RATE: + { + gdouble in_rate = queue->byte_in_rate; + + /* During the first RATE_INTERVAL, byte_in_rate will not have been + * calculated, so calculate it here. */ + if (in_rate == 0.0 && queue->bytes_in + && queue->last_update_in_rates_elapsed > 0.0) + in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed; + + g_value_set_int64 (value, (gint64) in_rate); + break; + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break;