gst/gstpad.c: Implement pad blocking on events according to part-block.txt.
authorEdward Hervey <bilboed@bilboed.com>
Mon, 3 Jul 2006 10:30:49 +0000 (10:30 +0000)
committerEdward Hervey <bilboed@bilboed.com>
Mon, 3 Jul 2006 10:30:49 +0000 (10:30 +0000)
Original commit message from CVS:
* gst/gstpad.c: (handle_pad_block), (gst_pad_push_event):
Implement pad blocking on events according to part-block.txt.
More comments on behaviour.
* tests/check/gst/gstevent.c: (test_event):
Send event to peer pad of blocked pad (else it will block).

ChangeLog
gst/gstpad.c
tests/check/gst/gstevent.c

index 8ebebe0..fd1ae47 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2006-07-03  Edward Hervey  <edward@fluendo.com>
+
+       * gst/gstpad.c: (handle_pad_block), (gst_pad_push_event):
+       Implement pad blocking on events according to part-block.txt.
+       More comments on behaviour.
+       * tests/check/gst/gstevent.c: (test_event):
+       Send event to peer pad of blocked pad (else it will block).
+
 2006-07-03  Thomas Vander Stichele  <thomas at apestaart dot org>
 
        * libs/gst/check/gstcheck.c: (gst_check_message_error),
index eca4098..f01e9c0 100644 (file)
@@ -3070,7 +3070,28 @@ gst_ghost_pad_save_thyself (GstPad * pad, xmlNodePtr parent)
 #endif /* GST_DISABLE_LOADSAVE */
 
 /*
- * should be called with pad lock held
+ * should be called with pad OBJECT_LOCK and STREAM_LOCK helt. 
+ * GST_PAD_IS_BLOCK (pad) == TRUE when this function is
+ * called.
+ *
+ * This function perform the pad blocking when an event, buffer push
+ * or buffer_alloc is performed on a _SRC_ pad. It blocks the
+ * streaming thread after informing the pad has been blocked. 
+ *
+ * An application can with this method wait and block any streaming
+ * thread and perform operations such as seeking or linking.
+ *
+ * Two methods are available for notifying the application of the
+ * block: 
+ * - the callback method, which happens in the STREAMING thread with
+ *   the STREAM_LOCK helt. With this method, the most usefull way of
+ *   dealing with the callback is to post a message to the main thread
+ *   where the pad block can then be handled outside of the streaming
+ *   thread. With the last method one can perform all operations such
+ *   as doing a state change, linking, unblocking, seeking etc on the
+ *   pad.
+ * - the GCond signal method, which makes any thread unblock when
+ *   the pad block happens.
  *
  * MT safe.
  */
@@ -3084,36 +3105,67 @@ handle_pad_block (GstPad * pad)
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
       "signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
-  /* need to grab extra ref for the callbacks */
+  /* flushing, don't bother trying to block and return WRONG_STATE
+   * right away */
+  if (GST_PAD_IS_FLUSHING (pad))
+    goto flushingnonref;
+
+  /* we grab an extra ref for the callbacks, this is probably not
+   * needed (callback code does not have a ref and cannot unref). I
+   * think this was done to make it possible to unref the element in
+   * the callback, which is in the end totally impossible as it 
+   * requires grabbing the STREAM_LOCK and OBJECT_LOCK which are
+   * all taken when calling this function. */
   gst_object_ref (pad);
 
+  /* we either have a callback installed to notify the block or
+   * some other thread is doing a GCond wait. */
   callback = pad->block_callback;
   if (callback) {
+    /* there is a callback installed, call it. We release the
+     * lock so that the callback can do something usefull with the
+     * pad */
     user_data = pad->block_data;
     GST_OBJECT_UNLOCK (pad);
     callback (pad, TRUE, user_data);
     GST_OBJECT_LOCK (pad);
   } else {
+    /* no callback, signal the thread that is doing a GCond wait
+     * if any. */
     GST_PAD_BLOCK_SIGNAL (pad);
   }
 
+  /* OBJECT_LOCK could have been released when we did the callback, which
+   * then could have made the pad unblock so we need to check the blocking
+   * condition again.   */
   while (GST_PAD_IS_BLOCKED (pad)) {
     if (GST_PAD_IS_FLUSHING (pad))
       goto flushing;
+
+    /* now we block the streaming thread. It can be unlocked when we 
+     * deactivate the pad (which will also set the FLUSHING flag) or
+     * when the pad is unblocked. A flushing event will also unblock
+     * the pad after setting the FLUSHING flag. */
     GST_PAD_BLOCK_WAIT (pad);
+
+    /* see if we got unlocked by a flush or not */
     if (GST_PAD_IS_FLUSHING (pad))
       goto flushing;
   }
 
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked");
 
+  /* when we get here, the pad is unblocked again and we perform
+   * the needed unblock code. */
   callback = pad->block_callback;
   if (callback) {
+    /* we need to call the callback */
     user_data = pad->block_data;
     GST_OBJECT_UNLOCK (pad);
     callback (pad, FALSE, user_data);
     GST_OBJECT_LOCK (pad);
   } else {
+    /* we need to signal the thread waiting on the GCond */
     GST_PAD_BLOCK_SIGNAL (pad);
   }
 
@@ -3121,8 +3173,16 @@ handle_pad_block (GstPad * pad)
 
   return ret;
 
+flushingnonref:
+  {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "pad %s:%s was flushing", GST_DEBUG_PAD_NAME (pad));
+    return GST_FLOW_WRONG_STATE;
+  }
 flushing:
   {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "pad %s:%s became flushing", GST_DEBUG_PAD_NAME (pad));
     gst_object_unref (pad);
     return GST_FLOW_WRONG_STATE;
   }
@@ -3634,24 +3694,43 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   g_return_val_if_fail (event != NULL, FALSE);
   g_return_val_if_fail (GST_IS_EVENT (event), FALSE);
 
+  GST_LOG_OBJECT (pad, "event:%s", GST_EVENT_TYPE_NAME (event));
+
   GST_OBJECT_LOCK (pad);
+
+  /* Two checks to be made:
+   * . (un)set the FLUSHING flag for flushing events,
+   * . handle pad blocking */
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       GST_PAD_SET_FLUSHING (pad);
+
+      if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
+        /* flush start will have set the FLUSHING flag and will then
+         * unlock all threads doing a GCond wait on the blocked pad. This
+         * will typically unblock the STREAMING thread blocked on a pad. */
+        GST_PAD_BLOCK_SIGNAL (pad);
+        goto flushed;
+      }
       break;
     case GST_EVENT_FLUSH_STOP:
       GST_PAD_UNSET_FLUSHING (pad);
+
+      if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
+        goto flushed;
       break;
     default:
+      if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
+        if (GST_PAD_IS_FLUSHING (pad))
+          goto flushed;
+        while (GST_PAD_IS_BLOCKED (pad))
+          /* else block the event as long as the pad is blocked */
+          if (handle_pad_block (pad) != GST_FLOW_OK)
+            goto flushed;
+      }
       break;
   }
 
-  if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
-    if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START) {
-      GST_PAD_BLOCK_SIGNAL (pad);
-    }
-  }
-
   if (G_UNLIKELY (GST_PAD_DO_EVENT_SIGNALS (pad) > 0)) {
     GST_OBJECT_UNLOCK (pad);
 
@@ -3688,6 +3767,15 @@ not_linked:
     GST_OBJECT_UNLOCK (pad);
     return FALSE;
   }
+
+flushed:
+  {
+    GST_DEBUG_OBJECT (pad,
+        "Not forwarding event since we're flushing and blocking");
+    gst_event_unref (event);
+    GST_OBJECT_UNLOCK (pad);
+    return TRUE;
+  }
 }
 
 /**
index b128b89..0e9313a 100644 (file)
@@ -274,6 +274,7 @@ static void test_event
     gboolean expect_before_q, GstPad * fake_srcpad)
 {
   GstEvent *event;
+  GstPad *peer;
   gint i;
 
   got_event_before_q = got_event_after_q = NULL;
@@ -288,8 +289,14 @@ static void test_event
   got_event_time.tv_sec = 0;
   got_event_time.tv_usec = 0;
 
+  /* We block the pad so the stream lock is released and we can send the event */
   fail_unless (gst_pad_set_blocked (fake_srcpad, TRUE) == TRUE);
-  gst_pad_push_event (pad, event);
+
+  /* We send on the peer pad, since the pad is blocked */
+  fail_unless ((peer = gst_pad_get_peer (pad)) != NULL);
+  gst_pad_send_event (peer, event);
+  gst_object_unref (peer);
+
   fail_unless (gst_pad_set_blocked (fake_srcpad, FALSE) == TRUE);
 
   /* Wait up to 5 seconds for the event to appear */