queue: apply the upstream 1.14.4 95/200295/1
authorEunhae Choi <eunhae1.choi@samsung.com>
Thu, 21 Feb 2019 08:08:25 +0000 (17:08 +0900)
committerEunhae Choi <eunhae1.choi@samsung.com>
Thu, 21 Feb 2019 08:08:31 +0000 (17:08 +0900)
- apply the upstream 1.14.4 patches
  to support the latest playback operation.

Change-Id: I70db32bb428c7b97148b6d481d18d43dbfc034f7

plugins/elements/gstmultiqueue.c
plugins/elements/gstqueue.c
plugins/elements/gstqueue2.c

index 38f1efa..393f788 100644 (file)
@@ -150,6 +150,7 @@ struct _GstSingleQueue
   GstDataQueueSize max_size, extra_size;
   GstClockTime cur_time;
   gboolean is_eos;
+  gboolean is_segment_done;
   gboolean is_sparse;
   gboolean flushing;
   gboolean active;
@@ -168,7 +169,8 @@ struct _GstSingleQueue
   GstQuery *last_handled_query;
 
   /* For interleave calculation */
-  GThread *thread;
+  GThread *thread;              /* Streaming thread of SingleQueue */
+  GstClockTime interleave;      /* Calculated interleve within the thread */
 };
 
 
@@ -203,8 +205,7 @@ static void recheck_buffering_status (GstMultiQueue * mq);
 
 static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
 
-static void calculate_interleave (GstMultiQueue * mq);
-
+static void calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq);
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_PAD_SINK,
@@ -501,6 +502,7 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
 
   /* PROPERTIES */
+
   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
           "Max. amount of data in the queue (bytes, 0=disable)",
@@ -819,7 +821,7 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
       mq->min_interleave_time = g_value_get_uint64 (value);
       if (mq->use_interleave)
-        calculate_interleave (mq);
+        calculate_interleave (mq, NULL);
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       break;
     default:
@@ -1100,11 +1102,39 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
 }
 
 static gboolean
-gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
-    gboolean full)
+gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+  GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
+  return gst_pad_start_task (sq->srcpad,
+      (GstTaskFunction) gst_multi_queue_loop, sq->srcpad, NULL);
+}
+
+static gboolean
+gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   gboolean result;
 
+  GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
+  result = gst_pad_pause_task (sq->srcpad);
+  sq->sink_tainted = sq->src_tainted = TRUE;
+  return result;
+}
+
+static gboolean
+gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+  gboolean result;
+
+  GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id);
+  result = gst_pad_stop_task (sq->srcpad);
+  sq->sink_tainted = sq->src_tainted = TRUE;
+  return result;
+}
+
+static void
+gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
+    gboolean full)
+{
   GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
       sq->id);
 
@@ -1122,10 +1152,6 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     sq->last_query = FALSE;
     g_cond_signal (&sq->query_handled);
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
-
-    GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
-    result = gst_pad_pause_task (sq->srcpad);
-    sq->sink_tainted = sq->src_tainted = TRUE;
   } else {
     gst_single_queue_flush_queue (sq, full);
 
@@ -1139,6 +1165,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     sq->cur_time = 0;
     sq->max_size.visible = mq->max_size.visible;
     sq->is_eos = FALSE;
+    sq->is_segment_done = FALSE;
     sq->nextid = 0;
     sq->oldid = 0;
     sq->last_oldid = G_MAXUINT32;
@@ -1156,13 +1183,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
 
     sq->flushing = FALSE;
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
-
-    GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
-    result =
-        gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
-        sq->srcpad, NULL);
   }
-  return result;
 }
 
 /* WITH LOCK TAKEN */
@@ -1180,7 +1201,8 @@ get_buffering_level (GstSingleQueue * sq)
       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
 
   /* get bytes and time buffer levels and take the max */
-  if (sq->is_eos || sq->srcresult == GST_FLOW_NOT_LINKED || sq->is_sparse) {
+  if (sq->is_eos || sq->is_segment_done || sq->srcresult == GST_FLOW_NOT_LINKED
+      || sq->is_sparse) {
     buffering_level = MAX_BUFFERING_LEVEL;
   } else {
     buffering_level = 0;
@@ -1206,6 +1228,7 @@ static void
 update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   gint buffering_level, percent;
+
   /* nothing to dowhen we are not in buffering mode */
   if (!mq->use_buffering)
     return;
@@ -1313,41 +1336,49 @@ recheck_buffering_status (GstMultiQueue * mq)
 }
 
 static void
-calculate_interleave (GstMultiQueue * mq)
+calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   GstClockTimeDiff low, high;
-  GstClockTime interleave;
+  GstClockTime interleave, other_interleave = 0;
   GList *tmp;
 
   low = high = GST_CLOCK_STIME_NONE;
   interleave = mq->interleave;
   /* Go over all single queues and calculate lowest/highest value */
   for (tmp = mq->queues; tmp; tmp = tmp->next) {
-    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+    GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
     /* Ignore sparse streams for interleave calculation */
-    if (sq->is_sparse)
+    if (oq->is_sparse)
       continue;
     /* If a stream is not active yet (hasn't received any buffers), set
      * a maximum interleave to allow it to receive more data */
-    if (!sq->active) {
+    if (!oq->active) {
       GST_LOG_OBJECT (mq,
-          "queue %d is not active yet, forcing interleave to 5s", sq->id);
+          "queue %d is not active yet, forcing interleave to 5s", oq->id);
       mq->interleave = 5 * GST_SECOND;
       /* Update max-size time */
       mq->max_size.time = mq->interleave;
       SET_CHILD_PROPERTY (mq, time);
       goto beach;
     }
-    if (GST_CLOCK_STIME_IS_VALID (sq->cached_sinktime)) {
-      if (low == GST_CLOCK_STIME_NONE || sq->cached_sinktime < low)
-        low = sq->cached_sinktime;
-      if (high == GST_CLOCK_STIME_NONE || sq->cached_sinktime > high)
-        high = sq->cached_sinktime;
+
+    /* Calculate within each streaming thread */
+    if (sq && sq->thread != oq->thread) {
+      if (oq->interleave > other_interleave)
+        other_interleave = oq->interleave;
+      continue;
+    }
+
+    if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
+      if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
+        low = oq->cached_sinktime;
+      if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
+        high = oq->cached_sinktime;
     }
     GST_LOG_OBJECT (mq,
         "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
-        " high:%" GST_STIME_FORMAT, sq->id,
-        GST_STIME_ARGS (sq->cached_sinktime), GST_STIME_ARGS (low),
+        " high:%" GST_STIME_FORMAT, oq->id,
+        GST_STIME_ARGS (oq->cached_sinktime), GST_STIME_ARGS (low),
         GST_STIME_ARGS (high));
   }
 
@@ -1355,6 +1386,10 @@ calculate_interleave (GstMultiQueue * mq)
     interleave = high - low;
     /* Padding of interleave and minimum value */
     interleave = (150 * interleave / 100) + mq->min_interleave_time;
+    if (sq)
+      sq->interleave = interleave;
+
+    interleave = MAX (interleave, other_interleave);
 
     /* Update the stored interleave if:
      * * No data has arrived yet (high == low)
@@ -1413,7 +1448,7 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
       sq->sink_tainted = FALSE;
       if (mq->use_interleave) {
         sq->cached_sinktime = sink_time;
-        calculate_interleave (mq);
+        calculate_interleave (mq, sq);
       }
     }
   } else
@@ -1585,8 +1620,6 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
     if (GST_CLOCK_TIME_IS_VALID (btime)) {
       if (end && GST_BUFFER_DURATION_IS_VALID (buf))
         btime += GST_BUFFER_DURATION (buf);
-      if (btime > segment->stop)
-        btime = segment->stop;
       time = my_segment_to_running_time (segment, btime);
     }
   } else if (GST_IS_BUFFER_LIST (object)) {
@@ -1602,8 +1635,6 @@ get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
       if (GST_CLOCK_TIME_IS_VALID (btime)) {
         if (end && GST_BUFFER_DURATION_IS_VALID (buf))
           btime += GST_BUFFER_DURATION (buf);
-        if (btime > segment->stop)
-          btime = segment->stop;
         time = my_segment_to_running_time (segment, btime);
         if (!end)
           goto done;
@@ -1667,11 +1698,19 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     event = GST_EVENT_CAST (object);
 
     switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_SEGMENT_DONE:
+        *allow_drop = FALSE;
+        break;
       case GST_EVENT_EOS:
         result = GST_FLOW_EOS;
         if (G_UNLIKELY (*allow_drop))
           *allow_drop = FALSE;
         break;
+      case GST_EVENT_STREAM_START:
+        result = GST_FLOW_OK;
+        if (G_UNLIKELY (*allow_drop))
+          *allow_drop = FALSE;
+        break;
       case GST_EVENT_SEGMENT:
         apply_segment (mq, sq, event, &sq->src_segment);
         /* Applying the segment may have made the queue non-full again, unblock it if needed */
@@ -2075,7 +2114,7 @@ out_flushing:
      * has returned an error flow return. After EOS there
      * will be no further buffer which could propagate the
      * error upstream */
-    if (sq->is_eos && sq->srcresult < GST_FLOW_EOS) {
+    if ((sq->is_eos || sq->is_segment_done) && sq->srcresult < GST_FLOW_EOS) {
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       GST_ELEMENT_FLOW_ERROR (mq, sq->srcresult);
     } else {
@@ -2155,7 +2194,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
           "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
           GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
           GST_STIME_ARGS (sq->cached_sinktime));
-      calculate_interleave (mq);
+      calculate_interleave (mq, sq);
     }
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   }
@@ -2285,6 +2324,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       res = gst_pad_push_event (sq->srcpad, event);
 
       gst_single_queue_flush (mq, sq, TRUE, FALSE);
+      gst_single_queue_pause (mq, sq);
       goto done;
 
     case GST_EVENT_FLUSH_STOP:
@@ -2294,6 +2334,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       res = gst_pad_push_event (sq->srcpad, event);
 
       gst_single_queue_flush (mq, sq, FALSE, FALSE);
+      gst_single_queue_start (mq, sq);
 
 #ifdef TIZEN_FEATURE_MQ_MODIFICATION
       /* need to reset the buffering data after seeking */
@@ -2309,6 +2350,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 #endif
       goto done;
     case GST_EVENT_SEGMENT:
+      sq->is_segment_done = FALSE;
       sref = gst_event_ref (event);
       break;
     case GST_EVENT_GAP:
@@ -2326,7 +2368,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
           stime = my_segment_to_running_time (&sq->sink_segment, val);
           if (GST_CLOCK_STIME_IS_VALID (stime)) {
             sq->cached_sinktime = stime;
-            calculate_interleave (mq);
+            calculate_interleave (mq, sq);
           }
           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
         }
@@ -2360,6 +2402,14 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
   /* mark EOS when we received one, we must do that after putting the
    * buffer in the queue because EOS marks the buffer as filled. */
   switch (type) {
+    case GST_EVENT_SEGMENT_DONE:
+      sq->is_segment_done = TRUE;
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+      update_buffering (mq, sq);
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+      single_queue_overrun_cb (sq->queue, sq);
+      gst_multi_queue_post_buffering (mq);
+      break;
     case GST_EVENT_EOS:
       GST_MULTI_QUEUE_MUTEX_LOCK (mq);
       sq->is_eos = TRUE;
@@ -2518,11 +2568,11 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
   switch (mode) {
     case GST_PAD_MODE_PUSH:
       if (active) {
-        result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
+        gst_single_queue_flush (mq, sq, FALSE, TRUE);
+        result = parent ? gst_single_queue_start (mq, sq) : TRUE;
       } else {
-        result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
-        /* make sure streaming finishes */
-        result |= gst_pad_stop_task (pad);
+        gst_single_queue_flush (mq, sq, TRUE, TRUE);
+        result = gst_single_queue_stop (mq, sq);
       }
       break;
     default:
@@ -2888,7 +2938,7 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
 
   /* we are always filled on EOS */
-  if (sq->is_eos)
+  if (sq->is_eos || sq->is_segment_done)
     return TRUE;
 
   /* we never go past the max visible items unless we are in buffering mode */
@@ -3100,6 +3150,9 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
   }
   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
+  if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
+    gst_single_queue_start (mqueue, sq);
+  }
   g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
 
   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
index 66eb57e..051e20f 100644 (file)
@@ -561,7 +561,8 @@ update_time_level (GstQueue * queue)
   GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
-  if (sink_time >= src_time)
+  if (GST_CLOCK_STIME_IS_VALID (src_time)
+      && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
   else
     queue->cur_level.time = 0;
@@ -936,6 +937,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   queue = GST_QUEUE (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       STATUS (queue, pad, "received flush start event");
@@ -988,6 +992,14 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
         GST_QUEUE_MUTEX_LOCK (queue);
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->srcresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->srcresult = GST_FLOW_OK;
+
         if (queue->srcresult != GST_FLOW_OK) {
           /* Errors in sticky event pushing are no problem and ignored here
            * as they will cause more meaningful errors during data flow.
@@ -1008,9 +1020,30 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
             goto out_flow_error;
           }
         }
-        /* refuse more events on EOS */
-        if (queue->eos)
-          goto out_eos;
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+              } else {
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue_locked_enqueue_event (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -1424,7 +1457,8 @@ next:
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+              || type == GST_EVENT_STREAM_START) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                 "pushing pushable event %s after EOS",
index 03c2613..d2722fd 100644 (file)
@@ -989,11 +989,11 @@ get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
 
-  if (queue->is_eos) {
-    /* on EOS we are always 100% full, we set the var here so that it we can
-     * reuse the logic below to stop buffering */
+  if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
+    /* on EOS and NOT_LINKED we are always 100% full, we set the var
+     * here so that we can reuse the logic below to stop buffering */
     buflevel = MAX_BUFFERING_LEVEL;
-    GST_LOG_OBJECT (queue, "we are EOS");
+    GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
   } else {
     GST_LOG_OBJECT (queue,
         "Cur level bytes/time/buffers %u/%" GST_TIME_FORMAT "/%u",
@@ -2626,6 +2626,9 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
 
   queue = GST_QUEUE2 (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
@@ -2719,6 +2722,14 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
     default:
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->sinkresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->sinkresult = GST_FLOW_OK;
+
         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
         if (queue->srcresult != GST_FLOW_OK) {
           /* Errors in sticky event pushing are no problem and ignored here
@@ -2736,9 +2747,36 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
             goto out_flow_error;
           }
         }
-        /* refuse more events on EOS */
-        if (queue->is_eos)
-          goto out_eos;
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->is_eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->is_eos = FALSE;
+                queue->unexpected = FALSE;
+                queue->seeking = FALSE;
+                queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
+                /* reset rate counters */
+                reset_rate_timer (queue);
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
+              } else {
+                queue->is_eos = FALSE;
+                queue->unexpected = FALSE;
+                queue->seeking = FALSE;
+                queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
         gst_queue2_post_buffering (queue);
@@ -3022,7 +3060,8 @@ gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
       GstEvent *event = GST_EVENT_CAST (data);
       GstEventType type = GST_EVENT_TYPE (event);
 
-      if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+      if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+          || type == GST_EVENT_STREAM_START) {
         /* we found a pushable item in the queue, push it out */
         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
@@ -3054,7 +3093,7 @@ gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
 static GstFlowReturn
 gst_queue2_push_one (GstQueue2 * queue)
 {
-  GstFlowReturn result = queue->srcresult;
+  GstFlowReturn result;
   GstMiniObject *data;
   GstQueue2ItemType item_type;
 
@@ -3063,6 +3102,7 @@ gst_queue2_push_one (GstQueue2 * queue)
     goto no_item;
 
 next:
+  result = queue->srcresult;
   STATUS (queue, queue->srcpad, "We have something dequeud");
   g_atomic_int_set (&queue->downstream_may_block,
       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
@@ -3238,6 +3278,12 @@ out_flushing:
     GST_QUEUE2_MUTEX_UNLOCK (queue);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
+    /* Recalculate buffering levels before stopping since the source flow
+     * might cause a different buffering level (like NOT_LINKED making
+     * the queue appear as full) */
+    if (queue->use_buffering)
+      update_buffering (queue);
+    gst_queue2_post_buffering (queue);
     /* let app know about us giving up if upstream is not expected to do so */
     /* EOS is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {