aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstcollectpads.c
index 1973cdb..0c412aa 100644 (file)
  */
 /**
  * SECTION:gstcollectpads
+ * @title: GstCollectPads
  * @short_description: manages a set of pads that operate in collect mode
  * @see_also:
  *
  * Manages a set of pads that operate in collect mode. This means that control
  * is given to the manager of this object when all pads have data.
- * <itemizedlist>
- *   <listitem><para>
- *     Collectpads are created with gst_collect_pads_new(). A callback should then
+ *
+ *   * Collectpads are created with gst_collect_pads_new(). A callback should then
  *     be installed with gst_collect_pads_set_function ().
- *   </para></listitem>
- *   <listitem><para>
- *     Pads are added to the collection with gst_collect_pads_add_pad()/
- *     gst_collect_pads_remove_pad(). The pad
- *     has to be a sinkpad. The chain and event functions of the pad are
- *     overridden. The element_private of the pad is used to store
- *     private information for the collectpads.
- *   </para></listitem>
- *   <listitem><para>
- *     For each pad, data is queued in the _chain function or by
+ *
+ *   * Pads are added to the collection with gst_collect_pads_add_pad()/
+ *     gst_collect_pads_remove_pad(). The pad has to be a sinkpad. When added,
+ *     the chain, event and query functions of the pad are overridden. The
+ *     element_private of the pad is used to store private information for the
+ *     collectpads.
+ *
+ *   * For each pad, data is queued in the _chain function or by
  *     performing a pull_range.
- *   </para></listitem>
- *   <listitem><para>
- *     When data is queued on all pads in waiting mode, the callback function is called.
- *   </para></listitem>
- *   <listitem><para>
- *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
+ *
+ *   * When data is queued on all pads in waiting mode, the callback function is called.
+ *
+ *   * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
  *     One can peek at the data with the gst_collect_pads_peek() function.
  *     These functions will return %NULL if the pad received an EOS event. When all
  *     pads return %NULL from a gst_collect_pads_peek(), the element can emit an EOS
  *     event itself.
- *   </para></listitem>
- *   <listitem><para>
- *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
+ *
+ *   * Data can also be dequeued in byte units using the gst_collect_pads_available(),
  *     gst_collect_pads_read_buffer() and gst_collect_pads_flush() calls.
- *   </para></listitem>
- *   <listitem><para>
- *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
+ *
+ *   * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
  *     their state change functions to start and stop the processing of the collectpads.
  *     The gst_collect_pads_stop() call should be called before calling the parent
  *     element state change function in the PAUSED_TO_READY state change to ensure
  *     no pad is blocked and the element can finish streaming.
- *   </para></listitem>
- *   <listitem><para>
- *     gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
+ *
+ *   * gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
  *     CollectPads element is not waiting for data to be collected on non-waiting pads.
  *     Thus these pads may but need not have data when the callback is called.
  *     All pads are in waiting mode by default.
- *   </para></listitem>
- * </itemizedlist>
+ *
  */
 
 #ifdef HAVE_CONFIG_H
@@ -86,9 +78,6 @@
 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
 #define GST_CAT_DEFAULT collect_pads_debug
 
-#define parent_class gst_collect_pads_parent_class
-G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
-
 struct _GstCollectDataPrivate
 {
   /* refcounting for struct, and destroy callback */
@@ -102,7 +91,7 @@ struct _GstCollectPadsPrivate
   gboolean started;
 
   /* with STREAM_LOCK */
-  guint32 cookie;               /* @data list cookie */
+  guint32 cookie;               /* pad_list cookie */
   guint numpads;                /* number of pads in @data */
   guint queuedpads;             /* number of pads with a buffer */
   guint eospads;                /* number of pads that are EOS */
@@ -110,7 +99,7 @@ struct _GstCollectPadsPrivate
   GstCollectData *earliest_data;        /* Pad data for current earliest time */
 
   /* with LOCK */
-  GSList *pad_list;             /* updated pad list */
+  GSList *pad_list;             /* list of GstCollectData* */
   guint32 pad_cookie;           /* updated cookie */
 
   GstCollectPadsFunction func;  /* function and user_data for callback */
@@ -138,6 +127,9 @@ struct _GstCollectPadsPrivate
   gboolean pending_flush_stop;
 };
 
+#define parent_class gst_collect_pads_parent_class
+G_DEFINE_TYPE_WITH_PRIVATE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
+
 static void gst_collect_pads_clear (GstCollectPads * pads,
     GstCollectData * data);
 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
@@ -211,8 +203,6 @@ gst_collect_pads_class_init (GstCollectPadsClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
-
   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
       "GstCollectPads");
 
@@ -222,9 +212,7 @@ gst_collect_pads_class_init (GstCollectPadsClass * klass)
 static void
 gst_collect_pads_init (GstCollectPads * pads)
 {
-  pads->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
-      GstCollectPadsPrivate);
+  pads->priv = gst_collect_pads_get_instance_private (pads);
 
   pads->data = NULL;
   pads->priv->cookie = 0;
@@ -263,9 +251,6 @@ gst_collect_pads_init (GstCollectPads * pads)
   pads->priv->seeking = FALSE;
   pads->priv->pending_flush_start = FALSE;
   pads->priv->pending_flush_stop = FALSE;
-
-  /* clear floating flag */
-  gst_object_ref_sink (pads);
 }
 
 static void
@@ -305,6 +290,9 @@ gst_collect_pads_new (void)
 
   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
 
+  /* clear floating flag */
+  gst_object_ref_sink (newcoll);
+
   return newcoll;
 }
 
@@ -320,7 +308,7 @@ gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_buffer_function:
  * @pads: the collectpads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: (closure): user data passed to the function
  *
  * Set the callback function and user data that will be called with
@@ -345,7 +333,7 @@ gst_collect_pads_set_buffer_function (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_compare_function:
  * @pads: the pads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: (closure): user data passed to the function
  *
  * Set the timestamp comparison function.
@@ -354,7 +342,7 @@ gst_collect_pads_set_buffer_function (GstCollectPads * pads,
  */
 /* NOTE allowing to change comparison seems not advisable;
 no known use-case, and collaboration with default algorithm is unpredictable.
-If custom compairing/operation is needed, just use a collect function of
+If custom comparing/operation is needed, just use a collect function of
 your own */
 void
 gst_collect_pads_set_compare_function (GstCollectPads * pads,
@@ -372,7 +360,7 @@ gst_collect_pads_set_compare_function (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_function:
  * @pads: the collectpads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: user data passed to the function
  *
  * CollectPads provides a default collection algorithm that will determine
@@ -432,7 +420,7 @@ unref_data (GstCollectData * data)
 /**
  * gst_collect_pads_set_event_function:
  * @pads: the collectpads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: user data passed to the function
  *
  * Set the event callback function and user data that will be called when
@@ -460,7 +448,7 @@ gst_collect_pads_set_event_function (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_query_function:
  * @pads: the collectpads to use
- * @func: the function to set
+ * @func: (scope call): the function to set
  * @user_data: user data passed to the function
  *
  * Set the query callback function and user data that will be called after
@@ -490,7 +478,7 @@ gst_collect_pads_set_query_function (GstCollectPads * pads,
 * @pads: the collectpads to use
 * @cdata: collect data of corresponding pad
 * @buf: buffer being clipped
-* @outbuf: (allow-none): output buffer with running time, or NULL if clipped
+* @outbuf: (allow-none) (out): output buffer with running time, or NULL if clipped
 * @user_data: user data (unused)
 *
 * Convenience clipping function that converts incoming buffer's timestamp
@@ -505,47 +493,51 @@ gst_collect_pads_clip_running_time (GstCollectPads * pads,
     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
     gpointer user_data)
 {
-  GstClockTime time;
-
   *outbuf = buf;
-  time = GST_BUFFER_PTS (buf);
 
   /* invalid left alone and passed */
-  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
-    time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
-    if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
-      GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
-          GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
-      gst_buffer_unref (buf);
-      *outbuf = NULL;
-    } else {
-      GstClockTime buf_dts, abs_dts;
-      gint dts_sign;
-
-      GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
-          GST_TIME_FORMAT " running time",
-          GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
-      *outbuf = gst_buffer_make_writable (buf);
-      GST_BUFFER_PTS (*outbuf) = time;
-
-      dts_sign = gst_segment_to_running_time_full (&cdata->segment,
-          GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
-      buf_dts = GST_BUFFER_DTS (*outbuf);
-      if (dts_sign > 0) {
-        GST_BUFFER_DTS (*outbuf) = abs_dts;
-        GST_COLLECT_PADS_DTS (cdata) = abs_dts;
-      } else if (dts_sign < 0) {
-        GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
-        GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
-      } else {
-        GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
-        GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
+  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
+    GstClockTime time;
+    GstClockTime buf_dts, abs_dts;
+    gint dts_sign;
+
+    time = GST_BUFFER_PTS (buf);
+
+    if (GST_CLOCK_TIME_IS_VALID (time)) {
+      time =
+          gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
+      if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
+        GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment %"
+            GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
+        gst_buffer_unref (buf);
+        *outbuf = NULL;
+        return GST_FLOW_OK;
       }
+    }
 
-      GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
-          GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
-          GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
+    GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
+        GST_TIME_FORMAT " running time",
+        GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
+    *outbuf = gst_buffer_make_writable (buf);
+    GST_BUFFER_PTS (*outbuf) = time;
+
+    dts_sign = gst_segment_to_running_time_full (&cdata->segment,
+        GST_FORMAT_TIME, GST_BUFFER_DTS (*outbuf), &abs_dts);
+    buf_dts = GST_BUFFER_DTS (*outbuf);
+    if (dts_sign > 0) {
+      GST_BUFFER_DTS (*outbuf) = abs_dts;
+      GST_COLLECT_PADS_DTS (cdata) = abs_dts;
+    } else if (dts_sign < 0) {
+      GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
+      GST_COLLECT_PADS_DTS (cdata) = -((gint64) abs_dts);
+    } else {
+      GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
+      GST_COLLECT_PADS_DTS (cdata) = GST_CLOCK_STIME_NONE;
     }
+
+    GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
+        GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
+        GST_STIME_ARGS (GST_COLLECT_PADS_DTS (cdata)));
   }
 
   return GST_FLOW_OK;
@@ -554,7 +546,7 @@ gst_collect_pads_clip_running_time (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_clip_function:
  * @pads: the collectpads to use
- * @clipfunc: clip function to install
+ * @clipfunc: (scope call): clip function to install
  * @user_data: user data to pass to @clip_func
  *
  * Install a clipping function that is called right after a buffer is received
@@ -574,7 +566,7 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads,
 /**
  * gst_collect_pads_set_flush_function:
  * @pads: the collectpads to use
- * @func: flush function to install
+ * @func: (scope call): flush function to install
  * @user_data: user data to pass to @func
  *
  * Install a flush function that is called when the internal
@@ -788,6 +780,8 @@ gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
 {
   GSList *walk = NULL;
 
+  GST_DEBUG ("sink-pads flushing=%d", flushing);
+
   /* Update the pads flushing flag */
   for (walk = pads->priv->pad_list; walk; walk = g_slist_next (walk)) {
     GstCollectData *cdata = walk->data;
@@ -948,8 +942,8 @@ gst_collect_pads_stop (GstCollectPads * pads)
  *
  * MT safe.
  *
- * Returns: The buffer in @data or %NULL if no buffer is queued.
- *  should unref the buffer after usage.
+ * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
+ * buffer is queued. should unref the buffer after usage.
  */
 GstBuffer *
 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
@@ -980,8 +974,8 @@ gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
  *
  * MT safe.
  *
- * Returns: (transfer full): The buffer in @data or %NULL if no buffer was
- *   queued. You should unref the buffer after usage.
+ * Returns: (transfer full) (nullable): The buffer in @data or %NULL if no
+ * buffer was queued. You should unref the buffer after usage.
  */
 GstBuffer *
 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
@@ -1147,9 +1141,9 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
  *
  * MT safe.
  *
- * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
- * A return of %NULL signals that the pad is end-of-stream.
- * Unref the buffer after use.
+ * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
+ * be less that requested. A return of %NULL signals that the pad is
+ * end-of-stream. Unref the buffer after use.
  */
 GstBuffer *
 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
@@ -1187,9 +1181,9 @@ gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
  *
  * MT safe.
  *
- * Returns: A sub buffer. The size of the buffer can be less that requested.
- * A return of %NULL signals that the pad is end-of-stream.
- * Unref the buffer after use.
+ * Returns: (transfer full) (nullable): A sub buffer. The size of the buffer can
+ * be less that requested. A return of %NULL signals that the pad is
+ * end-of-stream. Unref the buffer after use.
  */
 GstBuffer *
 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
@@ -1474,10 +1468,7 @@ gst_collect_pads_find_best_pad (GstCollectPads * pads,
     buffer = gst_collect_pads_peek (pads, data);
     /* if we have a buffer check if it is better then the current best one */
     if (buffer != NULL) {
-      timestamp = GST_BUFFER_DTS (buffer);
-      if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
-        timestamp = GST_BUFFER_PTS (buffer);
-      }
+      timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
       gst_buffer_unref (buffer);
       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
               best, best_time, pads->priv->compare_user_data) < 0) {
@@ -1642,7 +1633,7 @@ gst_collect_pads_clip_time (GstCollectPads * pads, GstCollectData * data,
   if (pads->priv->clip_func) {
     in = gst_buffer_new ();
     GST_BUFFER_PTS (in) = time;
-    GST_BUFFER_DTS (in) = time;
+    GST_BUFFER_DTS (in) = GST_CLOCK_TIME_NONE;
     pads->priv->clip_func (pads, data, in, &out, pads->priv->clip_user_data);
     if (out) {
       otime = GST_BUFFER_PTS (out);
@@ -1684,6 +1675,8 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
   pad = data->pad;
   parent = GST_OBJECT_PARENT (pad);
 
+  GST_DEBUG_OBJECT (pad, "Got '%s' event", GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
@@ -1714,7 +1707,9 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
       } else {
         /* forward event to unblock check_collected */
         GST_DEBUG_OBJECT (pad, "forwarding flush start");
-        res = gst_pad_event_default (pad, parent, event);
+        if (!(res = gst_pad_event_default (pad, parent, event))) {
+          GST_WARNING_OBJECT (pad, "forwarding flush start failed");
+        }
         event = NULL;
 
         /* now unblock the chain function.
@@ -1864,6 +1859,7 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
   }
 
 eat:
+  GST_DEBUG_OBJECT (pads, "dropping event: %" GST_PTR_FORMAT, event);
   if (event)
     gst_event_unref (event);
   return res;
@@ -1871,8 +1867,10 @@ eat:
 forward:
   if (discard)
     goto eat;
-  else
+  else {
+    GST_DEBUG_OBJECT (pads, "forward event: %" GST_PTR_FORMAT, event);
     return gst_pad_event_default (pad, parent, event);
+  }
 }
 
 typedef struct
@@ -2136,7 +2134,7 @@ pad_removed:
  * and if so we call the collected function. When this is done we check if
  * data has been unqueued. If data is still queued we wait holding the stream
  * lock to make sure no EOS event can happen while we are ready to be
- * collected 
+ * collected
  */
 static GstFlowReturn
 gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
@@ -2202,9 +2200,7 @@ gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
     GstClockTime timestamp;
 
-    timestamp = GST_BUFFER_DTS (buffer);
-    if (!GST_CLOCK_TIME_IS_VALID (timestamp))
-      timestamp = GST_BUFFER_PTS (buffer);
+    timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
 
     if (GST_CLOCK_TIME_IS_VALID (timestamp))
       data->segment.position = timestamp;