ext/amrwb/gstamrwbparse.*: Add flush seek handler. Taken from recent armnbparse changes.
authorStefan Kost <ensonic@users.sourceforge.net>
Thu, 9 Oct 2008 09:21:44 +0000 (09:21 +0000)
committerStefan Kost <ensonic@users.sourceforge.net>
Thu, 9 Oct 2008 09:21:44 +0000 (09:21 +0000)
Original commit message from CVS:
* ext/amrwb/gstamrwbparse.c:
* ext/amrwb/gstamrwbparse.h:
Add flush seek handler. Taken from recent armnbparse changes.
Sync the code more and use #defines for HEADER.

ChangeLog
ext/amrwb/gstamrwbparse.c
ext/amrwb/gstamrwbparse.h

index 3437a4d..cc3ab71 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,13 @@
 
        * ext/amrwb/gstamrwbparse.c:
        * ext/amrwb/gstamrwbparse.h:
+         Add flush seek handler. Taken from recent armnbparse changes.
+         Sync the code more and use #defines for HEADER.
+
+2008-10-09  Stefan Kost  <ensonic@users.sf.net>
+
+       * ext/amrwb/gstamrwbparse.c:
+       * ext/amrwb/gstamrwbparse.h:
           Fix the duration query. Also set caps on the pads and buffers more
           correctly. Taken from recent armnbparse changes.
 
index de8e1c6..bcee170 100644 (file)
@@ -56,6 +56,9 @@ GST_DEBUG_CATEGORY_STATIC (gst_amrwbparse_debug);
 
 extern const UWord8 block_size[];
 
+#define AMRWB_HEADER_SIZE 9
+#define AMRWB_HEADER_STR "#!AMR-WB\n"
+
 static void gst_amrwbparse_base_init (gpointer klass);
 static void gst_amrwbparse_class_init (GstAmrwbParseClass * klass);
 static void gst_amrwbparse_init (GstAmrwbParse * amrwbparse,
@@ -64,11 +67,15 @@ static void gst_amrwbparse_init (GstAmrwbParse * amrwbparse,
 static const GstQueryType *gst_amrwbparse_querytypes (GstPad * pad);
 static gboolean gst_amrwbparse_query (GstPad * pad, GstQuery * query);
 
+static gboolean gst_amrwbparse_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_amrwbparse_src_event (GstPad * pad, GstEvent * event);
 static GstFlowReturn gst_amrwbparse_chain (GstPad * pad, GstBuffer * buffer);
 static void gst_amrwbparse_loop (GstPad * pad);
 static gboolean gst_amrwbparse_sink_activate (GstPad * sinkpad);
 static gboolean gst_amrwbparse_sink_activate_pull (GstPad * sinkpad,
     gboolean active);
+static gboolean gst_amrwbparse_sink_activate_push (GstPad * sinkpad,
+    gboolean active);
 static GstStateChangeReturn gst_amrwbparse_state_change (GstElement * element,
     GstStateChange transition);
 
@@ -117,16 +124,20 @@ gst_amrwbparse_init (GstAmrwbParse * amrwbparse, GstAmrwbParseClass * klass)
       gst_pad_new_from_static_template (&sink_template, "sink");
   gst_pad_set_chain_function (amrwbparse->sinkpad,
       GST_DEBUG_FUNCPTR (gst_amrwbparse_chain));
-
+  gst_pad_set_event_function (amrwbparse->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_amrwbparse_sink_event));
   gst_pad_set_activate_function (amrwbparse->sinkpad,
       gst_amrwbparse_sink_activate);
   gst_pad_set_activatepull_function (amrwbparse->sinkpad,
       gst_amrwbparse_sink_activate_pull);
-
+  gst_pad_set_activatepush_function (amrwbparse->sinkpad,
+      gst_amrwbparse_sink_activate_push);
   gst_element_add_pad (GST_ELEMENT (amrwbparse), amrwbparse->sinkpad);
 
   /* create the src pad */
   amrwbparse->srcpad = gst_pad_new_from_static_template (&src_template, "src");
+  gst_pad_set_event_function (amrwbparse->srcpad,
+      GST_DEBUG_FUNCPTR (gst_amrwbparse_src_event));
   gst_pad_set_query_function (amrwbparse->srcpad,
       GST_DEBUG_FUNCPTR (gst_amrwbparse_query));
   gst_pad_set_query_type_function (amrwbparse->srcpad,
@@ -158,6 +169,7 @@ gst_amrwbparse_querytypes (GstPad * pad)
 {
   static const GstQueryType list[] = {
     GST_QUERY_POSITION,
+    GST_QUERY_DURATION,
     0
   };
 
@@ -234,9 +246,193 @@ gst_amrwbparse_query (GstPad * pad, GstQuery * query)
 }
 
 
+static gboolean
+gst_amrwbparse_handle_pull_seek (GstAmrwbParse * amrwbparse, GstPad * pad,
+    GstEvent * event)
+{
+  GstFormat format;
+  gdouble rate;
+  GstSeekFlags flags;
+  GstSeekType cur_type, stop_type;
+  gint64 cur, stop;
+  gint64 byte_cur = -1, byte_stop = -1;
+  gboolean flush;
+
+  gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
+      &stop_type, &stop);
+
+  GST_DEBUG_OBJECT (amrwbparse, "Performing seek to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (cur));
+
+  /* For any format other than TIME, see if upstream handles
+   * it directly or fail. For TIME, try upstream, but do it ourselves if
+   * it fails upstream */
+  if (format != GST_FORMAT_TIME) {
+    return gst_pad_push_event (amrwbparse->sinkpad, event);
+  } else {
+    if (gst_pad_push_event (amrwbparse->sinkpad, event))
+      return TRUE;
+  }
+
+  flush = flags & GST_SEEK_FLAG_FLUSH;
+
+  /* send flush start */
+  if (flush)
+    gst_pad_push_event (amrwbparse->sinkpad, gst_event_new_flush_start ());
+  /* we only handle FLUSH seeks at the moment */
+  else
+    return FALSE;
+
+  /* grab streaming lock, this should eventually be possible, either
+   * because the task is paused or our streaming thread stopped
+   * because our peer is flushing. */
+  GST_PAD_STREAM_LOCK (amrwbparse->sinkpad);
+
+  /* Convert the TIME to the appropriate BYTE position at which to resume
+   * decoding. */
+  cur = cur / (20 * GST_MSECOND) * (20 * GST_MSECOND);
+  if (cur != -1)
+    byte_cur = amrwbparse->block * (cur / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE;
+  if (stop != -1)
+    byte_stop =
+        amrwbparse->block * (stop / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE;
+  amrwbparse->offset = byte_cur;
+  amrwbparse->ts = cur;
+
+  GST_DEBUG_OBJECT (amrwbparse, "Seeking to byte range %" G_GINT64_FORMAT
+      " to %" G_GINT64_FORMAT, byte_cur, cur);
+
+  /* and prepare to continue streaming */
+  /* send flush stop, peer will accept data and events again. We
+   * are not yet providing data as we still have the STREAM_LOCK. */
+  gst_pad_push_event (amrwbparse->sinkpad, gst_event_new_flush_stop ());
+  gst_pad_push_event (amrwbparse->srcpad, gst_event_new_new_segment (FALSE,
+          rate, format, cur, -1, cur));
+
+  /* and restart the task in case it got paused explicitely or by
+   * the FLUSH_START event we pushed out. */
+  gst_pad_start_task (amrwbparse->sinkpad,
+      (GstTaskFunction) gst_amrwbparse_loop, amrwbparse->sinkpad);
+
+  /* and release the lock again so we can continue streaming */
+  GST_PAD_STREAM_UNLOCK (amrwbparse->sinkpad);
+
+  return TRUE;
+}
+
+static gboolean
+gst_amrwbparse_handle_push_seek (GstAmrwbParse * amrwbparse, GstPad * pad,
+    GstEvent * event)
+{
+  GstFormat format;
+  gdouble rate;
+  GstSeekFlags flags;
+  GstSeekType cur_type, stop_type;
+  gint64 cur, stop;
+  gint64 byte_cur = -1, byte_stop = -1;
+
+  gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
+      &stop_type, &stop);
+
+  GST_DEBUG_OBJECT (amrwbparse, "Performing seek to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (cur));
+
+  /* For any format other than TIME, see if upstream handles
+   * it directly or fail. For TIME, try upstream, but do it ourselves if
+   * it fails upstream */
+  if (format != GST_FORMAT_TIME) {
+    return gst_pad_push_event (amrwbparse->sinkpad, event);
+  } else {
+    if (gst_pad_push_event (amrwbparse->sinkpad, event))
+      return TRUE;
+  }
+
+  /* Convert the TIME to the appropriate BYTE position at which to resume
+   * decoding. */
+  cur = cur / (20 * GST_MSECOND) * (20 * GST_MSECOND);
+  if (cur != -1)
+    byte_cur = amrwbparse->block * (cur / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE;
+  if (stop != -1)
+    byte_stop =
+        amrwbparse->block * (stop / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE;
+  amrwbparse->ts = cur;
+
+  GST_DEBUG_OBJECT (amrwbparse, "Seeking to byte range %" G_GINT64_FORMAT
+      " to %" G_GINT64_FORMAT, byte_cur, byte_stop);
+
+  /* Send BYTE based seek upstream */
+  event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, cur_type,
+      byte_cur, stop_type, byte_stop);
+
+  return gst_pad_push_event (amrwbparse->sinkpad, event);
+}
+
+static gboolean
+gst_amrwbparse_src_event (GstPad * pad, GstEvent * event)
+{
+  GstAmrwbParse *amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (pad));
+  gboolean res;
+
+  GST_DEBUG_OBJECT (amrwbparse, "handling event %d", GST_EVENT_TYPE (event));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:
+      if (amrwbparse->seek_handler)
+        res = amrwbparse->seek_handler (amrwbparse, pad, event);
+      else
+        res = FALSE;
+      break;
+    default:
+      res = gst_pad_push_event (amrwbparse->sinkpad, event);
+      break;
+  }
+  gst_object_unref (amrwbparse);
+
+  return res;
+}
+
+
 /*
  * Data reading.
  */
+static gboolean
+gst_amrwbparse_sink_event (GstPad * pad, GstEvent * event)
+{
+  GstAmrwbParse *amrwbparse;
+  gboolean res;
+
+  amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (pad));
+
+  GST_LOG ("handling event %d", GST_EVENT_TYPE (event));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_START:
+      res = gst_pad_push_event (amrwbparse->srcpad, event);
+      break;
+    case GST_EVENT_FLUSH_STOP:
+      gst_adapter_clear (amrwbparse->adapter);
+      gst_segment_init (&amrwbparse->segment, GST_FORMAT_TIME);
+      res = gst_pad_push_event (amrwbparse->srcpad, event);
+      break;
+    case GST_EVENT_EOS:
+      res = gst_pad_push_event (amrwbparse->srcpad, event);
+      break;
+    case GST_EVENT_NEWSEGMENT:
+    {
+      /* eat for now, we send a newsegment at start with infinite
+       * duration. */
+      gst_event_unref (event);
+      res = TRUE;
+      break;
+    }
+    default:
+      res = gst_pad_push_event (amrwbparse->srcpad, event);
+      break;
+  }
+  gst_object_unref (amrwbparse);
+
+  return res;
+}
 
 /* streaming mode */
 static GstFlowReturn
@@ -247,9 +443,17 @@ gst_amrwbparse_chain (GstPad * pad, GstBuffer * buffer)
   gint mode;
   const guint8 *data;
   GstBuffer *out;
+  GstClockTime timestamp;
 
   amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (pad));
 
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+    GST_DEBUG_OBJECT (amrwbparse, "Lock on timestamp %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (timestamp));
+    amrwbparse->ts = timestamp;
+  }
+
   gst_adapter_push (amrwbparse->adapter, buffer);
 
   /* init */
@@ -257,14 +461,14 @@ gst_amrwbparse_chain (GstPad * pad, GstBuffer * buffer)
     GstEvent *segev;
     GstCaps *caps;
 
-    if (gst_adapter_available (amrwbparse->adapter) < 9)
+    if (gst_adapter_available (amrwbparse->adapter) < AMRWB_HEADER_SIZE)
       goto done;
 
-    data = gst_adapter_peek (amrwbparse->adapter, 9);
-    if (memcmp (data, "#!AMR-WB\n", 9) != 0)
+    data = gst_adapter_peek (amrwbparse->adapter, AMRWB_HEADER_SIZE);
+    if (memcmp (data, AMRWB_HEADER_STR, AMRWB_HEADER_SIZE) != 0)
       goto done;
 
-    gst_adapter_flush (amrwbparse->adapter, 9);
+    gst_adapter_flush (amrwbparse->adapter, AMRWB_HEADER_SIZE);
 
     amrwbparse->need_header = FALSE;
 
@@ -323,39 +527,39 @@ static gboolean
 gst_amrwbparse_pull_header (GstAmrwbParse * amrwbparse)
 {
   GstBuffer *buffer;
-  gboolean ret = TRUE;
+  GstFlowReturn ret;
   guint8 *data;
   gint size;
-  const guint8 magic_number_size = 9;   /* sizeof("#!AMR-WB\n")-1 */
 
-  if (GST_FLOW_OK != gst_pad_pull_range (amrwbparse->sinkpad,
-          amrwbparse->offset, magic_number_size, &buffer)) {
-    ret = FALSE;
-    goto done;
-  }
+  ret = gst_pad_pull_range (amrwbparse->sinkpad, G_GUINT64_CONSTANT (0),
+      AMRWB_HEADER_SIZE, &buffer);
+  if (ret != GST_FLOW_OK)
+    return FALSE;
 
   data = GST_BUFFER_DATA (buffer);
   size = GST_BUFFER_SIZE (buffer);
 
-  if (size < magic_number_size) {
-    /* not enough */
-    ret = FALSE;
-    goto done;
-  }
-
-  if (memcmp (data, "#!AMR-WB\n", magic_number_size)) {
-    /* no header */
-    ret = FALSE;
-    goto done;
-  }
+  if (size < AMRWB_HEADER_SIZE)
+    goto not_enough;
 
-  amrwbparse->offset += magic_number_size;
-
-done:
+  if (memcmp (data, AMRWB_HEADER_STR, AMRWB_HEADER_SIZE))
+    goto no_header;
 
   gst_buffer_unref (buffer);
-  return ret;
 
+  amrwbparse->offset = AMRWB_HEADER_SIZE;
+  return TRUE;
+
+not_enough:
+  {
+    gst_buffer_unref (buffer);
+    return FALSE;
+  }
+no_header:
+  {
+    gst_buffer_unref (buffer);
+    return FALSE;
+  }
 }
 
 /* random access mode, could just read a fixed size buffer and push it to
@@ -374,6 +578,7 @@ gst_amrwbparse_loop (GstPad * pad)
 
   /* init */
   if (G_UNLIKELY (amrwbparse->need_header)) {
+    GstCaps *caps;
 
     if (!gst_amrwbparse_pull_header (amrwbparse)) {
       GST_ELEMENT_ERROR (amrwbparse, STREAM, WRONG_TYPE, (NULL), (NULL));
@@ -381,6 +586,11 @@ gst_amrwbparse_loop (GstPad * pad)
       goto need_pause;
     }
 
+    caps = gst_caps_new_simple ("audio/AMR-WB",
+        "rate", G_TYPE_INT, 16000, "channels", G_TYPE_INT, 1, NULL);
+    gst_pad_set_caps (amrwbparse->srcpad, caps);
+    gst_caps_unref (caps);
+
     GST_DEBUG_OBJECT (amrwbparse, "Sending newsegment event");
     gst_pad_push_event (amrwbparse->srcpad,
         gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
@@ -389,8 +599,8 @@ gst_amrwbparse_loop (GstPad * pad)
     amrwbparse->need_header = FALSE;
   }
 
-  ret = gst_pad_pull_range (amrwbparse->sinkpad,
-      amrwbparse->offset, 1, &buffer);
+  ret =
+      gst_pad_pull_range (amrwbparse->sinkpad, amrwbparse->offset, 1, &buffer);
 
   if (ret == GST_FLOW_UNEXPECTED)
     goto eos;
@@ -463,36 +673,60 @@ eos:
 static gboolean
 gst_amrwbparse_sink_activate (GstPad * sinkpad)
 {
+  gboolean result = FALSE;
   GstAmrwbParse *amrwbparse;
 
-  amrwbparse = GST_AMRWBPARSE (GST_PAD_PARENT (sinkpad));
+  amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (sinkpad));
+
   if (gst_pad_check_pull_range (sinkpad)) {
-    return gst_pad_activate_pull (sinkpad, TRUE);
+    GST_DEBUG ("Trying to activate in pull mode");
+    amrwbparse->seekable = TRUE;
+    amrwbparse->ts = 0;
+    result = gst_pad_activate_pull (sinkpad, TRUE);
   } else {
+    GST_DEBUG ("Try to activate in push mode");
     amrwbparse->seekable = FALSE;
-    return gst_pad_activate_push (sinkpad, TRUE);
+    result = gst_pad_activate_push (sinkpad, TRUE);
   }
+
+  gst_object_unref (amrwbparse);
+  return result;
 }
 
 
+
+static gboolean
+gst_amrwbparse_sink_activate_push (GstPad * sinkpad, gboolean active)
+{
+  GstAmrwbParse *amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (sinkpad));
+
+  if (active) {
+    amrwbparse->seek_handler = gst_amrwbparse_handle_push_seek;
+  } else {
+    amrwbparse->seek_handler = NULL;
+  }
+
+  gst_object_unref (amrwbparse);
+  return TRUE;
+}
+
 static gboolean
 gst_amrwbparse_sink_activate_pull (GstPad * sinkpad, gboolean active)
 {
   gboolean result;
   GstAmrwbParse *amrwbparse;
 
-  amrwbparse = GST_AMRWBPARSE (GST_PAD_PARENT (sinkpad));
+  amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (sinkpad));
   if (active) {
-    amrwbparse->need_header = TRUE;
-    amrwbparse->seekable = TRUE;
-    amrwbparse->ts = 0;
-    /* if we have a scheduler we can start the task */
+    amrwbparse->seek_handler = gst_amrwbparse_handle_pull_seek;
     result = gst_pad_start_task (sinkpad,
         (GstTaskFunction) gst_amrwbparse_loop, sinkpad);
   } else {
+    amrwbparse->seek_handler = NULL;
     result = gst_pad_stop_task (sinkpad);
   }
 
+  gst_object_unref (amrwbparse);
   return result;
 }
 
@@ -509,6 +743,10 @@ gst_amrwbparse_state_change (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
+      amrwbparse->need_header = TRUE;
+      amrwbparse->ts = -1;
+      amrwbparse->block = 0;
+      gst_segment_init (&amrwbparse->segment, GST_FORMAT_TIME);
       break;
     default:
       break;
index 06ec1d5..6cbe7b7 100644 (file)
@@ -42,6 +42,9 @@ G_BEGIN_DECLS
 typedef struct _GstAmrwbParse GstAmrwbParse;
 typedef struct _GstAmrwbParseClass GstAmrwbParseClass;
 
+typedef gboolean (*GstAmrwbSeekHandler) (GstAmrwbParse * amrwbparse, GstPad * pad,
+                   GstEvent * event);
+
 struct _GstAmrwbParse {
   GstElement element;
 
@@ -54,8 +57,13 @@ struct _GstAmrwbParse {
   gboolean need_header;
   gint64 offset;
   gint block;
+  
+  GstAmrwbSeekHandler seek_handler;
 
   guint64 ts;
+
+  /* for seeking etc */
+  GstSegment segment;
 };
 
 struct _GstAmrwbParseClass {