ext/wavpack/gstwavpackparse.*: Make wavpackparse also work in push-mode (not seekable...
authorSebastian Dröge <slomo@circular-chaos.org>
Tue, 15 Aug 2006 20:29:45 +0000 (20:29 +0000)
committerTim-Philipp Müller <tim@centricular.net>
Tue, 15 Aug 2006 20:29:45 +0000 (20:29 +0000)
Original commit message from CVS:
* ext/wavpack/gstwavpackparse.c: (gst_wavpack_parse_class_init),
(gst_wavpack_parse_reset), (gst_wavpack_parse_get_src_query_types),
(gst_wavpack_parse_src_query),
(gst_wavpack_parse_handle_seek_event),
(gst_wavpack_parse_sink_event), (gst_wavpack_parse_init),
(gst_wavpack_parse_create_src_pad),
(gst_wavpack_parse_push_buffer), (gst_wavpack_parse_loop),
(gst_wavpack_parse_chain), (gst_wavpack_parse_sink_activate),
(gst_wavpack_parse_sink_activate_pull):
* ext/wavpack/gstwavpackparse.h:
Patch by: Sebastian Dröge <slomo at circular-chaos.org>
Make wavpackparse also work in push-mode (not seekable yet though);
some small clean-ups along the way; add support for SEEKING query
and query types function. (#351495).

ext/wavpack/gstwavpackparse.c
ext/wavpack/gstwavpackparse.h

index b311af3..720b25b 100644 (file)
@@ -57,9 +57,9 @@ static GstStaticPadTemplate wvc_src_factory = GST_STATIC_PAD_TEMPLATE ("wvcsrc",
     GST_STATIC_CAPS ("audio/x-wavpack-correction, " "framed = (boolean) true")
     );
 
-static gboolean gst_wavepack_parse_sink_activate (GstPad * sinkpad);
+static gboolean gst_wavpack_parse_sink_activate (GstPad * sinkpad);
 static gboolean
-gst_wavepack_parse_sink_activate_pull (GstPad * sinkpad, gboolean active);
+gst_wavpack_parse_sink_activate_pull (GstPad * sinkpad, gboolean active);
 
 static void gst_wavpack_parse_loop (GstElement * element);
 static GstStateChangeReturn gst_wavpack_parse_change_state (GstElement *
@@ -68,6 +68,7 @@ static void gst_wavpack_parse_reset (GstWavpackParse * wavpackparse);
 static gint64 gst_wavpack_parse_get_upstream_length (GstWavpackParse * wvparse);
 static GstBuffer *gst_wavpack_parse_pull_buffer (GstWavpackParse * wvparse,
     gint64 offset, guint size, GstFlowReturn * flow);
+static GstFlowReturn gst_wavpack_parse_chain (GstPad * pad, GstBuffer * buf);
 
 GST_BOILERPLATE (GstWavpackParse, gst_wavpack_parse, GstElement,
     GST_TYPE_ELEMENT);
@@ -108,7 +109,7 @@ gst_wavpack_parse_class_init (GstWavpackParseClass * klass)
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
 
-  gobject_class->finalize = gst_wavpack_parse_finalize;
+  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_wavpack_parse_finalize);
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_wavpack_parse_change_state);
 }
@@ -188,7 +189,7 @@ gst_wavpack_parse_index_append_entry (GstWavpackParse * wvparse,
 static void
 gst_wavpack_parse_reset (GstWavpackParse * wavpackparse)
 {
-  wavpackparse->total_samples = 0;
+  wavpackparse->total_samples = -1;
   wavpackparse->samplerate = 0;
   wavpackparse->channels = 0;
 
@@ -203,6 +204,12 @@ gst_wavpack_parse_reset (GstWavpackParse * wavpackparse)
     wavpackparse->entries = NULL;
   }
 
+  if (wavpackparse->adapter) {
+    gst_adapter_clear (wavpackparse->adapter);
+    g_object_unref (wavpackparse->adapter);
+    wavpackparse->adapter = NULL;
+  }
+
   if (wavpackparse->srcpad != NULL) {
     gboolean res;
 
@@ -220,6 +227,19 @@ gst_wavpack_parse_reset (GstWavpackParse * wavpackparse)
   wavpackparse->queued_events = NULL;
 }
 
+static const GstQueryType *
+gst_wavpack_parse_get_src_query_types (GstPad * pad)
+{
+  static const GstQueryType types[] = {
+    GST_QUERY_POSITION,
+    GST_QUERY_DURATION,
+    GST_QUERY_SEEKING,
+    0
+  };
+
+  return types;
+}
+
 static gboolean
 gst_wavpack_parse_src_query (GstPad * pad, GstQuery * query)
 {
@@ -268,10 +288,12 @@ gst_wavpack_parse_src_query (GstPad * pad, GstQuery * query)
 
       GST_OBJECT_LOCK (wavpackparse);
       rate = wavpackparse->samplerate;
-      len = wavpackparse->total_samples;
+      /* FIXME: return 0 if we work in push based mode to let totem
+       * recognize that we can't seek */
+      len = (wavpackparse->adapter) ? 0 : wavpackparse->total_samples;
       GST_OBJECT_UNLOCK (wavpackparse);
 
-      if (len <= 0 || rate == 0) {
+      if (len < 0 || rate == 0) {
         GST_DEBUG_OBJECT (wavpackparse, "haven't read header yet");
         break;
       }
@@ -295,6 +317,24 @@ gst_wavpack_parse_src_query (GstPad * pad, GstQuery * query)
       }
       break;
     }
+    case GST_QUERY_SEEKING:{
+      gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
+      if (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT) {
+        gboolean seekable;
+        gint64 duration = -1;
+
+        gst_pad_query_duration (pad, &format, &duration);
+
+        /* can't seek in streaming mode yet */
+        GST_OBJECT_LOCK (wavpackparse);
+        seekable = (wavpackparse->adapter != NULL);
+        GST_OBJECT_UNLOCK (wavpackparse);
+
+        gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, 0, duration);
+        ret = TRUE;
+      }
+      break;
+    }
     default:{
       ret = gst_pad_query_default (pad, query);
       break;
@@ -433,6 +473,11 @@ gst_wavpack_parse_handle_seek_event (GstWavpackParse * wvparse,
   gint64 chunk_start;           /* first sample in chunk we seek to           */
   guint rate;
 
+  if (wvparse->adapter) {
+    GST_DEBUG_OBJECT (wvparse, "seeking in streaming mode not implemented yet");
+    return FALSE;
+  }
+
   gst_event_parse_seek (event, &speed, &format, &seek_flags, &start_type,
       &start, &stop_type, &stop);
 
@@ -544,14 +589,41 @@ gst_wavpack_parse_sink_event (GstPad * pad, GstEvent * event)
 
   parse = GST_WAVPACK_PARSE (gst_pad_get_parent (pad));
 
-  /* stream lock is recursive, should be fine for all events */
-  GST_PAD_STREAM_LOCK (pad);
-  if (parse->srcpad == NULL) {
-    parse->queued_events = g_list_append (parse->queued_events, event);
-  } else {
-    ret = gst_pad_push_event (parse->srcpad, event);
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_STOP:{
+      if (parse->adapter) {
+        gst_adapter_clear (parse->adapter);
+      }
+      ret = gst_pad_push_event (pad, event);
+      break;
+    }
+    case GST_EVENT_NEWSEGMENT:{
+      parse->need_newsegment = TRUE;
+      gst_event_unref (event);
+      ret = TRUE;
+      break;
+    }
+    case GST_EVENT_EOS:{
+      if (parse->adapter) {
+        /* remove all bytes that are left in the adapter after EOS. They can't
+         * be a complete Wavpack block and we can't do anything with them */
+        gst_adapter_clear (parse->adapter);
+      }
+      ret = gst_pad_push_event (pad, event);
+      break;
+    }
+    default:{
+      /* stream lock is recursive, should be fine for all events */
+      GST_PAD_STREAM_LOCK (pad);
+      if (parse->srcpad == NULL) {
+        parse->queued_events = g_list_append (parse->queued_events, event);
+      } else {
+        ret = gst_pad_push_event (parse->srcpad, event);
+      }
+      GST_PAD_STREAM_UNLOCK (pad);
+    }
   }
-  GST_PAD_STREAM_UNLOCK (pad);
+
 
   gst_object_unref (parse);
   return ret;
@@ -589,11 +661,13 @@ gst_wavpack_parse_init (GstWavpackParse * wavpackparse,
   wavpackparse->sinkpad = gst_pad_new_from_template (tmpl, "sink");
 
   gst_pad_set_activate_function (wavpackparse->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_wavepack_parse_sink_activate));
+      GST_DEBUG_FUNCPTR (gst_wavpack_parse_sink_activate));
   gst_pad_set_activatepull_function (wavpackparse->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_wavepack_parse_sink_activate_pull));
+      GST_DEBUG_FUNCPTR (gst_wavpack_parse_sink_activate_pull));
   gst_pad_set_event_function (wavpackparse->sinkpad,
       GST_DEBUG_FUNCPTR (gst_wavpack_parse_sink_event));
+  gst_pad_set_chain_function (wavpackparse->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_wavpack_parse_chain));
 
   gst_element_add_pad (GST_ELEMENT (wavpackparse), wavpackparse->sinkpad);
 
@@ -738,6 +812,8 @@ gst_wavpack_parse_create_src_pad (GstWavpackParse * wvparse, GstBuffer * buf,
 
   gst_pad_set_query_function (wvparse->srcpad,
       GST_DEBUG_FUNCPTR (gst_wavpack_parse_src_query));
+  gst_pad_set_query_type_function (wvparse->srcpad,
+      GST_DEBUG_FUNCPTR (gst_wavpack_parse_get_src_query_types));
   gst_pad_set_event_function (wvparse->srcpad,
       GST_DEBUG_FUNCPTR (gst_wavpack_parse_src_event));
 
@@ -752,6 +828,43 @@ gst_wavpack_parse_create_src_pad (GstWavpackParse * wvparse, GstBuffer * buf,
   return TRUE;
 }
 
+static GstFlowReturn
+gst_wavpack_parse_push_buffer (GstWavpackParse * wvparse, GstBuffer * buf,
+    WavpackHeader * header)
+{
+  wvparse->current_offset += header->ckSize + 8;
+
+  wvparse->segment.last_stop = header->block_index;
+
+  if (wvparse->need_newsegment) {
+    if (gst_wavpack_parse_send_newsegment (wvparse, FALSE))
+      wvparse->need_newsegment = FALSE;
+  }
+
+  /* send any queued events */
+  if (wvparse->queued_events) {
+    GList *l;
+
+    for (l = wvparse->queued_events; l != NULL; l = l->next) {
+      gst_pad_push_event (wvparse->srcpad, GST_EVENT (l->data));
+    }
+    g_list_free (wvparse->queued_events);
+    wvparse->queued_events = NULL;
+  }
+
+  GST_BUFFER_TIMESTAMP (buf) = gst_util_uint64_scale_int (header->block_index,
+      GST_SECOND, wvparse->samplerate);
+  GST_BUFFER_DURATION (buf) = gst_util_uint64_scale_int (header->block_samples,
+      GST_SECOND, wvparse->samplerate);
+  GST_BUFFER_OFFSET (buf) = header->block_index;
+  gst_buffer_set_caps (buf, GST_PAD_CAPS (wvparse->srcpad));
+
+  GST_LOG_OBJECT (wvparse, "Pushing buffer with time %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+
+  return gst_pad_push (wvparse->srcpad, buf);
+}
+
 static void
 gst_wavpack_parse_loop (GstElement * element)
 {
@@ -797,37 +910,7 @@ gst_wavpack_parse_loop (GstElement * element)
   gst_wavpack_parse_index_append_entry (wavpackparse,
       wavpackparse->current_offset, header.block_index, header.block_samples);
 
-  wavpackparse->current_offset += header.ckSize + 8;
-
-  wavpackparse->segment.last_stop = header.block_index;
-
-  if (wavpackparse->need_newsegment) {
-    if (gst_wavpack_parse_send_newsegment (wavpackparse, FALSE))
-      wavpackparse->need_newsegment = FALSE;
-  }
-
-  /* send any queued events */
-  if (wavpackparse->queued_events) {
-    GList *l;
-
-    for (l = wavpackparse->queued_events; l != NULL; l = l->next) {
-      gst_pad_push_event (wavpackparse->srcpad, GST_EVENT (l->data));
-    }
-    g_list_free (wavpackparse->queued_events);
-    wavpackparse->queued_events = NULL;
-  }
-
-  GST_BUFFER_TIMESTAMP (buf) = gst_util_uint64_scale_int (header.block_index,
-      GST_SECOND, wavpackparse->samplerate);
-  GST_BUFFER_DURATION (buf) = gst_util_uint64_scale_int (header.block_samples,
-      GST_SECOND, wavpackparse->samplerate);
-  GST_BUFFER_OFFSET (buf) = header.block_index;
-  gst_buffer_set_caps (buf, GST_PAD_CAPS (wavpackparse->srcpad));
-
-  GST_LOG_OBJECT (wavpackparse, "Pushing buffer with time %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
-
-  flow_ret = gst_pad_push (wavpackparse->srcpad, buf);
+  flow_ret = gst_wavpack_parse_push_buffer (wavpackparse, buf, &header);
   if (flow_ret != GST_FLOW_OK) {
     GST_DEBUG_OBJECT (wavpackparse, "Push failed, flow: %s",
         gst_flow_get_name (flow_ret));
@@ -852,6 +935,61 @@ pause:
   }
 }
 
+static GstFlowReturn
+gst_wavpack_parse_chain (GstPad * pad, GstBuffer * buf)
+{
+  GstWavpackParse *wvparse = GST_WAVPACK_PARSE (GST_PAD_PARENT (pad));
+  GstFlowReturn ret = GST_FLOW_OK;
+  WavpackHeader wph;
+  const guint8 *tmp_buf;
+
+  if (!wvparse->adapter) {
+    wvparse->adapter = gst_adapter_new ();
+  }
+
+  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DISCONT)) {
+    gst_adapter_clear (wvparse->adapter);
+  }
+
+  gst_adapter_push (wvparse->adapter, buf);
+
+  if (gst_adapter_available (wvparse->adapter) < sizeof (WavpackHeader))
+    return ret;
+
+  tmp_buf = gst_adapter_peek (wvparse->adapter, sizeof (WavpackHeader));
+  gst_wavpack_read_header (&wph, (guint8 *) tmp_buf);
+
+  /* FIXME: should check for wavpack marker here and re-sync if not */
+
+  while (gst_adapter_available (wvparse->adapter) >= wph.ckSize + 4 * 1 + 4) {
+    GstBuffer *outbuf =
+        gst_adapter_take_buffer (wvparse->adapter, wph.ckSize + 4 * 1 + 4);
+
+    if (!outbuf)
+      return GST_FLOW_ERROR;
+
+    if (wvparse->srcpad == NULL) {
+      if (!gst_wavpack_parse_create_src_pad (wvparse, outbuf, &wph)) {
+        GST_ELEMENT_ERROR (wvparse, STREAM, DECODE, (NULL), (NULL));
+        ret = GST_FLOW_ERROR;
+        break;
+      }
+    }
+
+    ret = gst_wavpack_parse_push_buffer (wvparse, outbuf, &wph);
+
+    if (ret != GST_FLOW_OK)
+      break;
+
+    if (gst_adapter_available (wvparse->adapter) >= sizeof (WavpackHeader)) {
+      tmp_buf = gst_adapter_peek (wvparse->adapter, sizeof (WavpackHeader));
+      gst_wavpack_read_header (&wph, (guint8 *) tmp_buf);
+    }
+  }
+
+  return ret;
+}
+
 static GstStateChangeReturn
 gst_wavpack_parse_change_state (GstElement * element, GstStateChange transition)
 {
@@ -880,19 +1018,18 @@ gst_wavpack_parse_change_state (GstElement * element, GstStateChange transition)
   return ret;
 }
 
-
 static gboolean
-gst_wavepack_parse_sink_activate (GstPad * sinkpad)
+gst_wavpack_parse_sink_activate (GstPad * sinkpad)
 {
   if (gst_pad_check_pull_range (sinkpad)) {
     return gst_pad_activate_pull (sinkpad, TRUE);
   } else {
-    return FALSE;
+    return gst_pad_activate_push (sinkpad, TRUE);
   }
 }
 
 static gboolean
-gst_wavepack_parse_sink_activate_pull (GstPad * sinkpad, gboolean active)
+gst_wavpack_parse_sink_activate_pull (GstPad * sinkpad, gboolean active)
 {
   gboolean result;
 
index 37321d8..d849da7 100644 (file)
@@ -23,6 +23,7 @@
 #define __GST_WAVPACK_PARSE_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstadapter.h>
 
 G_BEGIN_DECLS
 
@@ -67,6 +68,8 @@ struct _GstWavpackParse
   GstSegment     segment;     /* the currently configured segment, in
                                * samples/audio frames (DEFAULT format) */
 
+  GstAdapter    *adapter;     /* when operating chain-based, otherwise NULL */
+
   /* Array of GstWavpackParseIndexEntry structs, mapping known
    * sample offsets to byte offsets. Is kept increasing without
    * gaps (ie. append only and consecutive entries must always