basesrc: preserve seqnum of eos events sent by the user
authorThiago Santos <ts.santos@sisa.samsung.com>
Thu, 23 Jan 2014 18:52:51 +0000 (15:52 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Fri, 24 Jan 2014 12:31:44 +0000 (09:31 -0300)
Store the eos event seqnum and use it when creating the
new eos event to be pushed downstream. To know if the eos
was caused by the eos events received on send_event, a
'forced_eos' flag is used to use the correct seqnum on
the event pushed downstream.

Useful if the application wants to check if the EOS message
was generated from its own pushed EOS or from another source
(stream really finished).

Also adds a test for this

https://bugzilla.gnome.org/show_bug.cgi?id=722791

libs/gst/base/gstbasesrc.c
tests/check/libs/basesrc.c

index d51fb11..e6acdcb 100644 (file)
@@ -188,6 +188,12 @@ GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
 
+#define CLEAR_PENDING_EOS(bsrc) \
+  G_STMT_START { \
+    g_atomic_int_set (&bsrc->priv->has_pending_eos, FALSE); \
+    gst_event_replace (&bsrc->priv->pending_eos, NULL); \
+  } G_STMT_END
+
 
 /* BaseSrc signals and args */
 enum
@@ -230,7 +236,11 @@ struct _GstBaseSrcPrivate
   guint32 segment_seqnum;
 
   /* if EOS is pending (atomic) */
-  gint pending_eos;
+  GstEvent *pending_eos;
+  gint has_pending_eos;
+
+  /* if the eos was caused by a forced eos from the application */
+  gboolean forced_eos;
 
   /* startup latency is the time it takes between going to PLAYING and producing
    * the first BUFFER with running_time 0. This value is included in the latency
@@ -1729,7 +1739,11 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
       GST_LIVE_LOCK (src);
       src->priv->flushing = TRUE;
       /* clear pending EOS if any */
-      g_atomic_int_set (&src->priv->pending_eos, FALSE);
+      if (g_atomic_int_get (&src->priv->has_pending_eos)) {
+        GST_OBJECT_LOCK (src);
+        CLEAR_PENDING_EOS (src);
+        GST_OBJECT_UNLOCK (src);
+      }
       if (bclass->unlock_stop)
         bclass->unlock_stop (src);
       if (src->clock_id)
@@ -1770,18 +1784,24 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
        *
        * We have two possibilities:
        *
-       *  - Before we are to enter the _create function, we check the pending_eos
+       *  - Before we are to enter the _create function, we check the has_pending_eos
        *    first and do EOS instead of entering it.
        *  - If we are in the _create function or we did not manage to set the
        *    flag fast enough and we are about to enter the _create function,
        *    we unlock it so that we exit with FLUSHING immediately. We then
        *    check the EOS flag and do the EOS logic.
        */
-      g_atomic_int_set (&src->priv->pending_eos, TRUE);
-      GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
+      GST_OBJECT_LOCK (src);
+      g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+      if (src->priv->pending_eos)
+        gst_event_unref (src->priv->pending_eos);
+      src->priv->pending_eos = event;
+      event = NULL;
+      GST_OBJECT_UNLOCK (src);
 
+      GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
 
-      /* unlock the _create function so that we can check the pending_eos flag
+      /* unlock the _create function so that we can check the has_pending_eos flag
        * and we can do EOS. This will eventually release the LIVE_LOCK again so
        * that we can grab it and stop the unlock again. We don't take the stream
        * lock so that this operation is guaranteed to never block. */
@@ -2385,9 +2405,11 @@ again:
   }
 
   /* don't enter the create function if a pending EOS event was set. For the
-   * logic of the pending_eos, check the event function of this class. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
+   * logic of the has_pending_eos, check the event function of this class. */
+  if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
+    src->priv->forced_eos = TRUE;
     goto eos;
+  }
 
   GST_DEBUG_OBJECT (src,
       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
@@ -2400,11 +2422,12 @@ again:
   /* The create function could be unlocked because we have a pending EOS. It's
    * possible that we have a valid buffer from create that we need to
    * discard when the create function returned _OK. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
+  if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
     if (ret == GST_FLOW_OK) {
       if (*buf == NULL)
         gst_buffer_unref (res_buf);
     }
+    src->priv->forced_eos = TRUE;
     goto eos;
   }
 
@@ -2837,26 +2860,40 @@ pause:
       GstFormat format;
       gint64 position;
 
-      /* perform EOS logic */
       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
       format = src->segment.format;
       position = src->segment.position;
 
-      if (flag_segment) {
+      /* perform EOS logic */
+      if (src->priv->forced_eos) {
+        g_assert (g_atomic_int_get (&src->priv->has_pending_eos));
+        GST_OBJECT_LOCK (src);
+        event = src->priv->pending_eos;
+        src->priv->pending_eos = NULL;
+        GST_OBJECT_UNLOCK (src);
+
+      } else if (flag_segment) {
         GstMessage *message;
 
-        message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
-            format, position);
-        gst_message_set_seqnum (message, src->priv->seqnum);
-        gst_element_post_message (GST_ELEMENT_CAST (src), message);
-        event = gst_event_new_segment_done (format, position);
+        if (flag_segment) {
+          message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
+              format, position);
+          gst_message_set_seqnum (message, src->priv->seqnum);
+          gst_element_post_message (GST_ELEMENT_CAST (src), message);
+          event = gst_event_new_segment_done (format, position);
+        } else {
+          event = gst_event_new_eos ();
+          gst_event_set_seqnum (event, src->priv->seqnum);
+        }
+
         gst_event_set_seqnum (event, src->priv->seqnum);
-        gst_pad_push_event (pad, event);
       } else {
         event = gst_event_new_eos ();
         gst_event_set_seqnum (event, src->priv->seqnum);
-        gst_pad_push_event (pad, event);
       }
+
+      gst_pad_push_event (pad, event);
+
     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
       event = gst_event_new_eos ();
       gst_event_set_seqnum (event, src->priv->seqnum);
@@ -3481,7 +3518,11 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
     basesrc->live_running = TRUE;
 
     /* clear pending EOS if any */
-    g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+    if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
+      GST_OBJECT_LOCK (basesrc);
+      CLEAR_PENDING_EOS (basesrc);
+      GST_OBJECT_UNLOCK (basesrc);
+    }
 
     /* step 1, now that we have the LIVE lock, clear our unlock request */
     if (bclass->unlock_stop)
@@ -3715,7 +3756,11 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
     {
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
-      g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+      if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
+        GST_OBJECT_LOCK (basesrc);
+        CLEAR_PENDING_EOS (basesrc);
+        GST_OBJECT_UNLOCK (basesrc);
+      }
       gst_event_replace (&basesrc->pending_seek, NULL);
       break;
     }
index b9e66d1..211a1b5 100644 (file)
@@ -56,6 +56,8 @@ GST_START_TEST (basesrc_eos_events_push_live_op)
   GstPad *srcpad;
   guint probe, num_eos = 0;
   GstStreamConsistency *consistency;
+  GstEvent *eos_event;
+  guint32 eos_event_seqnum;
 
   pipe = gst_pipeline_new ("pipeline");
   sink = gst_element_factory_make ("fakesink", "sink");
@@ -95,7 +97,9 @@ GST_START_TEST (basesrc_eos_events_push_live_op)
   g_usleep (GST_USECOND * 1);
 
   /* shut down pipeline (should send EOS message) ... */
-  gst_element_send_event (pipe, gst_event_new_eos ());
+  eos_event = gst_event_new_eos ();
+  eos_event_seqnum = gst_event_get_seqnum (eos_event);
+  gst_element_send_event (pipe, eos_event);
 
   /* ... and wait for the EOS message from the sink */
   msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
@@ -105,6 +109,7 @@ GST_START_TEST (basesrc_eos_events_push_live_op)
 
   /* should be exactly one EOS event */
   fail_unless (num_eos == 1);
+  fail_unless (gst_message_get_seqnum (msg) == eos_event_seqnum);
 
   gst_element_set_state (pipe, GST_STATE_NULL);
   gst_element_get_state (pipe, NULL, NULL, -1);