libs/gst/base/gstcollectpads.*: Handle flush. Adapted from
authorJulien Moutte <julien@moutte.net>
Sun, 5 Feb 2006 16:18:37 +0000 (16:18 +0000)
committerJulien Moutte <julien@moutte.net>
Sun, 5 Feb 2006 16:18:37 +0000 (16:18 +0000)
Original commit message from CVS:
2006-02-05  Julien MOUTTE  <julien@moutte.net>

* libs/gst/base/gstcollectpads.c: (gst_collect_pads_add_pad),
(gst_collect_pads_start), (gst_collect_pads_stop),
(gst_collect_pads_event), (gst_collect_pads_chain):
* libs/gst/base/gstcollectpads.h: Handle flush. Adapted from
Mark Nauwelaerts's patch on bug #328491.

ChangeLog
libs/gst/base/gstcollectpads.c
libs/gst/base/gstcollectpads.h

index 7e87153..f460b17 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2006-02-05  Julien MOUTTE  <julien@moutte.net>
+
+       * libs/gst/base/gstcollectpads.c: (gst_collect_pads_add_pad),
+       (gst_collect_pads_start), (gst_collect_pads_stop),
+       (gst_collect_pads_event), (gst_collect_pads_chain):
+       * libs/gst/base/gstcollectpads.h: Handle flush. Adapted from
+       Mark Nauwelaerts's patch on bug #328491.
+
 2006-02-04  Tim-Philipp Müller  <tim at centricular dot net>
 
        * tests/check/gst/gstutils.c: (test_parse_bin_from_description),
index 48816e4..2be453c 100644 (file)
@@ -197,6 +197,8 @@ gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
   data->pad = pad;
   data->buffer = NULL;
   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
+  data->abidata.ABI.flushing = FALSE;
+  data->abidata.ABI.new_segment = FALSE;
 
   GST_OBJECT_LOCK (pads);
   pads->data = g_slist_append (pads->data, data);
@@ -338,10 +340,28 @@ gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
 void
 gst_collect_pads_start (GstCollectPads * pads)
 {
+  GSList *walk = NULL;
+
   g_return_if_fail (pads != NULL);
   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
+  GST_DEBUG_OBJECT (pads, "starting collect pads");
+
   GST_OBJECT_LOCK (pads);
+  /* Set our pads as non flushing */
+  walk = pads->data;
+  while (walk) {
+    GstCollectData *cdata = walk->data;
+
+    if (GST_IS_PAD (cdata->pad)) {
+      GST_OBJECT_LOCK (cdata->pad);
+      GST_PAD_UNSET_FLUSHING (cdata->pad);
+      GST_OBJECT_UNLOCK (cdata->pad);
+    }
+
+    walk = g_slist_next (walk);
+  }
+  /* Start collect pads */
   pads->started = TRUE;
   GST_OBJECT_UNLOCK (pads);
 }
@@ -358,11 +378,30 @@ gst_collect_pads_start (GstCollectPads * pads)
 void
 gst_collect_pads_stop (GstCollectPads * pads)
 {
+  GSList *walk = NULL;
+
   g_return_if_fail (pads != NULL);
   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
+  GST_DEBUG_OBJECT (pads, "stopping collect pads");
+
   GST_OBJECT_LOCK (pads);
+  /* Set our pads as flushing */
+  walk = pads->data;
+  while (walk) {
+    GstCollectData *cdata = walk->data;
+
+    if (GST_IS_PAD (cdata->pad)) {
+      GST_OBJECT_LOCK (cdata->pad);
+      GST_PAD_SET_FLUSHING (cdata->pad);
+      GST_OBJECT_UNLOCK (cdata->pad);
+    }
+
+    walk = g_slist_next (walk);
+  }
+  /* Stop collect pads */
   pads->started = FALSE;
+  /* Wake them up */
   GST_COLLECT_PADS_BROADCAST (pads);
   GST_OBJECT_UNLOCK (pads);
 }
@@ -624,6 +663,30 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
       GST_DEBUG_PAD_NAME (data->pad));
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_START:
+    {
+      /* forward event to unblock is_collected */
+      gst_pad_event_default (pad, event);
+
+      /* now unblock the chain function
+         no cond per pad, so they all unblock, non-flushing block again */
+      GST_OBJECT_LOCK (pads);
+      data->abidata.ABI.flushing = TRUE;
+      GST_COLLECT_PADS_BROADCAST (pads);
+      GST_OBJECT_UNLOCK (pads);
+
+      /* event already cleaned up by forwarding */
+      return TRUE;
+    }
+    case GST_EVENT_FLUSH_STOP:
+    {
+      /* flush the 1 buffer queue */
+      GST_OBJECT_LOCK (pads);
+      data->abidata.ABI.flushing = FALSE;
+      gst_collect_pads_pop (pads, data);
+      GST_OBJECT_UNLOCK (pads);
+      goto beach;
+    }
     case GST_EVENT_EOS:
     {
       GST_OBJECT_LOCK (pads);
@@ -648,9 +711,18 @@ gst_collect_pads_event (GstPad * pad, GstEvent * event)
       gst_event_parse_new_segment (event, &update, &rate, &format,
           &start, &stop, &time);
 
+      GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
+          ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
+          GST_TIME_ARGS (stop));
+
       gst_segment_set_newsegment (&data->segment, update, rate, format,
           start, stop, time);
-      goto beach;
+
+      data->abidata.ABI.new_segment = TRUE;
+
+      /* We eat this event */
+      gst_event_unref (event);
+      return TRUE;
     }
     default:
       goto beach;
@@ -695,6 +767,9 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
   /* if not started, bail out */
   if (!pads->started)
     goto not_started;
+  /* check if this pad is flushing */
+  if (data->abidata.ABI.flushing)
+    goto flushing;
 
   GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
       GST_DEBUG_PAD_NAME (pad));
@@ -702,6 +777,8 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
   /* One more pad has data queued */
   pads->queuedpads++;
   gst_buffer_replace (&data->buffer, buffer);
+  gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME,
+      GST_BUFFER_TIMESTAMP (buffer));
 
   /* Check if our collected condition is matched and call the collected function
      if it is */
@@ -716,6 +793,8 @@ gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
     /* after a signal,  we could be stopped */
     if (!pads->started)
       goto not_started;
+    if (data->abidata.ABI.flushing)
+      goto flushing;
   }
 
   GST_OBJECT_UNLOCK (pads);
@@ -734,4 +813,10 @@ not_started:
     GST_DEBUG ("collect_pads not started");
     return GST_FLOW_WRONG_STATE;
   }
+flushing:
+  {
+    GST_OBJECT_UNLOCK (pads);
+    GST_DEBUG ("collect_pads %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
+    return GST_FLOW_WRONG_STATE;
+  }
 }
index aa331cb..0498782 100644 (file)
@@ -56,7 +56,14 @@ struct _GstCollectData
   GstSegment             segment;
 
   /*< private >*/
-  gpointer               _gst_reserved[GST_PADDING];
+  union {
+    struct {
+      gboolean           flushing;
+      gboolean           new_segment;
+    } ABI;
+    /* adding + 0 to mark ABI change to be undone later */
+    gpointer _gst_reserved[GST_PADDING + 0];
+  } abidata;
 };
 
 /**