aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
index 4e59723..e7e3815 100644 (file)
 #include <gst/glib-compat-private.h>
 
 #include "gstbasesrc.h"
-#include "gsttypefindhelper.h"
 #include <gst/gst-i18n-lib.h>
 
 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
@@ -189,7 +188,6 @@ enum
 
 #define DEFAULT_BLOCKSIZE       4096
 #define DEFAULT_NUM_BUFFERS     -1
-#define DEFAULT_TYPEFIND        FALSE
 #define DEFAULT_DO_TIMESTAMP    FALSE
 
 enum
@@ -197,13 +195,12 @@ enum
   PROP_0,
   PROP_BLOCKSIZE,
   PROP_NUM_BUFFERS,
+#ifndef GST_REMOVE_DEPRECATED
   PROP_TYPEFIND,
+#endif
   PROP_DO_TIMESTAMP
 };
 
-#define GST_BASE_SRC_GET_PRIVATE(obj)  \
-   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
-
 /* The basesrc implementation need to respect the following locking order:
  *   1. STREAM_LOCK
  *   2. LIVE_LOCK
@@ -253,7 +250,6 @@ struct _GstBaseSrcPrivate
   volatile gint have_events;    /* OBJECT_LOCK */
 
   /* QoS *//* with LOCK */
-  gboolean qos_enabled;         /* unused */
   gdouble proportion;           /* OBJECT_LOCK */
   GstClockTime earliest_time;   /* OBJECT_LOCK */
 
@@ -262,9 +258,16 @@ struct _GstBaseSrcPrivate
   GstAllocationParams params;   /* OBJECT_LOCK */
 
   GCond async_cond;             /* OBJECT_LOCK */
+
+  /* for _submit_buffer_list() */
+  GstBufferList *pending_bufferlist;
 };
 
+#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
+    ((src)->priv->pending_bufferlist != NULL)
+
 static GstElementClass *parent_class = NULL;
+static gint private_offset = 0;
 
 static void gst_base_src_class_init (GstBaseSrcClass * klass);
 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
@@ -292,11 +295,21 @@ gst_base_src_get_type (void)
 
     _type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
+
+    private_offset =
+        g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate));
+
     g_once_init_leave (&base_src_type, _type);
   }
   return base_src_type;
 }
 
+static inline GstBaseSrcPrivate *
+gst_base_src_get_instance_private (GstBaseSrc * self)
+{
+  return (G_STRUCT_MEMBER_P (self, private_offset));
+}
+
 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
     GstCaps * filter);
 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
@@ -317,8 +330,8 @@ static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
-static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
-    gboolean active);
+static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc,
+    gboolean flushing);
 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
     GstSegment * segment);
@@ -360,9 +373,10 @@ gst_base_src_class_init (GstBaseSrcClass * klass)
   gobject_class = G_OBJECT_CLASS (klass);
   gstelement_class = GST_ELEMENT_CLASS (klass);
 
-  GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
+  if (private_offset != 0)
+    g_type_class_adjust_private_offset (klass, &private_offset);
 
-  g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
+  GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
 
   parent_class = g_type_class_peek_parent (klass);
 
@@ -379,10 +393,12 @@ gst_base_src_class_init (GstBaseSrcClass * klass)
           "Number of buffers to output before sending EOS (-1 = unlimited)",
           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
           G_PARAM_STATIC_STRINGS));
+#ifndef GST_REMOVE_DEPRECATED
   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
       g_param_spec_boolean ("typefind", "Typefind",
-          "Run typefind before negotiating", DEFAULT_TYPEFIND,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Run typefind before negotiating (deprecated, non-functional)", FALSE,
+          G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS));
+#endif
   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
       g_param_spec_boolean ("do-timestamp", "Do timestamp",
           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
@@ -419,7 +435,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   GstPad *pad;
   GstPadTemplate *pad_template;
 
-  basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
+  basesrc->priv = gst_base_src_get_instance_private (basesrc);
 
   basesrc->is_live = FALSE;
   g_mutex_init (&basesrc->live_lock);
@@ -452,7 +468,6 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   basesrc->clock_id = NULL;
   /* we operate in BYTES by default */
   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
-  basesrc->typefind = DEFAULT_TYPEFIND;
   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
 
@@ -641,6 +656,12 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
  * that can't return an authoritative size and only know that they're EOS
  * when trying to read more should set this to %FALSE.
  *
+ * When @src operates in %GST_FORMAT_TIME, #GstBaseSrc will send an EOS
+ * when a buffer outside of the currently configured segment is pushed if
+ * @automatic_eos is %TRUE. Since 1.16, if @automatic_eos is %FALSE an
+ * EOS will be pushed only when the #GstBaseSrc.create implementation
+ * returns %GST_FLOW_EOS.
+ *
  * Since: 1.4
  */
 void
@@ -1876,13 +1897,14 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
         src->priv->pending_eos = event;
         GST_OBJECT_UNLOCK (src);
 
-        gst_base_src_activate_pool (src, FALSE);
+        gst_base_src_set_pool_flushing (src, TRUE);
         if (bclass->unlock)
           bclass->unlock (src);
 
         GST_PAD_STREAM_LOCK (src->srcpad);
         if (bclass->unlock_stop)
           bclass->unlock_stop (src);
+        gst_base_src_set_pool_flushing (src, TRUE);
         GST_PAD_STREAM_UNLOCK (src->srcpad);
       }
 
@@ -1895,6 +1917,7 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
       /* sending random SEGMENT downstream can break sync. */
       break;
     case GST_EVENT_TAG:
+    case GST_EVENT_SINK_MESSAGE:
     case GST_EVENT_CUSTOM_DOWNSTREAM:
     case GST_EVENT_CUSTOM_BOTH:
     case GST_EVENT_PROTECTION:
@@ -2108,9 +2131,11 @@ gst_base_src_set_property (GObject * object, guint prop_id,
     case PROP_NUM_BUFFERS:
       src->num_buffers = g_value_get_int (value);
       break;
+#ifndef GST_REMOVE_DEPRECATED
     case PROP_TYPEFIND:
       src->typefind = g_value_get_boolean (value);
       break;
+#endif
     case PROP_DO_TIMESTAMP:
       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
       break;
@@ -2135,9 +2160,11 @@ gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_NUM_BUFFERS:
       g_value_set_int (value, src->num_buffers);
       break;
+#ifndef GST_REMOVE_DEPRECATED
     case PROP_TYPEFIND:
       g_value_set_boolean (value, src->typefind);
       break;
+#endif
     case PROP_DO_TIMESTAMP:
       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
       break;
@@ -2430,7 +2457,7 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
   /* ERRORS */
 unexpected_length:
   {
-    GST_WARNING_OBJECT (src, "processing at or past EOS");
+    GST_DEBUG_OBJECT (src, "processing at or past EOS");
     return FALSE;
   }
 }
@@ -2445,6 +2472,7 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
   GstClockReturn status;
   GstBuffer *res_buf;
   GstBuffer *in_buf;
+  gboolean own_res_buf;
 
   bclass = GST_BASE_SRC_GET_CLASS (src);
 
@@ -2493,6 +2521,7 @@ again:
       G_GINT64_FORMAT, offset, length, src->segment.time);
 
   res_buf = in_buf = *buf;
+  own_res_buf = (*buf == NULL);
 
   GST_LIVE_UNLOCK (src);
   ret = bclass->create (src, offset, length, &res_buf);
@@ -2504,7 +2533,7 @@ again:
       GstFlowReturn wait_ret;
       wait_ret = gst_base_src_wait_playing_unlocked (src);
       if (wait_ret != GST_FLOW_OK) {
-        if (ret == GST_FLOW_OK && *buf == NULL)
+        if (ret == GST_FLOW_OK && own_res_buf)
           gst_buffer_unref (res_buf);
         ret = wait_ret;
         goto stopped;
@@ -2517,7 +2546,7 @@ again:
    * discard when the create function returned _OK. */
   if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
     if (ret == GST_FLOW_OK) {
-      if (*buf == NULL)
+      if (own_res_buf)
         gst_buffer_unref (res_buf);
     }
     src->priv->forced_eos = TRUE;
@@ -2548,6 +2577,16 @@ again:
     res_buf = in_buf;
   }
 
+  if (res_buf == NULL) {
+    GstBufferList *pending_list = src->priv->pending_bufferlist;
+
+    if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
+      goto null_buffer;
+
+    res_buf = gst_buffer_list_get_writable (pending_list, 0);
+    own_res_buf = FALSE;
+  }
+
   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
   if (offset == 0 && src->segment.time == 0
       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
@@ -2576,7 +2615,7 @@ again:
       /* this case is triggered when we were waiting for the clock and
        * it got unlocked because we did a state change. In any case, get rid of
        * the buffer. */
-      if (*buf == NULL)
+      if (own_res_buf)
         gst_buffer_unref (res_buf);
 
       if (!src->live_running) {
@@ -2598,7 +2637,7 @@ again:
       GST_ELEMENT_ERROR (src, CORE, CLOCK,
           (_("Internal clock error.")),
           ("clock returned unexpected return value %d", status));
-      if (*buf == NULL)
+      if (own_res_buf)
         gst_buffer_unref (res_buf);
       ret = GST_FLOW_ERROR;
       break;
@@ -2626,7 +2665,7 @@ map_failed:
     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
         (_("Failed to map buffer.")),
         ("failed to map result buffer in WRITE mode"));
-    if (*buf == NULL)
+    if (own_res_buf)
       gst_buffer_unref (res_buf);
     return GST_FLOW_ERROR;
   }
@@ -2654,7 +2693,7 @@ reached_num_buffers:
 flushing:
   {
     GST_DEBUG_OBJECT (src, "we are flushing");
-    if (*buf == NULL)
+    if (own_res_buf)
       gst_buffer_unref (res_buf);
     return GST_FLOW_FLUSHING;
   }
@@ -2663,6 +2702,14 @@ eos:
     GST_DEBUG_OBJECT (src, "we are EOS");
     return GST_FLOW_EOS;
   }
+null_buffer:
+  {
+    GST_ELEMENT_ERROR (src, STREAM, FAILED,
+        (_("Internal data flow error.")),
+        ("Subclass %s neither returned a buffer nor submitted a buffer list "
+            "from its create function", G_OBJECT_TYPE_NAME (src)));
+    return GST_FLOW_ERROR;
+  }
 }
 
 static GstFlowReturn
@@ -2795,6 +2842,12 @@ gst_base_src_loop (GstPad * pad)
   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
       GST_TIME_ARGS (position), blocksize);
 
+  /* clean up just in case we got interrupted or so last time round */
+  if (src->priv->pending_bufferlist != NULL) {
+    gst_buffer_list_unref (src->priv->pending_bufferlist);
+    src->priv->pending_bufferlist = NULL;
+  }
+
   ret = gst_base_src_get_range (src, position, blocksize, &buf);
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
@@ -2802,9 +2855,11 @@ gst_base_src_loop (GstPad * pad)
     GST_LIVE_UNLOCK (src);
     goto pause;
   }
-  /* this should not happen */
-  if (G_UNLIKELY (buf == NULL))
-    goto null_buffer;
+
+  /* Note: at this point buf might be a single buf returned which we own or
+   * the first buf of a pending buffer list submitted via submit_buffer_list(),
+   * in which case the buffer is owned by the pending buffer list and not us. */
+  g_assert (buf != NULL);
 
   /* push events to close/start our segment before we push the buffer. */
   if (G_UNLIKELY (src->priv->segment_pending)) {
@@ -2882,7 +2937,8 @@ gst_base_src_loop (GstPad * pad)
       /* positive rate, check if we reached the stop */
       if (src->segment.stop != -1) {
         if (position >= src->segment.stop) {
-          eos = TRUE;
+          if (g_atomic_int_get (&src->priv->automatic_eos))
+            eos = TRUE;
           position = src->segment.stop;
         }
       }
@@ -2890,7 +2946,8 @@ gst_base_src_loop (GstPad * pad)
       /* negative rate, check if we reached the start. start is always set to
        * something different from -1 */
       if (position <= src->segment.start) {
-        eos = TRUE;
+        if (g_atomic_int_get (&src->priv->automatic_eos))
+          eos = TRUE;
         position = src->segment.start;
       }
       /* when going reverse, all buffers are DISCONT */
@@ -2909,7 +2966,14 @@ gst_base_src_loop (GstPad * pad)
   }
   GST_LIVE_UNLOCK (src);
 
-  ret = gst_pad_push (pad, buf);
+  /* push buffer or buffer list */
+  if (src->priv->pending_bufferlist != NULL) {
+    ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
+    src->priv->pending_bufferlist = NULL;
+  } else {
+    ret = gst_pad_push (pad, buf);
+  }
+
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
     if (ret == GST_FLOW_NOT_NEGOTIATED) {
       goto not_negotiated;
@@ -3010,13 +3074,6 @@ pause:
     }
     goto done;
   }
-null_buffer:
-  {
-    GST_ELEMENT_ERROR (src, STREAM, FAILED,
-        (_("Internal data flow error.")), ("element returned NULL buffer"));
-    GST_LIVE_UNLOCK (src);
-    goto done;
-  }
 }
 
 static gboolean
@@ -3072,12 +3129,11 @@ activate_failed:
   }
 }
 
-static gboolean
-gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
+static void
+gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing)
 {
   GstBaseSrcPrivate *priv = basesrc->priv;
   GstBufferPool *pool;
-  gboolean res = TRUE;
 
   GST_OBJECT_LOCK (basesrc);
   if ((pool = priv->pool))
@@ -3085,10 +3141,9 @@ gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
   GST_OBJECT_UNLOCK (basesrc);
 
   if (pool) {
-    res = gst_buffer_pool_set_active (pool, active);
+    gst_buffer_pool_set_flushing (pool, flushing);
     gst_object_unref (pool);
   }
-  return res;
 }
 
 
@@ -3474,10 +3529,6 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
 
   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
 
-  /* stop flushing now but for live sources, still block in the LIVE lock when
-   * we are not yet PLAYING */
-  gst_base_src_set_flushing (basesrc, FALSE);
-
   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
 
   GST_OBJECT_LOCK (basesrc->srcpad);
@@ -3596,8 +3647,11 @@ gst_base_src_stop (GstBaseSrc * basesrc)
 
   /* flush all */
   gst_base_src_set_flushing (basesrc, TRUE);
+
   /* stop the task */
   gst_pad_stop_task (basesrc->srcpad);
+  /* stop flushing, this will balance unlock/unlock_stop calls */
+  gst_base_src_set_flushing (basesrc, FALSE);
 
   GST_OBJECT_LOCK (basesrc);
   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
@@ -3613,6 +3667,11 @@ gst_base_src_stop (GstBaseSrc * basesrc)
   if (bclass->stop)
     result = bclass->stop (basesrc);
 
+  if (basesrc->priv->pending_bufferlist != NULL) {
+    gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
+    basesrc->priv->pending_bufferlist = NULL;
+  }
+
   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
 
   return result;
@@ -3637,7 +3696,7 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
   GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
 
   if (flushing) {
-    gst_base_src_activate_pool (basesrc, FALSE);
+    gst_base_src_set_pool_flushing (basesrc, TRUE);
     /* unlock any subclasses to allow turning off the streaming thread */
     if (bclass->unlock)
       bclass->unlock (basesrc);
@@ -3660,7 +3719,7 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
     if (basesrc->clock_id)
       gst_clock_id_unschedule (basesrc->clock_id);
   } else {
-    gst_base_src_activate_pool (basesrc, TRUE);
+    gst_base_src_set_pool_flushing (basesrc, FALSE);
 
     /* Drop all delayed events */
     GST_OBJECT_LOCK (basesrc);
@@ -3953,3 +4012,41 @@ gst_base_src_get_allocator (GstBaseSrc * src,
     *params = src->priv->params;
   GST_OBJECT_UNLOCK (src);
 }
+
+/**
+ * gst_base_src_submit_buffer_list:
+ * @src: a #GstBaseSrc
+ * @buffer_list: (transfer full): a #GstBufferList
+ *
+ * Subclasses can call this from their create virtual method implementation
+ * to submit a buffer list to be pushed out later. This is useful in
+ * cases where the create function wants to produce multiple buffers to be
+ * pushed out in one go in form of a #GstBufferList, which can reduce overhead
+ * drastically, especially for packetised inputs (for data streams where
+ * the packetisation/chunking is not important it is usually more efficient
+ * to return larger buffers instead).
+ *
+ * Subclasses that use this function from their create function must return
+ * %GST_FLOW_OK and no buffer from their create virtual method implementation.
+ * If a buffer is returned after a buffer list has also been submitted via this
+ * function the behaviour is undefined.
+ *
+ * Subclasses must only call this function once per create function call and
+ * subclasses must only call this function when the source operates in push
+ * mode.
+ *
+ * Since: 1.14
+ */
+void
+gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+  g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
+  g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
+
+  /* we need it to be writable later in get_range() where we use get_writable */
+  src->priv->pending_bufferlist = gst_buffer_list_make_writable (buffer_list);
+
+  GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
+      gst_buffer_list_length (buffer_list));
+}