multiqueue: fix potential crash on shutdown
[platform/upstream/gstreamer.git] / subprojects / gstreamer / plugins / elements / gstmultiqueue.c
index e13d317..10ad203 100644 (file)
@@ -1295,6 +1295,7 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
         sq->last_query = FALSE;
         g_cond_signal (&sq->query_handled);
       }
+      mqueue->interleave_incomplete = FALSE;
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
       break;
     }
@@ -1569,6 +1570,7 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   GstClockTimeDiff low, high;
   GstClockTime interleave, other_interleave = 0;
+  gboolean some_inactive = FALSE;
   GList *tmp;
 
   low = high = GST_CLOCK_STIME_NONE;
@@ -1579,16 +1581,12 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
     /* Ignore sparse streams for interleave calculation */
     if (oq->is_sparse)
       continue;
-    /* If a stream is not active yet (hasn't received any buffers), set
-     * a maximum interleave to allow it to receive more data */
+
+    /* If some streams aren't active yet (haven't received any buffers), we will
+     * grow interleave accordingly */
     if (!oq->active) {
-      GST_LOG_OBJECT (mq,
-          "queue %d is not active yet, forcing interleave to 5s", oq->id);
-      mq->interleave = 5 * GST_SECOND;
-      /* Update max-size time */
-      mq->max_size.time = mq->interleave;
-      SET_CHILD_PROPERTY (mq, time);
-      goto beach;
+      some_inactive = TRUE;
+      continue;
     }
 
     /* Calculate within each streaming thread */
@@ -1598,11 +1596,23 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
       continue;
     }
 
-    if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
+    /* If the stream isn't EOS, update the low/high input value */
+    if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime) && !oq->is_eos) {
       if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
         low = oq->cached_sinktime;
       if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
         high = oq->cached_sinktime;
+
+      /* If the input is before the segment start, consider as inactive to allow
+       * the interleave to grow until *all* streams have data within the segment.
+       *
+       * The reason for this is that there is no requirements for data before
+       * the segment start to be "aligned" and therefore interleave calculation
+       * can't reliably be done. For example a demuxer could provide video data
+       * from the previous keyframe but audio only from just before the segment
+       * start */
+      if (oq->cached_sinktime < 0)
+        some_inactive = TRUE;
     }
     GST_LOG_OBJECT (mq,
         "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
@@ -1612,6 +1622,7 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
   }
 
   if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
+    gboolean do_update = high == low;
     interleave = high - low;
     /* Padding of interleave and minimum value */
     interleave = (150 * interleave / 100) + mq->min_interleave_time;
@@ -1620,11 +1631,28 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
 
     interleave = MAX (interleave, other_interleave);
 
+    /* Progressively grow up the interleave up to 5s if some streams were inactive */
+    if (some_inactive && interleave <= mq->interleave) {
+      interleave = MIN (5 * GST_SECOND, mq->interleave + 500 * GST_MSECOND);
+      do_update = TRUE;
+    }
+
+    /* We force the interleave update if:
+     * * the interleave was previously set while some streams were not active
+     *   yet but they now all are
+     * * OR the interleave was previously based on all streams being active
+     *   whereas some now aren't
+     */
+    if (mq->interleave_incomplete != some_inactive)
+      do_update = TRUE;
+
+    mq->interleave_incomplete = some_inactive;
+
     /* Update the stored interleave if:
      * * No data has arrived yet (high == low)
      * * Or it went higher
      * * Or it went lower and we've gone past the previous interleave needed */
-    if (high == low || interleave > mq->interleave ||
+    if (do_update || interleave > mq->interleave ||
         ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
                         mq->interleave)) < low)
             && interleave < (mq->interleave * 3 / 4))) {
@@ -1637,7 +1665,6 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
     }
   }
 
-beach:
   GST_DEBUG_OBJECT (mq,
       "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
       GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
@@ -2092,6 +2119,7 @@ gst_multi_queue_loop (GstPad * pad)
   GstFlowReturn result;
   GstClockTimeDiff next_time;
   gboolean is_buffer;
+  gboolean is_query = FALSE;
   gboolean do_update_buffering = FALSE;
   gboolean dropping = FALSE;
   GstPad *srcpad = NULL;
@@ -2101,7 +2129,7 @@ gst_multi_queue_loop (GstPad * pad)
   srcpad = g_weak_ref_get (&sq->srcpad);
 
   if (!mq || !srcpad)
-    goto out_flushing;
+    goto done;
 
 next:
   GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
@@ -2117,6 +2145,8 @@ next:
   item = (GstMultiQueueItem *) sitem;
   newid = item->posid;
 
+  is_query = item->is_query;
+
   /* steal the object and destroy the item */
   object = gst_multi_queue_item_steal_object (item);
   gst_multi_queue_item_destroy (item);
@@ -2362,7 +2392,7 @@ done:
 
 out_flushing:
   {
-    if (object && !GST_IS_QUERY (object))
+    if (object && !is_query)
       gst_mini_object_unref (object);
 
     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@@ -2416,7 +2446,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   mq = g_weak_ref_get (&sq->mqueue);
 
   if (!mq)
-    goto flushing;
+    goto done;
 
   /* if eos, we are always full, so avoid hanging incoming indefinitely */
   if (sq->is_eos)