aggregator: Remove klass->sinkpads_type
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 3a642d8..9dddf83 100644 (file)
@@ -103,8 +103,8 @@ gst_aggregator_start_time_selection_get_type (void)
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
 static void gst_aggregator_set_latency_property (GstAggregator * agg,
-    gint64 latency);
-static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
+    GstClockTime latency);
+static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
@@ -286,6 +286,8 @@ struct _GstAggregatorPrivate
   /* Our state is >= PAUSED */
   gboolean running;             /* protected by src_lock */
 
+  /* seqnum from seek or segment,
+   * to be applied to synthetic segment/eos events */
   gint seqnum;
   gboolean send_stream_start;   /* protected by srcpad stream lock */
   gboolean send_segment;
@@ -354,87 +356,6 @@ enum
 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
 
-/**
- * gst_aggregator_iterate_sinkpads:
- * @self: The #GstAggregator
- * @func: (scope call): The function to call.
- * @user_data: (closure): The data to pass to @func.
- *
- * Iterate the sinkpads of aggregator to call a function on them.
- *
- * This method guarantees that @func will be called only once for each
- * sink pad.
- *
- * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
- */
-gboolean
-gst_aggregator_iterate_sinkpads (GstAggregator * self,
-    GstAggregatorPadForeachFunc func, gpointer user_data)
-{
-  gboolean result = FALSE;
-  GstIterator *iter;
-  gboolean done = FALSE;
-  GValue item = { 0, };
-  GList *seen_pads = NULL;
-
-  iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
-
-  if (!iter)
-    goto no_iter;
-
-  while (!done) {
-    switch (gst_iterator_next (iter, &item)) {
-      case GST_ITERATOR_OK:
-      {
-        GstAggregatorPad *pad;
-
-        pad = g_value_get_object (&item);
-
-        /* if already pushed, skip. FIXME, find something faster to tag pads */
-        if (pad == NULL || g_list_find (seen_pads, pad)) {
-          g_value_reset (&item);
-          break;
-        }
-
-        GST_LOG_OBJECT (pad, "calling function %s on pad",
-            GST_DEBUG_FUNCPTR_NAME (func));
-
-        result = func (self, pad, user_data);
-
-        done = !result;
-
-        seen_pads = g_list_prepend (seen_pads, pad);
-
-        g_value_reset (&item);
-        break;
-      }
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (iter);
-        break;
-      case GST_ITERATOR_ERROR:
-        GST_ERROR_OBJECT (self,
-            "Could not iterate over internally linked pads");
-        done = TRUE;
-        break;
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
-    }
-  }
-  g_value_unset (&item);
-  gst_iterator_free (iter);
-
-  if (seen_pads == NULL) {
-    GST_DEBUG_OBJECT (self, "No pad seen");
-    return FALSE;
-  }
-
-  g_list_free (seen_pads);
-
-no_iter:
-  return result;
-}
-
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
@@ -561,6 +482,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
     segment = gst_event_new_segment (&self->segment);
 
     if (!self->priv->seqnum)
+      /* This code-path is in preparation to be able to run without a source
+       * connected. Then we won't have a seq-num from a segment event. */
       self->priv->seqnum = gst_event_get_seqnum (segment);
     else
       gst_event_set_seqnum (segment, self->priv->seqnum);
@@ -758,9 +681,11 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 }
 
 static gboolean
-gst_aggregator_do_events_and_queries (GstAggregator * self,
-    GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
+    gpointer user_data)
 {
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+  GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
   GstEvent *event = NULL;
   GstQuery *query = NULL;
   GstAggregatorClass *klass = NULL;
@@ -790,7 +715,7 @@ gst_aggregator_do_events_and_queries (GstAggregator * self,
       if (event) {
         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
         gst_event_ref (event);
-        ret = klass->sink_event (self, pad, event);
+        ret = klass->sink_event (aggregator, pad, event);
 
         PAD_LOCK (pad);
         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
@@ -800,7 +725,7 @@ gst_aggregator_do_events_and_queries (GstAggregator * self,
         gst_event_unref (event);
       } else if (query) {
         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
-        ret = klass->sink_query (self, pad, query);
+        ret = klass->sink_query (aggregator, pad, query);
 
         PAD_LOCK (pad);
         if (g_queue_peek_tail (&pad->priv->data) == query) {
@@ -1100,14 +1025,17 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     GstFlowReturn flow_return = GST_FLOW_OK;
     gboolean processed_event = FALSE;
 
-    gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
-        NULL);
+    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+        gst_aggregator_do_events_and_queries, NULL);
 
+    /* Ensure we have buffers ready (either in clipped_buffer or at the head of
+     * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
-    gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
-        &processed_event);
+    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+        gst_aggregator_do_events_and_queries, &processed_event);
+
     if (processed_event)
       continue;
 
@@ -1514,11 +1442,13 @@ eat:
   return res;
 }
 
-static inline gboolean
-gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
-    gpointer unused_udata)
+static gboolean
+gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
 {
-  gst_aggregator_pad_flush (pad, self);
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+  GstAggregator *agg = GST_AGGREGATOR_CAST (self);
+
+  gst_aggregator_pad_flush (pad, agg);
 
   PAD_LOCK (pad);
   pad->priv->flow_return = GST_FLOW_FLUSHING;
@@ -1537,7 +1467,9 @@ gst_aggregator_stop (GstAggregator * agg)
 
   gst_aggregator_reset_flow_values (agg);
 
-  gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
+  /* Application needs to make sure no pads are added while it shuts us down */
+  gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
+      gst_aggregator_stop_pad, NULL);
 
   klass = GST_AGGREGATOR_GET_CLASS (agg);
 
@@ -1632,6 +1564,9 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   GstAggregatorPrivate *priv = self->priv;
   gint serial = 0;
   gchar *name = NULL;
+  GType pad_type =
+      GST_PAD_TEMPLATE_GTYPE (templ) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
 
   if (templ->direction != GST_PAD_SINK)
     goto not_sink;
@@ -1652,7 +1587,7 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   }
 
   name = g_strdup_printf ("sink_%u", serial);
-  agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
+  agg_pad = g_object_new (pad_type,
       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
   g_free (name);
 
@@ -2170,7 +2105,7 @@ gst_aggregator_finalize (GObject * object)
  * as unresponsive.
  */
 static void
-gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
+gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
 {
   gboolean changed;
 
@@ -2221,12 +2156,12 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
  * before a pad is deemed unresponsive. A value of -1 means an
  * unlimited time.
  */
-static gint64
+static GstClockTime
 gst_aggregator_get_latency_property (GstAggregator * agg)
 {
-  gint64 res;
+  GstClockTime res;
 
-  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
 
   GST_OBJECT_LOCK (agg);
   res = agg->priv->latency;
@@ -2243,7 +2178,7 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
-      gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
+      gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
       break;
     case PROP_START_TIME_SELECTION:
       agg->priv->start_time_selection = g_value_get_enum (value);
@@ -2265,7 +2200,7 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
-      g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
+      g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
       break;
     case PROP_START_TIME_SELECTION:
       g_value_set_enum (value, agg->priv->start_time_selection);
@@ -2292,8 +2227,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
-  klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
-
   klass->sink_event = gst_aggregator_default_sink_event;
   klass->sink_query = gst_aggregator_default_sink_query;
 
@@ -2318,11 +2251,10 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   gobject_class->finalize = gst_aggregator_finalize;
 
   g_object_class_install_property (gobject_class, PROP_LATENCY,
-      g_param_spec_int64 ("latency", "Buffer latency",
+      g_param_spec_uint64 ("latency", "Buffer latency",
           "Additional latency in live mode to allow upstream "
           "to take longer to produce buffers for the current "
-          "position (in nanoseconds)", 0,
-          (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
+          "position (in nanoseconds)", 0, G_MAXUINT64,
           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
@@ -2338,7 +2270,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
 }
 
@@ -2467,6 +2398,12 @@ apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
   update_time_level (aggpad, head);
 }
 
+/*
+ * Can be called either from the sinkpad's chain function or from the srcpad's
+ * thread in the case of a buffer synthetized from a GAP event.
+ * Because of this second case, FLUSH_LOCK can't be used here.
+ */
+
 static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
@@ -2476,8 +2413,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
-  PAD_FLUSH_LOCK (aggpad);
-
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
@@ -2573,15 +2508,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);
 
-  PAD_FLUSH_UNLOCK (aggpad);
-
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
   return flow_return;
 
 flushing:
   PAD_UNLOCK (aggpad);
-  PAD_FLUSH_UNLOCK (aggpad);
 
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
       gst_flow_get_name (flow_return));
@@ -2594,8 +2526,17 @@ flushing:
 static GstFlowReturn
 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 {
-  return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
-      GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
+  GstFlowReturn ret;
+  GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
+
+  PAD_FLUSH_LOCK (aggpad);
+
+  ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
+      aggpad, buffer, TRUE);
+
+  PAD_FLUSH_UNLOCK (aggpad);
+
+  return ret;
 }
 
 static gboolean
@@ -2653,7 +2594,7 @@ flushing:
   return FALSE;
 }
 
-/* Queue serialized events and let the others go though directly.
+/* Queue serialized events and let the others go through directly.
  * The queued events with be handled from the src-pad task in
  * gst_aggregator_do_events_and_queries().
  */
@@ -2937,6 +2878,12 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
   return buffer;
 }
 
+/**
+ * gst_aggregator_pad_is_eos:
+ * @pad: an aggregator pad
+ *
+ * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
+ */
 gboolean
 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
 {