appsrc: retain the latest caps in queue when flushing
authorVikram Fugro <vikram.fugro@gmail.com>
Mon, 7 Sep 2015 20:05:19 +0000 (01:35 +0530)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 9 Sep 2015 09:27:40 +0000 (12:27 +0300)
- Retain the latest caps in the internal queue, when
  flushing.
- Add a unit test case for the same.

https://bugzilla.gnome.org/show_bug.cgi?id=754597

gst-libs/gst/app/gstappsrc.c
tests/check/elements/appsrc.c

index 61ad67c..b66caef 100644 (file)
@@ -562,18 +562,26 @@ gst_app_src_init (GstAppSrc * appsrc)
 }
 
 static void
-gst_app_src_flush_queued (GstAppSrc * src)
+gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
 {
   GstMiniObject *obj;
   GstAppSrcPrivate *priv = src->priv;
+  GstCaps *requeue_caps = NULL;
 
   while (!g_queue_is_empty (priv->queue)) {
     obj = g_queue_pop_head (priv->queue);
     if (obj) {
+      if (GST_IS_CAPS (obj) && retain_last_caps) {
+        gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
+      }
       gst_mini_object_unref (obj);
     }
   }
 
+  if (requeue_caps) {
+    g_queue_push_tail (priv->queue, requeue_caps);
+  }
+
   priv->queued_bytes = 0;
 }
 
@@ -599,7 +607,7 @@ gst_app_src_dispose (GObject * obj)
   priv->notify = NULL;
 
   GST_OBJECT_UNLOCK (appsrc);
-  gst_app_src_flush_queued (appsrc);
+  gst_app_src_flush_queued (appsrc, FALSE);
 
   G_OBJECT_CLASS (parent_class)->dispose (obj);
 }
@@ -762,7 +770,7 @@ gst_app_src_send_event (GstElement * element, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_STOP:
-      gst_app_src_flush_queued (appsrc);
+      gst_app_src_flush_queued (appsrc, TRUE);
       break;
     default:
       break;
@@ -833,7 +841,7 @@ gst_app_src_stop (GstBaseSrc * bsrc)
   priv->is_eos = FALSE;
   priv->flushing = TRUE;
   priv->started = FALSE;
-  gst_app_src_flush_queued (appsrc);
+  gst_app_src_flush_queued (appsrc, TRUE);
   g_cond_broadcast (&priv->cond);
   g_mutex_unlock (&priv->mutex);
 
@@ -953,7 +961,7 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
 
   if (res) {
     GST_DEBUG_OBJECT (appsrc, "flushing queue");
-    gst_app_src_flush_queued (appsrc);
+    gst_app_src_flush_queued (appsrc, TRUE);
     priv->is_eos = FALSE;
   } else {
     GST_WARNING_OBJECT (appsrc, "seek failed");
index 595ac3f..f7f1a13 100644 (file)
@@ -334,6 +334,90 @@ GST_START_TEST (test_appsrc_set_caps_twice)
 
 GST_END_TEST;
 
+static gboolean
+seek_cb (GstAppSrc * src, guint64 offset, gpointer data)
+{
+  /* Return fake true */
+  return TRUE;
+}
+
+static void
+caps_cb (GObject * obj, GObject * child, GParamSpec * pspec,
+    GstCaps ** received_caps)
+{
+  GstCaps *caps = NULL;
+
+  /* Collect the caps */
+  g_object_get (child, "caps", &caps, NULL);
+  if (caps) {
+    GST_LOG_OBJECT (child, "caps set to  : %" GST_PTR_FORMAT, caps);
+    gst_caps_replace (received_caps, caps);
+    gst_caps_unref (caps);
+  }
+}
+
+GST_START_TEST (test_appsrc_caps_in_push_modes)
+{
+  GstElement *pipe, *src, *sink;
+  GstMessage *msg;
+  GstCaps *caps, *caps1, *received_caps;
+  gint i;
+  GstMessageType msg_types;
+  GstAppSrcCallbacks cb = { 0 };
+  GstAppStreamType modes[] = { GST_APP_STREAM_TYPE_STREAM,
+    GST_APP_STREAM_TYPE_SEEKABLE,
+    GST_APP_STREAM_TYPE_RANDOM_ACCESS
+  };
+
+  for (i = 0; i < sizeof (modes) / sizeof (modes[0]); i++) {
+    GST_INFO ("checking mode %d", modes[i]);
+    caps1 = gst_caps_new_simple ("foo/bar", "bleh", G_TYPE_INT, 2, NULL);
+    received_caps = NULL;
+
+    pipe = gst_pipeline_new ("pipeline");
+    src = gst_element_factory_make ("appsrc", NULL);
+    sink = gst_element_factory_make ("fakesink", NULL);
+    gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
+    gst_element_link (src, sink);
+
+    g_object_set (G_OBJECT (src), "stream-type", modes[i], NULL);
+    if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
+      cb.seek_data = seek_cb;
+      gst_app_src_set_callbacks (GST_APP_SRC (src), &cb, NULL, NULL);
+    }
+    g_signal_connect (pipe, "deep-notify::caps", G_CALLBACK (caps_cb),
+        &received_caps);
+
+    gst_app_src_set_caps (GST_APP_SRC (src), caps1);
+    caps = gst_app_src_get_caps (GST_APP_SRC (src));
+    fail_unless (gst_caps_is_equal (caps, caps1));
+    gst_caps_unref (caps);
+
+    gst_element_set_state (pipe, GST_STATE_PLAYING);
+
+    if (modes[i] != GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
+      gst_app_src_end_of_stream (GST_APP_SRC (src));
+      msg_types = GST_MESSAGE_EOS;
+    } else {
+      gst_app_src_push_buffer (GST_APP_SRC (src), gst_buffer_new ());
+      msg_types = GST_MESSAGE_ASYNC_DONE;
+    }
+
+    msg = gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1, msg_types);
+    gst_message_unref (msg);
+    /* The collected caps should match with one that was pushed */
+    fail_unless (received_caps && gst_caps_is_equal (received_caps, caps1));
+
+    gst_element_set_state (pipe, GST_STATE_NULL);
+    gst_object_unref (pipe);
+    gst_caps_unref (caps1);
+    if (received_caps)
+      gst_caps_unref (received_caps);
+  }
+}
+
+GST_END_TEST;
+
 static Suite *
 appsrc_suite (void)
 {
@@ -342,6 +426,7 @@ appsrc_suite (void)
 
   tcase_add_test (tc_chain, test_appsrc_non_null_caps);
   tcase_add_test (tc_chain, test_appsrc_set_caps_twice);
+  tcase_add_test (tc_chain, test_appsrc_caps_in_push_modes);
 
   if (RUNNING_ON_VALGRIND)
     tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);