libs/gst/base/gstcollectpads.c: Refactoring of collectpads. This version removes...
authorJulien Moutte <julien@moutte.net>
Wed, 14 Dec 2005 17:08:36 +0000 (17:08 +0000)
committerJulien Moutte <julien@moutte.net>
Wed, 14 Dec 2005 17:08:36 +0000 (17:08 +0000)
Original commit message from CVS:
2005-12-14  Julien MOUTTE  <julien@moutte.net>

* libs/gst/base/gstcollectpads.c: (gst_collect_pads_base_init),
(gst_collect_pads_remove_pad), (gst_collect_pads_is_collected),
(gst_collect_pads_event), (gst_collect_pads_chain): Refactoring
of collectpads. This version removes a lot of races without
touching API/ABI. Yay !

ChangeLog
libs/gst/base/gstcollectpads.c

index ed943d6..fbab0bc 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2005-12-14  Julien MOUTTE  <julien@moutte.net>
+
+       * libs/gst/base/gstcollectpads.c: (gst_collect_pads_base_init),
+       (gst_collect_pads_remove_pad), (gst_collect_pads_is_collected),
+       (gst_collect_pads_event), (gst_collect_pads_chain): Refactoring
+       of collectpads. This version removes a lot of races without
+       touching API/ABI. Yay !
+
 2005-12-14  Jan Schmidt  <thaytan@mad.scientist.com>
 
        * gst/gstpad.c: (gst_pad_activate_pull), (gst_pad_link_prepare):
index affa92d..e7ce98f 100644 (file)
@@ -246,6 +246,8 @@ gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
     pads->data = g_slist_delete_link (pads->data, list);
   }
   pads->numpads--;
+  /* FIXME : if the pad has data queued we should decrease the number of
+     queuedpads */
   pads->cookie++;
   GST_OBJECT_UNLOCK (pads);
 
@@ -562,6 +564,46 @@ gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
 }
 
 static gboolean
+gst_collect_pads_is_collected (GstCollectPads * pads, GstFlowReturn * ret)
+{
+  GstFlowReturn flow_ret = GST_FLOW_OK;
+  gboolean res = FALSE;
+
+  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
+
+  /* If all our pads are EOS just collect once */
+  if (pads->eospads == pads->numpads) {
+    GST_DEBUG ("All active pads are EOS, calling %s",
+        GST_DEBUG_FUNCPTR_NAME (pads->func));
+    flow_ret = pads->func (pads, pads->user_data);
+    res = TRUE;
+    goto beach;
+  }
+
+  /* We call the collected function as long as our condition matches.
+     FIXME: should we error out if the collect function did not pop anything ?
+     we can get a busy loop here if the element does not pop from the collect
+     function */
+  while (((pads->queuedpads + pads->eospads) >= pads->numpads) && pads->func) {
+    GST_DEBUG ("All active pads have data, calling %s",
+        GST_DEBUG_FUNCPTR_NAME (pads->func));
+    flow_ret = pads->func (pads, pads->user_data);
+    res = TRUE;
+  }
+
+beach:
+  if (!res) {
+    GST_DEBUG ("Not all active pads have data, continuing");
+  }
+
+  if (ret) {
+    *ret = flow_ret;
+  }
+
+  return res;
+}
+
+static gboolean
 gst_collect_pads_event (GstPad * pad, GstEvent * event)
 {
   GstCollectData *data;
@@ -580,23 +622,17 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_EOS:
     {
-      GstFlowReturn ret = GST_FLOW_OK;
-
       GST_OBJECT_LOCK (pads);
 
       pads->eospads++;
 
-      /* if all pads are EOS and we have a function, call it */
-      if ((pads->eospads == pads->numpads) && pads->func) {
-        ret = pads->func (pads, pads->user_data);
-      }
+      gst_collect_pads_is_collected (pads, NULL);
 
       GST_OBJECT_UNLOCK (pads);
 
       /* We eat this event */
       gst_event_unref (event);
       return TRUE;
-      break;
     }
     case GST_EVENT_NEWSEGMENT:
     {
@@ -627,7 +663,11 @@ not_ours:
   }
 }
 
-
+/* For each buffer we receive we check if our collected condition is reached
+   and if so we call the collected function. When this is done we check if
+   data has been unqueued. If data is still queued we wait holding the stream
+   lock to make sure no EOS event can happen while we are ready to be
+   collected */
 static GstFlowReturn
 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
 {
@@ -652,13 +692,20 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
   if (!pads->started)
     goto not_started;
 
-  /* Call the collected callback until a pad with a buffer is popped. */
-  while (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func)
-    ret = pads->func (pads, pads->user_data);
+  GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
+      GST_DEBUG_PAD_NAME (pad));
+
+  /* One more pad has data queued */
+  pads->queuedpads++;
+  gst_buffer_replace (&data->buffer, buffer);
 
-  /* queue buffer on this pad, block if filled */
+  /* Check if our collected condition is matched and call the collected function
+     if it is */
+  gst_collect_pads_is_collected (pads, &ret);
+
+  /* We still have data queued on this pad, wait for something to happen */
   while (data->buffer != NULL) {
-    GST_DEBUG ("Pad %s:%s already has a buffer queued, waiting",
+    GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
         GST_DEBUG_PAD_NAME (pad));
     GST_COLLECT_PADS_WAIT (pads);
     GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
@@ -667,21 +714,6 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
       goto not_started;
   }
 
-  GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
-      GST_DEBUG_PAD_NAME (pad));
-
-  pads->queuedpads++;
-  gst_buffer_replace (&data->buffer, buffer);
-
-  /* if all pads have data and we have a function, call it */
-  if (((pads->queuedpads + pads->eospads) == pads->numpads) && pads->func) {
-    GST_DEBUG ("All active pads have data, calling %s",
-        GST_DEBUG_FUNCPTR_NAME (pads->func));
-    ret = pads->func (pads, pads->user_data);
-  } else {
-    GST_DEBUG ("Not all active pads have data, continuing");
-    ret = GST_FLOW_OK;
-  }
   GST_OBJECT_UNLOCK (pads);
 
   return ret;