queue2: add bufferlist support
authorTim-Philipp Müller <tim.muller@collabora.co.uk>
Thu, 3 Nov 2011 10:34:49 +0000 (10:34 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Mon, 28 Nov 2011 00:16:40 +0000 (00:16 +0000)
We want to maintain buffer lists if possible.

plugins/elements/gstqueue2.c

index 80a9857..9378d57 100644 (file)
@@ -225,6 +225,8 @@ static void gst_queue2_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 
 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain_list (GstPad * pad,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
     guint size, GstCaps * caps, GstBuffer ** buf);
 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
@@ -259,6 +261,7 @@ typedef enum
 {
   GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
   GST_QUEUE2_ITEM_TYPE_BUFFER,
+  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
   GST_QUEUE2_ITEM_TYPE_EVENT
 } GstQueue2ItemType;
 
@@ -392,6 +395,8 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
 
   gst_pad_set_chain_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_chain));
+  gst_pad_set_chain_list_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
   gst_pad_set_activatepush_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
   gst_pad_set_event_function (queue->sinkpad,
@@ -801,6 +806,52 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
   update_time_level (queue);
 }
 
+static GstBufferListItem
+buffer_list_apply_time (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+  GstClockTime *timestamp = data;
+
+  GST_TRACE ("buffer %u in group %u has ts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx, group,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+    *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->last_stop;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+
+  if (is_sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 update_buffering (GstQueue2 * queue)
 {
@@ -1813,6 +1864,35 @@ handle_error:
   }
 }
 
+static GstBufferListItem
+buffer_list_create_write (GstBuffer ** buf, guint group, guint idx, gpointer q)
+{
+  GstQueue2 *queue = q;
+
+  GST_TRACE_OBJECT (queue, "writing buffer %u in group %u of size %u bytes",
+      idx, group, GST_BUFFER_SIZE (*buf));
+
+  if (!gst_queue2_create_write (queue, *buf)) {
+    GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
+    return GST_BUFFER_LIST_END;
+  }
+
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
+static GstBufferListItem
+buffer_list_calc_size (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+  guint *p_size = data;
+  guint buf_size;
+
+  buf_size = GST_BUFFER_SIZE (*buf);
+  GST_TRACE ("buffer %u in group %u has size %u", idx, group, buf_size);
+  *p_size += buf_size;
+
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
 /* enqueue an item an update the level stats */
 static void
 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
@@ -1841,6 +1921,31 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
       /* FIXME - check return value? */
       gst_queue2_create_write (queue, buffer);
     }
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
+
+    /* add buffer to the statistics */
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers++;
+      queue->cur_level.bytes += size;
+    }
+    queue->bytes_in += size;
+
+    /* apply new buffer to segment stats */
+    apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+    /* update the byterate stats */
+    update_in_rates (queue);
+
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
+      gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
+    }
   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event;
 
@@ -1968,6 +2073,30 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
       default:
         break;
     }
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
+
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers--;
+      queue->cur_level.bytes -= size;
+    }
+    queue->bytes_out += size;
+
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+    /* update the byterate stats */
+    update_out_rates (queue);
+    /* update the buffering */
+    if (queue->use_buffering)
+      update_buffering (queue);
+
   } else {
     g_warning
         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
@@ -2154,18 +2283,9 @@ gst_queue2_is_filled (GstQueue2 * queue)
 }
 
 static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
+    GstMiniObject * item, GstQueue2ItemType item_type)
 {
-  GstQueue2 *queue;
-
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
-
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
   /* we have to lock the queue since we span threads */
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
   /* when we received EOS, we refuse more data */
@@ -2179,7 +2299,7 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
     goto out_flushing;
 
   /* put buffer in queue now */
-  gst_queue2_locked_enqueue (queue, buffer, GST_QUEUE2_ITEM_TYPE_BUFFER);
+  gst_queue2_locked_enqueue (queue, item, item_type);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -2192,7 +2312,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return ret;
   }
@@ -2200,7 +2320,7 @@ out_eos:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_UNEXPECTED;
   }
@@ -2209,12 +2329,43 @@ out_unexpected:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because we received UNEXPECTED");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_UNEXPECTED;
   }
 }
 
+static GstFlowReturn
+gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+}
+
+static GstFlowReturn
+gst_queue2_chain_list (GstPad * pad, GstBufferList * buffer_list)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer list %p", buffer_list);
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+}
+
 static GstMiniObject *
 gst_queue2_dequeue_on_unexpected (GstQueue2 * queue,
     GstQueue2ItemType * item_type)
@@ -2312,6 +2463,30 @@ next:
     }
 
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    GstBuffer *first_buf;
+    GstCaps *caps;
+
+    buffer_list = GST_BUFFER_LIST_CAST (data);
+
+    first_buf = gst_buffer_list_get (buffer_list, 0, 0);
+    caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
+
+    /* set caps before pushing the buffer so that core does not try to do
+     * something fancy to check if this is possible. */
+    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
+      gst_pad_set_caps (queue->srcpad, caps);
+
+    result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+    /* need to check for srcresult here as well */
+    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+    if (result == GST_FLOW_UNEXPECTED) {
+      data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+      if (data != NULL)
+        goto next;
+    }
   }
   return result;