appsrc: serialize custom events with buffers flow
authorGuillaume Desmottes <guillaume.desmottes@collabora.com>
Mon, 22 Feb 2021 12:17:18 +0000 (13:17 +0100)
committerGuillaume Desmottes <guillaume.desmottes@collabora.com>
Thu, 22 Jul 2021 11:56:22 +0000 (13:56 +0200)
Application may want to inject events to the pipeline and keep them
synchronized with the buffers flow.

Fix #247

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1046>

gst-libs/gst/app/gstappsrc.c
tests/check/elements/appsrc.c

index 457220d..ce0875d 100644 (file)
@@ -1014,6 +1014,17 @@ gst_app_src_send_event (GstElement * element, GstEvent * event)
       g_mutex_unlock (&priv->mutex);
       break;
     default:
+      if (GST_EVENT_IS_SERIALIZED (event)) {
+        GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
+        g_mutex_lock (&priv->mutex);
+        gst_queue_array_push_tail (priv->queue, event);
+
+        if ((priv->wait_status & STREAM_WAITING))
+          g_cond_broadcast (&priv->cond);
+
+        g_mutex_unlock (&priv->mutex);
+        return TRUE;
+      }
       break;
   }
 
@@ -1596,6 +1607,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
   }
 
   while (TRUE) {
+    /* Our lock may have been release to push events or caps, check out
+     * state in case we are now flushing. */
+    if (G_UNLIKELY (priv->flushing))
+      goto flushing;
+
     /* return data as long as we have some */
     if (!gst_queue_array_is_empty (priv->queue)) {
       GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
@@ -1618,13 +1634,6 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
         if (caps_changed)
           gst_app_src_do_negotiate (bsrc);
 
-        /* Lock has released so now may need
-         *- flushing
-         *- new caps change
-         *- check queue has data */
-        if (G_UNLIKELY (priv->flushing))
-          goto flushing;
-
         /* Continue checks caps and queue */
         continue;
       }
@@ -1661,24 +1670,56 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
         *buf = NULL;
       } else if (GST_IS_EVENT (obj)) {
         GstEvent *event = GST_EVENT (obj);
-        const GstSegment *segment = NULL;
-
-        gst_event_parse_segment (event, &segment);
-        g_assert (segment != NULL);
-
-        if (!gst_segment_is_equal (&priv->current_segment, segment)) {
-          GST_DEBUG_OBJECT (appsrc,
-              "Update new segment %" GST_PTR_FORMAT, event);
-          if (!gst_base_src_new_segment (bsrc, segment)) {
-            GST_ERROR_OBJECT (appsrc,
-                "Couldn't set new segment %" GST_PTR_FORMAT, event);
-            gst_event_unref (event);
-            goto invalid_segment;
+
+        GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
+
+        if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
+          const GstSegment *segment = NULL;
+
+          gst_event_parse_segment (event, &segment);
+          g_assert (segment != NULL);
+
+          if (!gst_segment_is_equal (&priv->current_segment, segment)) {
+            GST_DEBUG_OBJECT (appsrc,
+                "Update new segment %" GST_PTR_FORMAT, event);
+            if (!gst_base_src_new_segment (bsrc, segment)) {
+              GST_ERROR_OBJECT (appsrc,
+                  "Couldn't set new segment %" GST_PTR_FORMAT, event);
+              gst_event_unref (event);
+              goto invalid_segment;
+            }
+            gst_segment_copy_into (segment, &priv->current_segment);
           }
-          gst_segment_copy_into (segment, &priv->current_segment);
-        }
 
-        gst_event_unref (event);
+          gst_event_unref (event);
+        } else {
+          GstEvent *seg_event;
+          GstSegment last_segment = priv->last_segment;
+
+          /* event is serialized with the buffers flow */
+
+          /* We are about to push an event, release out lock */
+          g_mutex_unlock (&priv->mutex);
+
+          seg_event =
+              gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
+              GST_EVENT_SEGMENT, 0);
+          if (!seg_event) {
+            seg_event = gst_event_new_segment (&last_segment);
+
+            GST_DEBUG_OBJECT (appsrc,
+                "received serialized event before first buffer, push default segment %"
+                GST_PTR_FORMAT, seg_event);
+
+            gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
+          } else {
+            gst_event_unref (seg_event);
+          }
+
+          gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
+
+          g_mutex_lock (&priv->mutex);
+        }
         continue;
       } else {
         g_assert_not_reached ();
index ae25429..732c5d2 100644 (file)
@@ -1368,6 +1368,78 @@ GST_START_TEST (test_appsrc_limits)
 
 GST_END_TEST;
 
+static GstFlowReturn
+send_event_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+  GST_LOG ("  buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
+
+  fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
+  ++expect_offset;
+  gst_buffer_unref (buf);
+
+  if (expect_offset == 2) {
+    /* test is done */
+    g_mutex_lock (&check_mutex);
+    done = TRUE;
+    g_cond_signal (&check_cond);
+    g_mutex_unlock (&check_mutex);
+  }
+
+  return GST_FLOW_OK;
+}
+
+static gboolean
+send_event_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  GST_LOG ("event %" GST_PTR_FORMAT, event);
+  if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
+    /* this event should arrive after the first buffer */
+    fail_unless_equals_int (expect_offset, 1);
+  }
+  gst_event_unref (event);
+  return TRUE;
+}
+
+/* check that custom downstream events are properly serialized with buffers */
+GST_START_TEST (test_appsrc_send_custom_event)
+{
+  GstElement *src;
+  GstBuffer *buf;
+
+  src = setup_appsrc ();
+
+  ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+
+  expect_offset = 0;
+  gst_pad_set_chain_function (mysinkpad, send_event_chain_func);
+  gst_pad_set_event_function (mysinkpad, send_event_event_func);
+
+  /* send a buffer, a custom event and a second buffer */
+  buf = gst_buffer_new_and_alloc (1);
+  GST_BUFFER_OFFSET (buf) = 0;
+  fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
+          buf) == GST_FLOW_OK);
+
+  gst_element_send_event (src,
+      gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+          gst_structure_new ("custom", NULL, NULL)));
+
+  buf = gst_buffer_new_and_alloc (2);
+  GST_BUFFER_OFFSET (buf) = 1;
+  fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
+          buf) == GST_FLOW_OK);
+
+  g_mutex_lock (&check_mutex);
+  while (!done)
+    g_cond_wait (&check_cond, &check_mutex);
+  g_mutex_unlock (&check_mutex);
+
+  ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_appsrc (src);
+}
+
+GST_END_TEST;
+
 static Suite *
 appsrc_suite (void)
 {
@@ -1382,6 +1454,7 @@ appsrc_suite (void)
   tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
   tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
   tcase_add_test (tc_chain, test_appsrc_limits);
+  tcase_add_test (tc_chain, test_appsrc_send_custom_event);
 
   if (RUNNING_ON_VALGRIND)
     tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);