rmdemux: Make sure we have enough data available when parsing audio/video packets
[platform/upstream/gstreamer.git] / gst / realmedia / rmdemux.c
index e6b8ec6..68b0736 100644 (file)
@@ -20,8 +20,8 @@
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 #ifdef HAVE_CONFIG_H
@@ -55,7 +55,6 @@ struct _GstRMDemuxStream
 
   int id;
   GstPad *pad;
-  GstFlowReturn last_flow;
   gboolean discont;
   int timescale;
 
@@ -112,13 +111,13 @@ GST_STATIC_PAD_TEMPLATE ("sink",
     );
 
 static GstStaticPadTemplate gst_rmdemux_videosrc_template =
-GST_STATIC_PAD_TEMPLATE ("video_%02d",
+GST_STATIC_PAD_TEMPLATE ("video_%u",
     GST_PAD_SRC,
     GST_PAD_SOMETIMES,
     GST_STATIC_CAPS_ANY);
 
 static GstStaticPadTemplate gst_rmdemux_audiosrc_template =
-GST_STATIC_PAD_TEMPLATE ("audio_%02d",
+GST_STATIC_PAD_TEMPLATE ("audio_%u",
     GST_PAD_SRC,
     GST_PAD_SOMETIMES,
     GST_STATIC_CAPS_ANY);
@@ -134,18 +133,20 @@ static void gst_rmdemux_init (GstRMDemux * rmdemux);
 static void gst_rmdemux_finalize (GObject * object);
 static GstStateChangeReturn gst_rmdemux_change_state (GstElement * element,
     GstStateChange transition);
-static GstFlowReturn gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_rmdemux_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer);
 static void gst_rmdemux_loop (GstPad * pad);
-static gboolean gst_rmdemux_sink_activate (GstPad * sinkpad);
-static gboolean gst_rmdemux_sink_activate_push (GstPad * sinkpad,
-    gboolean active);
-static gboolean gst_rmdemux_sink_activate_pull (GstPad * sinkpad,
-    gboolean active);
-static gboolean gst_rmdemux_sink_event (GstPad * pad, GstEvent * event);
-static gboolean gst_rmdemux_src_event (GstPad * pad, GstEvent * event);
+static gboolean gst_rmdemux_sink_activate (GstPad * sinkpad,
+    GstObject * parent);
+static gboolean gst_rmdemux_sink_activate_mode (GstPad * sinkpad,
+    GstObject * parent, GstPadMode mode, gboolean active);
+static gboolean gst_rmdemux_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static gboolean gst_rmdemux_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
 static void gst_rmdemux_send_event (GstRMDemux * rmdemux, GstEvent * event);
-static const GstQueryType *gst_rmdemux_src_query_types (GstPad * pad);
-static gboolean gst_rmdemux_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_rmdemux_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 static gboolean gst_rmdemux_perform_seek (GstRMDemux * rmdemux,
     GstEvent * event);
 
@@ -196,13 +197,13 @@ gst_rmdemux_base_init (GstRMDemuxClass * klass)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
 
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&gst_rmdemux_sink_template));
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&gst_rmdemux_videosrc_template));
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&gst_rmdemux_audiosrc_template));
-  gst_element_class_set_details_simple (element_class, "RealMedia Demuxer",
+  gst_element_class_add_static_pad_template (element_class,
+      &gst_rmdemux_sink_template);
+  gst_element_class_add_static_pad_template (element_class,
+      &gst_rmdemux_videosrc_template);
+  gst_element_class_add_static_pad_template (element_class,
+      &gst_rmdemux_audiosrc_template);
+  gst_element_class_set_static_metadata (element_class, "RealMedia Demuxer",
       "Codec/Demuxer",
       "Demultiplex a RealMedia file into audio and video streams",
       "David Schleef <ds@schleef.org>");
@@ -236,6 +237,10 @@ gst_rmdemux_finalize (GObject * object)
     g_object_unref (rmdemux->adapter);
     rmdemux->adapter = NULL;
   }
+  if (rmdemux->flowcombiner) {
+    gst_flow_combiner_free (rmdemux->flowcombiner);
+    rmdemux->flowcombiner = NULL;
+  }
 
   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
 }
@@ -251,10 +256,8 @@ gst_rmdemux_init (GstRMDemux * rmdemux)
       GST_DEBUG_FUNCPTR (gst_rmdemux_chain));
   gst_pad_set_activate_function (rmdemux->sinkpad,
       GST_DEBUG_FUNCPTR (gst_rmdemux_sink_activate));
-  gst_pad_set_activatepull_function (rmdemux->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_rmdemux_sink_activate_pull));
-  gst_pad_set_activatepush_function (rmdemux->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_rmdemux_sink_activate_push));
+  gst_pad_set_activatemode_function (rmdemux->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_rmdemux_sink_activate_mode));
 
   gst_element_add_pad (GST_ELEMENT (rmdemux), rmdemux->sinkpad);
 
@@ -262,40 +265,36 @@ gst_rmdemux_init (GstRMDemux * rmdemux)
   rmdemux->first_ts = GST_CLOCK_TIME_NONE;
   rmdemux->base_ts = GST_CLOCK_TIME_NONE;
   rmdemux->need_newsegment = TRUE;
+  rmdemux->have_group_id = FALSE;
+  rmdemux->group_id = G_MAXUINT;
+  rmdemux->flowcombiner = gst_flow_combiner_new ();
 
   gst_rm_utils_run_tests ();
 }
 
 static gboolean
-gst_rmdemux_sink_event (GstPad * pad, GstEvent * event)
+gst_rmdemux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
-  GstRMDemux *rmdemux;
   gboolean ret;
 
-  rmdemux = GST_RMDEMUX (gst_pad_get_parent (pad));
-
-  GST_LOG_OBJECT (pad, "%s event", GST_EVENT_TYPE_NAME (event));
-
   switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_NEWSEGMENT:
+    case GST_EVENT_SEGMENT:
       gst_event_unref (event);
       ret = TRUE;
       break;
     default:
-      ret = gst_pad_event_default (pad, event);
+      ret = gst_pad_event_default (pad, parent, event);
       break;
   }
-
-  gst_object_unref (rmdemux);
   return ret;
 }
 
 static gboolean
-gst_rmdemux_src_event (GstPad * pad, GstEvent * event)
+gst_rmdemux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   gboolean ret = TRUE;
 
-  GstRMDemux *rmdemux = GST_RMDEMUX (GST_PAD_PARENT (pad));
+  GstRMDemux *rmdemux = GST_RMDEMUX (parent);
 
   GST_LOG_OBJECT (rmdemux, "handling src event");
 
@@ -330,7 +329,7 @@ gst_rmdemux_src_event (GstPad * pad, GstEvent * event)
     }
     default:
       GST_LOG_OBJECT (rmdemux, "Event on src: type=%d", GST_EVENT_TYPE (event));
-      ret = gst_pad_event_default (pad, event);
+      ret = gst_pad_event_default (pad, parent, event);
       break;
   }
 
@@ -350,7 +349,9 @@ gst_rmdemux_validate_offset (GstRMDemux * rmdemux)
   GstFlowReturn flowret;
   guint16 version, length;
   gboolean ret = TRUE;
+  GstMapInfo map;
 
+  buffer = NULL;
   flowret = gst_pad_pull_range (rmdemux->sinkpad, rmdemux->offset, 4, &buffer);
 
   if (flowret != GST_FLOW_OK) {
@@ -367,19 +368,21 @@ gst_rmdemux_validate_offset (GstRMDemux * rmdemux)
    * 4 bytes, and we can check that it won't take us past our known total size
    */
 
-  version = RMDEMUX_GUINT16_GET (GST_BUFFER_DATA (buffer));
+  gst_buffer_map (buffer, &map, GST_MAP_READ);
+  version = RMDEMUX_GUINT16_GET (map.data);
   if (version != 0 && version != 1) {
     GST_DEBUG_OBJECT (rmdemux, "Expected version 0 or 1, got %d",
         (int) version);
     ret = FALSE;
   }
 
-  length = RMDEMUX_GUINT16_GET (GST_BUFFER_DATA (buffer) + 2);
+  length = RMDEMUX_GUINT16_GET (map.data + 2);
   /* TODO: Also check against total stream length */
   if (length < 4) {
     GST_DEBUG_OBJECT (rmdemux, "Expected length >= 4, got %d", (int) length);
     ret = FALSE;
   }
+  gst_buffer_unmap (buffer, &map);
 
   if (ret) {
     rmdemux->offset += 4;
@@ -388,6 +391,7 @@ gst_rmdemux_validate_offset (GstRMDemux * rmdemux)
   } else {
     GST_WARNING_OBJECT (rmdemux, "Failed to validate seek offset at %d",
         rmdemux->offset);
+    gst_buffer_unref (buffer);
   }
 
   return ret;
@@ -400,9 +404,6 @@ find_seek_offset_bytes (GstRMDemux * rmdemux, guint target)
   GSList *cur;
   gboolean ret = FALSE;
 
-  if (target < 0)
-    return FALSE;
-
   for (cur = rmdemux->streams; cur; cur = cur->next) {
     GstRMDemuxStream *stream = cur->data;
 
@@ -525,20 +526,12 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, GstEvent * event)
 
   GST_LOG_OBJECT (rmdemux, "Took streamlock");
 
-  /* close current segment first */
-  if (rmdemux->segment_running && !flush) {
-    GstEvent *newseg;
-
-    newseg = gst_event_new_new_segment (TRUE, rmdemux->segment.rate,
-        GST_FORMAT_TIME, rmdemux->segment.start,
-        rmdemux->segment.last_stop, rmdemux->segment.time);
-
-    gst_rmdemux_send_event (rmdemux, newseg);
-  }
-
   if (event) {
-    gst_segment_set_seek (&rmdemux->segment, rate, format, flags,
-        cur_type, cur, stop_type, stop, &update);
+    if (!gst_segment_do_seek (&rmdemux->segment, rate, format, flags,
+            cur_type, cur, stop_type, stop, &update)) {
+      ret = FALSE;
+      goto done;
+    }
   }
 
   GST_DEBUG_OBJECT (rmdemux, "segment positions set to %" GST_TIME_FORMAT "-%"
@@ -547,7 +540,7 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, GstEvent * event)
 
   /* we need to stop flushing on the sinkpad as we're going to use it
    * next. We can do this as we have the STREAM lock now. */
-  gst_pad_push_event (rmdemux->sinkpad, gst_event_new_flush_stop ());
+  gst_pad_push_event (rmdemux->sinkpad, gst_event_new_flush_stop (TRUE));
 
   GST_LOG_OBJECT (rmdemux, "Pushed FLUSH_STOP event");
 
@@ -560,7 +553,7 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, GstEvent * event)
    * offset we just tried. If we run out of places to try, treat that as a fatal
    * error.
    */
-  if (!find_seek_offset_time (rmdemux, rmdemux->segment.last_stop)) {
+  if (!find_seek_offset_time (rmdemux, rmdemux->segment.position)) {
     GST_LOG_OBJECT (rmdemux, "Failed to find seek offset by time");
     ret = FALSE;
     goto done;
@@ -586,7 +579,7 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, GstEvent * event)
     rmdemux->state = RMDEMUX_STATE_DATA_PACKET;
 
     if (flush)
-      gst_rmdemux_send_event (rmdemux, gst_event_new_flush_stop ());
+      gst_rmdemux_send_event (rmdemux, gst_event_new_flush_stop (TRUE));
 
     /* must send newsegment event from streaming thread, so just set flag */
     rmdemux->need_newsegment = TRUE;
@@ -595,13 +588,13 @@ gst_rmdemux_perform_seek (GstRMDemux * rmdemux, GstEvent * event)
     if (rmdemux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
       gst_element_post_message (GST_ELEMENT_CAST (rmdemux),
           gst_message_new_segment_start (GST_OBJECT_CAST (rmdemux),
-              GST_FORMAT_TIME, rmdemux->segment.last_stop));
+              GST_FORMAT_TIME, rmdemux->segment.position));
     }
 
     /* restart our task since it might have been stopped when we did the 
      * flush. */
     gst_pad_start_task (rmdemux->sinkpad, (GstTaskFunction) gst_rmdemux_loop,
-        rmdemux->sinkpad);
+        rmdemux->sinkpad, NULL);
   }
 
 done:
@@ -619,12 +612,12 @@ error:
 
 
 static gboolean
-gst_rmdemux_src_query (GstPad * pad, GstQuery * query)
+gst_rmdemux_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
 {
   gboolean res = FALSE;
   GstRMDemux *rmdemux;
 
-  rmdemux = GST_RMDEMUX (gst_pad_get_parent (pad));
+  rmdemux = GST_RMDEMUX (parent);
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
@@ -661,26 +654,44 @@ gst_rmdemux_src_query (GstPad * pad, GstQuery * query)
       }
       break;
     }
+    case GST_QUERY_SEGMENT:
+    {
+      GstFormat format;
+      gint64 start, stop;
+
+      format = rmdemux->segment.format;
+
+      start =
+          gst_segment_to_stream_time (&rmdemux->segment, format,
+          rmdemux->segment.start);
+      if ((stop = rmdemux->segment.stop) == -1)
+        stop = rmdemux->segment.duration;
+      else
+        stop = gst_segment_to_stream_time (&rmdemux->segment, format, stop);
+
+      gst_query_set_segment (query, rmdemux->segment.rate, format, start, stop);
+      res = TRUE;
+      break;
+    }
     default:
-      res = gst_pad_query_default (pad, query);
+      res = gst_pad_query_default (pad, parent, query);
       break;
   }
 
-  gst_object_unref (rmdemux);
   return res;
 }
 
-static const GstQueryType *
-gst_rmdemux_src_query_types (GstPad * pad)
+static void
+gst_rmdemux_free_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
 {
-  static const GstQueryType query_types[] = {
-    GST_QUERY_POSITION,
-    GST_QUERY_DURATION,
-    GST_QUERY_SEEKING,
-    0
-  };
-
-  return query_types;
+  g_object_unref (stream->adapter);
+  gst_rmdemux_stream_clear_cached_subpackets (rmdemux, stream);
+  if (stream->pending_tags)
+    gst_tag_list_unref (stream->pending_tags);
+  if (stream->subpackets)
+    g_ptr_array_free (stream->subpackets, TRUE);
+  g_free (stream->index);
+  g_free (stream);
 }
 
 static void
@@ -695,21 +706,20 @@ gst_rmdemux_reset (GstRMDemux * rmdemux)
   for (cur = rmdemux->streams; cur; cur = cur->next) {
     GstRMDemuxStream *stream = cur->data;
 
-    g_object_unref (stream->adapter);
-    gst_rmdemux_stream_clear_cached_subpackets (rmdemux, stream);
+    gst_flow_combiner_remove_pad (rmdemux->flowcombiner, stream->pad);
     gst_element_remove_pad (GST_ELEMENT (rmdemux), stream->pad);
-    if (stream->pending_tags)
-      gst_tag_list_free (stream->pending_tags);
-    if (stream->subpackets)
-      g_ptr_array_free (stream->subpackets, TRUE);
-    g_free (stream->index);
-    g_free (stream);
+    gst_rmdemux_free_stream (rmdemux, stream);
   }
   g_slist_free (rmdemux->streams);
   rmdemux->streams = NULL;
   rmdemux->n_audio_streams = 0;
   rmdemux->n_video_streams = 0;
 
+  if (rmdemux->pending_tags != NULL) {
+    gst_tag_list_unref (rmdemux->pending_tags);
+    rmdemux->pending_tags = NULL;
+  }
+
   gst_adapter_clear (rmdemux->adapter);
   rmdemux->state = RMDEMUX_STATE_HEADER;
   rmdemux->have_pads = FALSE;
@@ -718,6 +728,9 @@ gst_rmdemux_reset (GstRMDemux * rmdemux)
   rmdemux->first_ts = GST_CLOCK_TIME_NONE;
   rmdemux->base_ts = GST_CLOCK_TIME_NONE;
   rmdemux->need_newsegment = TRUE;
+
+  rmdemux->have_group_id = FALSE;
+  rmdemux->group_id = G_MAXUINT;
 }
 
 static GstStateChangeReturn
@@ -766,55 +779,71 @@ gst_rmdemux_change_state (GstElement * element, GstStateChange transition)
  * pull based.
  */
 static gboolean
-gst_rmdemux_sink_activate (GstPad * sinkpad)
+gst_rmdemux_sink_activate (GstPad * sinkpad, GstObject * parent)
 {
-  if (gst_pad_check_pull_range (sinkpad)) {
-    return gst_pad_activate_pull (sinkpad, TRUE);
-  } else {
-    return gst_pad_activate_push (sinkpad, TRUE);
-  }
-}
+  GstQuery *query;
+  gboolean pull_mode;
 
-/* this function gets called when we activate ourselves in push mode.
- * We cannot seek (ourselves) in the stream */
-static gboolean
-gst_rmdemux_sink_activate_push (GstPad * pad, gboolean active)
-{
-  GstRMDemux *rmdemux;
+  query = gst_query_new_scheduling ();
 
-  rmdemux = GST_RMDEMUX (GST_PAD_PARENT (pad));
+  if (!gst_pad_peer_query (sinkpad, query)) {
+    gst_query_unref (query);
+    goto activate_push;
+  }
 
-  GST_DEBUG_OBJECT (rmdemux, "activate_push");
+  pull_mode = gst_query_has_scheduling_mode_with_flags (query,
+      GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE);
+  gst_query_unref (query);
 
-  rmdemux->seekable = FALSE;
+  if (!pull_mode)
+    goto activate_push;
 
-  return TRUE;
+  GST_DEBUG_OBJECT (sinkpad, "activating pull");
+  return gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE);
+
+activate_push:
+  {
+    GST_DEBUG_OBJECT (sinkpad, "activating push");
+    return gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PUSH, TRUE);
+  }
 }
 
-/* this function gets called when we activate ourselves in pull mode.
- * We can perform  random access to the resource and we start a task
- * to start reading */
 static gboolean
-gst_rmdemux_sink_activate_pull (GstPad * pad, gboolean active)
+gst_rmdemux_sink_activate_mode (GstPad * sinkpad, GstObject * parent,
+    GstPadMode mode, gboolean active)
 {
-  GstRMDemux *rmdemux;
+  gboolean res;
+  GstRMDemux *demux;
 
-  rmdemux = GST_RMDEMUX (GST_PAD_PARENT (pad));
+  demux = GST_RMDEMUX (parent);
 
-  GST_DEBUG_OBJECT (rmdemux, "activate_pull");
-
-  if (active) {
-    rmdemux->seekable = TRUE;
-    rmdemux->offset = 0;
-    rmdemux->loop_state = RMDEMUX_LOOP_STATE_HEADER;
-    rmdemux->data_offset = G_MAXUINT;
-
-    return gst_pad_start_task (pad, (GstTaskFunction) gst_rmdemux_loop, pad);
-  } else {
-    return gst_pad_stop_task (pad);
+  switch (mode) {
+    case GST_PAD_MODE_PUSH:
+      demux->seekable = FALSE;
+      demux->running = active;
+      res = TRUE;
+      break;
+    case GST_PAD_MODE_PULL:
+      if (active) {
+        demux->seekable = TRUE;
+        demux->offset = 0;
+        demux->loop_state = RMDEMUX_LOOP_STATE_HEADER;
+        demux->data_offset = G_MAXUINT;
+        res =
+            gst_pad_start_task (sinkpad, (GstTaskFunction) gst_rmdemux_loop,
+            sinkpad, NULL);
+      } else {
+        res = gst_pad_stop_task (sinkpad);
+      }
+      break;
+    default:
+      res = FALSE;
+      break;
   }
+  return res;
 }
 
+
 /* random access mode - just pass over to our chain function */
 static void
 gst_rmdemux_loop (GstPad * pad)
@@ -841,7 +870,7 @@ gst_rmdemux_loop (GstPad * pad)
       break;
     case RMDEMUX_STATE_EOS:
       GST_LOG_OBJECT (rmdemux, "At EOS, pausing task");
-      ret = GST_FLOW_UNEXPECTED;
+      ret = GST_FLOW_EOS;
       goto need_pause;
     default:
       GST_LOG_OBJECT (rmdemux, "Default: requires %d bytes (state is %d)",
@@ -849,6 +878,7 @@ gst_rmdemux_loop (GstPad * pad)
       size = rmdemux->size;
   }
 
+  buffer = NULL;
   ret = gst_pad_pull_range (pad, rmdemux->offset, size, &buffer);
   if (ret != GST_FLOW_OK) {
     if (rmdemux->offset == rmdemux->index_offset) {
@@ -868,10 +898,10 @@ gst_rmdemux_loop (GstPad * pad)
     }
   }
 
-  size = GST_BUFFER_SIZE (buffer);
+  size = gst_buffer_get_size (buffer);
 
   /* Defer to the chain function */
-  ret = gst_rmdemux_chain (pad, buffer);
+  ret = gst_rmdemux_chain (pad, GST_OBJECT_CAST (rmdemux), buffer);
   if (ret != GST_FLOW_OK) {
     GST_DEBUG_OBJECT (rmdemux, "Chain flow failed at offset 0x%08x",
         rmdemux->offset);
@@ -918,7 +948,7 @@ need_pause:
     rmdemux->segment_running = FALSE;
     gst_pad_pause_task (rmdemux->sinkpad);
 
-    if (ret == GST_FLOW_UNEXPECTED) {
+    if (ret == GST_FLOW_EOS) {
       /* perform EOS logic */
       if (rmdemux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
         gint64 stop;
@@ -932,14 +962,15 @@ need_pause:
         gst_element_post_message (GST_ELEMENT (rmdemux),
             gst_message_new_segment_done (GST_OBJECT (rmdemux),
                 GST_FORMAT_TIME, stop));
+        gst_rmdemux_send_event (rmdemux,
+            gst_event_new_segment_done (GST_FORMAT_TIME, stop));
       } else {
         /* normal playback, send EOS to all linked pads */
         GST_LOG_OBJECT (rmdemux, "Sending EOS, at end of stream");
         gst_rmdemux_send_event (rmdemux, gst_event_new_eos ());
       }
-    } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
-      GST_ELEMENT_ERROR (rmdemux, STREAM, FAILED,
-          (NULL), ("stream stopped, reason %s", reason));
+    } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
+      GST_ELEMENT_FLOW_ERROR (rmdemux, ret);
       gst_rmdemux_send_event (rmdemux, gst_event_new_eos ());
     }
     return;
@@ -960,25 +991,29 @@ gst_rmdemux_fourcc_isplausible (guint32 fourcc)
 }
 
 static GstFlowReturn
-gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
+gst_rmdemux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstFlowReturn ret = GST_FLOW_OK;
   const guint8 *data;
   guint16 version;
   guint avail;
 
-  GstRMDemux *rmdemux = GST_RMDEMUX (GST_PAD_PARENT (pad));
+  GstRMDemux *rmdemux = GST_RMDEMUX (parent);
 
   if (rmdemux->base_ts == -1) {
-    rmdemux->base_ts = GST_BUFFER_TIMESTAMP (buffer);
+    if (GST_BUFFER_DTS_IS_VALID (buffer))
+      rmdemux->base_ts = GST_BUFFER_DTS (buffer);
+    else
+      rmdemux->base_ts = GST_BUFFER_PTS (buffer);
+
     GST_LOG_OBJECT (rmdemux, "base_ts %" GST_TIME_FORMAT,
         GST_TIME_ARGS (rmdemux->base_ts));
   }
 
   gst_adapter_push (rmdemux->adapter, buffer);
 
-  GST_LOG_OBJECT (rmdemux, "Chaining buffer of size %d",
-      GST_BUFFER_SIZE (buffer));
+  GST_LOG_OBJECT (rmdemux, "Chaining buffer of size %" G_GSIZE_FORMAT,
+      gst_buffer_get_size (buffer));
 
   while (TRUE) {
     avail = gst_adapter_available (rmdemux->adapter);
@@ -990,7 +1025,7 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
         if (gst_adapter_available (rmdemux->adapter) < HEADER_SIZE)
           goto unlock;
 
-        data = gst_adapter_peek (rmdemux->adapter, HEADER_SIZE);
+        data = gst_adapter_map (rmdemux->adapter, HEADER_SIZE);
 
         rmdemux->object_id = RMDEMUX_FOURCC_GET (data + 0);
         rmdemux->size = RMDEMUX_GUINT32_GET (data + 4) - HEADER_SIZE;
@@ -1004,6 +1039,7 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
            * happen. */
           GST_WARNING_OBJECT (rmdemux, "Bogus looking header, unprintable "
               "FOURCC");
+          gst_adapter_unmap (rmdemux->adapter);
           gst_adapter_flush (rmdemux->adapter, 4);
 
           break;
@@ -1015,6 +1051,7 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
             GST_FOURCC_ARGS (rmdemux->object_id), rmdemux->size,
             rmdemux->object_version);
 
+        gst_adapter_unmap (rmdemux->adapter);
         gst_adapter_flush (rmdemux->adapter, HEADER_SIZE);
 
         switch (rmdemux->object_id) {
@@ -1060,12 +1097,13 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
           goto unlock;
 
         if ((rmdemux->object_version == 0) || (rmdemux->object_version == 1)) {
-          data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
-
+          data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
           gst_rmdemux_parse__rmf (rmdemux, data, rmdemux->size);
+          gst_adapter_unmap (rmdemux->adapter);
+          gst_adapter_flush (rmdemux->adapter, rmdemux->size);
+        } else {
+          gst_adapter_flush (rmdemux->adapter, rmdemux->size);
         }
-
-        gst_adapter_flush (rmdemux->adapter, rmdemux->size);
         rmdemux->state = RMDEMUX_STATE_HEADER;
         break;
       }
@@ -1073,11 +1111,12 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
       {
         if (gst_adapter_available (rmdemux->adapter) < rmdemux->size)
           goto unlock;
-        data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
 
+        data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
         gst_rmdemux_parse_prop (rmdemux, data, rmdemux->size);
-
+        gst_adapter_unmap (rmdemux->adapter);
         gst_adapter_flush (rmdemux->adapter, rmdemux->size);
+
         rmdemux->state = RMDEMUX_STATE_HEADER;
         break;
       }
@@ -1085,11 +1124,12 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
       {
         if (gst_adapter_available (rmdemux->adapter) < rmdemux->size)
           goto unlock;
-        data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
 
+        data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
         gst_rmdemux_parse_mdpr (rmdemux, data, rmdemux->size);
-
+        gst_adapter_unmap (rmdemux->adapter);
         gst_adapter_flush (rmdemux->adapter, rmdemux->size);
+
         rmdemux->state = RMDEMUX_STATE_HEADER;
         break;
       }
@@ -1097,11 +1137,12 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
       {
         if (gst_adapter_available (rmdemux->adapter) < rmdemux->size)
           goto unlock;
-        data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
 
+        data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
         gst_rmdemux_parse_cont (rmdemux, data, rmdemux->size);
-
+        gst_adapter_unmap (rmdemux->adapter);
         gst_adapter_flush (rmdemux->adapter, rmdemux->size);
+
         rmdemux->state = RMDEMUX_STATE_HEADER;
         break;
       }
@@ -1116,15 +1157,14 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
 
         /* The actual header is only 8 bytes */
         rmdemux->size = DATA_SIZE;
-        GST_LOG_OBJECT (rmdemux, "data available %d",
+        GST_LOG_OBJECT (rmdemux, "data available %" G_GSIZE_FORMAT,
             gst_adapter_available (rmdemux->adapter));
         if (gst_adapter_available (rmdemux->adapter) < rmdemux->size)
           goto unlock;
 
-        data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
-
+        data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
         gst_rmdemux_parse_data (rmdemux, data, rmdemux->size);
-
+        gst_adapter_unmap (rmdemux->adapter);
         gst_adapter_flush (rmdemux->adapter, rmdemux->size);
 
         rmdemux->state = RMDEMUX_STATE_DATA_PACKET;
@@ -1134,11 +1174,11 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
       {
         if (gst_adapter_available (rmdemux->adapter) < rmdemux->size)
           goto unlock;
-        data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
 
+        data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
         rmdemux->size = gst_rmdemux_parse_indx (rmdemux, data, rmdemux->size);
-
         /* Only flush the header */
+        gst_adapter_unmap (rmdemux->adapter);
         gst_adapter_flush (rmdemux->adapter, HEADER_SIZE);
 
         rmdemux->state = RMDEMUX_STATE_INDX_DATA;
@@ -1151,10 +1191,9 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
           if (gst_adapter_available (rmdemux->adapter) < rmdemux->size)
             goto unlock;
 
-          data = gst_adapter_peek (rmdemux->adapter, rmdemux->size);
-
+          data = gst_adapter_map (rmdemux->adapter, rmdemux->size);
           gst_rmdemux_parse_indx_data (rmdemux, data, rmdemux->size);
-
+          gst_adapter_unmap (rmdemux->adapter);
           gst_adapter_flush (rmdemux->adapter, rmdemux->size);
         }
 
@@ -1163,11 +1202,13 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
       }
       case RMDEMUX_STATE_DATA_PACKET:
       {
+        guint8 header[4];
+
         if (gst_adapter_available (rmdemux->adapter) < 2)
           goto unlock;
 
-        data = gst_adapter_peek (rmdemux->adapter, 2);
-        version = RMDEMUX_GUINT16_GET (data);
+        gst_adapter_copy (rmdemux->adapter, header, 0, 2);
+        version = RMDEMUX_GUINT16_GET (header);
         GST_LOG_OBJECT (rmdemux, "Data packet with version=%d", version);
 
         if (version == 0 || version == 1) {
@@ -1175,9 +1216,10 @@ gst_rmdemux_chain (GstPad * pad, GstBuffer * buffer)
 
           if (gst_adapter_available (rmdemux->adapter) < 4)
             goto unlock;
-          data = gst_adapter_peek (rmdemux->adapter, 4);
 
-          length = RMDEMUX_GUINT16_GET (data + 2);
+          gst_adapter_copy (rmdemux->adapter, header, 0, 4);
+
+          length = RMDEMUX_GUINT16_GET (header + 2);
           GST_LOG_OBJECT (rmdemux, "Got length %d", length);
 
           if (length < 4) {
@@ -1265,7 +1307,6 @@ gst_rmdemux_send_event (GstRMDemux * rmdemux, GstEvent * event)
         stream->next_ts = -1;
         stream->last_seq = -1;
         stream->next_seq = -1;
-        stream->last_flow = GST_FLOW_OK;
         break;
       default:
         break;
@@ -1282,10 +1323,11 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
   GstCaps *stream_caps = NULL;
   const gchar *codec_tag = NULL;
   gchar *codec_name = NULL;
+  gchar *stream_id;
   int version = 0;
 
   if (stream->subtype == GST_RMDEMUX_STREAM_VIDEO) {
-    char *name = g_strdup_printf ("video_%02d", rmdemux->n_video_streams);
+    char *name = g_strdup_printf ("video_%u", rmdemux->n_video_streams);
 
     stream->pad =
         gst_pad_new_from_static_template (&gst_rmdemux_videosrc_template, name);
@@ -1308,7 +1350,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
         break;
       default:
         stream_caps = gst_caps_new_simple ("video/x-unknown-fourcc",
-            "fourcc", GST_TYPE_FOURCC, stream->fourcc, NULL);
+            "fourcc", G_TYPE_UINT, stream->fourcc, NULL);
         GST_WARNING_OBJECT (rmdemux,
             "Unknown video FOURCC code \"%" GST_FOURCC_FORMAT "\" (%08x)",
             GST_FOURCC_ARGS (stream->fourcc), stream->fourcc);
@@ -1333,7 +1375,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
     rmdemux->n_video_streams++;
 
   } else if (stream->subtype == GST_RMDEMUX_STREAM_AUDIO) {
-    char *name = g_strdup_printf ("audio_%02d", rmdemux->n_audio_streams);
+    char *name = g_strdup_printf ("audio_%u", rmdemux->n_audio_streams);
 
     stream->pad =
         gst_pad_new_from_static_template (&gst_rmdemux_audiosrc_template, name);
@@ -1380,7 +1422,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
 
         /* Sony ATRAC3 */
       case GST_RM_AUD_ATRC:
-        stream_caps = gst_caps_new_simple ("audio/x-vnd.sony.atrac3", NULL);
+        stream_caps = gst_caps_new_empty_simple ("audio/x-vnd.sony.atrac3");
         stream->needs_descrambling = TRUE;
         stream->subpackets_needed = stream->height;
         stream->subpackets = NULL;
@@ -1397,7 +1439,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
         /* RALF is lossless */
       case GST_RM_AUD_RALF:
         GST_DEBUG_OBJECT (rmdemux, "RALF");
-        stream_caps = gst_caps_new_simple ("audio/x-ralf-mpeg4-generic", NULL);
+        stream_caps = gst_caps_new_empty_simple ("audio/x-ralf-mpeg4-generic");
         break;
 
       case GST_RM_AUD_SIPR:
@@ -1405,12 +1447,13 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
         if (stream->flavor > 3) {
           GST_WARNING_OBJECT (rmdemux, "bad SIPR flavor %d, freeing it",
               stream->flavor);
-          g_free (stream);
+          g_object_unref (stream->pad);
+          gst_rmdemux_free_stream (rmdemux, stream);
           goto beach;
         }
 
         GST_DEBUG_OBJECT (rmdemux, "SIPR");
-        stream_caps = gst_caps_new_simple ("audio/x-sipro", NULL);
+        stream_caps = gst_caps_new_empty_simple ("audio/x-sipro");
         stream->needs_descrambling = TRUE;
         stream->subpackets_needed = stream->height;
         stream->subpackets = NULL;
@@ -1420,7 +1463,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
 
       default:
         stream_caps = gst_caps_new_simple ("video/x-unknown-fourcc",
-            "fourcc", GST_TYPE_FOURCC, stream->fourcc, NULL);
+            "fourcc", G_TYPE_UINT, stream->fourcc, NULL);
         GST_WARNING_OBJECT (rmdemux,
             "Unknown audio FOURCC code \"%" GST_FOURCC_FORMAT "\" (%08x)",
             GST_FOURCC_ARGS (stream->fourcc), stream->fourcc);
@@ -1448,7 +1491,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
   } else {
     GST_WARNING_OBJECT (rmdemux, "not adding stream of type %d, freeing it",
         stream->subtype);
-    g_free (stream);
+    gst_rmdemux_free_stream (rmdemux, stream);
     goto beach;
   }
 
@@ -1461,6 +1504,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
       stream_caps);
 
   if (stream->pad && stream_caps) {
+    GstEvent *event;
 
     GST_LOG_OBJECT (rmdemux, "%d bytes of extra data for stream %s",
         stream->extra_data_size, GST_PAD_NAME (stream->pad));
@@ -1470,8 +1514,7 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
       GstBuffer *buffer;
 
       buffer = gst_buffer_new_and_alloc (stream->extra_data_size);
-      memcpy (GST_BUFFER_DATA (buffer), stream->extra_data,
-          stream->extra_data_size);
+      gst_buffer_fill (buffer, 0, stream->extra_data, stream->extra_data_size);
 
       gst_caps_set_simple (stream_caps, "codec_data", GST_TYPE_BUFFER,
           buffer, NULL);
@@ -1481,29 +1524,53 @@ gst_rmdemux_add_stream (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
 
     gst_pad_use_fixed_caps (stream->pad);
 
-    gst_pad_set_caps (stream->pad, stream_caps);
     gst_pad_set_event_function (stream->pad,
         GST_DEBUG_FUNCPTR (gst_rmdemux_src_event));
-    gst_pad_set_query_type_function (stream->pad,
-        GST_DEBUG_FUNCPTR (gst_rmdemux_src_query_types));
     gst_pad_set_query_function (stream->pad,
         GST_DEBUG_FUNCPTR (gst_rmdemux_src_query));
 
     GST_DEBUG_OBJECT (rmdemux, "adding pad %s with caps %" GST_PTR_FORMAT
         ", stream_id=%d", GST_PAD_NAME (stream->pad), stream_caps, stream->id);
     gst_pad_set_active (stream->pad, TRUE);
-    gst_element_add_pad (GST_ELEMENT_CAST (rmdemux), stream->pad);
+
+    stream_id =
+        gst_pad_create_stream_id_printf (stream->pad,
+        GST_ELEMENT_CAST (rmdemux), "%03u", stream->id);
+
+    event =
+        gst_pad_get_sticky_event (rmdemux->sinkpad, GST_EVENT_STREAM_START, 0);
+    if (event) {
+      if (gst_event_parse_group_id (event, &rmdemux->group_id))
+        rmdemux->have_group_id = TRUE;
+      else
+        rmdemux->have_group_id = FALSE;
+      gst_event_unref (event);
+    } else if (!rmdemux->have_group_id) {
+      rmdemux->have_group_id = TRUE;
+      rmdemux->group_id = gst_util_group_id_next ();
+    }
+
+    event = gst_event_new_stream_start (stream_id);
+    if (rmdemux->have_group_id)
+      gst_event_set_group_id (event, rmdemux->group_id);
+
+    gst_pad_push_event (stream->pad, event);
+    g_free (stream_id);
+
+    gst_pad_set_caps (stream->pad, stream_caps);
 
     codec_name = gst_pb_utils_get_codec_description (stream_caps);
 
     /* save for later, we must send the tags after the newsegment event */
     if (codec_tag != NULL && codec_name != NULL) {
       if (stream->pending_tags == NULL)
-        stream->pending_tags = gst_tag_list_new ();
+        stream->pending_tags = gst_tag_list_new_empty ();
       gst_tag_list_add (stream->pending_tags, GST_TAG_MERGE_KEEP,
           codec_tag, codec_name, NULL);
       g_free (codec_name);
     }
+    gst_element_add_pad (GST_ELEMENT_CAST (rmdemux), stream->pad);
+    gst_flow_combiner_add_pad (rmdemux->flowcombiner, stream->pad);
   }
 
 beach:
@@ -1574,7 +1641,6 @@ gst_rmdemux_parse_mdpr (GstRMDemux * rmdemux, const guint8 * data, int length)
   stream->seek_offset = 0;
   stream->last_ts = -1;
   stream->next_ts = -1;
-  stream->last_flow = GST_FLOW_OK;
   stream->discont = TRUE;
   stream->adapter = gst_adapter_new ();
   GST_LOG_OBJECT (rmdemux, "stream_number=%d", stream->id);
@@ -1587,13 +1653,13 @@ gst_rmdemux_parse_mdpr (GstRMDemux * rmdemux, const guint8 * data, int length)
   GST_LOG_OBJECT (rmdemux, "Stream avg bitrate=%u", avg_bitrate);
   if (max_bitrate != 0) {
     if (stream->pending_tags == NULL)
-      stream->pending_tags = gst_tag_list_new ();
+      stream->pending_tags = gst_tag_list_new_empty ();
     gst_tag_list_add (stream->pending_tags, GST_TAG_MERGE_REPLACE,
         GST_TAG_MAXIMUM_BITRATE, max_bitrate, NULL);
   }
   if (avg_bitrate != 0) {
     if (stream->pending_tags == NULL)
-      stream->pending_tags = gst_tag_list_new ();
+      stream->pending_tags = gst_tag_list_new_empty ();
     gst_tag_list_add (stream->pending_tags, GST_TAG_MERGE_REPLACE,
         GST_TAG_BITRATE, avg_bitrate, NULL);
   }
@@ -1864,42 +1930,21 @@ gst_rmdemux_parse_cont (GstRMDemux * rmdemux, const guint8 * data, int length)
   GstTagList *tags;
 
   tags = gst_rm_utils_read_tags (data, length, gst_rm_utils_read_string16);
-  if (tags) {
-    gst_element_found_tags (GST_ELEMENT (rmdemux), tags);
-  }
-}
 
-static GstFlowReturn
-gst_rmdemux_combine_flows (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
-    GstFlowReturn ret)
-{
-  GSList *cur;
+  if (tags) {
+    GstTagList *old_tags = rmdemux->pending_tags;
 
-  /* store the value */
-  stream->last_flow = ret;
+    GST_LOG_OBJECT (rmdemux, "tags: %" GST_PTR_FORMAT, tags);
 
-  /* if it's success we can return the value right away */
-  if (ret == GST_FLOW_OK)
-    goto done;
+    rmdemux->pending_tags =
+        gst_tag_list_merge (old_tags, tags, GST_TAG_MERGE_APPEND);
 
-  /* any other error that is not-linked can be returned right
-   * away */
-  if (ret != GST_FLOW_NOT_LINKED)
-    goto done;
+    gst_tag_list_unref (tags);
+    if (old_tags)
+      gst_tag_list_unref (old_tags);
 
-  for (cur = rmdemux->streams; cur; cur = cur->next) {
-    GstRMDemuxStream *ostream = cur->data;
-
-    ret = ostream->last_flow;
-    /* some other return value (must be SUCCESS but we can return
-     * other values as well) */
-    if (ret != GST_FLOW_NOT_LINKED)
-      goto done;
+    gst_tag_list_set_scope (rmdemux->pending_tags, GST_TAG_SCOPE_GLOBAL);
   }
-  /* if we get here, all other pads were unlinked and we return
-   * NOT_LINKED then */
-done:
-  return ret;
 }
 
 static void
@@ -1920,6 +1965,7 @@ gst_rmdemux_descramble_audio (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
 {
   GstFlowReturn ret = GST_FLOW_ERROR;
   GstBuffer *outbuf;
+  GstMapInfo outmap;
   guint packet_size = stream->packet_size;
   guint height = stream->subpackets->len;
   guint leaf_size = stream->leaf_size;
@@ -1931,40 +1977,49 @@ gst_rmdemux_descramble_audio (GstRMDemux * rmdemux, GstRMDemuxStream * stream)
       leaf_size, height);
 
   outbuf = gst_buffer_new_and_alloc (height * packet_size);
-  gst_buffer_set_caps (outbuf, GST_PAD_CAPS (stream->pad));
+  gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
 
   for (p = 0; p < height; ++p) {
     GstBuffer *b = g_ptr_array_index (stream->subpackets, p);
-    guint8 *b_data = GST_BUFFER_DATA (b);
+    GstMapInfo map;
 
-    if (p == 0)
-      GST_BUFFER_TIMESTAMP (outbuf) = GST_BUFFER_TIMESTAMP (b);
+    gst_buffer_map (b, &map, GST_MAP_READ);
+
+    if (p == 0) {
+      GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (b);
+      GST_BUFFER_DTS (outbuf) = GST_BUFFER_DTS (b);
+    }
 
     for (x = 0; x < packet_size / leaf_size; ++x) {
       guint idx;
 
       idx = height * x + ((height + 1) / 2) * (p % 2) + (p / 2);
+
       /* GST_LOG ("%3u => %3u", (height * p) + x, idx); */
-      memcpy (GST_BUFFER_DATA (outbuf) + leaf_size * idx, b_data, leaf_size);
-      b_data += leaf_size;
+      memcpy (outmap.data + leaf_size * idx, map.data + leaf_size * x,
+          leaf_size);
     }
+    gst_buffer_unmap (b, &map);
   }
+  gst_buffer_unmap (outbuf, &outmap);
 
   /* some decoders, such as realaudiodec, need to be fed in packet units */
   for (p = 0; p < height; ++p) {
     GstBuffer *subbuf;
 
-    subbuf = gst_buffer_create_sub (outbuf, p * packet_size, packet_size);
+    subbuf =
+        gst_buffer_copy_region (outbuf, GST_BUFFER_COPY_ALL, p * packet_size,
+        packet_size);
 
-    GST_LOG_OBJECT (rmdemux, "pushing buffer timestamp %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (subbuf)));
+    GST_LOG_OBJECT (rmdemux, "pushing buffer dts %" GST_TIME_FORMAT ", pts %"
+        GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_DTS (subbuf)),
+        GST_TIME_ARGS (GST_BUFFER_PTS (subbuf)));
 
     if (stream->discont) {
       GST_BUFFER_FLAG_SET (subbuf, GST_BUFFER_FLAG_DISCONT);
       stream->discont = FALSE;
     }
 
-    gst_buffer_set_caps (subbuf, GST_PAD_CAPS (stream->pad));
     ret = gst_pad_push (stream->pad, subbuf);
     if (ret != GST_FLOW_OK)
       break;
@@ -2003,7 +2058,7 @@ gst_rmdemux_descramble_mp4a_audio (GstRMDemux * rmdemux,
   GstFlowReturn res;
   GstBuffer *buf, *outbuf;
   guint frames, index, i;
-  guint8 *data;
+  GstMapInfo map;
   GstClockTime timestamp;
 
   res = GST_FLOW_OK;
@@ -2012,19 +2067,20 @@ gst_rmdemux_descramble_mp4a_audio (GstRMDemux * rmdemux,
   g_ptr_array_index (stream->subpackets, 0) = NULL;
   g_ptr_array_set_size (stream->subpackets, 0);
 
-  data = GST_BUFFER_DATA (buf);
-  timestamp = GST_BUFFER_TIMESTAMP (buf);
+  gst_buffer_map (buf, &map, GST_MAP_READ);
+  timestamp = GST_BUFFER_PTS (buf);
 
-  frames = (data[1] & 0xf0) >> 4;
+  frames = (map.data[1] & 0xf0) >> 4;
   index = 2 * frames + 2;
 
   for (i = 0; i < frames; i++) {
-    guint len = (data[i * 2 + 2] << 8) | data[i * 2 + 3];
+    guint len = (map.data[i * 2 + 2] << 8) | map.data[i * 2 + 3];
 
-    outbuf = gst_buffer_create_sub (buf, index, len);
-    if (i == 0)
-      GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
-    gst_buffer_set_caps (outbuf, GST_PAD_CAPS (stream->pad));
+    outbuf = gst_buffer_copy_region (buf, GST_BUFFER_COPY_ALL, index, len);
+    if (i == 0) {
+      GST_BUFFER_PTS (outbuf) = timestamp;
+      GST_BUFFER_DTS (outbuf) = timestamp;
+    }
 
     index += len;
 
@@ -2036,6 +2092,7 @@ gst_rmdemux_descramble_mp4a_audio (GstRMDemux * rmdemux,
     if (res != GST_FLOW_OK)
       break;
   }
+  gst_buffer_unmap (buf, &map);
   gst_buffer_unref (buf);
   return res;
 }
@@ -2046,6 +2103,7 @@ gst_rmdemux_descramble_sipr_audio (GstRMDemux * rmdemux,
 {
   GstFlowReturn ret;
   GstBuffer *outbuf;
+  GstMapInfo outmap;
   guint packet_size = stream->packet_size;
   guint height = stream->subpackets->len;
   guint p;
@@ -2056,20 +2114,23 @@ gst_rmdemux_descramble_sipr_audio (GstRMDemux * rmdemux,
       stream->leaf_size, height);
 
   outbuf = gst_buffer_new_and_alloc (height * packet_size);
-  gst_buffer_set_caps (outbuf, GST_PAD_CAPS (stream->pad));
+  gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
 
   for (p = 0; p < height; ++p) {
     GstBuffer *b = g_ptr_array_index (stream->subpackets, p);
-    guint8 *b_data = GST_BUFFER_DATA (b);
 
-    if (p == 0)
-      GST_BUFFER_TIMESTAMP (outbuf) = GST_BUFFER_TIMESTAMP (b);
+    if (p == 0) {
+      GST_BUFFER_DTS (outbuf) = GST_BUFFER_DTS (b);
+      GST_BUFFER_PTS (outbuf) = GST_BUFFER_PTS (b);
+    }
 
-    memcpy (GST_BUFFER_DATA (outbuf) + packet_size * p, b_data, packet_size);
+    gst_buffer_extract (b, 0, outmap.data + packet_size * p, packet_size);
   }
+  gst_buffer_unmap (outbuf, &outmap);
 
-  GST_LOG_OBJECT (rmdemux, "pushing buffer timestamp %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)));
+  GST_LOG_OBJECT (rmdemux, "pushing buffer dts %" GST_TIME_FORMAT ", pts %"
+      GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
+      GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
 
   if (stream->discont) {
     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
@@ -2078,7 +2139,6 @@ gst_rmdemux_descramble_sipr_audio (GstRMDemux * rmdemux,
 
   outbuf = gst_rm_utils_descramble_sipr_buffer (outbuf);
 
-  gst_buffer_set_caps (outbuf, GST_PAD_CAPS (stream->pad));
   ret = gst_pad_push (stream->pad, outbuf);
 
   gst_rmdemux_stream_clear_cached_subpackets (rmdemux, stream);
@@ -2095,8 +2155,9 @@ gst_rmdemux_handle_scrambled_packet (GstRMDemux * rmdemux,
   if (stream->subpackets == NULL)
     stream->subpackets = g_ptr_array_sized_new (stream->subpackets_needed);
 
-  GST_LOG ("Got subpacket %u/%u, len=%u, key=%d", stream->subpackets->len + 1,
-      stream->subpackets_needed, GST_BUFFER_SIZE (buf), keyframe);
+  GST_LOG ("Got subpacket %u/%u, len=%" G_GSIZE_FORMAT ", key=%d",
+      stream->subpackets->len + 1, stream->subpackets_needed,
+      gst_buffer_get_size (buf), keyframe);
 
   if (keyframe && stream->subpackets->len > 0) {
     gst_rmdemux_stream_clear_cached_subpackets (rmdemux, stream);
@@ -2125,142 +2186,13 @@ gst_rmdemux_handle_scrambled_packet (GstRMDemux * rmdemux,
       ret = gst_rmdemux_descramble_sipr_audio (rmdemux, stream);
       break;
     default:
+      ret = GST_FLOW_ERROR;
       g_assert_not_reached ();
   }
 
   return ret;
 }
 
-static GstClockTime
-gst_rmdemux_fix_timestamp (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
-    guint8 * data, GstClockTime timestamp)
-{
-  guint8 frame_type;
-  guint16 seq;
-  GstClockTime ts = timestamp;
-
-  if (timestamp == GST_CLOCK_TIME_NONE)
-    goto done;
-
-  /* only adjust when we have a stream with B frames */
-  if (stream->format < 0x20200002)
-    goto done;
-
-  /* Fix timestamp. */
-  switch (stream->fourcc) {
-    case GST_RM_VDO_RV10:
-      goto done;
-    case GST_RM_VDO_RV20:
-    {
-      /*
-       * Bit  1- 2: frame type
-       * Bit  3- 9: ?
-       * Bit 10-22: sequence number
-       * Bit 23-32: ?
-       */
-      frame_type = (data[0] >> 6) & 0x03;
-      seq = ((data[1] & 0x7f) << 6) + ((data[2] & 0xfc) >> 2);
-      break;
-    }
-    case GST_RM_VDO_RV30:
-    {
-      /*
-       * Bit  1- 2: ?
-       * Bit     3: skip packet if 1
-       * Bit  4- 5: frame type
-       * Bit  6-12: ?
-       * Bit 13-25: sequence number
-       * Bit 26-32: ?
-       */
-      frame_type = (data[0] >> 3) & 0x03;
-      seq = ((data[1] & 0x0f) << 9) + (data[2] << 1) + ((data[3] & 0x80) >> 7);
-      break;
-    }
-    case GST_RM_VDO_RV40:
-    {
-      /*
-       * Bit     1: skip packet if 1
-       * Bit  2- 3: frame type
-       * Bit  4-13: ?
-       * Bit 14-26: sequence number
-       * Bit 27-32: ?
-       */
-      frame_type = (data[0] >> 5) & 0x03;
-      seq = ((data[1] & 0x07) << 10) + (data[2] << 2) + ((data[3] & 0xc0) >> 6);
-      break;
-    }
-    default:
-      goto unknown_version;
-  }
-
-  switch (frame_type) {
-    case 0:
-    case 1:
-    {
-      GST_LOG_OBJECT (rmdemux, "I frame %d", frame_type);
-      /* I frame */
-      if (stream->next_ts == -1)
-        stream->next_ts = timestamp;
-      else
-        timestamp = stream->next_ts;
-      stream->last_ts = stream->next_ts;
-      stream->next_ts = ts;
-      stream->last_seq = stream->next_seq;
-      stream->next_seq = seq;
-      break;
-    }
-    case 2:
-    {
-      GST_LOG_OBJECT (rmdemux, "P frame");
-      /* P frame */
-      timestamp = stream->last_ts = stream->next_ts;
-      if (seq < stream->next_seq)
-        stream->next_ts += (seq + 0x2000 - stream->next_seq) * GST_MSECOND;
-      else
-        stream->next_ts += (seq - stream->next_seq) * GST_MSECOND;
-      stream->last_seq = stream->next_seq;
-      stream->next_seq = seq;
-      break;
-    }
-    case 3:
-    {
-      GST_LOG_OBJECT (rmdemux, "B frame");
-      /* B frame */
-      if (seq < stream->last_seq) {
-        timestamp =
-            (seq + 0x2000 - stream->last_seq) * GST_MSECOND + stream->last_ts;
-      } else {
-        timestamp = (seq - stream->last_seq) * GST_MSECOND + stream->last_ts;
-      }
-      break;
-    }
-    default:
-      goto unknown_frame_type;
-  }
-
-done:
-  GST_LOG_OBJECT (rmdemux,
-      "timestamp %" GST_TIME_FORMAT " -> %" GST_TIME_FORMAT, GST_TIME_ARGS (ts),
-      GST_TIME_ARGS (timestamp));
-
-  return timestamp;
-
-  /* Errors */
-unknown_version:
-  {
-    GST_ELEMENT_ERROR (rmdemux, STREAM, DECODE,
-        ("Unknown version: %i.", stream->version), (NULL));
-    return GST_FLOW_ERROR;
-  }
-
-unknown_frame_type:
-  {
-    GST_ELEMENT_ERROR (rmdemux, STREAM, DECODE, ("Unknown frame type %d.",
-            frame_type), (NULL));
-    return GST_FLOW_ERROR;
-  }
-}
-
 #define PARSE_NUMBER(data, size, number, label) \
 G_STMT_START {                                  \
   if (size < 2)                                 \
@@ -2285,15 +2217,21 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     GstClockTime timestamp, gboolean key)
 {
   GstFlowReturn ret;
-  const guint8 *data, *base;
-  guint size;
+  GstMapInfo map;
+  const guint8 *data;
+  gsize size;
+
+  gst_buffer_map (in, &map, GST_MAP_READ);
+
+  if (map.size < offset)
+    goto not_enough_data;
+
+  data = map.data + offset;
+  size = map.size - offset;
 
-  base = GST_BUFFER_DATA (in);
-  data = base + offset;
-  size = GST_BUFFER_SIZE (in) - offset;
   /* if size <= 2, we want this method to return the same GstFlowReturn as it
    * was previously for that given stream. */
-  ret = stream->last_flow;
+  ret = GST_PAD_LAST_FLOW_RETURN (stream->pad);
 
   while (size > 2) {
     guint8 pkg_header;
@@ -2339,8 +2277,9 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     }
 
     GST_DEBUG_OBJECT (rmdemux,
-        "seq %d, subseq %d, offset %d, length %d, size %d, header %02x",
-        pkg_seqnum, pkg_subseq, pkg_offset, pkg_length, size, pkg_header);
+        "seq %d, subseq %d, offset %d, length %d, size %" G_GSIZE_FORMAT
+        ", header %02x", pkg_seqnum, pkg_subseq, pkg_offset, pkg_length, size,
+        pkg_header);
 
     /* calc size of fragment */
     if ((pkg_header & 0xc0) == 0x80) {
@@ -2353,8 +2292,13 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     }
     GST_DEBUG_OBJECT (rmdemux, "fragment size %d", fragment_size);
 
+    if (map.size < (data - map.data) + fragment_size)
+      goto not_enough_data;
+
     /* get the fragment */
-    fragment = gst_buffer_create_sub (in, data - base, fragment_size);
+    fragment =
+        gst_buffer_copy_region (in, GST_BUFFER_COPY_ALL, data - map.data,
+        fragment_size);
 
     if (pkg_subseq == 1) {
       GST_DEBUG_OBJECT (rmdemux, "start new fragment");
@@ -2384,6 +2328,7 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     /* flush fragment when complete */
     if (stream->frag_current >= stream->frag_length) {
       GstBuffer *out;
+      GstMapInfo outmap;
       guint8 *outdata;
       guint header_size;
       gint i, avail;
@@ -2407,7 +2352,8 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
       avail = gst_adapter_available (stream->adapter);
 
       out = gst_buffer_new_and_alloc (header_size + avail);
-      outdata = GST_BUFFER_DATA (out);
+      gst_buffer_map (out, &outmap, GST_MAP_WRITE);
+      outdata = outmap.data;
 
       /* create header */
       *outdata++ = stream->frag_count - 1;
@@ -2426,8 +2372,6 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
       stream->frag_count = 0;
       stream->frag_length = 0;
 
-      gst_buffer_set_caps (out, GST_PAD_CAPS (stream->pad));
-
       if (timestamp != -1) {
         if (rmdemux->first_ts != -1 && timestamp > rmdemux->first_ts)
           timestamp -= rmdemux->first_ts;
@@ -2437,10 +2381,11 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
         if (rmdemux->base_ts != -1)
           timestamp += rmdemux->base_ts;
       }
-      timestamp =
-          gst_rmdemux_fix_timestamp (rmdemux, stream, outdata, timestamp);
+      gst_buffer_unmap (out, &outmap);
 
-      GST_BUFFER_TIMESTAMP (out) = timestamp;
+      /* video has DTS */
+      GST_BUFFER_DTS (out) = timestamp;
+      GST_BUFFER_PTS (out) = GST_CLOCK_TIME_NONE;
 
       GST_LOG_OBJECT (rmdemux, "pushing timestamp %" GST_TIME_FORMAT,
           GST_TIME_ARGS (timestamp));
@@ -2455,7 +2400,7 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
       }
 
       ret = gst_pad_push (stream->pad, out);
-      ret = gst_rmdemux_combine_flows (rmdemux, stream, ret);
+      ret = gst_flow_combiner_update_flow (rmdemux->flowcombiner, ret);
       if (ret != GST_FLOW_OK)
         break;
 
@@ -2464,8 +2409,10 @@ gst_rmdemux_parse_video_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     data += fragment_size;
     size -= fragment_size;
   }
-  GST_DEBUG_OBJECT (rmdemux, "%d bytes left", size);
+  GST_DEBUG_OBJECT (rmdemux, "%" G_GSIZE_FORMAT " bytes left", size);
 
+done:
+  gst_buffer_unmap (in, &map);
   gst_buffer_unref (in);
 
   return ret;
@@ -2475,16 +2422,16 @@ not_enough_data:
   {
     GST_ELEMENT_WARNING (rmdemux, STREAM, DECODE, ("Skipping bad packet."),
         (NULL));
-    gst_buffer_unref (in);
-    return GST_FLOW_OK;
+    ret = GST_FLOW_OK;
+    goto done;
   }
 too_many_fragments:
   {
     GST_ELEMENT_ERROR (rmdemux, STREAM, DECODE,
         ("Got more fragments (%u) than can be handled (%u)",
             stream->frag_count, MAX_FRAGS), (NULL));
-    gst_buffer_unref (in);
-    return GST_FLOW_ERROR;
+    ret = GST_FLOW_ERROR;
+    goto done;
   }
 }
 
@@ -2495,16 +2442,11 @@ gst_rmdemux_parse_audio_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
 {
   GstFlowReturn ret;
   GstBuffer *buffer;
-  const guint8 *data;
-  guint size;
-
-  data = GST_BUFFER_DATA (in) + offset;
-  size = GST_BUFFER_SIZE (in) - offset;
 
-  buffer = gst_buffer_new_and_alloc (size);
-  gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad));
+  if (gst_buffer_get_size (in) < offset)
+    goto not_enough_data;
 
-  memcpy (GST_BUFFER_DATA (buffer), (guint8 *) data, size);
+  buffer = gst_buffer_copy_region (in, GST_BUFFER_COPY_MEMORY, offset, -1);
 
   if (rmdemux->first_ts != -1 && timestamp > rmdemux->first_ts)
     timestamp -= rmdemux->first_ts;
@@ -2514,7 +2456,8 @@ gst_rmdemux_parse_audio_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
   if (rmdemux->base_ts != -1)
     timestamp += rmdemux->base_ts;
 
-  GST_BUFFER_TIMESTAMP (buffer) = timestamp;
+  GST_BUFFER_PTS (buffer) = timestamp;
+  GST_BUFFER_DTS (buffer) = timestamp;
 
   if (stream->needs_descrambling) {
     GST_LOG_OBJECT (rmdemux, "descramble timestamp %" GST_TIME_FORMAT,
@@ -2522,9 +2465,9 @@ gst_rmdemux_parse_audio_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     ret = gst_rmdemux_handle_scrambled_packet (rmdemux, stream, buffer, key);
   } else {
     GST_LOG_OBJECT (rmdemux,
-        "Pushing buffer of size %d, timestamp %" GST_TIME_FORMAT "to pad %s",
-        GST_BUFFER_SIZE (buffer), GST_TIME_ARGS (timestamp),
-        GST_PAD_NAME (stream->pad));
+        "Pushing buffer of size %" G_GSIZE_FORMAT ", timestamp %"
+        GST_TIME_FORMAT "to pad %s", gst_buffer_get_size (buffer),
+        GST_TIME_ARGS (timestamp), GST_PAD_NAME (stream->pad));
 
     if (stream->discont) {
       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
@@ -2533,9 +2476,19 @@ gst_rmdemux_parse_audio_packet (GstRMDemux * rmdemux, GstRMDemuxStream * stream,
     ret = gst_pad_push (stream->pad, buffer);
   }
 
+done:
   gst_buffer_unref (in);
 
   return ret;
+
+  /* ERRORS */
+not_enough_data:
+  {
+    GST_ELEMENT_WARNING (rmdemux, STREAM, DECODE, ("Skipping bad packet."),
+        (NULL));
+    ret = GST_FLOW_OK;
+    goto done;
+  }
 }
 
 static GstFlowReturn
@@ -2543,16 +2496,21 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, GstBuffer * in, guint16 version)
 {
   guint16 id;
   GstRMDemuxStream *stream;
-  guint size;
+  gsize size, offset;
   GstFlowReturn cret, ret;
   GstClockTime timestamp;
   gboolean key;
-  guint8 *data, *base;
+  GstMapInfo map;
+  guint8 *data;
   guint8 flags;
   guint32 ts;
 
-  base = data = GST_BUFFER_DATA (in);
-  size = GST_BUFFER_SIZE (in);
+  gst_buffer_map (in, &map, GST_MAP_READ);
+  data = map.data;
+  size = map.size;
+
+  if (size < 4 + 6 + 1 + 2)
+    goto not_enough_data;
 
   /* stream number */
   id = RMDEMUX_GUINT16_GET (data);
@@ -2565,10 +2523,10 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, GstBuffer * in, guint16 version)
   ts = RMDEMUX_GUINT32_GET (data + 2);
   timestamp = ts * GST_MSECOND;
 
-  gst_segment_set_last_stop (&rmdemux->segment, GST_FORMAT_TIME, timestamp);
+  rmdemux->segment.position = timestamp;
 
   GST_LOG_OBJECT (rmdemux, "Parsing a packet for stream=%d, timestamp=%"
-      GST_TIME_FORMAT ", size %u, version=%d, ts=%u", id,
+      GST_TIME_FORMAT ", size %" G_GSIZE_FORMAT ", version=%d, ts=%u", id,
       GST_TIME_ARGS (timestamp), size, version, ts);
 
   if (rmdemux->first_ts == GST_CLOCK_TIME_NONE) {
@@ -2589,54 +2547,67 @@ gst_rmdemux_parse_packet (GstRMDemux * rmdemux, GstBuffer * in, guint16 version)
 
   /* version 1 has an extra byte */
   if (version == 1) {
+    if (size < 1)
+      goto not_enough_data;
+
     data += 1;
     size -= 1;
   }
+  offset = data - map.data;
+  gst_buffer_unmap (in, &map);
+
   key = (flags & 0x02) != 0;
   GST_DEBUG_OBJECT (rmdemux, "flags %d, Keyframe %d", flags, key);
 
   if (rmdemux->need_newsegment) {
     GstEvent *event;
 
-    event = gst_event_new_new_segment (FALSE, rmdemux->segment.rate,
-        rmdemux->segment.format, rmdemux->segment.start,
-        rmdemux->segment.stop, rmdemux->segment.time);
+    event = gst_event_new_segment (&rmdemux->segment);
 
     GST_DEBUG_OBJECT (rmdemux, "sending NEWSEGMENT event, segment.start= %"
         GST_TIME_FORMAT, GST_TIME_ARGS (rmdemux->segment.start));
 
     gst_rmdemux_send_event (rmdemux, event);
     rmdemux->need_newsegment = FALSE;
+
+    if (rmdemux->pending_tags != NULL) {
+      gst_rmdemux_send_event (rmdemux,
+          gst_event_new_tag (rmdemux->pending_tags));
+      rmdemux->pending_tags = NULL;
+    }
   }
 
   if (stream->pending_tags != NULL) {
     GST_LOG_OBJECT (stream->pad, "tags %" GST_PTR_FORMAT, stream->pending_tags);
-    gst_element_found_tags_for_pad (GST_ELEMENT_CAST (rmdemux), stream->pad,
-        stream->pending_tags);
+    gst_pad_push_event (stream->pad, gst_event_new_tag (stream->pending_tags));
     stream->pending_tags = NULL;
   }
 
   if ((rmdemux->offset + size) <= stream->seek_offset) {
     GST_DEBUG_OBJECT (rmdemux,
-        "Stream %d is skipping: seek_offset=%d, offset=%d, size=%u",
-        stream->id, stream->seek_offset, rmdemux->offset, size);
+        "Stream %d is skipping: seek_offset=%d, offset=%d, size=%"
+        G_GSIZE_FORMAT, stream->id, stream->seek_offset, rmdemux->offset, size);
     cret = GST_FLOW_OK;
+    gst_buffer_unref (in);
     goto beach;
   }
 
   /* do special headers */
   if (stream->subtype == GST_RMDEMUX_STREAM_VIDEO) {
     ret =
-        gst_rmdemux_parse_video_packet (rmdemux, stream, in, data - base,
+        gst_rmdemux_parse_video_packet (rmdemux, stream, in, offset,
         version, timestamp, key);
   } else if (stream->subtype == GST_RMDEMUX_STREAM_AUDIO) {
     ret =
-        gst_rmdemux_parse_audio_packet (rmdemux, stream, in, data - base,
+        gst_rmdemux_parse_audio_packet (rmdemux, stream, in, offset,
         version, timestamp, key);
-  } else
+  } else {
+    gst_buffer_unref (in);
     ret = GST_FLOW_OK;
+  }
 
-  cret = gst_rmdemux_combine_flows (rmdemux, stream, ret);
+  cret = gst_flow_combiner_update_pad_flow (rmdemux->flowcombiner, stream->pad,
+      ret);
 
 beach:
   return cret;
@@ -2646,6 +2617,18 @@ unknown_stream:
   {
     GST_WARNING_OBJECT (rmdemux, "No stream for stream id %d in parsing "
         "data packet", id);
+    gst_buffer_unmap (in, &map);
+    gst_buffer_unref (in);
+    return GST_FLOW_OK;
+  }
+
+  /* ERRORS */
+not_enough_data:
+  {
+    GST_ELEMENT_WARNING (rmdemux, STREAM, DECODE, ("Skipping bad packet."),
+        (NULL));
+    gst_buffer_unmap (in, &map);
+    gst_buffer_unref (in);
     return GST_FLOW_OK;
   }
 }