decodebin2: refactoring and race fixes
authorWim Taymans <wim.taymans@collabora.co.uk>
Thu, 19 Mar 2009 18:19:38 +0000 (19:19 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Thu, 19 Mar 2009 18:19:38 +0000 (19:19 +0100)
Refactor some code so that we can take the right locks and in the right order.
Fixes quite a bit of races already.

gst/playback/gstdecodebin2.c

index fcd7ab5..1706409 100644 (file)
@@ -284,7 +284,7 @@ static GstPad *gst_decode_group_control_demuxer_pad (GstDecodeGroup * group,
 static gboolean gst_decode_group_control_source_pad (GstDecodeGroup * group,
     GstDecodePad * pad);
 static gboolean gst_decode_group_expose (GstDecodeGroup * group);
-static void gst_decode_group_check_if_blocked (GstDecodeGroup * group);
+static gboolean gst_decode_group_check_if_blocked (GstDecodeGroup * group);
 static void gst_decode_group_set_complete (GstDecodeGroup * group);
 static void gst_decode_group_hide (GstDecodeGroup * group);
 static void gst_decode_group_free (GstDecodeGroup * group);
@@ -1478,27 +1478,32 @@ pad_added_group_cb (GstElement * element, GstPad * pad, GstDecodeGroup * group)
 {
   GstCaps *caps;
   gboolean expose = FALSE;
+  GstDecodeBin *dbin;
+
+  dbin = group->dbin;
 
   GST_DEBUG_OBJECT (pad, "pad added, group:%p", group);
 
   caps = gst_pad_get_caps (pad);
-  analyze_new_pad (group->dbin, element, pad, caps, group);
+  analyze_new_pad (dbin, element, pad, caps, group);
   if (caps)
     gst_caps_unref (caps);
 
   GROUP_MUTEX_LOCK (group);
-  group->nbdynamic--;
+  if (group->nbdynamic > 0)
+    group->nbdynamic--;
   GST_LOG ("Group %p has now %d dynamic objects", group, group->nbdynamic);
   if (group->nbdynamic == 0)
     expose = TRUE;
   GROUP_MUTEX_UNLOCK (group);
 
   if (expose) {
-    GST_LOG
-        ("That was the last dynamic object, now attempting to expose the group");
-    DECODE_BIN_LOCK (group->dbin);
-    gst_decode_group_expose (group);
-    DECODE_BIN_UNLOCK (group->dbin);
+    GST_LOG_OBJECT (dbin,
+        "That was the last dynamic object, now attempting to expose the group");
+    DECODE_BIN_LOCK (dbin);
+    if (!gst_decode_group_expose (group))
+      GST_WARNING_OBJECT (dbin, "Couldn't expose group");
+    DECODE_BIN_UNLOCK (dbin);
   }
 }
 
@@ -1517,7 +1522,7 @@ no_more_pads_group_cb (GstElement * element, GstDecodeGroup * group)
 {
   GST_LOG_OBJECT (element, "no more pads, setting group %p to complete", group);
 
-  /* FIXME : FILLME */
+  /* when we received no_more_pads, we can complete the pads of the group */
   gst_decode_group_set_complete (group);
 }
 
@@ -1681,19 +1686,42 @@ are_raw_caps (GstDecodeBin * dbin, GstCaps * caps)
  * GstDecodeGroup functions
  ****/
 
+/* The overrun callback is used to expose groups that have not yet had their
+ * no_more_pads called while the (large) multiqueue overflowed. When this
+ * happens we must assume that the no_more_pads will not arrive anymore and we
+ * must expose the pads that we have. 
+ */
 static void
 multi_queue_overrun_cb (GstElement * queue, GstDecodeGroup * group)
 {
-  GST_LOG_OBJECT (group->dbin, "multiqueue is full");
+  GstDecodeBin *dbin;
+  gboolean expose;
 
-  /* if we haven't exposed the group, do it */
-  DECODE_BIN_LOCK (group->dbin);
-  /* decrease the number of dynamic elements, we don't expect anything anymore
-   * and we need the groups to be 0 for the expose to work */
-  if (group->nbdynamic > 0)
-    group->nbdynamic--;
-  gst_decode_group_expose (group);
-  DECODE_BIN_UNLOCK (group->dbin);
+  dbin = group->dbin;
+
+  GST_LOG_OBJECT (dbin, "multiqueue %p is full", queue);
+
+  GROUP_MUTEX_LOCK (group);
+  if (group->complete) {
+    /* the group was already complete (had the no_more_pads called), we
+     * can ignore the overrun signal, the last remaining dynamic element
+     * will expose the group eventually. */
+    GST_LOG_OBJECT (dbin, "group %p was already complete", group);
+    expose = FALSE;
+  } else {
+    /* set number of dynamic element to 0, we don't expect anything anymore
+     * and we need the groups to be 0 for the expose to work */
+    group->nbdynamic = 0;
+    expose = TRUE;
+  }
+  GROUP_MUTEX_UNLOCK (group);
+
+  if (expose) {
+    DECODE_BIN_LOCK (dbin);
+    if (!gst_decode_group_expose (group))
+      GST_WARNING_OBJECT (dbin, "Couldn't expose group");
+    DECODE_BIN_UNLOCK (group->dbin);
+  }
 }
 
 /* gst_decode_group_new:
@@ -1875,8 +1903,10 @@ gst_decode_group_control_source_pad (GstDecodeGroup * group,
  * and will ghost/expose all pads on decodebin if the group is the current one.
  *
  * Call with the group lock taken ! MT safe
+ *
+ * Returns: TRUE when the group is completely blocked and ready to be exposed.
  */
-static void
+static gboolean
 gst_decode_group_check_if_blocked (GstDecodeGroup * group)
 {
   GList *tmp;
@@ -1885,10 +1915,10 @@ gst_decode_group_check_if_blocked (GstDecodeGroup * group)
   GST_LOG ("group : %p , ->complete:%d , ->nbdynamic:%d",
       group, group->complete, group->nbdynamic);
 
-  /* 1. don't do anything if group is not complete */
+  /* don't do anything if group is not complete */
   if (!group->complete || group->nbdynamic) {
     GST_DEBUG_OBJECT (group->dbin, "Group isn't complete yet");
-    return;
+    return FALSE;
   }
 
   for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
@@ -1900,77 +1930,100 @@ gst_decode_group_check_if_blocked (GstDecodeGroup * group)
     }
   }
 
-  /* 2. Update status of group */
+  /* Update status of group */
   group->blocked = blocked;
   GST_LOG ("group is blocked:%d", blocked);
 
-  /* 3. don't do anything if not blocked completely */
-  if (!blocked)
-    return;
+  return blocked;
+}
+
+/* activate the next group when there is one
+ *
+ * Returns: TRUE when group was the active group and there was a
+ * next group to activate.
+ */
+static gboolean
+gst_decode_bin_activate_next_group (GstDecodeBin * dbin, GstDecodeGroup * group)
+{
+  gboolean have_next = FALSE;
+
+  DECODE_BIN_LOCK (dbin);
+  /* Check if there is a next group to activate */
+  if ((group == dbin->activegroup) && dbin->groups) {
+    GstDecodeGroup *newgroup;
+
+    /* get the next group */
+    newgroup = (GstDecodeGroup *) dbin->groups->data;
+
+    GST_DEBUG_OBJECT (dbin, "Switching to new group");
 
-  /* 4. if we're the current group, expose pads */
-  DECODE_BIN_LOCK (group->dbin);
-  if (!gst_decode_group_expose (group))
-    GST_WARNING_OBJECT (group->dbin, "Couldn't expose group");
-  DECODE_BIN_UNLOCK (group->dbin);
+    /* hide current group */
+    gst_decode_group_hide (group);
+    /* expose next group */
+    gst_decode_group_expose (newgroup);
+
+    /* we have a next group */
+    have_next = TRUE;
+  }
+  DECODE_BIN_UNLOCK (dbin);
+
+  return have_next;
 }
 
+/* check if the group is drained, meaning all pads have seen an EOS 
+ * event.  */
 static void
-gst_decode_group_check_if_drained (GstDecodeGroup * group)
+gst_decode_pad_handle_eos (GstDecodePad * pad)
 {
   GList *tmp;
-  GstDecodeBin *dbin = group->dbin;
+  GstDecodeBin *dbin;
+  GstDecodeGroup *group;
   gboolean drained = TRUE;
 
-  GST_LOG ("group : %p", group);
+  group = pad->group;
+  dbin = group->dbin;
 
-  DECODE_BIN_LOCK (dbin);
+  GST_LOG_OBJECT (dbin, "group : %p, pad %p", group, pad);
+
+  GROUP_MUTEX_LOCK (group);
+  /* mark pad as drained */
+  pad->drained = TRUE;
 
   /* Ensure we only emit the drained signal once, for this group */
-  if (group->drained) {
-    drained = FALSE;
-    goto done;
-  }
+  if (group->drained)
+    goto was_drained;
 
   for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
     GstDecodePad *dpad = (GstDecodePad *) tmp->data;
 
-    GST_LOG ("testing dpad %p", dpad);
+    GST_LOG_OBJECT (dbin, "testing dpad %p", dpad);
 
     if (!dpad->drained) {
       drained = FALSE;
       break;
     }
   }
-
   group->drained = drained;
-  if (!drained)
-    goto done;
-
-  /* we are drained. Check if there is a next group to activate */
-  if ((group == dbin->activegroup) && dbin->groups) {
-    GstDecodeGroup *newgroup;
-
-    newgroup = (GstDecodeGroup *) dbin->groups->data;
-
-    GST_DEBUG_OBJECT (dbin, "Switching to new group");
+  GROUP_MUTEX_UNLOCK (group);
 
-    /* hide current group */
-    gst_decode_group_hide (group);
-    /* expose next group */
-    gst_decode_group_expose (newgroup);
-    /* we're not yet drained now */
-    drained = FALSE;
+  if (drained) {
+    /* the current group is completely drained, try to activate the next
+     * group. this function returns FALSE if there was no next group activated
+     * and so we are really drained. */
+    if (!gst_decode_bin_activate_next_group (dbin, group)) {
+      /* no more groups to activate, we're completely drained now */
+      GST_LOG_OBJECT (dbin, "all groups drained, fire signal");
+      g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_DRAINED], 0,
+          NULL);
+    }
   }
+  return;
 
-done:
-  DECODE_BIN_UNLOCK (dbin);
-
-  if (drained) {
-    /* no more groups to activate, we're completely drained now */
-    GST_LOG ("all groups drained, fire signal");
-    g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_DRAINED], 0,
-        NULL);
+was_drained:
+  {
+    GST_LOG_OBJECT (dbin, "group was already drained");
+    GROUP_MUTEX_UNLOCK (group);
+    return;
   }
 }
 
@@ -2161,6 +2214,7 @@ name_problem:
   }
 }
 
+/* must be called with the decodebin lock */
 static void
 gst_decode_group_hide (GstDecodeGroup * group)
 {
@@ -2174,7 +2228,6 @@ gst_decode_group_hide (GstDecodeGroup * group)
   }
 
   GROUP_MUTEX_LOCK (group);
-
   /* Remove ghost pads */
   for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) {
     GstDecodePad *dpad = (GstDecodePad *) tmp->data;
@@ -2183,9 +2236,7 @@ gst_decode_group_hide (GstDecodeGroup * group)
       gst_element_remove_pad (GST_ELEMENT (group->dbin), GST_PAD (dpad));
     dpad->added = FALSE;
   }
-
   group->exposed = FALSE;
-
   GROUP_MUTEX_UNLOCK (group);
 
   group->dbin->activegroup = NULL;
@@ -2311,21 +2362,37 @@ gst_decode_group_free (GstDecodeGroup * group)
 /* gst_decode_group_set_complete:
  *
  * Mark the group as complete. This means no more streams will be controlled
- * through this group.
+ * through this group. This method is usually called when we got no_more_pads or
+ * when we added the last pad not from a demuxer.
+ *
+ * When this method is called, it is possible that some dynamic plugging is
+ * going on in streaming threads. We decrement the dynamic counter and when it
+ * reaches zero, we check if all of our pads are blocked before we finally
+ * expose the group.
  *
  * MT safe
  */
 static void
 gst_decode_group_set_complete (GstDecodeGroup * group)
 {
+  gboolean expose = FALSE;
+
   GST_LOG_OBJECT (group->dbin, "Setting group %p to COMPLETE", group);
 
   GROUP_MUTEX_LOCK (group);
   group->complete = TRUE;
   if (group->nbdynamic > 0)
     group->nbdynamic--;
-  gst_decode_group_check_if_blocked (group);
+  expose = gst_decode_group_check_if_blocked (group);
   GROUP_MUTEX_UNLOCK (group);
+
+  /* don't do anything if not blocked completely */
+  if (expose) {
+    DECODE_BIN_LOCK (group->dbin);
+    if (!gst_decode_group_expose (group))
+      GST_WARNING_OBJECT (group->dbin, "Couldn't expose group");
+    DECODE_BIN_UNLOCK (group->dbin);
+  }
 }
 
 /*************************
@@ -2350,15 +2417,27 @@ gst_decode_pad_init (GstDecodePad * pad)
 static void
 source_pad_blocked_cb (GstDecodePad * dpad, gboolean blocked, gpointer unused)
 {
-  GST_LOG_OBJECT (dpad, "blocked:%d, dpad->group:%p", blocked, dpad->group);
+  GstDecodeGroup *group;
+  GstDecodeBin *dbin;
+  gboolean expose = FALSE;
+
+  group = dpad->group;
+  dbin = group->dbin;
 
+  GST_LOG_OBJECT (dpad, "blocked:%d, dpad->group:%p", blocked, group);
+
+  GROUP_MUTEX_LOCK (group);
   /* Update this GstDecodePad status */
   dpad->blocked = blocked;
+  if (blocked)
+    expose = gst_decode_group_check_if_blocked (group);
+  GROUP_MUTEX_UNLOCK (group);
 
-  if (blocked) {
-    GROUP_MUTEX_LOCK (dpad->group);
-    gst_decode_group_check_if_blocked (dpad->group);
-    GROUP_MUTEX_UNLOCK (dpad->group);
+  if (expose) {
+    DECODE_BIN_LOCK (dbin);
+    if (!gst_decode_group_expose (group))
+      GST_WARNING_OBJECT (dbin, "Couldn't expose group");
+    DECODE_BIN_UNLOCK (dbin);
   }
 }
 
@@ -2368,15 +2447,12 @@ source_pad_event_probe (GstPad * pad, GstEvent * event, GstDecodePad * dpad)
   GST_LOG_OBJECT (pad, "%s dpad:%p", GST_EVENT_TYPE_NAME (event), dpad);
 
   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
-    /* Set our pad as drained */
-    dpad->drained = TRUE;
-
     GST_DEBUG_OBJECT (pad, "we received EOS");
 
     /* Check if all pads are drained. If there is a next group to expose, we
      * will remove the ghostpad of the current group first, which unlinks the
      * peer and so drops the EOS. */
-    gst_decode_group_check_if_drained (dpad->group);
+    gst_decode_pad_handle_eos (dpad);
   }
   /* never drop events */
   return TRUE;