libs/gst/base/gstbasesrc.c: Allow sending EOS to the source to make it send out an...
authorWim Taymans <wim.taymans@gmail.com>
Wed, 19 Dec 2007 17:49:38 +0000 (17:49 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 19 Dec 2007 17:49:38 +0000 (17:49 +0000)
Original commit message from CVS:
* libs/gst/base/gstbasesrc.c: (gst_base_src_send_event),
(gst_base_src_get_range), (gst_base_src_pad_get_range),
(gst_base_src_loop), (gst_base_src_set_flushing),
(gst_base_src_change_state):
Allow sending EOS to the source to make it send out an EOS event from
the streaming thread.
Update docs and deprecate the old NULL/READY shutdown method.
* tests/check/libs/basesrc.c: (GST_START_TEST),
(gst_basesrc_suite):
Add unit test for controlled shutdown.

ChangeLog
libs/gst/base/gstbasesrc.c
tests/check/libs/basesrc.c

index f3ee6ea..2a4f651 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,19 @@
 2007-12-19  Wim Taymans  <wim.taymans@collabora.co.uk>
 
+       * libs/gst/base/gstbasesrc.c: (gst_base_src_send_event),
+       (gst_base_src_get_range), (gst_base_src_pad_get_range),
+       (gst_base_src_loop), (gst_base_src_set_flushing),
+       (gst_base_src_change_state):
+       Allow sending EOS to the source to make it send out an EOS event from
+       the streaming thread.
+       Update docs and deprecate the old NULL/READY shutdown method.
+
+       * tests/check/libs/basesrc.c: (GST_START_TEST),
+       (gst_basesrc_suite):
+       Add unit test for controlled shutdown.
+
+2007-12-19  Wim Taymans  <wim.taymans@collabora.co.uk>
+
        * docs/design/part-synchronisation.txt:
        Small updates.
 
index 11ac86c..d113e35 100644 (file)
  * been processed and the pipeline can safely be stopped.
  * </para>
  * <para>
- * Since GStreamer 0.10.3 an application may simply set the source
- * element to NULL or READY state to make it send an EOS event downstream.
- * The application should lock the state of the source afterwards, so that
- * shutting down the pipeline from PLAYING doesn't temporarily start up the
- * source element for a second time:
- * <programlisting>
- * ...
- * // stop recording
- * gst_element_set_state (audio_source, #GST_STATE_NULL);
- * gst_element_set_locked_state (audio_source, %TRUE);
- * ...
- * </programlisting>
- * Now the application should wait for an EOS message
- * to be posted on the pipeline's bus. Once it has received
- * an EOS message, it may safely shut down the entire pipeline:
- * <programlisting>
- * ...
- * // everything done - shut down pipeline
- * gst_element_set_state (pipeline, #GST_STATE_NULL);
- * gst_element_set_locked_state (audio_source, %FALSE);
- * ...
- * </programlisting>
+ * Since GStreamer 0.10.16 an application may send an EOS event to a source
+ * element to make it send an EOS event downstream. This can typically be done
+ * with the gst_element_send_event() function on the element or its parent bin.
  * </para>
  * <para>
- * Note that setting the source element to NULL or READY when the 
- * pipeline is in the PAUSED state may cause a deadlock since the streaming
- * thread might be blocked in PREROLL.
+ * After the EOS has been sent to the element, the application should wait for
+ * an EOS message to be posted on the pipeline's bus. Once this EOS message is
+ * received, it may safely shut down the entire pipeline.
  * </para>
  * <para>
- * Last reviewed on 2007-09-13 (0.10.15)
+ * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
+ * is still available but deprecated as it is dangerous and less flexible.
+ * </para>
+ * <para>
+ * Last reviewed on 2007-12-19 (0.10.16)
  * </para>
  * </refsect2>
  */
@@ -249,6 +234,9 @@ struct _GstBaseSrcPrivate
   GstEvent *close_segment;
   GstEvent *start_segment;
 
+  /* if EOS is pending */
+  gboolean pending_eos;
+
   /* startup latency is the time it takes between going to PLAYING and producing
    * the first BUFFER with running_time 0. This value is included in the latency
    * reporting. */
@@ -1278,8 +1266,12 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
 
       /* downstream serialized events */
     case GST_EVENT_EOS:
-      /* FIXME, queue EOS and make sure the task or pull function 
-       * perform the EOS actions. */
+      /* queue EOS and make sure the task or pull function 
+       * performs the EOS actions. */
+      GST_LIVE_LOCK (src);
+      src->priv->pending_eos = TRUE;
+      GST_LIVE_UNLOCK (src);
+      result = TRUE;
       break;
     case GST_EVENT_NEWSEGMENT:
       /* sending random NEWSEGMENT downstream can break sync. */
@@ -1787,9 +1779,12 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
   status = gst_base_src_do_sync (src, *buf);
 
   /* waiting for the clock could have made us flushing */
-  if (src->priv->flushing)
+  if (G_UNLIKELY (src->priv->flushing))
     goto flushing;
 
+  if (G_UNLIKELY (src->priv->pending_eos))
+    goto eos;
+
   switch (status) {
     case GST_CLOCK_EARLY:
       /* the buffer is too late. We currently don't drop the buffer. */
@@ -1863,6 +1858,13 @@ flushing:
     *buf = NULL;
     return GST_FLOW_WRONG_STATE;
   }
+eos:
+  {
+    GST_DEBUG_OBJECT (src, "we are EOS");
+    gst_buffer_unref (*buf);
+    *buf = NULL;
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 static GstFlowReturn
@@ -1875,9 +1877,13 @@ gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
   src = GST_BASE_SRC (gst_pad_get_parent (pad));
 
   GST_LIVE_LOCK (src);
-  if (src->priv->flushing)
+  if (G_UNLIKELY (src->priv->flushing))
     goto flushing;
 
+  /* if we're EOS, return right away */
+  if (G_UNLIKELY (src->priv->pending_eos))
+    goto eos;
+
   res = gst_base_src_get_range (src, offset, length, buf);
 
 done:
@@ -1894,6 +1900,12 @@ flushing:
     res = GST_FLOW_WRONG_STATE;
     goto done;
   }
+eos:
+  {
+    GST_DEBUG_OBJECT (src, "we are EOS");
+    res = GST_FLOW_UNEXPECTED;
+    goto done;
+  }
 }
 
 static gboolean
@@ -1967,9 +1979,13 @@ gst_base_src_loop (GstPad * pad)
   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
 
   GST_LIVE_LOCK (src);
-  if (src->priv->flushing)
+  if (G_UNLIKELY (src->priv->flushing))
     goto flushing;
 
+  /* if we're EOS, return right away */
+  if (G_UNLIKELY (src->priv->pending_eos))
+    goto eos;
+
   src->priv->last_sent_eos = FALSE;
 
   /* if we operate in bytes, we can calculate an offset */
@@ -1990,11 +2006,11 @@ gst_base_src_loop (GstPad * pad)
     goto null_buffer;
 
   /* push events to close/start our segment before we push the buffer. */
-  if (src->priv->close_segment) {
+  if (G_UNLIKELY (src->priv->close_segment)) {
     gst_pad_push_event (pad, src->priv->close_segment);
     src->priv->close_segment = NULL;
   }
-  if (src->priv->start_segment) {
+  if (G_UNLIKELY (src->priv->start_segment)) {
     gst_pad_push_event (pad, src->priv->start_segment);
     src->priv->start_segment = NULL;
   }
@@ -2051,7 +2067,7 @@ gst_base_src_loop (GstPad * pad)
     goto pause;
   }
 
-  if (eos) {
+  if (G_UNLIKELY (eos)) {
     GST_INFO_OBJECT (src, "pausing after end of segment");
     ret = GST_FLOW_UNEXPECTED;
     goto pause;
@@ -2068,6 +2084,13 @@ flushing:
     ret = GST_FLOW_WRONG_STATE;
     goto pause;
   }
+eos:
+  {
+    GST_DEBUG_OBJECT (src, "we are EOS");
+    GST_LIVE_UNLOCK (src);
+    ret = GST_FLOW_UNEXPECTED;
+    goto pause;
+  }
 pause:
   {
     const gchar *reason = gst_flow_get_name (ret);
@@ -2340,6 +2363,8 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
   if (flushing) {
     /* if we are locked in the live lock, signal it to make it flush */
     basesrc->live_running = TRUE;
+    /* clear pending EOS if any */
+    basesrc->priv->pending_eos = FALSE;
 
     /* step 1, now that we have the LIVE lock, clear our unlock request */
     if (bclass->unlock_stop)
@@ -2595,6 +2620,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
         basesrc->priv->last_sent_eos = TRUE;
       }
+      basesrc->priv->pending_eos = FALSE;
       event_p = &basesrc->data.ABI.pending_seek;
       gst_event_replace (event_p, NULL);
       event_p = &basesrc->priv->close_segment;
index 77a022d..0b5a51c 100644 (file)
@@ -328,6 +328,160 @@ GST_START_TEST (basesrc_eos_events_pull)
 GST_END_TEST;
 
 
+/* basesrc_eos_events_push_live_eos:
+ *  - make sure the source stops and emits EOS when we send an EOS event to the
+ *    pipeline.
+ */
+GST_START_TEST (basesrc_eos_events_push_live_eos)
+{
+  GstStateChangeReturn state_ret;
+  GstElement *src, *sink, *pipe;
+  GstMessage *msg;
+  GstBus *bus;
+  GstPad *srcpad;
+  guint probe, num_eos = 0;
+  gboolean res;
+
+  pipe = gst_pipeline_new ("pipeline");
+  sink = gst_element_factory_make ("fakesink", "sink");
+  src = gst_element_factory_make ("fakesrc", "src");
+
+  g_assert (pipe != NULL);
+  g_assert (sink != NULL);
+  g_assert (src != NULL);
+
+  fail_unless (gst_bin_add (GST_BIN (pipe), src) == TRUE);
+  fail_unless (gst_bin_add (GST_BIN (pipe), sink) == TRUE);
+
+  fail_unless (gst_element_link (src, sink) == TRUE);
+
+  g_object_set (sink, "can-activate-push", TRUE, NULL);
+  g_object_set (sink, "can-activate-pull", FALSE, NULL);
+
+  g_object_set (src, "can-activate-push", TRUE, NULL);
+  g_object_set (src, "can-activate-pull", FALSE, NULL);
+
+  /* set up event probe to count EOS events */
+  srcpad = gst_element_get_pad (src, "src");
+  fail_unless (srcpad != NULL);
+
+  probe = gst_pad_add_event_probe (srcpad,
+      G_CALLBACK (eos_event_counter), &num_eos);
+
+  bus = gst_element_get_bus (pipe);
+
+  gst_element_set_state (pipe, GST_STATE_PLAYING);
+  state_ret = gst_element_get_state (pipe, NULL, NULL, -1);
+  fail_unless (state_ret == GST_STATE_CHANGE_SUCCESS);
+
+  /* wait a second, then emit the EOS */
+  g_usleep (GST_USECOND * 1);
+
+  /* shut down source only (should send EOS event) ... */
+  res = gst_element_send_event (pipe, gst_event_new_eos ());
+  fail_unless (res == TRUE);
+
+  /* ... and wait for the EOS message from the sink */
+  msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_ERROR);
+  fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
+
+  /* should be exactly one EOS event */
+  fail_unless (num_eos == 1);
+
+  gst_element_set_state (pipe, GST_STATE_NULL);
+  gst_element_get_state (pipe, NULL, NULL, -1);
+
+  /* make sure source hasn't sent a second one when going PAUSED => READY */
+  fail_unless (num_eos == 1);
+
+  gst_pad_remove_event_probe (srcpad, probe);
+  gst_object_unref (srcpad);
+  gst_message_unref (msg);
+  gst_object_unref (bus);
+  gst_object_unref (pipe);
+}
+
+GST_END_TEST;
+
+/* basesrc_eos_events_pull_live_eos:
+ *  - make sure the source stops and emits EOS when we send an EOS event to the
+ *    pipeline.
+ */
+GST_START_TEST (basesrc_eos_events_pull_live_eos)
+{
+  GstStateChangeReturn state_ret;
+  GstElement *src, *sink, *pipe;
+  GstMessage *msg;
+  GstBus *bus;
+  GstPad *srcpad;
+  guint probe, num_eos = 0;
+  gboolean res;
+
+  pipe = gst_pipeline_new ("pipeline");
+  sink = gst_element_factory_make ("fakesink", "sink");
+  src = gst_element_factory_make ("fakesrc", "src");
+
+  g_assert (pipe != NULL);
+  g_assert (sink != NULL);
+  g_assert (src != NULL);
+
+  fail_unless (gst_bin_add (GST_BIN (pipe), src) == TRUE);
+  fail_unless (gst_bin_add (GST_BIN (pipe), sink) == TRUE);
+
+  fail_unless (gst_element_link (src, sink) == TRUE);
+
+  g_object_set (sink, "can-activate-push", FALSE, NULL);
+  g_object_set (sink, "can-activate-pull", TRUE, NULL);
+
+  g_object_set (src, "can-activate-push", FALSE, NULL);
+  g_object_set (src, "can-activate-pull", TRUE, NULL);
+
+  /* set up event probe to count EOS events */
+  srcpad = gst_element_get_pad (src, "src");
+  fail_unless (srcpad != NULL);
+
+  probe = gst_pad_add_event_probe (srcpad,
+      G_CALLBACK (eos_event_counter), &num_eos);
+
+  bus = gst_element_get_bus (pipe);
+
+  gst_element_set_state (pipe, GST_STATE_PLAYING);
+  state_ret = gst_element_get_state (pipe, NULL, NULL, -1);
+  fail_unless (state_ret == GST_STATE_CHANGE_SUCCESS);
+
+  /* wait a second, then emit the EOS */
+  g_usleep (GST_USECOND * 1);
+
+  /* shut down source only (should send EOS event) ... */
+  res = gst_element_send_event (pipe, gst_event_new_eos ());
+  fail_unless (res == TRUE);
+
+  /* ... and wait for the EOS message from the sink */
+  msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
+  fail_unless (msg != NULL);
+  fail_unless (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_ERROR);
+  fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
+
+  /* no EOS in pull mode */
+  fail_unless (num_eos == 0);
+
+  gst_element_set_state (pipe, GST_STATE_NULL);
+  gst_element_get_state (pipe, NULL, NULL, -1);
+
+  /* make sure source hasn't sent a second one when going PAUSED => READY */
+  fail_unless (num_eos == 0);
+
+  gst_pad_remove_event_probe (srcpad, probe);
+  gst_object_unref (srcpad);
+  gst_message_unref (msg);
+  gst_object_unref (bus);
+  gst_object_unref (pipe);
+}
+
+GST_END_TEST;
+
 Suite *
 gst_basesrc_suite (void)
 {
@@ -339,6 +493,8 @@ gst_basesrc_suite (void)
   tcase_add_test (tc, basesrc_eos_events_push);
   tcase_add_test (tc, basesrc_eos_events_push_live_op);
   tcase_add_test (tc, basesrc_eos_events_pull_live_op);
+  tcase_add_test (tc, basesrc_eos_events_push_live_eos);
+  tcase_add_test (tc, basesrc_eos_events_pull_live_eos);
 
   return s;
 }