gst/interleave/deinterleave.c: Add another example launch line.
authorSebastian Dröge <slomo@circular-chaos.org>
Mon, 26 May 2008 10:28:47 +0000 (10:28 +0000)
committerSebastian Dröge <slomo@circular-chaos.org>
Mon, 26 May 2008 10:28:47 +0000 (10:28 +0000)
Original commit message from CVS:
* gst/interleave/deinterleave.c:
Add another example launch line.
* gst/interleave/interleave.c: (interleave_24),
(gst_interleave_finalize), (gst_interleave_base_init),
(gst_interleave_class_init), (gst_interleave_init),
(gst_interleave_request_new_pad), (gst_interleave_release_pad),
(gst_interleave_change_state), (__remove_channels),
(__set_channels), (gst_interleave_sink_getcaps),
(gst_interleave_set_process_function),
(gst_interleave_sink_setcaps), (gst_interleave_sink_event),
(gst_interleave_src_query_duration), (gst_interleave_src_query),
(forward_event_func), (forward_event), (gst_interleave_src_event),
(gst_interleave_collected):
* gst/interleave/interleave.h:
Major rewrite of interleave using GstCollectpads. This new version
also supports almost all raw audio formats and has better caps
negotiation. Fixes bug #506594.
Also update docs and add some more examples.
* tests/check/elements/interleave.c: (interleave_chain_func),
(GST_START_TEST), (src_handoff_float32), (sink_handoff_float32),
(interleave_suite):
Add some more extensive unit tests for interleave.

gst/interleave/deinterleave.c
gst/interleave/interleave.c
gst/interleave/interleave.h
tests/check/elements/interleave.c

index 2ba6773..e8a10de 100644 (file)
  * </programlisting>
  * Decodes an MP3 file and encodes the left and right channel into separate Ogg Vorbis files.
  * </para>
+ * <para>
+ * <programlisting>
+ * gst-launch-0.10 filesrc location=file.mp3 ! decodebin ! audioconvert ! "audio/x-raw-int,channels=2" ! deinterleave name=d  interleave name=i ! audioconvert ! wavenc ! filesink location=test.wav    d.src0 ! queue ! audioconvert ! i.sink1    d.src1 ! queue ! audioconvert ! i.sink0
+ * </programlisting>
+ * Decodes and deinterleaves a Stereo MP3 file into separate channels and then interleaves the channels
+ * again to a WAV file with the channel with the channels exchanged.
+ * </para>
  * </refsect2>
  */
 
index be5b410..b65f5a2 100644 (file)
@@ -3,8 +3,9 @@
  *                    2000 Wim Taymans <wtay@chello.be>
  *                    2005 Wim Taymans <wim@fluendo.com>
  *                    2007 Andy Wingo <wingo at pobox.com>
+ *                    2008 Sebastian Dröge <slomo@circular-chaos.rg>
  *
- * interleave.c: interleave samples, based on gstsignalprocessor.c
+ * interleave.c: interleave samples, mostly based on adder.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
  * Boston, MA 02111-1307, USA.
  */
 
+/* TODO:
+ *       - handle caps changes
+ *       - set channel positions / keep from upstream
+ *       - handle more queries
+ */
+
 /**
  * SECTION:element-interleave
  *
  * <para>
  * Merges separate mono inputs into one interleaved stream.
  * </para>
+ * <para>
+ * This element handles all raw floating point sample formats and all signed integer sample formats. The first
+ * caps on one of the sinkpads will set the caps of the output so usually an audioconvert element should be
+ * placed before every sinkpad of interleave.
+ * </para>
+ * <para>
+ * It's possible to change the number of channels while the pipeline is running by adding or removing
+ * some of the request pads but this will change the caps of the output buffers. Changing the input
+ * caps is _not_ supported yet.
+ * </para>
  * <title>Example launch line</title>
  * <para>
  * <programlisting>
- * gst-launch-0.10 filesrc location=song.ogg ! decodebin ! audioconvert ! ladspa-gverb name=g ! interleave name=i ! audioconvert ! autoaudiosink g. ! i.
+ * gst-launch-0.10 filesrc location=file.mp3 ! decodebin ! audioconvert ! "audio/x-raw-int,channels=2" ! deinterleave name=d  interleave name=i ! audioconvert ! wavenc ! filesink location=test.wav    d.src0 ! queue ! audioconvert ! i.sink1    d.src1 ! queue ! audioconvert ! i.sink0
  * </programlisting>
- * Apply ladspa gverb to the music and merge separate left/right outputs into a
- * stereo stream for playback.
+ * Decodes and deinterleaves a Stereo MP3 file into separate channels and then interleaves the channels
+ * again to a WAV file with the channel with the channels exchanged.
+ * </para>
+ * <para>
+ * <programlisting>
+ * gst-launch-0.10 interleave name=i ! audioconvert ! wavenc ! filesink location=file.wav  filesrc location=file1.wav ! decodebin ! audioconvert ! "audio/x-raw-int,channels=1" ! queue ! i.sink0   filesrc location=file2.wav ! decodebin ! audioconvert ! "audio/x-raw-int,channels=1" ! queue ! i.sink1
+ * </programlisting>
+ * Interleaves two Mono WAV files to a single Stereo WAV file.
  * </para>
  * </refsect2>
  */
@@ -45,6 +68,7 @@
 #endif
 
 #include <gst/gst.h>
+#include <string.h>
 #include "interleave.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_interleave_debug);
@@ -53,74 +77,103 @@ GST_DEBUG_CATEGORY_STATIC (gst_interleave_debug);
 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink%d",
     GST_PAD_SINK,
     GST_PAD_REQUEST,
-    GST_STATIC_CAPS ("audio/x-raw-float, "
+    GST_STATIC_CAPS ("audio/x-raw-int, "
+        "rate = (int) [ 1, MAX ], "
+        "channels = (int) 1, "
+        "endianness = (int) { LITTLE_ENDIAN, BIG_ENDIAN }, "
+        "width = (int) { 8, 16, 24, 32 }, "
+        "depth = (int) [ 1, 32 ], "
+        "signed = (boolean) true; "
+        "audio/x-raw-float, "
         "rate = (int) [ 1, MAX ], "
         "channels = (int) 1, "
-        "endianness = (int) BYTE_ORDER, " "width = (int) 32")
+        "endianness = (int) { LITTLE_ENDIAN , BIG_ENDIAN }, "
+        "width = (int) { 32, 64 }")
     );
 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
-    GST_STATIC_CAPS ("audio/x-raw-float, "
+    GST_STATIC_CAPS ("audio/x-raw-int, "
+        "rate = (int) [ 1, MAX ], "
+        "channels = (int) [ 1, MAX ], "
+        "endianness = (int) { LITTLE_ENDIAN, BIG_ENDIAN }, "
+        "width = (int) { 8, 16, 24, 32 }, "
+        "depth = (int) [ 1, 32 ], "
+        "signed = (boolean) true; "
+        "audio/x-raw-float, "
         "rate = (int) [ 1, MAX ], "
         "channels = (int) [ 1, MAX ], "
-        "endianness = (int) BYTE_ORDER, " "width = (int) 32")
+        "endianness = (int) { LITTLE_ENDIAN , BIG_ENDIAN }, "
+        "width = (int) { 32, 64 }")
     );
 
+#define MAKE_FUNC(type) \
+static void interleave_##type (guint##type *out, guint##type *in, \
+    guint stride, guint nframes) \
+{ \
+  gint i; \
+  \
+  for (i = 0; i < nframes; i++) { \
+    *out = in[i]; \
+    out += stride; \
+  } \
+}
 
-#define GST_TYPE_INTERLEAVE_PAD (gst_interleave_pad_get_type ())
-#define GST_INTERLEAVE_PAD(obj) \
-    (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_INTERLEAVE_PAD,\
-    GstInterleavePad))
-typedef struct _GstInterleavePad GstInterleavePad;
-typedef GstPadClass GstInterleavePadClass;
+MAKE_FUNC (8);
+MAKE_FUNC (16);
+MAKE_FUNC (32);
+MAKE_FUNC (64);
 
-struct _GstInterleavePad
+static void
+interleave_24 (guint8 * out, guint8 * in, guint stride, guint nframes)
 {
-  GstPad parent;
+  gint i;
 
-  GstBuffer *pen;
+  for (i = 0; i < nframes; i++) {
+    memcpy (out, in, 3);
+    out += stride * 3;
+    in += 3;
+  }
+}
 
+typedef struct
+{
+  GstCollectData data;
   guint channel;
+} GstInterleaveCollectData;
 
-  /* these are only used for sink pads */
-  guint samples_avail;
-  gfloat *data;
-};
+GST_BOILERPLATE (GstInterleave, gst_interleave, GstElement, GST_TYPE_ELEMENT);
 
-static GType
-gst_interleave_pad_get_type (void)
-{
-  static GType type = 0;
+static GstPad *gst_interleave_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name);
+static void gst_interleave_release_pad (GstElement * element, GstPad * pad);
+static GstStateChangeReturn gst_interleave_change_state (GstElement * element,
+    GstStateChange transition);
 
-  if (!type) {
-    static const GTypeInfo info = {
-      sizeof (GstInterleavePadClass), NULL, NULL, NULL, NULL,
-      NULL, sizeof (GstInterleavePad), 0, NULL
-    };
+static gboolean gst_interleave_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_interleave_src_event (GstPad * pad, GstEvent * event);
 
-    type = g_type_register_static (GST_TYPE_PAD, "GstInterleavePad", &info, 0);
-  }
-  return type;
-}
+static gboolean gst_interleave_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_interleave_sink_setcaps (GstPad * pad, GstCaps * caps);
+static GstCaps *gst_interleave_sink_getcaps (GstPad * pad);
 
+static GstFlowReturn gst_interleave_collected (GstCollectPads * pads,
+    GstInterleave * self);
 
-GST_BOILERPLATE (GstInterleave, gst_interleave, GstElement, GST_TYPE_ELEMENT);
+static void
+gst_interleave_finalize (GObject * object)
+{
+  GstInterleave *self = GST_INTERLEAVE (object);
 
+  if (self->collect) {
+    gst_object_unref (self->collect);
+    self->collect = NULL;
+  }
 
-static gboolean gst_interleave_src_activate_pull (GstPad * pad,
-    gboolean active);
-static gboolean gst_interleave_sink_activate_push (GstPad * pad,
-    gboolean active);
-static GstPad *gst_interleave_request_new_pad (GstElement * element,
-    GstPadTemplate * templ, const gchar * name);
+  gst_caps_replace (&self->sinkcaps, NULL);
 
-static GstFlowReturn gst_interleave_getrange (GstPad * pad,
-    guint64 offset, guint length, GstBuffer ** buffer);
-static GstFlowReturn gst_interleave_chain (GstPad * pad, GstBuffer * buffer);
-static gboolean gst_interleave_src_setcaps (GstPad * pad, GstCaps * caps);
-static gboolean gst_interleave_sink_setcaps (GstPad * pad, GstCaps * caps);
-static GstCaps *gst_interleave_src_getcaps (GstPad * pad);
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
 
 static void
 gst_interleave_base_init (gpointer g_class)
@@ -128,7 +181,8 @@ gst_interleave_base_init (gpointer g_class)
   gst_element_class_set_details_simple (g_class, "Audio interleaver",
       "Filter/Converter/Audio",
       "Folds many mono channels into one interleaved audio stream",
-      "Andy Wingo <wingo at pobox.com>");
+      "Andy Wingo <wingo at pobox.com>, "
+      "Sebastian Dröge <slomo@circular-chaos.org>");
 
   gst_element_class_add_pad_template (g_class,
       gst_static_pad_template_get (&sink_template));
@@ -140,107 +194,332 @@ static void
 gst_interleave_class_init (GstInterleaveClass * klass)
 {
   GstElementClass *gstelement_class;
+  GObjectClass *gobject_class;
 
+  gobject_class = G_OBJECT_CLASS (klass);
   gstelement_class = GST_ELEMENT_CLASS (klass);
 
   GST_DEBUG_CATEGORY_INIT (gst_interleave_debug, "interleave", 0,
       "interleave element");
 
+  gobject_class->finalize = gst_interleave_finalize;
+
   gstelement_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_interleave_request_new_pad);
+  gstelement_class->release_pad =
+      GST_DEBUG_FUNCPTR (gst_interleave_release_pad);
+  gstelement_class->change_state =
+      GST_DEBUG_FUNCPTR (gst_interleave_change_state);
 }
 
 static void
 gst_interleave_init (GstInterleave * self, GstInterleaveClass * klass)
 {
-  self->pending_in = 0;
-  self->mode = GST_ACTIVATE_NONE;
-
   self->src = gst_pad_new_from_static_template (&src_template, "src");
 
-  gst_pad_set_getrange_function (self->src,
-      GST_DEBUG_FUNCPTR (gst_interleave_getrange));
-  gst_pad_set_activatepull_function (self->src,
-      GST_DEBUG_FUNCPTR (gst_interleave_src_activate_pull));
-  gst_pad_set_setcaps_function (self->src,
-      GST_DEBUG_FUNCPTR (gst_interleave_src_setcaps));
-  gst_pad_set_getcaps_function (self->src,
-      GST_DEBUG_FUNCPTR (gst_interleave_src_getcaps));
+  gst_pad_set_query_function (self->src,
+      GST_DEBUG_FUNCPTR (gst_interleave_src_query));
+  gst_pad_set_event_function (self->src,
+      GST_DEBUG_FUNCPTR (gst_interleave_src_event));
 
   gst_element_add_pad (GST_ELEMENT (self), self->src);
+
+  self->collect = gst_collect_pads_new ();
+  gst_collect_pads_set_function (self->collect,
+      (GstCollectPadsFunction) gst_interleave_collected, self);
 }
 
 static GstPad *
 gst_interleave_request_new_pad (GstElement * element, GstPadTemplate * templ,
-    const gchar * name)
+    const gchar * req_name)
 {
   GstInterleave *self = GST_INTERLEAVE (element);
   GstPad *new_pad;
   gchar *pad_name;
+  gint channels;
+  GstInterleaveCollectData *cdata;
+
+  if (templ->direction != GST_PAD_SINK)
+    goto not_sink_pad;
 
-  pad_name = g_strdup_printf ("sink%d", self->channels);
-  new_pad = g_object_new (GST_TYPE_INTERLEAVE_PAD, "name", pad_name,
-      "direction", templ->direction, "template", templ, NULL);
+  channels = g_atomic_int_exchange_and_add (&self->channels, 1);
+
+  pad_name = g_strdup_printf ("sink%d", channels);
+  new_pad = gst_pad_new_from_template (templ, pad_name);
+  GST_DEBUG_OBJECT (self, "requested new pad %s", pad_name);
   g_free (pad_name);
-  GST_INTERLEAVE_PAD (new_pad)->channel = self->channels;
-  ++self->channels;
 
   gst_pad_set_setcaps_function (new_pad,
       GST_DEBUG_FUNCPTR (gst_interleave_sink_setcaps));
-  gst_pad_set_chain_function (new_pad,
-      GST_DEBUG_FUNCPTR (gst_interleave_chain));
-  gst_pad_set_activatepush_function (new_pad,
-      GST_DEBUG_FUNCPTR (gst_interleave_sink_activate_push));
+  gst_pad_set_getcaps_function (new_pad,
+      GST_DEBUG_FUNCPTR (gst_interleave_sink_getcaps));
+
+  cdata = (GstInterleaveCollectData *)
+      gst_collect_pads_add_pad (self->collect, new_pad,
+      sizeof (GstInterleaveCollectData));
+  cdata->channel = channels;
+
+  /* FIXME: hacked way to override/extend the event function of
+   * GstCollectPads; because it sets its own event function giving the
+   * element no access to events */
+  self->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (new_pad);
+  gst_pad_set_event_function (new_pad,
+      GST_DEBUG_FUNCPTR (gst_interleave_sink_event));
+
+  if (!gst_element_add_pad (element, new_pad))
+    goto could_not_add;
+
+  /* Update the src caps if we already have them */
+  if (self->sinkcaps) {
+    GstCaps *srccaps;
+    GstStructure *s;
 
-  self->pending_in++;
+    /* Take lock to make sure processing finishes first */
+    GST_OBJECT_LOCK (self->collect);
 
-  GST_PAD_UNSET_FLUSHING (new_pad);
-  gst_element_add_pad (element, new_pad);
+    srccaps = gst_caps_copy (self->sinkcaps);
+    s = gst_caps_get_structure (srccaps, 0);
+
+    gst_structure_set (s, "channels", G_TYPE_INT, self->channels, NULL);
+
+    gst_pad_set_caps (self->src, srccaps);
+    gst_caps_unref (srccaps);
+
+    GST_OBJECT_UNLOCK (self->collect);
+  }
 
   return new_pad;
+
+  /* errors */
+not_sink_pad:
+  {
+    g_warning ("interleave: requested new pad that is not a SINK pad\n");
+    return NULL;
+  }
+could_not_add:
+  {
+    GST_DEBUG_OBJECT (self, "could not add pad %s", GST_PAD_NAME (new_pad));
+    gst_collect_pads_remove_pad (self->collect, new_pad);
+    gst_object_unref (new_pad);
+    return NULL;
+  }
 }
 
 static void
-gst_interleave_unset_caps (GstInterleave * self)
+gst_interleave_release_pad (GstElement * element, GstPad * pad)
 {
-  GstElement *elem;
-  GList *sinks;
+  GstInterleave *self = GST_INTERLEAVE (element);
+
+  /* Take lock to make sure we're not changing this when processing buffers */
+  GST_OBJECT_LOCK (self->collect);
+
+  self->channels--;
+
+  /* Update the src caps if we already have them */
+  if (self->sinkcaps) {
+    GstCaps *srccaps;
+    GstStructure *s;
+    GSList *l;
+    gint i = 0;
+
+    srccaps = gst_caps_copy (self->sinkcaps);
+    s = gst_caps_get_structure (srccaps, 0);
+
+    gst_structure_set (s, "channels", G_TYPE_INT, self->channels, NULL);
+
+    gst_pad_set_caps (self->src, srccaps);
+    gst_caps_unref (srccaps);
+
+    /* Update channel numbers */
+    for (l = self->collect->data; l != NULL; l = l->next) {
+      GstInterleaveCollectData *cdata = l->data;
 
-  elem = GST_ELEMENT (self);
+      if (cdata == NULL || cdata->data.pad == pad)
+        continue;
+      cdata->channel = i;
+      i++;
+    }
+  }
 
-  GST_INFO_OBJECT (self, "unset_caps()");
+  GST_OBJECT_UNLOCK (self->collect);
 
-  for (sinks = elem->sinkpads; sinks; sinks = sinks->next)
-    gst_pad_set_caps (GST_PAD (sinks->data), NULL);
+  gst_collect_pads_remove_pad (self->collect, pad);
+  gst_element_remove_pad (element, pad);
 }
 
-static gboolean
-gst_interleave_sink_setcaps (GstPad * pad, GstCaps * caps)
+static GstStateChangeReturn
+gst_interleave_change_state (GstElement * element, GstStateChange transition)
 {
   GstInterleave *self;
+  GstStateChangeReturn ret;
+
+  self = GST_INTERLEAVE (element);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      self->timestamp = 0;
+      self->offset = 0;
+      self->segment_pending = TRUE;
+      self->segment_position = 0;
+      self->segment_rate = 1.0;
+      gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
+      gst_collect_pads_start (self->collect);
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      break;
+    default:
+      break;
+  }
 
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      gst_collect_pads_stop (self->collect);
+      gst_pad_set_caps (self->src, NULL);
+      gst_caps_replace (&self->sinkcaps, NULL);
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      break;
+    default:
+      break;
+  }
 
-  if (self->sinkcaps && !gst_caps_is_equal (caps, self->sinkcaps))
-    goto cannot_change_caps;
+  return ret;
+}
 
-  if (self->mode == GST_ACTIVATE_PULL) {
-    GstPad *peer;
+static void
+__remove_channels (GstCaps * caps)
+{
+  GstStructure *s;
+  gint i, size;
+
+  size = gst_caps_get_size (caps);
+  for (i = 0; i < size; i++) {
+    s = gst_caps_get_structure (caps, i);
+    gst_structure_remove_field (s, "channel-positions");
+    gst_structure_remove_field (s, "channels");
+  }
+}
 
-    if ((peer = gst_pad_get_peer (pad))) {
-      gboolean res = gst_pad_set_caps (peer, caps);
+static void
+__set_channels (GstCaps * caps, gint channels)
+{
+  GstStructure *s;
+  gint i, size;
+
+  size = gst_caps_get_size (caps);
+  for (i = 0; i < size; i++) {
+    s = gst_caps_get_structure (caps, i);
+    if (channels > 0)
+      gst_structure_set (s, "channels", G_TYPE_INT, channels, NULL);
+    else
+      gst_structure_set (s, "channels", GST_TYPE_INT_RANGE, 1, G_MAXINT, NULL);
+  }
+}
+
+/* we can only accept caps that we and downstream can handle. */
+static GstCaps *
+gst_interleave_sink_getcaps (GstPad * pad)
+{
+  GstInterleave *self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+  GstCaps *result, *peercaps, *sinkcaps;
+
+  GST_OBJECT_LOCK (self);
 
-      gst_object_unref (peer);
-      if (!res)
-        goto peer_did_not_accept;
+  /* If we already have caps on one of the sink pads return them */
+  if (self->sinkcaps) {
+    result = gst_caps_copy (self->sinkcaps);
+  } else {
+    /* get the downstream possible caps */
+    peercaps = gst_pad_peer_get_caps (self->src);
+    /* get the allowed caps on this sinkpad */
+    sinkcaps = gst_caps_copy (gst_pad_get_pad_template_caps (pad));
+    __remove_channels (sinkcaps);
+    if (peercaps) {
+      __remove_channels (peercaps);
+      /* if the peer has caps, intersect */
+      GST_DEBUG_OBJECT (pad, "intersecting peer and template caps");
+      result = gst_caps_intersect (peercaps, sinkcaps);
+      gst_caps_unref (peercaps);
+      gst_caps_unref (sinkcaps);
+    } else {
+      /* the peer has no caps (or there is no peer), just use the allowed caps
+       * of this sinkpad. */
+      GST_DEBUG_OBJECT (pad, "no peer caps, using sinkcaps");
+      result = sinkcaps;
     }
+    __set_channels (result, 1);
+  }
+
+  GST_OBJECT_UNLOCK (self);
+
+  gst_object_unref (self);
+
+  GST_DEBUG_OBJECT (pad, "Returning caps %" GST_PTR_FORMAT, result);
+
+  return result;
+}
+
+static void
+gst_interleave_set_process_function (GstInterleave * self)
+{
+  switch (self->width) {
+    case 8:
+      self->func = (GstInterleaveFunc) interleave_8;
+      break;
+    case 16:
+      self->func = (GstInterleaveFunc) interleave_16;
+      break;
+    case 24:
+      self->func = (GstInterleaveFunc) interleave_24;
+      break;
+    case 32:
+      self->func = (GstInterleaveFunc) interleave_32;
+      break;
+    case 64:
+      self->func = (GstInterleaveFunc) interleave_64;
+      break;
+    default:
+      g_assert_not_reached ();
+      break;
+  }
+}
+
+static gboolean
+gst_interleave_sink_setcaps (GstPad * pad, GstCaps * caps)
+{
+  GstInterleave *self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+
+  /* First caps that are set on a sink pad are used as output caps */
+  /* TODO: handle caps changes */
+  if (self->sinkcaps && !gst_caps_is_equal (caps, self->sinkcaps)) {
+    goto cannot_change_caps;
   } else {
     GstCaps *srccaps;
+    GstStructure *s;
     gboolean res;
 
+    s = gst_caps_get_structure (caps, 0);
+
+    if (!gst_structure_get_int (s, "width", &self->width))
+      goto no_width;
+
+    if (!gst_structure_get_int (s, "rate", &self->rate))
+      goto no_rate;
+
+    gst_interleave_set_process_function (self);
+
     srccaps = gst_caps_copy (caps);
-    gst_structure_set (gst_caps_get_structure (srccaps, 0), "channels",
-        G_TYPE_INT, self->channels, NULL);
+    s = gst_caps_get_structure (srccaps, 0);
+
+    /* TODO: channel positions */
+    gst_structure_set (s, "channels", G_TYPE_INT, self->channels, NULL);
+    gst_structure_remove_field (s, "channel-positions");
 
     res = gst_pad_set_caps (self->src, srccaps);
     gst_caps_unref (srccaps);
@@ -252,456 +531,386 @@ gst_interleave_sink_setcaps (GstPad * pad, GstCaps * caps)
   if (!self->sinkcaps)
     gst_caps_replace (&self->sinkcaps, caps);
 
+  gst_object_unref (self);
+
   return TRUE;
 
 cannot_change_caps:
   {
     GST_DEBUG_OBJECT (self, "caps of %" GST_PTR_FORMAT " already set, can't "
         "change", self->sinkcaps);
-    return FALSE;
-  }
-peer_did_not_accept:
-  {
-    GST_DEBUG_OBJECT (self, "peer did not accept setcaps()");
+    gst_object_unref (self);
     return FALSE;
   }
 src_did_not_accept:
   {
     GST_DEBUG_OBJECT (self, "src did not accept setcaps()");
-    return FALSE;
-  }
-}
-
-static gboolean
-gst_interleave_src_setcaps (GstPad * pad, GstCaps * caps)
-{
-  GstInterleave *self;
-  gint channels;
-
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
-
-  if (!gst_structure_get_int (gst_caps_get_structure (caps, 0), "channels",
-          &channels))
-    goto impossible;
-
-  if (channels != self->channels)
-    goto wrong_num_channels;
-
-  if (self->mode == GST_ACTIVATE_PULL) {
-    GstCaps *sinkcaps;
-    GList *l;
-
-    sinkcaps = gst_caps_copy (caps);
-    gst_structure_set (gst_caps_get_structure (sinkcaps, 0), "channels",
-        G_TYPE_INT, 1, NULL);
-
-    for (l = GST_ELEMENT (self)->sinkpads; l; l = l->next)
-      if (!gst_pad_set_caps (GST_PAD (l->data), sinkcaps))
-        goto sinks_did_not_accept;
-
-    gst_caps_unref (sinkcaps);
-  }
-
-  gst_object_unref (self);
-
-  return TRUE;
-
-impossible:
-  {
-    g_warning ("caps didn't have channels property, how is this possible");
     gst_object_unref (self);
     return FALSE;
   }
-wrong_num_channels:
+no_width:
   {
-    GST_INFO_OBJECT (self, "bad number of channels (%d != %d)",
-        self->channels, channels);
+    GST_WARNING_OBJECT (self, "caps did not have width: %" GST_PTR_FORMAT,
+        caps);
     gst_object_unref (self);
     return FALSE;
   }
-sinks_did_not_accept:
+no_rate:
   {
-    /* assume they already logged */
+    GST_WARNING_OBJECT (self, "caps did not have rate: %" GST_PTR_FORMAT, caps);
     gst_object_unref (self);
     return FALSE;
   }
 }
 
-static GstCaps *
-gst_interleave_src_getcaps (GstPad * pad)
+static gboolean
+gst_interleave_sink_event (GstPad * pad, GstEvent * event)
 {
-  GstInterleave *self;
-  GList *l;
-  GstCaps *ret;
-
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
-
-  ret = gst_caps_copy (gst_pad_get_pad_template_caps (pad));
-
-  for (l = GST_ELEMENT (self)->sinkpads; l; l = l->next) {
-    GstCaps *sinkcaps, *oldcaps;
-
-    oldcaps = ret;
-    sinkcaps = gst_pad_get_caps (GST_PAD (l->data));
-    ret = gst_caps_intersect (sinkcaps, oldcaps);
-    gst_caps_unref (oldcaps);
-    gst_caps_unref (sinkcaps);
+  GstInterleave *self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+  gboolean ret;
+
+  GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
+      GST_DEBUG_PAD_NAME (pad));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:
+      /* mark a pending new segment. This event is synchronized
+       * with the streaming thread so we can safely update the
+       * variable without races. It's somewhat weird because we
+       * assume the collectpads forwarded the FLUSH_STOP past us
+       * and downstream (using our source pad, the bastard!).
+       */
+      self->segment_pending = TRUE;
+      break;
+    default:
+      break;
   }
 
-  if (self->channels)
-    gst_structure_set (gst_caps_get_structure (ret, 0), "channels", G_TYPE_INT,
-        self->channels, NULL);
+  /* now GstCollectPads can take care of the rest, e.g. EOS */
+  ret = self->collect_event (pad, event);
 
   gst_object_unref (self);
-
   return ret;
 }
 
-static void
-gst_interleave_update_inputs (GstInterleave * self, guint nprocessed)
+static gboolean
+gst_interleave_src_query_duration (GstInterleave * self, GstQuery * query)
 {
-  GstElement *elem = (GstElement *) self;
-  GList *sinks;
-
-  for (sinks = elem->sinkpads; sinks; sinks = sinks->next) {
-    GstInterleavePad *sinkpad;
-
-    sinkpad = (GstInterleavePad *) sinks->data;
-    g_assert (sinkpad->samples_avail >= nprocessed);
-
-    if (sinkpad->pen && sinkpad->samples_avail == nprocessed) {
-      /* used up this buffer, unpen */
-      gst_buffer_unref (sinkpad->pen);
-      sinkpad->pen = NULL;
-    }
-
-    if (!sinkpad->pen) {
-      /* this buffer was used up */
-      self->pending_in++;
-      sinkpad->data = NULL;
-      sinkpad->samples_avail = 0;
-    } else {
-      /* advance ->data pointers and decrement ->samples_avail, unreffing buffer
-         if no samples are left */
-      sinkpad->samples_avail -= nprocessed;
-      sinkpad->data += nprocessed;      /* gfloat* arithmetic */
+  gint64 max;
+  gboolean res;
+  GstFormat format;
+  GstIterator *it;
+  gboolean done;
+
+  /* parse format */
+  gst_query_parse_duration (query, &format, NULL);
+
+  max = -1;
+  res = TRUE;
+  done = FALSE;
+
+  /* Take maximum of all durations */
+  it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (self));
+  while (!done) {
+    GstIteratorResult ires;
+    gpointer item;
+
+    ires = gst_iterator_next (it, &item);
+    switch (ires) {
+      case GST_ITERATOR_DONE:
+        done = TRUE;
+        break;
+      case GST_ITERATOR_OK:
+      {
+        GstPad *pad = GST_PAD_CAST (item);
+        gint64 duration;
+
+        /* ask sink peer for duration */
+        res &= gst_pad_query_peer_duration (pad, &format, &duration);
+        /* take max from all valid return values */
+        if (res) {
+          /* valid unknown length, stop searching */
+          if (duration == -1) {
+            max = duration;
+            done = TRUE;
+          }
+          /* else see if bigger than current max */
+          else if (duration > max)
+            max = duration;
+        }
+        break;
+      }
+      case GST_ITERATOR_RESYNC:
+        max = -1;
+        res = TRUE;
+        break;
+      default:
+        res = FALSE;
+        done = TRUE;
+        break;
     }
   }
-}
-
-static GstFlowReturn
-gst_interleave_process (GstInterleave * self, guint nframes, GstBuffer ** buf)
-{
-  GstFlowReturn ret;
-  GstElement *elem;
-  GList *sinks;
-  guint bufsize, i, j, channels;
-  gfloat *in, *out;
-
-  g_return_val_if_fail (self->pending_in == 0, GST_FLOW_ERROR);
+  gst_iterator_free (it);
 
-  elem = GST_ELEMENT (self);
-
-  /* determine the number of samples that we can process */
-  for (sinks = elem->sinkpads; sinks; sinks = sinks->next) {
-    GstInterleavePad *sinkpad = (GstInterleavePad *) sinks->data;
-
-    g_assert (sinkpad->samples_avail > 0);
-    nframes = MIN (nframes, sinkpad->samples_avail);
+  if (res) {
+    /* and store the max */
+    gst_query_set_duration (query, format, max);
   }
 
-  channels = self->channels;
-  bufsize = nframes * channels * sizeof (gfloat);
-
-  ret = gst_pad_alloc_buffer (GST_PAD (self->src), -1,
-      bufsize, GST_PAD_CAPS (self->src), buf);
-
-  if (ret != GST_FLOW_OK)
-    goto alloc_buffer_failed;
-
-  if (GST_BUFFER_SIZE (*buf) != bufsize)
-    goto alloc_buffer_bad_size;
-
-  gst_buffer_set_caps (*buf, GST_PAD_CAPS (self->src));
-
-  /* do the thing */
-  for (sinks = elem->sinkpads, i = 0; sinks; sinks = sinks->next, i++) {
-    GstInterleavePad *sinkpad = (GstInterleavePad *) sinks->data;
+  return res;
+}
 
-    out = (gfloat *) GST_BUFFER_DATA (*buf);
-    out += i;                   /* gfloat* arith */
-    in = sinkpad->data;
-    for (j = 0; j < nframes; j++)
-      out[j * channels] = in[j];
+static gboolean
+gst_interleave_src_query (GstPad * pad, GstQuery * query)
+{
+  GstInterleave *self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+  gboolean res = FALSE;
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_POSITION:
+    {
+      GstFormat format;
+
+      gst_query_parse_position (query, &format, NULL);
+
+      switch (format) {
+        case GST_FORMAT_TIME:
+          /* FIXME, bring to stream time, might be tricky */
+          gst_query_set_position (query, format, self->timestamp);
+          res = TRUE;
+          break;
+        case GST_FORMAT_DEFAULT:
+          gst_query_set_position (query, format, self->offset);
+          res = TRUE;
+          break;
+        default:
+          break;
+      }
+      break;
+    }
+    case GST_QUERY_DURATION:
+      res = gst_interleave_src_query_duration (self, query);
+      break;
+    default:
+      /* FIXME, needs a custom query handler because we have multiple
+       * sinkpads */
+      res = gst_pad_query_default (pad, query);
+      break;
   }
 
-  gst_interleave_update_inputs (self, nframes);
-
-  return ret;
+  gst_object_unref (self);
+  return res;
+}
 
-alloc_buffer_failed:
-  {
-    GST_WARNING ("gst_pad_alloc_buffer() returned %d", ret);
-    return ret;
-  }
-alloc_buffer_bad_size:
-  {
-    GST_WARNING ("called alloc_buffer() for %d bytes but got %d", bufsize,
-        GST_BUFFER_SIZE (*buf));
-    gst_buffer_unref (*buf);
-    return GST_FLOW_NOT_NEGOTIATED;
+static gboolean
+forward_event_func (GstPad * pad, GValue * ret, GstEvent * event)
+{
+  gst_event_ref (event);
+  GST_LOG_OBJECT (pad, "About to send event %s", GST_EVENT_TYPE_NAME (event));
+  if (!gst_pad_push_event (pad, event)) {
+    g_value_set_boolean (ret, FALSE);
+    GST_WARNING_OBJECT (pad, "Sending event  %p (%s) failed.",
+        event, GST_EVENT_TYPE_NAME (event));
+  } else {
+    GST_LOG_OBJECT (pad, "Sent event  %p (%s).",
+        event, GST_EVENT_TYPE_NAME (event));
   }
+  gst_object_unref (pad);
+  return TRUE;
 }
 
-static GstFlowReturn
-gst_interleave_pen_buffer (GstInterleave * self, GstPad * pad,
-    GstBuffer * buffer)
+static gboolean
+forward_event (GstInterleave * self, GstEvent * event)
 {
-  GstInterleavePad *spad = (GstInterleavePad *) pad;
+  gboolean ret;
+  GstIterator *it;
+  GValue vret = { 0 };
 
-  if (spad->pen)
-    goto had_buffer;
+  GST_LOG_OBJECT (self, "Forwarding event %p (%s)", event,
+      GST_EVENT_TYPE_NAME (event));
 
-  /* keep the reference */
-  spad->pen = buffer;
-  spad->data = (gfloat *) GST_BUFFER_DATA (buffer);
-  spad->samples_avail = GST_BUFFER_SIZE (buffer) / sizeof (float);
+  ret = TRUE;
 
-  g_assert (self->pending_in != 0);
+  g_value_init (&vret, G_TYPE_BOOLEAN);
+  g_value_set_boolean (&vret, TRUE);
+  it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (self));
+  gst_iterator_fold (it, (GstIteratorFoldFunction) forward_event_func, &vret,
+      event);
+  gst_iterator_free (it);
+  gst_event_unref (event);
 
-  self->pending_in--;
+  ret = g_value_get_boolean (&vret);
 
-  return GST_FLOW_OK;
-
-  /* ERRORS */
-had_buffer:
-  {
-    GST_WARNING ("Pad %s:%s already has penned buffer",
-        GST_DEBUG_PAD_NAME (pad));
-    gst_buffer_unref (buffer);
-    return GST_FLOW_ERROR;
-  }
+  return ret;
 }
 
-static void
-gst_interleave_flush (GstInterleave * self)
-{
-  GList *pads;
 
-  GST_INFO_OBJECT (self, "flush()");
+static gboolean
+gst_interleave_src_event (GstPad * pad, GstEvent * event)
+{
+  GstInterleave *self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+  gboolean result;
 
-  for (pads = GST_ELEMENT (self)->sinkpads; pads; pads = pads->next) {
-    GstInterleavePad *spad = (GstInterleavePad *) pads->data;
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_QOS:
+      /* QoS might be tricky */
+      result = FALSE;
+      break;
+    case GST_EVENT_SEEK:
+    {
+      GstSeekFlags flags;
+      GstSeekType curtype;
+      gint64 cur;
+
+      /* parse the seek parameters */
+      gst_event_parse_seek (event, &self->segment_rate, NULL, &flags, &curtype,
+          &cur, NULL, NULL);
+
+      /* check if we are flushing */
+      if (flags & GST_SEEK_FLAG_FLUSH) {
+        /* make sure we accept nothing anymore and return WRONG_STATE */
+        gst_collect_pads_set_flushing (self->collect, TRUE);
+
+        /* flushing seek, start flush downstream, the flush will be done
+         * when all pads received a FLUSH_STOP. */
+        gst_pad_push_event (self->src, gst_event_new_flush_start ());
+      }
 
-    if (spad->pen) {
-      gst_buffer_unref (spad->pen);
-      spad->pen = NULL;
-      spad->data = NULL;
-      spad->samples_avail = 0;
+      /* now wait for the collected to be finished and mark a new
+       * segment */
+      GST_OBJECT_LOCK (self->collect);
+      if (curtype == GST_SEEK_TYPE_SET)
+        self->segment_position = cur;
+      else
+        self->segment_position = 0;
+      self->segment_pending = TRUE;
+      GST_OBJECT_UNLOCK (self->collect);
+
+      result = forward_event (self, event);
+      break;
     }
+    case GST_EVENT_NAVIGATION:
+      /* navigation is rather pointless. */
+      result = FALSE;
+      break;
+    default:
+      /* just forward the rest for now */
+      result = forward_event (self, event);
+      break;
   }
+  gst_object_unref (self);
 
-  self->pending_in = GST_ELEMENT (self)->numsinkpads;
+  return result;
 }
 
 static GstFlowReturn
-gst_interleave_do_pulls (GstInterleave * self, guint nframes)
+gst_interleave_collected (GstCollectPads * pads, GstInterleave * self)
 {
-  GList *sinkpads;
+  guint size;
+  GstBuffer *outbuf;
   GstFlowReturn ret = GST_FLOW_OK;
+  GSList *collected;
+  guint nsamples;
+  guint ncollected = 0;
+  gboolean empty = TRUE;
+  gint width = self->width / 8;
 
-  /* FIXME: not threadsafe atm */
-
-  sinkpads = GST_ELEMENT (self)->sinkpads;
+  g_return_val_if_fail (self->func != NULL, GST_FLOW_NOT_NEGOTIATED);
+  g_return_val_if_fail (self->width > 0, GST_FLOW_NOT_NEGOTIATED);
+  g_return_val_if_fail (self->channels > 0, GST_FLOW_NOT_NEGOTIATED);
+  g_return_val_if_fail (self->rate > 0, GST_FLOW_NOT_NEGOTIATED);
 
-  for (; sinkpads; sinkpads = sinkpads->next) {
-    GstInterleavePad *spad = (GstInterleavePad *) sinkpads->data;
-    GstBuffer *buf;
+  size = gst_collect_pads_available (pads);
 
-    if (spad->pen) {
-      g_warning ("Unexpectedly full buffer pen for pad %s:%s",
-          GST_DEBUG_PAD_NAME (spad));
-      continue;
-    }
+  g_return_val_if_fail (size % width == 0, GST_FLOW_ERROR);
 
-    ret =
-        gst_pad_pull_range (GST_PAD (spad), -1, nframes * sizeof (gfloat),
-        &buf);
-    if (ret != GST_FLOW_OK)
-      goto pull_failed;
+  GST_DEBUG_OBJECT (self, "Starting to collect %u bytes from %d channels", size,
+      self->channels);
 
-    if (!buf)
-      goto no_buffer;
+  nsamples = size / width;
 
-    ret = gst_interleave_pen_buffer (self, GST_PAD (spad), buf);
-    if (ret != GST_FLOW_OK)
-      goto pull_failed;
-  }
+  ret =
+      gst_pad_alloc_buffer (self->src, GST_BUFFER_OFFSET_NONE,
+      size * self->channels, GST_PAD_CAPS (self->src), &outbuf);
 
-  return ret;
-
-pull_failed:
-  {
-    gst_interleave_flush (self);
+  if (ret != GST_FLOW_OK) {
     return ret;
+  } else if (outbuf == NULL || GST_BUFFER_SIZE (outbuf) < size * self->channels) {
+    gst_buffer_unref (outbuf);
+    return GST_FLOW_NOT_NEGOTIATED;
+  } else if (!gst_caps_is_equal (GST_BUFFER_CAPS (outbuf),
+          GST_PAD_CAPS (self->src))) {
+    gst_buffer_unref (outbuf);
+    return GST_FLOW_NOT_NEGOTIATED;
   }
-no_buffer:
-  {
-    g_critical ("Pull failed to make a buffer!");
-    return GST_FLOW_ERROR;
-  }
-}
-
-static GstFlowReturn
-gst_interleave_getrange (GstPad * pad, guint64 offset,
-    guint length, GstBuffer ** buffer)
-{
-  GstInterleave *self;
-  GstFlowReturn ret = GST_FLOW_ERROR;
-  guint nframes;
-
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
 
-  nframes = length / self->channels / sizeof (gfloat);
+  memset (GST_BUFFER_DATA (outbuf), 0, size * self->channels);
 
-  ret = gst_interleave_do_pulls (self, nframes);
+  for (collected = pads->data; collected != NULL; collected = collected->next) {
+    GstInterleaveCollectData *cdata;
+    GstBuffer *inbuf;
+    guint8 *outdata;
 
-  if (ret == GST_FLOW_OK)
-    ret = gst_interleave_process (self, nframes, buffer);
+    cdata = (GstInterleaveCollectData *) collected->data;
 
-  GST_DEBUG_OBJECT (self, "returns %s", gst_flow_get_name (ret));
-
-  gst_object_unref (self);
-
-  return ret;
-}
-
-static GstFlowReturn
-gst_interleave_chain (GstPad * pad, GstBuffer * buffer)
-{
-  GstFlowReturn ret;
-  GstInterleave *self;
-
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+    inbuf = gst_collect_pads_take_buffer (pads, (GstCollectData *) cdata, size);
+    if (inbuf == NULL) {
+      GST_DEBUG_OBJECT (cdata->data.pad, "No buffer available");
+      goto next;
+    }
+    ncollected++;
 
-  ret = gst_interleave_pen_buffer (self, pad, buffer);
-  if (ret != GST_FLOW_OK)
-    goto pen_failed;
+    if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP))
+      goto next;
 
-  if (self->pending_in == 0) {
-    GstBuffer *out;
+    empty = FALSE;
+    outdata = GST_BUFFER_DATA (outbuf) + width * cdata->channel;
 
-    ret = gst_interleave_process (self, G_MAXUINT, &out);
-    if (ret != GST_FLOW_OK)
-      goto process_failed;
+    self->func (outdata, GST_BUFFER_DATA (inbuf), self->channels, nsamples);
 
-    ret = gst_pad_push (self->src, out);
+  next:
+    if (inbuf)
+      gst_buffer_unref (inbuf);
   }
 
-done:
-  gst_object_unref (self);
-  return ret;
-
-pen_failed:
-  {
-    GST_WARNING_OBJECT (self, "pen failed");
-    goto done;
-  }
-process_failed:
-  {
-    GST_WARNING_OBJECT (self, "process failed");
-    goto done;
-  }
-}
+  if (ncollected == 0)
+    goto eos;
 
-static gboolean
-gst_interleave_sink_activate_push (GstPad * pad, gboolean active)
-{
-  gboolean result = TRUE;
-  GstInterleave *self;
+  if (self->segment_pending) {
+    GstEvent *event;
 
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+    event = gst_event_new_new_segment_full (FALSE, self->segment_rate,
+        1.0, GST_FORMAT_TIME, self->timestamp, -1, self->segment_position);
 
-  if (active) {
-    if (self->mode == GST_ACTIVATE_NONE) {
-      self->mode = GST_ACTIVATE_PUSH;
-      result = TRUE;
-    } else if (self->mode == GST_ACTIVATE_PUSH) {
-      result = TRUE;
-    } else {
-      g_warning ("foo");
-      result = FALSE;
-    }
-  } else {
-    if (self->mode == GST_ACTIVATE_NONE) {
-      result = TRUE;
-    } else if (self->mode == GST_ACTIVATE_PUSH) {
-      self->mode = GST_ACTIVATE_NONE;
-      result = TRUE;
-    } else {
-      g_warning ("foo");
-      result = FALSE;
-    }
+    gst_pad_push_event (self->src, event);
+    self->segment_pending = FALSE;
+    self->segment_position = 0;
   }
 
-  GST_DEBUG_OBJECT (self, "result : %d", result);
+  GST_BUFFER_TIMESTAMP (outbuf) = self->timestamp;
+  GST_BUFFER_OFFSET (outbuf) = self->offset;
 
-  gst_object_unref (self);
+  self->offset += nsamples;
+  self->timestamp = gst_util_uint64_scale_int (self->offset,
+      GST_SECOND, self->rate);
 
-  return result;
-}
+  GST_BUFFER_DURATION (outbuf) = self->timestamp -
+      GST_BUFFER_TIMESTAMP (outbuf);
 
-static gboolean
-gst_interleave_src_activate_pull (GstPad * pad, gboolean active)
-{
-  gboolean result = TRUE;
-  GstInterleave *self;
+  if (empty)
+    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_GAP);
 
-  self = GST_INTERLEAVE (gst_pad_get_parent (pad));
+  GST_LOG_OBJECT (self, "pushing outbuf, timestamp %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)));
+  ret = gst_pad_push (self->src, outbuf);
 
-  if (active) {
-    if (self->mode == GST_ACTIVATE_NONE) {
-      GList *l;
-
-      if (GST_ELEMENT (self)->sinkpads) {
-        for (l = GST_ELEMENT (self)->sinkpads; l; l = l->next)
-          result &= gst_pad_activate_pull (GST_PAD (l->data), active);
-      } else {
-        /* nobody has requested pads, seems i am operating in delayed-request
-           push mode */
-        result = FALSE;
-      }
-      if (result)
-        self->mode = GST_ACTIVATE_PULL;
-    } else if (self->mode == GST_ACTIVATE_PULL) {
-      result = TRUE;
-    } else {
-      g_warning ("foo");
-      result = FALSE;
-    }
-  } else {
-    if (self->mode == GST_ACTIVATE_NONE) {
-      result = TRUE;
-    } else if (self->mode == GST_ACTIVATE_PULL) {
-      GList *l;
-
-      for (l = GST_ELEMENT (self)->sinkpads; l; l = l->next)
-        result &= gst_pad_activate_pull (GST_PAD (l->data), active);
-      if (result)
-        self->mode = GST_ACTIVATE_NONE;
-      result = TRUE;
-    } else {
-      g_warning ("foo");
-      result = FALSE;
-    }
+  return ret;
 
-    gst_interleave_unset_caps (self);
-    gst_interleave_flush (self);
+eos:
+  {
+    GST_DEBUG_OBJECT (self, "no data available, must be EOS");
+    gst_buffer_unref (outbuf);
+    gst_pad_push_event (self->src, gst_event_new_eos ());
+    return GST_FLOW_UNEXPECTED;
   }
-
-  GST_DEBUG_OBJECT (self, "result : %d", result);
-
-  gst_object_unref (self);
-
-  return result;
 }
index 915871e..6500695 100644 (file)
@@ -3,8 +3,9 @@
  *                    2000 Wim Taymans <wtay@chello.be>
  *                    2005 Wim Taymans <wim@fluendo.com>
  *                    2007 Andy Wingo <wingo at pobox.com>
+ *                    2008 Sebastian Dröge <slomo@circular-chaos.org>
  *
- * interleave.c: interleave samples, based on gstsignalprocessor.c
+ * interleave.c: interleave samples, mostly based on adder
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -26,6 +27,7 @@
 #define __INTERLEAVE_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstcollectpads.h>
 
 G_BEGIN_DECLS
 
@@ -40,18 +42,34 @@ G_BEGIN_DECLS
 typedef struct _GstInterleave GstInterleave;
 typedef struct _GstInterleaveClass GstInterleaveClass;
 
+typedef void (*GstInterleaveFunc) (gpointer out, gpointer in, guint stride, guint nframes);
+
 struct _GstInterleave
 {
   GstElement element;
 
   /*< private >*/
+  GstCollectPads *collect;
+
+  gint channels;
+  gint rate;
+  gint width;
+
   GstCaps *sinkcaps;
-  guint channels;
 
-  GstPad *src;
-  GstActivateMode mode;
+  GstClockTime timestamp;
+  guint64 offset;
 
-  guint pending_in;
+  gboolean segment_pending;
+  guint64 segment_position;
+  gdouble segment_rate;
+  GstSegment segment;
+
+  GstPadEventFunction collect_event;
+
+  GstInterleaveFunc func;
+
+  GstPad *src;
 };
 
 struct _GstInterleaveClass
index 6d9fdd0..433e2c5 100644 (file)
@@ -1,5 +1,6 @@
 /* GStreamer unit tests for the interleave element
  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
+ * Copyright (C) 2008 Sebastian Dröge <slomo@circular-chaos.org>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -63,6 +64,442 @@ GST_START_TEST (test_request_pads)
 
 GST_END_TEST;
 
+static GstPad **mysrcpads, *mysinkpad;
+static GstBus *bus;
+static GstElement *interleave;
+static gint have_data;
+static gfloat input[2];
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("audio/x-raw-float, "
+        "width = (int) 32, "
+        "channels = (int) 2, "
+        "rate = (int) 48000, " "endianness = (int) BYTE_ORDER"));
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("audio/x-raw-float, "
+        "width = (int) 32, "
+        "channels = (int) 1, "
+        "rate = (int) 48000, " "endianness = (int) BYTE_ORDER"));
+
+#define CAPS_48khz \
+         "audio/x-raw-float, " \
+        "width = (int) 32, " \
+        "channels = (int) 1, " \
+        "rate = (int) 48000, " \
+       "endianness = (int) BYTE_ORDER"
+
+static GstFlowReturn
+interleave_chain_func (GstPad * pad, GstBuffer * buffer)
+{
+  gfloat *outdata;
+  gint i;
+
+  fail_unless (GST_IS_BUFFER (buffer));
+  fail_unless_equals_int (GST_BUFFER_SIZE (buffer),
+      48000 * 2 * sizeof (gfloat));
+  fail_unless (GST_BUFFER_DATA (buffer) != NULL);
+
+  outdata = (gfloat *) GST_BUFFER_DATA (buffer);
+
+  for (i = 0; i < 48000 * 2; i += 2) {
+    fail_unless_equals_float (outdata[i], input[0]);
+    fail_unless_equals_float (outdata[i + 1], input[1]);
+  }
+
+  have_data++;
+
+  gst_buffer_unref (buffer);
+
+  return GST_FLOW_OK;
+}
+
+GST_START_TEST (test_interleave_2ch)
+{
+  GstElement *queue;
+  GstPad *sink0, *sink1, *src, *tmp;
+  GstCaps *caps;
+  gint i;
+
+  GstBuffer *inbuf;
+  gfloat *indata;
+
+  mysrcpads = g_new0 (GstPad *, 2);
+
+  have_data = 0;
+
+  interleave = gst_element_factory_make ("interleave", NULL);
+  fail_unless (interleave != NULL);
+
+  queue = gst_element_factory_make ("queue", "queue");
+  fail_unless (queue != NULL);
+
+  sink0 = gst_element_get_request_pad (interleave, "sink%d");
+  fail_unless (sink0 != NULL);
+  fail_unless_equals_string (GST_OBJECT_NAME (sink0), "sink0");
+
+  sink1 = gst_element_get_request_pad (interleave, "sink%d");
+  fail_unless (sink1 != NULL);
+  fail_unless_equals_string (GST_OBJECT_NAME (sink1), "sink1");
+
+  mysrcpads[0] = gst_pad_new_from_static_template (&srctemplate, "src0");
+  fail_unless (mysrcpads[0] != NULL);
+
+  caps = gst_caps_from_string (CAPS_48khz);
+  fail_unless (gst_pad_set_caps (mysrcpads[0], caps));
+  gst_pad_use_fixed_caps (mysrcpads[0]);
+
+  mysrcpads[1] = gst_pad_new_from_static_template (&srctemplate, "src1");
+  fail_unless (mysrcpads[1] != NULL);
+
+  fail_unless (gst_pad_set_caps (mysrcpads[1], caps));
+  gst_pad_use_fixed_caps (mysrcpads[1]);
+
+  tmp = gst_element_get_static_pad (queue, "sink");
+  fail_unless (gst_pad_link (mysrcpads[0], tmp) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+  tmp = gst_element_get_static_pad (queue, "src");
+  fail_unless (gst_pad_link (tmp, sink0) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+
+  fail_unless (gst_pad_link (mysrcpads[1], sink1) == GST_PAD_LINK_OK);
+
+  mysinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
+  fail_unless (mysinkpad != NULL);
+  gst_pad_set_chain_function (mysinkpad, interleave_chain_func);
+  gst_pad_set_active (mysinkpad, TRUE);
+
+  src = gst_element_get_static_pad (interleave, "src");
+  fail_unless (src != NULL);
+  fail_unless (gst_pad_link (src, mysinkpad) == GST_PAD_LINK_OK);
+  gst_object_unref (src);
+
+  bus = gst_bus_new ();
+  gst_element_set_bus (interleave, bus);
+
+  fail_unless (gst_element_set_state (interleave,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS);
+  fail_unless (gst_element_set_state (queue,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS);
+
+  input[0] = -1.0;
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = -1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[0], inbuf) == GST_FLOW_OK);
+
+  input[1] = 1.0;
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = 1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[1], inbuf) == GST_FLOW_OK);
+
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = -1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[0], inbuf) == GST_FLOW_OK);
+
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = 1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[1], inbuf) == GST_FLOW_OK);
+
+  fail_unless (have_data == 2);
+
+  gst_object_unref (mysrcpads[0]);
+  gst_object_unref (mysrcpads[1]);
+  gst_object_unref (mysinkpad);
+
+  gst_element_release_request_pad (interleave, sink0);
+  gst_object_unref (sink0);
+  gst_element_release_request_pad (interleave, sink1);
+  gst_object_unref (sink1);
+
+  gst_element_set_state (interleave, GST_STATE_NULL);
+  gst_element_set_state (queue, GST_STATE_NULL);
+  gst_object_unref (interleave);
+  gst_object_unref (queue);
+  gst_object_unref (bus);
+  gst_caps_unref (caps);
+
+  g_free (mysrcpads);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_interleave_2ch_1eos)
+{
+  GstElement *queue;
+  GstPad *sink0, *sink1, *src, *tmp;
+  GstCaps *caps;
+  gint i;
+
+  GstBuffer *inbuf;
+  gfloat *indata;
+
+  mysrcpads = g_new0 (GstPad *, 2);
+
+  have_data = 0;
+
+  interleave = gst_element_factory_make ("interleave", NULL);
+  fail_unless (interleave != NULL);
+
+  queue = gst_element_factory_make ("queue", "queue");
+  fail_unless (queue != NULL);
+
+  sink0 = gst_element_get_request_pad (interleave, "sink%d");
+  fail_unless (sink0 != NULL);
+  fail_unless_equals_string (GST_OBJECT_NAME (sink0), "sink0");
+
+  sink1 = gst_element_get_request_pad (interleave, "sink%d");
+  fail_unless (sink1 != NULL);
+  fail_unless_equals_string (GST_OBJECT_NAME (sink1), "sink1");
+
+  mysrcpads[0] = gst_pad_new_from_static_template (&srctemplate, "src0");
+  fail_unless (mysrcpads[0] != NULL);
+
+  caps = gst_caps_from_string (CAPS_48khz);
+  fail_unless (gst_pad_set_caps (mysrcpads[0], caps));
+  gst_pad_use_fixed_caps (mysrcpads[0]);
+
+  mysrcpads[1] = gst_pad_new_from_static_template (&srctemplate, "src1");
+  fail_unless (mysrcpads[1] != NULL);
+
+  fail_unless (gst_pad_set_caps (mysrcpads[1], caps));
+  gst_pad_use_fixed_caps (mysrcpads[1]);
+
+  tmp = gst_element_get_static_pad (queue, "sink");
+  fail_unless (gst_pad_link (mysrcpads[0], tmp) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+  tmp = gst_element_get_static_pad (queue, "src");
+  fail_unless (gst_pad_link (tmp, sink0) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+
+  fail_unless (gst_pad_link (mysrcpads[1], sink1) == GST_PAD_LINK_OK);
+
+  mysinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
+  fail_unless (mysinkpad != NULL);
+  gst_pad_set_chain_function (mysinkpad, interleave_chain_func);
+  gst_pad_set_active (mysinkpad, TRUE);
+
+  src = gst_element_get_static_pad (interleave, "src");
+  fail_unless (src != NULL);
+  fail_unless (gst_pad_link (src, mysinkpad) == GST_PAD_LINK_OK);
+  gst_object_unref (src);
+
+  bus = gst_bus_new ();
+  gst_element_set_bus (interleave, bus);
+
+  fail_unless (gst_element_set_state (interleave,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS);
+  fail_unless (gst_element_set_state (queue,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS);
+
+  input[0] = -1.0;
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = -1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[0], inbuf) == GST_FLOW_OK);
+
+  input[1] = 1.0;
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = 1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[1], inbuf) == GST_FLOW_OK);
+
+  input[0] = 0.0;
+  gst_pad_push_event (mysrcpads[0], gst_event_new_eos ());
+
+  input[1] = 1.0;
+  inbuf = gst_buffer_new_and_alloc (48000 * sizeof (gfloat));
+  indata = (gfloat *) GST_BUFFER_DATA (inbuf);
+  for (i = 0; i < 48000; i++)
+    indata[i] = 1.0;
+  gst_buffer_set_caps (inbuf, caps);
+  fail_unless (gst_pad_push (mysrcpads[1], inbuf) == GST_FLOW_OK);
+
+  fail_unless (have_data == 2);
+
+  gst_object_unref (mysrcpads[0]);
+  gst_object_unref (mysrcpads[1]);
+  gst_object_unref (mysinkpad);
+
+  gst_element_release_request_pad (interleave, sink0);
+  gst_object_unref (sink0);
+  gst_element_release_request_pad (interleave, sink1);
+  gst_object_unref (sink1);
+
+  gst_element_set_state (interleave, GST_STATE_NULL);
+  gst_element_set_state (queue, GST_STATE_NULL);
+  gst_object_unref (interleave);
+  gst_object_unref (queue);
+  gst_object_unref (bus);
+  gst_caps_unref (caps);
+
+  g_free (mysrcpads);
+}
+
+GST_END_TEST;
+
+static void
+src_handoff_float32 (GstElement * element, GstBuffer * buffer, GstPad * pad,
+    gpointer user_data)
+{
+  gint n = GPOINTER_TO_INT (user_data);
+  GstCaps *caps;
+  gfloat *data;
+  gint i;
+
+  caps = gst_caps_new_simple ("audio/x-raw-float",
+      "width", G_TYPE_INT, 32,
+      "channels", G_TYPE_INT, 1,
+      "rate", G_TYPE_INT, 48000, "endianness", G_TYPE_INT, G_BYTE_ORDER, NULL);
+
+  data = g_new (gfloat, 48000);
+  GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) data;
+  GST_BUFFER_DATA (buffer) = (guint8 *) data;
+  GST_BUFFER_SIZE (buffer) = 48000 * sizeof (gfloat);
+
+  GST_BUFFER_OFFSET (buffer) = GST_BUFFER_OFFSET_NONE;
+  GST_BUFFER_TIMESTAMP (buffer) = GST_CLOCK_TIME_NONE;
+  GST_BUFFER_OFFSET_END (buffer) = GST_BUFFER_OFFSET_NONE;
+  GST_BUFFER_DURATION (buffer) = GST_SECOND;
+
+  gst_buffer_set_caps (buffer, caps);
+  gst_caps_unref (caps);
+
+  for (i = 0; i < 48000; i++)
+    data[i] = (n == 0) ? -1.0 : 1.0;
+}
+
+static void
+sink_handoff_float32 (GstElement * element, GstBuffer * buffer, GstPad * pad,
+    gpointer user_data)
+{
+  gint i;
+  gfloat *data;
+  GstCaps *caps;
+
+  fail_unless (GST_IS_BUFFER (buffer));
+  fail_unless_equals_int (GST_BUFFER_SIZE (buffer),
+      48000 * 2 * sizeof (gfloat));
+  fail_unless_equals_int (GST_BUFFER_DURATION (buffer), GST_SECOND);
+
+  caps = gst_caps_new_simple ("audio/x-raw-float",
+      "width", G_TYPE_INT, 32,
+      "channels", G_TYPE_INT, 2,
+      "rate", G_TYPE_INT, 48000, "endianness", G_TYPE_INT, G_BYTE_ORDER, NULL);
+
+  fail_unless (gst_caps_is_equal (caps, GST_BUFFER_CAPS (buffer)));
+  gst_caps_unref (caps);
+
+  data = (gfloat *) GST_BUFFER_DATA (buffer);
+
+  for (i = 0; i < 48000 * 2; i += 2) {
+    fail_unless_equals_float (data[i], -1.0);
+    fail_unless_equals_float (data[i + 1], 1.0);
+  }
+
+  have_data++;
+}
+
+GST_START_TEST (test_interleave_2ch_pipeline)
+{
+  GstElement *pipeline, *queue, *src1, *src2, *interleave, *sink;
+  GstPad *sinkpad0, *sinkpad1, *tmp, *tmp2;
+  GstMessage *msg;
+
+  have_data = 0;
+
+  pipeline = (GstElement *) gst_pipeline_new ("pipeline");
+  fail_unless (pipeline != NULL);
+
+  src1 = gst_element_factory_make ("fakesrc", "src1");
+  fail_unless (src1 != NULL);
+  g_object_set (src1, "num-buffers", 4, NULL);
+  g_object_set (src1, "signal-handoffs", TRUE, NULL);
+  g_signal_connect (src1, "handoff", G_CALLBACK (src_handoff_float32),
+      GINT_TO_POINTER (0));
+  gst_bin_add (GST_BIN (pipeline), src1);
+
+  src2 = gst_element_factory_make ("fakesrc", "src2");
+  fail_unless (src2 != NULL);
+  g_object_set (src2, "num-buffers", 4, NULL);
+  g_object_set (src2, "signal-handoffs", TRUE, NULL);
+  g_signal_connect (src2, "handoff", G_CALLBACK (src_handoff_float32),
+      GINT_TO_POINTER (1));
+  gst_bin_add (GST_BIN (pipeline), src2);
+
+  queue = gst_element_factory_make ("queue", "queue");
+  fail_unless (queue != NULL);
+  gst_bin_add (GST_BIN (pipeline), queue);
+
+  interleave = gst_element_factory_make ("interleave", "interleave");
+  fail_unless (interleave != NULL);
+  gst_bin_add (GST_BIN (pipeline), gst_object_ref (interleave));
+
+  sinkpad0 = gst_element_get_request_pad (interleave, "sink%d");
+  fail_unless (sinkpad0 != NULL);
+  tmp = gst_element_get_static_pad (src1, "src");
+  fail_unless (gst_pad_link (tmp, sinkpad0) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+
+  sinkpad1 = gst_element_get_request_pad (interleave, "sink%d");
+  fail_unless (sinkpad1 != NULL);
+  tmp = gst_element_get_static_pad (src2, "src");
+  tmp2 = gst_element_get_static_pad (queue, "sink");
+  fail_unless (gst_pad_link (tmp, tmp2) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+  gst_object_unref (tmp2);
+  tmp = gst_element_get_static_pad (queue, "src");
+  fail_unless (gst_pad_link (tmp, sinkpad1) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+
+  sink = gst_element_factory_make ("fakesink", "sink");
+  fail_unless (sink != NULL);
+  g_object_set (sink, "signal-handoffs", TRUE, NULL);
+  g_signal_connect (sink, "handoff", G_CALLBACK (sink_handoff_float32), NULL);
+  gst_bin_add (GST_BIN (pipeline), sink);
+  tmp = gst_element_get_static_pad (interleave, "src");
+  tmp2 = gst_element_get_static_pad (sink, "sink");
+  fail_unless (gst_pad_link (tmp, tmp2) == GST_PAD_LINK_OK);
+  gst_object_unref (tmp);
+  gst_object_unref (tmp2);
+
+  gst_element_set_state (pipeline, GST_STATE_PLAYING);
+
+  msg = gst_bus_poll (GST_ELEMENT_BUS (pipeline), GST_MESSAGE_EOS, -1);
+  gst_message_unref (msg);
+
+  fail_unless (have_data == 4);
+
+  gst_element_set_state (pipeline, GST_STATE_NULL);
+  gst_object_unref (pipeline);
+  gst_element_release_request_pad (interleave, sinkpad0);
+  gst_object_unref (sinkpad0);
+  gst_element_release_request_pad (interleave, sinkpad1);
+  gst_object_unref (sinkpad1);
+  gst_object_unref (interleave);
+}
+
+GST_END_TEST;
+
 static Suite *
 interleave_suite (void)
 {
@@ -72,6 +509,9 @@ interleave_suite (void)
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_create_and_unref);
   tcase_add_test (tc_chain, test_request_pads);
+  tcase_add_test (tc_chain, test_interleave_2ch);
+  tcase_add_test (tc_chain, test_interleave_2ch_1eos);
+  tcase_add_test (tc_chain, test_interleave_2ch_pipeline);
 
   return s;
 }