queue: Add support for buffer lists
authorSebastian Dröge <sebastian@centricular.com>
Tue, 17 Feb 2015 09:44:40 +0000 (11:44 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 18 Feb 2015 09:03:08 +0000 (11:03 +0200)
plugins/elements/gstqueue.c

index ebd88cd..eb3ed95 100644 (file)
@@ -195,6 +195,8 @@ static void gst_queue_get_property (GObject * object,
 
 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer);
+static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
 static void gst_queue_loop (GstPad * pad);
 
@@ -417,6 +419,7 @@ gst_queue_class_init (GstQueueClass * klass)
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
 }
 
 static void
@@ -425,6 +428,7 @@ gst_queue_init (GstQueue * queue)
   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
 
   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+  gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
   gst_pad_set_activatemode_function (queue->sinkpad,
       gst_queue_sink_activate_mode);
   gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
@@ -622,6 +626,60 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+typedef struct
+{
+  GstClockTime timestamp;
+  gboolean with_duration;
+} BufferListApplyTimeData;
+
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
+{
+  BufferListApplyTimeData *data = user_data;
+
+  GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+    data->timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+  if (data->with_duration && GST_BUFFER_DURATION_IS_VALID (*buf))
+    data->timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (data->timestamp));
+
+  return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean with_duration, gboolean sink)
+{
+  BufferListApplyTimeData data;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  data.timestamp = segment->position;
+  data.with_duration = with_duration;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &data);
+
+  GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (data.timestamp));
+
+  segment->position = data.timestamp;
+
+  if (sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 gst_queue_locked_flush (GstQueue * queue, gboolean full)
 {
@@ -678,6 +736,40 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
+static gboolean
+buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
+{
+  guint *p_size = data;
+  gsize buf_size;
+
+  buf_size = gst_buffer_get_size (*buf);
+  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
+  *p_size += buf_size;
+  return TRUE;
+}
+
+static inline void
+gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
+{
+  GstQueueItem *qitem;
+  GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+  gsize bsize = 0;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
+
+  /* add buffer to the statistics */
+  queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
+  queue->cur_level.bytes += bsize;
+  apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE, TRUE);
+
+  qitem = g_slice_new (GstQueueItem);
+  qitem->item = item;
+  qitem->is_query = FALSE;
+  qitem->size = bsize;
+  gst_queue_array_push_tail (queue->queue, qitem);
+  GST_QUEUE_SIGNAL_ADD (queue);
+}
+
 static inline void
 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
 {
@@ -751,7 +843,19 @@ gst_queue_locked_dequeue (GstQueue * queue)
     /* if the queue is empty now, update the other side */
     if (queue->cur_level.buffers == 0)
       queue->cur_level.time = 0;
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
 
+    queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
+    queue->cur_level.bytes -= bufsize;
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, TRUE, FALSE);
+
+    /* if the queue is empty now, update the other side */
+    if (queue->cur_level.buffers == 0)
+      queue->cur_level.time = 0;
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
@@ -1014,11 +1118,27 @@ gst_queue_leak_downstream (GstQueue * queue)
   }
 }
 
+static gboolean
+discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
+{
+  GstQueue *queue = user_data;
+  GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
+
+  if (subbuffer) {
+    *buffer = subbuffer;
+    GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
+  } else {
+    GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+  }
+
+  return FALSE;
+}
+
 static GstFlowReturn
-gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
+    GstMiniObject * obj, gboolean is_list)
 {
   GstQueue *queue;
-  GstClockTime duration, timestamp;
 
   queue = GST_QUEUE_CAST (parent);
 
@@ -1030,13 +1150,22 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
-  timestamp = GST_BUFFER_TIMESTAMP (buffer);
-  duration = GST_BUFFER_DURATION (buffer);
+  if (!is_list) {
+    GstClockTime duration, timestamp;
+    GstBuffer *buffer = GST_BUFFER_CAST (obj);
 
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
-      G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
-      GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+    timestamp = GST_BUFFER_TIMESTAMP (buffer);
+    duration = GST_BUFFER_DURATION (buffer);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
+        G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+        GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+        GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
+  } else {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "received buffer list %p with %u buffers", obj,
+        gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
+  }
 
   /* We make space available if we're "full" according to whatever
    * the user defined as "full". Note that this only applies to buffers.
@@ -1091,19 +1220,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   }
 
   if (queue->tail_needs_discont) {
-    GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+    if (!is_list) {
+      GstBuffer *buffer = GST_BUFFER_CAST (obj);
+      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+      if (subbuffer) {
+        buffer = subbuffer;
+        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      } else {
+        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      }
 
-    if (subbuffer) {
-      buffer = subbuffer;
-      GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+      obj = GST_MINI_OBJECT_CAST (buffer);
     } else {
-      GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
+
+      buffer_list = gst_buffer_list_make_writable (buffer_list);
+      gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+      obj = GST_MINI_OBJECT_CAST (buffer_list);
     }
     queue->tail_needs_discont = FALSE;
   }
 
   /* put buffer in queue now */
-  gst_queue_locked_enqueue_buffer (queue, buffer);
+  if (is_list)
+    gst_queue_locked_enqueue_buffer_list (queue, obj);
+  else
+    gst_queue_locked_enqueue_buffer (queue, obj);
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -1113,7 +1256,7 @@ out_unref:
   {
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_OK;
   }
@@ -1124,7 +1267,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return ret;
   }
@@ -1133,7 +1276,7 @@ out_eos:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
@@ -1142,12 +1285,27 @@ out_unexpected:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (obj);
 
     return GST_FLOW_EOS;
   }
 }
 
+static GstFlowReturn
+gst_queue_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer_list), TRUE);
+}
+
+static GstFlowReturn
+gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  return gst_queue_chain_buffer_or_list (pad, parent,
+      GST_MINI_OBJECT_CAST (buffer), FALSE);
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
@@ -1155,31 +1313,49 @@ gst_queue_push_one (GstQueue * queue)
 {
   GstFlowReturn result = queue->srcresult;
   GstMiniObject *data;
+  gboolean is_list;
 
   data = gst_queue_locked_dequeue (queue);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (GST_IS_BUFFER (data)) {
-    GstBuffer *buffer;
+  is_list = GST_IS_BUFFER_LIST (data);
 
-    buffer = GST_BUFFER_CAST (data);
+  if (GST_IS_BUFFER (data) || is_list) {
+    if (!is_list) {
+      GstBuffer *buffer;
 
-    if (queue->head_needs_discont) {
-      GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+      buffer = GST_BUFFER_CAST (data);
 
-      if (subbuffer) {
-        buffer = subbuffer;
-        GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
-      } else {
-        GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+      if (queue->head_needs_discont) {
+        GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
+
+        if (subbuffer) {
+          buffer = subbuffer;
+          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+        } else {
+          GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
+        }
+        queue->head_needs_discont = FALSE;
       }
-      queue->head_needs_discont = FALSE;
-    }
 
-    GST_QUEUE_MUTEX_UNLOCK (queue);
-    result = gst_pad_push (queue->srcpad, buffer);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push (queue->srcpad, buffer);
+    } else {
+      GstBufferList *buffer_list;
+
+      buffer_list = GST_BUFFER_LIST_CAST (data);
+
+      if (queue->head_needs_discont) {
+        buffer_list = gst_buffer_list_make_writable (buffer_list);
+        gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
+        queue->head_needs_discont = FALSE;
+      }
+
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+      result = gst_pad_push_list (queue->srcpad, buffer_list);
+    }
 
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@@ -1195,6 +1371,10 @@ next:
           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
               "dropping EOS buffer %p", data);
           gst_buffer_unref (GST_BUFFER_CAST (data));
+        } else if (GST_IS_BUFFER_LIST (data)) {
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping EOS buffer list %p", data);
+          gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
         } else if (GST_IS_EVENT (data)) {
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);