[queue2] Update queue2 for progressive download 78/47978/1
authorHyongtaek Lim <hyongtaek.lim@samsung.com>
Thu, 10 Sep 2015 12:08:56 +0000 (21:08 +0900)
committerHyongtaek Lim <hyongtaek.lim@samsung.com>
Thu, 10 Sep 2015 12:09:30 +0000 (21:09 +0900)
Signed-off-by: Hyongtaek Lim <hyongtaek.lim@samsung.com>
Change-Id: I16115b8be724067b98f4b561a52d42da27aa39e4

plugins/elements/gstqueue2.c
plugins/elements/gstqueue2.h

index 63a56a30cbcb99617d80eb628024793fa5d2b914..d698f4059927832cfbe2b866784dec16d8f58a1c 100644 (file)
@@ -206,6 +206,16 @@ enum
   }                                                                     \
 } G_STMT_END
 
+#define SET_PERCENT(q, perc) G_STMT_START {                              \
+  if (perc != q->buffering_percent) {                                    \
+    q->buffering_percent = perc;                                         \
+    q->percent_changed = TRUE;                                           \
+    GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
+    get_buffering_stats (q, perc, &q->mode, &q->avg_in, &q->avg_out,     \
+        &q->buffering_left);                                             \
+  }                                                                      \
+} G_STMT_END
+
 #define _do_init \
     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
@@ -254,6 +264,10 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
 
 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
 static void update_in_rates (GstQueue2 * queue);
+static void gst_queue2_post_buffering (GstQueue2 * queue);
+#ifdef GST_QUEUE2_MODIFICATION
+static gboolean change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length);
+#endif
 
 typedef enum
 {
@@ -449,6 +463,7 @@ gst_queue2_init (GstQueue2 * queue)
   g_cond_init (&queue->query_handled);
   queue->last_query = FALSE;
 
+  g_mutex_init (&queue->buffering_post_lock);
   queue->buffering_percent = 100;
 
   /* tempfile related */
@@ -482,6 +497,7 @@ gst_queue2_finalize (GObject * object)
   queue->last_query = FALSE;
   g_queue_clear (&queue->queue);
   g_mutex_clear (&queue->qlock);
+  g_mutex_clear (&queue->buffering_post_lock);
   g_cond_clear (&queue->item_add);
   g_cond_clear (&queue->item_del);
   g_cond_clear (&queue->query_handled);
@@ -519,6 +535,9 @@ clean_ranges (GstQueue2 * queue)
   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
   queue->ranges = NULL;
   queue->current = NULL;
+#ifdef GST_QUEUE2_MODIFICATION
+  queue->read = NULL;
+#endif
 }
 
 /* find a range that contains @offset or NULL when nothing does */
@@ -629,7 +648,11 @@ init_ranges (GstQueue2 * queue)
   /* get rid of all the current ranges */
   clean_ranges (queue);
   /* make a range for offset 0 */
+#ifdef GST_QUEUE2_MODIFICATION
+  queue->current = queue->read = add_range (queue, 0, TRUE);
+#else
   queue->current = add_range (queue, 0, TRUE);
+#endif
 }
 
 /* calculate the diff between running time on the sink and src of the queue.
@@ -673,7 +696,11 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
   if (segment->format == GST_FORMAT_BYTES) {
     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
       /* start is where we'll be getting from and as such writing next */
+#ifdef GST_QUEUE2_MODIFICATION
+      queue->current = queue->read = add_range (queue, segment->start, TRUE);
+#else
       queue->current = add_range (queue, segment->start, TRUE);
+#endif
     }
   }
 
@@ -699,6 +726,33 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
   update_time_level (queue);
 }
 
+static void
+apply_gap (GstQueue2 * queue, GstEvent * event,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+  GstClockTime duration;
+
+  gst_event_parse_gap (event, &timestamp, &duration);
+
+  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+
+    if (GST_CLOCK_TIME_IS_VALID (duration)) {
+      timestamp += duration;
+    }
+
+    segment->position = timestamp;
+
+    if (is_sink)
+      queue->sink_tainted = TRUE;
+    else
+      queue->src_tainted = TRUE;
+
+    /* calc diff with other end */
+    update_time_level (queue);
+  }
+}
+
 /* take a buffer and update segment, updating the time level of the queue. */
 static void
 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
@@ -868,57 +922,72 @@ get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
   }
 }
 
+static void
+gst_queue2_post_buffering (GstQueue2 * queue)
+{
+  GstMessage *msg = NULL;
+
+  g_mutex_lock (&queue->buffering_post_lock);
+  GST_QUEUE2_MUTEX_LOCK (queue);
+  if (queue->percent_changed) {
+    gint percent = queue->buffering_percent;
+
+    queue->percent_changed = FALSE;
+
+    GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
+    msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
+
+    gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
+        queue->avg_out, queue->buffering_left);
+  }
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+  if (msg != NULL)
+    gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
+
+  g_mutex_unlock (&queue->buffering_post_lock);
+}
+
 static void
 update_buffering (GstQueue2 * queue)
 {
   gint percent;
-  gboolean post = FALSE;
+#ifdef GST_QUEUE2_MODIFICATION
+  GstQueue2Range *range;
+
+  if (queue->read)
+    range = queue->read;
+  else
+    range = queue->current;
+#endif
 
   /* Ensure the variables used to calculate buffering state are up-to-date. */
+#ifdef GST_QUEUE2_MODIFICATION
+  if (range)
+    update_cur_level (queue, range);
+#else
   if (queue->current)
     update_cur_level (queue, queue->current);
+#endif
   update_in_rates (queue);
 
   if (!get_buffering_percent (queue, NULL, &percent))
     return;
 
   if (queue->is_buffering) {
-    post = TRUE;
     /* if we were buffering see if we reached the high watermark */
-    if (percent >= queue->high_percent)
+    if (percent >= 100)
       queue->is_buffering = FALSE;
+
+    SET_PERCENT (queue, percent);
   } else {
     /* we were not buffering, check if we need to start buffering if we drop
      * below the low threshold */
     if (percent < queue->low_percent) {
       queue->is_buffering = TRUE;
-      post = TRUE;
+      SET_PERCENT (queue, percent);
     }
   }
-
-  if (post) {
-    if (percent == queue->buffering_percent)
-      post = FALSE;
-    else
-      queue->buffering_percent = percent;
-  }
-
-  if (post) {
-    GstMessage *message;
-    GstBufferingMode mode;
-    gint avg_in, avg_out;
-    gint64 buffering_left;
-
-    get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
-        &buffering_left);
-
-    message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
-        (gint) percent);
-    gst_message_set_buffering_stats (message, mode,
-        avg_in, avg_out, buffering_left);
-
-    gst_element_post_message (GST_ELEMENT_CAST (queue), message);
-  }
 }
 
 static void
@@ -1082,7 +1151,11 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
      * cause data to be written to the wrong offset in the file or ring buffer.
      * We still do the add_range call to switch the current range to the
      * requested range, or create one if one doesn't exist yet. */
+#ifdef GST_QUEUE2_MODIFICATION
+    queue->current = queue->read = add_range (queue, offset, FALSE);
+#else
     queue->current = add_range (queue, offset, FALSE);
+#endif
   }
 
   return res;
@@ -1104,19 +1177,67 @@ get_seek_threshold (GstQueue2 * queue)
   return threshold;
 }
 
+#ifdef GST_QUEUE2_MODIFICATION
+/* check the buffered data size, not to change range repeatedly.
+ * changing range cause the new http connection and it makes buffering state frequently. */
+static gboolean
+change_current_range(GstQueue2 * queue, GstQueue2Range *req_range, guint64 offset, guint length)
+{
+  guint64 curr_level = 0;
+  guint64 curr_min_level = 0;
+
+  if (queue->current->writing_pos > queue->current->reading_pos)
+    curr_level = queue->current->writing_pos - queue->current->reading_pos;
+
+  /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
+  curr_min_level = MIN((queue->max_level.bytes*0.05), get_seek_threshold(queue));
+
+  GST_DEBUG_OBJECT(queue, " curr[w%"G_GUINT64_FORMAT", r%"G_GUINT64_FORMAT
+        ", d%"G_GUINT64_FORMAT", m%"G_GUINT64_FORMAT
+        "] u[%"G_GUINT64_FORMAT"][EOS:%d]",
+        queue->current->writing_pos, queue->current->reading_pos,
+        curr_level, curr_min_level,
+        queue->upstream_size, queue->is_eos);
+
+  /* FIXME, find a good threshold based on the incoming rate and content bitrate. */
+  if ((queue->is_eos) ||
+      (queue->upstream_size > 0 && queue->current->writing_pos >= queue->upstream_size) ||
+      (curr_level >= curr_min_level)) {
+    GST_DEBUG_OBJECT (queue, "Range Switching");
+    return TRUE;
+  } else {
+    GST_DEBUG_OBJECT (queue, "keep receiving data without range switching.");
+    return FALSE;
+  }
+}
+#endif
+
 /* see if there is enough data in the file to read a full buffer */
 static gboolean
 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
 {
   GstQueue2Range *range;
-
+#ifdef GST_QUEUE2_MODIFICATION
+  gboolean need_to_seek = TRUE;
+#endif
   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
       offset, length);
 
   if ((range = find_range (queue, offset))) {
+#ifdef GST_QUEUE2_MODIFICATION
+    queue->read = range;
+#endif
     if (queue->current != range) {
       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
+#ifdef GST_QUEUE2_MODIFICATION
+      if (QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER(queue))
+        need_to_seek = change_current_range(queue, range, offset, length);
+
+      if (need_to_seek == TRUE)
+        perform_seek_to_offset (queue, range->writing_pos);
+#else
       perform_seek_to_offset (queue, range->writing_pos);
+#endif
     }
 
     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
@@ -1140,6 +1261,11 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
   } else {
     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
         " len %u", offset, length);
+
+#ifdef GST_QUEUE2_MODIFICATION
+    queue->read = queue->current;
+#endif
+
     /* we don't have the range, see how far away we are */
     if (!queue->is_eos && queue->current) {
       guint64 threshold = get_seek_threshold (queue);
@@ -1262,15 +1388,24 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
         guint64 level;
 
         /* calculate how far away the offset is */
+#ifdef GST_QUEUE2_MODIFICATION
+        if (queue->read->writing_pos > rpos)
+          level = queue->read->writing_pos - rpos;
+#else
         if (queue->current->writing_pos > rpos)
           level = queue->current->writing_pos - rpos;
+#endif
         else
           level = 0;
 
         GST_DEBUG_OBJECT (queue,
             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+#ifdef GST_QUEUE2_MODIFICATION
+            rpos, queue->read->writing_pos, level, max_size);
+#else
             rpos, queue->current->writing_pos, level, max_size);
+#endif
 
         if (level >= max_size) {
           /* we don't have the data but if we have a ring buffer that is full, we
@@ -1295,10 +1430,17 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 
       if (read_length == 0) {
         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+#ifdef GST_QUEUE2_MODIFICATION
+          GST_DEBUG_OBJECT (queue,
+              "update current position [%" G_GUINT64_FORMAT "-%"
+              G_GUINT64_FORMAT "]", rpos, queue->read->max_reading_pos);
+          update_cur_pos (queue, queue->read, rpos);
+#else
           GST_DEBUG_OBJECT (queue,
               "update current position [%" G_GUINT64_FORMAT "-%"
               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
           update_cur_pos (queue, queue->current, rpos);
+#endif
           GST_QUEUE2_SIGNAL_DEL (queue);
         }
 
@@ -1315,13 +1457,22 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     }
 
     /* set range reading_pos to actual reading position for this read */
+#ifdef GST_QUEUE2_MODIFICATION
+    queue->read->reading_pos = rpos;
+#else
     queue->current->reading_pos = rpos;
+#endif
 
     /* configure how much and from where to read */
     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
       file_offset =
+#ifdef GST_QUEUE2_MODIFICATION
+          (queue->read->rb_offset + (rpos -
+              queue->read->offset)) % rb_size;
+#else
           (queue->current->rb_offset + (rpos -
               queue->current->offset)) % rb_size;
+#endif
       if (file_offset + read_length > rb_size) {
         block_length = rb_size - file_offset;
       } else {
@@ -1351,8 +1502,13 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
       block_length = read_length;
       remaining -= read_return;
 
+#ifdef GST_QUEUE2_MODIFICATION
+      rpos = (queue->read->reading_pos += read_return);
+      update_cur_pos (queue, queue->read, queue->read->reading_pos);
+#else
       rpos = (queue->current->reading_pos += read_return);
       update_cur_pos (queue, queue->current, queue->current->reading_pos);
+#endif
     }
     GST_QUEUE2_SIGNAL_DEL (queue);
     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
@@ -1412,7 +1568,11 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
     GstBuffer *buffer = NULL;
     guint64 reading_pos;
 
+#ifdef GST_QUEUE2_MODIFICATION
+    reading_pos = queue->read->reading_pos;
+#else
     reading_pos = queue->current->reading_pos;
+#endif
 
     ret =
         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
@@ -1864,9 +2024,20 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
     } else {
       queue->current->writing_pos = writing_pos = new_writing_pos;
+#ifdef GST_QUEUE2_MODIFICATION
+      if (do_seek) {
+        if (queue->upstream_size == 0 || new_writing_pos < queue->upstream_size)
+          perform_seek_to_offset (queue, new_writing_pos);
+        else
+          queue->is_eos = TRUE;
+      }
+#endif
     }
+
+#ifndef GST_QUEUE2_MODIFICATION
     if (do_seek)
       perform_seek_to_offset (queue, new_writing_pos);
+#endif
 
     update_cur_level (queue, queue->current);
 
@@ -1983,7 +2154,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
 
     /* add buffer to the statistics */
     if (QUEUE_IS_USING_QUEUE (queue)) {
-      queue->cur_level.buffers++;
+      queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
       queue->cur_level.bytes += size;
     }
     queue->bytes_in += size;
@@ -2027,6 +2198,9 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
          * from downstream */
         queue->unexpected = FALSE;
         break;
+      case GST_EVENT_GAP:
+        apply_gap (queue, event, &queue->sink_segment, TRUE);
+        break;
       case GST_EVENT_STREAM_START:
         if (!QUEUE_IS_USING_QUEUE (queue)) {
           gst_event_replace (&queue->stream_start_event, event);
@@ -2155,6 +2329,9 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
       case GST_EVENT_SEGMENT:
         apply_segment (queue, event, &queue->src_segment, FALSE);
         break;
+      case GST_EVENT_GAP:
+        apply_gap (queue, event, &queue->src_segment, FALSE);
+        break;
       default:
         break;
     }
@@ -2170,7 +2347,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
         "retrieved buffer list %p from queue", buffer_list);
 
     if (QUEUE_IS_USING_QUEUE (queue)) {
-      queue->cur_level.buffers--;
+      queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
       queue->cur_level.bytes -= size;
     }
     queue->bytes_out += size;
@@ -2310,6 +2487,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
           goto out_eos;
         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
+        gst_queue2_post_buffering (queue);
       } else {
         /* non-serialized events are passed upstream. */
         ret = gst_pad_push_event (queue->srcpad, event);
@@ -2386,6 +2564,7 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
           res = FALSE;
         }
         GST_QUEUE2_MUTEX_UNLOCK (queue);
+        gst_queue2_post_buffering (queue);
       } else {
         res = gst_pad_query_default (pad, parent, query);
       }
@@ -2486,6 +2665,7 @@ gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
   /* put buffer in queue now */
   gst_queue2_locked_enqueue (queue, item, item_type);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
+  gst_queue2_post_buffering (queue);
 
   return GST_FLOW_OK;
 
@@ -2626,6 +2806,7 @@ next:
       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
+  gst_queue2_post_buffering (queue);
 
   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
@@ -2745,6 +2926,7 @@ gst_queue2_loop (GstPad * pad)
     goto out_flushing;
 
   GST_QUEUE2_MUTEX_UNLOCK (queue);
+  gst_queue2_post_buffering (queue);
 
   return;
 
@@ -2978,15 +3160,25 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
                 range_stop = 0;
                 break;
               }
+#ifdef GST_QUEUE2_MODIFICATION
+              range_start =
+                  gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
+                  queued_ranges->reading_pos, duration);
+#else
               range_start =
                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
                   queued_ranges->offset, duration);
+#endif
               range_stop =
                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
                   queued_ranges->writing_pos, duration);
               break;
             case GST_FORMAT_BYTES:
+#ifdef GST_QUEUE2_MODIFICATION
+              range_start = queued_ranges->reading_pos;
+#else
               range_start = queued_ranges->offset;
+#endif
               range_stop = queued_ranges->writing_pos;
               break;
             default:
@@ -3012,7 +3204,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       gboolean pull_mode;
       GstSchedulingFlags flags = 0;
 
-      if (!gst_pad_peer_query (queue->sinkpad, query))
+      if ((!QUEUE_IS_USING_QUEUE (queue)) && !gst_pad_peer_query (queue->sinkpad, query))
         goto peer_failed;
 
       gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
@@ -3063,7 +3255,13 @@ gst_queue2_update_upstream_size (GstQueue2 * queue)
   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
           &upstream_size)) {
     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
-    queue->upstream_size = upstream_size;
+
+    /* upstream_size can be negative but queue->upstream_size is unsigned.
+     * Prevent setting negative values to it (the query can return -1) */
+    if (upstream_size >= 0)
+      queue->upstream_size = upstream_size;
+    else
+      queue->upstream_size = 0;
   }
 }
 
@@ -3105,6 +3303,7 @@ gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
   /* FIXME - function will block when the range is not yet available */
   ret = gst_queue2_create_read (queue, offset, length, buffer);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
+  gst_queue2_post_buffering (queue);
 
   return ret;
 
@@ -3430,13 +3629,10 @@ gst_queue2_set_property (GObject * object,
     case PROP_USE_BUFFERING:
       queue->use_buffering = g_value_get_boolean (value);
       if (!queue->use_buffering && queue->is_buffering) {
-        GstMessage *msg = gst_message_new_buffering (GST_OBJECT_CAST (queue),
-            100);
-
         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
             "posting 100%% message");
+        SET_PERCENT (queue, 100);
         queue->is_buffering = FALSE;
-        gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
       }
 
       if (queue->use_buffering) {
@@ -3468,6 +3664,7 @@ gst_queue2_set_property (GObject * object,
   }
 
   GST_QUEUE2_MUTEX_UNLOCK (queue);
+  gst_queue2_post_buffering (queue);
 }
 
 static void
index bbd614dd4a06d44c60732a445ecb39dba982e658..1f6653ac491c86a025d2b876449f20a84ae1e456 100644 (file)
@@ -141,6 +141,9 @@ struct _GstQueue2
   /* list of downloaded areas and the current area */
   GstQueue2Range *ranges;
   GstQueue2Range *current;
+#ifdef GST_QUEUE2_MODIFICATION
+  GstQueue2Range *read;
+#endif
   /* we need this to send the first new segment event of the stream
    * because we can't save it on the file */
   gboolean segment_event_received;
@@ -153,6 +156,13 @@ struct _GstQueue2
   guint8 * ring_buffer;
 
   volatile gint downstream_may_block;
+
+  GstBufferingMode mode;
+  gint64 buffering_left;
+  gint avg_in;
+  gint avg_out;
+  gboolean percent_changed;
+  GMutex buffering_post_lock; /* assures only one posted at a time */
 };
 
 struct _GstQueue2Class