aggregator: Fixup for previous commit to prevent infinite loop if no events are pending
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 7eeea59..918108d 100644 (file)
@@ -300,6 +300,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
  * GstAggregator implementation  *
  *************************************/
 static GstElementClass *aggregator_parent_class = NULL;
+static gint aggregator_private_offset = 0;
 
 /* All members are protected by the object lock unless otherwise noted */
 
@@ -717,6 +718,12 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   return res;
 }
 
+typedef struct
+{
+  gboolean processed_event;
+  GstFlowReturn flow_ret;
+} DoHandleEventsAndQueriesData;
+
 static gboolean
 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
     gpointer user_data)
@@ -726,7 +733,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   GstEvent *event = NULL;
   GstQuery *query = NULL;
   GstAggregatorClass *klass = NULL;
-  gboolean *processed_event = user_data;
+  DoHandleEventsAndQueriesData *data = user_data;
 
   do {
     event = NULL;
@@ -744,8 +751,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
     if (event || query) {
       gboolean ret;
 
-      if (processed_event)
-        *processed_event = TRUE;
+      data->processed_event = TRUE;
       if (klass == NULL)
         klass = GST_AGGREGATOR_GET_CLASS (self);
 
@@ -755,8 +761,11 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
         ret = klass->sink_event (aggregator, pad, event);
 
         PAD_LOCK (pad);
-        if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
+        if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
           pad->priv->negotiated = ret;
+          if (!ret)
+            pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
+        }
         if (g_queue_peek_tail (&pad->priv->data) == event)
           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
         gst_event_unref (event);
@@ -1096,10 +1105,13 @@ gst_aggregator_aggregate_func (GstAggregator * self)
   GST_LOG_OBJECT (self, "Checking aggregate");
   while (priv->send_eos && priv->running) {
     GstFlowReturn flow_return = GST_FLOW_OK;
-    gboolean processed_event = FALSE;
+    DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
 
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
-        gst_aggregator_do_events_and_queries, NULL);
+        gst_aggregator_do_events_and_queries, &events_query_data);
+
+    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+      goto handle_error;
 
     if (self->priv->peer_latency_live)
       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
@@ -1110,10 +1122,15 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
+    events_query_data.processed_event = FALSE;
+    events_query_data.flow_ret = GST_FLOW_OK;
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
-        gst_aggregator_do_events_and_queries, &processed_event);
+        gst_aggregator_do_events_and_queries, &events_query_data);
+
+    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+      goto handle_error;
 
-    if (processed_event)
+    if (events_query_data.processed_event)
       continue;
 
     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
@@ -1143,6 +1160,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       gst_aggregator_push_eos (self);
     }
 
+  handle_error:
     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
 
     if (flow_return != GST_FLOW_OK) {
@@ -1895,7 +1913,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
     } else {
       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
-      gst_object_unref (peer);
     }
   }
 
@@ -1935,6 +1952,9 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
 
   evdata->result &= ret;
 
+  if (peer)
+    gst_object_unref (peer);
+
   /* Always send to all pads */
   return FALSE;
 }
@@ -2287,11 +2307,13 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GstElementClass *gstelement_class = (GstElementClass *) klass;
 
   aggregator_parent_class = g_type_class_peek_parent (klass);
-  g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
 
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
+  if (aggregator_private_offset != 0)
+    g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
+
   klass->finish_buffer = gst_aggregator_default_finish_buffer;
 
   klass->sink_event = gst_aggregator_default_sink_event;
@@ -2338,6 +2360,12 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
+static inline gpointer
+gst_aggregator_get_instance_private (GstAggregator * self)
+{
+  return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
+}
+
 static void
 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 {
@@ -2346,9 +2374,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   g_return_if_fail (klass->aggregate != NULL);
 
-  self->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
-      GstAggregatorPrivate);
+  self->priv = gst_aggregator_get_instance_private (self);
 
   priv = self->priv;
 
@@ -2408,6 +2434,10 @@ gst_aggregator_get_type (void)
 
     _type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
+
+    aggregator_private_offset =
+        g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
+
     g_once_init_leave (&type, _type);
   }
   return type;
@@ -2741,7 +2771,7 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
 /***********************************
  * GstAggregatorPad implementation  *
  ************************************/
-G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
 
 static void
 gst_aggregator_pad_constructed (GObject * object)
@@ -2787,8 +2817,6 @@ gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
-
   gobject_class->constructed = gst_aggregator_pad_constructed;
   gobject_class->finalize = gst_aggregator_pad_finalize;
   gobject_class->dispose = gst_aggregator_pad_dispose;
@@ -2797,9 +2825,7 @@ gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
 static void
 gst_aggregator_pad_init (GstAggregatorPad * pad)
 {
-  pad->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
-      GstAggregatorPadPrivate);
+  pad->priv = gst_aggregator_pad_get_instance_private (pad);
 
   g_queue_init (&pad->priv->data);
   g_cond_init (&pad->priv->event_cond);
@@ -2881,6 +2907,11 @@ gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 
   PAD_LOCK (pad);
 
+  if (pad->priv->flow_return != GST_FLOW_OK) {
+    PAD_UNLOCK (pad);
+    return NULL;
+  }
+
   gst_aggregator_pad_clip_buffer_unlocked (pad);
 
   buffer = pad->priv->clipped_buffer;
@@ -2933,6 +2964,11 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 
   PAD_LOCK (pad);
 
+  if (pad->priv->flow_return != GST_FLOW_OK) {
+    PAD_UNLOCK (pad);
+    return NULL;
+  }
+
   gst_aggregator_pad_clip_buffer_unlocked (pad);
 
   if (pad->priv->clipped_buffer) {
@@ -2955,7 +2991,7 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
  *
  * Returns: %TRUE if the pad has a buffer available as the next thing.
  *
- * Since: 1.16
+ * Since: 1.14.1
  */
 gboolean
 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
@@ -3114,3 +3150,40 @@ gst_aggregator_get_allocator (GstAggregator * self,
   if (params)
     *params = self->priv->allocation_params;
 }
+
+/**
+ * gst_aggregator_simple_get_next_time:
+ * @self: A #GstAggregator
+ *
+ * This is a simple #GstAggregator::get_next_time implementation that
+ * just looks at the #GstSegment on the srcpad of the aggregator and bases
+ * the next time on the running there there.
+ *
+ * This is the desired behaviour in most cases where you have a live source
+ * and you have a dead line based aggregator subclass.
+ *
+ * Returns: The running time based on the position
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_aggregator_simple_get_next_time (GstAggregator * self)
+{
+  GstClockTime next_time;
+  GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
+  GstSegment *segment = &srcpad->segment;
+
+  GST_OBJECT_LOCK (self);
+  if (segment->position == -1 || segment->position < segment->start)
+    next_time = segment->start;
+  else
+    next_time = segment->position;
+
+  if (segment->stop != -1 && next_time > segment->stop)
+    next_time = segment->stop;
+
+  next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
+  GST_OBJECT_UNLOCK (self);
+
+  return next_time;
+}