aggregator: Don't block if adding to the tail of the queue
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 538c7e9..3a642d8 100644 (file)
@@ -217,6 +217,9 @@ struct _GstAggregatorPadPrivate
   GQueue data;                  /* buffers, events and queries */
   GstBuffer *clipped_buffer;
   guint num_buffers;
+
+  /* used to track fill state of queues, only used with live-src and when
+   * latency property is set to > 0 */
   GstClockTime head_position;
   GstClockTime tail_position;
   GstClockTime head_time;       /* running time */
@@ -1317,38 +1320,37 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
 static void
 update_time_level (GstAggregatorPad * aggpad, gboolean head)
 {
+  GstAggregatorPadPrivate *priv = aggpad->priv;
+
   if (head) {
-    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
-        aggpad->priv->head_segment.format == GST_FORMAT_TIME)
-      aggpad->priv->head_time =
-          gst_segment_to_running_time (&aggpad->priv->head_segment,
-          GST_FORMAT_TIME, aggpad->priv->head_position);
+    if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
+        priv->head_segment.format == GST_FORMAT_TIME)
+      priv->head_time = gst_segment_to_running_time (&priv->head_segment,
+          GST_FORMAT_TIME, priv->head_position);
     else
-      aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+      priv->head_time = GST_CLOCK_TIME_NONE;
 
-    if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
-      aggpad->priv->tail_time = aggpad->priv->head_time;
+    if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
+      priv->tail_time = priv->head_time;
   } else {
-    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
+    if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
         aggpad->segment.format == GST_FORMAT_TIME)
-      aggpad->priv->tail_time =
-          gst_segment_to_running_time (&aggpad->segment,
-          GST_FORMAT_TIME, aggpad->priv->tail_position);
+      priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
+          GST_FORMAT_TIME, priv->tail_position);
     else
-      aggpad->priv->tail_time = aggpad->priv->head_time;
+      priv->tail_time = priv->head_time;
   }
 
-  if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
-      aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
-    aggpad->priv->time_level = 0;
+  if (priv->head_time == GST_CLOCK_TIME_NONE ||
+      priv->tail_time == GST_CLOCK_TIME_NONE) {
+    priv->time_level = 0;
     return;
   }
 
-  if (aggpad->priv->tail_time > aggpad->priv->head_time)
-    aggpad->priv->time_level = 0;
+  if (priv->tail_time > priv->head_time)
+    priv->time_level = 0;
   else
-    aggpad->priv->time_level = aggpad->priv->head_time -
-        aggpad->priv->tail_time;
+    priv->time_level = priv->head_time - priv->tail_time;
 }
 
 
@@ -1546,7 +1548,7 @@ gst_aggregator_stop (GstAggregator * agg)
 
   agg->priv->has_peer_latency = FALSE;
   agg->priv->peer_latency_live = FALSE;
-  agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
+  agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
 
   if (agg->priv->tags)
     gst_tag_list_unref (agg->priv->tags);
@@ -1716,8 +1718,6 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
 
   gst_query_parse_latency (query, &live, &min, &max);
 
-  our_latency = self->priv->latency;
-
   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
@@ -1732,6 +1732,8 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
     return FALSE;
   }
 
+  our_latency = self->priv->latency;
+
   self->priv->peer_latency_live = live;
   self->priv->peer_latency_min = min;
   self->priv->peer_latency_max = max;
@@ -2495,7 +2497,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
       aggpad->priv->first_buffer = FALSE;
     }
 
-    if (gst_aggregator_pad_has_space (self, aggpad)
+    if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
         && aggpad->priv->flow_return == GST_FLOW_OK) {
       if (head)
         g_queue_push_head (&aggpad->priv->data, buffer);
@@ -2602,7 +2604,6 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
 {
   GstAggregator *self = GST_AGGREGATOR (parent);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
-  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
   if (GST_QUERY_IS_SERIALIZED (query)) {
     GstStructure *s;
@@ -2638,9 +2639,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
     PAD_UNLOCK (aggpad);
 
     return ret;
-  }
+  } else {
+    GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
-  return klass->sink_query (self, aggpad, query);
+    return klass->sink_query (self, aggpad, query);
+  }
 
 flushing:
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
@@ -2661,17 +2664,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
   GstFlowReturn ret = GST_FLOW_OK;
   GstAggregator *self = GST_AGGREGATOR (parent);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
-  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
   if (GST_EVENT_IS_SERIALIZED (event)
       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
     SRC_LOCK (self);
     PAD_LOCK (aggpad);
 
-    if (aggpad->priv->flow_return != GST_FLOW_OK) {
-      ret = aggpad->priv->flow_return;
+    if (aggpad->priv->flow_return != GST_FLOW_OK)
       goto flushing;
-    }
 
     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
       GST_OBJECT_LOCK (aggpad);
@@ -2687,6 +2687,8 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
     PAD_UNLOCK (aggpad);
     SRC_UNLOCK (self);
   } else {
+    GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
+
     if (!klass->sink_event (self, aggpad, event)) {
       /* Copied from GstPad to convert boolean to a GstFlowReturn in
        * the event handling func */
@@ -2705,7 +2707,7 @@ flushing:
     gst_pad_store_sticky_event (pad, event);
   gst_event_unref (event);
 
-  return ret;
+  return aggpad->priv->flow_return;
 }
 
 static gboolean