gst/filter/gstlpwsinc.*: Implement latency query and only forward those samples downs...
authorSebastian Dröge <slomo@circular-chaos.org>
Thu, 16 Aug 2007 09:48:27 +0000 (09:48 +0000)
committerSebastian Dröge <slomo@circular-chaos.org>
Thu, 16 Aug 2007 09:48:27 +0000 (09:48 +0000)
Original commit message from CVS:
* gst/filter/gstlpwsinc.c: (gst_lpwsinc_class_init),
(gst_lpwsinc_init), (process_32), (process_64),
(lpwsinc_build_kernel), (lpwsinc_push_residue),
(lpwsinc_transform), (lpwsinc_start), (lpwsinc_query),
(lpwsinc_query_type), (lpwsinc_event), (lpwsinc_set_property):
* gst/filter/gstlpwsinc.h:
Implement latency query and only forward those samples downstream
that actually contain the data we want, i.e. drop kernel_length/2
in the beginning and append kernel_length/2 (created by convolving
the filter kernel with zeroes) to the end.
* tests/check/elements/lpwsinc.c: (GST_START_TEST):
Adjust the unit test for this slightly changed behaviour.

gst/audiofx/audiowsinclimit.c
gst/audiofx/audiowsinclimit.h
tests/check/elements/audiowsinclimit.c

index c8b7beb..d86aace 100644 (file)
@@ -171,9 +171,13 @@ static GstFlowReturn lpwsinc_transform (GstBaseTransform * base,
 static gboolean lpwsinc_get_unit_size (GstBaseTransform * base, GstCaps * caps,
     guint * size);
 static gboolean lpwsinc_start (GstBaseTransform * base);
+static gboolean lpwsinc_event (GstBaseTransform * base, GstEvent * event);
 static gboolean lpwsinc_setup (GstAudioFilter * base,
     GstRingBufferSpec * format);
 
+static gboolean lpwsinc_query (GstPad * pad, GstQuery * query);
+static const GstQueryType *lpwsinc_query_type (GstPad * pad);
+
 /* Element class */
 
 static void
@@ -246,6 +250,7 @@ gst_lpwsinc_class_init (GstLPWSincClass * klass)
   trans_class->transform = GST_DEBUG_FUNCPTR (lpwsinc_transform);
   trans_class->get_unit_size = GST_DEBUG_FUNCPTR (lpwsinc_get_unit_size);
   trans_class->start = GST_DEBUG_FUNCPTR (lpwsinc_start);
+  trans_class->event = GST_DEBUG_FUNCPTR (lpwsinc_event);
   filter_class->setup = GST_DEBUG_FUNCPTR (lpwsinc_setup);
 }
 
@@ -255,11 +260,19 @@ gst_lpwsinc_init (GstLPWSinc * self, GstLPWSincClass * g_class)
   self->mode = MODE_LOW_PASS;
   self->window = WINDOW_HAMMING;
   self->kernel_length = 101;
+  self->latency = 50;
   self->frequency = 0.0;
   self->kernel = NULL;
   self->residue = NULL;
 
   self->have_kernel = FALSE;
+  self->residue_length = 0;
+  self->next_ts = GST_CLOCK_TIME_NONE;
+  self->next_off = GST_BUFFER_OFFSET_NONE;
+
+  gst_pad_set_query_function (GST_BASE_TRANSFORM (self)->srcpad, lpwsinc_query);
+  gst_pad_set_query_type_function (GST_BASE_TRANSFORM (self)->srcpad,
+      lpwsinc_query_type);
 }
 
 static void
@@ -296,6 +309,10 @@ process_32 (GstLPWSinc * self, gfloat * src, gfloat * dst, guint input_samples)
     self->residue[i] = self->residue[i + input_samples];
   for (i = res_start; i < kernel_length * channels; i++)
     self->residue[i] = src[input_samples - kernel_length * channels + i];
+
+  self->residue_length += kernel_length * channels - res_start;
+  if (self->residue_length > kernel_length * channels)
+    self->residue_length = kernel_length * channels;
 }
 
 static void
@@ -333,6 +350,10 @@ process_64 (GstLPWSinc * self, gdouble * src, gdouble * dst,
     self->residue[i] = self->residue[i + input_samples];
   for (i = res_start; i < kernel_length * channels; i++)
     self->residue[i] = src[input_samples - kernel_length * channels + i];
+
+  self->residue_length += kernel_length * channels - res_start;
+  if (self->residue_length > kernel_length * channels)
+    self->residue_length = kernel_length * channels;
 }
 
 static void
@@ -400,12 +421,88 @@ lpwsinc_build_kernel (GstLPWSinc * self)
   }
 
   /* set up the residue memory space */
-  if (self->residue)
-    g_free (self->residue);
-  self->residue =
-      g_new0 (gdouble, len * GST_AUDIO_FILTER (self)->format.channels);
+  if (!self->residue)
+    self->residue =
+        g_new0 (gdouble, len * GST_AUDIO_FILTER (self)->format.channels);
 
   self->have_kernel = TRUE;
+  self->residue_length = 0;
+}
+
+static void
+lpwsinc_push_residue (GstLPWSinc * self)
+{
+  GstBuffer *outbuf;
+  GstFlowReturn res;
+  gint rate = GST_AUDIO_FILTER (self)->format.rate;
+  gint channels = GST_AUDIO_FILTER (self)->format.channels;
+  gint outsize, outsamples;
+  gint diffsize, diffsamples;
+  guint8 *in, *out;
+
+  /* Calculate the number of samples and their memory size that
+   * should be pushed from the residue */
+  outsamples = MIN (self->latency, self->residue_length / channels);
+  outsize = outsamples * channels * (GST_AUDIO_FILTER (self)->format.width / 8);
+  if (outsize == 0)
+    return;
+
+  /* Process the difference between latency and residue_length samples
+   * to start at the actual data instead of starting at the zeros before
+   * when we only got one buffer smaller than latency */
+  diffsamples = self->latency - self->residue_length / channels;
+  diffsize =
+      diffsamples * channels * (GST_AUDIO_FILTER (self)->format.width / 8);
+  if (diffsize > 0) {
+    in = g_new0 (guint8, diffsize);
+    out = g_new0 (guint8, diffsize);
+    self->process (self, in, out, diffsamples * channels);
+    g_free (in);
+    g_free (out);
+  }
+
+  res = gst_pad_alloc_buffer (GST_BASE_TRANSFORM (self)->srcpad,
+      GST_BUFFER_OFFSET_NONE, outsize,
+      GST_PAD_CAPS (GST_BASE_TRANSFORM (self)->srcpad), &outbuf);
+
+  if (G_UNLIKELY (res != GST_FLOW_OK)) {
+    GST_WARNING_OBJECT (self, "failed allocating buffer of %d bytes", outsize);
+    return;
+  }
+
+  /* Convolve the residue with zeros to get the actual remaining data */
+  in = g_new0 (guint8, outsize);
+  self->process (self, in, GST_BUFFER_DATA (outbuf), outsamples * channels);
+  g_free (in);
+
+  /* Set timestamp, offset, etc from the values we
+   * saved when processing the regular buffers */
+  if (GST_CLOCK_TIME_IS_VALID (self->next_ts))
+    GST_BUFFER_TIMESTAMP (outbuf) = self->next_ts;
+  else
+    GST_BUFFER_TIMESTAMP (outbuf) = 0;
+  GST_BUFFER_DURATION (outbuf) =
+      gst_util_uint64_scale (outsamples, GST_SECOND, rate);
+  self->next_ts += gst_util_uint64_scale (outsamples, GST_SECOND, rate);
+
+  if (self->next_off != GST_BUFFER_OFFSET_NONE) {
+    GST_BUFFER_OFFSET (outbuf) = self->next_off;
+    GST_BUFFER_OFFSET_END (outbuf) = self->next_off + outsamples;
+  }
+
+  GST_DEBUG_OBJECT (self, "Pushing residue buffer of size %d with timestamp: %"
+      GST_TIME_FORMAT ", duration: %" GST_TIME_FORMAT ", offset: %lld,"
+      " offset_end: %lld, nsamples: %d", GST_BUFFER_SIZE (outbuf),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)), GST_BUFFER_OFFSET (outbuf),
+      GST_BUFFER_OFFSET_END (outbuf), outsamples);
+
+  res = gst_pad_push (GST_BASE_TRANSFORM (self)->srcpad, outbuf);
+
+  if (G_UNLIKELY (res != GST_FLOW_OK)) {
+    GST_WARNING_OBJECT (self, "failed to push residue");
+  }
+
 }
 
 /* GstAudioFilter vmethod implementations */
@@ -456,8 +553,12 @@ lpwsinc_transform (GstBaseTransform * base, GstBuffer * inbuf,
 {
   GstLPWSinc *self = GST_LPWSINC (base);
   GstClockTime timestamp;
+  gint channels = GST_AUDIO_FILTER (self)->format.channels;
+  gint rate = GST_AUDIO_FILTER (self)->format.rate;
   gint input_samples =
       GST_BUFFER_SIZE (outbuf) / (GST_AUDIO_FILTER (self)->format.width / 8);
+  gint output_samples = input_samples;
+  gint diff;
 
   /* don't process data in passthrough-mode */
   if (gst_base_transform_is_passthrough (base))
@@ -471,9 +572,100 @@ lpwsinc_transform (GstBaseTransform * base, GstBuffer * inbuf,
   if (!self->have_kernel)
     lpwsinc_build_kernel (self);
 
+  /* Reset the residue if already existing on discont buffers */
+  if (GST_BUFFER_IS_DISCONT (inbuf)) {
+    if (channels && self->residue)
+      memset (self->residue, 0, channels *
+          self->kernel_length * sizeof (gdouble));
+    self->residue_length = 0;
+    self->next_ts = GST_CLOCK_TIME_NONE;
+    self->next_off = GST_BUFFER_OFFSET_NONE;
+  }
+
+  /* Calculate the number of samples we can push out now without outputting
+   * kernel_length/2 zeros in the beginning */
+  diff = (self->kernel_length / 2) * channels - self->residue_length;
+  if (diff > 0)
+    output_samples -= diff;
+
   self->process (self, GST_BUFFER_DATA (inbuf), GST_BUFFER_DATA (outbuf),
       input_samples);
 
+  if (output_samples <= 0) {
+    /* Drop buffer and save original timestamp/offset for later use */
+    if (!GST_CLOCK_TIME_IS_VALID (self->next_ts)
+        && GST_BUFFER_TIMESTAMP_IS_VALID (outbuf))
+      self->next_ts = GST_BUFFER_TIMESTAMP (outbuf);
+    if (self->next_off == GST_BUFFER_OFFSET_NONE
+        && GST_BUFFER_OFFSET_IS_VALID (outbuf))
+      self->next_off = GST_BUFFER_OFFSET (outbuf);
+    return GST_BASE_TRANSFORM_FLOW_DROPPED;
+  } else if (output_samples < input_samples) {
+    /* First (probably partial) buffer after starting from
+     * a clean residue. Use stored timestamp/offset here */
+    if (GST_CLOCK_TIME_IS_VALID (self->next_ts))
+      GST_BUFFER_TIMESTAMP (outbuf) = self->next_ts;
+
+    if (self->next_off != GST_BUFFER_OFFSET_NONE) {
+      GST_BUFFER_OFFSET (outbuf) = self->next_off;
+      if (GST_BUFFER_OFFSET_END_IS_VALID (outbuf))
+        GST_BUFFER_OFFSET_END (outbuf) =
+            self->next_off + output_samples / channels;
+    } else {
+      /* We dropped no buffer, offset is valid, offset_end must be adjusted by diff */
+      if (GST_BUFFER_OFFSET_END_IS_VALID (outbuf))
+        GST_BUFFER_OFFSET_END (outbuf) -= diff / channels;
+    }
+
+    if (GST_BUFFER_DURATION_IS_VALID (outbuf))
+      GST_BUFFER_DURATION (outbuf) -=
+          gst_util_uint64_scale (diff, GST_SECOND, channels * rate);
+
+    GST_BUFFER_DATA (outbuf) +=
+        diff * (GST_AUDIO_FILTER (self)->format.width / 8);
+    GST_BUFFER_SIZE (outbuf) -=
+        diff * (GST_AUDIO_FILTER (self)->format.width / 8);
+  } else {
+    GstClockTime ts_latency =
+        gst_util_uint64_scale (self->latency, GST_SECOND, rate);
+
+    /* Normal buffer, adjust timestamp/offset/etc by latency */
+    if (GST_BUFFER_TIMESTAMP (outbuf) < ts_latency) {
+      GST_WARNING_OBJECT (self, "GST_BUFFER_TIMESTAMP (outbuf) < latency");
+      GST_BUFFER_TIMESTAMP (outbuf) = 0;
+    } else {
+      GST_BUFFER_TIMESTAMP (outbuf) -= ts_latency;
+    }
+
+    if (GST_BUFFER_OFFSET_IS_VALID (outbuf)) {
+      if (GST_BUFFER_OFFSET (outbuf) > self->latency) {
+        GST_BUFFER_OFFSET (outbuf) -= self->latency;
+      } else {
+        GST_WARNING_OBJECT (self, "GST_BUFFER_OFFSET (outbuf) < latency");
+        GST_BUFFER_OFFSET (outbuf) = 0;
+      }
+    }
+
+    if (GST_BUFFER_OFFSET_END_IS_VALID (outbuf)) {
+      if (GST_BUFFER_OFFSET_END (outbuf) > self->latency) {
+        GST_BUFFER_OFFSET_END (outbuf) -= self->latency;
+      } else {
+        GST_WARNING_OBJECT (self, "GST_BUFFER_OFFSET_END (outbuf) < latency");
+        GST_BUFFER_OFFSET_END (outbuf) = 0;
+      }
+    }
+  }
+
+  GST_DEBUG_OBJECT (self, "Pushing buffer of size %d with timestamp: %"
+      GST_TIME_FORMAT ", duration: %" GST_TIME_FORMAT ", offset: %lld,"
+      " offset_end: %lld, nsamples: %d", GST_BUFFER_SIZE (outbuf),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)), GST_BUFFER_OFFSET (outbuf),
+      GST_BUFFER_OFFSET_END (outbuf), output_samples / channels);
+
+  self->next_ts = GST_BUFFER_TIMESTAMP (outbuf) + GST_BUFFER_DURATION (outbuf);
+  self->next_off = GST_BUFFER_OFFSET_END (outbuf);
+
   return GST_FLOW_OK;
 }
 
@@ -488,9 +680,93 @@ lpwsinc_start (GstBaseTransform * base)
     memset (self->residue, 0, channels *
         self->kernel_length * sizeof (gdouble));
 
+  self->residue_length = 0;
+  self->next_ts = GST_CLOCK_TIME_NONE;
+  self->next_off = GST_BUFFER_OFFSET_NONE;
+
   return TRUE;
 }
 
+static gboolean
+lpwsinc_query (GstPad * pad, GstQuery * query)
+{
+  GstLPWSinc *self = GST_LPWSINC (gst_pad_get_parent (pad));
+  gboolean res = TRUE;
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_LATENCY:
+    {
+      GstClockTime min, max;
+      gboolean live;
+      guint64 latency;
+      GstPad *peer;
+      gint rate = GST_AUDIO_FILTER (self)->format.rate;
+
+      if ((peer = gst_pad_get_peer (GST_BASE_TRANSFORM (self)->sinkpad))) {
+        if ((res = gst_pad_query (peer, query))) {
+          gst_query_parse_latency (query, &live, &min, &max);
+
+          GST_DEBUG_OBJECT (self, "Peer latency: min %"
+              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (min), GST_TIME_ARGS (max));
+
+          /* add our own latency */
+          latency =
+              (rate != 0) ? gst_util_uint64_scale (self->latency, GST_SECOND,
+              rate) : 0;
+
+          GST_DEBUG_OBJECT (self, "Our latency: %"
+              GST_TIME_FORMAT, GST_TIME_ARGS (latency));
+
+          min += latency;
+          if (max != GST_CLOCK_TIME_NONE)
+            max += latency;
+
+          GST_DEBUG_OBJECT (self, "Calculated total latency : min %"
+              GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (min), GST_TIME_ARGS (max));
+
+          gst_query_set_latency (query, live, min, max);
+        }
+        gst_object_unref (peer);
+      }
+      break;
+    }
+    default:
+      res = gst_pad_query_default (pad, query);
+      break;
+  }
+  gst_object_unref (self);
+  return res;
+}
+
+static const GstQueryType *
+lpwsinc_query_type (GstPad * pad)
+{
+  static const GstQueryType types[] = {
+    GST_QUERY_LATENCY,
+    0
+  };
+
+  return types;
+}
+
+static gboolean
+lpwsinc_event (GstBaseTransform * base, GstEvent * event)
+{
+  GstLPWSinc *self = GST_LPWSINC (base);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      lpwsinc_push_residue (self);
+      break;
+    default:
+      break;
+  }
+
+  return GST_BASE_TRANSFORM_CLASS (parent_class)->event (base, event);
+}
+
 static void
 lpwsinc_set_property (GObject * object, guint prop_id, const GValue * value,
     GParamSpec * pspec)
@@ -507,8 +783,17 @@ lpwsinc_set_property (GObject * object, guint prop_id, const GValue * value,
       val = g_value_get_int (value);
       if (val % 2 == 0)
         val++;
-      self->kernel_length = val;
-      lpwsinc_build_kernel (self);
+
+      if (val != self->kernel_length) {
+        if (self->residue) {
+          lpwsinc_push_residue (self);
+          g_free (self->residue);
+          self->residue = NULL;
+        }
+        self->kernel_length = val;
+        self->latency = val / 2;
+        lpwsinc_build_kernel (self);
+      }
       GST_BASE_TRANSFORM_UNLOCK (self);
       break;
     }
index 8a8cd47..3e09bd5 100644 (file)
@@ -71,6 +71,10 @@ struct _GstLPWSinc {
   gdouble *residue;             /* buffer for left-over samples from previous buffer */
   gdouble *kernel;              /* filter kernel */
   gboolean have_kernel;
+  gint residue_length;
+  guint64 latency;
+  GstClockTime next_ts;
+  guint64 next_off;
 };
 
 struct _GstLPWSincClass {
index 8fe7b02..3d69bc9 100644 (file)
@@ -96,6 +96,7 @@ GST_START_TEST (test_lp_0hz)
   GstCaps *caps;
   gdouble *in, *res, rms;
   gint i;
+  GList *node;
 
   lpwsinc = setup_lpwsinc ();
   /* Set to lowpass */
@@ -120,21 +121,23 @@ GST_START_TEST (test_lp_0hz)
 
   /* pushing gives away my reference ... */
   fail_unless (gst_pad_push (mysrcpad, inbuffer) == GST_FLOW_OK);
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()));
   /* ... and puts a new buffer on the global list */
-  fail_unless_equals_int (g_list_length (buffers), 1);
-  fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL);
+  fail_unless (g_list_length (buffers) >= 1);
 
-  res = (gdouble *) GST_BUFFER_DATA (outbuffer);
-  for (i = 21; i < 128; i++) {
-    fail_unless (res[i] <= 1.01
-        && res[i] >= 0.99, "res[%d] = %lf\n", i, res[i]);
-  }
+  for (node = buffers; node; node = node->next) {
+    gint buffer_length;
 
-  rms = 0.0;
-  for (i = 0; i < 128; i++)
-    rms += res[i] * res[i];
-  rms = sqrt (rms / 128.0);
-  fail_unless (rms >= 0.9);
+    fail_if ((outbuffer = (GstBuffer *) node->data) == NULL);
+
+    res = (gdouble *) GST_BUFFER_DATA (outbuffer);
+    buffer_length = GST_BUFFER_SIZE (outbuffer) / sizeof (gdouble);
+    rms = 0.0;
+    for (i = 0; i < buffer_length; i++)
+      rms += res[i] * res[i];
+    rms = sqrt (rms / buffer_length);
+    fail_unless (rms >= 0.9);
+  }
 
   /* cleanup */
   cleanup_lpwsinc (lpwsinc);
@@ -152,6 +155,7 @@ GST_START_TEST (test_lp_22050hz)
   GstCaps *caps;
   gdouble *in, *res, rms;
   gint i;
+  GList *node;
 
   lpwsinc = setup_lpwsinc ();
   /* Set to lowpass */
@@ -177,21 +181,23 @@ GST_START_TEST (test_lp_22050hz)
 
   /* pushing gives away my reference ... */
   fail_unless (gst_pad_push (mysrcpad, inbuffer) == GST_FLOW_OK);
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()));
   /* ... and puts a new buffer on the global list */
-  fail_unless_equals_int (g_list_length (buffers), 1);
-  fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL);
+  fail_unless (g_list_length (buffers) >= 1);
 
-  res = (gdouble *) GST_BUFFER_DATA (outbuffer);
-  for (i = 21; i < 128; i++) {
-    fail_unless (res[i] <= 0.01
-        && res[i] >= -0.01, "res[%d] = %lf\n", i, res[i]);
-  }
+  for (node = buffers; node; node = node->next) {
+    gint buffer_length;
 
-  rms = 0.0;
-  for (i = 0; i < 128; i++)
-    rms += res[i] * res[i];
-  rms = sqrt (rms / 128.0);
-  fail_unless (rms <= 0.05);
+    fail_if ((outbuffer = (GstBuffer *) node->data) == NULL);
+
+    res = (gdouble *) GST_BUFFER_DATA (outbuffer);
+    buffer_length = GST_BUFFER_SIZE (outbuffer) / sizeof (gdouble);
+    rms = 0.0;
+    for (i = 0; i < buffer_length; i++)
+      rms += res[i] * res[i];
+    rms = sqrt (rms / buffer_length);
+    fail_unless (rms <= 0.1);
+  }
 
   /* cleanup */
   cleanup_lpwsinc (lpwsinc);
@@ -209,6 +215,7 @@ GST_START_TEST (test_hp_0hz)
   GstCaps *caps;
   gdouble *in, *res, rms;
   gint i;
+  GList *node;
 
   lpwsinc = setup_lpwsinc ();
   /* Set to highpass */
@@ -232,21 +239,23 @@ GST_START_TEST (test_hp_0hz)
 
   /* pushing gives away my reference ... */
   fail_unless (gst_pad_push (mysrcpad, inbuffer) == GST_FLOW_OK);
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()));
   /* ... and puts a new buffer on the global list */
-  fail_unless_equals_int (g_list_length (buffers), 1);
-  fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL);
+  fail_unless (g_list_length (buffers) >= 1);
 
-  res = (gdouble *) GST_BUFFER_DATA (outbuffer);
-  for (i = 21; i < 128; i++) {
-    fail_unless (res[i] <= 0.01
-        && res[i] >= -0.01, "res[%d] = %lf\n", i, res[i]);
-  }
+  for (node = buffers; node; node = node->next) {
+    gint buffer_length;
 
-  rms = 0.0;
-  for (i = 0; i < 128; i++)
-    rms += res[i] * res[i];
-  rms = sqrt (rms / 128.0);
-  fail_unless (rms <= 0.05);
+    fail_if ((outbuffer = (GstBuffer *) node->data) == NULL);
+
+    res = (gdouble *) GST_BUFFER_DATA (outbuffer);
+    buffer_length = GST_BUFFER_SIZE (outbuffer) / sizeof (gdouble);
+    rms = 0.0;
+    for (i = 0; i < buffer_length; i++)
+      rms += res[i] * res[i];
+    rms = sqrt (rms / buffer_length);
+    fail_unless (rms <= 0.1);
+  }
 
   /* cleanup */
   cleanup_lpwsinc (lpwsinc);
@@ -264,6 +273,7 @@ GST_START_TEST (test_hp_22050hz)
   GstCaps *caps;
   gdouble *in, *res, rms;
   gint i;
+  GList *node;
 
   lpwsinc = setup_lpwsinc ();
   /* Set to highpass */
@@ -289,21 +299,24 @@ GST_START_TEST (test_hp_22050hz)
 
   /* pushing gives away my reference ... */
   fail_unless (gst_pad_push (mysrcpad, inbuffer) == GST_FLOW_OK);
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()));
   /* ... and puts a new buffer on the global list */
-  fail_unless_equals_int (g_list_length (buffers), 1);
+  fail_unless (g_list_length (buffers) >= 1);
   fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL);
 
-  res = (gdouble *) GST_BUFFER_DATA (outbuffer);
-  for (i = 21; i < 128; i++) {
-    fail_unless (abs (res[i]) <= 1.01
-        && abs (res[i]) >= 0.99, "res[%d] = %lf\n", i, res[i]);
-  }
+  for (node = buffers; node; node = node->next) {
+    gint buffer_length;
 
-  rms = 0.0;
-  for (i = 0; i < 128; i++)
-    rms += res[i] * res[i];
-  rms = sqrt (rms / 128.0);
-  fail_unless (rms >= 0.9);
+    fail_if ((outbuffer = (GstBuffer *) node->data) == NULL);
+
+    res = (gdouble *) GST_BUFFER_DATA (outbuffer);
+    buffer_length = GST_BUFFER_SIZE (outbuffer) / sizeof (gdouble);
+    rms = 0.0;
+    for (i = 0; i < buffer_length; i++)
+      rms += res[i] * res[i];
+    rms = sqrt (rms / buffer_length);
+    fail_unless (rms >= 0.9);
+  }
 
   /* cleanup */
   cleanup_lpwsinc (lpwsinc);
@@ -344,8 +357,9 @@ GST_START_TEST (test_small_buffer)
 
   /* pushing gives away my reference ... */
   fail_unless (gst_pad_push (mysrcpad, inbuffer) == GST_FLOW_OK);
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()));
   /* ... and puts a new buffer on the global list */
-  fail_unless_equals_int (g_list_length (buffers), 1);
+  fail_unless (g_list_length (buffers) >= 1);
   fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL);
 
   /* cleanup */