multiqueue: Fix potential deadlock with parallel release_pad calls
[platform/upstream/gstreamer.git] / subprojects / gstreamer / plugins / elements / gstmultiqueue.c
index e13d317..cf4edc8 100644 (file)
@@ -118,6 +118,11 @@ struct _GstSingleQueue
   guint id;
   /* group of streams to which this queue belongs to */
   guint groupid;
+
+#ifndef GST_DISABLE_GST_DEBUG
+  /* debug identifier */
+  gchar *debug_id;
+#endif
   GstClockTimeDiff group_high_time;
 
   GWeakRef mqueue;
@@ -147,6 +152,15 @@ struct _GstSingleQueue
   /* TRUE if either position needs to be recalculated */
   gboolean sink_tainted, src_tainted;
 
+  /* stream group id */
+  guint32 sink_stream_gid;
+  guint32 src_stream_gid;
+
+  /* TRUE if the stream group-id changed. Resetted to FALSE the next time the
+   * segment is calculated */
+  gboolean sink_stream_gid_changed;
+  gboolean src_stream_gid_changed;
+
   /* queue of data */
   GstDataQueue *queue;
   GstDataQueueSize max_size, extra_size;
@@ -876,6 +890,7 @@ gst_multi_queue_init (GstMultiQueue * mqueue)
   mqueue->high_time = GST_CLOCK_STIME_NONE;
 
   g_mutex_init (&mqueue->qlock);
+  g_mutex_init (&mqueue->reconf_lock);
   g_mutex_init (&mqueue->buffering_post_lock);
 }
 
@@ -890,6 +905,7 @@ gst_multi_queue_finalize (GObject * object)
 
   /* free/unref instance data */
   g_mutex_clear (&mqueue->qlock);
+  g_mutex_clear (&mqueue->reconf_lock);
   g_mutex_clear (&mqueue->buffering_post_lock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -935,8 +951,8 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
         GstSingleQueue *q = (GstSingleQueue *) tmp->data;
         gst_data_queue_get_level (q->queue, &size);
 
-        GST_DEBUG_OBJECT (mq, "Queue %d: Requested buffers size: %d,"
-            " current: %d, current max %d", q->id, new_size, size.visible,
+        GST_DEBUG_ID (q->debug_id, "Requested buffers size: %d,"
+            " current: %d, current max %d", new_size, size.visible,
             q->max_size.visible);
 
         /* do not reduce max size below current level if the single queue
@@ -1190,8 +1206,10 @@ gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
     GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
   }
 
+  g_mutex_lock (&mqueue->reconf_lock);
   /* Create a new single queue, add the sink and source pad and return the sink pad */
   squeue = gst_single_queue_new (mqueue, temp_id);
+  g_mutex_unlock (&mqueue->reconf_lock);
 
   new_pad = squeue ? g_weak_ref_get (&squeue->sinkpad) : NULL;
   /* request pad assumes the element is owning the ref of the pad it returns */
@@ -1213,6 +1231,12 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
 
   GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
+  /* Take the reconfiguration lock before the qlock to avoid deadlocks
+   * from two release_pad running in parallel on different mqueue slots.
+   * We need reconf_lock for removing the singlequeue from the list, to
+   * prevent overlapping release/request from causing problems */
+  g_mutex_lock (&mqueue->reconf_lock);
+
   GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
   /* Find which single queue it belongs to, knowing that it should be a sinkpad */
   for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
@@ -1232,6 +1256,7 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
     gst_clear_object (&srcpad);
     GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
     GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+    g_mutex_unlock (&mqueue->reconf_lock);
     return;
   }
 
@@ -1254,6 +1279,8 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
   gst_element_remove_pad (element, sinkpad);
   gst_object_unref (srcpad);
   gst_object_unref (sinkpad);
+
+  g_mutex_unlock (&mqueue->reconf_lock);
 }
 
 static GstStateChangeReturn
@@ -1272,6 +1299,7 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
       for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
         sq = (GstSingleQueue *) tmp->data;
         sq->flushing = FALSE;
+        sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID;
       }
 
       /* the visible limit might not have been set on single queues that have grown because of other queueus were empty */
@@ -1295,6 +1323,7 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
         sq->last_query = FALSE;
         g_cond_signal (&sq->query_handled);
       }
+      mqueue->interleave_incomplete = FALSE;
       GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
       break;
     }
@@ -1318,7 +1347,7 @@ gst_single_queue_start (GstMultiQueue * mq, GstSingleQueue * sq)
   gboolean res = FALSE;
   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
 
-  GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
+  GST_LOG_ID (sq->debug_id, "starting task");
 
   if (srcpad) {
     res = gst_pad_start_task (srcpad,
@@ -1335,13 +1364,14 @@ gst_single_queue_pause (GstMultiQueue * mq, GstSingleQueue * sq)
   gboolean result = FALSE;
   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
 
-  GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
+  GST_LOG_ID (sq->debug_id, "pausing task");
   if (srcpad) {
     result = gst_pad_pause_task (srcpad);
     gst_object_unref (srcpad);
   }
 
   sq->sink_tainted = sq->src_tainted = TRUE;
+
   return result;
 }
 
@@ -1351,12 +1381,13 @@ gst_single_queue_stop (GstMultiQueue * mq, GstSingleQueue * sq)
   gboolean result = FALSE;
   GstPad *srcpad = g_weak_ref_get (&sq->srcpad);
 
-  GST_LOG_OBJECT (mq, "SingleQueue %d : stopping task", sq->id);
+  GST_LOG_ID (sq->debug_id, "stopping task");
   if (srcpad) {
     result = gst_pad_stop_task (srcpad);
     gst_object_unref (srcpad);
   }
   sq->sink_tainted = sq->src_tainted = TRUE;
+
   return result;
 }
 
@@ -1364,8 +1395,7 @@ static void
 gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     gboolean full)
 {
-  GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
-      sq->id);
+  GST_DEBUG_ID (sq->debug_id, "flush %s", (flush ? "start" : "stop"));
 
   if (flush) {
     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@@ -1375,8 +1405,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
     sq->flushing = TRUE;
 
     /* wake up non-linked task */
-    GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
-        sq->id);
+    GST_LOG_ID (sq->debug_id, "Waking up eventually waiting task");
     g_cond_signal (&sq->turn);
     sq->last_query = FALSE;
     g_cond_signal (&sq->query_handled);
@@ -1424,9 +1453,9 @@ get_buffering_level (GstMultiQueue * mq, GstSingleQueue * sq)
 
   gst_data_queue_get_level (sq->queue, &size);
 
-  GST_DEBUG_OBJECT (mq,
-      "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
-      G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
+  GST_DEBUG_ID (sq->debug_id,
+      "visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+      G_GUINT64_FORMAT, size.visible, sq->max_size.visible,
       size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
 
   /* get bytes and time buffer levels and take the max */
@@ -1569,6 +1598,7 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
 {
   GstClockTimeDiff low, high;
   GstClockTime interleave, other_interleave = 0;
+  gboolean some_inactive = FALSE;
   GList *tmp;
 
   low = high = GST_CLOCK_STIME_NONE;
@@ -1579,16 +1609,12 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
     /* Ignore sparse streams for interleave calculation */
     if (oq->is_sparse)
       continue;
-    /* If a stream is not active yet (hasn't received any buffers), set
-     * a maximum interleave to allow it to receive more data */
+
+    /* If some streams aren't active yet (haven't received any buffers), we will
+     * grow interleave accordingly */
     if (!oq->active) {
-      GST_LOG_OBJECT (mq,
-          "queue %d is not active yet, forcing interleave to 5s", oq->id);
-      mq->interleave = 5 * GST_SECOND;
-      /* Update max-size time */
-      mq->max_size.time = mq->interleave;
-      SET_CHILD_PROPERTY (mq, time);
-      goto beach;
+      some_inactive = TRUE;
+      continue;
     }
 
     /* Calculate within each streaming thread */
@@ -1598,20 +1624,33 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
       continue;
     }
 
-    if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime)) {
+    /* If the stream isn't EOS, update the low/high input value */
+    if (GST_CLOCK_STIME_IS_VALID (oq->cached_sinktime) && !oq->is_eos) {
       if (low == GST_CLOCK_STIME_NONE || oq->cached_sinktime < low)
         low = oq->cached_sinktime;
       if (high == GST_CLOCK_STIME_NONE || oq->cached_sinktime > high)
         high = oq->cached_sinktime;
+
+      /* If the input is before the segment start, consider as inactive to allow
+       * the interleave to grow until *all* streams have data within the segment.
+       *
+       * The reason for this is that there is no requirements for data before
+       * the segment start to be "aligned" and therefore interleave calculation
+       * can't reliably be done. For example a demuxer could provide video data
+       * from the previous keyframe but audio only from just before the segment
+       * start */
+      if (oq->cached_sinktime < 0)
+        some_inactive = TRUE;
     }
-    GST_LOG_OBJECT (mq,
-        "queue %d , sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
-        " high:%" GST_STIME_FORMAT, oq->id,
+    GST_LOG_ID (oq->debug_id,
+        "sinktime:%" GST_STIME_FORMAT " low:%" GST_STIME_FORMAT
+        " high:%" GST_STIME_FORMAT,
         GST_STIME_ARGS (oq->cached_sinktime), GST_STIME_ARGS (low),
         GST_STIME_ARGS (high));
   }
 
   if (GST_CLOCK_STIME_IS_VALID (low) && GST_CLOCK_STIME_IS_VALID (high)) {
+    gboolean do_update = high == low;
     interleave = high - low;
     /* Padding of interleave and minimum value */
     interleave = (150 * interleave / 100) + mq->min_interleave_time;
@@ -1620,11 +1659,28 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
 
     interleave = MAX (interleave, other_interleave);
 
+    /* Progressively grow up the interleave up to 5s if some streams were inactive */
+    if (some_inactive && interleave <= mq->interleave) {
+      interleave = MIN (5 * GST_SECOND, mq->interleave + 500 * GST_MSECOND);
+      do_update = TRUE;
+    }
+
+    /* We force the interleave update if:
+     * * the interleave was previously set while some streams were not active
+     *   yet but they now all are
+     * * OR the interleave was previously based on all streams being active
+     *   whereas some now aren't
+     */
+    if (mq->interleave_incomplete != some_inactive)
+      do_update = TRUE;
+
+    mq->interleave_incomplete = some_inactive;
+
     /* Update the stored interleave if:
      * * No data has arrived yet (high == low)
      * * Or it went higher
      * * Or it went lower and we've gone past the previous interleave needed */
-    if (high == low || interleave > mq->interleave ||
+    if (do_update || interleave > mq->interleave ||
         ((mq->last_interleave_update + (2 * MIN (GST_SECOND,
                         mq->interleave)) < low)
             && interleave < (mq->interleave * 3 / 4))) {
@@ -1637,7 +1693,6 @@ calculate_interleave (GstMultiQueue * mq, GstSingleQueue * sq)
     }
   }
 
-beach:
   GST_DEBUG_OBJECT (mq,
       "low:%" GST_STIME_FORMAT " high:%" GST_STIME_FORMAT " interleave:%"
       GST_TIME_FORMAT " mq->interleave:%" GST_TIME_FORMAT
@@ -1660,9 +1715,9 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
     sink_time = sq->sinktime = my_segment_to_running_time (&sq->sink_segment,
         sq->sink_segment.position);
 
-    GST_DEBUG_OBJECT (mq,
-        "queue %d sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
-        GST_STIME_FORMAT, sq->id, GST_TIME_ARGS (sq->sink_segment.position),
+    GST_DEBUG_ID (sq->debug_id,
+        "sink_segment.position:%" GST_TIME_FORMAT ", sink_time:%"
+        GST_STIME_FORMAT, GST_TIME_ARGS (sq->sink_segment.position),
         GST_STIME_ARGS (sink_time));
 
     if (G_UNLIKELY (sq->last_time == GST_CLOCK_STIME_NONE)) {
@@ -1711,8 +1766,8 @@ update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
   } else
     src_time = sq->srctime;
 
-  GST_DEBUG_OBJECT (mq,
-      "queue %d, sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT, sq->id,
+  GST_DEBUG_ID (sq->debug_id,
+      "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
   /* This allows for streams with out of order timestamping - sometimes the
@@ -1735,6 +1790,23 @@ static void
 apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
     GstSegment * segment)
 {
+  GstClockTimeDiff ppos = 0;
+
+  /* If we switched groups, grab the previous position */
+  if (segment->rate > 0.0) {
+    if (segment == &sq->sink_segment && sq->sink_stream_gid_changed) {
+      ppos =
+          gst_segment_to_running_time (segment, GST_FORMAT_TIME,
+          segment->position);
+      sq->sink_stream_gid_changed = FALSE;
+    } else if (segment == &sq->src_segment && sq->src_stream_gid_changed) {
+      ppos =
+          gst_segment_to_running_time (segment, GST_FORMAT_TIME,
+          segment->position);
+      sq->src_stream_gid_changed = FALSE;
+    }
+  }
+
   gst_event_copy_segment (event, segment);
 
   /* now configure the values, we use these to track timestamps on the
@@ -1749,12 +1821,19 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
   }
   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
 
+  if (ppos) {
+    GST_DEBUG_ID (sq->debug_id, "Applying base of %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (ppos));
+    segment->base = ppos;
+  }
+
   /* Make sure we have a valid initial segment position (and not garbage
    * from upstream) */
   if (segment->rate > 0.0)
     segment->position = segment->start;
   else
     segment->position = segment->stop;
+
   if (segment == &sq->sink_segment)
     sq->sink_tainted = TRUE;
   else {
@@ -1762,8 +1841,8 @@ apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
     sq->src_tainted = TRUE;
   }
 
-  GST_DEBUG_OBJECT (mq,
-      "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
+  GST_DEBUG_ID (sq->debug_id,
+      "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
 
   /* segment can update the time level of the queue */
   update_time_level (mq, sq);
@@ -1788,9 +1867,8 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
   if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
-      sq->id, segment == &sq->sink_segment ? "sink" : "src",
-      GST_TIME_ARGS (timestamp));
+  GST_DEBUG_ID (sq->debug_id, "%s position updated to %" GST_TIME_FORMAT,
+      segment == &sq->sink_segment ? "sink" : "src", GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
 
@@ -1822,8 +1900,9 @@ apply_gap (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
       timestamp += duration;
     }
 
-    GST_DEBUG_OBJECT (mq, "queue %d, %s position updated to %" GST_TIME_FORMAT,
-        sq->id, segment == &sq->sink_segment ? "sink" : "src",
+    GST_DEBUG_ID (sq->debug_id,
+        "%s position updated to %" GST_TIME_FORMAT,
+        segment == &sq->sink_segment ? "sink" : "src",
         GST_TIME_ARGS (timestamp));
 
     segment->position = timestamp;
@@ -1930,14 +2009,14 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     gst_data_queue_limits_changed (sq->queue);
 
     if (G_UNLIKELY (*allow_drop)) {
-      GST_DEBUG_OBJECT (mq,
-          "SingleQueue %d : Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
-          sq->id, buffer, GST_TIME_ARGS (timestamp));
+      GST_DEBUG_ID (sq->debug_id,
+          "Dropping EOS buffer %p with ts %" GST_TIME_FORMAT,
+          buffer, GST_TIME_ARGS (timestamp));
       gst_buffer_unref (buffer);
     } else {
-      GST_DEBUG_OBJECT (mq,
-          "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
-          sq->id, buffer, GST_TIME_ARGS (timestamp));
+      GST_DEBUG_ID (sq->debug_id,
+          "Pushing buffer %p with ts %" GST_TIME_FORMAT,
+          buffer, GST_TIME_ARGS (timestamp));
       result = gst_pad_push (srcpad, buffer);
     }
   } else if (GST_IS_EVENT (object)) {
@@ -1955,10 +2034,21 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
           *allow_drop = FALSE;
         break;
       case GST_EVENT_STREAM_START:
+      {
+        guint32 group_id;
+        if (gst_event_parse_group_id (event, &group_id)) {
+          if (sq->src_stream_gid == GST_GROUP_ID_INVALID) {
+            sq->src_stream_gid = group_id;
+          } else if (group_id != sq->src_stream_gid) {
+            sq->src_stream_gid = group_id;
+            sq->src_stream_gid_changed = TRUE;
+          }
+        }
         result = GST_FLOW_OK;
         if (G_UNLIKELY (*allow_drop))
           *allow_drop = FALSE;
         break;
+      }
       case GST_EVENT_SEGMENT:
         apply_segment (mq, sq, event, &sq->src_segment);
         /* Applying the segment may have made the queue non-full again, unblock it if needed */
@@ -1978,14 +2068,13 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     }
 
     if (G_UNLIKELY (*allow_drop)) {
-      GST_DEBUG_OBJECT (mq,
-          "SingleQueue %d : Dropping EOS event %p of type %s",
-          sq->id, event, GST_EVENT_TYPE_NAME (event));
+      GST_DEBUG_ID (sq->debug_id,
+          "Dropping EOS event %p of type %s",
+          event, GST_EVENT_TYPE_NAME (event));
       gst_event_unref (event);
     } else {
-      GST_DEBUG_OBJECT (mq,
-          "SingleQueue %d : Pushing event %p of type %s",
-          sq->id, event, GST_EVENT_TYPE_NAME (event));
+      GST_DEBUG_ID (sq->debug_id,
+          "Pushing event %p of type %s", event, GST_EVENT_TYPE_NAME (event));
 
       gst_pad_push_event (srcpad, event);
     }
@@ -1996,8 +2085,7 @@ gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
     query = GST_QUERY_CAST (object);
 
     if (G_UNLIKELY (*allow_drop)) {
-      GST_DEBUG_OBJECT (mq,
-          "SingleQueue %d : Dropping EOS query %p", sq->id, query);
+      GST_DEBUG_ID (sq->debug_id, "Dropping EOS query %p", query);
       gst_query_unref (query);
       res = FALSE;
     } else {
@@ -2092,6 +2180,7 @@ gst_multi_queue_loop (GstPad * pad)
   GstFlowReturn result;
   GstClockTimeDiff next_time;
   gboolean is_buffer;
+  gboolean is_query = FALSE;
   gboolean do_update_buffering = FALSE;
   gboolean dropping = FALSE;
   GstPad *srcpad = NULL;
@@ -2101,10 +2190,10 @@ gst_multi_queue_loop (GstPad * pad)
   srcpad = g_weak_ref_get (&sq->srcpad);
 
   if (!mq || !srcpad)
-    goto out_flushing;
+    goto done;
 
 next:
-  GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
+  GST_DEBUG_ID (sq->debug_id, "trying to pop an object");
 
   if (sq->flushing)
     goto out_flushing;
@@ -2117,6 +2206,8 @@ next:
   item = (GstMultiQueueItem *) sitem;
   newid = item->posid;
 
+  is_query = item->is_query;
+
   /* steal the object and destroy the item */
   object = gst_multi_queue_item_steal_object (item);
   gst_multi_queue_item_destroy (item);
@@ -2126,8 +2217,7 @@ next:
   /* Get running time of the item. Events will have GST_CLOCK_STIME_NONE */
   next_time = get_running_time (&sq->src_segment, object, FALSE);
 
-  GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
-      sq->id, newid, sq->last_oldid);
+  GST_LOG_ID (sq->debug_id, "newid:%d , oldid:%d", newid, sq->last_oldid);
 
   /* If we're not-linked, we do some extra work because we might need to
    * wait before pushing. If we're linked but there's a gap in the IDs,
@@ -2138,7 +2228,7 @@ next:
   if (sq->srcresult == GST_FLOW_NOT_LINKED
       || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
       || sq->last_oldid > mq->highid) {
-    GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
+    GST_LOG_ID (sq->debug_id, "CHECKING srcresult: %s",
         gst_flow_get_name (sq->srcresult));
 
     /* Check again if we're flushing after the lock is taken,
@@ -2168,7 +2258,7 @@ next:
       /* Recompute the high time */
       compute_high_time (mq, sq->groupid);
 
-      GST_DEBUG_OBJECT (mq,
+      GST_DEBUG_ID (sq->debug_id,
           "groupid %d high_time %" GST_STIME_FORMAT " next_time %"
           GST_STIME_FORMAT, sq->groupid, GST_STIME_ARGS (sq->group_high_time),
           GST_STIME_ARGS (next_time));
@@ -2187,10 +2277,10 @@ next:
 
       while (should_wait && sq->srcresult == GST_FLOW_NOT_LINKED) {
 
-        GST_DEBUG_OBJECT (mq,
-            "queue %d sleeping for not-linked wakeup with "
+        GST_DEBUG_ID (sq->debug_id,
+            "Sleeping for not-linked wakeup with "
             "newid %u, highid %u, next_time %" GST_STIME_FORMAT
-            ", high_time %" GST_STIME_FORMAT, sq->id, newid, mq->highid,
+            ", high_time %" GST_STIME_FORMAT, newid, mq->highid,
             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time));
 
         /* Wake up all non-linked pads before we sleep */
@@ -2209,10 +2299,10 @@ next:
         compute_high_time (mq, sq->groupid);
         compute_high_id (mq);
 
-        GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
+        GST_DEBUG_ID (sq->debug_id, "Woken from sleeping for not-linked "
             "wakeup with newid %u, highid %u, next_time %" GST_STIME_FORMAT
             ", high_time %" GST_STIME_FORMAT " mq high_time %" GST_STIME_FORMAT,
-            sq->id, newid, mq->highid,
+            newid, mq->highid,
             GST_STIME_ARGS (next_time), GST_STIME_ARGS (sq->group_high_time),
             GST_STIME_ARGS (mq->high_time));
 
@@ -2247,7 +2337,7 @@ next:
   if (sq->flushing)
     goto out_flushing;
 
-  GST_LOG_OBJECT (mq, "sq:%d BEFORE PUSHING sq->srcresult: %s", sq->id,
+  GST_LOG_ID (sq->debug_id, "BEFORE PUSHING sq->srcresult: %s",
       gst_flow_get_name (sq->srcresult));
 
   /* Update time stats */
@@ -2281,8 +2371,7 @@ next:
       && result == GST_FLOW_NOT_LINKED) {
     GList *tmp;
 
-    GST_LOG_OBJECT (mq, "SingleQueue %d : Changed from active to non-active",
-        sq->id);
+    GST_LOG_ID (sq->debug_id, "Changed from active to non-active");
 
     compute_high_id (mq);
     compute_high_time (mq, sq->groupid);
@@ -2295,7 +2384,7 @@ next:
         GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
 
         if (sq2->srcresult == GST_FLOW_NOT_LINKED) {
-          GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq2->id);
+          GST_LOG_ID (sq2->debug_id, "Waking up singlequeue");
           sq2->pushed = FALSE;
           sq2->srcresult = GST_FLOW_OK;
           g_cond_signal (&sq2->turn);
@@ -2313,7 +2402,7 @@ next:
    * sure we are relaying the correct info wrt proper segment */
   if (result == GST_FLOW_EOS && !dropping &&
       sq->srcresult != GST_FLOW_NOT_LINKED) {
-    GST_DEBUG_OBJECT (mq, "starting EOS drop on sq %d", sq->id);
+    GST_DEBUG_ID (sq->debug_id, "starting EOS drop");
     dropping = TRUE;
     /* pretend we have not seen EOS yet for upstream's sake */
     result = sq->srcresult;
@@ -2321,7 +2410,7 @@ next:
     /* queue empty, so stop dropping
      * we can commit the result we have now,
      * which is either OK after a segment, or EOS */
-    GST_DEBUG_OBJECT (mq, "committed EOS drop on sq %d", sq->id);
+    GST_DEBUG_ID (sq->debug_id, "committed EOS drop");
     dropping = FALSE;
     result = GST_FLOW_EOS;
   }
@@ -2334,8 +2423,9 @@ next:
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
   gst_multi_queue_post_buffering (mq);
 
-  GST_LOG_OBJECT (mq, "sq:%d AFTER PUSHING sq->srcresult: %s (is_eos:%d)",
-      sq->id, gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (srcpad));
+  GST_LOG_ID (sq->debug_id,
+      "AFTER PUSHING sq->srcresult: %s (is_eos:%d)",
+      gst_flow_get_name (sq->srcresult), GST_PAD_IS_EOS (srcpad));
 
   /* Need to make sure wake up any sleeping pads when we exit */
   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@@ -2362,7 +2452,7 @@ done:
 
 out_flushing:
   {
-    if (object && !GST_IS_QUERY (object))
+    if (object && !is_query)
       gst_mini_object_unref (object);
 
     GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@@ -2389,9 +2479,8 @@ out_flushing:
     single_queue_underrun_cb (sq->queue, sq);
     gst_data_queue_set_flushing (sq->queue, TRUE);
     gst_pad_pause_task (srcpad);
-    GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
-        "SingleQueue[%d] task paused, reason:%s",
-        sq->id, gst_flow_get_name (sq->srcresult));
+    GST_LOG_ID (sq->debug_id,
+        "task paused, reason:%s", gst_flow_get_name (sq->srcresult));
     goto done;
   }
 }
@@ -2416,7 +2505,7 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   mq = g_weak_ref_get (&sq->mqueue);
 
   if (!mq)
-    goto flushing;
+    goto done;
 
   /* if eos, we are always full, so avoid hanging incoming indefinitely */
   if (sq->is_eos)
@@ -2430,10 +2519,10 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
   duration = GST_BUFFER_DURATION (buffer);
 
-  GST_LOG_OBJECT (mq,
-      "SingleQueue %d : about to enqueue buffer %p with id %d (pts:%"
+  GST_LOG_ID (sq->debug_id,
+      "About to enqueue buffer %p with id %d (pts:%"
       GST_TIME_FORMAT " dts:%" GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT ")",
-      sq->id, buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
+      buffer, curid, GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), GST_TIME_ARGS (duration));
 
   item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
@@ -2452,9 +2541,9 @@ gst_multi_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
     dval = my_segment_to_running_time (&sq->sink_segment, val);
     if (GST_CLOCK_STIME_IS_VALID (dval)) {
       sq->cached_sinktime = dval;
-      GST_DEBUG_OBJECT (mq,
-          "Queue %d cached sink time now %" G_GINT64_FORMAT " %"
-          GST_STIME_FORMAT, sq->id, sq->cached_sinktime,
+      GST_DEBUG_ID (sq->debug_id,
+          "Cached sink time now %" G_GINT64_FORMAT " %"
+          GST_STIME_FORMAT, sq->cached_sinktime,
           GST_STIME_ARGS (sq->cached_sinktime));
       calculate_interleave (mq, sq);
     }
@@ -2475,8 +2564,8 @@ done:
   /* ERRORS */
 flushing:
   {
-    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
-        sq->id, gst_flow_get_name (sq->srcresult));
+    GST_LOG_ID (sq->debug_id, "exit because task paused, reason: %s",
+        gst_flow_get_name (sq->srcresult));
     if (item)
       gst_multi_queue_item_destroy (item);
     goto done;
@@ -2577,11 +2666,20 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
   switch (type) {
     case GST_EVENT_STREAM_START:
     {
+      guint32 group_id;
+      if (gst_event_parse_group_id (event, &group_id)) {
+        if (sq->sink_stream_gid == GST_GROUP_ID_INVALID) {
+          sq->sink_stream_gid = group_id;
+        } else if (group_id != sq->sink_stream_gid) {
+          sq->sink_stream_gid = group_id;
+          sq->sink_stream_gid_changed = TRUE;
+        }
+      }
       if (mq->sync_by_running_time) {
         GstStreamFlags stream_flags;
         gst_event_parse_stream_flags (event, &stream_flags);
         if ((stream_flags & GST_STREAM_FLAG_SPARSE)) {
-          GST_INFO_OBJECT (mq, "SingleQueue %d is a sparse stream", sq->id);
+          GST_INFO_ID (sq->debug_id, "Stream is sparse");
           sq->is_sparse = TRUE;
         }
       }
@@ -2593,8 +2691,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       break;
     }
     case GST_EVENT_FLUSH_START:
-      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
-          sq->id);
+      GST_DEBUG_ID (sq->debug_id, "Received flush start event");
 
       res = gst_pad_push_event (srcpad, event);
 
@@ -2603,8 +2700,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       goto done;
 
     case GST_EVENT_FLUSH_STOP:
-      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
-          sq->id);
+      GST_DEBUG_ID (sq->debug_id, "Received flush stop event");
 
       res = gst_pad_push_event (srcpad, event);
 
@@ -2655,9 +2751,9 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   item = gst_multi_queue_mo_item_new ((GstMiniObject *) event, curid);
 
-  GST_DEBUG_OBJECT (mq,
-      "SingleQueue %d : Enqueuing event %p of type %s with id %d",
-      sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
+  GST_DEBUG_ID (sq->debug_id,
+      "Enqueuing event %p of type %s with id %d",
+      event, GST_EVENT_TYPE_NAME (event), curid);
 
   if (!gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))
     goto flushing;
@@ -2718,15 +2814,14 @@ done:
   gst_object_unref (srcpad);
   if (res == FALSE)
     flowret = GST_FLOW_ERROR;
-  GST_DEBUG_OBJECT (mq, "SingleQueue %d : returning %s", sq->id,
-      gst_flow_get_name (flowret));
+  GST_DEBUG_ID (sq->debug_id, "Returning %s", gst_flow_get_name (flowret));
   return flowret;
 
 flushing:
   {
     gst_object_unref (srcpad);
-    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
-        sq->id, gst_flow_get_name (sq->srcresult));
+    GST_LOG_ID (sq->debug_id, "Exit because task paused, reason: %s",
+        gst_flow_get_name (sq->srcresult));
     if (sref)
       gst_event_unref (sref);
     gst_multi_queue_item_destroy (item);
@@ -2774,9 +2869,9 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
 
           item = gst_multi_queue_mo_item_new ((GstMiniObject *) query, curid);
 
-          GST_DEBUG_OBJECT (mq,
-              "SingleQueue %d : Enqueuing query %p of type %s with id %d",
-              sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
+          GST_DEBUG_ID (sq->debug_id,
+              "Enqueuing query %p of type %s with id %d",
+              query, GST_QUERY_TYPE_NAME (query), curid);
           GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
           res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
           GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@@ -2827,12 +2922,12 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
   mq = g_weak_ref_get (&sq->mqueue);
 
   if (!mq) {
-    GST_ERROR_OBJECT (pad, "No multique set anymore, can't activate pad");
+    GST_ERROR_OBJECT (pad, "No multiqueue set anymore, can't activate pad");
 
     return FALSE;
   }
 
-  GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
+  GST_DEBUG_ID (sq->debug_id, "active: %d", active);
 
   switch (mode) {
     case GST_PAD_MODE_PUSH:
@@ -2934,7 +3029,7 @@ wake_up_next_non_linked (GstMultiQueue * mq)
         if (GST_CLOCK_STIME_IS_VALID (sq->next_time) &&
             GST_CLOCK_STIME_IS_VALID (high_time)
             && sq->next_time <= high_time) {
-          GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
+          GST_LOG_ID (sq->debug_id, "Waking up singlequeue");
           g_cond_signal (&sq->turn);
         }
       }
@@ -2945,7 +3040,7 @@ wake_up_next_non_linked (GstMultiQueue * mq)
       GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
       if (sq->srcresult == GST_FLOW_NOT_LINKED &&
           sq->nextid != 0 && sq->nextid <= mq->highid) {
-        GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
+        GST_LOG_ID (sq->debug_id, "Waking up singlequeue");
         g_cond_signal (&sq->turn);
       }
     }
@@ -2973,12 +3068,12 @@ compute_high_id (GstMultiQueue * mq)
       continue;
     }
 
-    GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
-        sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
+    GST_LOG_ID (sq->debug_id, "nextid:%d, oldid:%d, srcresult:%s",
+        sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
 
     /* No need to consider queues which are not waiting */
     if (sq->nextid == 0) {
-      GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
+      GST_LOG_ID (sq->debug_id, "not waiting - ignoring");
       gst_object_unref (srcpad);
       continue;
     }
@@ -3036,9 +3131,9 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
       continue;
     }
 
-    GST_LOG_OBJECT (mq,
-        "inspecting sq:%d (group:%d) , next_time:%" GST_STIME_FORMAT
-        ", last_time:%" GST_STIME_FORMAT ", srcresult:%s", sq->id, sq->groupid,
+    GST_LOG_ID (sq->debug_id,
+        "inspecting (group:%d) , next_time:%" GST_STIME_FORMAT
+        ", last_time:%" GST_STIME_FORMAT ", srcresult:%s", sq->groupid,
         GST_STIME_ARGS (sq->next_time), GST_STIME_ARGS (sq->last_time),
         gst_flow_get_name (sq->srcresult));
 
@@ -3048,7 +3143,7 @@ compute_high_time (GstMultiQueue * mq, guint groupid)
     if (sq->srcresult == GST_FLOW_NOT_LINKED) {
       /* No need to consider queues which are not waiting */
       if (!GST_CLOCK_STIME_IS_VALID (sq->next_time)) {
-        GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
+        GST_LOG_ID (sq->debug_id, "Not waiting - ignoring");
         gst_object_unref (srcpad);
         continue;
       }
@@ -3131,9 +3226,9 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
 
   gst_data_queue_get_level (sq->queue, &size);
 
-  GST_LOG_OBJECT (mq,
-      "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %"
-      G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible,
+  GST_LOG_ID (sq->debug_id,
+      "EOS %d, visible %u/%u, bytes %u/%u, time %"
+      G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->is_eos, size.visible,
       sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time,
       sq->max_size.time);
 
@@ -3154,13 +3249,13 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
       continue;
 
     if (oq->srcresult == GST_FLOW_NOT_LINKED) {
-      GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id);
+      GST_LOG_ID (sq->debug_id, "Queue is not-linked");
       continue;
     }
 
-    GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
+    GST_LOG_ID (oq->debug_id, "Checking queue");
     if (gst_data_queue_is_empty (oq->queue) && !oq->is_sparse) {
-      GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
+      GST_LOG_ID (oq->debug_id, "Queue is empty");
       empty_found = TRUE;
       break;
     }
@@ -3171,9 +3266,8 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
   if (empty_found) {
     if (IS_FILLED (sq, visible, size.visible)) {
       sq->max_size.visible = size.visible + 1;
-      GST_DEBUG_OBJECT (mq,
-          "Bumping single queue %d max visible to %d",
-          sq->id, sq->max_size.visible);
+      GST_DEBUG_ID (sq->debug_id,
+          "Bumping max visible to %d", sq->max_size.visible);
       filled = FALSE;
     }
   }
@@ -3184,7 +3278,7 @@ done:
 
   /* Overrun is always forwarded, since this is blocking the upstream element */
   if (filled) {
-    GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id);
+    GST_DEBUG_ID (sq->debug_id, "Queue is filled, signalling overrun");
     g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
   }
 }
@@ -3203,12 +3297,12 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
   }
 
   if (sq->srcresult == GST_FLOW_NOT_LINKED) {
-    GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
+    GST_LOG_ID (sq->debug_id, "Single Queue is empty but not-linked");
     gst_object_unref (mq);
     return;
   } else {
-    GST_LOG_OBJECT (mq,
-        "Single Queue %d is empty, Checking other single queues", sq->id);
+    GST_LOG_ID (sq->debug_id,
+        "Single Queue is empty, Checking other single queues");
   }
 
   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
@@ -3221,8 +3315,8 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
       gst_data_queue_get_level (oq->queue, &size);
       if (IS_FILLED (oq, visible, size.visible)) {
         oq->max_size.visible = size.visible + 1;
-        GST_DEBUG_OBJECT (mq,
-            "queue %d is filled, bumping its max visible to %d", oq->id,
+        GST_DEBUG_ID (oq->debug_id,
+            "queue is filled, bumping its max visible to %d",
             oq->max_size.visible);
         gst_data_queue_limits_changed (oq->queue);
       }
@@ -3252,9 +3346,9 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
     return TRUE;
   }
 
-  GST_DEBUG_OBJECT (mq,
-      "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
-      G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
+  GST_DEBUG_ID (sq->debug_id,
+      "visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+      G_GUINT64_FORMAT, visible, sq->max_size.visible, bytes,
       sq->max_size.bytes, sq->cur_time, sq->max_size.time);
 
   /* we are always filled on EOS */
@@ -3351,6 +3445,9 @@ gst_single_queue_unref (GstSingleQueue * sq)
     g_weak_ref_clear (&sq->sinkpad);
     g_weak_ref_clear (&sq->srcpad);
     g_weak_ref_clear (&sq->mqueue);
+#ifndef GST_DISABLE_GST_DEBUG
+    g_free (sq->debug_id);
+#endif
     g_free (sq);
   }
 }
@@ -3399,6 +3496,10 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
 
   mqueue->nbqueues++;
   sq->id = temp_id;
+#ifndef GST_DISABLE_GST_DEBUG
+  sq->debug_id =
+      g_strdup_printf ("%s:queue_%d", GST_OBJECT_NAME (mqueue), temp_id);
+#endif
   sq->groupid = DEFAULT_PAD_GROUP_ID;
   sq->group_high_time = GST_CLOCK_STIME_NONE;
 
@@ -3443,6 +3544,10 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
   sq->sink_tainted = TRUE;
   sq->src_tainted = TRUE;
 
+  sq->sink_stream_gid = sq->src_stream_gid = GST_GROUP_ID_INVALID;
+  sq->sink_stream_gid_changed = FALSE;
+  sq->src_stream_gid_changed = FALSE;
+
   name = g_strdup_printf ("sink_%u", sq->id);
   templ = gst_static_pad_template_get (&sinktemplate);
   sinkpad = g_object_new (GST_TYPE_MULTIQUEUE_PAD, "name", name,
@@ -3502,8 +3607,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
   }
   g_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
 
-  GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
-      sq->id);
+  GST_DEBUG_ID (sq->debug_id, "GstSingleQueue created and pads added");
 
   return sq;
 }