aggregator: For the start time selection, only set the segment position
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index a0a8776..26d48a3 100644 (file)
@@ -215,7 +215,10 @@ struct _GstAggregatorPadPrivate
   gboolean pending_flush_stop;
   gboolean pending_eos;
 
+  gboolean first_buffer;
+
   GQueue buffers;
+  guint num_buffers;
   GstClockTime head_position;
   GstClockTime tail_position;
   GstClockTime head_time;
@@ -310,6 +313,7 @@ typedef struct
   GstEvent *event;
   gboolean result;
   gboolean flush;
+  gboolean only_to_active_pads;
 
   gboolean one_actually_seeked;
 } EventData;
@@ -777,6 +781,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     }
     item = next;
   }
+  aggpad->priv->num_buffers = 0;
 
   PAD_BROADCAST_EVENT (aggpad);
   PAD_UNLOCK (aggpad);
@@ -1084,6 +1089,8 @@ gst_aggregator_default_sink_event (GstAggregator * self,
         GST_OBJECT_UNLOCK (self);
       }
 
+      aggpad->priv->first_buffer = TRUE;
+
       /* We never forward the event */
       goto eat;
     }
@@ -1501,6 +1508,7 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
         stop_type, stop, NULL);
     self->priv->seqnum = gst_event_get_seqnum (event);
+    self->priv->first_buffer = FALSE;
     GST_OBJECT_UNLOCK (self);
 
     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
@@ -1551,9 +1559,14 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   if (peer) {
-    ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
-    GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
-    gst_object_unref (peer);
+    if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
+      GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
+      ret = TRUE;
+    } else {
+      ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
+      GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
+      gst_object_unref (peer);
+    }
   }
 
   if (ret == FALSE) {
@@ -1599,7 +1612,7 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
 
 static EventData
 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
-    GstEvent * event, gboolean flush)
+    GstEvent * event, gboolean flush, gboolean only_to_active_pads)
 {
   EventData evdata;
 
@@ -1607,6 +1620,7 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
   evdata.result = TRUE;
   evdata.flush = flush;
   evdata.one_actually_seeked = FALSE;
+  evdata.only_to_active_pads = only_to_active_pads;
 
   /* We first need to set all pads as flushing in a first pass
    * as flush_start flush_stop is sometimes sent synchronously
@@ -1666,7 +1680,8 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
   GST_OBJECT_UNLOCK (self);
 
   /* forward the seek upstream */
-  evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
+  evdata =
+      gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE);
   event = NULL;
 
   if (!evdata.result || !evdata.one_actually_seeked) {
@@ -1709,7 +1724,12 @@ gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
     }
   }
 
-  evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
+  /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
+   * they will receive a QOS event that has earliest_time=0 (because we can't
+   * have negative timestamps), and consider their buffer as too late */
+  evdata =
+      gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE,
+      GST_EVENT_TYPE (event) == GST_EVENT_QOS);
   res = evdata.result;
 
 done:
@@ -2040,7 +2060,7 @@ gst_aggregator_get_type (void)
   return type;
 }
 
-/* Must be called with PAD lock held */
+/* Must be called with SRC lock and PAD lock held */
 static gboolean
 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
 {
@@ -2048,6 +2068,11 @@ gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
   if (g_queue_get_length (&aggpad->priv->buffers) == 0)
     return TRUE;
 
+  /* We also want at least two buffers, one is being processed and one is ready
+   * for the next iteration when we operate in live mode. */
+  if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
+    return TRUE;
+
   /* zero latency, if there is a buffer, it's full */
   if (self->priv->latency == 0)
     return FALSE;
@@ -2124,6 +2149,8 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   buf_pts = GST_BUFFER_PTS (actual_buf);
 
+  aggpad->priv->first_buffer = FALSE;
+
   for (;;) {
     SRC_LOCK (self);
     PAD_LOCK (aggpad);
@@ -2134,6 +2161,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
       else
         g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
       apply_buffer (aggpad, actual_buf, head);
+      aggpad->priv->num_buffers++;
       actual_buf = buffer = NULL;
       SRC_BROADCAST (self);
       break;
@@ -2188,8 +2216,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
         self->segment.position = start_time;
       else
         self->segment.position = MIN (start_time, self->segment.position);
-      self->segment.start = MIN (start_time, self->segment.start);
-      self->segment.time = MIN (start_time, self->segment.time);
 
       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
           GST_TIME_ARGS (start_time));
@@ -2408,6 +2434,8 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 
   g_mutex_init (&pad->priv->flush_lock);
   g_mutex_init (&pad->priv->lock);
+
+  pad->priv->first_buffer = TRUE;
 }
 
 /**
@@ -2430,6 +2458,7 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
 
   if (buffer) {
     apply_buffer (pad, buffer, FALSE);
+    pad->priv->num_buffers--;
     GST_TRACE_OBJECT (pad, "Consuming buffer");
     if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
       pad->priv->pending_eos = FALSE;