aggregator: Simplify clip function
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 7e8c5be..6661bdf 100644 (file)
  */
 /**
  * SECTION: gstaggregator
+ * @title: GstAggregator
  * @short_description: manages a set of pads with the purpose of
  * aggregating their buffers.
  * @see_also: gstcollectpads for historical reasons.
  *
  * Manages a set of pads with the purpose of aggregating their buffers.
  * Control is given to the subclass when all pads have data.
- * <itemizedlist>
- *  <listitem><para>
- *    Base class for mixers and muxers. Subclasses should at least implement
+ *
+ *  * Base class for mixers and muxers. Subclasses should at least implement
  *    the #GstAggregatorClass.aggregate() virtual method.
- *  </para></listitem>
- *  <listitem><para>
- *    When data is queued on all pads, tha aggregate vmethod is called.
- *  </para></listitem>
- *  <listitem><para>
- *    One can peek at the data on any given GstAggregatorPad with the
+ *
+ *  * When data is queued on all pads, tha aggregate vmethod is called.
+ *
+ *  * One can peek at the data on any given GstAggregatorPad with the
  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
  *    has been taken with steal_buffer (), a new buffer can be queued
  *    on that pad.
- *  </para></listitem>
- *  <listitem><para>
- *    If the subclass wishes to push a buffer downstream in its aggregate
+ *
+ *  * If the subclass wishes to push a buffer downstream in its aggregate
  *    implementation, it should do so through the
  *    gst_aggregator_finish_buffer () method. This method will take care
  *    of sending and ordering mandatory events such as stream start, caps
  *    and segment.
- *  </para></listitem>
- *  <listitem><para>
- *    Same goes for EOS events, which should not be pushed directly by the
+ *
+ *  * Same goes for EOS events, which should not be pushed directly by the
  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
  *    implementation.
- *  </para></listitem>
- *  <listitem><para>
- *    Note that the aggregator logic regarding gap event handling is to turn
+ *
+ *  * Note that the aggregator logic regarding gap event handling is to turn
  *    these into gap buffers with matching PTS and duration. It will also
  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
  *    to ease their identification and subsequent processing.
- *  </para></listitem>
- * </itemizedlist>
+ *
  */
 
 #ifdef HAVE_CONFIG_H
@@ -235,12 +229,10 @@ struct _GstAggregatorPadPrivate
   GMutex flush_lock;
 };
 
-static gboolean
-gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
+/* Must be called with PAD_LOCK held */
+static void
+gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
 {
-  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
-
-  PAD_LOCK (aggpad);
   aggpad->priv->pending_eos = FALSE;
   aggpad->priv->eos = FALSE;
   aggpad->priv->flow_return = GST_FLOW_OK;
@@ -253,6 +245,16 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
   aggpad->priv->time_level = 0;
+  aggpad->priv->first_buffer = TRUE;
+}
+
+static gboolean
+gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
+{
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  PAD_LOCK (aggpad);
+  gst_aggregator_pad_reset_unlocked (aggpad);
   PAD_UNLOCK (aggpad);
 
   if (klass->flush)
@@ -424,7 +426,8 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 {
   GstAggregatorPad *pad;
   GList *l, *sinkpads;
-  gboolean have_data = TRUE;
+  gboolean have_buffer = TRUE;
+  gboolean have_event = FALSE;
 
   GST_LOG_OBJECT (self, "checking pads");
 
@@ -439,9 +442,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 
     PAD_LOCK (pad);
 
-    if (gst_aggregator_pad_queue_is_empty (pad)) {
+    if (pad->priv->num_buffers == 0) {
+      if (!gst_aggregator_pad_queue_is_empty (pad))
+        have_event = TRUE;
       if (!pad->priv->eos) {
-        have_data = FALSE;
+        have_buffer = FALSE;
 
         /* If not live we need data on all pads, so leave the loop */
         if (!self->priv->peer_latency_live) {
@@ -460,10 +465,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
     PAD_UNLOCK (pad);
   }
 
-  if (!have_data)
+  if (!have_buffer && !have_event)
     goto pad_not_ready;
 
-  self->priv->first_buffer = FALSE;
+  if (have_buffer)
+    self->priv->first_buffer = FALSE;
 
   GST_OBJECT_UNLOCK (self);
   GST_LOG_OBJECT (self, "pads are ready");
@@ -477,9 +483,13 @@ no_sinkpads:
   }
 pad_not_ready:
   {
-    GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+    if (have_event)
+      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
+          " but waking up for serialized event");
+    else
+      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
     GST_OBJECT_UNLOCK (self);
-    return FALSE;
+    return have_event;
   }
 }
 
@@ -1024,6 +1034,9 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head)
           GST_FORMAT_TIME, aggpad->priv->head_position);
     else
       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+
+    if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
+      aggpad->priv->tail_time = aggpad->priv->head_time;
   } else {
     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
         aggpad->segment.format == GST_FORMAT_TIME)
@@ -1095,8 +1108,6 @@ gst_aggregator_default_sink_event (GstAggregator * self,
         GST_OBJECT_UNLOCK (self);
       }
 
-      aggpad->priv->first_buffer = TRUE;
-
       /* We never forward the event */
       goto eat;
     }
@@ -2130,7 +2141,6 @@ static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
 {
-  GstBuffer *actual_buf = buffer;
   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
@@ -2150,15 +2160,15 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   PAD_UNLOCK (aggpad);
 
   if (aggclass->clip && head) {
-    aggclass->clip (self, aggpad, buffer, &actual_buf);
+    buffer = aggclass->clip (self, aggpad, buffer);
   }
 
-  if (actual_buf == NULL) {
-    GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
+  if (buffer == NULL) {
+    GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function");
     goto done;
   }
 
-  buf_pts = GST_BUFFER_PTS (actual_buf);
+  buf_pts = GST_BUFFER_PTS (buffer);
 
   aggpad->priv->first_buffer = FALSE;
 
@@ -2169,12 +2179,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
     if (gst_aggregator_pad_has_space (self, aggpad)
         && aggpad->priv->flow_return == GST_FLOW_OK) {
       if (head)
-        g_queue_push_head (&aggpad->priv->buffers, actual_buf);
+        g_queue_push_head (&aggpad->priv->buffers, buffer);
       else
-        g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
-      apply_buffer (aggpad, actual_buf, head);
+        g_queue_push_tail (&aggpad->priv->buffers, buffer);
+      apply_buffer (aggpad, buffer, head);
       aggpad->priv->num_buffers++;
-      actual_buf = buffer = NULL;
+      buffer = NULL;
       SRC_BROADCAST (self);
       break;
     }
@@ -2351,17 +2361,12 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
   }
 
   if (event) {
+    gboolean is_caps = (GST_EVENT_TYPE (event) == GST_EVENT_CAPS);
+
     if (!klass->sink_event (self, aggpad, event)) {
       /* Copied from GstPad to convert boolean to a GstFlowReturn in
        * the event handling func */
-      switch (GST_EVENT_TYPE (event)) {
-        case GST_EVENT_CAPS:
-          ret = GST_FLOW_NOT_NEGOTIATED;
-          break;
-        default:
-          ret = GST_FLOW_ERROR;
-          break;
-      }
+      ret = is_caps ? GST_FLOW_NOT_NEGOTIATED : GST_FLOW_ERROR;
     }
   }
 
@@ -2414,8 +2419,7 @@ gst_aggregator_pad_constructed (GObject * object)
   gst_pad_set_chain_function (pad,
       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
   gst_pad_set_event_full_function_full (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func),
-      NULL, NULL);
+      GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
   gst_pad_set_query_function (pad,
       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
   gst_pad_set_activatemode_function (pad,
@@ -2469,7 +2473,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
   g_mutex_init (&pad->priv->flush_lock);
   g_mutex_init (&pad->priv->lock);
 
-  pad->priv->first_buffer = TRUE;
+  gst_aggregator_pad_reset_unlocked (pad);
 }
 
 /**