flv: flvmux ported to the GstAggregator
authorVincent Penquerc'h <vincent.penquerch@collabora.com>
Wed, 20 Dec 2017 20:23:26 +0000 (15:23 -0500)
committerOlivier Crête <olivier.crete@collabora.com>
Wed, 20 Dec 2017 20:34:11 +0000 (15:34 -0500)
This makes it possible to create a flv file from a live source and not stop
when there are packet drops.

https://bugzilla.gnome.org/show_bug.cgi?id=782920

gst/flv/gstflvmux.c
gst/flv/gstflvmux.h

index 89ae86d..a4e151d 100644 (file)
@@ -1,6 +1,9 @@
 /* GStreamer
  *
  * Copyright (c) 2008,2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ * Copyright (c) 2008-2017 Collabora Ltd
+ *  @author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ *  @author: Vincent Penquerc'h <vincent.penquerch@collabora.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -86,38 +89,75 @@ static GstStaticPadTemplate audiosink_templ = GST_STATIC_PAD_TEMPLATE ("audio",
         "audio/x-speex, channels = (int) 1, rate = (int) 16000;")
     );
 
+G_DEFINE_TYPE (GstFlvMuxPad, gst_flv_mux_pad, GST_TYPE_AGGREGATOR_PAD);
+
 #define gst_flv_mux_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstFlvMux, gst_flv_mux, GST_TYPE_ELEMENT,
+G_DEFINE_TYPE_WITH_CODE (GstFlvMux, gst_flv_mux, GST_TYPE_AGGREGATOR,
     G_IMPLEMENT_INTERFACE (GST_TYPE_TAG_SETTER, NULL));
 
-static void gst_flv_mux_finalize (GObject * object);
 static GstFlowReturn
-gst_flv_mux_handle_buffer (GstCollectPads * pads, GstCollectData * cdata,
-    GstBuffer * buf, gpointer user_data);
+gst_flv_mux_aggregate (GstAggregator * aggregator, gboolean timeout);
 static gboolean
-gst_flv_mux_handle_sink_event (GstCollectPads * pads, GstCollectData * data,
-    GstEvent * event, gpointer user_data);
-
-static gboolean gst_flv_mux_handle_src_event (GstPad * pad, GstObject * parent,
+gst_flv_mux_sink_event (GstAggregator * aggregator, GstAggregatorPad * pad,
     GstEvent * event);
-static GstPad *gst_flv_mux_request_new_pad (GstElement * element,
+
+static GstAggregatorPad *gst_flv_mux_create_new_pad (GstAggregator * agg,
     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps);
 static void gst_flv_mux_release_pad (GstElement * element, GstPad * pad);
 
-static gboolean gst_flv_mux_video_pad_setcaps (GstPad * pad, GstCaps * caps);
-static gboolean gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_flv_mux_video_pad_setcaps (GstFlvMuxPad * pad,
+    GstCaps * caps);
+static gboolean gst_flv_mux_audio_pad_setcaps (GstFlvMuxPad * pad,
+    GstCaps * caps);
 
 static void gst_flv_mux_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 static void gst_flv_mux_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
-
-static GstStateChangeReturn
-gst_flv_mux_change_state (GstElement * element, GstStateChange transition);
+static void gst_flv_mux_finalize (GObject * object);
 
 static void gst_flv_mux_reset (GstElement * element);
-static void gst_flv_mux_reset_pad (GstFlvMux * mux, GstFlvPad * pad,
-    gboolean video);
+static void gst_flv_mux_reset_pad (GstFlvMuxPad * pad);
+
+static void gst_flv_mux_pad_finalize (GObject * object);
+
+static gboolean gst_flv_mux_start (GstAggregator * aggregator);
+static GstFlowReturn gst_flv_mux_flush (GstAggregator * aggregator);
+static GstClockTime gst_flv_mux_get_next_time (GstAggregator * aggregator);
+static GstFlowReturn gst_flv_mux_write_eos (GstFlvMux * mux);
+static GstFlowReturn gst_flv_mux_write_header (GstFlvMux * mux);
+static GstFlowReturn gst_flv_mux_rewrite_header (GstFlvMux * mux);
+static gboolean gst_flv_mux_are_all_pads_eos (GstFlvMux * mux);
+
+
+static GstFlowReturn
+gst_flv_mux_pad_flush (GstAggregatorPad * pad, GstAggregator * aggregator)
+{
+  GstFlvMuxPad *flvpad = GST_FLV_MUX_PAD (pad);
+
+  flvpad->last_timestamp = 0;
+  flvpad->pts = GST_CLOCK_STIME_NONE;
+  flvpad->dts = GST_CLOCK_STIME_NONE;
+
+  return GST_FLOW_OK;
+}
+
+static void
+gst_flv_mux_pad_class_init (GstFlvMuxPadClass * klass)
+{
+  GstAggregatorPadClass *aggregatorpad_class = (GstAggregatorPadClass *) klass;
+  GObjectClass *gobject_class = (GObjectClass *) klass;
+
+  gobject_class->finalize = gst_flv_mux_pad_finalize;
+
+  aggregatorpad_class->flush = GST_DEBUG_FUNCPTR (gst_flv_mux_pad_flush);
+}
+
+static void
+gst_flv_mux_pad_init (GstFlvMuxPad * pad)
+{
+  gst_flv_mux_reset_pad (pad);
+}
 
 typedef struct
 {
@@ -154,30 +194,18 @@ _gst_buffer_new_and_alloc (gsize size, GstBuffer ** buffer, guint8 ** data)
   *buffer = _gst_buffer_new_wrapped (*data, size, g_free);
 }
 
-static GstFlowReturn
-gst_flv_mux_clip_running_time (GstCollectPads * pads,
-    GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
-    gpointer user_data)
-{
-  buf = gst_buffer_make_writable (buf);
-
-  if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf)))
-    GST_BUFFER_PTS (buf) = GST_BUFFER_DTS (buf);
-
-  return gst_collect_pads_clip_running_time (pads, cdata, buf, outbuf,
-      user_data);
-}
-
 static void
 gst_flv_mux_class_init (GstFlvMuxClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstAggregatorClass *gstaggregator_class;
 
   GST_DEBUG_CATEGORY_INIT (flvmux_debug, "flvmux", 0, "FLV muxer");
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
+  gstaggregator_class = (GstAggregatorClass *) klass;
 
   gobject_class->get_property = gst_flv_mux_get_property;
   gobject_class->set_property = gst_flv_mux_set_property;
@@ -201,15 +229,21 @@ gst_flv_mux_class_init (GstFlvMuxClass * klass)
           "The value of metadatacreator in the meta packet.",
           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_flv_mux_change_state);
-  gstelement_class->request_new_pad =
-      GST_DEBUG_FUNCPTR (gst_flv_mux_request_new_pad);
+  gstaggregator_class->create_new_pad =
+      GST_DEBUG_FUNCPTR (gst_flv_mux_create_new_pad);
   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_flv_mux_release_pad);
 
-  gst_element_class_add_static_pad_template (gstelement_class,
-      &videosink_templ);
-  gst_element_class_add_static_pad_template (gstelement_class,
-      &audiosink_templ);
+  gstaggregator_class->start = GST_DEBUG_FUNCPTR (gst_flv_mux_start);
+  gstaggregator_class->aggregate = GST_DEBUG_FUNCPTR (gst_flv_mux_aggregate);
+  gstaggregator_class->sink_event = GST_DEBUG_FUNCPTR (gst_flv_mux_sink_event);
+  gstaggregator_class->flush = GST_DEBUG_FUNCPTR (gst_flv_mux_flush);
+  gstaggregator_class->get_next_time =
+      GST_DEBUG_FUNCPTR (gst_flv_mux_get_next_time);
+
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &videosink_templ, GST_TYPE_FLV_MUX_PAD);
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &audiosink_templ, GST_TYPE_FLV_MUX_PAD);
   gst_element_class_add_static_pad_template (gstelement_class, &src_templ);
   gst_element_class_set_static_metadata (gstelement_class, "FLV muxer",
       "Codec/Muxer",
@@ -222,9 +256,7 @@ gst_flv_mux_class_init (GstFlvMuxClass * klass)
 static void
 gst_flv_mux_init (GstFlvMux * mux)
 {
-  mux->srcpad = gst_pad_new_from_static_template (&src_templ, "src");
-  gst_pad_set_event_function (mux->srcpad, gst_flv_mux_handle_src_event);
-  gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad);
+  mux->srcpad = GST_AGGREGATOR_CAST (mux)->srcpad;
 
   /* property */
   mux->streamable = DEFAULT_STREAMABLE;
@@ -232,14 +264,6 @@ gst_flv_mux_init (GstFlvMux * mux)
 
   mux->new_tags = FALSE;
 
-  mux->collect = gst_collect_pads_new ();
-  gst_collect_pads_set_buffer_function (mux->collect,
-      GST_DEBUG_FUNCPTR (gst_flv_mux_handle_buffer), mux);
-  gst_collect_pads_set_event_function (mux->collect,
-      GST_DEBUG_FUNCPTR (gst_flv_mux_handle_sink_event), mux);
-  gst_collect_pads_set_clip_function (mux->collect,
-      GST_DEBUG_FUNCPTR (gst_flv_mux_clip_running_time), mux);
-
   gst_flv_mux_reset (GST_ELEMENT (mux));
 }
 
@@ -248,30 +272,49 @@ gst_flv_mux_finalize (GObject * object)
 {
   GstFlvMux *mux = GST_FLV_MUX (object);
 
-  gst_object_unref (mux->collect);
+  gst_flv_mux_reset (GST_ELEMENT (object));
   g_free (mux->metadatacreator);
 
-  G_OBJECT_CLASS (parent_class)->finalize (object);
+  G_OBJECT_CLASS (gst_flv_mux_parent_class)->finalize (object);
 }
 
 static void
-gst_flv_mux_reset (GstElement * element)
+gst_flv_mux_pad_finalize (GObject * object)
 {
-  GstFlvMux *mux = GST_FLV_MUX (element);
-  GSList *sl;
+  GstFlvMuxPad *pad = GST_FLV_MUX_PAD (object);
 
-  for (sl = mux->collect->data; sl != NULL; sl = g_slist_next (sl)) {
-    GstFlvPad *cpad = (GstFlvPad *) sl->data;
+  gst_flv_mux_reset_pad (pad);
 
-    gst_flv_mux_reset_pad (mux, cpad, cpad->video);
-  }
+  G_OBJECT_CLASS (gst_flv_mux_pad_parent_class)->finalize (object);
+}
+
+static GstFlowReturn
+gst_flv_mux_flush (GstAggregator * aggregator)
+{
+  /* TODO: What is the right behaviour on flush? Should we just ignore it ?
+   * This still needs to be defined. */
+
+  gst_flv_mux_reset (GST_ELEMENT (aggregator));
+  return GST_FLOW_OK;
+}
+
+static gboolean
+gst_flv_mux_start (GstAggregator * aggregator)
+{
+  gst_flv_mux_reset (GST_ELEMENT (aggregator));
+  return TRUE;
+}
+
+static void
+gst_flv_mux_reset (GstElement * element)
+{
+  GstFlvMux *mux = GST_FLV_MUX (element);
 
   g_list_foreach (mux->index, (GFunc) gst_flv_mux_index_entry_free, NULL);
   g_list_free (mux->index);
   mux->index = NULL;
   mux->byte_count = 0;
 
-  mux->have_audio = mux->have_video = FALSE;
   mux->duration = GST_CLOCK_TIME_NONE;
   mux->new_tags = FALSE;
   mux->first_timestamp = GST_CLOCK_STIME_NONE;
@@ -282,46 +325,27 @@ gst_flv_mux_reset (GstElement * element)
   gst_tag_setter_reset_tags (GST_TAG_SETTER (mux));
 }
 
-static gboolean
-gst_flv_mux_handle_src_event (GstPad * pad, GstObject * parent,
-    GstEvent * event)
-{
-  GstEventType type;
-
-  type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
-
-  switch (type) {
-    case GST_EVENT_SEEK:
-      /* disable seeking for now */
-      return FALSE;
-    default:
-      break;
-  }
-
-  return gst_pad_event_default (pad, parent, event);
-}
-
 /* Extract per-codec relevant tags for
  * insertion into the metadata later - ie bitrate,
  * but maybe others in the future */
 static void
 gst_flv_mux_store_codec_tags (GstFlvMux * mux,
-    GstFlvPad * flvpad, GstTagList * list)
+    GstFlvMuxPad * flvpad, GstTagList * list)
 {
   /* Look for a bitrate as either nominal or actual bitrate tag */
-  if (gst_tag_list_get_uint (list, GST_TAG_NOMINAL_BITRATE, &flvpad->bitrate) ||
-      gst_tag_list_get_uint (list, GST_TAG_BITRATE, &flvpad->bitrate)) {
+  if (gst_tag_list_get_uint (list, GST_TAG_NOMINAL_BITRATE, &flvpad->bitrate)
+      || gst_tag_list_get_uint (list, GST_TAG_BITRATE, &flvpad->bitrate)) {
     GST_DEBUG_OBJECT (mux, "Stored bitrate for pad %" GST_PTR_FORMAT " = %u",
         flvpad, flvpad->bitrate);
   }
 }
 
 static gboolean
-gst_flv_mux_handle_sink_event (GstCollectPads * pads, GstCollectData * data,
-    GstEvent * event, gpointer user_data)
+gst_flv_mux_sink_event (GstAggregator * aggregator, GstAggregatorPad * pad,
+    GstEvent * event)
 {
-  GstFlvMux *mux = GST_FLV_MUX (user_data);
-  GstFlvPad *flvpad = (GstFlvPad *) data;
+  GstFlvMux *mux = GST_FLV_MUX (aggregator);
+  GstFlvMuxPad *flvpad = (GstFlvMuxPad *) pad;
   gboolean ret = TRUE;
 
   switch (GST_EVENT_TYPE (event)) {
@@ -331,17 +355,13 @@ gst_flv_mux_handle_sink_event (GstCollectPads * pads, GstCollectData * data,
 
       gst_event_parse_caps (event, &caps);
 
-      /* find stream data */
-      g_assert (flvpad);
-
-      if (flvpad->video) {
-        ret = gst_flv_mux_video_pad_setcaps (data->pad, caps);
+      if (mux->video_pad == flvpad) {
+        ret = gst_flv_mux_video_pad_setcaps (flvpad, caps);
+      } else if (mux->audio_pad == flvpad) {
+        ret = gst_flv_mux_audio_pad_setcaps (flvpad, caps);
       } else {
-        ret = gst_flv_mux_audio_pad_setcaps (data->pad, caps);
+        g_assert_not_reached ();
       }
-      /* and eat */
-      gst_event_unref (event);
-      event = NULL;
       break;
     }
     case GST_EVENT_TAG:{
@@ -354,40 +374,38 @@ gst_flv_mux_handle_sink_event (GstCollectPads * pads, GstCollectData * data,
       gst_flv_mux_store_codec_tags (mux, flvpad, list);
       mux->new_tags = TRUE;
       ret = TRUE;
-      gst_event_unref (event);
-      event = NULL;
       break;
     }
     default:
       break;
   }
 
-  if (event != NULL)
-    return gst_collect_pads_event_default (pads, data, event, FALSE);
+  if (!ret)
+    return FALSE;
 
-  return ret;
+  return GST_AGGREGATOR_CLASS (parent_class)->sink_event (aggregator, pad,
+      event);;
 }
 
 static gboolean
-gst_flv_mux_video_pad_setcaps (GstPad * pad, GstCaps * caps)
+gst_flv_mux_video_pad_setcaps (GstFlvMuxPad * pad, GstCaps * caps)
 {
   GstFlvMux *mux = GST_FLV_MUX (gst_pad_get_parent (pad));
-  GstFlvPad *cpad = (GstFlvPad *) gst_pad_get_element_private (pad);
   gboolean ret = TRUE;
   GstStructure *s;
 
   s = gst_caps_get_structure (caps, 0);
 
   if (strcmp (gst_structure_get_name (s), "video/x-flash-video") == 0) {
-    cpad->video_codec = 2;
+    pad->codec = 2;
   } else if (strcmp (gst_structure_get_name (s), "video/x-flash-screen") == 0) {
-    cpad->video_codec = 3;
+    pad->codec = 3;
   } else if (strcmp (gst_structure_get_name (s), "video/x-vp6-flash") == 0) {
-    cpad->video_codec = 4;
+    pad->codec = 4;
   } else if (strcmp (gst_structure_get_name (s), "video/x-vp6-alpha") == 0) {
-    cpad->video_codec = 5;
+    pad->codec = 5;
   } else if (strcmp (gst_structure_get_name (s), "video/x-h264") == 0) {
-    cpad->video_codec = 7;
+    pad->codec = 7;
   } else {
     ret = FALSE;
   }
@@ -396,7 +414,7 @@ gst_flv_mux_video_pad_setcaps (GstPad * pad, GstCaps * caps)
     const GValue *val = gst_structure_get_value (s, "codec_data");
 
     if (val)
-      cpad->video_codec_data = gst_buffer_ref (gst_value_get_buffer (val));
+      pad->codec_data = gst_buffer_ref (gst_value_get_buffer (val));
   }
 
   gst_object_unref (mux);
@@ -405,10 +423,9 @@ gst_flv_mux_video_pad_setcaps (GstPad * pad, GstCaps * caps)
 }
 
 static gboolean
-gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps)
+gst_flv_mux_audio_pad_setcaps (GstFlvMuxPad * pad, GstCaps * caps)
 {
   GstFlvMux *mux = GST_FLV_MUX (gst_pad_get_parent (pad));
-  GstFlvPad *cpad = (GstFlvPad *) gst_pad_get_element_private (pad);
   gboolean ret = TRUE;
   GstStructure *s;
 
@@ -417,7 +434,7 @@ gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps)
   if (strcmp (gst_structure_get_name (s), "audio/x-adpcm") == 0) {
     const gchar *layout = gst_structure_get_string (s, "layout");
     if (layout && strcmp (layout, "swf") == 0) {
-      cpad->audio_codec = 1;
+      pad->codec = 1;
     } else {
       ret = FALSE;
     }
@@ -432,14 +449,14 @@ gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps)
           gint rate;
 
           if (gst_structure_get_int (s, "rate", &rate) && rate == 8000)
-            cpad->audio_codec = 14;
+            pad->codec = 14;
           else
-            cpad->audio_codec = 2;
+            pad->codec = 2;
         } else {
           ret = FALSE;
         }
       } else if (mpegversion == 4 || mpegversion == 2) {
-        cpad->audio_codec = 10;
+        pad->codec = 10;
       } else {
         ret = FALSE;
       }
@@ -452,34 +469,34 @@ gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps)
     if (gst_structure_get_int (s, "rate", &rate)
         && gst_structure_get_int (s, "channels", &channels)) {
       if (channels == 1 && rate == 16000)
-        cpad->audio_codec = 4;
+        pad->codec = 4;
       else if (channels == 1 && rate == 8000)
-        cpad->audio_codec = 5;
+        pad->codec = 5;
       else
-        cpad->audio_codec = 6;
+        pad->codec = 6;
     } else {
-      cpad->audio_codec = 6;
+      pad->codec = 6;
     }
   } else if (strcmp (gst_structure_get_name (s), "audio/x-raw") == 0) {
     GstAudioInfo info;
 
     if (gst_audio_info_from_caps (&info, caps)) {
-      cpad->audio_codec = 3;
+      pad->codec = 3;
 
       if (GST_AUDIO_INFO_WIDTH (&info) == 8)
-        cpad->width = 0;
+        pad->width = 0;
       else if (GST_AUDIO_INFO_WIDTH (&info) == 16)
-        cpad->width = 1;
+        pad->width = 1;
       else
         ret = FALSE;
     } else
       ret = FALSE;
   } else if (strcmp (gst_structure_get_name (s), "audio/x-alaw") == 0) {
-    cpad->audio_codec = 7;
+    pad->codec = 7;
   } else if (strcmp (gst_structure_get_name (s), "audio/x-mulaw") == 0) {
-    cpad->audio_codec = 8;
+    pad->codec = 8;
   } else if (strcmp (gst_structure_get_name (s), "audio/x-speex") == 0) {
-    cpad->audio_codec = 11;
+    pad->codec = 11;
   } else {
     ret = FALSE;
   }
@@ -488,60 +505,57 @@ gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps)
     gint rate, channels;
 
     if (gst_structure_get_int (s, "rate", &rate)) {
-      if (cpad->audio_codec == 10)
-        cpad->rate = 3;
+      if (pad->codec == 10)
+        pad->rate = 3;
       else if (rate == 5512)
-        cpad->rate = 0;
+        pad->rate = 0;
       else if (rate == 11025)
-        cpad->rate = 1;
+        pad->rate = 1;
       else if (rate == 22050)
-        cpad->rate = 2;
+        pad->rate = 2;
       else if (rate == 44100)
-        cpad->rate = 3;
-      else if (rate == 8000 && (cpad->audio_codec == 5
-              || cpad->audio_codec == 14))
-        cpad->rate = 0;
-      else if (rate == 16000 && (cpad->audio_codec == 4
-              || cpad->audio_codec == 11))
-        cpad->rate = 0;
+        pad->rate = 3;
+      else if (rate == 8000 && (pad->codec == 5 || pad->codec == 14))
+        pad->rate = 0;
+      else if (rate == 16000 && (pad->codec == 4 || pad->codec == 11))
+        pad->rate = 0;
       else
         ret = FALSE;
-    } else if (cpad->audio_codec == 10) {
-      cpad->rate = 3;
+    } else if (pad->codec == 10) {
+      pad->rate = 3;
     } else {
       ret = FALSE;
     }
 
     if (gst_structure_get_int (s, "channels", &channels)) {
-      if (cpad->audio_codec == 4 || cpad->audio_codec == 5
-          || cpad->audio_codec == 6 || cpad->audio_codec == 11)
-        cpad->channels = 0;
-      else if (cpad->audio_codec == 10)
-        cpad->channels = 1;
+      if (pad->codec == 4 || pad->codec == 5
+          || pad->codec == 6 || pad->codec == 11)
+        pad->channels = 0;
+      else if (pad->codec == 10)
+        pad->channels = 1;
       else if (channels == 1)
-        cpad->channels = 0;
+        pad->channels = 0;
       else if (channels == 2)
-        cpad->channels = 1;
+        pad->channels = 1;
       else
         ret = FALSE;
-    } else if (cpad->audio_codec == 4 || cpad->audio_codec == 5
-        || cpad->audio_codec == 6) {
-      cpad->channels = 0;
-    } else if (cpad->audio_codec == 10) {
-      cpad->channels = 1;
+    } else if (pad->codec == 4 || pad->codec == 5 || pad->codec == 6) {
+      pad->channels = 0;
+    } else if (pad->codec == 10) {
+      pad->channels = 1;
     } else {
       ret = FALSE;
     }
 
-    if (cpad->audio_codec != 3)
-      cpad->width = 1;
+    if (pad->codec != 3)
+      pad->width = 1;
   }
 
   if (ret && gst_structure_has_field (s, "codec_data")) {
     const GValue *val = gst_structure_get_value (s, "codec_data");
 
     if (val)
-      cpad->audio_codec_data = gst_buffer_ref (gst_value_get_buffer (val));
+      pad->codec_data = gst_buffer_ref (gst_value_get_buffer (val));
   }
 
   gst_object_unref (mux);
@@ -550,35 +564,29 @@ gst_flv_mux_audio_pad_setcaps (GstPad * pad, GstCaps * caps)
 }
 
 static void
-gst_flv_mux_reset_pad (GstFlvMux * mux, GstFlvPad * cpad, gboolean video)
+gst_flv_mux_reset_pad (GstFlvMuxPad * pad)
 {
-  cpad->video = video;
-
-  if (cpad->audio_codec_data)
-    gst_buffer_unref (cpad->audio_codec_data);
-  cpad->audio_codec_data = NULL;
-  cpad->audio_codec = G_MAXUINT;
-  cpad->rate = G_MAXUINT;
-  cpad->width = G_MAXUINT;
-  cpad->channels = G_MAXUINT;
-
-  if (cpad->video_codec_data)
-    gst_buffer_unref (cpad->video_codec_data);
-  cpad->video_codec_data = NULL;
-  cpad->video_codec = G_MAXUINT;
-  cpad->last_timestamp = 0;
-  cpad->pts = GST_CLOCK_STIME_NONE;
-  cpad->dts = GST_CLOCK_STIME_NONE;
+  GST_DEBUG_OBJECT (pad, "resetting pad");
+
+  if (pad->codec_data)
+    gst_buffer_unref (pad->codec_data);
+  pad->codec_data = NULL;
+  pad->codec = G_MAXUINT;
+  pad->rate = G_MAXUINT;
+  pad->width = G_MAXUINT;
+  pad->channels = G_MAXUINT;
+
+  gst_flv_mux_pad_flush (GST_AGGREGATOR_PAD_CAST (pad), NULL);
 }
 
-static GstPad *
-gst_flv_mux_request_new_pad (GstElement * element,
+static GstAggregatorPad *
+gst_flv_mux_create_new_pad (GstAggregator * agg,
     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
 {
-  GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
-  GstFlvMux *mux = GST_FLV_MUX (element);
-  GstFlvPad *cpad;
-  GstPad *pad = NULL;
+  GstElementClass *klass = GST_ELEMENT_GET_CLASS (agg);
+  GstAggregatorPad *aggpad;
+  GstFlvMux *mux = GST_FLV_MUX (agg);
+  GstFlvMuxPad *pad = NULL;
   const gchar *name = NULL;
   gboolean video;
 
@@ -588,19 +596,17 @@ gst_flv_mux_request_new_pad (GstElement * element,
   }
 
   if (templ == gst_element_class_get_pad_template (klass, "audio")) {
-    if (mux->have_audio) {
+    if (mux->audio_pad) {
       GST_WARNING_OBJECT (mux, "Already have an audio pad");
       return NULL;
     }
-    mux->have_audio = TRUE;
     name = "audio";
     video = FALSE;
   } else if (templ == gst_element_class_get_pad_template (klass, "video")) {
-    if (mux->have_video) {
+    if (mux->video_pad) {
       GST_WARNING_OBJECT (mux, "Already have a video pad");
       return NULL;
     }
-    mux->have_video = TRUE;
     name = "video";
     video = TRUE;
   } else {
@@ -608,28 +614,41 @@ gst_flv_mux_request_new_pad (GstElement * element,
     return NULL;
   }
 
-  pad = gst_pad_new_from_template (templ, name);
-  cpad = (GstFlvPad *) gst_collect_pads_add_pad (mux->collect, pad,
-      sizeof (GstFlvPad), NULL, TRUE);
+  aggpad =
+      GST_AGGREGATOR_CLASS (gst_flv_mux_parent_class)->create_new_pad (agg,
+      templ, name, caps);
+  if (aggpad == NULL)
+    return NULL;
 
-  cpad->audio_codec_data = NULL;
-  cpad->video_codec_data = NULL;
-  gst_flv_mux_reset_pad (mux, cpad, video);
+  pad = GST_FLV_MUX_PAD (aggpad);
 
-  gst_pad_set_active (pad, TRUE);
-  gst_element_add_pad (element, pad);
+  gst_flv_mux_reset_pad (pad);
 
-  return pad;
+  if (video)
+    mux->video_pad = pad;
+  else
+    mux->audio_pad = pad;
+
+  return aggpad;
 }
 
 static void
 gst_flv_mux_release_pad (GstElement * element, GstPad * pad)
 {
-  GstFlvMux *mux = GST_FLV_MUX (GST_PAD_PARENT (pad));
-  GstFlvPad *cpad = (GstFlvPad *) gst_pad_get_element_private (pad);
+  GstFlvMux *mux = GST_FLV_MUX (element);
+  GstFlvMuxPad *flvpad = GST_FLV_MUX_PAD (pad);
+
+  gst_pad_set_active (pad, FALSE);
+  gst_flv_mux_reset_pad (flvpad);
+
+  if (flvpad == mux->video_pad) {
+    mux->video_pad = NULL;
+  } else if (flvpad == mux->audio_pad) {
+    mux->audio_pad = NULL;
+  } else {
+    GST_WARNING_OBJECT (pad, "Pad is not known audio or video pad");
+  }
 
-  gst_flv_mux_reset_pad (mux, cpad, cpad->video);
-  gst_collect_pads_remove_pad (mux->collect, pad);
   gst_element_remove_pad (element, pad);
 }
 
@@ -640,7 +659,7 @@ gst_flv_mux_push (GstFlvMux * mux, GstBuffer * buffer)
    * total output size in bytes, but it doesn't matter at that point */
   mux->byte_count += gst_buffer_get_size (buffer);
 
-  return gst_pad_push (mux->srcpad, buffer);
+  return gst_aggregator_finish_buffer (GST_AGGREGATOR_CAST (mux), buffer);
 }
 
 static GstBuffer *
@@ -648,6 +667,8 @@ gst_flv_mux_create_header (GstFlvMux * mux)
 {
   GstBuffer *header;
   guint8 *data;
+  gboolean have_audio;
+  gboolean have_video;
 
   _gst_buffer_new_and_alloc (9 + 4, &header, &data);
 
@@ -656,7 +677,10 @@ gst_flv_mux_create_header (GstFlvMux * mux)
   data[2] = 'V';
   data[3] = 0x01;               /* Version */
 
-  data[4] = (mux->have_audio << 2) | mux->have_video;   /* flags */
+  have_audio = (mux->audio_pad && mux->audio_pad->codec != G_MAXUINT);
+  have_video = (mux->video_pad && mux->video_pad->codec != G_MAXUINT);
+
+  data[4] = (have_audio << 2) | have_video;     /* flags */
   GST_WRITE_UINT32_BE (data + 5, 9);    /* data offset */
   GST_WRITE_UINT32_BE (data + 9, 0);    /* previous tag size */
 
@@ -820,13 +844,13 @@ tags:
     goto end;
 
   if (mux->duration == GST_CLOCK_TIME_NONE) {
-    GSList *l;
+    GList *l;
     guint64 dur;
 
-    for (l = mux->collect->data; l; l = l->next) {
-      GstCollectData *cdata = l->data;
+    for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
+      GstFlvMuxPad *pad = GST_FLV_MUX_PAD (l->data);
 
-      if (gst_pad_peer_query_duration (cdata->pad, GST_FORMAT_TIME,
+      if (gst_pad_peer_query_duration (GST_PAD (pad), GST_FORMAT_TIME,
               (gint64 *) & dur) && dur != GST_CLOCK_TIME_NONE) {
         if (mux->duration == GST_CLOCK_TIME_NONE)
           mux->duration = dur;
@@ -849,23 +873,11 @@ tags:
     gst_buffer_unmap (script_tag, &map);
   }
 
-  if (mux->have_video) {
-    GstPad *video_pad = NULL;
+  if (mux->video_pad && mux->video_pad->codec != G_MAXUINT) {
     GstCaps *caps = NULL;
-    GstFlvPad *cpad;
-    GSList *l = mux->collect->data;
-
-    for (; l; l = l->next) {
-      cpad = l->data;
-      if (cpad && cpad->video) {
-        video_pad = cpad->collect.pad;
-        break;
-      }
-    }
 
-    if (video_pad) {
-      caps = gst_pad_get_current_caps (video_pad);
-    }
+    if (mux->video_pad)
+      caps = gst_pad_get_current_caps (GST_PAD (mux->video_pad));
 
     if (caps != NULL) {
       GstStructure *s;
@@ -873,10 +885,10 @@ tags:
       gint num, den;
 
       GST_DEBUG_OBJECT (mux, "putting videocodecid %d in the metadata",
-          cpad->video_codec);
+          mux->video_pad->codec);
 
       tmp = gst_flv_mux_create_number_script_value ("videocodecid",
-          cpad->video_codec);
+          mux->video_pad->codec);
       script_tag = gst_buffer_append (script_tag, tmp);
       tags_written++;
 
@@ -929,43 +941,29 @@ tags:
       }
 
       GST_DEBUG_OBJECT (mux, "putting videodatarate %u KB/s in the metadata",
-          cpad->bitrate / 1024);
+          mux->video_pad->bitrate / 1024);
       tmp = gst_flv_mux_create_number_script_value ("videodatarate",
-          cpad->bitrate / 1024);
+          mux->video_pad->bitrate / 1024);
       script_tag = gst_buffer_append (script_tag, tmp);
       tags_written++;
     }
   }
 
-  if (mux->have_audio) {
-    GstPad *audio_pad = NULL;
-    GstFlvPad *cpad;
-    GSList *l = mux->collect->data;
-
-    for (; l; l = l->next) {
-      cpad = l->data;
-      if (cpad && !cpad->video) {
-        audio_pad = cpad->collect.pad;
-        break;
-      }
-    }
-
-    if (audio_pad) {
-      GST_DEBUG_OBJECT (mux, "putting audiocodecid %d in the metadata",
-          cpad->audio_codec);
+  if (mux->audio_pad && mux->audio_pad->codec != G_MAXUINT) {
+    GST_DEBUG_OBJECT (mux, "putting audiocodecid %d in the metadata",
+        mux->audio_pad->codec);
 
-      tmp = gst_flv_mux_create_number_script_value ("audiocodecid",
-          cpad->audio_codec);
-      script_tag = gst_buffer_append (script_tag, tmp);
-      tags_written++;
+    tmp = gst_flv_mux_create_number_script_value ("audiocodecid",
+        mux->audio_pad->codec);
+    script_tag = gst_buffer_append (script_tag, tmp);
+    tags_written++;
 
-      GST_DEBUG_OBJECT (mux, "putting audiodatarate %u KB/s in the metadata",
-          cpad->bitrate / 1024);
-      tmp = gst_flv_mux_create_number_script_value ("audiodatarate",
-          cpad->bitrate / 1024);
-      script_tag = gst_buffer_append (script_tag, tmp);
-      tags_written++;
-    }
+    GST_DEBUG_OBJECT (mux, "putting audiodatarate %u KB/s in the metadata",
+        mux->audio_pad->bitrate / 1024);
+    tmp = gst_flv_mux_create_number_script_value ("audiodatarate",
+        mux->audio_pad->bitrate / 1024);
+    script_tag = gst_buffer_append (script_tag, tmp);
+    tags_written++;
   }
 
   _gst_buffer_new_and_alloc (2 + 15 + 1 + 2 + strlen (mux->metadatacreator),
@@ -1049,7 +1047,7 @@ exit:
 
 static GstBuffer *
 gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
-    GstFlvPad * cpad, gboolean is_codec_data)
+    GstFlvMuxPad * pad, gboolean is_codec_data)
 {
   GstBuffer *tag;
   GstMapInfo map;
@@ -1058,11 +1056,11 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
   guint8 *data, *bdata = NULL;
   gsize bsize = 0;
 
-  if (!GST_CLOCK_STIME_IS_VALID (cpad->dts)) {
-    pts = dts = cpad->last_timestamp / GST_MSECOND;
+  if (!GST_CLOCK_STIME_IS_VALID (pad->dts)) {
+    pts = dts = pad->last_timestamp / GST_MSECOND;
   } else {
-    pts = cpad->pts / GST_MSECOND;
-    dts = cpad->dts / GST_MSECOND;
+    pts = pad->pts / GST_MSECOND;
+    dts = pad->dts / GST_MSECOND;
   }
 
   /* Be safe in case TS are buggy */
@@ -1077,7 +1075,7 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
     pts = dts + cts;
   }
 
-  GST_LOG_OBJECT (mux, "got pts %i dts %i cts %i\n", pts, dts, cts);
+  GST_LOG_OBJECT (mux, "got pts %i dts %i cts %i", pts, dts, cts);
 
   if (buffer != NULL) {
     gst_buffer_map (buffer, &map, GST_MAP_READ);
@@ -1086,15 +1084,15 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
   }
 
   size = 11;
-  if (cpad->video) {
+  if (mux->video_pad == pad) {
     size += 1;
-    if (cpad->video_codec == 7)
+    if (pad->codec == 7)
       size += 4 + bsize;
     else
       size += bsize;
   } else {
     size += 1;
-    if (cpad->audio_codec == 10)
+    if (pad->codec == 10)
       size += 1 + bsize;
     else
       size += bsize;
@@ -1104,7 +1102,7 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
   _gst_buffer_new_and_alloc (size, &tag, &data);
   memset (data, 0, size);
 
-  data[0] = (cpad->video) ? 9 : 8;
+  data[0] = (mux->video_pad == pad) ? 9 : 8;
 
   data[1] = ((size - 11 - 4) >> 16) & 0xff;
   data[2] = ((size - 11 - 4) >> 8) & 0xff;
@@ -1115,15 +1113,15 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
 
   data[8] = data[9] = data[10] = 0;
 
-  if (cpad->video) {
+  if (mux->video_pad == pad) {
     if (buffer && GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
       data[11] |= 2 << 4;
     else
       data[11] |= 1 << 4;
 
-    data[11] |= cpad->video_codec & 0x0f;
+    data[11] |= pad->codec & 0x0f;
 
-    if (cpad->video_codec == 7) {
+    if (pad->codec == 7) {
       if (is_codec_data) {
         data[12] = 0;
         GST_WRITE_UINT24_BE (data + 13, 0);
@@ -1141,16 +1139,16 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
       memcpy (data + 11 + 1, bdata, bsize);
     }
   } else {
-    data[11] |= (cpad->audio_codec << 4) & 0xf0;
-    data[11] |= (cpad->rate << 2) & 0x0c;
-    data[11] |= (cpad->width << 1) & 0x02;
-    data[11] |= (cpad->channels << 0) & 0x01;
+    data[11] |= (pad->codec << 4) & 0xf0;
+    data[11] |= (pad->rate << 2) & 0x0c;
+    data[11] |= (pad->width << 1) & 0x02;
+    data[11] |= (pad->channels << 0) & 0x01;
 
     GST_DEBUG_OBJECT (mux, "Creating byte %02x with "
-        "audio_codec:%d, rate:%d, width:%d, channels:%d",
-        data[11], cpad->audio_codec, cpad->rate, cpad->width, cpad->channels);
+        "codec:%d, rate:%d, width:%d, channels:%d",
+        data[11], pad->codec, pad->rate, pad->width, pad->channels);
 
-    if (cpad->audio_codec == 10) {
+    if (pad->codec == 10) {
       data[12] = is_codec_data ? 0 : 1;
 
       memcpy (data + 11 + 1 + 1, bdata, bsize);
@@ -1182,7 +1180,7 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
 
     /* mark the buffer if it's an audio buffer and there's also video being muxed
      * or it's a video interframe */
-    if ((mux->have_video && !cpad->video) ||
+    if (mux->video_pad == pad &&
         GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
       GST_BUFFER_FLAG_SET (tag, GST_BUFFER_FLAG_DELTA_UNIT);
   } else {
@@ -1196,22 +1194,22 @@ gst_flv_mux_buffer_to_tag_internal (GstFlvMux * mux, GstBuffer * buffer,
 
 static inline GstBuffer *
 gst_flv_mux_buffer_to_tag (GstFlvMux * mux, GstBuffer * buffer,
-    GstFlvPad * cpad)
+    GstFlvMuxPad * pad)
 {
-  return gst_flv_mux_buffer_to_tag_internal (mux, buffer, cpad, FALSE);
+  return gst_flv_mux_buffer_to_tag_internal (mux, buffer, pad, FALSE);
 }
 
 static inline GstBuffer *
 gst_flv_mux_codec_data_buffer_to_tag (GstFlvMux * mux, GstBuffer * buffer,
-    GstFlvPad * cpad)
+    GstFlvMuxPad * pad)
 {
-  return gst_flv_mux_buffer_to_tag_internal (mux, buffer, cpad, TRUE);
+  return gst_flv_mux_buffer_to_tag_internal (mux, buffer, pad, TRUE);
 }
 
 static inline GstBuffer *
-gst_flv_mux_eos_to_tag (GstFlvMux * mux, GstFlvPad * cpad)
+gst_flv_mux_eos_to_tag (GstFlvMux * mux, GstFlvMuxPad * pad)
 {
-  return gst_flv_mux_buffer_to_tag_internal (mux, NULL, cpad, FALSE);
+  return gst_flv_mux_buffer_to_tag_internal (mux, NULL, pad, FALSE);
 }
 
 static void
@@ -1237,10 +1235,8 @@ gst_flv_mux_write_header (GstFlvMux * mux)
   GstCaps *caps;
   GstStructure *structure;
   GValue streamheader = { 0 };
-  GSList *l;
+  GList *l;
   GstFlowReturn ret;
-  GstSegment segment;
-  gchar s_id[32];
 
   /* if not streaming, check if downstream is seekable */
   if (!mux->streamable) {
@@ -1271,26 +1267,24 @@ gst_flv_mux_write_header (GstFlvMux * mux)
   video_codec_data = NULL;
   audio_codec_data = NULL;
 
-  for (l = mux->collect->data; l != NULL; l = l->next) {
-    GstFlvPad *cpad = l->data;
+  for (l = GST_ELEMENT_CAST (mux)->sinkpads; l != NULL; l = l->next) {
+    GstFlvMuxPad *pad = l->data;
 
     /* Get H.264 and AAC codec data, if present */
-    if (cpad && cpad->video && cpad->video_codec == 7) {
-      if (cpad->video_codec_data == NULL)
+    if (pad && mux->video_pad == pad && pad->codec == 7) {
+      if (pad->codec_data == NULL)
         GST_WARNING_OBJECT (mux, "Codec data for video stream not found, "
             "output might not be playable");
       else
         video_codec_data =
-            gst_flv_mux_codec_data_buffer_to_tag (mux, cpad->video_codec_data,
-            cpad);
-    } else if (cpad && !cpad->video && cpad->audio_codec == 10) {
-      if (cpad->audio_codec_data == NULL)
+            gst_flv_mux_codec_data_buffer_to_tag (mux, pad->codec_data, pad);
+    } else if (pad && mux->audio_pad == pad && pad->codec == 10) {
+      if (pad->codec_data == NULL)
         GST_WARNING_OBJECT (mux, "Codec data for audio stream not found, "
             "output might not be playable");
       else
         audio_codec_data =
-            gst_flv_mux_codec_data_buffer_to_tag (mux, cpad->audio_codec_data,
-            cpad);
+            gst_flv_mux_codec_data_buffer_to_tag (mux, pad->codec_data, pad);
     }
   }
 
@@ -1316,25 +1310,16 @@ gst_flv_mux_write_header (GstFlvMux * mux)
   if (audio_codec_data != NULL)
     gst_flv_mux_put_buffer_in_streamheader (&streamheader, audio_codec_data);
 
-  /* stream-start (FIXME: create id based on input ids) */
-  g_snprintf (s_id, sizeof (s_id), "flvmux-%08x", g_random_int ());
-  gst_pad_push_event (mux->srcpad, gst_event_new_stream_start (s_id));
-
   /* create the caps and put the streamheader in them */
   caps = gst_caps_new_empty_simple ("video/x-flv");
   structure = gst_caps_get_structure (caps, 0);
   gst_structure_set_value (structure, "streamheader", &streamheader);
   g_value_unset (&streamheader);
 
-  gst_pad_set_caps (mux->srcpad, caps);
+  gst_aggregator_set_src_caps (GST_AGGREGATOR_CAST (mux), caps);
 
   gst_caps_unref (caps);
 
-  /* segment */
-  gst_segment_init (&segment,
-      mux->streamable ? GST_FORMAT_TIME : GST_FORMAT_BYTES);
-  gst_pad_push_event (mux->srcpad, gst_event_new_segment (&segment));
-
   /* push the header buffer, the metadata and the codec info, if any */
   ret = gst_flv_mux_push (mux, header);
   if (ret != GST_FLOW_OK)
@@ -1369,47 +1354,63 @@ failure_audio_codec_data:
   return ret;
 }
 
+static GstClockTime
+gst_flv_mux_segment_to_running_time (const GstSegment * segment, GstClockTime t)
+{
+  /* we can get a dts before the segment, if dts < pts and pts is inside
+   * the segment, so we consider early times as 0 */
+  if (t < segment->start)
+    return 0;
+  return gst_segment_to_running_time (segment, GST_FORMAT_TIME, t);
+}
+
 static void
-gst_flv_mux_update_index (GstFlvMux * mux, GstBuffer * buffer, GstFlvPad * cpad)
+gst_flv_mux_update_index (GstFlvMux * mux, GstBuffer * buffer,
+    GstFlvMuxPad * pad)
 {
   /*
    * Add the tag byte offset and to the index if it's a valid seek point, which
    * means it's either a video keyframe or if there is no video pad (in that
    * case every FLV tag is a valid seek point)
    */
-  if (mux->have_video &&
-      (!cpad->video ||
-          GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)))
+  if (mux->video_pad == pad &&
+      GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
     return;
 
   if (GST_BUFFER_PTS_IS_VALID (buffer)) {
     GstFlvMuxIndexEntry *entry = g_slice_new (GstFlvMuxIndexEntry);
+    GstClockTime pts =
+        gst_flv_mux_segment_to_running_time (&GST_AGGREGATOR_PAD
+        (pad)->segment, GST_BUFFER_PTS (buffer));
     entry->position = mux->byte_count;
-    entry->time = gst_guint64_to_gdouble (GST_BUFFER_PTS (buffer)) / GST_SECOND;
+    entry->time = gst_guint64_to_gdouble (pts) / GST_SECOND;
     mux->index = g_list_prepend (mux->index, entry);
   }
 }
 
 static GstFlowReturn
-gst_flv_mux_write_buffer (GstFlvMux * mux, GstFlvPad * cpad, GstBuffer * buffer)
+gst_flv_mux_write_buffer (GstFlvMux * mux, GstFlvMuxPad * pad,
+    GstBuffer * buffer)
 {
   GstBuffer *tag;
   GstFlowReturn ret;
-  GstClockTime dts = GST_BUFFER_DTS (buffer);
+  GstClockTime dts =
+      gst_flv_mux_segment_to_running_time (&GST_AGGREGATOR_PAD (pad)->segment,
+      GST_BUFFER_DTS (buffer));
 
   /* clipping function arranged for running_time */
 
   if (!mux->streamable)
-    gst_flv_mux_update_index (mux, buffer, cpad);
+    gst_flv_mux_update_index (mux, buffer, pad);
 
-  tag = gst_flv_mux_buffer_to_tag (mux, buffer, cpad);
+  tag = gst_flv_mux_buffer_to_tag (mux, buffer, pad);
 
   gst_buffer_unref (buffer);
 
   ret = gst_flv_mux_push (mux, tag);
 
   if (ret == GST_FLOW_OK && GST_CLOCK_TIME_IS_VALID (dts))
-    cpad->last_timestamp = dts;
+    pad->last_timestamp = dts;
 
 
   return ret;
@@ -1418,45 +1419,49 @@ gst_flv_mux_write_buffer (GstFlvMux * mux, GstFlvPad * cpad, GstBuffer * buffer)
 static guint64
 gst_flv_mux_determine_duration (GstFlvMux * mux)
 {
-  GSList *l;
+  GList *l;
   GstClockTime duration = GST_CLOCK_TIME_NONE;
 
   GST_DEBUG_OBJECT (mux, "trying to determine the duration "
       "from pad timestamps");
 
-  for (l = mux->collect->data; l != NULL; l = l->next) {
-    GstFlvPad *cpad = l->data;
+  for (l = GST_ELEMENT_CAST (mux)->sinkpads; l != NULL; l = l->next) {
+    GstFlvMuxPad *pad = GST_FLV_MUX_PAD (l->data);
 
-    if (cpad && (cpad->last_timestamp != GST_CLOCK_TIME_NONE)) {
+    if (pad && (pad->last_timestamp != GST_CLOCK_TIME_NONE)) {
       if (duration == GST_CLOCK_TIME_NONE)
-        duration = cpad->last_timestamp;
+        duration = pad->last_timestamp;
       else
-        duration = MAX (duration, cpad->last_timestamp);
+        duration = MAX (duration, pad->last_timestamp);
     }
   }
 
   return duration;
 }
 
+static gboolean
+gst_flv_mux_are_all_pads_eos (GstFlvMux * mux)
+{
+  GList *l;
+
+  for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
+    GstFlvMuxPad *pad = GST_FLV_MUX_PAD (l->data);
+
+    if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)))
+      return FALSE;
+  }
+  return TRUE;
+}
+
 static GstFlowReturn
 gst_flv_mux_write_eos (GstFlvMux * mux)
 {
   GstBuffer *tag;
-  GstFlvPad *video_pad = NULL;
-  GSList *l = mux->collect->data;
 
-  if (!mux->have_video)
+  if (mux->video_pad == NULL)
     return GST_FLOW_OK;
 
-  for (; l; l = l->next) {
-    GstFlvPad *cpad = l->data;
-    if (cpad && cpad->video) {
-      video_pad = cpad;
-      break;
-    }
-  }
-
-  tag = gst_flv_mux_eos_to_tag (mux, video_pad);
+  tag = gst_flv_mux_eos_to_tag (mux, mux->video_pad);
 
   return gst_flv_mux_push (mux, tag);
 }
@@ -1599,33 +1604,71 @@ gst_flv_mux_rewrite_header (GstFlvMux * mux)
   return gst_flv_mux_push (mux, rewrite);
 }
 
+static GstFlvMuxPad *
+gst_flv_mux_find_best_pad (GstAggregator * aggregator, GstClockTime * ts)
+{
+  GstAggregatorPad *apad;
+  GstFlvMuxPad *pad, *best = NULL;
+  GList *l;
+  GstBuffer *buffer;
+  GstClockTime best_ts = GST_CLOCK_TIME_NONE;
+
+  for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
+    apad = GST_AGGREGATOR_PAD (l->data);
+    pad = GST_FLV_MUX_PAD (l->data);
+    buffer = gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad));
+    if (!buffer)
+      continue;
+    if (best_ts == GST_CLOCK_TIME_NONE) {
+      best = pad;
+      best_ts = gst_flv_mux_segment_to_running_time (&apad->segment,
+          GST_BUFFER_DTS_OR_PTS (buffer));
+    } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
+      gint64 t = gst_flv_mux_segment_to_running_time (&apad->segment,
+          GST_BUFFER_DTS_OR_PTS (buffer));
+      if (t < best_ts) {
+        best = pad;
+        best_ts = t;
+      }
+    }
+  }
+  GST_DEBUG_OBJECT (aggregator,
+      "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
+      GST_TIME_ARGS (best_ts), best);
+  if (ts)
+    *ts = best_ts;
+  return best;
+}
+
 static GstFlowReturn
-gst_flv_mux_handle_buffer (GstCollectPads * pads, GstCollectData * cdata,
-    GstBuffer * buffer, gpointer user_data)
+gst_flv_mux_aggregate (GstAggregator * aggregator, gboolean timeout)
 {
-  GstFlvMux *mux = GST_FLV_MUX (user_data);
-  GstFlvPad *best;
+  GstFlvMux *mux = GST_FLV_MUX (aggregator);
+  GstFlvMuxPad *best;
   gint64 best_time = GST_CLOCK_STIME_NONE;
   GstFlowReturn ret;
+  GstClockTime ts;
+  GstBuffer *buffer = NULL;
 
   if (mux->state == GST_FLV_MUX_STATE_HEADER) {
-    if (mux->collect->data == NULL) {
+    if (GST_ELEMENT_CAST (mux)->sinkpads == NULL) {
       GST_ELEMENT_ERROR (mux, STREAM, MUX, (NULL),
           ("No input streams configured"));
       return GST_FLOW_ERROR;
     }
 
     ret = gst_flv_mux_write_header (mux);
-    if (ret != GST_FLOW_OK) {
-      gst_buffer_unref (buffer);
+    if (ret != GST_FLOW_OK)
       return ret;
-    }
     mux->state = GST_FLV_MUX_STATE_DATA;
 
-    if (cdata && GST_COLLECT_PADS_DTS_IS_VALID (cdata))
-      mux->first_timestamp = GST_COLLECT_PADS_DTS (cdata);
+    best = gst_flv_mux_find_best_pad (aggregator, &ts);
+    if (best && GST_CLOCK_STIME_IS_VALID (ts))
+      mux->first_timestamp = ts;
     else
       mux->first_timestamp = 0;
+  } else {
+    best = gst_flv_mux_find_best_pad (aggregator, &ts);
   }
 
   if (mux->new_tags) {
@@ -1635,21 +1678,25 @@ gst_flv_mux_handle_buffer (GstCollectPads * pads, GstCollectData * cdata,
     mux->new_tags = FALSE;
   }
 
-  best = (GstFlvPad *) cdata;
   if (best) {
+    buffer = gst_aggregator_pad_steal_buffer (GST_AGGREGATOR_PAD (best));
     g_assert (buffer);
-    best->dts = GST_COLLECT_PADS_DTS (cdata);
+    best->dts =
+        gst_flv_mux_segment_to_running_time (&GST_AGGREGATOR_PAD
+        (best)->segment, GST_BUFFER_DTS_OR_PTS (buffer));
 
     if (GST_CLOCK_STIME_IS_VALID (best->dts))
       best_time = best->dts - mux->first_timestamp;
 
     if (GST_BUFFER_PTS_IS_VALID (buffer))
-      best->pts = GST_BUFFER_PTS (buffer);
+      best->pts =
+          gst_flv_mux_segment_to_running_time (&GST_AGGREGATOR_PAD
+          (best)->segment, GST_BUFFER_PTS (buffer));
     else
       best->pts = best->dts;
 
-    GST_LOG_OBJECT (mux, "got buffer PTS %" GST_TIME_FORMAT " DTS %"
-        GST_STIME_FORMAT "\n", GST_TIME_ARGS (best->pts),
+    GST_LOG_OBJECT (best, "got buffer PTS %" GST_TIME_FORMAT " DTS %"
+        GST_STIME_FORMAT, GST_TIME_ARGS (best->pts),
         GST_STIME_ARGS (best->dts));
   } else {
     best_time = GST_CLOCK_STIME_NONE;
@@ -1661,19 +1708,22 @@ gst_flv_mux_handle_buffer (GstCollectPads * pads, GstCollectData * cdata,
   if (!mux->streamable && (GST_CLOCK_STIME_IS_VALID (best_time))
       && best_time / GST_MSECOND > G_MAXINT32) {
     GST_WARNING_OBJECT (mux, "Timestamp larger than FLV supports - EOS");
-    gst_buffer_unref (buffer);
-    buffer = NULL;
+    if (buffer) {
+      gst_buffer_unref (buffer);
+      buffer = NULL;
+    }
     best = NULL;
   }
 
   if (best) {
     return gst_flv_mux_write_buffer (mux, best, buffer);
   } else {
-    /* FIXME check return values */
-    gst_flv_mux_write_eos (mux);
-    gst_flv_mux_rewrite_header (mux);
-    gst_pad_push_event (mux->srcpad, gst_event_new_eos ());
-    return GST_FLOW_EOS;
+    if (gst_flv_mux_are_all_pads_eos (mux)) {
+      gst_flv_mux_write_eos (mux);
+      gst_flv_mux_rewrite_header (mux);
+      gst_pad_push_event (mux->srcpad, gst_event_new_eos ());
+    }
+    return GST_FLOW_OK;
   }
 }
 
@@ -1727,40 +1777,32 @@ gst_flv_mux_set_property (GObject * object,
   }
 }
 
-static GstStateChangeReturn
-gst_flv_mux_change_state (GstElement * element, GstStateChange transition)
+static GstClockTime
+gst_flv_mux_get_next_time_for_segment (GstAggregator * aggregator,
+    const GstSegment * segment)
 {
-  GstStateChangeReturn ret;
-  GstFlvMux *mux = GST_FLV_MUX (element);
+  GstClockTime next_time;
 
-  switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-      break;
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-      gst_collect_pads_start (mux->collect);
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_collect_pads_stop (mux->collect);
-      break;
-    default:
-      break;
-  }
+  GST_OBJECT_LOCK (aggregator);
+  if (segment->position == -1 || segment->position < segment->start)
+    next_time = segment->start;
+  else
+    next_time = segment->position;
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  if (segment->stop != -1 && next_time > segment->stop)
+    next_time = segment->stop;
 
-  switch (transition) {
-    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_flv_mux_reset (GST_ELEMENT (mux));
-      break;
-    case GST_STATE_CHANGE_READY_TO_NULL:
-      break;
-    default:
-      break;
-  }
+  next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
+  GST_OBJECT_UNLOCK (aggregator);
 
-  return ret;
+  GST_DEBUG_OBJECT (aggregator, "next_time: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (next_time));
+  return next_time;
+}
+
+static GstClockTime
+gst_flv_mux_get_next_time (GstAggregator * aggregator)
+{
+  return gst_flv_mux_get_next_time_for_segment (aggregator,
+      &aggregator->segment);
 }
index 2cf4593..4db5093 100644 (file)
@@ -1,6 +1,9 @@
 /* GStreamer
  *
- * Copyright (c) 2008 Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ * Copyright (c) 2008,2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ * Copyright (c) 2008-2017 Collabora Ltd
+ *  @author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ *  @author: Vincent Penquerc'h <vincent.penquerch@collabora.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 #define __GST_FLV_MUX_H__
 
 #include <gst/gst.h>
-#include <gst/base/gstcollectpads.h>
+#include <gst/base/gstaggregator.h>
 
 G_BEGIN_DECLS
 
+#define GST_TYPE_FLV_MUX_PAD (gst_flv_mux_pad_get_type())
+#define GST_FLV_MUX_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_FLV_MUX_PAD, GstFlvMuxPad))
+#define GST_FLV_MUX_PAD_CAST(obj) ((GstFlvMuxPad *)(obj))
+#define GST_FLV_MUX_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_FLV_MUX_PAD, GstFlvMuxPad))
+#define GST_IS_FLV_MUX_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_FLV_MUX_PAD))
+#define GST_IS_FLV_MUX_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_FLV_MUX_PAD))
+
+typedef struct _GstFlvMuxPad GstFlvMuxPad;
+typedef struct _GstFlvMuxPadClass GstFlvMuxPadClass;
+
 #define GST_TYPE_FLV_MUX \
   (gst_flv_mux_get_type ())
 #define GST_FLV_MUX(obj) \
   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_FLV_MUX, GstFlvMux))
+#define GST_FLV_MUX_CAST(obj) ((GstFlvMux *)obj)
 #define GST_FLV_MUX_CLASS(klass) \
   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_FLV_MUX, GstFlvMuxClass))
 #define GST_IS_FLV_MUX(obj) \
@@ -37,27 +51,26 @@ G_BEGIN_DECLS
 #define GST_IS_FLV_MUX_CLASS(klass) \
   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_FLV_MUX))
 
-typedef struct
+struct _GstFlvMuxPad
 {
-  GstCollectData collect;
-
-  gboolean video;
+  GstAggregatorPad aggregator_pad;
 
-  guint audio_codec;
+  guint codec;
   guint rate;
   guint width;
   guint channels;
-  GstBuffer *audio_codec_data;
-
-  guint video_codec;
-  GstBuffer *video_codec_data;
+  GstBuffer *codec_data;
 
   guint bitrate;
 
   GstClockTime last_timestamp;
   gint64 pts;
   gint64 dts;
-} GstFlvPad;
+};
+
+typedef struct _GstFlvMuxPadClass {
+  GstAggregatorPadClass parent;
+} GstFlvMuxPadClass;
 
 typedef enum
 {
@@ -66,15 +79,14 @@ typedef enum
 } GstFlvMuxState;
 
 typedef struct _GstFlvMux {
-  GstElement     element;
+  GstAggregator   aggregator;
 
   GstPad         *srcpad;
-  GstCollectPads *collect;
 
   /* <private> */
   GstFlvMuxState state;
-  gboolean have_audio;
-  gboolean have_video;
+  GstFlvMuxPad *audio_pad;
+  GstFlvMuxPad *video_pad;
   gboolean streamable;
   gchar *metadatacreator;
 
@@ -87,9 +99,10 @@ typedef struct _GstFlvMux {
 } GstFlvMux;
 
 typedef struct _GstFlvMuxClass {
-  GstElementClass parent;
+  GstAggregatorClass parent;
 } GstFlvMuxClass;
 
+GType    gst_flv_mux_pad_get_type(void);
 GType    gst_flv_mux_get_type    (void);
 
 G_END_DECLS