pad: Rework pad blocking, another attempt
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 30 May 2011 16:29:06 +0000 (18:29 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Mon, 30 May 2011 16:29:06 +0000 (18:29 +0200)
Make the PadBlock callback take a GstBlockType parameter to handle the different
kind of stages in the pad block. This provides for more backwards compatibility
in the pad block API.
Separate blocking and unblocking into different methods, only blocking can do a
callback, unblock is always immediately. Also removed synchronous blocking, it
can always be implemented with a callback.

gst/gstpad.c
gst/gstpad.h
tests/check/generic/sinks.c
tests/check/gst/gstevent.c
tests/check/gst/gstghostpad.c
tests/check/gst/gstpad.c

index 91ed541..5f540e8 100644 (file)
@@ -121,7 +121,7 @@ static void gst_pad_set_property (GObject * object, guint prop_id,
 static void gst_pad_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
-static GstFlowReturn handle_pad_block (GstPad * pad);
+static GstFlowReturn handle_pad_block (GstPad * pad, GstBlockType type);
 static GstCaps *gst_pad_get_caps_unlocked (GstPad * pad, GstCaps * filter);
 static void gst_pad_set_pad_template (GstPad * pad, GstPadTemplate * templ);
 static gboolean gst_pad_activate_default (GstPad * pad);
@@ -1070,24 +1070,16 @@ gst_pad_is_active (GstPad * pad)
 }
 
 /**
- * gst_pad_set_blocked:
+ * gst_pad_block:
  * @pad: the #GstPad to block or unblock
- * @blocked: boolean indicating whether the pad should be blocked or unblocked
- * @callback: #GstPadBlockCallback that will be called when the
- *            operation succeeds
+ * @type: the different pad block states
+ * @callback: #GstPadBlockCallback that will be called with notifications of
+ *           the pad block state
  * @user_data: (closure): user data passed to the callback
  * @destroy_data: #GDestroyNotify for user_data
  *
- * Blocks or unblocks the dataflow on a pad. The provided callback
- * is called when the operation succeeds; this happens right before the next
- * attempt at pushing a buffer on the pad.
- *
- * This can take a while as the pad can only become blocked when real dataflow
- * is happening.
- * When the pipeline is stalled, for example in PAUSED, this can
- * take an indeterminate amount of time.
- * You can pass NULL as the callback to make this call block. Be careful with
- * this blocking call as it might not return for reasons stated above.
+ * Blocks the dataflow on a pad. The provided callback is called with
+ * notifications about the different stages in the pad block.
  *
  * <note>
  *  Pad block handlers are only called for source pads in push mode
@@ -1095,86 +1087,103 @@ gst_pad_is_active (GstPad * pad)
  * </note>
  *
  * Returns: TRUE if the pad could be blocked. This function can fail if the
- * wrong parameters were passed or the pad was already in the requested state.
+ * wrong parameters were passed or the pad was already blocked.
  *
  * MT safe.
  */
 gboolean
-gst_pad_set_blocked (GstPad * pad, gboolean blocked,
+gst_pad_block (GstPad * pad, GstBlockType type,
     GstPadBlockCallback callback, gpointer user_data,
     GDestroyNotify destroy_data)
 {
-  gboolean was_blocked = FALSE;
-
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+  g_return_val_if_fail (callback != NULL, FALSE);
 
   GST_OBJECT_LOCK (pad);
 
-  was_blocked = GST_PAD_IS_BLOCKED (pad);
+  if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
+    goto was_blocked;
 
-  if (G_UNLIKELY (was_blocked == blocked))
-    goto had_right_state;
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
 
-  if (blocked) {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
+  GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
 
-    GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
-
-    if (pad->block_destroy_data && pad->block_data)
-      pad->block_destroy_data (pad->block_data);
+  if (pad->block_destroy_data && pad->block_data)
+    pad->block_destroy_data (pad->block_data);
 
-    pad->block_callback = callback;
-    pad->block_data = user_data;
-    pad->block_destroy_data = destroy_data;
-    pad->block_callback_called = FALSE;
-    if (!callback) {
-      GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
-      GST_PAD_BLOCK_WAIT (pad);
-      GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
-    }
+  /* always store the block_cb */
+  pad->block_type = type;
+  pad->block_callback = callback;
+  pad->block_data = user_data;
+  pad->block_destroy_data = destroy_data;
+  pad->block_callback_called = FALSE;
+
+  if (pad->priv->using > 0) {
+    /* the pad is in use, we can't signal the idle callback yet. Since we set the
+     * flag above, the last thread to leave the push will do the callback. New
+     * threads going into the push will block. */
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
+    GST_OBJECT_UNLOCK (pad);
   } else {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
-
-    if (GST_PAD_IS_SRC (pad)) {
-      GstPad *peer;
-      /* a pad block dropped all events, make sure we copy any new events on the
-       * srcpad to the sinkpad and schedule an update on the sinkpad */
-      if ((peer = GST_PAD_PEER (pad))) {
-        GST_OBJECT_LOCK (peer);
-        prepare_event_update (pad, peer);
-        GST_OBJECT_UNLOCK (peer);
-      }
-    }
-
-    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
+    /* the pad is idle now, we can signal the idle callback now */
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
+    GST_OBJECT_UNLOCK (pad);
 
-    if (pad->block_destroy_data && pad->block_data)
-      pad->block_destroy_data (pad->block_data);
+    /* call the callback if we need to be called for idle callbacks */
+    if (type & GST_BLOCK_TYPE_IDLE)
+      callback (pad, GST_BLOCK_TYPE_IDLE, user_data);
+  }
+  return TRUE;
 
-    pad->block_callback = callback;
-    pad->block_data = user_data;
-    pad->block_destroy_data = destroy_data;
-    pad->block_callback_called = FALSE;
+was_blocked:
+  {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was blocked");
+    GST_OBJECT_UNLOCK (pad);
 
-    GST_PAD_BLOCK_BROADCAST (pad);
-    if (!callback) {
-      /* no callback, wait for the unblock to happen */
-      GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for unblock");
-      GST_PAD_BLOCK_WAIT (pad);
-      GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocked");
-    }
+    return FALSE;
   }
+}
+
+/**
+ * gst_pad_unblock:
+ * @pad: the #GstPad to unblock
+ *
+ * Unblock @pad. All pending and current pad blocks, if any, are canceled. After
+ * this method, dataflow will continue on @pad.
+ *
+ * MT safe.
+ */
+void
+gst_pad_unblock (GstPad * pad)
+{
+  g_return_if_fail (GST_IS_PAD (pad));
+
+  GST_OBJECT_LOCK (pad);
+
+  if (G_UNLIKELY (!GST_PAD_IS_BLOCKED (pad)))
+    goto had_right_state;
+
+  /* cleanup */
+  if (pad->block_destroy_data && pad->block_data)
+    pad->block_destroy_data (pad->block_data);
+
+  pad->block_type = 0;
+  pad->block_callback = NULL;
+  pad->block_data = NULL;
+  pad->block_destroy_data = NULL;
+
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
+  GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
+  GST_PAD_BLOCK_BROADCAST (pad);
   GST_OBJECT_UNLOCK (pad);
 
-  return TRUE;
+  return;
 
 had_right_state:
   {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pad was in right state (%d)", was_blocked);
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was unblocked");
     GST_OBJECT_UNLOCK (pad);
-
-    return FALSE;
+    return;
   }
 }
 
@@ -2694,7 +2703,7 @@ gst_pad_set_caps (GstPad * pad, GstCaps * caps)
   event = gst_event_new_caps (caps);
 
   if (GST_PAD_IS_SRC (pad))
-    gst_pad_push_event (pad, event);
+    res = gst_pad_push_event (pad, event);
   else
     res = gst_pad_send_event (pad, event);
 
@@ -3466,7 +3475,7 @@ gst_pad_query_default (GstPad * pad, GstQuery * query)
  * MT safe.
  */
 static GstFlowReturn
-handle_pad_block (GstPad * pad)
+handle_pad_block (GstPad * pad, GstBlockType type)
 {
   GstPadBlockCallback callback;
   gpointer user_data;
@@ -3493,22 +3502,19 @@ handle_pad_block (GstPad * pad)
        * some other thread is doing a GCond wait. */
       callback = pad->block_callback;
       pad->block_callback_called = TRUE;
-      if (callback) {
-        /* there is a callback installed, call it. We release the
-         * lock so that the callback can do something usefull with the
-         * pad */
+      if ((pad->block_type & type) == pad->block_type) {
         user_data = pad->block_data;
         GST_OBJECT_UNLOCK (pad);
-        callback (pad, TRUE, user_data);
-        GST_OBJECT_LOCK (pad);
 
+        GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "calling block callback");
+        /* Call the callback. We release the lock so that the callback can
+         * do something usefull with the pad */
+        callback (pad, type, user_data);
+
+        GST_OBJECT_LOCK (pad);
         /* we released the lock, recheck flushing */
         if (GST_PAD_IS_FLUSHING (pad))
           goto flushing;
-      } else {
-        /* no callback, signal the thread that is doing a GCond wait
-         * if any. */
-        GST_PAD_BLOCK_BROADCAST (pad);
       }
     } while (pad->block_callback_called == FALSE && GST_PAD_IS_BLOCKED (pad));
 
@@ -3535,20 +3541,6 @@ handle_pad_block (GstPad * pad)
 
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked");
 
-  /* when we get here, the pad is unblocked again and we perform
-   * the needed unblock code. */
-  callback = pad->block_callback;
-  if (callback) {
-    /* we need to call the callback */
-    user_data = pad->block_data;
-    GST_OBJECT_UNLOCK (pad);
-    callback (pad, FALSE, user_data);
-    GST_OBJECT_LOCK (pad);
-  } else {
-    /* we need to signal the thread waiting on the GCond */
-    GST_PAD_BLOCK_BROADCAST (pad);
-  }
-
   gst_object_unref (pad);
 
   return ret;
@@ -3684,6 +3676,8 @@ gst_pad_emit_have_data_signal (GstPad * pad, GstMiniObject * obj)
   else
     detail = 0;
 
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "Emiting have-data signal");
+
   /* actually emit */
   g_signal_emitv (args, gst_pad_signals[PAD_HAVE_DATA], detail, &ret);
   res = g_value_get_boolean (&ret);
@@ -3952,22 +3946,24 @@ static GstFlowReturn
 pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data)
 {
   GstFlowReturn ret;
-  gboolean need_probes, did_probes = FALSE;
+  gboolean need_probes, do_probes = TRUE;
 
 again:
   GST_OBJECT_LOCK (pad);
   /* FIXME: this check can go away; pad_set_blocked could be implemented with
    * probes completely or probes with an extended pad block. */
   while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
-    if ((ret = handle_pad_block (pad)) != GST_FLOW_OK)
+    if ((ret =
+            handle_pad_block (pad,
+                GST_BLOCK_TYPE_PUSH | GST_BLOCK_TYPE_DATA) != GST_FLOW_OK))
       goto flushed;
 
   need_probes = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
 
   /* we emit signals on the pad arg, the peer will have a chance to
    * emit in the _chain() function */
-  if (G_UNLIKELY (need_probes && !did_probes)) {
-    did_probes = TRUE;
+  if (G_UNLIKELY (need_probes && do_probes)) {
+    do_probes = FALSE;
     /* unlock before emitting */
     GST_OBJECT_UNLOCK (pad);
 
@@ -4015,6 +4011,23 @@ pad_post_push (GstPad * pad)
 {
   GST_OBJECT_LOCK (pad);
   pad->priv->using--;
+  if (pad->priv->using == 0) {
+    /* pad is not active anymore, check if we need to trigger the block */
+    if (GST_PAD_IS_BLOCKED (pad)) {
+      GstPadBlockCallback callback;
+      gpointer user_data;
+
+      callback = pad->block_callback;
+      user_data = pad->block_data;
+      GST_PAD_BLOCK_BROADCAST (pad);
+      GST_OBJECT_UNLOCK (pad);
+
+      if (callback)
+        callback (pad, GST_BLOCK_TYPE_IDLE, user_data);
+
+      return;
+    }
+  }
   GST_OBJECT_UNLOCK (pad);
 }
 
@@ -4290,7 +4303,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
   GST_OBJECT_LOCK (pad);
 
   while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
-    handle_pad_block (pad);
+    if ((ret = handle_pad_block (pad, GST_BLOCK_TYPE_PULL) != GST_FLOW_OK))
+      goto flushed;
 
   if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
     goto not_connected;
@@ -4309,6 +4323,12 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
   if (G_UNLIKELY (ret != GST_FLOW_OK))
     goto pull_range_failed;
 
+  while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
+    if ((ret =
+            handle_pad_block (pad,
+                GST_BLOCK_TYPE_PULL | GST_BLOCK_TYPE_DATA) != GST_FLOW_OK))
+      goto flushed;
+
   /* can only fire the signal if we have a valid buffer */
   if (G_UNLIKELY (emit_signal)) {
     if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (*buffer)))
@@ -4331,6 +4351,12 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
   return ret;
 
   /* ERROR recovery here */
+flushed:
+  {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "we are flushing");
+    GST_OBJECT_UNLOCK (pad);
+    return GST_FLOW_NOT_LINKED;
+  }
 not_connected:
   {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
@@ -4385,8 +4411,8 @@ gboolean
 gst_pad_push_event (GstPad * pad, GstEvent * event)
 {
   GstPad *peerpad;
-  gboolean result, need_probes, did_probes = FALSE, did_event_actions = FALSE;
-  gint64 offset;
+  gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE;
+  gboolean stored = FALSE;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
@@ -4397,6 +4423,9 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
 again:
   GST_OBJECT_LOCK (pad);
 
+  peerpad = GST_PAD_PEER (pad);
+  need_probes = do_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
+
   /* Two checks to be made:
    * . (un)set the FLUSHING flag for flushing events,
    * . handle pad blocking */
@@ -4416,89 +4445,92 @@ again:
       break;
     case GST_EVENT_FLUSH_STOP:
       GST_PAD_UNSET_FLUSHING (pad);
+      if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
+        GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop");
+        goto flushed;
+      }
       break;
     default:
-      break;
-  }
-
-  /* store the event on the pad, but only on srcpads */
-  if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
-    if (GST_PAD_IS_FLUSHING (pad)) {
-      goto flushing;
-    } else {
-      guint idx;
-
-      idx = GST_EVENT_STICKY_IDX (event);
-      GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
-          GST_EVENT_TYPE_NAME (event), idx);
+    {
+      /* store the event on the pad, but only on srcpads */
+      if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
+        guint idx;
 
-      /* srcpad sticky events always become active immediately */
-      gst_event_replace (&pad->priv->events[idx].event, event);
-    }
-  }
+        idx = GST_EVENT_STICKY_IDX (event);
+        GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
+            GST_EVENT_TYPE_NAME (event), idx);
 
-  /* drop all events when blocking. Sticky events will stay on the pad and will
-   * be activated on the peer when unblocking. */
-  if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
-    GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding event");
-    goto flushed;
-  }
+        /* srcpad sticky events always become active immediately */
+        gst_event_replace (&pad->priv->events[idx].event, event);
 
-  offset = pad->offset;
-  need_probes = !did_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
-  peerpad = GST_PAD_PEER (pad);
+        stored = TRUE;
+      }
 
-  /* backwards compatibility mode for caps */
-  if (!did_event_actions) {
-    did_event_actions = TRUE;
+      /* backwards compatibility mode for caps */
+      if (do_event_actions) {
+        do_event_actions = FALSE;
 
-    switch (GST_EVENT_TYPE (event)) {
-      case GST_EVENT_CAPS:
-      {
-        GstCaps *caps;
+        switch (GST_EVENT_TYPE (event)) {
+          case GST_EVENT_CAPS:
+          {
+            GstCaps *caps;
+
+            GST_OBJECT_UNLOCK (pad);
+
+            gst_event_parse_caps (event, &caps);
+            /* FIXME, this is awkward because we don't check flushing here which means
+             * that we can call the setcaps functions on flushing pads, this is not
+             * quite what we want, otoh, this code should just go away and elements
+             * that set caps on their srcpad should just setup stuff themselves. */
+            gst_pad_call_setcaps (pad, caps);
+
+            /* recheck everything, we released the lock */
+            goto again;
+          }
+          case GST_EVENT_SEGMENT:
+          {
+            gint64 offset;
 
-        GST_OBJECT_UNLOCK (pad);
+            offset = pad->offset;
+            /* check if we need to adjust the segment */
+            if (offset != 0 && (need_probes || peerpad != NULL)) {
+              GstSegment segment;
 
-        gst_event_parse_caps (event, &caps);
-        /* FIXME, this is awkward because we don't check flushing here which means
-         * that we can call the setcaps functions on flushing pads, this is not
-         * quite what we want, otoh, this code should just go away and elements
-         * that set caps on their srcpad should just setup stuff themselves. */
-        gst_pad_call_setcaps (pad, caps);
+              /* copy segment values */
+              gst_event_copy_segment (event, &segment);
+              gst_event_unref (event);
 
-        /* recheck everything, we released the lock */
-        goto again;
+              /* adjust and make a new event with the offset applied */
+              segment.base += offset;
+              event = gst_event_new_segment (&segment);
+            }
+            break;
+          }
+          case GST_EVENT_RECONFIGURE:
+            if (GST_PAD_IS_SINK (pad))
+              GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_RECONFIGURE);
+            break;
+          default:
+            break;
+        }
       }
-      case GST_EVENT_SEGMENT:
-        /* check if we need to adjust the segment */
-        if (offset != 0 && (need_probes || peerpad != NULL)) {
-          GstSegment segment;
 
-          /* copy segment values */
-          gst_event_copy_segment (event, &segment);
-          gst_event_unref (event);
+      while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
+        if (handle_pad_block (pad,
+                GST_BLOCK_TYPE_PUSH | GST_BLOCK_TYPE_DATA) != GST_FLOW_OK)
+          goto flushed;
 
-          /* adjust and make a new event with the offset applied */
-          segment.base += offset;
-          event = gst_event_new_segment (&segment);
-        }
-        break;
-      case GST_EVENT_RECONFIGURE:
-        if (GST_PAD_IS_SINK (pad))
-          GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_RECONFIGURE);
-        break;
-      default:
-        break;
+      break;
     }
   }
 
   /* send probes after modifying the events above */
   if (G_UNLIKELY (need_probes)) {
-    did_probes = TRUE;
+    do_probes = FALSE;
     GST_OBJECT_UNLOCK (pad);
 
     if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
-      goto dropping;
+      goto dropped;
 
     /* retry, we released the lock */
     goto again;
@@ -4523,11 +4555,9 @@ again:
 
   gst_object_unref (peerpad);
 
-  GST_OBJECT_LOCK (pad);
-  pad->priv->using--;
-  GST_OBJECT_UNLOCK (pad);
+  pad_post_push (pad);
 
-  return result;
+  return result | stored;
 
   /* ERROR handling */
 flushed:
@@ -4535,28 +4565,20 @@ flushed:
     GST_DEBUG_OBJECT (pad, "We're flushing");
     GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
-    GST_OBJECT_UNLOCK (pad);
-    return TRUE;
+    return stored;
   }
-dropping:
+dropped:
   {
     GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return");
     gst_event_unref (event);
-    return FALSE;
+    return stored;
   }
 not_linked:
   {
     GST_DEBUG_OBJECT (pad, "Dropping event because pad is not linked");
     GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
-    return FALSE;
-  }
-flushing:
-  {
-    GST_DEBUG_OBJECT (pad, "Dropping event because pad is flushing");
-    GST_OBJECT_UNLOCK (pad);
-    gst_event_unref (event);
-    return FALSE;
+    return stored;
   }
 }
 
index 970e1ef..850957f 100644 (file)
@@ -471,15 +471,32 @@ typedef void                      (*GstPadFixateCapsFunction)     (GstPad *pad, GstCaps *caps);
 typedef gboolean               (*GstPadDispatcherFunction)     (GstPad *pad, gpointer data);
 
 /**
+ * GstBlockType:
+ * @GST_BLOCK_TYPE_IDLE: The pad is idle
+ * @GST_BLOCK_TYPE_DATA: Data is queued on the pad
+ * @GST_BLOCK_TYPE_PUSH: Blocked on a push operation
+ * @GST_BLOCK_TYPE_PULL: Blocked on a pull operation
+ *
+ * The different blocking types that can occur.
+ */
+typedef enum
+{
+  GST_BLOCK_TYPE_IDLE = (1 << 0),
+  GST_BLOCK_TYPE_DATA = (1 << 1),
+  GST_BLOCK_TYPE_PUSH = (1 << 2),
+  GST_BLOCK_TYPE_PULL = (1 << 3),
+} GstBlockType;
+
+/**
  * GstPadBlockCallback:
- * @pad: the #GstPad that is blockend or unblocked.
- * @blocked: blocking state for the pad
+ * @pad: the #GstPad that is blocked
+ * @type: the current blocking type
  * @user_data: the gpointer to optional user data.
  *
- * Callback used by gst_pad_set_blocked_async(). Gets called when the blocking
- * operation succeeds.
+ * Callback used by gst_pad_block(). Gets called to notify about the current
+ * blocking type.
  */
-typedef void                   (*GstPadBlockCallback)          (GstPad *pad, gboolean blocked, gpointer user_data);
+typedef void                   (*GstPadBlockCallback)          (GstPad *pad, GstBlockType type, gpointer user_data);
 
 /**
  * GstPadStickyEventsForeachFunction:
@@ -600,6 +617,7 @@ struct _GstPad {
   /*< public >*/ /* with LOCK */
   /* block cond, mutex is from the object */
   GCond                                *block_cond;
+  GstBlockType                   block_type;
   GstPadBlockCallback           block_callback;
   gpointer                      block_data;
   GDestroyNotify                 block_destroy_data;
@@ -804,9 +822,13 @@ gboolean           gst_pad_is_active                       (GstPad *pad);
 gboolean               gst_pad_activate_pull                   (GstPad *pad, gboolean active);
 gboolean               gst_pad_activate_push                   (GstPad *pad, gboolean active);
 
-gboolean               gst_pad_set_blocked                     (GstPad *pad, gboolean blocked,
-                                                                GstPadBlockCallback callback, gpointer user_data,
+gboolean               gst_pad_block                           (GstPad *pad,
+                                                                GstBlockType type,
+                                                                GstPadBlockCallback callback,
+                                                                 gpointer user_data,
                                                                  GDestroyNotify destroy_data);
+void                    gst_pad_unblock                         (GstPad *pad);
+
 gboolean               gst_pad_is_blocked                      (GstPad *pad);
 gboolean               gst_pad_is_blocking                     (GstPad *pad);
 
index 3aca7f7..9bbe722 100644 (file)
@@ -764,10 +764,10 @@ static GMutex *blocked_lock;
 static GCond *blocked_cond;
 
 static void
-pad_blocked_cb (GstPad * pad, gboolean blocked, gpointer user_data)
+pad_blocked_cb (GstPad * pad, GstBlockType type, gpointer user_data)
 {
   g_mutex_lock (blocked_lock);
-  GST_DEBUG ("srcpad blocked: %d, sending signal", blocked);
+  GST_DEBUG ("srcpad blocked: %d, sending signal", type);
   g_cond_signal (blocked_cond);
   g_mutex_unlock (blocked_lock);
 }
@@ -799,7 +799,7 @@ GST_START_TEST (test_add_live2)
   GST_DEBUG ("blocking srcpad");
   /* block source pad */
   srcpad = gst_element_get_static_pad (src, "src");
-  gst_pad_set_blocked (srcpad, TRUE, pad_blocked_cb, NULL, NULL);
+  gst_pad_block (srcpad, GST_BLOCK_TYPE_DATA, pad_blocked_cb, NULL, NULL);
 
   /* set source to PAUSED without adding it to the pipeline */
   ret = gst_element_set_state (src, GST_STATE_PAUSED);
@@ -827,7 +827,7 @@ GST_START_TEST (test_add_live2)
   GST_DEBUG ("unblocking srcpad");
 
   /* and unblock */
-  gst_pad_set_blocked (srcpad, FALSE, pad_blocked_cb, NULL, NULL);
+  gst_pad_unblock (srcpad);
 
   GST_DEBUG ("getting state");
 
index 4e80ccf..3f2ba10 100644 (file)
@@ -307,6 +307,55 @@ event_probe (GstPad * pad, GstMiniObject ** data, gpointer user_data)
   return TRUE;
 }
 
+
+typedef struct
+{
+  GMutex *lock;
+  GCond *cond;
+  gboolean signaled;
+} SignalData;
+
+static void
+signal_data_init (SignalData * data)
+{
+  data->lock = g_mutex_new ();
+  data->cond = g_cond_new ();
+  data->signaled = FALSE;
+}
+
+static void
+signal_data_cleanup (SignalData * data)
+{
+  g_mutex_free (data->lock);
+  g_cond_free (data->cond);
+}
+
+static void
+signal_data_signal (SignalData * data)
+{
+  g_mutex_lock (data->lock);
+  data->signaled = TRUE;
+  g_cond_broadcast (data->cond);
+  g_mutex_unlock (data->lock);
+}
+
+static void
+signal_data_wait (SignalData * data)
+{
+  g_mutex_lock (data->lock);
+  while (!data->signaled)
+    g_cond_wait (data->cond, data->lock);
+  g_mutex_unlock (data->lock);
+}
+
+static void
+signal_blocked (GstPad * pad, GstBlockType type, gpointer user_data)
+{
+  SignalData *data = (SignalData *) user_data;
+
+  signal_data_signal (data);
+}
+
 static void test_event
     (GstBin * pipeline, GstEventType type, GstPad * pad,
     gboolean expect_before_q, GstPad * fake_srcpad)
@@ -314,6 +363,7 @@ static void test_event
   GstEvent *event;
   GstPad *peer;
   gint i;
+  SignalData data;
 
   got_event_before_q = got_event_after_q = NULL;
 
@@ -329,17 +379,21 @@ static void test_event
   got_event_time.tv_sec = 0;
   got_event_time.tv_usec = 0;
 
+  signal_data_init (&data);
+
   /* We block the pad so the stream lock is released and we can send the event */
-  fail_unless (gst_pad_set_blocked (fake_srcpad, TRUE, NULL, NULL,
-          NULL) == TRUE);
+  fail_unless (gst_pad_block (fake_srcpad, GST_BLOCK_TYPE_DATA,
+          signal_blocked, &data, NULL) == TRUE);
+
+  signal_data_wait (&data);
 
   /* We send on the peer pad, since the pad is blocked */
   fail_unless ((peer = gst_pad_get_peer (pad)) != NULL);
   gst_pad_send_event (peer, event);
   gst_object_unref (peer);
 
-  fail_unless (gst_pad_set_blocked (fake_srcpad, FALSE, NULL, NULL,
-          NULL) == TRUE);
+  gst_pad_unblock (fake_srcpad);
+  signal_data_cleanup (&data);
 
   if (expect_before_q) {
     /* Wait up to 5 seconds for the event to appear */
index bd5080f..9dfebb1 100644 (file)
@@ -479,7 +479,7 @@ typedef struct
 } BlockData;
 
 static void
-block_callback (GstPad * pad, gboolean blocked, gpointer user_data)
+block_callback (GstPad * pad, GstBlockType type, gpointer user_data)
 {
   BlockData *block_data = (BlockData *) user_data;
 
@@ -514,7 +514,8 @@ GST_START_TEST (test_ghost_pads_block)
   block_data.cond = g_cond_new ();
 
   g_mutex_lock (block_data.mutex);
-  gst_pad_set_blocked (srcghost, TRUE, block_callback, &block_data, NULL);
+  gst_pad_block (srcghost, GST_BLOCK_TYPE_DATA, block_callback, &block_data,
+      NULL);
   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
   /* and wait now */
   g_cond_wait (block_data.cond, block_data.mutex);
@@ -555,7 +556,8 @@ GST_START_TEST (test_ghost_pads_probes)
   block_data.cond = g_cond_new ();
 
   g_mutex_lock (block_data.mutex);
-  gst_pad_set_blocked (srcghost, TRUE, block_callback, &block_data, NULL);
+  gst_pad_block (srcghost, GST_BLOCK_TYPE_DATA, block_callback, &block_data,
+      NULL);
   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
   /* and wait now */
   g_cond_wait (block_data.cond, block_data.mutex);
index 8d0bbbf..c16b194 100644 (file)
@@ -682,29 +682,20 @@ GST_START_TEST (test_sink_unref_unlink)
 GST_END_TEST;
 
 static void
-unblock_async_cb (GstPad * pad, gboolean blocked, gpointer user_data)
+block_async_cb (GstPad * pad, GstBlockType type, gpointer user_data)
 {
   gboolean *bool_user_data = (gboolean *) user_data;
 
-  /* here we should have blocked == 1 unblocked == 0 */
-  fail_unless (bool_user_data[0] == TRUE);
-  fail_unless (bool_user_data[1] == FALSE);
-
-  bool_user_data[1] = TRUE;
-}
-
-static void
-block_async_cb (GstPad * pad, gboolean blocked, gpointer user_data)
-{
-  gboolean *bool_user_data = (gboolean *) user_data;
+  fail_unless ((type & GST_BLOCK_TYPE_DATA) != 0);
 
   /* here we should have blocked == 0 unblocked == 0 */
   fail_unless (bool_user_data[0] == FALSE);
   fail_unless (bool_user_data[1] == FALSE);
 
-  bool_user_data[0] = blocked;
+  bool_user_data[0] = TRUE;
 
-  gst_pad_set_blocked (pad, FALSE, unblock_async_cb, user_data, NULL);
+  gst_pad_unblock (pad);
+  bool_user_data[1] = TRUE;
 }
 
 GST_START_TEST (test_block_async)
@@ -718,7 +709,7 @@ GST_START_TEST (test_block_async)
   fail_unless (pad != NULL);
 
   gst_pad_set_active (pad, TRUE);
-  gst_pad_set_blocked (pad, TRUE, block_async_cb, &data, NULL);
+  gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_cb, &data, NULL);
 
   fail_unless (data[0] == FALSE);
   fail_unless (data[1] == FALSE);
@@ -796,9 +787,9 @@ block_async_full_destroy (gpointer user_data)
 }
 
 static void
-block_async_full_cb (GstPad * pad, gboolean blocked, gpointer user_data)
+block_async_full_cb (GstPad * pad, GstBlockType type, gpointer user_data)
 {
-  *(gint *) user_data = (gint) blocked;
+  *(gint *) user_data = (gint) TRUE;
 
   gst_pad_push_event (pad, gst_event_new_flush_start ());
   GST_DEBUG ("setting state to 1");
@@ -814,7 +805,7 @@ GST_START_TEST (test_block_async_full_destroy)
   fail_unless (pad != NULL);
   gst_pad_set_active (pad, TRUE);
 
-  gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
+  gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
       &state, block_async_full_destroy);
   fail_unless (state == 0);
 
@@ -825,26 +816,24 @@ GST_START_TEST (test_block_async_full_destroy)
   gst_pad_push_event (pad, gst_event_new_flush_stop ());
 
   /* pad was already blocked so nothing happens */
-  gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
+  gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
       &state, block_async_full_destroy);
   fail_unless (state == 1);
 
-  /* unblock with the same data, callback is called */
-  gst_pad_set_blocked (pad, FALSE, block_async_full_cb,
-      &state, block_async_full_destroy);
+  /* unblock callback is called */
+  gst_pad_unblock (pad);
   fail_unless (state == 2);
 
-  /* block with the same data, callback is called */
+  /* block with the same data, nothing is called */
   state = 1;
-  gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
+  gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
       &state, block_async_full_destroy);
-  fail_unless (state == 2);
+  fail_unless (state == 1);
 
   /* now change user_data (to NULL in this case) so destroy_notify should be
    * called */
   state = 1;
-  gst_pad_set_blocked (pad, FALSE, block_async_full_cb,
-      NULL, block_async_full_destroy);
+  gst_pad_unblock (pad);
   fail_unless (state == 2);
 
   gst_object_unref (pad);
@@ -862,7 +851,7 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
   fail_unless (pad != NULL);
   gst_pad_set_active (pad, TRUE);
 
-  gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
+  gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
       &state, block_async_full_destroy);
 
   gst_pad_push (pad, gst_buffer_new ());
@@ -880,6 +869,7 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
 GST_END_TEST;
 
 
+#if 0
 static void
 unblock_async_no_flush_cb (GstPad * pad, gboolean blocked, gpointer user_data)
 {
@@ -895,20 +885,24 @@ unblock_async_no_flush_cb (GstPad * pad, gboolean blocked, gpointer user_data)
 
   bool_user_data[2] = TRUE;
 }
+#endif
 
 
+#if 0
 static void
 unblock_async_not_called (GstPad * pad, gboolean blocked, gpointer user_data)
 {
   g_warn_if_reached ();
 }
+#endif
 
 static void
-block_async_second_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
+block_async_second_no_flush (GstPad * pad, GstBlockType type,
+    gpointer user_data)
 {
   gboolean *bool_user_data = (gboolean *) user_data;
 
-  fail_unless (blocked == TRUE);
+  fail_unless (type & GST_BLOCK_TYPE_DATA);
 
   fail_unless (bool_user_data[0] == TRUE);
   fail_unless (bool_user_data[1] == FALSE);
@@ -916,34 +910,32 @@ block_async_second_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
 
   bool_user_data[1] = TRUE;
 
-  fail_unless (gst_pad_set_blocked (pad, FALSE, unblock_async_no_flush_cb,
-          user_data, NULL));
+  gst_pad_unblock (pad);
 }
 
 static void
-block_async_first_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
+block_async_first_no_flush (GstPad * pad, GstBlockType type, gpointer user_data)
 {
   static int n_calls = 0;
   gboolean *bool_user_data = (gboolean *) user_data;
 
-  fail_unless (blocked == TRUE);
+  fail_unless (type & GST_BLOCK_TYPE_DATA);
 
   if (++n_calls > 1)
     /* we expect this callback to be called only once */
     g_warn_if_reached ();
 
-  *bool_user_data = blocked;
+  *bool_user_data = TRUE;
 
   fail_unless (bool_user_data[0] == TRUE);
   fail_unless (bool_user_data[1] == FALSE);
   fail_unless (bool_user_data[2] == FALSE);
 
-  fail_unless (gst_pad_set_blocked (pad, FALSE, unblock_async_not_called,
-          NULL, NULL));
+  gst_pad_unblock (pad);
 
   /* replace block_async_first with block_async_second so next time the pad is
    * blocked the latter should be called */
-  fail_unless (gst_pad_set_blocked (pad, TRUE,
+  fail_unless (gst_pad_block (pad, GST_BLOCK_TYPE_DATA,
           block_async_second_no_flush, user_data, NULL));
 }
 
@@ -956,13 +948,13 @@ GST_START_TEST (test_block_async_replace_callback_no_flush)
   fail_unless (pad != NULL);
   gst_pad_set_active (pad, TRUE);
 
-  fail_unless (gst_pad_set_blocked (pad, TRUE, block_async_first_no_flush,
-          bool_user_data, NULL));
+  fail_unless (gst_pad_block (pad, GST_BLOCK_TYPE_DATA,
+          block_async_first_no_flush, bool_user_data, NULL));
 
   gst_pad_push (pad, gst_buffer_new ());
   fail_unless (bool_user_data[0] == TRUE);
   fail_unless (bool_user_data[1] == TRUE);
-  fail_unless (bool_user_data[2] == TRUE);
+  fail_unless (bool_user_data[2] == FALSE);
 
   gst_object_unref (pad);
 }