X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=plugins%2Felements%2Fgstqueue2.c;h=51db8285ba4ba01976d2303c154310815e4ef247;hb=a87b4551a6090663a1714f263d4e20fe75eb46ca;hp=c1f1ce662faac40fc44d1099d0fdc02925eeca1e;hpb=e692993e79d31c29481f89f25638ccbe9cc46e14;p=platform%2Fupstream%2Fgstreamer.git diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index c1f1ce6..51db828 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -25,6 +25,7 @@ /** * SECTION:element-queue2 + * @title: queue2 * * Data is queued until one of the limits specified by the * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or @@ -48,8 +49,6 @@ * * The temp-location property will be used to notify the application of the * allocated filename. - * - * Last reviewed on 2009-07-10 (0.10.24) */ #ifdef HAVE_CONFIG_H @@ -75,6 +74,14 @@ #include #endif +#ifdef __BIONIC__ /* Android */ +#undef lseek +#define lseek lseek64 +#undef off_t +#define off_t guint64 +#include +#endif + static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, @@ -107,9 +114,12 @@ enum #define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */ #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */ #define DEFAULT_USE_BUFFERING FALSE +#define DEFAULT_USE_TAGS_BITRATE FALSE #define DEFAULT_USE_RATE_ESTIMATE TRUE #define DEFAULT_LOW_PERCENT 10 #define DEFAULT_HIGH_PERCENT 99 +#define DEFAULT_LOW_WATERMARK 0.01 +#define DEFAULT_HIGH_WATERMARK 0.99 #define DEFAULT_TEMP_REMOVE TRUE #define DEFAULT_RING_BUFFER_MAX_SIZE 0 @@ -123,16 +133,46 @@ enum PROP_MAX_SIZE_BYTES, PROP_MAX_SIZE_TIME, PROP_USE_BUFFERING, + PROP_USE_TAGS_BITRATE, PROP_USE_RATE_ESTIMATE, PROP_LOW_PERCENT, PROP_HIGH_PERCENT, + PROP_LOW_WATERMARK, + PROP_HIGH_WATERMARK, PROP_TEMP_TEMPLATE, PROP_TEMP_LOCATION, PROP_TEMP_REMOVE, PROP_RING_BUFFER_MAX_SIZE, + PROP_AVG_IN_RATE, PROP_LAST }; +/* Explanation for buffer levels and percentages: + * + * The buffering_level functions here return a value in a normalized range + * that specifies the queue's current fill level. The range goes from 0 to + * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range. + * + * This is not to be confused with the buffering_percent value, which is + * a *relative* quantity - relative to the low/high watermarks. + * buffering_percent = 0% means buffering_level is at the low watermark. + * buffering_percent = 100% means buffering_level is at the high watermark. + * buffering_percent is used for determining if the fill level has reached + * the high watermark, and for producing BUFFERING messages. This value + * always uses a 0..100 range (since it is a percentage). + * + * To avoid future confusions, whenever "buffering level" is mentioned, it + * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL + * range. Whenever "buffering_percent" is mentioned, it refers to the + * percentage value that is relative to the low/high watermark. */ + +/* Using a buffering level range of 0..1000000 to allow for a + * resolution in ppm (1 ppm = 0.0001%) */ +#define MAX_BUFFERING_LEVEL 1000000 + +/* How much 1% makes up in the buffer level range */ +#define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100) + #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \ l.buffers = 0; \ l.bytes = 0; \ @@ -208,6 +248,16 @@ enum } \ } G_STMT_END +#define SET_PERCENT(q, perc) G_STMT_START { \ + if (perc != q->buffering_percent) { \ + q->buffering_percent = perc; \ + q->percent_changed = TRUE; \ + GST_DEBUG_OBJECT (q, "buffering %d percent", perc); \ + get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out, \ + &q->buffering_left); \ + } \ +} G_STMT_END + #define _do_init \ GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \ GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \ @@ -229,8 +279,8 @@ static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent, static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue); static void gst_queue2_loop (GstPad * pad); -static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, - GstEvent * event); +static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad, + GstObject * parent, GstEvent * event); static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query); @@ -255,6 +305,9 @@ static gboolean gst_queue2_is_empty (GstQueue2 * queue); static gboolean gst_queue2_is_filled (GstQueue2 * queue); static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range); +static void update_in_rates (GstQueue2 * queue, gboolean force); +static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue); +static void gst_queue2_post_buffering (GstQueue2 * queue); typedef enum { @@ -265,6 +318,12 @@ typedef enum GST_QUEUE2_ITEM_TYPE_QUERY } GstQueue2ItemType; +typedef struct +{ + GstQueue2ItemType type; + GstMiniObject *item; +} GstQueue2Item; + /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */ static void @@ -294,21 +353,31 @@ gst_queue2_class_init (GstQueue2Class * klass) g_param_spec_uint ("max-size-bytes", "Max. size (kB)", "Max. amount of data in the queue (bytes, 0=disable)", 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | + G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | + G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, g_param_spec_uint64 ("max-size-time", "Max. size (ns)", "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64, - DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | + G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, g_param_spec_boolean ("use-buffering", "Use buffering", "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", - DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | + G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USE_TAGS_BITRATE, + g_param_spec_boolean ("use-tags-bitrate", "Use bitrate from tags", + "Use a bitrate from upstream tags to estimate buffer duration if not provided", + DEFAULT_USE_TAGS_BITRATE, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | + G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", "Estimate the bitrate of the stream to calculate time level", @@ -316,13 +385,25 @@ gst_queue2_class_init (GstQueue2Class * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, g_param_spec_int ("low-percent", "Low percent", - "Low threshold for buffering to start. Only used if use-buffering is True", - 0, 100, DEFAULT_LOW_PERCENT, + "Low threshold for buffering to start. Only used if use-buffering is True " + "(Deprecated: use low-watermark instead)", + 0, 100, DEFAULT_LOW_WATERMARK * 100, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, g_param_spec_int ("high-percent", "High percent", + "High threshold for buffering to finish. Only used if use-buffering is True " + "(Deprecated: use high-watermark instead)", + 0, 100, DEFAULT_HIGH_WATERMARK * 100, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK, + g_param_spec_double ("low-watermark", "Low watermark", + "Low threshold for buffering to start. Only used if use-buffering is True", + 0.0, 1.0, DEFAULT_LOW_WATERMARK, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK, + g_param_spec_double ("high-watermark", "High watermark", "High threshold for buffering to finish. Only used if use-buffering is True", - 0, 100, DEFAULT_HIGH_PERCENT, + 0.0, 1.0, DEFAULT_HIGH_WATERMARK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE, @@ -360,13 +441,21 @@ gst_queue2_class_init (GstQueue2Class * klass) 0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstQueue2:avg-in-rate + * + * The average input data rate. + */ + g_object_class_install_property (gobject_class, PROP_AVG_IN_RATE, + g_param_spec_int64 ("avg-in-rate", "Input data rate (bytes/s)", + "Average input data rate (bytes/s)", + 0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /* set several parent class virtual functions */ gobject_class->finalize = gst_queue2_finalize; - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&srctemplate)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); + gst_element_class_add_static_pad_template (gstelement_class, &srctemplate); + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); gst_element_class_set_static_metadata (gstelement_class, "Queue 2", "Generic", @@ -389,7 +478,7 @@ gst_queue2_init (GstQueue2 * queue) GST_DEBUG_FUNCPTR (gst_queue2_chain_list)); gst_pad_set_activatemode_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode)); - gst_pad_set_event_function (queue->sinkpad, + gst_pad_set_event_full_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event)); gst_pad_set_query_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query)); @@ -417,8 +506,8 @@ gst_queue2_init (GstQueue2 * queue) queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME; queue->use_buffering = DEFAULT_USE_BUFFERING; queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE; - queue->low_percent = DEFAULT_LOW_PERCENT; - queue->high_percent = DEFAULT_HIGH_PERCENT; + queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL; + queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL; gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); @@ -444,6 +533,7 @@ gst_queue2_init (GstQueue2 * queue) g_cond_init (&queue->query_handled); queue->last_query = FALSE; + g_mutex_init (&queue->buffering_post_lock); queue->buffering_percent = 100; /* tempfile related */ @@ -467,15 +557,17 @@ gst_queue2_finalize (GObject * object) GST_DEBUG_OBJECT (queue, "finalizing queue"); while (!g_queue_is_empty (&queue->queue)) { - GstMiniObject *data = g_queue_pop_head (&queue->queue); + GstQueue2Item *qitem = g_queue_pop_head (&queue->queue); - if (!GST_IS_QUERY (data)) - gst_mini_object_unref (data); + if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY) + gst_mini_object_unref (qitem->item); + g_slice_free (GstQueue2Item, qitem); } queue->last_query = FALSE; g_queue_clear (&queue->queue); g_mutex_clear (&queue->qlock); + g_mutex_clear (&queue->buffering_post_lock); g_cond_clear (&queue->item_add); g_cond_clear (&queue->item_del); g_cond_clear (&queue->query_handled); @@ -674,7 +766,7 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, /* now configure the values, we use these to track timestamps on the * sinkpad. */ if (segment->format != GST_FORMAT_TIME) { - /* non-time format, pretent the current time segment is closed with a + /* non-time format, pretend the current time segment is closed with a * 0 start and unknown stop time. */ segment->format = GST_FORMAT_TIME; segment->start = 0; @@ -693,16 +785,51 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, update_time_level (queue); } +static void +apply_gap (GstQueue2 * queue, GstEvent * event, + GstSegment * segment, gboolean is_sink) +{ + GstClockTime timestamp; + GstClockTime duration; + + gst_event_parse_gap (event, ×tamp, &duration); + + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + + if (GST_CLOCK_TIME_IS_VALID (duration)) { + timestamp += duration; + } + + segment->position = timestamp; + + if (is_sink) + queue->sink_tainted = TRUE; + else + queue->src_tainted = TRUE; + + /* calc diff with other end */ + update_time_level (queue); + } +} + /* take a buffer and update segment, updating the time level of the queue. */ static void apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, - gboolean is_sink) + guint64 size, gboolean is_sink) { GstClockTime duration, timestamp; - timestamp = GST_BUFFER_TIMESTAMP (buffer); + timestamp = GST_BUFFER_DTS_OR_PTS (buffer); duration = GST_BUFFER_DURATION (buffer); + /* If we have no duration, pick one from the bitrate if we can */ + if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) { + guint bitrate = + is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate; + if (bitrate) + duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate); + } + /* if no timestamp is set, assume it's continuous with the previous * time */ if (timestamp == GST_CLOCK_TIME_NONE) @@ -726,21 +853,38 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, update_time_level (queue); } +struct BufListData +{ + GstClockTime timestamp; + guint bitrate; +}; + static gboolean buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data) { - GstClockTime *timestamp = data; + struct BufListData *bld = data; + GstClockTime *timestamp = &bld->timestamp; + GstClockTime btime; - GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT + GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT " duration %" GST_TIME_FORMAT, idx, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)), + GST_TIME_ARGS (GST_BUFFER_PTS (*buf)), + GST_TIME_ARGS (GST_BUFFER_DTS (*buf)), GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); - if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf)) - *timestamp = GST_BUFFER_TIMESTAMP (*buf); + btime = GST_BUFFER_DTS_OR_PTS (*buf); + if (GST_CLOCK_TIME_IS_VALID (btime)) + *timestamp = btime; if (GST_BUFFER_DURATION_IS_VALID (*buf)) *timestamp += GST_BUFFER_DURATION (*buf); + else if (bld->bitrate != 0) { + guint64 size = gst_buffer_get_size (*buf); + + /* If we have no duration, pick one from the bitrate if we can */ + *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size); + } + GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp)); return TRUE; @@ -751,17 +895,25 @@ static void apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, GstSegment * segment, gboolean is_sink) { - GstClockTime timestamp; + struct BufListData bld; /* if no timestamp is set, assume it's continuous with the previous time */ - timestamp = segment->position; + bld.timestamp = segment->position; + + if (queue->use_tags_bitrate) { + if (is_sink) + bld.bitrate = queue->sink_tags_bitrate; + else + bld.bitrate = queue->src_tags_bitrate; + } else + bld.bitrate = 0; - gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp); + gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld); GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, - GST_TIME_ARGS (timestamp)); + GST_TIME_ARGS (bld.timestamp)); - segment->position = timestamp; + segment->position = bld.timestamp; if (is_sink) queue->sink_tainted = TRUE; @@ -772,81 +924,102 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, update_time_level (queue); } +static inline gint +normalize_to_buffering_level (guint64 cur_level, guint64 max_level, + guint64 alt_max) +{ + guint64 p; + + if (max_level == 0) + return 0; + + if (alt_max > 0) + p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, + MIN (max_level, alt_max)); + else + p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level); + + return MIN (p, MAX_BUFFERING_LEVEL); +} + static gboolean -get_buffering_percent (GstQueue2 * queue, gboolean * is_buffering, - gint * percent) +get_buffering_level (GstQueue2 * queue, gboolean * is_buffering, + gint * buffering_level) { - gboolean post = FALSE; - gint perc; + gint buflevel, buflevel2; - if (queue->high_percent <= 0) { - if (percent) - *percent = 100; + if (queue->high_watermark <= 0) { + if (buffering_level) + *buffering_level = MAX_BUFFERING_LEVEL; if (is_buffering) *is_buffering = FALSE; return FALSE; } -#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0) +#define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \ + normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max)) if (queue->is_eos) { /* on EOS we are always 100% full, we set the var here so that it we can * reuse the logic below to stop buffering */ - perc = 100; + buflevel = MAX_BUFFERING_LEVEL; GST_LOG_OBJECT (queue, "we are EOS"); } else { - /* figure out the percent we are filled, we take the max of all formats. */ + GST_LOG_OBJECT (queue, + "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u", + queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time), + queue->cur_level.buffers); + + /* figure out the buffering level we are filled, we take the max of all formats. */ if (!QUEUE_IS_USING_RING_BUFFER (queue)) { - perc = GET_PERCENT (bytes, 0); + buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0); } else { guint64 rb_size = queue->ring_buffer_max_size; - perc = GET_PERCENT (bytes, rb_size); + buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size); } - perc = MAX (perc, GET_PERCENT (time, 0)); - perc = MAX (perc, GET_PERCENT (buffers, 0)); - /* also apply the rate estimate when we need to */ - if (queue->use_rate_estimate) - perc = MAX (perc, GET_PERCENT (rate_time, 0)); - } -#undef GET_PERCENT + buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0); + buflevel = MAX (buflevel, buflevel2); - if (queue->is_buffering) { - post = TRUE; - /* if we were buffering see if we reached the high watermark */ - if (perc >= queue->high_percent) - queue->is_buffering = FALSE; - } else { - /* we were not buffering, check if we need to start buffering if we drop - * below the low threshold */ - if (perc < queue->low_percent) { - queue->is_buffering = TRUE; - queue->buffering_iteration++; - post = TRUE; + buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0); + buflevel = MAX (buflevel, buflevel2); + + /* also apply the rate estimate when we need to */ + if (queue->use_rate_estimate) { + buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0); + buflevel = MAX (buflevel, buflevel2); } + + /* Don't get to 0% unless we're really empty */ + if (queue->cur_level.bytes > 0) + buflevel = MAX (1, buflevel); } +#undef GET_BUFFER_LEVEL_FOR_QUANTITY if (is_buffering) *is_buffering = queue->is_buffering; - /* scale to high percent so that it becomes the 100% mark */ - perc = perc * 100 / queue->high_percent; - /* clip */ - if (perc > 100) - perc = 100; + if (buffering_level) + *buffering_level = buflevel; - if (post) { - if (perc == queue->buffering_percent) - post = FALSE; - else - queue->buffering_percent = perc; - } - if (percent) - *percent = perc; + GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering, + buflevel); - GST_DEBUG_OBJECT (queue, "buffering %d, percent %d", queue->is_buffering, - perc); + return TRUE; +} + +static gint +convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level) +{ + int percent; - return post; + /* scale so that if buffering_level equals the high watermark, + * the percentage is 100% */ + percent = buffering_level * 100 / queue->high_watermark; + /* clip */ + if (percent > 100) + percent = 100; + + return percent; } static void @@ -884,29 +1057,71 @@ get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode, } } +/* Called with the lock taken */ +static GstMessage * +gst_queue2_get_buffering_message (GstQueue2 * queue) +{ + GstMessage *msg = NULL; + + if (queue->percent_changed) { + gint percent = queue->buffering_percent; + + queue->percent_changed = FALSE; + + GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent); + msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent); + + gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in, + queue->avg_out, queue->buffering_left); + } + + return msg; +} + +static void +gst_queue2_post_buffering (GstQueue2 * queue) +{ + GstMessage *msg = NULL; + + g_mutex_lock (&queue->buffering_post_lock); + GST_QUEUE2_MUTEX_LOCK (queue); + msg = gst_queue2_get_buffering_message (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); + + if (msg != NULL) + gst_element_post_message (GST_ELEMENT_CAST (queue), msg); + + g_mutex_unlock (&queue->buffering_post_lock); +} + static void update_buffering (GstQueue2 * queue) { - gint percent; - gboolean post = FALSE; + gint buffering_level, percent; - post = get_buffering_percent (queue, NULL, &percent); + /* Ensure the variables used to calculate buffering state are up-to-date. */ + if (queue->current) + update_cur_level (queue, queue->current); + update_in_rates (queue, FALSE); - if (post) { - GstMessage *message; - GstBufferingMode mode; - gint avg_in, avg_out; - gint64 buffering_left; + if (!get_buffering_level (queue, NULL, &buffering_level)) + return; - get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out, - &buffering_left); + percent = convert_to_buffering_percent (queue, buffering_level); - message = gst_message_new_buffering (GST_OBJECT_CAST (queue), - (gint) percent); - gst_message_set_buffering_stats (message, mode, - avg_in, avg_out, buffering_left); + if (queue->is_buffering) { + /* if we were buffering see if we reached the high watermark */ + if (percent >= 100) + queue->is_buffering = FALSE; - gst_element_post_message (GST_ELEMENT_CAST (queue), message); + SET_PERCENT (queue, percent); + } else { + /* we were not buffering, check if we need to start buffering if we drop + * below the low threshold */ + if (buffering_level < queue->low_watermark) { + queue->is_buffering = TRUE; + SET_PERCENT (queue, percent); + } } } @@ -918,6 +1133,7 @@ reset_rate_timer (GstQueue2 * queue) queue->byte_in_rate = 0.0; queue->byte_in_period = 0; queue->byte_out_rate = 0.0; + queue->last_update_in_rates_elapsed = 0.0; queue->last_in_elapsed = 0.0; queue->last_out_elapsed = 0.0; queue->in_timer_started = FALSE; @@ -937,7 +1153,7 @@ reset_rate_timer (GstQueue2 * queue) #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0 static void -update_in_rates (GstQueue2 * queue) +update_in_rates (GstQueue2 * queue, gboolean force) { gdouble elapsed, period; gdouble byte_in_rate; @@ -948,10 +1164,11 @@ update_in_rates (GstQueue2 * queue) return; } - elapsed = g_timer_elapsed (queue->in_timer, NULL); + queue->last_update_in_rates_elapsed = elapsed = + g_timer_elapsed (queue->in_timer, NULL); /* recalc after each interval. */ - if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) { + if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) { period = elapsed - queue->last_in_elapsed; GST_DEBUG_OBJECT (queue, @@ -1051,6 +1268,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset) queue->seeking = TRUE; GST_QUEUE2_MUTEX_UNLOCK (queue); + debug_ranges (queue); + GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset); event = @@ -1075,6 +1294,22 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset) return res; } +/* get the threshold for when we decide to seek rather than wait */ +static guint64 +get_seek_threshold (GstQueue2 * queue) +{ + guint64 threshold; + + /* FIXME, find a good threshold based on the incoming rate. */ + threshold = 1024 * 512; + + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + threshold = MIN (threshold, + QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes); + } + return threshold; +} + /* see if there is enough data in the file to read a full buffer */ static gboolean gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) @@ -1113,25 +1348,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) " len %u", offset, length); /* we don't have the range, see how far away we are */ if (!queue->is_eos && queue->current) { - /* FIXME, find a good threshold based on the incoming rate. */ - guint64 threshold = 1024 * 512; - - if (QUEUE_IS_USING_RING_BUFFER (queue)) { - guint64 distance; + guint64 threshold = get_seek_threshold (queue); - distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes; - /* don't wait for the complete buffer to fill */ - distance = MIN (distance, threshold); - - if (offset >= queue->current->offset && offset <= - queue->current->writing_pos + distance) { - GST_INFO_OBJECT (queue, - "requested data is within range, wait for data"); - return FALSE; - } - } else if (offset < queue->current->writing_pos + threshold) { - update_cur_pos (queue, queue->current, offset + length); - GST_INFO_OBJECT (queue, "wait for data"); + if (offset >= queue->current->offset && offset <= + queue->current->writing_pos + threshold) { + GST_INFO_OBJECT (queue, + "requested data is within range, wait for data"); return FALSE; } } @@ -1226,7 +1448,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, else buf = *buffer; - gst_buffer_map (buf, &info, GST_MAP_WRITE); + if (!gst_buffer_map (buf, &info, GST_MAP_WRITE)) + goto buffer_write_fail; data = info.data; GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length, @@ -1285,6 +1508,10 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, update_cur_pos (queue, queue->current, rpos); GST_QUEUE2_SIGNAL_DEL (queue); } + + if (queue->use_buffering) + update_buffering (queue); + GST_DEBUG_OBJECT (queue, "waiting for add"); GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); continue; @@ -1373,6 +1600,14 @@ read_error: gst_buffer_unref (buf); return ret; } +buffer_write_fail: + { + GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL), + ("Can't write to buffer")); + if (*buffer == NULL) + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } } /* should be called with QUEUE_LOCK */ @@ -1426,7 +1661,7 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template); - /* If temp_template was set, allocate a filename and open that filen */ + /* If temp_template was set, allocate a filename and open that file */ /* nothing to do */ if (queue->temp_template == NULL) @@ -1434,7 +1669,13 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) /* make copy of the template, we don't want to change this */ name = g_strdup (queue->temp_template); + +#ifdef __BIONIC__ + fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR); +#else fd = g_mkstemp (name); +#endif + if (fd == -1) goto mkstemp_failed; @@ -1501,8 +1742,12 @@ gst_queue2_close_temp_location_file (GstQueue2 * queue) fflush (queue->temp_file); fclose (queue->temp_file); - if (queue->temp_remove) - remove (queue->temp_location); + if (queue->temp_remove) { + if (remove (queue->temp_location) < 0) { + GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s", + queue->temp_location, g_strerror (errno)); + } + } queue->temp_file = NULL; clean_ranges (queue); @@ -1520,24 +1765,33 @@ gst_queue2_flush_temp_file (GstQueue2 * queue) } static void -gst_queue2_locked_flush (GstQueue2 * queue) +gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp) { if (!QUEUE_IS_USING_QUEUE (queue)) { - if (QUEUE_IS_USING_TEMP_FILE (queue)) + if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp) gst_queue2_flush_temp_file (queue); init_ranges (queue); } else { while (!g_queue_is_empty (&queue->queue)) { - GstMiniObject *data = g_queue_pop_head (&queue->queue); + GstQueue2Item *qitem = g_queue_pop_head (&queue->queue); + + if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT + && GST_EVENT_IS_STICKY (qitem->item) + && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT + && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) { + gst_pad_store_sticky_event (queue->srcpad, + GST_EVENT_CAST (qitem->item)); + } /* Then lose another reference because we are supposed to destroy that data when flushing */ - if (!GST_IS_QUERY (data)) - gst_mini_object_unref (data); + if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY) + gst_mini_object_unref (qitem->item); + g_slice_free (GstQueue2Item, qitem); } } - g_cond_signal (&queue->query_handled); queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); GST_QUEUE2_CLEAR_LEVEL (queue->cur_level); gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); @@ -1596,6 +1850,7 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) guint size, rb_size; guint64 writing_pos, new_writing_pos; GstQueue2Range *range, *prev, *next; + gboolean do_seek = FALSE; if (QUEUE_IS_USING_RING_BUFFER (queue)) writing_pos = queue->current->rb_writing_pos; @@ -1604,7 +1859,8 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) ring_buffer = queue->ring_buffer; rb_size = queue->ring_buffer_max_size; - gst_buffer_map (buffer, &info, GST_MAP_READ); + if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) + goto buffer_read_error; size = info.size; data = info.data; @@ -1656,6 +1912,9 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) guint64 range_data_start, range_data_end; GstQueue2Range *range_to_destroy = NULL; + if (range == queue->current) + goto next_range; + range_data_start = range->rb_offset; range_data_end = range->rb_writing_pos; @@ -1769,15 +2028,19 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT, next->writing_pos); - /* remove the group, we could choose to not read the data in this range - * again. This would involve us doing a seek to the current writing position - * in the range. FIXME, It would probably make sense to do a seek when there - * is a lot of data in the range we merged with to avoid reading it all - * again. */ + /* remove the group */ queue->current->next = next->next; - g_slice_free (GstQueue2Range, next); - debug_ranges (queue); + /* We use the threshold to decide if we want to do a seek or simply + * read the data again. If there is not so much data in the range we + * prefer to avoid to seek and read it again. */ + if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) { + /* the new range had more data than the threshold, it's worth keeping + * it and doing a seek. */ + new_writing_pos = next->writing_pos; + do_seek = TRUE; + } + g_slice_free (GstQueue2Range, next); } goto update_and_signal; } @@ -1827,11 +2090,24 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) } else { queue->current->writing_pos = writing_pos = new_writing_pos; } + if (do_seek) + perform_seek_to_offset (queue, new_writing_pos); + update_cur_level (queue, queue->current); /* update the buffering status */ - if (queue->use_buffering) + if (queue->use_buffering) { + GstMessage *msg; update_buffering (queue); + msg = gst_queue2_get_buffering_message (queue); + if (msg) { + GST_QUEUE2_MUTEX_UNLOCK (queue); + g_mutex_lock (&queue->buffering_post_lock); + gst_element_post_message (GST_ELEMENT_CAST (queue), msg); + g_mutex_unlock (&queue->buffering_post_lock); + GST_QUEUE2_MUTEX_LOCK (queue); + } + } GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")", queue->cur_level.bytes, QUEUE_MAX_BYTES (queue)); @@ -1873,6 +2149,12 @@ handle_error: gst_buffer_unmap (buffer, &info); return FALSE; } +buffer_read_error: + { + GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), + ("Can't read from buffer")); + return FALSE; + } } static gboolean @@ -1923,9 +2205,9 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, queue->bytes_in += size; /* apply new buffer to segment stats */ - apply_buffer (queue, buffer, &queue->sink_segment, TRUE); + apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE); /* update the byterate stats */ - update_in_rates (queue); + update_in_rates (queue, FALSE); if (!QUEUE_IS_USING_QUEUE (queue)) { /* FIXME - check return value? */ @@ -1942,7 +2224,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, /* add buffer to the statistics */ if (QUEUE_IS_USING_QUEUE (queue)) { - queue->cur_level.buffers++; + queue->cur_level.buffers += gst_buffer_list_length (buffer_list); queue->cur_level.bytes += size; } queue->bytes_in += size; @@ -1951,7 +2233,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE); /* update the byterate stats */ - update_in_rates (queue); + update_in_rates (queue, FALSE); if (!QUEUE_IS_USING_QUEUE (queue)) { gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue); @@ -1967,6 +2249,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, * filled and we can read all data from the queue. */ GST_DEBUG_OBJECT (queue, "we have EOS"); queue->is_eos = TRUE; + /* Force updating the input bitrate */ + update_in_rates (queue, TRUE); break; case GST_EVENT_SEGMENT: apply_segment (queue, event, &queue->sink_segment, TRUE); @@ -1986,6 +2270,9 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, * from downstream */ queue->unexpected = FALSE; break; + case GST_EVENT_GAP: + apply_gap (queue, event, &queue->sink_segment, TRUE); + break; case GST_EVENT_STREAM_START: if (!QUEUE_IS_USING_QUEUE (queue)) { gst_event_replace (&queue->stream_start_event, event); @@ -2028,7 +2315,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, update_buffering (queue); if (QUEUE_IS_USING_QUEUE (queue)) { - g_queue_push_tail (&queue->queue, item); + GstQueue2Item *qitem = g_slice_new (GstQueue2Item); + qitem->type = item_type; + qitem->item = item; + g_queue_push_tail (&queue->queue, qitem); } else { gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); } @@ -2041,10 +2331,11 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, /* ERRORS */ unexpected_event: { - g_warning - ("Unexpected event of kind %s can't be added in temp file of queue %s ", - gst_event_type_get_name (GST_EVENT_TYPE (item)), - GST_OBJECT_NAME (queue)); + gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM; + + GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: " + "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "", + GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item)); gst_event_unref (GST_EVENT_CAST (item)); return; } @@ -2056,10 +2347,17 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) { GstMiniObject *item; - if (!QUEUE_IS_USING_QUEUE (queue)) + if (!QUEUE_IS_USING_QUEUE (queue)) { item = gst_queue2_read_item_from_file (queue); - else - item = g_queue_pop_head (&queue->queue); + } else { + GstQueue2Item *qitem = g_queue_pop_head (&queue->queue); + + if (qitem == NULL) + goto no_item; + + item = qitem->item; + g_slice_free (GstQueue2Item, qitem); + } if (item == NULL) goto no_item; @@ -2081,7 +2379,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) } queue->bytes_out += size; - apply_buffer (queue, buffer, &queue->src_segment, FALSE); + apply_buffer (queue, buffer, &queue->src_segment, size, FALSE); /* update the byterate stats */ update_out_rates (queue); /* update the buffering */ @@ -2104,6 +2402,9 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) case GST_EVENT_SEGMENT: apply_segment (queue, event, &queue->src_segment, FALSE); break; + case GST_EVENT_GAP: + apply_gap (queue, event, &queue->src_segment, FALSE); + break; default: break; } @@ -2119,7 +2420,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) "retrieved buffer list %p from queue", buffer_list); if (QUEUE_IS_USING_QUEUE (queue)) { - queue->cur_level.buffers--; + queue->cur_level.buffers -= gst_buffer_list_length (buffer_list); queue->cur_level.bytes -= size; } queue->bytes_out += size; @@ -2153,10 +2454,11 @@ no_item: } } -static gboolean +static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { + gboolean ret = TRUE; GstQueue2 *queue; queue = GST_QUEUE2 (parent); @@ -2167,7 +2469,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) { /* forward event */ - gst_pad_push_event (queue->srcpad, event); + ret = gst_pad_push_event (queue->srcpad, event); /* now unblock the chain function */ GST_QUEUE2_MUTEX_LOCK (queue); @@ -2182,16 +2484,23 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, * flush_start downstream. */ gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); + + GST_QUEUE2_MUTEX_LOCK (queue); + queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); + GST_QUEUE2_MUTEX_UNLOCK (queue); } else { GST_QUEUE2_MUTEX_LOCK (queue); /* flush the sink pad */ queue->sinkresult = GST_FLOW_FLUSHING; GST_QUEUE2_SIGNAL_DEL (queue); + queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); } - goto done; + break; } case GST_EVENT_FLUSH_STOP: { @@ -2199,15 +2508,16 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) { /* forward event */ - gst_pad_push_event (queue->srcpad, event); + ret = gst_pad_push_event (queue->srcpad, event); GST_QUEUE2_MUTEX_LOCK (queue); - gst_queue2_locked_flush (queue); + gst_queue2_locked_flush (queue, FALSE, TRUE); queue->srcresult = GST_FLOW_OK; queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; queue->seeking = FALSE; + queue->src_tags_bitrate = queue->sink_tags_bitrate = 0; /* reset rate counters */ reset_rate_timer (queue); gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, @@ -2220,29 +2530,64 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, queue->unexpected = FALSE; queue->sinkresult = GST_FLOW_OK; queue->seeking = FALSE; + queue->src_tags_bitrate = queue->sink_tags_bitrate = 0; GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); } - goto done; + break; + } + case GST_EVENT_TAG:{ + if (queue->use_tags_bitrate) { + GstTagList *tags; + guint bitrate; + + gst_event_parse_tag (event, &tags); + if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) || + gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) { + GST_QUEUE2_MUTEX_LOCK (queue); + queue->sink_tags_bitrate = bitrate; + GST_QUEUE2_MUTEX_UNLOCK (queue); + GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate); + } + } + /* Fall-through */ } default: if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); + if (queue->srcresult != GST_FLOW_OK) { + /* Errors in sticky event pushing are no problem and ignored here + * as they will cause more meaningful errors during data flow. + * For EOS events, that are not followed by data flow, we still + * return FALSE here though and report an error. + */ + if (!GST_EVENT_IS_STICKY (event)) { + goto out_flow_error; + } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + if (queue->srcresult == GST_FLOW_NOT_LINKED + || queue->srcresult < GST_FLOW_EOS) { + GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult); + } + goto out_flow_error; + } + } /* refuse more events on EOS */ if (queue->is_eos) goto out_eos; gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT); GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); } else { /* non-serialized events are passed upstream. */ - gst_pad_push_event (queue->srcpad, event); + ret = gst_pad_push_event (queue->srcpad, event); } break; } -done: - return TRUE; + if (ret == FALSE) + return GST_FLOW_ERROR; + return GST_FLOW_OK; /* ERRORS */ out_flushing: @@ -2250,14 +2595,23 @@ out_flushing: GST_DEBUG_OBJECT (queue, "refusing event, we are flushing"); GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); - return FALSE; + return GST_FLOW_FLUSHING; } out_eos: { GST_DEBUG_OBJECT (queue, "refusing event, we are EOS"); GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); - return FALSE; + return GST_FLOW_EOS; + } +out_flow_error: + { + GST_LOG_OBJECT (queue, + "refusing event, we have a downstream flow error: %s", + gst_flow_get_name (queue->srcresult)); + GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_event_unref (event); + return queue->srcresult; } } @@ -2274,22 +2628,39 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, default: if (GST_QUERY_IS_SERIALIZED (query)) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query); - /* serialized events go in the queue */ + /* serialized events go in the queue. We need to be certain that we + * don't cause deadlocks waiting for the query return value. We check if + * the queue is empty (nothing is blocking downstream and the query can + * be pushed for sure) or we are not buffering. If we are buffering, + * the pipeline waits to unblock downstream until our queue fills up + * completely, which can not happen if we block on the query.. + * Therefore we only potentially block when we are not buffering. */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); - if (QUEUE_IS_USING_QUEUE (queue)) { - gst_queue2_locked_enqueue (queue, query, GST_QUEUE2_ITEM_TYPE_QUERY); - - STATUS (queue, queue->sinkpad, "wait for QUERY"); - g_cond_wait (&queue->query_handled, &queue->qlock); - if (queue->sinkresult != GST_FLOW_OK) - goto out_flushing; - res = queue->last_query; + if (QUEUE_IS_USING_QUEUE (queue) && (gst_queue2_is_empty (queue) + || !queue->use_buffering)) { + if (!g_atomic_int_get (&queue->downstream_may_block)) { + gst_queue2_locked_enqueue (queue, query, + GST_QUEUE2_ITEM_TYPE_QUERY); + + STATUS (queue, queue->sinkpad, "wait for QUERY"); + while (queue->sinkresult == GST_FLOW_OK && + queue->last_handled_query != query) + g_cond_wait (&queue->query_handled, &queue->qlock); + queue->last_handled_query = NULL; + if (queue->sinkresult != GST_FLOW_OK) + goto out_flushing; + res = queue->last_query; + } else { + GST_DEBUG_OBJECT (queue, "refusing query, downstream might block"); + res = FALSE; + } } else { GST_DEBUG_OBJECT (queue, "refusing query, we are not using the queue"); res = FALSE; } GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); } else { res = gst_pad_query_default (pad, parent, query); } @@ -2390,6 +2761,7 @@ gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue, /* put buffer in queue now */ gst_queue2_locked_enqueue (queue, item, item_type); GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); return GST_FLOW_OK; @@ -2516,7 +2888,7 @@ gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type) static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue) { - GstFlowReturn result = GST_FLOW_OK; + GstFlowReturn result = queue->srcresult; GstMiniObject *data; GstQueue2ItemType item_type; @@ -2525,7 +2897,12 @@ gst_queue2_push_one (GstQueue2 * queue) goto no_item; next: + STATUS (queue, queue->srcpad, "We have something dequeud"); + g_atomic_int_set (&queue->downstream_may_block, + item_type == GST_QUEUE2_ITEM_TYPE_BUFFER || + item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST); GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { GstBuffer *buffer; @@ -2533,6 +2910,7 @@ next: buffer = GST_BUFFER_CAST (data); result = gst_pad_push (queue->srcpad, buffer); + g_atomic_int_set (&queue->downstream_may_block, 0); /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); @@ -2548,6 +2926,22 @@ next: GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); + if (type == GST_EVENT_TAG) { + if (queue->use_tags_bitrate) { + GstTagList *tags; + guint bitrate; + + gst_event_parse_tag (event, &tags); + if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) || + gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) { + GST_QUEUE2_MUTEX_LOCK (queue); + queue->src_tags_bitrate = bitrate; + GST_QUEUE2_MUTEX_UNLOCK (queue); + GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate); + } + } + } + gst_pad_push_event (queue->srcpad, event); /* if we're EOS, return EOS so that the task pauses. */ @@ -2564,6 +2958,7 @@ next: buffer_list = GST_BUFFER_LIST_CAST (data); result = gst_pad_push_list (queue->srcpad, buffer_list); + g_atomic_int_set (&queue->downstream_may_block, 0); /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); @@ -2578,7 +2973,10 @@ next: } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) { GstQuery *query = GST_QUERY_CAST (data); + GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query); + queue->last_handled_query = query; queue->last_query = gst_pad_peer_query (queue->srcpad, query); + GST_LOG_OBJECT (queue->srcpad, "Peered query"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "did query %p, return %d", query, queue->last_query); g_cond_signal (&queue->query_handled); @@ -2641,6 +3039,7 @@ gst_queue2_loop (GstPad * pad) goto out_flushing; GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); return; @@ -2651,16 +3050,20 @@ out_flushing: GstFlowReturn ret = queue->srcresult; gst_pad_pause_task (queue->srcpad); + if (ret == GST_FLOW_FLUSHING) { + gst_queue2_locked_flush (queue, FALSE, FALSE); + } else { + GST_QUEUE2_SIGNAL_DEL (queue); + queue->last_query = FALSE; + g_cond_signal (&queue->query_handled); + } GST_QUEUE2_MUTEX_UNLOCK (queue); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (queue->srcresult)); /* let app know about us giving up if upstream is not expected to do so */ /* EOS is already taken care of elsewhere */ if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { - GST_ELEMENT_ERROR (queue, STREAM, FAILED, - (_("Internal data flow error.")), - ("streaming task paused, reason %s (%d)", - gst_flow_get_name (ret), ret)); + GST_ELEMENT_FLOW_ERROR (queue, ret); gst_pad_push_event (queue->srcpad, gst_event_new_eos ()); } return; @@ -2757,9 +3160,13 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) switch (format) { case GST_FORMAT_BYTES: peer_pos -= queue->cur_level.bytes; + if (peer_pos < 0) /* Clamp result to 0 */ + peer_pos = 0; break; case GST_FORMAT_TIME: peer_pos -= queue->cur_level.time; + if (peer_pos < 0) /* Clamp result to 0 */ + peer_pos = 0; break; default: GST_WARNING_OBJECT (queue, "dropping query in %s format, don't " @@ -2790,7 +3197,8 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_DEBUG_OBJECT (queue, "query buffering"); - get_buffering_percent (queue, &is_buffering, &percent); + get_buffering_level (queue, &is_buffering, &percent); + percent = convert_to_buffering_percent (queue, percent); gst_query_set_buffering_percent (query, is_buffering, percent); get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out, @@ -2908,6 +3316,11 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) gboolean pull_mode; GstSchedulingFlags flags = 0; + if (!gst_pad_peer_query (queue->sinkpad, query)) + goto peer_failed; + + gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL); + /* we can operate in pull mode when we are using a tempfile */ pull_mode = !QUEUE_IS_USING_QUEUE (queue); @@ -2954,7 +3367,13 @@ gst_queue2_update_upstream_size (GstQueue2 * queue) if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES, &upstream_size)) { GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size); - queue->upstream_size = upstream_size; + + /* upstream_size can be negative but queue->upstream_size is unsigned. + * Prevent setting negative values to it (the query can return -1) */ + if (upstream_size >= 0) + queue->upstream_size = upstream_size; + else + queue->upstream_size = 0; } } @@ -2996,6 +3415,7 @@ gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset, /* FIXME - function will block when the range is not yet available */ ret = gst_queue2_create_read (queue, offset, length, buffer); GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); return ret; @@ -3043,8 +3463,15 @@ gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (queue, "deactivating push mode"); queue->srcresult = GST_FLOW_FLUSHING; queue->sinkresult = GST_FLOW_FLUSHING; - gst_queue2_locked_flush (queue); + GST_QUEUE2_SIGNAL_DEL (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); + + /* wait until it is unblocked and clean up */ + GST_PAD_STREAM_LOCK (pad); + GST_QUEUE2_MUTEX_LOCK (queue); + gst_queue2_locked_flush (queue, TRUE, FALSE); + GST_QUEUE2_MUTEX_UNLOCK (queue); + GST_PAD_STREAM_UNLOCK (pad); } result = TRUE; break; @@ -3087,6 +3514,10 @@ gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); + + GST_QUEUE2_MUTEX_LOCK (queue); + gst_queue2_locked_flush (queue, FALSE, FALSE); + GST_QUEUE2_MUTEX_UNLOCK (queue); } return result; @@ -3244,8 +3675,10 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) /* changing the capacity of the queue must wake up * the _chain function, it might have more room now * to store the buffer/event in the queue */ -#define QUEUE_CAPACITY_CHANGE(q)\ - GST_QUEUE2_SIGNAL_DEL (queue); +#define QUEUE_CAPACITY_CHANGE(q) \ + GST_QUEUE2_SIGNAL_DEL (queue); \ + if (queue->use_buffering) \ + update_buffering (queue); /* Changing the minimum required fill level must * wake up the _loop function as it might now @@ -3308,15 +3741,44 @@ gst_queue2_set_property (GObject * object, break; case PROP_USE_BUFFERING: queue->use_buffering = g_value_get_boolean (value); + if (!queue->use_buffering && queue->is_buffering) { + GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, " + "posting 100%% message"); + SET_PERCENT (queue, 100); + queue->is_buffering = FALSE; + } + + if (queue->use_buffering) { + queue->is_buffering = TRUE; + update_buffering (queue); + } + break; + case PROP_USE_TAGS_BITRATE: + queue->use_tags_bitrate = g_value_get_boolean (value); break; case PROP_USE_RATE_ESTIMATE: queue->use_rate_estimate = g_value_get_boolean (value); break; case PROP_LOW_PERCENT: - queue->low_percent = g_value_get_int (value); + queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR; + if (queue->is_buffering) + update_buffering (queue); break; case PROP_HIGH_PERCENT: - queue->high_percent = g_value_get_int (value); + queue->high_watermark = + g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR; + if (queue->is_buffering) + update_buffering (queue); + break; + case PROP_LOW_WATERMARK: + queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL; + if (queue->is_buffering) + update_buffering (queue); + break; + case PROP_HIGH_WATERMARK: + queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL; + if (queue->is_buffering) + update_buffering (queue); break; case PROP_TEMP_TEMPLATE: gst_queue2_set_temp_template (queue, g_value_get_string (value)); @@ -3333,6 +3795,7 @@ gst_queue2_set_property (GObject * object, } GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_queue2_post_buffering (queue); } static void @@ -3365,14 +3828,25 @@ gst_queue2_get_property (GObject * object, case PROP_USE_BUFFERING: g_value_set_boolean (value, queue->use_buffering); break; + case PROP_USE_TAGS_BITRATE: + g_value_set_boolean (value, queue->use_tags_bitrate); + break; case PROP_USE_RATE_ESTIMATE: g_value_set_boolean (value, queue->use_rate_estimate); break; case PROP_LOW_PERCENT: - g_value_set_int (value, queue->low_percent); + g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR); break; case PROP_HIGH_PERCENT: - g_value_set_int (value, queue->high_percent); + g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR); + break; + case PROP_LOW_WATERMARK: + g_value_set_double (value, queue->low_watermark / + (gdouble) MAX_BUFFERING_LEVEL); + break; + case PROP_HIGH_WATERMARK: + g_value_set_double (value, queue->high_watermark / + (gdouble) MAX_BUFFERING_LEVEL); break; case PROP_TEMP_TEMPLATE: g_value_set_string (value, queue->temp_template); @@ -3386,6 +3860,19 @@ gst_queue2_get_property (GObject * object, case PROP_RING_BUFFER_MAX_SIZE: g_value_set_uint64 (value, queue->ring_buffer_max_size); break; + case PROP_AVG_IN_RATE: + { + gdouble in_rate = queue->byte_in_rate; + + /* During the first RATE_INTERVAL, byte_in_rate will not have been + * calculated, so calculate it here. */ + if (in_rate == 0.0 && queue->bytes_in + && queue->last_update_in_rates_elapsed > 0.0) + in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed; + + g_value_set_int64 (value, (gint64) in_rate); + break; + } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break;