Merge branch 'upstream/1.16' into tizen_gst_1.16.2
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue2.c
index 8a59976..a2c9538 100644 (file)
 #endif
 
 #ifdef __BIONIC__               /* Android */
-#undef lseek
-#define lseek lseek64
-#undef off_t
-#define off_t guint64
 #include <fcntl.h>
 #endif
 
@@ -122,6 +118,7 @@ enum
 #define DEFAULT_HIGH_WATERMARK     0.99
 #define DEFAULT_TEMP_REMOVE        TRUE
 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
+#define DEFAULT_USE_BITRATE_QUERY  TRUE
 
 enum
 {
@@ -144,6 +141,8 @@ enum
   PROP_TEMP_REMOVE,
   PROP_RING_BUFFER_MAX_SIZE,
   PROP_AVG_IN_RATE,
+  PROP_USE_BITRATE_QUERY,
+  PROP_BITRATE,
 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
   PROP_BUFFER_MODE,
 #endif
@@ -197,7 +196,7 @@ enum
                       queue->max_level.time, \
                       (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
                         queue->current->writing_pos - queue->current->max_reading_pos : \
-                        queue->queue.length))
+                        gst_queue_array_get_length(queue->queue)))
 
 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
   g_mutex_lock (&q->qlock);                                              \
@@ -423,6 +422,13 @@ gst_queue2_class_init (GstQueue2Class * klass)
           "Location to store temporary files in (Only read this property, "
           "use temp-template to configure the name template)",
           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_USE_BITRATE_QUERY,
+      g_param_spec_boolean ("use-bitrate-query",
+          "Use bitrate from downstream query",
+          "Use a bitrate from a downstream query to estimate buffer duration if not provided",
+          DEFAULT_USE_BITRATE_QUERY,
+          G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
+          G_PARAM_STATIC_STRINGS));
 
   /**
    * GstQueue2:temp-remove
@@ -457,6 +463,18 @@ gst_queue2_class_init (GstQueue2Class * klass)
           "Average input data rate (bytes/s)",
           0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstQueue2:bitrate
+   *
+   * The value used to convert between byte and time values for limiting
+   * the size of the queue.  Values are taken from either the upstream tags
+   * or from the downstream bitrate query.
+   */
+  g_object_class_install_property (gobject_class, PROP_BITRATE,
+      g_param_spec_uint64 ("bitrate", "Bitrate (bits/s)",
+          "Conversion value between data size and time",
+          0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
   /**
    * GstQueue2:buffer-mode:
@@ -469,6 +487,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
           GST_TYPE_BUFFERING_MODE, GST_BUFFERING_STREAM,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 #endif
+
   /* set several parent class virtual functions */
   gobject_class->finalize = gst_queue2_finalize;
 
@@ -546,13 +565,14 @@ gst_queue2_init (GstQueue2 * queue)
   g_cond_init (&queue->item_add);
   queue->waiting_del = FALSE;
   g_cond_init (&queue->item_del);
-  g_queue_init (&queue->queue);
+  queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
 
   g_cond_init (&queue->query_handled);
   queue->last_query = FALSE;
 
   g_mutex_init (&queue->buffering_post_lock);
   queue->buffering_percent = 100;
+  queue->last_posted_buffering_percent = -1;
 
   /* tempfile related */
   queue->temp_template = NULL;
@@ -561,6 +581,9 @@ gst_queue2_init (GstQueue2 * queue)
 
   queue->ring_buffer = NULL;
   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
+
+  queue->use_bitrate_query = DEFAULT_USE_BITRATE_QUERY;
+
 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
   queue->mode = -1;
 #endif
@@ -573,19 +596,17 @@ static void
 gst_queue2_finalize (GObject * object)
 {
   GstQueue2 *queue = GST_QUEUE2 (object);
+  GstQueue2Item *qitem;
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while (!g_queue_is_empty (&queue->queue)) {
-    GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
-
+  while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
       gst_mini_object_unref (qitem->item);
-    g_slice_free (GstQueue2Item, qitem);
   }
+  gst_queue_array_free (queue->queue);
 
   queue->last_query = FALSE;
-  g_queue_clear (&queue->queue);
   g_mutex_clear (&queue->qlock);
   g_mutex_clear (&queue->buffering_post_lock);
   g_cond_clear (&queue->item_add);
@@ -843,6 +864,29 @@ apply_gap (GstQueue2 * queue, GstEvent * event,
   }
 }
 
+static void
+query_downstream_bitrate (GstQueue2 * queue)
+{
+  GstQuery *query = gst_query_new_bitrate ();
+  guint downstream_bitrate = 0;
+
+  if (gst_pad_peer_query (queue->srcpad, query)) {
+    gst_query_parse_bitrate (query, &downstream_bitrate);
+    GST_DEBUG_OBJECT (queue, "Got bitrate of %u from downstream",
+        downstream_bitrate);
+  } else {
+    GST_DEBUG_OBJECT (queue, "Failed to query bitrate from downstream");
+  }
+
+  gst_query_unref (query);
+
+  GST_QUEUE2_MUTEX_LOCK (queue);
+  queue->downstream_bitrate = downstream_bitrate;
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+  g_object_notify (G_OBJECT (queue), "bitrate");
+}
+
 /* take a buffer and update segment, updating the time level of the queue. */
 static void
 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
@@ -854,11 +898,24 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
   duration = GST_BUFFER_DURATION (buffer);
 
   /* If we have no duration, pick one from the bitrate if we can */
-  if (duration == GST_CLOCK_TIME_NONE && queue->use_tags_bitrate) {
-    guint bitrate =
-        is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
-    if (bitrate)
-      duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
+  if (duration == GST_CLOCK_TIME_NONE) {
+    if (queue->use_tags_bitrate) {
+      guint bitrate =
+          is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
+      if (bitrate)
+        duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
+    }
+    if (duration == GST_CLOCK_TIME_NONE && !is_sink && queue->use_bitrate_query) {
+      if (queue->downstream_bitrate > 0) {
+        duration =
+            gst_util_uint64_scale (size, 8 * GST_SECOND,
+            queue->downstream_bitrate);
+
+        GST_LOG_OBJECT (queue, "got bitrate %u resulting in estimated "
+            "duration %" GST_TIME_FORMAT, queue->downstream_bitrate,
+            GST_TIME_ARGS (duration));
+      }
+    }
   }
 
   /* if no timestamp is set, assume it's continuous with the previous
@@ -931,13 +988,16 @@ apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
   /* if no timestamp is set, assume it's continuous with the previous time */
   bld.timestamp = segment->position;
 
+  bld.bitrate = 0;
   if (queue->use_tags_bitrate) {
     if (is_sink)
       bld.bitrate = queue->sink_tags_bitrate;
     else
       bld.bitrate = queue->src_tags_bitrate;
-  } else
-    bld.bitrate = 0;
+  }
+  if (!is_sink && bld.bitrate == 0 && queue->use_bitrate_query) {
+    bld.bitrate = queue->downstream_bitrate;
+  }
 
   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
 
@@ -996,9 +1056,10 @@ get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
     GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
   } else {
     GST_LOG_OBJECT (queue,
-        "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
-        queue->cur_level.bytes, GST_TIME_ARGS (queue->cur_level.time),
-        queue->cur_level.buffers);
+        "Cur level bytes/time/rate-time/buffers %u/%" GST_TIME_FORMAT "/%"
+        GST_TIME_FORMAT "/%u", queue->cur_level.bytes,
+        GST_TIME_ARGS (queue->cur_level.time),
+        GST_TIME_ARGS (queue->cur_level.rate_time), queue->cur_level.buffers);
 
     /* figure out the buffering level we are filled, we take the max of all formats. */
     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
@@ -1110,17 +1171,31 @@ static GstMessage *
 gst_queue2_get_buffering_message (GstQueue2 * queue)
 {
   GstMessage *msg = NULL;
-
   if (queue->percent_changed) {
-    gint percent = queue->buffering_percent;
-
+    /* Don't change the buffering level if the sinkpad is waiting for
+     * space to become available.  This prevents the situation where,
+     * upstream is pushing buffers larger than our limits so only 1 buffer
+     * is ever in the queue at a time.
+     * Changing the level causes a buffering message to be posted saying that
+     * we are buffering which the application may pause to wait for another
+     * 100% buffering message which would be posted very soon after the
+     * waiting sink thread adds it's buffer to the queue */
+    /* FIXME: This situation above can still occur later if
+     * the sink pad is waiting to push a serialized event into the queue and
+     * the queue becomes empty for a short period of time. */
+    if (!queue->waiting_del
+        && queue->last_posted_buffering_percent != queue->buffering_percent) {
+      gint percent = queue->buffering_percent;
+
+      GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
+      msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
+
+      gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
+          queue->avg_out, queue->buffering_left);
+
+      queue->last_posted_buffering_percent = percent;
+    }
     queue->percent_changed = FALSE;
-
-    GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", percent);
-    msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
-
-    gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
-        queue->avg_out, queue->buffering_left);
   }
 
   return msg;
@@ -1253,7 +1328,15 @@ update_in_rates (GstQueue2 * queue, gboolean force)
     queue->bytes_in = 0;
   }
 
-  if (queue->byte_in_rate > 0.0) {
+  if (queue->use_bitrate_query && queue->downstream_bitrate > 0) {
+    queue->cur_level.rate_time =
+        gst_util_uint64_scale (8 * queue->cur_level.bytes, GST_SECOND,
+        queue->downstream_bitrate);
+    GST_LOG_OBJECT (queue,
+        "got bitrate %u with byte level %u resulting in time %"
+        GST_TIME_FORMAT, queue->downstream_bitrate, queue->cur_level.bytes,
+        GST_TIME_ARGS (queue->cur_level.rate_time));
+  } else if (queue->byte_in_rate > 0.0) {
     queue->cur_level.rate_time =
         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
   }
@@ -1924,9 +2007,9 @@ gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
       gst_queue2_flush_temp_file (queue);
     init_ranges (queue);
   } else {
-    while (!g_queue_is_empty (&queue->queue)) {
-      GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
+    GstQueue2Item *qitem;
 
+    while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
           && GST_EVENT_IS_STICKY (qitem->item)
           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
@@ -1939,7 +2022,6 @@ gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
          data when flushing */
       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
         gst_mini_object_unref (qitem->item);
-      g_slice_free (GstQueue2Item, qitem);
     }
   }
   queue->last_query = FALSE;
@@ -2347,18 +2429,6 @@ buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
   return TRUE;
 }
 
-static gboolean
-buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
-{
-  guint *p_size = data;
-  gsize buf_size;
-
-  buf_size = gst_buffer_get_size (*buf);
-  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
-  *p_size += buf_size;
-  return TRUE;
-}
-
 /* enqueue an item an update the level stats */
 static void
 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
@@ -2389,11 +2459,11 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
     }
   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
     GstBufferList *buffer_list;
-    guint size = 0;
+    guint size;
 
     buffer_list = GST_BUFFER_LIST_CAST (item);
 
-    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    size = gst_buffer_list_calculate_size (buffer_list);
     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
 
     /* add buffer to the statistics */
@@ -2489,10 +2559,11 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
       update_buffering (queue);
 
     if (QUEUE_IS_USING_QUEUE (queue)) {
-      GstQueue2Item *qitem = g_slice_new (GstQueue2Item);
-      qitem->type = item_type;
-      qitem->item = item;
-      g_queue_push_tail (&queue->queue, qitem);
+      GstQueue2Item qitem;
+
+      qitem.type = item_type;
+      qitem.item = item;
+      gst_queue_array_push_tail_struct (queue->queue, &qitem);
     } else {
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
     }
@@ -2524,13 +2595,12 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
   if (!QUEUE_IS_USING_QUEUE (queue)) {
     item = gst_queue2_read_item_from_file (queue);
   } else {
-    GstQueue2Item *qitem = g_queue_pop_head (&queue->queue);
+    GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
 
     if (qitem == NULL)
       goto no_item;
 
     item = qitem->item;
-    g_slice_free (GstQueue2Item, qitem);
   }
 
   if (item == NULL)
@@ -2584,10 +2654,10 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
     }
   } else if (GST_IS_BUFFER_LIST (item)) {
     GstBufferList *buffer_list;
-    guint size = 0;
+    guint size;
 
     buffer_list = GST_BUFFER_LIST_CAST (item);
-    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    size = gst_buffer_list_calculate_size (buffer_list);
     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
@@ -2643,7 +2713,6 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
         /* forward event */
         ret = gst_pad_push_event (queue->srcpad, event);
@@ -2681,8 +2750,6 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
     }
     case GST_EVENT_FLUSH_STOP:
     {
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
-
       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
         /* forward event */
         ret = gst_pad_push_event (queue->srcpad, event);
@@ -2712,6 +2779,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
 
         gst_event_unref (event);
       }
+      g_object_notify (G_OBJECT (queue), "bitrate");
       break;
     }
     case GST_EVENT_TAG:{
@@ -2726,12 +2794,14 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
           queue->sink_tags_bitrate = bitrate;
           GST_QUEUE2_MUTEX_UNLOCK (queue);
           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
+          g_object_notify (G_OBJECT (queue), "bitrate");
         }
       }
       /* Fall-through */
     }
     default:
       if (GST_EVENT_IS_SERIALIZED (event)) {
+        gboolean bitrate_changed = TRUE;
         /* serialized events go in the queue */
 
         /* STREAM_START and SEGMENT reset the EOS status of a
@@ -2781,6 +2851,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
                 queue->seeking = FALSE;
                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
               }
+              bitrate_changed = TRUE;
 
               break;
             default:
@@ -2791,6 +2862,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
         gst_queue2_post_buffering (queue);
+        if (bitrate_changed)
+          g_object_notify (G_OBJECT (queue), "bitrate");
       } else {
         /* non-serialized events are passed downstream. */
         ret = gst_pad_push_event (queue->srcpad, event);
@@ -2841,7 +2914,8 @@ gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
   switch (GST_QUERY_TYPE (query)) {
     default:
       if (GST_QUERY_IS_SERIALIZED (query)) {
-        GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received query %p", query);
+        GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+            "received query %" GST_PTR_FORMAT, query);
         /* serialized events go in the queue. We need to be certain that we
          * don't cause deadlocks waiting for the query return value. We check if
          * the queue is empty (nothing is blocking downstream and the query can
@@ -2902,7 +2976,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
     return queue->current->writing_pos <= queue->current->max_reading_pos;
   } else {
-    if (queue->queue.length == 0)
+    if (gst_queue_array_get_length (queue->queue) == 0)
       return TRUE;
   }
 
@@ -3155,6 +3229,7 @@ next:
           queue->src_tags_bitrate = bitrate;
           GST_QUEUE2_MUTEX_UNLOCK (queue);
           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
+          g_object_notify (G_OBJECT (queue), "bitrate");
         }
       }
     }
@@ -3359,9 +3434,13 @@ gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
               NULL);
         }
+
       }
       GST_QUEUE2_MUTEX_UNLOCK (queue);
 
+      /* force a new bitrate query to be performed */
+      query_downstream_bitrate (queue);
+
       res = gst_pad_push_event (queue->sinkpad, event);
       break;
     default:
@@ -3885,6 +3964,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
       queue->starting_segment = NULL;
       gst_event_replace (&queue->stream_start_event, NULL);
       GST_QUEUE2_MUTEX_UNLOCK (queue);
+      query_downstream_bitrate (queue);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
 #ifdef TIZEN_FEATURE_QUEUE2_MODIFICATION
@@ -4052,6 +4132,9 @@ gst_queue2_set_property (GObject * object,
     case PROP_RING_BUFFER_MAX_SIZE:
       queue->ring_buffer_max_size = g_value_get_uint64 (value);
       break;
+    case PROP_USE_BITRATE_QUERY:
+      queue->use_bitrate_query = g_value_get_boolean (value);
+      break;
 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
     case PROP_BUFFER_MODE:
       queue->mode = g_value_get_enum (value);
@@ -4141,6 +4224,23 @@ gst_queue2_get_property (GObject * object,
       g_value_set_int64 (value, (gint64) in_rate);
       break;
     }
+    case PROP_USE_BITRATE_QUERY:
+      g_value_set_boolean (value, queue->use_bitrate_query);
+      break;
+    case PROP_BITRATE:{
+      guint64 bitrate = 0;
+      if (bitrate == 0 && queue->use_tags_bitrate) {
+        if (queue->sink_tags_bitrate > 0)
+          bitrate = queue->sink_tags_bitrate;
+        else if (queue->src_tags_bitrate)
+          bitrate = queue->src_tags_bitrate;
+      }
+      if (bitrate == 0 && queue->use_bitrate_query) {
+        bitrate = queue->downstream_bitrate;
+      }
+      g_value_set_uint64 (value, (guint64) bitrate);
+      break;
+    }
 #ifdef TIZEN_FEATURE_RTSPSRC_MODIFICATION
     case PROP_BUFFER_MODE:
       g_value_set_enum (value, queue->mode);