ffmpegdemux: cache events from upstream and re-send them later
authorTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 28 Jul 2009 22:21:11 +0000 (23:21 +0100)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 28 Jul 2009 22:58:02 +0000 (23:58 +0100)
Cache any events we get from upstream before we're open, especially
tag events we may be getting from apedemux/id3demux or the like, and
push them downstream later when we've added our pads instead of just
dropping them silently. Fixes transcoding tags for Monkey's Audio
Files with preceding APE or ID3v2 tags (#586957). Add minimal unit
test for this.

Also push stream tags later after the global tags and the newsegment
event rather than right after creating the pad.

configure.ac
ext/ffmpeg/gstffmpegdemux.c
tests/Makefile.am
tests/check/.gitignore
tests/check/Makefile.am
tests/check/elements/ffdemux_ape.c [new file with mode: 0644]
tests/files/586957.ape [new file with mode: 0644]
tests/files/Makefile.am [new file with mode: 0644]

index 4f59b48..e5e2ef9 100644 (file)
@@ -407,6 +407,7 @@ docs/Makefile
 docs/version.entities
 tests/Makefile
 tests/check/Makefile
+tests/files/Makefile
 )
 AC_OUTPUT
 
index c5feb3f..6912de3 100644 (file)
@@ -56,6 +56,8 @@ struct _GstFFStream
   gboolean discont;
   gboolean eos;
   GstFlowReturn last_flow;
+
+  GstTagList *tags;             /* stream tags */
 };
 
 struct _GstFFMpegDemux
@@ -90,6 +92,9 @@ struct _GstFFMpegDemux
   /* cached seek in READY */
   GstEvent *seek_event;
 
+  /* cached upstream events */
+  GList *cached_events;
+
   /* push mode data */
   GstFFMpegPipe ffpipe;
   GstTask *task;
@@ -325,8 +330,11 @@ gst_ffmpegdemux_close (GstFFMpegDemux * demux)
     GstFFStream *stream;
 
     stream = demux->streams[n];
-    if (stream && stream->pad) {
-      gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
+    if (stream) {
+      if (stream->pad)
+        gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
+      if (stream->tags)
+        gst_tag_list_free (stream->tags);
       g_free (stream);
     }
     demux->streams[n] = NULL;
@@ -958,6 +966,7 @@ gst_ffmpegdemux_get_stream (GstFFMpegDemux * demux, AVStream * avstream)
   stream->avstream = avstream;
   stream->last_ts = GST_CLOCK_TIME_NONE;
   stream->last_flow = GST_FLOW_OK;
+  stream->tags = NULL;
 
   switch (ctx->codec_type) {
     case CODEC_TYPE_VIDEO:
@@ -1019,12 +1028,11 @@ gst_ffmpegdemux_get_stream (GstFFMpegDemux * demux, AVStream * avstream)
 
   /* metadata */
   if ((codec = gst_ffmpeg_get_codecid_longname (ctx->codec_id))) {
-    GstTagList *list = gst_tag_list_new ();
+    stream->tags = gst_tag_list_new ();
 
-    gst_tag_list_add (list, GST_TAG_MERGE_REPLACE,
+    gst_tag_list_add (stream->tags, GST_TAG_MERGE_REPLACE,
         (ctx->codec_type == CODEC_TYPE_VIDEO) ?
         GST_TAG_VIDEO_CODEC : GST_TAG_AUDIO_CODEC, codec, NULL);
-    gst_element_found_tags_for_pad (GST_ELEMENT (demux), pad, list);
   }
 
   return stream;
@@ -1124,9 +1132,10 @@ gst_ffmpegdemux_open (GstFFMpegDemux * demux)
   GstFFMpegDemuxClass *oclass =
       (GstFFMpegDemuxClass *) G_OBJECT_GET_CLASS (demux);
   gchar *location;
-  gint res, n_streams;
+  gint res, n_streams, i;
   GstTagList *tags;
   GstEvent *event;
+  GList *cached_events;
 
   /* to be sure... */
   gst_ffmpegdemux_close (demux);
@@ -1156,8 +1165,8 @@ gst_ffmpegdemux_open (GstFFMpegDemux * demux)
 
   /* open_input_file() automatically reads the header. We can now map each
    * created AVStream to a GstPad to make GStreamer handle it. */
-  for (res = 0; res < n_streams; res++) {
-    gst_ffmpegdemux_get_stream (demux, demux->context->streams[res]);
+  for (i = 0; i < n_streams; i++) {
+    gst_ffmpegdemux_get_stream (demux, demux->context->streams[i]);
   }
 
   gst_element_no_more_pads (GST_ELEMENT (demux));
@@ -1183,6 +1192,8 @@ gst_ffmpegdemux_open (GstFFMpegDemux * demux)
   demux->opened = TRUE;
   event = demux->seek_event;
   demux->seek_event = NULL;
+  cached_events = demux->cached_events;
+  demux->cached_events = NULL;
   GST_OBJECT_UNLOCK (demux);
 
   if (event) {
@@ -1195,13 +1206,35 @@ gst_ffmpegdemux_open (GstFFMpegDemux * demux)
             demux->segment.start, demux->segment.stop, demux->segment.time));
   }
 
-  /* grab the tags */
+  while (cached_events) {
+    event = cached_events->data;
+    GST_INFO_OBJECT (demux, "pushing cached %s event: %" GST_PTR_FORMAT,
+        GST_EVENT_TYPE_NAME (event), event->structure);
+    gst_ffmpegdemux_push_event (demux, event);
+    cached_events = g_list_delete_link (cached_events, cached_events);
+  }
+
+  /* grab the global tags */
   tags = gst_ffmpegdemux_read_tags (demux);
   if (tags) {
+    GST_INFO_OBJECT (demux, "global tags: %" GST_PTR_FORMAT, tags);
     gst_element_post_message (GST_ELEMENT (demux),
         gst_message_new_tag (GST_OBJECT (demux), tags));
   }
 
+  /* now handle the stream tags */
+  for (i = 0; i < n_streams; i++) {
+    GstFFStream *stream;
+
+    stream = gst_ffmpegdemux_get_stream (demux, demux->context->streams[i]);
+    if (stream->tags != NULL && stream->pad != NULL) {
+      GST_INFO_OBJECT (stream->pad, "stream tags: %" GST_PTR_FORMAT,
+          stream->tags);
+      gst_element_found_tags_for_pad (GST_ELEMENT (demux), stream->pad,
+          gst_tag_list_copy (stream->tags));
+    }
+  }
+
   return TRUE;
 
   /* ERRORS */
@@ -1517,7 +1550,8 @@ gst_ffmpegdemux_sink_event (GstPad * sinkpad, GstEvent * event)
   demux = (GstFFMpegDemux *) (GST_PAD_PARENT (sinkpad));
   ffpipe = &(demux->ffpipe);
 
-  GST_DEBUG_OBJECT (demux, "event %s", GST_EVENT_TYPE_NAME (event));
+  GST_LOG_OBJECT (demux, "%s event: %" GST_PTR_FORMAT,
+      GST_EVENT_TYPE_NAME (event), event->structure);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
@@ -1539,6 +1573,11 @@ gst_ffmpegdemux_sink_event (GstPad * sinkpad, GstEvent * event)
       /* forward event */
       gst_pad_event_default (sinkpad, event);
 
+      GST_OBJECT_LOCK (demux);
+      g_list_foreach (demux->cached_events, (GFunc) gst_mini_object_unref,
+          NULL);
+      g_list_free (demux->cached_events);
+      GST_OBJECT_UNLOCK (demux);
       GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
       gst_adapter_clear (ffpipe->adapter);
       ffpipe->srcresult = GST_FLOW_OK;
@@ -1565,11 +1604,19 @@ gst_ffmpegdemux_sink_event (GstPad * sinkpad, GstEvent * event)
        *
        * If the demuxer isn't opened, push straight away, since we'll
        * be waiting against a cond that will never be signalled. */
-      if (GST_EVENT_IS_SERIALIZED (event) && demux->opened) {
-        GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
-        while (!ffpipe->needed)
-          GST_FFMPEG_PIPE_WAIT (ffpipe);
-        GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
+      if (GST_EVENT_IS_SERIALIZED (event)) {
+        if (demux->opened) {
+          GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
+          while (!ffpipe->needed)
+            GST_FFMPEG_PIPE_WAIT (ffpipe);
+          GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
+        } else {
+          /* queue events and send them later (esp. tag events) */
+          GST_OBJECT_LOCK (demux);
+          demux->cached_events = g_list_append (demux->cached_events, event);
+          GST_OBJECT_UNLOCK (demux);
+          goto done;
+        }
       }
       break;
   }
@@ -1760,6 +1807,10 @@ gst_ffmpegdemux_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       gst_ffmpegdemux_close (demux);
       gst_adapter_clear (demux->ffpipe.adapter);
+      g_list_foreach (demux->cached_events, (GFunc) gst_mini_object_unref,
+          NULL);
+      g_list_free (demux->cached_events);
+      demux->cached_events = NULL;
       break;
     default:
       break;
index 48686c0..0ecdd8f 100644 (file)
@@ -4,7 +4,7 @@ else
 SUBDIRS_CHECK =
 endif
 
-SUBDIRS = $(SUBDIRS_CHECK)
+SUBDIRS = $(SUBDIRS_CHECK) files
 
-DIST_SUBDIRS = check
+DIST_SUBDIRS = check files
 
index 9af3130..79b7878 100644 (file)
@@ -1 +1,3 @@
 test-registry.*
+elements/ffdemux_ape
+.dirstamp
index e417c0c..aa302f7 100644 (file)
@@ -1,6 +1,7 @@
 include $(top_srcdir)/common/check.mak
 
 CHECK_REGISTRY = $(top_builddir)/tests/check/test-registry.xml
+TEST_FILES_DIRECTORY = $(top_srcdir)/tests/files
 
 REGISTRY_ENVIRONMENT = \
        GST_REGISTRY=$(CHECK_REGISTRY)
@@ -8,7 +9,7 @@ REGISTRY_ENVIRONMENT = \
 TESTS_ENVIRONMENT = \
        $(REGISTRY_ENVIRONMENT)                                 \
        GST_PLUGIN_SYSTEM_PATH=                                 \
-       GST_PLUGIN_PATH=$(top_builddir)/gst:$(top_builddir)/ext:$(GSTPB_PLUGINS_DIR):$(GST_PLUGINS_DIR)
+       GST_PLUGIN_PATH=$(top_builddir)/gst:$(top_builddir)/ext:$(top_builddir)/../gst-plugins-good/gst:$(GSTPB_PLUGINS_DIR):$(GST_PLUGINS_DIR)
 
 # ths core dumps of some machines have PIDs appended
 CLEANFILES = core.* test-registry.xml
@@ -17,18 +18,22 @@ clean-local: clean-local-check
 
 check_PROGRAMS = \
        generic/plugin-test \
-       generic/libavcodec-locking
+       generic/libavcodec-locking \
+       elements/ffdemux_ape
 
 VALGRIND_TO_FIX = \
        generic/plugin-test \
-       generic/libavcodec-locking
+       generic/libavcodec-locking \
+       elements/ffdemux_ape
 
 TESTS = $(check_PROGRAMS)
 
 # these tests don't even pass
 noinst_PROGRAMS =
 
-AM_CFLAGS = $(GST_OBJ_CFLAGS) $(GST_CHECK_CFLAGS) $(CHECK_CFLAGS)
+AM_CFLAGS = $(GST_OBJ_CFLAGS) $(GST_CHECK_CFLAGS) $(CHECK_CFLAGS) \
+       $(GST_OPTION_CFLAGS) -DGST_TEST_FILES_PATH="\"$(TEST_FILES_DIRECTORY)\""
+
 LDADD = $(GST_OBJ_LIBS) $(GST_CHECK_LIBS) $(CHECK_LIBS)
 
 # valgrind testing
diff --git a/tests/check/elements/ffdemux_ape.c b/tests/check/elements/ffdemux_ape.c
new file mode 100644 (file)
index 0000000..2e8f106
--- /dev/null
@@ -0,0 +1,195 @@
+/* GStreamer unit tests for ffdemux_ape
+ *
+ * Copyright (C) 2009 Tim-Philipp Müller  <tim centricular net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <gst/check/gstcheck.h>
+
+#include <gst/gst.h>
+
+typedef void (CheckTagsFunc) (const GstTagList * tags, const gchar * file);
+
+static void
+pad_added_cb (GstElement * decodebin, GstPad * pad, GstBin * pipeline)
+{
+  GstElement *sink;
+
+  sink = gst_bin_get_by_name (pipeline, "fakesink");
+  fail_unless (gst_element_link (decodebin, sink));
+  gst_object_unref (sink);
+
+  gst_element_set_state (sink, GST_STATE_PAUSED);
+}
+
+static GstBusSyncReply
+error_cb (GstBus * bus, GstMessage * msg, gpointer user_data)
+{
+  if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) {
+    const gchar *file = (const gchar *) user_data;
+    GError *err = NULL;
+    gchar *dbg = NULL;
+
+    gst_message_parse_error (msg, &err, &dbg);
+    g_error ("ERROR for %s: %s\n%s\n", file, err->message, dbg);
+  }
+
+  return GST_BUS_PASS;
+}
+
+static gboolean
+event_probe (GstPad * pad, GstEvent * event, GstTagList ** p_tags)
+{
+  if (GST_EVENT_TYPE (event) == GST_EVENT_TAG) {
+    GST_INFO ("tag event: %" GST_PTR_FORMAT, event->structure);
+    if (*p_tags == NULL) {
+      GST_INFO ("first tag, saving");
+      *p_tags = gst_tag_list_copy ((GstTagList *) event->structure);
+    }
+  }
+  return TRUE;                  /* keep the data */
+}
+
+/* FIXME: push_mode not used currently */
+static GstTagList *
+read_tags_from_file (const gchar * file, gboolean push_mode)
+{
+  GstStateChangeReturn state_ret;
+  GstTagList *tags = NULL;
+  GstElement *sink, *src, *dec, *pipeline;
+  GstBus *bus;
+  GstPad *pad;
+  gchar *path;
+
+  pipeline = gst_pipeline_new ("pipeline");
+  fail_unless (pipeline != NULL, "Failed to create pipeline!");
+
+  src = gst_element_factory_make ("filesrc", "filesrc");
+  fail_unless (src != NULL, "Failed to create filesrc!");
+
+  dec = gst_element_factory_make ("decodebin2", "decodebin2");
+  fail_unless (dec != NULL, "Failed to create decodebin2!");
+
+  sink = gst_element_factory_make ("fakesink", "fakesink");
+  fail_unless (sink != NULL, "Failed to create fakesink!");
+
+  bus = gst_element_get_bus (pipeline);
+
+  /* kids, don't use a sync handler for this at home, really; we do because
+   * we just want to abort and nothing else */
+  gst_bus_set_sync_handler (bus, error_cb, (gpointer) file);
+
+  gst_bin_add_many (GST_BIN (pipeline), src, dec, sink, NULL);
+  gst_element_link_many (src, dec, NULL);
+
+  path = g_build_filename (GST_TEST_FILES_PATH, file, NULL);
+  GST_LOG ("reading file '%s'", path);
+  g_object_set (src, "location", path, NULL);
+
+  /* can't link uridecodebin and sink yet, do that later */
+  g_signal_connect (dec, "pad-added", G_CALLBACK (pad_added_cb), pipeline);
+
+  /* we want to make sure there's a tag event coming out of ffdemux_ape
+   * (ie. the one apedemux generated) */
+  pad = gst_element_get_static_pad (sink, "sink");
+  gst_pad_add_event_probe (pad, G_CALLBACK (event_probe), &tags);
+  gst_object_unref (pad);
+
+  state_ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
+  fail_unless (state_ret != GST_STATE_CHANGE_FAILURE);
+
+  if (state_ret == GST_STATE_CHANGE_ASYNC) {
+    GST_LOG ("waiting for pipeline to reach PAUSED state");
+    state_ret = gst_element_get_state (pipeline, NULL, NULL, -1);
+    fail_unless_equals_int (state_ret, GST_STATE_CHANGE_SUCCESS);
+  }
+
+  GST_LOG ("PAUSED, let's retrieve our tags");
+
+  fail_unless (tags != NULL, "Expected tag event! (%s)", file);
+
+  gst_object_unref (bus);
+
+  fail_unless_equals_int (gst_element_set_state (pipeline, GST_STATE_NULL),
+      GST_STATE_CHANGE_SUCCESS);
+  gst_object_unref (pipeline);
+
+  g_free (path);
+
+  GST_INFO ("%s: tags = %" GST_PTR_FORMAT, file, tags);
+  return tags;
+}
+
+static void
+run_check_for_file (const gchar * filename, CheckTagsFunc * check_func)
+{
+  GstTagList *tags;
+
+  /* first, pull-based */
+  tags = read_tags_from_file (filename, FALSE);
+  fail_unless (tags != NULL, "Failed to extract tags from '%s'", filename);
+  check_func (tags, filename);
+  gst_tag_list_free (tags);
+}
+
+#define tag_list_has_tag(taglist,tag) \
+    (gst_tag_list_get_value_index((taglist),(tag),0) != NULL)
+
+/* just make sure ffdemux_ape forwarded the tags extracted by apedemux
+ * (should be the first tag list / tag event too) */
+static void
+check_for_apedemux_tags (const GstTagList * tags, const gchar * file)
+{
+  gchar *artist = NULL;
+
+  fail_unless (gst_tag_list_get_string (tags, GST_TAG_ARTIST, &artist));
+  fail_unless (artist != NULL);
+  fail_unless_equals_string (artist, "Marvin Gaye");
+  g_free (artist);
+
+  fail_unless (tag_list_has_tag (tags, GST_TAG_CONTAINER_FORMAT));
+
+  GST_LOG ("all good");
+}
+
+GST_START_TEST (test_tag_caching)
+{
+  if (!gst_default_registry_check_feature_version ("apedemux", 0, 10, 0) ||
+      !gst_default_registry_check_feature_version ("decodebin2", 0, 10, 0)) {
+    g_printerr ("Skipping test_tag_caching: required element apedemux or "
+        "decodebin2 element not found\n");
+    return;
+  }
+
+  run_check_for_file ("586957.ape", check_for_apedemux_tags);
+}
+
+GST_END_TEST;
+
+static Suite *
+ffdemux_ape_suite (void)
+{
+  Suite *s = suite_create ("ffdemux_ape");
+  TCase *tc_chain = tcase_create ("general");
+
+  suite_add_tcase (s, tc_chain);
+  tcase_add_test (tc_chain, test_tag_caching);
+
+  return s;
+}
+
+GST_CHECK_MAIN (ffdemux_ape)
diff --git a/tests/files/586957.ape b/tests/files/586957.ape
new file mode 100644 (file)
index 0000000..3e891c8
Binary files /dev/null and b/tests/files/586957.ape differ
diff --git a/tests/files/Makefile.am b/tests/files/Makefile.am
new file mode 100644 (file)
index 0000000..141fd67
--- /dev/null
@@ -0,0 +1,2 @@
+EXTRA_DIST = \
+       586957.ape