collectpads: implement flushing seek support
authorAlessandro Decina <alessandro.d@gmail.com>
Mon, 16 Sep 2013 07:55:58 +0000 (09:55 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 11 Nov 2013 15:50:42 +0000 (16:50 +0100)
Implement common flushing seek logic in GstCollectPads. Add new
API so that elements can opt-in to using the new logic
(gst_collect_pads_src_event_default) and can extend it
(gst_collect_pads_set_flush_function) to flush any internal
state.

See https://bugzilla.gnome.org/show_bug.cgi?id=706779 and
https://bugzilla.gnome.org/show_bug.cgi?id=706441 for the
background discussion.

API: gst_collect_pads_set_flush_function()
API: gst_collect_pads_src_event_default()

https://bugzilla.gnome.org/show_bug.cgi?id=708416

libs/gst/base/gstcollectpads.c
libs/gst/base/gstcollectpads.h

index 799835a..5951cca 100644 (file)
@@ -132,11 +132,17 @@ struct _GstCollectPadsPrivate
   gpointer query_user_data;
   GstCollectPadsClipFunction clip_func;
   gpointer clip_user_data;
+  GstCollectPadsFlushFunction flush_func;
+  gpointer flush_user_data;
 
   /* no other lock needed */
   GMutex evt_lock;              /* these make up sort of poor man's event signaling */
   GCond evt_cond;
   guint32 evt_cookie;
+
+  gboolean seeking;
+  gboolean pending_flush_start;
+  gboolean pending_flush_stop;
 };
 
 static void gst_collect_pads_clear (GstCollectPads * pads,
@@ -260,6 +266,10 @@ gst_collect_pads_init (GstCollectPads * pads)
   g_mutex_init (&pads->priv->evt_lock);
   g_cond_init (&pads->priv->evt_cond);
   pads->priv->evt_cookie = 0;
+
+  pads->priv->seeking = FALSE;
+  pads->priv->pending_flush_start = FALSE;
+  pads->priv->pending_flush_stop = FALSE;
 }
 
 static void
@@ -541,6 +551,17 @@ gst_collect_pads_set_clip_function (GstCollectPads * pads,
   pads->priv->clip_user_data = user_data;
 }
 
+void
+gst_collect_pads_set_flush_function (GstCollectPads * pads,
+    GstCollectPadsFlushFunction func, gpointer user_data)
+{
+  g_return_if_fail (pads != NULL);
+  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
+
+  pads->priv->flush_func = func;
+  pads->priv->flush_user_data = user_data;
+}
+
 /**
  * gst_collect_pads_add_pad:
  * @pads: the collectpads to use
@@ -1179,7 +1200,7 @@ gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
   /* Do something only on a change and if not locked */
   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
-          ! !waiting)) {
+          !!waiting)) {
     /* Set waiting state for this pad */
     if (waiting)
       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
@@ -1284,6 +1305,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
     GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
 
+    if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
+                TRUE, FALSE) == TRUE)) {
+      GST_INFO_OBJECT (pads, "finished seeking");
+    }
     do {
       flow_ret = func (pads, user_data);
     } while (flow_ret == GST_FLOW_OK);
@@ -1298,6 +1323,10 @@ gst_collect_pads_check_collected (GstCollectPads * pads)
           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
           GST_DEBUG_FUNCPTR_NAME (func));
 
+      if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
+                  TRUE, FALSE) == TRUE)) {
+        GST_INFO_OBJECT (pads, "finished seeking");
+      }
       flow_ret = func (pads, user_data);
       collected = TRUE;
 
@@ -1624,33 +1653,59 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
-      /* forward event to unblock check_collected */
-      GST_DEBUG_OBJECT (pad, "forwarding flush start");
-      res = gst_pad_event_default (pad, parent, event);
-      event = NULL;
-
-      /* now unblock the chain function.
-       * no cond per pad, so they all unblock,
-       * non-flushing block again */
-      GST_COLLECT_PADS_STREAM_LOCK (pads);
-      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
-      gst_collect_pads_clear (pads, data);
-
-      /* cater for possible default muxing functionality */
-      if (buffer_func) {
-        /* restore to initial state */
-        gst_collect_pads_set_waiting (pads, data, TRUE);
-        /* if the current pad is affected, reset state, recalculate later */
-        if (pads->priv->earliest_data == data) {
-          unref_data (data);
-          pads->priv->earliest_data = NULL;
-          pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
+      if (g_atomic_int_get (&pads->priv->seeking)) {
+        /* drop all but the first FLUSH_STARTs when seeking */
+        if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_start,
+                TRUE, FALSE) == FALSE)
+          goto eat;
+
+        /* unblock collect pads */
+        gst_pad_event_default (pad, parent, event);
+        event = NULL;
+
+        GST_COLLECT_PADS_STREAM_LOCK (pads);
+        /* Start flushing. We never call gst_collect_pads_set_flushing (FALSE), we
+         * instead wait until each pad gets its FLUSH_STOP and let that reset the pad to
+         * non-flushing (which happens in gst_collect_pads_event_default).
+         */
+        gst_collect_pads_set_flushing (pads, TRUE);
+
+        if (pads->priv->flush_func)
+          pads->priv->flush_func (pads, pads->priv->flush_user_data);
+
+        g_atomic_int_set (&pads->priv->pending_flush_stop, TRUE);
+        GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+
+        goto eat;
+      } else {
+        /* forward event to unblock check_collected */
+        GST_DEBUG_OBJECT (pad, "forwarding flush start");
+        res = gst_pad_event_default (pad, parent, event);
+        event = NULL;
+
+        /* now unblock the chain function.
+         * no cond per pad, so they all unblock,
+         * non-flushing block again */
+        GST_COLLECT_PADS_STREAM_LOCK (pads);
+        GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
+        gst_collect_pads_clear (pads, data);
+
+        /* cater for possible default muxing functionality */
+        if (buffer_func) {
+          /* restore to initial state */
+          gst_collect_pads_set_waiting (pads, data, TRUE);
+          /* if the current pad is affected, reset state, recalculate later */
+          if (pads->priv->earliest_data == data) {
+            unref_data (data);
+            pads->priv->earliest_data = NULL;
+            pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
+          }
         }
-      }
 
-      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
+        GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
-      goto eat;
+        goto eat;
+      }
     }
     case GST_EVENT_FLUSH_STOP:
     {
@@ -1673,7 +1728,15 @@ gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
       }
       GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
-      goto forward;
+      if (g_atomic_int_get (&pads->priv->seeking)) {
+        if (g_atomic_int_compare_and_exchange (&pads->priv->pending_flush_stop,
+                TRUE, FALSE))
+          goto forward;
+        else
+          goto eat;
+      } else {
+        goto forward;
+      }
     }
     case GST_EVENT_EOS:
     {
@@ -1774,6 +1837,73 @@ forward:
     return gst_pad_event_default (pad, parent, event);
 }
 
+typedef struct
+{
+  GstEvent *event;
+  gboolean result;
+} EventData;
+
+static gboolean
+event_forward_func (GstPad * pad, EventData * data)
+{
+  data->result &= gst_pad_push_event (pad, gst_event_ref (data->event));
+  return !data->result;
+}
+
+static gboolean
+forward_event_to_all_sinkpads (GstPad * srcpad, GstEvent * event)
+{
+  EventData data;
+
+  data.event = event;
+  data.result = TRUE;
+
+  gst_pad_forward (srcpad, (GstPadForwardFunction) event_forward_func, &data);
+  return data.result;
+}
+
+gboolean
+gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
+    GstEvent * event)
+{
+  GstObject *parent;
+  gboolean res = TRUE;
+
+  parent = GST_OBJECT_PARENT (pad);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:{
+      GstSeekFlags flags;
+
+      GST_INFO_OBJECT (pads, "starting seek");
+
+      gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
+      if (flags & GST_SEEK_FLAG_FLUSH) {
+        g_atomic_int_set (&pads->priv->seeking, TRUE);
+        g_atomic_int_set (&pads->priv->pending_flush_start, TRUE);
+        /* forward the seek upstream */
+        res = forward_event_to_all_sinkpads (pad, event);
+        event = NULL;
+        if (!res) {
+          g_atomic_int_set (&pads->priv->seeking, FALSE);
+          g_atomic_int_set (&pads->priv->pending_flush_start, FALSE);
+        }
+      }
+
+      GST_INFO_OBJECT (pads, "seek done, result: %d", res);
+
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (event)
+    res = gst_pad_event_default (pad, parent, event);
+
+  return res;
+}
+
 static gboolean
 gst_collect_pads_event_default_internal (GstCollectPads * pads,
     GstCollectData * data, GstEvent * event, gpointer user_data)
index 4c99d58..8d8128c 100644 (file)
@@ -236,6 +236,23 @@ typedef GstFlowReturn (*GstCollectPadsClipFunction) (GstCollectPads *pads, GstCo
                                                      GstBuffer *inbuffer, GstBuffer **outbuffer,
                                                      gpointer user_data);
 
+
+/**
+ * GstCollectPadsFlushFunction:
+ * @pads: a #GstCollectPads
+ * @user_data: user data
+ *
+ * A function that will be called while processing a flushing seek event.
+ *
+ * The function should flush any internal state of the element and the state of
+ * all the pads. It should clear only the state not directly managed by the
+ * @pads object. It is therefore not necessary to call
+ * gst_collect_pads_set_flushing nor gst_collect_pads_clear from this function.
+ *
+ * Since: FIXME
+ */
+typedef void (*GstCollectPadsFlushFunction) (GstCollectPads *pads, gpointer user_data);
+
 /**
  * GST_COLLECT_PADS_GET_STREAM_LOCK:
  * @pads: a #GstCollectPads
@@ -311,6 +328,9 @@ void            gst_collect_pads_set_compare_function (GstCollectPads *pads,
 void            gst_collect_pads_set_clip_function    (GstCollectPads *pads,
                                                        GstCollectPadsClipFunction clipfunc,
                                                        gpointer user_data);
+void            gst_collect_pads_set_flush_function    (GstCollectPads *pads,
+                                                       GstCollectPadsFlushFunction func,
+                                                       gpointer user_data);
 
 /* pad management */
 GstCollectData* gst_collect_pads_add_pad       (GstCollectPads *pads, GstPad *pad, guint size,
@@ -349,6 +369,8 @@ GstFlowReturn       gst_collect_pads_clip_running_time (GstCollectPads * pads,
 /* default handlers */
 gboolean        gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
                                                 GstEvent * event, gboolean discard);
+gboolean        gst_collect_pads_src_event_default (GstCollectPads * pads, GstPad * pad,
+                                                    GstEvent * event);
 gboolean        gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
                                                 GstQuery * query, gboolean discard);