streamiddemux: Add streamiddemux element
authorHoonHee Lee <hoonhee.lee@lge.com>
Tue, 4 Mar 2014 10:40:05 +0000 (19:40 +0900)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 12 Mar 2015 14:42:18 +0000 (14:42 +0000)
Demultiplex a stream to multiple source pads based on the stream ids from the
stream-start events. This basically reverses the behaviour of funnel.

https://bugzilla.gnome.org/show_bug.cgi?id=707605

configure.ac
plugins/elements/Makefile.am
plugins/elements/gstelements.c
plugins/elements/gststreamiddemux.c [new file with mode: 0644]
plugins/elements/gststreamiddemux.h [new file with mode: 0644]
tests/check/Makefile.am
tests/check/elements/streamiddemux.c [new file with mode: 0644]
tests/examples/Makefile.am
tests/examples/streamiddemux/Makefile.am [new file with mode: 0644]
tests/examples/streamiddemux/streamiddemux-stream.c [new file with mode: 0644]

index e637239..9e45826 100644 (file)
@@ -839,6 +839,7 @@ tests/examples/memory/Makefile
 tests/examples/metadata/Makefile
 tests/examples/netclock/Makefile
 tests/examples/queue/Makefile
+tests/examples/streamiddemux/Makefile
 tests/examples/streams/Makefile
 tests/examples/typefind/Makefile
 tools/Makefile
index 6d3857c..ee3c9fb 100644 (file)
@@ -24,6 +24,7 @@ libgstcoreelements_la_SOURCES =       \
        gstsparsefile.c         \
        gsttee.c                \
        gsttypefindelement.c    \
+       gststreamiddemux.c      \
        gstvalve.c
 
 libgstcoreelements_la_CFLAGS = $(GST_OBJ_CFLAGS)
@@ -54,6 +55,7 @@ noinst_HEADERS =              \
        gstsparsefile.h         \
        gsttee.h                \
        gsttypefindelement.h    \
+       gststreamiddemux.h      \
        gstvalve.h
 
 EXTRA_DIST = gstfdsrc.c \
index 2b1781d..5c07855 100644 (file)
@@ -46,6 +46,7 @@
 #include "gsttee.h"
 #include "gsttypefindelement.h"
 #include "gstvalve.h"
+#include "gststreamiddemux.h"
 
 static gboolean
 plugin_init (GstPlugin * plugin)
@@ -109,6 +110,10 @@ plugin_init (GstPlugin * plugin)
           gst_valve_get_type ()))
     return FALSE;
 
+  if (!gst_element_register (plugin, "streamiddemux", GST_RANK_PRIMARY,
+          gst_streamid_demux_get_type ()))
+    return FALSE;
+
   return TRUE;
 }
 
diff --git a/plugins/elements/gststreamiddemux.c b/plugins/elements/gststreamiddemux.c
new file mode 100644 (file)
index 0000000..8528acb
--- /dev/null
@@ -0,0 +1,401 @@
+/* GStreamer streamiddemux element
+ *
+ * Copyright 2013 LGE Corporation.
+ *  @author: Hoonhee Lee <hoonhee.lee@lge.com>
+ *  @author: Jeongseok Kim <jeongseok.kim@lge.com>
+ *  @author: Wonchul Lee <wonchul86.lee@lge.com>
+ *
+ * gststreamiddemux.c: Simple stream-id-demultiplexer element
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+/**
+ * SECTION:element-streamid-demux
+ * @see_also: #GstFunnel
+ *
+ * Direct input stream to one out of N output pads by stream-id.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+
+#include "gststreamiddemux.h"
+
+GST_DEBUG_CATEGORY_STATIC (streamid_demux_debug);
+#define GST_CAT_DEFAULT streamid_demux_debug
+
+enum
+{
+  PROP_0,
+  PROP_ACTIVE_PAD,
+  PROP_LAST
+};
+
+static GstStaticPadTemplate gst_streamid_demux_sink_factory =
+GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+static GstStaticPadTemplate gst_streamid_demux_src_factory =
+GST_STATIC_PAD_TEMPLATE ("src_%u",
+    GST_PAD_SRC,
+    GST_PAD_SOMETIMES,
+    GST_STATIC_CAPS_ANY);
+
+#define _do_init \
+GST_DEBUG_CATEGORY_INIT (streamid_demux_debug, \
+        "streamiddemux", 0, "Streamid demuxer");
+#define gst_streamid_demux_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstStreamidDemux, gst_streamid_demux,
+    GST_TYPE_ELEMENT, _do_init);
+
+static void gst_streamid_demux_dispose (GObject * object);
+static void gst_streamid_demux_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+static GstFlowReturn gst_streamid_demux_chain (GstPad * pad,
+    GstObject * parent, GstBuffer * buf);
+static gboolean gst_streamid_demux_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static GstStateChangeReturn gst_streamid_demux_change_state (GstElement *
+    element, GstStateChange transition);
+static GstPad *gst_streamid_demux_get_srcpad_by_stream_id (GstStreamidDemux *
+    demux, const gchar * stream_id);
+static gboolean gst_streamid_demux_srcpad_create (GstStreamidDemux * demux,
+    GstPad * pad, const gchar * stream_id);
+static void gst_streamid_demux_reset (GstStreamidDemux * demux);
+static void gst_streamid_demux_release_srcpad (const GValue * item,
+    GstStreamidDemux * demux);
+
+static void
+gst_streamid_demux_class_init (GstStreamidDemuxClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+
+  gobject_class->get_property = gst_streamid_demux_get_property;
+  gobject_class->dispose = gst_streamid_demux_dispose;
+
+  g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
+      g_param_spec_object ("active-pad", "Active pad",
+          "The currently active src pad", GST_TYPE_PAD,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+  gst_element_class_set_static_metadata (gstelement_class, "Streamid Demux",
+      "Generic", "1-to-N output stream by stream-id",
+      "HoonHee Lee <hoonhee.lee@lge.com>");
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&gst_streamid_demux_sink_factory));
+
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&gst_streamid_demux_src_factory));
+
+  gstelement_class->change_state = gst_streamid_demux_change_state;
+}
+
+static void
+gst_streamid_demux_init (GstStreamidDemux * demux)
+{
+  demux->sinkpad =
+      gst_pad_new_from_static_template (&gst_streamid_demux_sink_factory,
+      "sink");
+  gst_pad_set_chain_function (demux->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_streamid_demux_chain));
+  gst_pad_set_event_function (demux->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_streamid_demux_event));
+
+  gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
+
+  /* srcpad management */
+  demux->active_srcpad = NULL;
+  demux->nb_srcpads = 0;
+
+  /* initialize hash table for srcpad */
+  demux->stream_id_pairs =
+      g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify) g_free,
+      (GDestroyNotify) gst_object_unref);
+}
+
+static void
+gst_streamid_demux_dispose (GObject * object)
+{
+  GstStreamidDemux *demux = GST_STREAMID_DEMUX (object);
+
+  gst_streamid_demux_reset (demux);
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
+gst_streamid_demux_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstStreamidDemux *demux = GST_STREAMID_DEMUX (object);
+
+  switch (prop_id) {
+    case PROP_ACTIVE_PAD:
+      GST_OBJECT_LOCK (demux);
+      g_value_set_object (value, demux->active_srcpad);
+      GST_OBJECT_UNLOCK (demux);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+  GstPad *srcpad = GST_PAD_CAST (user_data);
+
+  gst_pad_push_event (srcpad, gst_event_ref (*event));
+
+  return TRUE;
+}
+
+static gboolean
+gst_streamid_demux_srcpad_create (GstStreamidDemux * demux, GstPad * pad,
+    const gchar * stream_id)
+{
+  gchar *padname = NULL;
+  GstPad *srcpad = NULL;
+  GstPadTemplate *pad_tmpl = NULL;
+
+  padname = g_strdup_printf ("src_%u", demux->nb_srcpads++);
+  pad_tmpl = gst_static_pad_template_get (&gst_streamid_demux_src_factory);
+
+  GST_LOG_OBJECT (demux, "generating a srcpad:%s", padname);
+  srcpad = gst_pad_new_from_template (pad_tmpl, padname);
+  gst_object_unref (pad_tmpl);
+  g_free (padname);
+  g_return_val_if_fail (srcpad != NULL, FALSE);
+
+  demux->active_srcpad = srcpad;
+  g_hash_table_insert (demux->stream_id_pairs, g_strdup (stream_id),
+      gst_object_ref (srcpad));
+
+  return TRUE;
+}
+
+static GstFlowReturn
+gst_streamid_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+  GstFlowReturn res = GST_FLOW_OK;
+  GstStreamidDemux *demux = NULL;
+  GstPad *srcpad = NULL;
+
+  demux = GST_STREAMID_DEMUX (parent);
+
+  GST_LOG_OBJECT (demux, "pushing buffer to %" GST_PTR_FORMAT,
+      demux->active_srcpad);
+
+  GST_OBJECT_LOCK (demux);
+  if (demux->active_srcpad) {
+    srcpad = gst_object_ref (demux->active_srcpad);
+    GST_OBJECT_UNLOCK (demux);
+    res = gst_pad_push (srcpad, buf);
+    gst_object_unref (srcpad);
+  } else {
+    GST_OBJECT_UNLOCK (demux);
+    goto no_active_srcpad;
+  }
+
+  GST_LOG_OBJECT (demux, "handled buffer %s", gst_flow_get_name (res));
+  return res;
+
+/* ERROR */
+no_active_srcpad:
+  {
+    GST_WARNING_OBJECT (demux, "srcpad is not initialized");
+    return GST_FLOW_NOT_NEGOTIATED;
+  }
+}
+
+static GstPad *
+gst_streamid_demux_get_srcpad_by_stream_id (GstStreamidDemux * demux,
+    const gchar * stream_id)
+{
+  GstPad *srcpad = NULL;
+
+  GST_DEBUG_OBJECT (demux, "stream_id = %s", stream_id);
+  if (demux->stream_id_pairs == NULL || stream_id == NULL) {
+    goto done;
+  }
+
+  srcpad = g_hash_table_lookup (demux->stream_id_pairs, stream_id);
+
+  if (srcpad) {
+    GST_DEBUG_OBJECT (demux, "srcpad = %s:%s matched",
+        GST_DEBUG_PAD_NAME (srcpad));
+  }
+
+done:
+  return srcpad;
+}
+
+static gboolean
+gst_streamid_demux_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  gboolean res = TRUE;
+  GstStreamidDemux *demux;
+  const gchar *stream_id = NULL;
+  GstPad *active_srcpad = NULL;
+
+  demux = GST_STREAMID_DEMUX (parent);
+
+  GST_DEBUG_OBJECT (demux, "event = %s, sticky = %d",
+      GST_EVENT_TYPE_NAME (event), GST_EVENT_IS_STICKY (event));
+
+  if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
+    gst_event_parse_stream_start (event, &stream_id);
+    if (!stream_id)
+      goto no_stream_id;
+
+    GST_OBJECT_LOCK (demux);
+    active_srcpad =
+        gst_streamid_demux_get_srcpad_by_stream_id (demux, stream_id);
+    if (!active_srcpad) {
+      /* try to generate a srcpad */
+      if (gst_streamid_demux_srcpad_create (demux, pad, stream_id)) {
+        GST_OBJECT_UNLOCK (demux);
+
+        gst_pad_set_active (demux->active_srcpad, TRUE);
+        /* Forward sticky events to the new srcpad */
+        gst_pad_sticky_events_foreach (demux->sinkpad, forward_sticky_events,
+            demux->active_srcpad);
+        gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->active_srcpad);
+      } else {
+        GST_OBJECT_UNLOCK (demux);
+        goto fail_create_srcpad;
+      }
+    } else if (demux->active_srcpad != active_srcpad) {
+      demux->active_srcpad = active_srcpad;
+      GST_OBJECT_UNLOCK (demux);
+
+      g_object_notify (G_OBJECT (demux), "active-pad");
+    } else
+      GST_OBJECT_UNLOCK (demux);
+  }
+
+  if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START
+      || GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP
+      || GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+    res = gst_pad_event_default (pad, parent, event);
+  } else if (demux->active_srcpad) {
+    GstPad *srcpad = NULL;
+    GST_OBJECT_LOCK (demux);
+    srcpad = gst_object_ref (demux->active_srcpad);
+    GST_OBJECT_UNLOCK (demux);
+    res = gst_pad_push_event (srcpad, event);
+    gst_object_unref (srcpad);
+  } else {
+    gst_event_unref (event);
+  }
+  return res;
+
+  /* ERRORS */
+no_stream_id:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
+        ("Error occurred trying to get stream-id to create a srcpad"),
+        ("no stream-id found at %s", GST_EVENT_TYPE_NAME (event)));
+
+    gst_event_unref (event);
+    return FALSE;
+  }
+
+fail_create_srcpad:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, FAILED,
+        ("Error occurred trying to create a srcpad"),
+        ("Failed to create a srcpad via stream-id:%s", stream_id));
+    gst_event_unref (event);
+    return FALSE;
+  }
+}
+
+static void
+gst_streamid_demux_release_srcpad (const GValue * item,
+    GstStreamidDemux * demux)
+{
+  GstPad *pad = g_value_get_object (item);
+
+  if (pad != NULL) {
+    gst_pad_set_active (pad, FALSE);
+    gst_element_remove_pad (GST_ELEMENT_CAST (demux), pad);
+  }
+}
+
+static void
+gst_streamid_demux_reset (GstStreamidDemux * demux)
+{
+  GstIterator *it = NULL;
+  GstIteratorResult itret = GST_ITERATOR_OK;
+
+  GST_OBJECT_LOCK (demux);
+  if (demux->active_srcpad != NULL)
+    demux->active_srcpad = NULL;
+
+  GST_OBJECT_UNLOCK (demux);
+
+  if (demux->stream_id_pairs != NULL) {
+    g_hash_table_unref (demux->stream_id_pairs);
+    demux->stream_id_pairs = NULL;
+  }
+
+  it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
+  while (itret == GST_ITERATOR_OK || itret == GST_ITERATOR_RESYNC) {
+    itret =
+        gst_iterator_foreach (it,
+        (GstIteratorForeachFunction) gst_streamid_demux_release_srcpad, demux);
+    if (itret == GST_ITERATOR_RESYNC)
+      gst_iterator_resync (it);
+  }
+  gst_iterator_free (it);
+}
+
+static GstStateChangeReturn
+gst_streamid_demux_change_state (GstElement * element,
+    GstStateChange transition)
+{
+  GstStreamidDemux *demux;
+  GstStateChangeReturn result;
+
+  demux = GST_STREAMID_DEMUX (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
+    default:
+      break;
+  }
+
+  result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      gst_streamid_demux_reset (demux);
+      break;
+    default:
+      break;
+  }
+
+  return result;
+}
diff --git a/plugins/elements/gststreamiddemux.h b/plugins/elements/gststreamiddemux.h
new file mode 100644 (file)
index 0000000..ec5383b
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * GStreamer streamiddemux eleement
+ *
+ * Copyright 2013 LGE Corporation.
+ *  @author: Hoonhee Lee <hoonhee.lee@lge.com>
+ *  @author: Jeongseok Kim <jeongseok.kim@lge.com>
+ *  @author: Wonchul Lee <wonchul86.lee@lge.com>
+ *
+ * gststreamiddemux.h: Simple stream-id-demultiplexer element
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+#ifndef __GST_STREAMID_DEMUX_H__
+#define __GST_STREAMID_DEMUX_H__
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+#define GST_TYPE_STREAMID_DEMUX \
+  (gst_streamid_demux_get_type())
+#define GST_STREAMID_DEMUX(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMID_DEMUX, GstStreamidDemux))
+#define GST_STREAMID_DEMUX_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMID_DEMUX, GstStreamidDemuxClass))
+#define GST_IS_STREAMID_DEMUX(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMID_DEMUX))
+#define GST_IS_STREAMID_DEMUX_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMID_DEMUX))
+typedef struct _GstStreamidDemux GstStreamidDemux;
+typedef struct _GstStreamidDemuxClass GstStreamidDemuxClass;
+
+struct _GstStreamidDemux
+{
+  GstElement element;
+
+  GstPad *sinkpad;
+
+  guint nb_srcpads;
+  GstPad *active_srcpad;
+
+  /* This table contains srcpad and stream-id */
+  GHashTable *stream_id_pairs;
+};
+
+struct _GstStreamidDemuxClass
+{
+  GstElementClass parent_class;
+};
+
+G_GNUC_INTERNAL GType gst_streamid_demux_get_type (void);
+
+G_END_DECLS
+#endif /* __GST_STREAMID_DEMUX_H__ */
index 36d0ebc..5e7e5ab 100644 (file)
@@ -95,6 +95,7 @@ REGISTRY_CHECKS =                             \
        elements/queue                          \
        elements/queue2                         \
        elements/valve                          \
+       elements/streamiddemux                  \
        libs/baseparse                          \
        libs/basesrc                            \
        libs/basesink                           \
diff --git a/tests/check/elements/streamiddemux.c b/tests/check/elements/streamiddemux.c
new file mode 100644 (file)
index 0000000..8c10bce
--- /dev/null
@@ -0,0 +1,514 @@
+/* GStreamer unit tests for the streamiddemux
+ *
+ * Copyright 2013 LGE Corporation.
+ *  @author: Hoonhee Lee <hoonhee.lee@lge.com>
+ *  @author: Jeongseok Kim <jeongseok.kim@lge.com>
+ *  @author: Wonchul Lee <wonchul86.lee@lge.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+*/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gst/check/gstcheck.h>
+#include <stdlib.h>
+
+#define NUM_SUBSTREAMS 100
+#define NUM_BUFFER 1000
+
+static GstPad *active_srcpad;
+
+struct TestData
+{
+  GstElement *demux;
+  GstPad *mysrc, *mysink[NUM_SUBSTREAMS];
+  GstPad *demuxsink, *demuxsrc[NUM_SUBSTREAMS];
+  gint srcpad_cnt;
+  GstCaps *mycaps;
+  GstCaps *caps[NUM_SUBSTREAMS];
+  GstSegment segment[NUM_SUBSTREAMS];
+  gchar *stream_ids[NUM_SUBSTREAMS];
+};
+
+static void
+set_active_srcpad (struct TestData *td)
+{
+  if (active_srcpad)
+    gst_object_unref (active_srcpad);
+
+  g_object_get (td->demux, "active-pad", &active_srcpad, NULL);
+}
+
+static void
+release_test_objects (struct TestData *td)
+{
+  fail_unless (gst_element_set_state (td->demux, GST_STATE_NULL) ==
+      GST_STATE_CHANGE_SUCCESS);
+
+  gst_object_unref (td->demuxsink);
+
+  gst_caps_unref (td->mycaps);
+
+  if (active_srcpad)
+    gst_object_unref (active_srcpad);
+
+  gst_object_unref (td->demux);
+}
+
+static void
+src_pad_added_cb (GstElement * demux, GstPad * pad, struct TestData *td)
+{
+  if (td->srcpad_cnt < NUM_SUBSTREAMS) {
+    td->demuxsrc[td->srcpad_cnt] = pad;
+    fail_unless (gst_pad_link (pad,
+            td->mysink[td->srcpad_cnt++]) == GST_PAD_LINK_OK);
+  }
+}
+
+static void
+setup_test_objects (struct TestData *td)
+{
+  td->mycaps = gst_caps_new_empty_simple ("test/test");
+  td->srcpad_cnt = 0;
+
+  td->demux = gst_element_factory_make ("streamiddemux", NULL);
+  fail_unless (td->demux != NULL);
+  g_signal_connect (td->demux, "pad-added", G_CALLBACK (src_pad_added_cb), td);
+  td->demuxsink = gst_element_get_static_pad (td->demux, "sink");
+  fail_unless (td->demuxsink != NULL);
+
+  fail_unless (gst_element_set_state (td->demux, GST_STATE_PLAYING) ==
+      GST_STATE_CHANGE_SUCCESS);
+}
+
+static GstFlowReturn
+chain_ok (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+  GstPad *peer_pad = NULL;
+  gchar *pad_stream_id, *active_srcpad_stream_id;
+
+  peer_pad = gst_pad_get_peer (active_srcpad);
+  pad_stream_id = gst_pad_get_stream_id (pad);
+  active_srcpad_stream_id = gst_pad_get_stream_id (active_srcpad);
+  fail_unless (pad == peer_pad);
+  fail_unless (g_strcmp0 (pad_stream_id, active_srcpad_stream_id) == 0);
+
+  g_free (pad_stream_id);
+  g_free (active_srcpad_stream_id);
+  gst_object_unref (peer_pad);
+  gst_buffer_unref (buffer);
+
+  return GST_FLOW_OK;
+}
+
+GST_START_TEST (test_simple_create_destroy)
+{
+  GstElement *demux;
+
+  demux = gst_element_factory_make ("streamiddemux", NULL);
+  gst_object_unref (demux);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_streamiddemux_with_stream_start)
+{
+  struct TestData td;
+
+  setup_test_objects (&td);
+
+  GST_DEBUG ("Creating mysink");
+  td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+  gst_pad_set_active (td.mysink[0], TRUE);
+
+  GST_DEBUG ("Creating mysrc");
+  td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+  gst_pad_set_active (td.mysrc, TRUE);
+
+  GST_DEBUG ("Pushing stream-start event");
+  fail_unless (gst_pad_push_event (td.mysrc,
+          gst_event_new_stream_start ("test0")));
+
+  g_object_get (td.demux, "active-pad", &active_srcpad, NULL);
+  fail_unless (active_srcpad != NULL, "Failed to generate a srcpad");
+  fail_unless (td.srcpad_cnt == 1, "pad-added signal has not emmited");
+
+  GST_DEBUG ("Releasing mysink and mysrc");
+  gst_pad_set_active (td.mysink[0], FALSE);
+  gst_pad_set_active (td.mysrc, FALSE);
+
+  gst_object_unref (td.mysink[0]);
+  gst_object_unref (td.mysrc);
+
+  GST_DEBUG ("Releasing streamiddemux");
+  release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_streamiddemux_without_stream_start)
+{
+  struct TestData td;
+  GstSegment segment;
+
+  setup_test_objects (&td);
+
+  GST_DEBUG ("Creating mysink");
+  td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+  gst_pad_set_active (td.mysink[0], TRUE);
+
+  GST_DEBUG ("Creating mysrc");
+  td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+  gst_pad_set_active (td.mysrc, TRUE);
+
+  GST_DEBUG ("Pushing caps and segment event without stream-start");
+  fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_caps (td.mycaps)));
+  gst_segment_init (&segment, GST_FORMAT_BYTES);
+  fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_segment (&segment)));
+
+  g_object_get (td.demux, "active-pad", &active_srcpad, NULL);
+  fail_unless (active_srcpad == NULL, "srcpad has created unexpectedly");
+  fail_unless (td.srcpad_cnt == 0, "pad-added signal is emmited unexpectedly");
+
+  GST_DEBUG ("Releasing mysink and mysrc");
+  gst_pad_set_active (td.mysink[0], FALSE);
+  gst_pad_set_active (td.mysrc, FALSE);
+
+  gst_object_unref (td.mysink[0]);
+  gst_object_unref (td.mysrc);
+
+  GST_DEBUG ("Releasing streamiddemux");
+  release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_streamiddemux_simple)
+{
+  struct TestData td;
+
+  setup_test_objects (&td);
+
+  GST_DEBUG ("Creating mysink");
+  td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+  td.mysink[0]->chaindata = &td;
+  gst_pad_set_chain_function (td.mysink[0], chain_ok);
+  gst_pad_set_active (td.mysink[0], TRUE);
+
+  td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK);
+  td.mysink[1]->chaindata = &td;
+  gst_pad_set_chain_function (td.mysink[1], chain_ok);
+  gst_pad_set_active (td.mysink[1], TRUE);
+
+  GST_DEBUG ("Creating mysrc");
+  td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+  gst_pad_set_active (td.mysrc, TRUE);
+
+  GST_DEBUG ("Pushing stream-start, caps and segment event");
+  gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+      GST_FORMAT_BYTES, "test0");
+  set_active_srcpad (&td);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+  gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+      GST_FORMAT_BYTES, "test1");
+  set_active_srcpad (&td);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+  GST_DEBUG ("Pushing buffer");
+  fail_unless (gst_pad_push_event (td.mysrc,
+          gst_event_new_stream_start ("test0")));
+  set_active_srcpad (&td);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+  fail_unless (gst_pad_push_event (td.mysrc,
+          gst_event_new_stream_start ("test1")));
+  set_active_srcpad (&td);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+  GST_DEBUG ("Releasing mysink and mysrc");
+  gst_pad_set_active (td.mysink[0], FALSE);
+  gst_pad_set_active (td.mysink[1], FALSE);
+  gst_pad_set_active (td.mysrc, FALSE);
+
+  gst_object_unref (td.mysink[0]);
+  gst_object_unref (td.mysink[1]);
+  gst_object_unref (td.mysrc);
+
+  GST_DEBUG ("Releasing streamiddemux");
+  release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+GList *expected[NUM_SUBSTREAMS];
+
+static gboolean
+sink_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  GList **expected = GST_PAD_ELEMENT_PRIVATE (pad);
+  GstEvent *exp;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CAPS:{
+      GstCaps *recvcaps, *expectcaps;
+
+      *expected = g_list_first (*expected);
+      exp = GST_EVENT ((*expected)->data);
+
+      gst_event_parse_caps (event, &recvcaps);
+      gst_event_parse_caps (exp, &expectcaps);
+
+      fail_unless (gst_caps_is_equal (recvcaps, expectcaps));
+      break;
+    }
+    case GST_EVENT_SEGMENT:{
+      const GstSegment *recvseg, *expectseg;
+
+      *expected = g_list_last (*expected);
+      exp = GST_EVENT ((*expected)->data);
+
+      gst_event_parse_segment (event, &recvseg);
+      gst_event_parse_segment (exp, &expectseg);
+
+      fail_unless_equals_uint64 (recvseg->position, expectseg->position);
+      break;
+    }
+    default:
+      break;
+  }
+
+  return gst_pad_event_default (pad, parent, event);
+}
+
+GST_START_TEST (test_streamiddemux_num_buffers)
+{
+  struct TestData td;
+  gint buffer_cnt = 0;
+  gint stream_cnt = 0;
+  GstEvent *event;
+
+  setup_test_objects (&td);
+
+  GST_DEBUG ("Creating mysink");
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+    gchar *name;
+    name = g_strdup_printf ("mysink%d", stream_cnt);
+    td.mysink[stream_cnt] = gst_pad_new (name, GST_PAD_SINK);
+    g_free (name);
+    gst_pad_set_chain_function (td.mysink[stream_cnt], chain_ok);
+    gst_pad_set_event_function (td.mysink[stream_cnt], sink_event_func);
+    gst_pad_set_active (td.mysink[stream_cnt], TRUE);
+    GST_PAD_ELEMENT_PRIVATE (td.mysink[stream_cnt]) = &expected[stream_cnt];
+  }
+
+  GST_DEBUG ("Creating mysrc");
+  td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+  gst_pad_set_active (td.mysrc, TRUE);
+
+  GST_DEBUG ("Creating caps");
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+    gchar *caps_name;
+    caps_name = g_strdup_printf ("test/test%d", stream_cnt);
+    td.caps[stream_cnt] = gst_caps_new_empty_simple (caps_name);
+
+    g_free (caps_name);
+  }
+
+  GST_DEBUG ("Creating segment");
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+    gst_segment_init (&td.segment[stream_cnt], GST_FORMAT_BYTES);
+    td.segment[stream_cnt].position = stream_cnt * GST_SECOND;
+  }
+
+  GST_DEBUG ("Pushing stream-start, caps and segment event");
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+    gchar *name;
+    name = g_strdup_printf ("test%d", stream_cnt);
+
+    fail_unless (gst_pad_push_event (td.mysrc,
+            gst_event_new_stream_start (name)));
+
+    event = gst_event_new_caps (td.caps[stream_cnt]);
+    expected[stream_cnt] =
+        g_list_append (expected[stream_cnt], gst_event_ref (event));
+    fail_unless (gst_pad_push_event (td.mysrc, event));
+
+    event = gst_event_new_segment (&td.segment[stream_cnt]);
+    expected[stream_cnt] =
+        g_list_append (expected[stream_cnt], gst_event_ref (event));
+    fail_unless (gst_pad_push_event (td.mysrc, event));
+
+    g_free (name);
+    set_active_srcpad (&td);
+
+    fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+  }
+
+  GST_DEBUG ("Pushing buffers to random srcpad");
+  for (buffer_cnt = 0; buffer_cnt < NUM_BUFFER; ++buffer_cnt) {
+    gchar *name;
+    gint active_stream = rand () % NUM_SUBSTREAMS;
+    name = g_strdup_printf ("test%d", active_stream);
+
+    fail_unless (gst_pad_push_event (td.mysrc,
+            gst_event_new_stream_start (name)));
+    fail_unless (gst_pad_push_event (td.mysrc,
+            gst_event_new_caps (td.caps[active_stream])));
+    fail_unless (gst_pad_push_event (td.mysrc,
+            gst_event_new_segment (&td.segment[active_stream])));
+
+    g_free (name);
+    set_active_srcpad (&td);
+
+    fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+  }
+
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt)
+    gst_caps_unref (td.caps[stream_cnt]);
+
+  GST_DEBUG ("Releasing mysink and mysrc");
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+    gst_pad_set_active (td.mysink[stream_cnt], FALSE);
+  }
+  gst_pad_set_active (td.mysrc, FALSE);
+
+  for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) {
+    gst_object_unref (td.mysink[stream_cnt]);
+  }
+  gst_object_unref (td.mysrc);
+
+  GST_DEBUG ("Releasing streamiddemux");
+  release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+guint num_eos = 0;
+guint num_flush_start = 0;
+guint num_flush_stop = 0;
+
+static gboolean
+event_func (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_STREAM_START:
+      ++num_flush_start;
+      break;
+    case GST_EVENT_FLUSH_STOP:
+      ++num_flush_stop;
+      break;
+    case GST_EVENT_EOS:
+      ++num_eos;
+      break;
+    default:
+      break;
+  }
+
+  return gst_pad_event_default (pad, parent, event);
+}
+
+GST_START_TEST (test_streamiddemux_eos)
+{
+  struct TestData td;
+
+  setup_test_objects (&td);
+
+  num_eos = 0;
+
+  GST_DEBUG ("Creating mysink");
+  td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK);
+  gst_pad_set_chain_function (td.mysink[0], chain_ok);
+  gst_pad_set_event_function (td.mysink[0], event_func);
+  gst_pad_set_active (td.mysink[0], TRUE);
+
+  td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK);
+  gst_pad_set_chain_function (td.mysink[1], chain_ok);
+  gst_pad_set_event_function (td.mysink[1], event_func);
+  gst_pad_set_active (td.mysink[1], TRUE);
+
+  GST_DEBUG ("Creating mysrc");
+  td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink)));
+  gst_pad_set_active (td.mysrc, TRUE);
+
+  GST_DEBUG ("Pushing stream-start, caps and segment event");
+  gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+      GST_FORMAT_BYTES, "test0");
+  set_active_srcpad (&td);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+  gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps,
+      GST_FORMAT_BYTES, "test1");
+  set_active_srcpad (&td);
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK);
+
+  GST_DEBUG ("Pushing flush event");
+  fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_start ()));
+  fail_unless (num_flush_start == 2,
+      "Failed to send flush-start event to all pads internally linked");
+  fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_stop (TRUE)));
+  fail_unless (num_flush_stop == 2,
+      "Failed to send flush-stop event to all pads internally linked");
+
+  GST_DEBUG ("Pushing eos event");
+  fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_eos ()));
+  fail_unless (num_eos == 2,
+      "Failed to send eos event to all pads internally linked");
+
+  fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_EOS);
+
+  GST_DEBUG ("Releasing mysink and mysrc");
+  gst_pad_set_active (td.mysink[0], FALSE);
+  gst_pad_set_active (td.mysink[1], FALSE);
+  gst_pad_set_active (td.mysrc, FALSE);
+
+  gst_object_unref (td.mysink[0]);
+  gst_object_unref (td.mysink[1]);
+  gst_object_unref (td.mysrc);
+
+  GST_DEBUG ("Releasing streamiddemux");
+  release_test_objects (&td);
+}
+
+GST_END_TEST;
+
+static Suite *
+streamiddemux_suite (void)
+{
+  Suite *s = suite_create ("streamiddemux");
+  TCase *tc_chain;
+
+  tc_chain = tcase_create ("streamiddemux simple");
+  tcase_add_test (tc_chain, test_simple_create_destroy);
+  tcase_add_test (tc_chain, test_streamiddemux_with_stream_start);
+  tcase_add_test (tc_chain, test_streamiddemux_without_stream_start);
+  tcase_add_test (tc_chain, test_streamiddemux_simple);
+  tcase_add_test (tc_chain, test_streamiddemux_num_buffers);
+  tcase_add_test (tc_chain, test_streamiddemux_eos);
+  suite_add_tcase (s, tc_chain);
+
+  return s;
+}
+
+GST_CHECK_MAIN (streamiddemux);
index 376bac8..fce72a1 100644 (file)
@@ -20,6 +20,7 @@ always_dirs = \
        netclock \
        queue      \
        stepping \
+       streamiddemux \
        streams \
        typefind
 
diff --git a/tests/examples/streamiddemux/Makefile.am b/tests/examples/streamiddemux/Makefile.am
new file mode 100644 (file)
index 0000000..e182d29
--- /dev/null
@@ -0,0 +1,6 @@
+noinst_PROGRAMS = streamiddemux-stream
+
+streamiddemux_stream_SOURCES = streamiddemux-stream.c
+streamiddemux_stream_LDADD = $(GST_OBJ_LIBS)
+streamiddemux_stream_CFLAGS = $(GST_OBJ_CFLAGS)
+
diff --git a/tests/examples/streamiddemux/streamiddemux-stream.c b/tests/examples/streamiddemux/streamiddemux-stream.c
new file mode 100644 (file)
index 0000000..1ef128b
--- /dev/null
@@ -0,0 +1,241 @@
+#include <gst/gst.h>
+
+#define NUM_STREAM 13
+
+typedef struct _App App;
+
+struct _App
+{
+  GstElement *pipeline;
+  GstElement *audiotestsrc[NUM_STREAM];
+  GstElement *audioconvert[NUM_STREAM];
+  GstElement *capsfilter[NUM_STREAM];
+  GstElement *vorbisenc[NUM_STREAM];
+  GstElement *oggmux[NUM_STREAM];
+  GstElement *funnel;
+  GstElement *demux;
+  GstElement *stream_synchronizer;
+  GstElement *queue[NUM_STREAM];
+  GstElement *filesink[NUM_STREAM];
+
+  gboolean pad_blocked[NUM_STREAM];
+  GstPad *queue_srcpad[NUM_STREAM];
+  gulong blocked_id[NUM_STREAM];
+};
+
+App s_app;
+
+gint pad_added_cnt = 0;
+
+static gboolean
+bus_call (GstBus * bus, GstMessage * msg, gpointer data)
+{
+  GMainLoop *loop = (GMainLoop *) data;
+
+  switch (GST_MESSAGE_TYPE (msg)) {
+    case GST_MESSAGE_EOS:{
+      g_main_loop_quit (loop);
+      break;
+    }
+    case GST_MESSAGE_ERROR:{
+      g_main_loop_quit (loop);
+      break;
+    }
+    default:
+      break;
+  }
+  return TRUE;
+}
+
+static void
+set_blocked (App * app, gboolean blocked)
+{
+  gint i = 0;
+
+  for (i = 0; i < NUM_STREAM; i++) {
+    gst_pad_remove_probe (app->queue_srcpad[i], app->blocked_id[i]);
+  }
+}
+
+static void
+sink_do_reconfigure (App * app)
+{
+  gint i = 0;
+  GstPad *filesink_sinkpad[NUM_STREAM];
+  GstPad *sync_sinkpad[NUM_STREAM];
+  GstPad *sync_srcpad[NUM_STREAM];
+  GstIterator *it;
+  GValue item = G_VALUE_INIT;
+
+  for (i = 0; i < NUM_STREAM; i++) {
+    sync_sinkpad[i] =
+        gst_element_get_request_pad (app->stream_synchronizer, "sink_%u");
+    it = gst_pad_iterate_internal_links (sync_sinkpad[i]);
+    g_assert (it);
+    gst_iterator_next (it, &item);
+    sync_srcpad[i] = g_value_dup_object (&item);
+    g_value_unset (&item);
+
+    filesink_sinkpad[i] = gst_element_get_static_pad (app->filesink[i], "sink");
+
+    gst_pad_link_full (app->queue_srcpad[i], sync_sinkpad[i],
+        GST_PAD_LINK_CHECK_NOTHING);
+    gst_pad_link_full (sync_srcpad[i], filesink_sinkpad[i],
+        GST_PAD_LINK_CHECK_NOTHING);
+  }
+  gst_iterator_free (it);
+
+}
+
+static GstPadProbeReturn
+blocked_cb (GstPad * blockedpad, GstPadProbeInfo * info, gpointer user_data)
+{
+  App *app = user_data;
+  gint i = 0;
+  gboolean all_pads_blocked = TRUE;
+
+  for (i = 0; i < NUM_STREAM; i++) {
+    if (blockedpad == app->queue_srcpad[i])
+      app->pad_blocked[i] = TRUE;
+  }
+
+  for (i = 0; i < NUM_STREAM; i++) {
+    if (app->queue_srcpad[i] == FALSE) {
+      all_pads_blocked = FALSE;
+      break;
+    }
+  }
+
+  if (all_pads_blocked == TRUE) {
+    sink_do_reconfigure (app);
+    set_blocked (app, FALSE);
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
+static void
+src_pad_added_cb (GstElement * demux, GstPad * pad, App * app)
+{
+  GstPad *queue_sinkpad[NUM_STREAM];
+
+  queue_sinkpad[pad_added_cnt] =
+      gst_element_get_static_pad (app->queue[pad_added_cnt], "sink");
+  gst_pad_link_full (pad, queue_sinkpad[pad_added_cnt],
+      GST_PAD_LINK_CHECK_NOTHING);
+
+  app->queue_srcpad[pad_added_cnt] =
+      gst_element_get_static_pad (app->queue[pad_added_cnt], "src");
+  app->blocked_id[pad_added_cnt] =
+      gst_pad_add_probe (app->queue_srcpad[pad_added_cnt],
+      GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, app, NULL);
+
+  pad_added_cnt++;
+}
+
+gint
+main (gint argc, gchar * argv[])
+{
+  App *app = &s_app;
+
+  GMainLoop *loop = NULL;
+  GstBus *bus;
+  guint bus_watch_id;
+
+  GstPad *funnel_sinkpad[NUM_STREAM];
+  GstPad *funnel_srcpad;
+  GstPad *demux_sinkpad;
+  GstPad *oggmux_srcpad[NUM_STREAM];
+
+  guint stream_cnt = 0;
+  GstCaps *caps;
+
+  gst_init (&argc, &argv);
+
+  app->pipeline = gst_pipeline_new ("pipeline");
+
+  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+    app->audiotestsrc[stream_cnt] =
+        gst_element_factory_make ("audiotestsrc", NULL);
+    app->audioconvert[stream_cnt] =
+        gst_element_factory_make ("audioconvert", NULL);
+    app->capsfilter[stream_cnt] = gst_element_factory_make ("capsfilter", NULL);
+    app->vorbisenc[stream_cnt] = gst_element_factory_make ("vorbisenc", NULL);
+    app->oggmux[stream_cnt] = gst_element_factory_make ("oggmux", NULL);
+  }
+
+  app->funnel = gst_element_factory_make ("funnel", NULL);
+  app->demux = gst_element_factory_make ("streamiddemux", NULL);
+  app->stream_synchronizer =
+      gst_element_factory_make ("streamsynchronizer", NULL);
+
+  caps = gst_caps_from_string ("audio/x-raw,channels=1;");
+
+  stream_cnt = 0;
+
+  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+    app->queue[stream_cnt] = gst_element_factory_make ("queue", NULL);
+    app->filesink[stream_cnt] = gst_element_factory_make ("filesink", NULL);
+
+    g_object_set (app->audiotestsrc[stream_cnt], "wave", stream_cnt,
+        "num-buffers", 2000, NULL);
+    g_object_set (app->capsfilter[stream_cnt], "caps", caps, NULL);
+    g_object_set (app->filesink[stream_cnt], "location",
+        g_strdup_printf ("filesink_%d.ogg", stream_cnt), NULL);
+  }
+
+  stream_cnt = 0;
+
+  g_signal_connect (app->demux, "pad-added", G_CALLBACK (src_pad_added_cb),
+      app);
+
+  loop = g_main_loop_new (NULL, FALSE);
+
+  bus = gst_element_get_bus (app->pipeline);
+  bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
+  g_object_unref (bus);
+
+  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+    gst_bin_add_many (GST_BIN (app->pipeline), app->audiotestsrc[stream_cnt],
+        app->audioconvert[stream_cnt], app->capsfilter[stream_cnt],
+        app->vorbisenc[stream_cnt], app->oggmux[stream_cnt],
+        app->queue[stream_cnt], app->filesink[stream_cnt], NULL);
+    if (stream_cnt == 0) {
+      gst_bin_add_many (GST_BIN (app->pipeline), app->funnel, app->demux,
+          app->stream_synchronizer, NULL);
+    }
+  }
+
+  stream_cnt = 0;
+
+  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+    gst_element_link_many (app->audiotestsrc[stream_cnt],
+        app->audioconvert[stream_cnt], app->capsfilter[stream_cnt],
+        app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], NULL);
+  }
+
+  stream_cnt = 0;
+
+  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
+    funnel_sinkpad[stream_cnt] =
+        gst_element_get_request_pad (app->funnel, "sink_%u");
+    oggmux_srcpad[stream_cnt] =
+        gst_element_get_static_pad (app->oggmux[stream_cnt], "src");
+    gst_pad_link (oggmux_srcpad[stream_cnt], funnel_sinkpad[stream_cnt]);
+  }
+
+  funnel_srcpad = gst_element_get_static_pad (app->funnel, "src");
+
+  demux_sinkpad = gst_element_get_static_pad (app->demux, "sink");
+  gst_pad_link (funnel_srcpad, demux_sinkpad);
+
+  gst_element_set_state (app->pipeline, GST_STATE_PLAYING);
+  g_main_loop_run (loop);
+
+  gst_element_set_state (app->pipeline, GST_STATE_NULL);
+  g_object_unref (app->pipeline);
+  g_source_remove (bus_watch_id);
+  g_main_loop_unref (loop);
+
+  return 0;
+}