qtmux: port to GstAggregator
authorMathieu Duponchelle <mathieu@centricular.com>
Tue, 3 Dec 2019 14:30:06 +0000 (15:30 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Mon, 16 Dec 2019 14:17:38 +0000 (14:17 +0000)
gst/isomp4/gstqtmux.c
gst/isomp4/gstqtmux.h
tests/check/elements/qtmux.c

index ac3b73e9f9d4045ad5ebbda6199ffb107451e80e..20be6a7b85419af733823c82dc5f8dd36fac5d85 100644 (file)
 #include <glib/gstdio.h>
 
 #include <gst/gst.h>
-#include <gst/base/gstcollectpads.h>
 #include <gst/base/gstbytereader.h>
 #include <gst/base/gstbitreader.h>
 #include <gst/audio/audio.h>
@@ -268,37 +267,7 @@ enum
 
 #define DEFAULT_PAD_TRAK_TIMESCALE          0
 
-GType gst_qt_mux_pad_get_type (void);
-
-#define GST_TYPE_QT_MUX_PAD \
-  (gst_qt_mux_pad_get_type())
-#define GST_QT_MUX_PAD(obj) \
-  (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_QT_MUX_PAD, GstQTMuxPad))
-#define GST_QT_MUX_PAD_CLASS(klass) \
-  (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_QT_MUX_PAD, GstQTMuxPadClass))
-#define GST_IS_QT_MUX_PAD(obj) \
-  (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_QT_MUX_PAD))
-#define GST_IS_QT_MUX_PAD_CLASS(klass) \
-  (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_QT_MUX_PAD))
-#define GST_QT_MUX_PAD_CAST(obj) \
-  ((GstQTMuxPad *)(obj))
-
-typedef struct _GstQTMuxPad GstQTMuxPad;
-typedef struct _GstQTMuxPadClass GstQTMuxPadClass;
-
-struct _GstQTMuxPad
-{
-  GstPad parent;
-
-  guint32 trak_timescale;
-};
-
-struct _GstQTMuxPadClass
-{
-  GstPadClass parent;
-};
-
-G_DEFINE_TYPE (GstQTMuxPad, gst_qt_mux_pad, GST_TYPE_PAD);
+G_DEFINE_TYPE (GstQTMuxPad, gst_qt_mux_pad, GST_TYPE_AGGREGATOR_PAD);
 
 static void
 gst_qt_mux_pad_set_property (GObject * object,
@@ -426,9 +395,6 @@ enum
 
 static void gst_qt_mux_finalize (GObject * object);
 
-static GstStateChangeReturn gst_qt_mux_change_state (GstElement * element,
-    GstStateChange transition);
-
 /* property functions */
 static void gst_qt_mux_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec);
@@ -441,12 +407,22 @@ static GstPad *gst_qt_mux_request_new_pad (GstElement * element,
 static void gst_qt_mux_release_pad (GstElement * element, GstPad * pad);
 
 /* event */
-static gboolean gst_qt_mux_sink_event (GstCollectPads * pads,
-    GstCollectData * data, GstEvent * event, gpointer user_data);
-
-static GstFlowReturn gst_qt_mux_collected (GstCollectPads * pads,
-    gpointer user_data);
-static GstFlowReturn gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad,
+static gboolean gst_qt_mux_sink_event (GstAggregator * agg,
+    GstAggregatorPad * agg_pad, GstEvent * event);
+
+/* aggregator */
+static GstAggregatorPad *gst_qt_mux_create_new_pad (GstAggregator * self,
+    GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps);
+static GstFlowReturn gst_qt_mux_aggregate (GstAggregator * agg,
+    gboolean timeout);
+static GstBuffer *gst_qt_mux_clip_running_time (GstAggregator * agg,
+    GstAggregatorPad * agg_pad, GstBuffer * buf);
+static gboolean gst_qt_mux_start (GstAggregator * agg);
+static gboolean gst_qt_mux_stop (GstAggregator * agg);
+
+/* internal */
+
+static GstFlowReturn gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTMuxPad * pad,
     GstBuffer * buf);
 
 static GstFlowReturn
@@ -484,8 +460,8 @@ gst_qt_mux_base_init (gpointer g_class)
   g_free (description);
 
   /* pad templates */
-  srctempl = gst_pad_template_new ("src", GST_PAD_SRC,
-      GST_PAD_ALWAYS, params->src_caps);
+  srctempl = gst_pad_template_new_with_gtype ("src", GST_PAD_SRC,
+      GST_PAD_ALWAYS, params->src_caps, GST_TYPE_AGGREGATOR_PAD);
   gst_element_class_add_pad_template (element_class, srctempl);
 
   if (params->audio_sink_caps) {
@@ -524,6 +500,7 @@ gst_qt_mux_class_init (GstQTMuxClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
   GParamFlags streamable_flags;
   const gchar *streamable_desc;
   gboolean streamable;
@@ -659,12 +636,18 @@ gst_qt_mux_class_init (GstQTMuxClass * klass)
 
   gstelement_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_qt_mux_request_new_pad);
-  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_qt_mux_change_state);
   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_qt_mux_release_pad);
+
+  gstagg_class->sink_event = gst_qt_mux_sink_event;
+  gstagg_class->aggregate = gst_qt_mux_aggregate;
+  gstagg_class->clip = gst_qt_mux_clip_running_time;
+  gstagg_class->start = gst_qt_mux_start;
+  gstagg_class->stop = gst_qt_mux_stop;
+  gstagg_class->create_new_pad = gst_qt_mux_create_new_pad;
 }
 
 static void
-gst_qt_mux_pad_reset (GstQTPad * qtpad)
+gst_qt_mux_pad_reset (GstQTMuxPad * qtpad)
 {
   qtpad->fourcc = 0;
   qtpad->is_out_of_order = FALSE;
@@ -726,6 +709,7 @@ static void
 gst_qt_mux_reset (GstQTMux * qtmux, gboolean alloc)
 {
   GSList *walk;
+  GList *l;
 
   qtmux->state = GST_QT_MUX_STATE_NONE;
   qtmux->header_size = 0;
@@ -769,8 +753,8 @@ gst_qt_mux_reset (GstQTMux * qtmux, gboolean alloc)
   GST_OBJECT_UNLOCK (qtmux);
 
   /* reset pad data */
-  for (walk = qtmux->sinkpads; walk; walk = g_slist_next (walk)) {
-    GstQTPad *qtpad = (GstQTPad *) walk->data;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
     gst_qt_mux_pad_reset (qtpad);
 
     /* hm, moov_free above yanked the traks away from us,
@@ -781,8 +765,8 @@ gst_qt_mux_reset (GstQTMux * qtmux, gboolean alloc)
   if (alloc) {
     qtmux->moov = atom_moov_new (qtmux->context);
     /* ensure all is as nice and fresh as request_new_pad would provide it */
-    for (walk = qtmux->sinkpads; walk; walk = g_slist_next (walk)) {
-      GstQTPad *qtpad = (GstQTPad *) walk->data;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
       qtpad->trak = atom_trak_new (qtmux->context);
       atom_moov_add_trak (qtmux->moov, qtpad->trak);
@@ -800,26 +784,64 @@ gst_qt_mux_reset (GstQTMux * qtmux, gboolean alloc)
   qtmux->reserved_duration_remaining = GST_CLOCK_TIME_NONE;
 }
 
+static GstBuffer *
+gst_qt_mux_clip_running_time (GstAggregator * agg,
+    GstAggregatorPad * agg_pad, GstBuffer * buf)
+{
+  GstQTMuxPad *qtpad = GST_QT_MUX_PAD (agg_pad);
+  GstBuffer *outbuf = buf;
+
+  /* invalid left alone and passed */
+  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS_OR_PTS (buf)))) {
+    GstClockTime time;
+    GstClockTime buf_dts, abs_dts;
+    gint dts_sign;
+
+    time = GST_BUFFER_PTS (buf);
+
+    if (GST_CLOCK_TIME_IS_VALID (time)) {
+      time =
+          gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME,
+          time);
+      if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
+        GST_DEBUG_OBJECT (agg_pad, "clipping buffer on pad outside segment %"
+            GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
+        gst_buffer_unref (buf);
+        return NULL;
+      }
+    }
+
+    GST_LOG_OBJECT (agg_pad, "buffer pts %" GST_TIME_FORMAT " -> %"
+        GST_TIME_FORMAT " running time",
+        GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
+    outbuf = gst_buffer_make_writable (buf);
+    GST_BUFFER_PTS (outbuf) = time;
+
+    dts_sign = gst_segment_to_running_time_full (&agg_pad->segment,
+        GST_FORMAT_TIME, GST_BUFFER_DTS (outbuf), &abs_dts);
+    buf_dts = GST_BUFFER_DTS (outbuf);
+    if (dts_sign > 0) {
+      GST_BUFFER_DTS (outbuf) = abs_dts;
+      qtpad->dts = abs_dts;
+    } else if (dts_sign < 0) {
+      GST_BUFFER_DTS (outbuf) = GST_CLOCK_TIME_NONE;
+      qtpad->dts = -((gint64) abs_dts);
+    } else {
+      GST_BUFFER_DTS (outbuf) = GST_CLOCK_TIME_NONE;
+      qtpad->dts = GST_CLOCK_STIME_NONE;
+    }
+
+    GST_LOG_OBJECT (agg_pad, "buffer dts %" GST_TIME_FORMAT " -> %"
+        GST_STIME_FORMAT " running time", GST_TIME_ARGS (buf_dts),
+        GST_STIME_ARGS (qtpad->dts));
+  }
+
+  return outbuf;
+}
+
 static void
 gst_qt_mux_init (GstQTMux * qtmux, GstQTMuxClass * qtmux_klass)
 {
-  GstElementClass *klass = GST_ELEMENT_CLASS (qtmux_klass);
-  GstPadTemplate *templ;
-
-  templ = gst_element_class_get_pad_template (klass, "src");
-  qtmux->srcpad = gst_pad_new_from_template (templ, "src");
-  gst_pad_use_fixed_caps (qtmux->srcpad);
-  gst_element_add_pad (GST_ELEMENT (qtmux), qtmux->srcpad);
-
-  qtmux->sinkpads = NULL;
-  qtmux->collect = gst_collect_pads_new ();
-  gst_collect_pads_set_event_function (qtmux->collect,
-      GST_DEBUG_FUNCPTR (gst_qt_mux_sink_event), qtmux);
-  gst_collect_pads_set_clip_function (qtmux->collect,
-      GST_DEBUG_FUNCPTR (gst_collect_pads_clip_running_time), qtmux);
-  gst_collect_pads_set_function (qtmux->collect,
-      GST_DEBUG_FUNCPTR (gst_qt_mux_collected), qtmux);
-
   /* properties set to default upon construction */
 
   qtmux->reserved_max_duration = DEFAULT_RESERVED_MAX_DURATION;
@@ -851,15 +873,12 @@ gst_qt_mux_finalize (GObject * object)
   g_free (qtmux->moov_recov_file_path);
 
   atoms_context_free (qtmux->context);
-  gst_object_unref (qtmux->collect);
-
-  g_slist_free (qtmux->sinkpads);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 static GstBuffer *
-gst_qt_mux_prepare_jpc_buffer (GstQTPad * qtpad, GstBuffer * buf,
+gst_qt_mux_prepare_jpc_buffer (GstQTMuxPad * qtpad, GstBuffer * buf,
     GstQTMux * qtmux)
 {
   GstBuffer *newbuf;
@@ -924,7 +943,7 @@ extract_608_field_from_s334_1a (const guint8 * ccdata, gsize ccdata_size,
 
 
 static GstBuffer *
-gst_qt_mux_prepare_caption_buffer (GstQTPad * qtpad, GstBuffer * buf,
+gst_qt_mux_prepare_caption_buffer (GstQTMuxPad * qtpad, GstBuffer * buf,
     GstQTMux * qtmux)
 {
   GstBuffer *newbuf = NULL;
@@ -1053,7 +1072,7 @@ gst_qt_mux_prepare_caption_buffer (GstQTPad * qtpad, GstBuffer * buf,
 }
 
 static GstBuffer *
-gst_qt_mux_prepare_tx3g_buffer (GstQTPad * qtpad, GstBuffer * buf,
+gst_qt_mux_prepare_tx3g_buffer (GstQTMuxPad * qtpad, GstBuffer * buf,
     GstQTMux * qtmux)
 {
   GstBuffer *newbuf;
@@ -1093,7 +1112,7 @@ gst_qt_mux_prepare_tx3g_buffer (GstQTPad * qtpad, GstBuffer * buf,
 }
 
 static void
-gst_qt_mux_pad_add_ac3_extension (GstQTMux * qtmux, GstQTPad * qtpad,
+gst_qt_mux_pad_add_ac3_extension (GstQTMux * qtmux, GstQTMuxPad * qtpad,
     guint8 fscod, guint8 frmsizcod, guint8 bsid, guint8 bsmod, guint8 acmod,
     guint8 lfe_on)
 {
@@ -1107,7 +1126,7 @@ gst_qt_mux_pad_add_ac3_extension (GstQTMux * qtmux, GstQTPad * qtpad,
 }
 
 static GstBuffer *
-gst_qt_mux_prepare_parse_ac3_frame (GstQTPad * qtpad, GstBuffer * buf,
+gst_qt_mux_prepare_parse_ac3_frame (GstQTMuxPad * qtpad, GstBuffer * buf,
     GstQTMux * qtmux)
 {
   GstMapInfo map;
@@ -1115,7 +1134,7 @@ gst_qt_mux_prepare_parse_ac3_frame (GstQTPad * qtpad, GstBuffer * buf,
   guint off;
 
   if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
-    GST_WARNING_OBJECT (qtpad->collect.pad, "Failed to map buffer");
+    GST_WARNING_OBJECT (qtpad, "Failed to map buffer");
     return buf;
   }
 
@@ -1130,8 +1149,7 @@ gst_qt_mux_prepare_parse_ac3_frame (GstQTPad * qtpad, GstBuffer * buf,
     GstBitReader bits;
     guint8 fscod, frmsizcod, bsid, bsmod, acmod, lfe_on;
 
-    GST_DEBUG_OBJECT (qtpad->collect.pad, "Found ac3 sync point at offset: %u",
-        off);
+    GST_DEBUG_OBJECT (qtpad, "Found ac3 sync point at offset: %u", off);
 
     gst_bit_reader_init (&bits, map.data, map.size);
 
@@ -1158,7 +1176,7 @@ gst_qt_mux_prepare_parse_ac3_frame (GstQTPad * qtpad, GstBuffer * buf,
 
     /* AC-3 spec says that those values should be constant for the
      * whole stream when muxed in mp4. We trust the input follows it */
-    GST_DEBUG_OBJECT (qtpad->collect.pad, "Data parsed, removing "
+    GST_DEBUG_OBJECT (qtpad, "Data parsed, removing "
         "prepare buffer function");
     qtpad->prepare_buf_func = NULL;
   }
@@ -1169,7 +1187,7 @@ done:
 }
 
 static GstBuffer *
-gst_qt_mux_create_empty_tx3g_buffer (GstQTPad * qtpad, gint64 duration)
+gst_qt_mux_create_empty_tx3g_buffer (GstQTMuxPad * qtpad, gint64 duration)
 {
   guint8 *data;
 
@@ -1798,7 +1816,7 @@ static void
 gst_qt_mux_setup_metadata (GstQTMux * qtmux)
 {
   const GstTagList *tags = NULL;
-  GSList *walk;
+  GList *l;
 
   GST_OBJECT_LOCK (qtmux);
   if (qtmux->tags_changed) {
@@ -1825,19 +1843,17 @@ gst_qt_mux_setup_metadata (GstQTMux * qtmux)
     GST_DEBUG_OBJECT (qtmux, "No new tags received");
   }
 
-  for (walk = qtmux->sinkpads; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qpad = (GstQTPad *) cdata;
-    GstPad *pad = qpad->collect.pad;
+  for (l = GST_ELEMENT (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qpad = GST_QT_MUX_PAD (l->data);
 
     if (qpad->tags_changed && qpad->tags) {
-      GST_DEBUG_OBJECT (pad, "Adding tags");
+      GST_DEBUG_OBJECT (qpad, "Adding tags");
       gst_tag_list_remove_tag (qpad->tags, GST_TAG_CONTAINER_FORMAT);
       gst_qt_mux_add_metadata_tags (qtmux, qpad->tags, &qpad->trak->udta);
       qpad->tags_changed = FALSE;
-      GST_DEBUG_OBJECT (pad, "Tags added");
+      GST_DEBUG_OBJECT (qpad, "Tags added");
     } else {
-      GST_DEBUG_OBJECT (pad, "No new tags received");
+      GST_DEBUG_OBJECT (qpad, "No new tags received");
     }
   }
 }
@@ -1881,7 +1897,7 @@ gst_qt_mux_send_buffer (GstQTMux * qtmux, GstBuffer * buf, guint64 * offset,
       res = GST_FLOW_OK;
   } else {
     GST_LOG_OBJECT (qtmux, "downstream");
-    res = gst_pad_push (qtmux->srcpad, buf);
+    res = gst_aggregator_finish_buffer (GST_AGGREGATOR (qtmux), buf);
   }
 
   if (res != GST_FLOW_OK)
@@ -2064,7 +2080,7 @@ gst_qt_mux_update_mdat_size (GstQTMux * qtmux, guint64 mdat_pos,
   /* seek and rewrite the header */
   gst_segment_init (&segment, GST_FORMAT_BYTES);
   segment.start = mdat_pos;
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+  gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
   return gst_qt_mux_send_mdat_header (qtmux, offset, mdat_size, TRUE,
       fsync_after);
@@ -2154,7 +2170,7 @@ gst_qt_mux_set_header_on_caps (GstQTMux * mux, GstBuffer * buf)
   GValue value = { 0 };
   GstCaps *caps, *tcaps;
 
-  tcaps = gst_pad_get_current_caps (mux->srcpad);
+  tcaps = gst_pad_get_current_caps (GST_AGGREGATOR_SRC_PAD (mux));
   caps = gst_caps_copy (tcaps);
   gst_caps_unref (tcaps);
 
@@ -2170,7 +2186,7 @@ gst_qt_mux_set_header_on_caps (GstQTMux * mux, GstBuffer * buf)
 
   gst_structure_set_value (structure, "streamheader", &array);
   g_value_unset (&array);
-  gst_pad_set_caps (mux->srcpad, caps);
+  gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
   gst_caps_unref (caps);
 }
 
@@ -2219,7 +2235,7 @@ gst_qt_mux_send_free_atom (GstQTMux * qtmux, guint64 * off, guint32 size,
     /* Make sure downstream position ends up at the end of this free box */
     gst_segment_init (&segment, GST_FORMAT_BYTES);
     segment.start = *off;
-    gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+    gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
   }
 
   return ret;
@@ -2272,14 +2288,13 @@ gst_qt_mux_send_moov (GstQTMux * qtmux, guint64 * _offset,
   guint8 *data;
   GstBuffer *buf;
   GstFlowReturn ret = GST_FLOW_OK;
-  GSList *walk;
+  GList *l;
   guint64 current_time = atoms_get_current_qt_time ();
 
   /* update modification times */
   qtmux->moov->mvhd.time_info.modification_time = current_time;
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qtpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
     qtpad->trak->mdia.mdhd.time_info.modification_time = current_time;
     qtpad->trak->tkhd.modification_time = current_time;
@@ -2383,7 +2398,7 @@ gst_qt_mux_downstream_is_seekable (GstQTMux * qtmux)
   gboolean seekable = FALSE;
   GstQuery *query = gst_query_new_seeking (GST_FORMAT_BYTES);
 
-  if (gst_pad_peer_query (qtmux->srcpad, query)) {
+  if (gst_pad_peer_query (GST_AGGREGATOR_SRC_PAD (qtmux), query)) {
     gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
     GST_INFO_OBJECT (qtmux, "downstream is %sseekable", seekable ? "" : "not ");
   } else {
@@ -2399,7 +2414,7 @@ gst_qt_mux_downstream_is_seekable (GstQTMux * qtmux)
 static void
 gst_qt_mux_prepare_moov_recovery (GstQTMux * qtmux)
 {
-  GSList *walk;
+  GList *l;
   gboolean fail = FALSE;
   AtomFTYP *ftyp = NULL;
   GstBuffer *prefix = NULL;
@@ -2417,7 +2432,8 @@ gst_qt_mux_prepare_moov_recovery (GstQTMux * qtmux)
   gst_qt_mux_prepare_ftyp (qtmux, &ftyp, &prefix);
 
   if (!atoms_recov_write_headers (qtmux->moov_recov_file, ftyp, prefix,
-          qtmux->moov, qtmux->timescale, g_slist_length (qtmux->sinkpads))) {
+          qtmux->moov, qtmux->timescale,
+          g_list_length (GST_ELEMENT (qtmux)->sinkpads))) {
     GST_WARNING_OBJECT (qtmux, "Failed to write moov recovery file " "headers");
     goto fail;
   }
@@ -2426,9 +2442,8 @@ gst_qt_mux_prepare_moov_recovery (GstQTMux * qtmux)
   if (prefix)
     gst_buffer_unref (prefix);
 
-  for (walk = qtmux->sinkpads; walk && !fail; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
     /* write info for each stream */
     fail = atoms_recov_write_trak_info (qtmux->moov_recov_file, qpad->trak);
     if (fail) {
@@ -2447,7 +2462,7 @@ fail:
 }
 
 static guint64
-prefill_get_block_index (GstQTMux * qtmux, GstQTPad * qpad)
+prefill_get_block_index (GstQTMux * qtmux, GstQTMuxPad * qpad)
 {
   switch (qpad->fourcc) {
     case FOURCC_apch:
@@ -2471,7 +2486,7 @@ prefill_get_block_index (GstQTMux * qtmux, GstQTPad * qpad)
 }
 
 static guint
-prefill_get_sample_size (GstQTMux * qtmux, GstQTPad * qpad)
+prefill_get_sample_size (GstQTMux * qtmux, GstQTMuxPad * qpad)
 {
   switch (qpad->fourcc) {
     case FOURCC_apch:
@@ -2532,7 +2547,7 @@ prefill_get_sample_size (GstQTMux * qtmux, GstQTPad * qpad)
     case FOURCC_c708:{
       if (qpad->first_cc_sample_size == 0) {
         GstBuffer *buf =
-            gst_collect_pads_peek (qtmux->collect, (GstCollectData *) qpad);
+            gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (qpad));
         g_assert (buf != NULL);
         qpad->first_cc_sample_size = gst_buffer_get_size (buf);
         g_assert (qpad->first_cc_sample_size != 0);
@@ -2565,7 +2580,7 @@ prefill_get_sample_size (GstQTMux * qtmux, GstQTPad * qpad)
 }
 
 static GstClockTime
-prefill_get_next_timestamp (GstQTMux * qtmux, GstQTPad * qpad)
+prefill_get_next_timestamp (GstQTMux * qtmux, GstQTMuxPad * qpad)
 {
   switch (qpad->fourcc) {
     case FOURCC_apch:
@@ -2603,7 +2618,7 @@ prefill_get_next_timestamp (GstQTMux * qtmux, GstQTPad * qpad)
 }
 
 static GstBuffer *
-prefill_raw_audio_prepare_buf_func (GstQTPad * qtpad, GstBuffer * buf,
+prefill_raw_audio_prepare_buf_func (GstQTMuxPad * qtpad, GstBuffer * buf,
     GstQTMux * qtmux)
 {
   guint64 block_idx;
@@ -2623,8 +2638,7 @@ prefill_raw_audio_prepare_buf_func (GstQTPad * qtpad, GstBuffer * buf,
       qtpad->expected_sample_duration_d * atom_trak_get_timescale (qtpad->trak),
       qtpad->expected_sample_duration_n) - qtpad->raw_audio_adapter_offset;
 
-  if ((!GST_COLLECT_PADS_STATE_IS_SET (&qtpad->collect,
-              GST_COLLECT_PADS_STATE_EOS)
+  if ((!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (qtpad))
           && gst_adapter_available (qtpad->raw_audio_adapter) <
           nsamples * qtpad->sample_size)
       || gst_adapter_available (qtpad->raw_audio_adapter) == 0) {
@@ -2641,8 +2655,7 @@ prefill_raw_audio_prepare_buf_func (GstQTPad * qtpad, GstBuffer * buf,
 
   buf =
       gst_adapter_take_buffer (qtpad->raw_audio_adapter,
-      !GST_COLLECT_PADS_STATE_IS_SET (&qtpad->collect,
-          GST_COLLECT_PADS_STATE_EOS) ? nsamples *
+      !gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (qtpad)) ? nsamples *
       qtpad->sample_size : gst_adapter_available (qtpad->raw_audio_adapter));
   GST_BUFFER_PTS (buf) = input_timestamp;
   GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
@@ -2675,13 +2688,12 @@ prefill_raw_audio_prepare_buf_func (GstQTPad * qtpad, GstBuffer * buf,
 static void
 find_video_sample_duration (GstQTMux * qtmux, guint * dur_n, guint * dur_d)
 {
-  GSList *walk;
+  GList *l;
 
   /* Find the (first) video track and assume that we have to output
    * in that size */
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *tmp_qpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *tmp_qpad = (GstQTMuxPad *) l->data;
 
     if (tmp_qpad->trak->is_video) {
       *dur_n = tmp_qpad->expected_sample_duration_n;
@@ -2690,7 +2702,7 @@ find_video_sample_duration (GstQTMux * qtmux, guint * dur_n, guint * dur_d)
     }
   }
 
-  if (walk == NULL) {
+  if (l == NULL) {
     GST_INFO_OBJECT (qtmux,
         "Found no video framerate, using 40ms audio buffers");
     *dur_n = 25;
@@ -2700,7 +2712,7 @@ find_video_sample_duration (GstQTMux * qtmux, guint * dur_n, guint * dur_d)
 
 /* Called when all pads are prerolled to adjust and  */
 static gboolean
-prefill_update_sample_size (GstQTMux * qtmux, GstQTPad * qpad)
+prefill_update_sample_size (GstQTMux * qtmux, GstQTMuxPad * qpad)
 {
   switch (qpad->fourcc) {
     case FOURCC_apch:
@@ -2743,11 +2755,10 @@ prefill_update_sample_size (GstQTMux * qtmux, GstQTPad * qpad)
 
 /* Only called at startup when doing the "fake" iteration of all tracks in order
  * to prefill the sample tables in the header.  */
-static GstQTPad *
+static GstQTMuxPad *
 find_best_pad_prefill_start (GstQTMux * qtmux)
 {
-  GSList *walk;
-  GstQTPad *best_pad = NULL;
+  GstQTMuxPad *best_pad = NULL;
 
   /* If interleave limits have been specified and the current pad is within
    * those interleave limits, pick that one, otherwise let's try to figure out
@@ -2764,7 +2775,7 @@ find_best_pad_prefill_start (GstQTMux * qtmux)
     if (qtmux->current_pad->total_duration < qtmux->reserved_max_duration) {
       best_pad = qtmux->current_pad;
     }
-  } else if (qtmux->collect->data->next) {
+  } else if (GST_ELEMENT_CAST (qtmux)->sinkpads->next) {
     /* Attempt to try another pad if we have one. Otherwise use the only pad
      * present */
     best_pad = qtmux->current_pad = NULL;
@@ -2773,11 +2784,11 @@ find_best_pad_prefill_start (GstQTMux * qtmux)
   /* The next best pad is the one which has the lowest timestamp and hasn't
    * exceeded the reserved max duration */
   if (!best_pad) {
+    GList *l;
     GstClockTime best_time = GST_CLOCK_TIME_NONE;
 
-    for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-      GstCollectData *cdata = (GstCollectData *) walk->data;
-      GstQTPad *qtpad = (GstQTPad *) cdata;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
       GstClockTime timestamp;
 
       if (qtpad->total_duration >= qtmux->reserved_max_duration)
@@ -2804,15 +2815,14 @@ find_best_pad_prefill_start (GstQTMux * qtmux)
 static gboolean
 gst_qt_mux_prefill_samples (GstQTMux * qtmux)
 {
-  GstQTPad *qpad;
-  GSList *walk;
+  GstQTMuxPad *qpad;
+  GList *l;
   GstQTMuxClass *qtmux_klass = (GstQTMuxClass *) (G_OBJECT_GET_CLASS (qtmux));
 
   /* Update expected sample sizes/durations as needed, this is for raw
    * audio where samples are actual audio samples. */
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
 
     if (!prefill_update_sample_size (qtmux, qpad))
       return FALSE;
@@ -2822,11 +2832,10 @@ gst_qt_mux_prefill_samples (GstQTMux * qtmux)
     /* For the first sample check/update timecode as needed. We do that before
      * all actual samples as the code in gst_qt_mux_add_buffer() does it with
      * initial buffer directly, not with last_buf */
-    for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-      GstCollectData *cdata = (GstCollectData *) walk->data;
-      GstQTPad *qpad = (GstQTPad *) cdata;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
       GstBuffer *buffer =
-          gst_collect_pads_peek (qtmux->collect, (GstCollectData *) qpad);
+          gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (qpad));
       GstVideoTimeCodeMeta *tc_meta;
 
       if (buffer && (tc_meta = gst_buffer_get_video_time_code_meta (buffer))
@@ -2946,11 +2955,9 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
   GstQTMuxClass *qtmux_klass = (GstQTMuxClass *) (G_OBJECT_GET_CLASS (qtmux));
   GstFlowReturn ret = GST_FLOW_OK;
   GstCaps *caps;
-  GstSegment segment;
-  gchar s_id[32];
   GstClockTime reserved_max_duration;
   guint reserved_bytes_per_sec_per_trak;
-  GSList *walk;
+  GList *l;
 
   GST_DEBUG_OBJECT (qtmux, "starting file");
 
@@ -2959,15 +2966,12 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
   reserved_bytes_per_sec_per_trak = qtmux->reserved_bytes_per_sec_per_trak;
   GST_OBJECT_UNLOCK (qtmux);
 
-  /* stream-start (FIXME: create id based on input ids) */
-  g_snprintf (s_id, sizeof (s_id), "qtmux-%08x", g_random_int ());
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_stream_start (s_id));
-
-  caps = gst_caps_copy (gst_pad_get_pad_template_caps (qtmux->srcpad));
+  caps =
+      gst_caps_copy (gst_pad_get_pad_template_caps (GST_AGGREGATOR_SRC_PAD
+          (qtmux)));
   /* qtmux has structure with and without variant, remove all but the first */
-  while (gst_caps_get_size (caps) > 1)
-    gst_caps_remove_structure (caps, 1);
-  gst_pad_set_caps (qtmux->srcpad, caps);
+  g_assert (gst_caps_truncate (caps));
+  gst_aggregator_set_src_caps (GST_AGGREGATOR (qtmux), caps);
   gst_caps_unref (caps);
 
   /* Default is 'normal' mode */
@@ -3033,23 +3037,17 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
       break;
   }
 
-  /* let downstream know we think in BYTES and expect to do seeking later on */
-  gst_segment_init (&segment, GST_FORMAT_BYTES);
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
-
   GST_OBJECT_LOCK (qtmux);
 
   if (qtmux->timescale == 0) {
     guint32 suggested_timescale = 0;
-    GSList *walk;
 
     /* Calculate a reasonable timescale for the moov:
      * If there is video, it is the biggest video track timescale or an even
      * multiple of it if it's smaller than 1800.
      * Otherwise it is 1800 */
-    for (walk = qtmux->sinkpads; walk; walk = g_slist_next (walk)) {
-      GstCollectData *cdata = (GstCollectData *) walk->data;
-      GstQTPad *qpad = (GstQTPad *) cdata;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
 
       if (!qpad->trak)
         continue;
@@ -3076,11 +3074,10 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
   {
     guint video_width = 0, video_height = 0;
     guint32 video_timescale = 0;
-    GSList *walk;
+    GList *l;
 
-    for (walk = qtmux->sinkpads; walk; walk = g_slist_next (walk)) {
-      GstCollectData *cdata = (GstCollectData *) walk->data;
-      GstQTPad *qpad = (GstQTPad *) cdata;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
 
       if (!qpad->trak)
         continue;
@@ -3090,11 +3087,10 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
         continue;
 
       if (video_width == 0 || video_height == 0 || video_timescale == 0) {
-        GSList *walk2;
+        GList *l2;
 
-        for (walk2 = qtmux->sinkpads; walk2; walk2 = g_slist_next (walk2)) {
-          GstCollectData *cdata2 = (GstCollectData *) walk2->data;
-          GstQTPad *qpad2 = (GstQTPad *) cdata2;
+        for (l2 = GST_ELEMENT_CAST (qtmux)->sinkpads; l2; l2 = l2->next) {
+          GstQTMuxPad *qpad2 = (GstQTMuxPad *) l2->data;
 
           if (!qpad2->trak)
             continue;
@@ -3270,14 +3266,15 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
       /* last_moov_size now contains the full size of the moov, moov_pos the
        * position. This allows us to rewrite it in the very end as needed */
       qtmux->reserved_moov_size =
-          qtmux->last_moov_size + 12 * g_slist_length (qtmux->sinkpads) + 8;
+          qtmux->last_moov_size +
+          12 * g_list_length (GST_ELEMENT (qtmux)->sinkpads) + 8;
 
       /* Send an additional free atom at the end so we definitely have space
        * to rewrite the moov header at the end and remove the samples that
        * were not actually written */
       ret =
           gst_qt_mux_send_free_atom (qtmux, &qtmux->header_size,
-          12 * g_slist_length (qtmux->sinkpads) + 8, FALSE);
+          12 * g_list_length (GST_ELEMENT (qtmux)->sinkpads) + 8, FALSE);
       if (ret != GST_FLOW_OK)
         return ret;
 
@@ -3303,14 +3300,14 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
 
         gst_segment_init (&segment, GST_FORMAT_BYTES);
         segment.start = qtmux->moov_pos;
-        gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+        gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
         ret = gst_qt_mux_send_moov (qtmux, NULL, 0, FALSE, FALSE);
         if (ret != GST_FLOW_OK)
           return ret;
 
         segment.start = qtmux->header_size;
-        gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+        gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
       }
 
       qtmux->current_chunk_size = 0;
@@ -3320,9 +3317,8 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
       qtmux->current_pad = NULL;
       qtmux->longest_chunk = GST_CLOCK_TIME_NONE;
 
-      for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-        GstCollectData *cdata = (GstCollectData *) walk->data;
-        GstQTPad *qtpad = (GstQTPad *) cdata;
+      for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+        GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
         qtpad->total_bytes = 0;
         qtpad->total_duration = 0;
@@ -3399,28 +3395,26 @@ static GstFlowReturn
 gst_qt_mux_send_last_buffers (GstQTMux * qtmux)
 {
   GstFlowReturn ret = GST_FLOW_OK;
-  GSList *walk;
+  GList *l;
 
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qtpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
     /* avoid add_buffer complaining if not negotiated
      * in which case no buffers either, so skipping */
     if (!qtpad->fourcc) {
       GST_DEBUG_OBJECT (qtmux, "Pad %s has never had buffers",
-          GST_PAD_NAME (qtpad->collect.pad));
+          GST_PAD_NAME (qtpad));
       continue;
     }
 
     /* send last buffer; also flushes possibly queued buffers/ts */
     GST_DEBUG_OBJECT (qtmux, "Sending the last buffer for pad %s",
-        GST_PAD_NAME (qtpad->collect.pad));
+        GST_PAD_NAME (qtpad));
     ret = gst_qt_mux_add_buffer (qtmux, qtpad, NULL);
     if (ret != GST_FLOW_OK) {
       GST_WARNING_OBJECT (qtmux, "Failed to send last buffer for %s, "
-          "flow return: %s", GST_PAD_NAME (qtpad->collect.pad),
-          gst_flow_get_name (ret));
+          "flow return: %s", GST_PAD_NAME (qtpad), gst_flow_get_name (ret));
     }
   }
 
@@ -3430,7 +3424,7 @@ gst_qt_mux_send_last_buffers (GstQTMux * qtmux)
 static void
 gst_qt_mux_update_global_statistics (GstQTMux * qtmux)
 {
-  GSList *walk;
+  GList *l;
 
   /* for setting some subtitles fields */
   guint max_width = 0;
@@ -3438,13 +3432,12 @@ gst_qt_mux_update_global_statistics (GstQTMux * qtmux)
 
   qtmux->first_ts = qtmux->last_dts = GST_CLOCK_TIME_NONE;
 
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qtpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
     if (!qtpad->fourcc) {
       GST_DEBUG_OBJECT (qtmux, "Pad %s has never had buffers",
-          GST_PAD_NAME (qtpad->collect.pad));
+          GST_PAD_NAME (qtpad));
       continue;
     }
 
@@ -3494,13 +3487,12 @@ gst_qt_mux_update_global_statistics (GstQTMux * qtmux)
 
   /* need to update values on subtitle traks now that we know the
    * max width and height */
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qtpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
     if (!qtpad->fourcc) {
       GST_DEBUG_OBJECT (qtmux, "Pad %s has never had buffers",
-          GST_PAD_NAME (qtpad->collect.pad));
+          GST_PAD_NAME (qtpad));
       continue;
     }
 
@@ -3515,16 +3507,15 @@ gst_qt_mux_update_global_statistics (GstQTMux * qtmux)
 static void
 gst_qt_mux_update_edit_lists (GstQTMux * qtmux)
 {
-  GSList *walk;
+  GList *l;
 
   GST_DEBUG_OBJECT (qtmux, "Media first ts selected: %" GST_TIME_FORMAT,
       GST_TIME_ARGS (qtmux->first_ts));
   /* add/update EDTSs for late streams. configure_moov will have
    * set the trak durations above by summing the sample tables,
    * here we extend that if needing to insert an empty segment */
-  for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-    GstCollectData *cdata = (GstCollectData *) walk->data;
-    GstQTPad *qtpad = (GstQTPad *) cdata;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
     atom_trak_edts_clear (qtpad->trak);
 
@@ -3551,7 +3542,7 @@ gst_qt_mux_update_edit_lists (GstQTMux * qtmux)
         if (trak_lateness > 0 && diff > qtmux->start_gap_threshold) {
           GST_DEBUG_OBJECT (qtmux,
               "Pad %s is a late stream by %" GST_TIME_FORMAT,
-              GST_PAD_NAME (qtpad->collect.pad), GST_TIME_ARGS (diff));
+              GST_PAD_NAME (qtpad), GST_TIME_ARGS (diff));
 
           atom_trak_set_elst_entry (qtpad->trak, 0, lateness, (guint32) - 1,
               (guint32) (1 * 65536.0));
@@ -3599,7 +3590,7 @@ gst_qt_mux_update_edit_lists (GstQTMux * qtmux)
 }
 
 static GstFlowReturn
-gst_qt_mux_update_timecode (GstQTMux * qtmux, GstQTPad * qtpad)
+gst_qt_mux_update_timecode (GstQTMux * qtmux, GstQTMuxPad * qtpad)
 {
   GstSegment segment;
   GstBuffer *buf;
@@ -3614,7 +3605,7 @@ gst_qt_mux_update_timecode (GstQTMux * qtmux, GstQTPad * qtpad)
 
   gst_segment_init (&segment, GST_FORMAT_BYTES);
   segment.start = offset;
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+  gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
   buf = gst_buffer_new_and_alloc (4);
   gst_buffer_map (buf, &map, GST_MAP_WRITE);
@@ -3635,7 +3626,7 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
   gboolean ret = GST_FLOW_OK;
   guint64 offset = 0, size = 0;
   gboolean large_file;
-  GSList *walk;
+  GList *l;
 
   GST_DEBUG_OBJECT (qtmux, "Updating remaining values and sending last data");
 
@@ -3650,8 +3641,8 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
   }
 
   gst_qt_mux_update_global_statistics (qtmux);
-  for (walk = qtmux->collect->data; walk; walk = walk->next) {
-    GstQTPad *qtpad = (GstQTPad *) walk->data;
+  for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+    GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
 
     if (qtpad->tc_pos != -1) {
       /* File is being stopped and timecode hasn't been updated. Update it now
@@ -3689,7 +3680,7 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
       /* seek and rewrite the header */
       gst_segment_init (&segment, GST_FORMAT_BYTES);
       segment.start = qtmux->moov_pos;
-      gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+      gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
       /* no need to seek back */
       return gst_qt_mux_send_moov (qtmux, NULL, 0, FALSE, FALSE);
     }
@@ -3704,12 +3695,11 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
           qtmux->mdat_size, NULL, TRUE);
     }
     case GST_QT_MUX_MODE_ROBUST_RECORDING_PREFILL:{
-      GSList *walk;
+      GList *l;
       guint32 next_track_id = qtmux->moov->mvhd.next_track_id;
 
-      for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-        GstCollectData *cdata = (GstCollectData *) walk->data;
-        GstQTPad *qpad = (GstQTPad *) cdata;
+      for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+        GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
         guint64 block_idx;
         AtomSTBL *stbl = &qpad->trak->mdia.minf.stbl;
 
@@ -3838,9 +3828,8 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
        * reserved for this in the moov and the pre-finalized moov would have
        * broken A/V synchronization. Error out here now
        */
-      for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-        GstCollectData *cdata = (GstCollectData *) walk->data;
-        GstQTPad *qpad = (GstQTPad *) cdata;
+      for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+        GstQTMuxPad *qpad = (GstQTMuxPad *) l->data;
 
         if (qpad->trak->edts
             && g_slist_length (qpad->trak->edts->elst.entries) > 1) {
@@ -3859,7 +3848,7 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
 
         gst_segment_init (&segment, GST_FORMAT_BYTES);
         segment.start = qtmux->moov_pos;
-        gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+        gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
         ret =
             gst_qt_mux_send_moov (qtmux, NULL, qtmux->reserved_moov_size, FALSE,
@@ -3985,7 +3974,7 @@ ftyp_error:
 }
 
 static GstFlowReturn
-gst_qt_mux_pad_fragment_add_buffer (GstQTMux * qtmux, GstQTPad * pad,
+gst_qt_mux_pad_fragment_add_buffer (GstQTMux * qtmux, GstQTMuxPad * pad,
     GstBuffer * buf, gboolean force, guint32 nsamples, gint64 dts,
     guint32 delta, guint32 size, gboolean sync, gint64 pts_offset)
 {
@@ -4196,7 +4185,7 @@ gst_qt_mux_robust_recording_rewrite_moov (GstQTMux * qtmux)
   /* seek and rewrite the MOOV atom */
   gst_segment_init (&segment, GST_FORMAT_BYTES);
   segment.start = new_moov_offset;
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+  gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
   ret =
       gst_qt_mux_send_moov (qtmux, NULL, qtmux->reserved_moov_size, FALSE,
@@ -4242,7 +4231,7 @@ gst_qt_mux_robust_recording_rewrite_moov (GstQTMux * qtmux)
    * where they need after this, or they don't need it */
   gst_segment_init (&segment, GST_FORMAT_BYTES);
   segment.start = freeA_offset;
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+  gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
   ret = gst_qt_mux_send_free_atom (qtmux, NULL, new_freeA_size, TRUE);
 
@@ -4298,13 +4287,13 @@ gst_qt_mux_robust_recording_update (GstQTMux * qtmux, GstClockTime position)
   /* Seek back to previous position */
   gst_segment_init (&segment, GST_FORMAT_BYTES);
   segment.start = mdat_offset;
-  gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+  gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
 
   return flow_ret;
 }
 
 static GstFlowReturn
-gst_qt_mux_register_and_push_sample (GstQTMux * qtmux, GstQTPad * pad,
+gst_qt_mux_register_and_push_sample (GstQTMux * qtmux, GstQTMuxPad * pad,
     GstBuffer * buffer, gboolean is_last_buffer, guint nsamples,
     gint64 last_dts, gint64 scaled_duration, guint sample_size,
     guint64 chunk_offset, gboolean sync, gboolean do_pts, gint64 pts_offset)
@@ -4398,7 +4387,7 @@ gst_qt_mux_register_and_push_sample (GstQTMux * qtmux, GstQTPad * pad,
 }
 
 static void
-gst_qt_mux_register_buffer_in_chunk (GstQTMux * qtmux, GstQTPad * pad,
+gst_qt_mux_register_buffer_in_chunk (GstQTMux * qtmux, GstQTMuxPad * pad,
     guint buffer_size, GstClockTime duration)
 {
   /* not that much happens here,
@@ -4415,7 +4404,7 @@ gst_qt_mux_register_buffer_in_chunk (GstQTMux * qtmux, GstQTPad * pad,
 }
 
 static GstFlowReturn
-gst_qt_mux_check_and_update_timecode (GstQTMux * qtmux, GstQTPad * pad,
+gst_qt_mux_check_and_update_timecode (GstQTMux * qtmux, GstQTMuxPad * pad,
     GstBuffer * buf, GstFlowReturn ret)
 {
   GstVideoTimeCodeMeta *tc_meta;
@@ -4519,7 +4508,7 @@ gst_qt_mux_check_and_update_timecode (GstQTMux * qtmux, GstQTPad * pad,
       /* Reset writing position */
       gst_segment_init (&segment, GST_FORMAT_BYTES);
       segment.start = bk_size;
-      gst_pad_push_event (qtmux->srcpad, gst_event_new_segment (&segment));
+      gst_aggregator_update_segment (GST_AGGREGATOR (qtmux), &segment);
     }
   }
 
@@ -4530,7 +4519,7 @@ gst_qt_mux_check_and_update_timecode (GstQTMux * qtmux, GstQTPad * pad,
  * Here we push the buffer and update the tables in the track atoms
  */
 static GstFlowReturn
-gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
+gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTMuxPad * pad, GstBuffer * buf)
 {
   GstBuffer *last_buf = NULL;
   GstClockTime duration;
@@ -4542,6 +4531,13 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
   GstFlowReturn ret = GST_FLOW_OK;
   guint buffer_size;
 
+  /* GAP event, nothing to do */
+  if (buf && gst_buffer_get_size (buf) == 0 &&
+      GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)) {
+    gst_buffer_unref (buf);
+    return GST_FLOW_OK;
+  }
+
   if (!pad->fourcc)
     goto not_negotiated;
 
@@ -4569,12 +4565,11 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
 #ifndef GST_DISABLE_GST_DEBUG
     if (buf == NULL) {
       GST_DEBUG_OBJECT (qtmux, "Pad %s has no previous buffer stored and "
-          "received NULL buffer, doing nothing",
-          GST_PAD_NAME (pad->collect.pad));
+          "received NULL buffer, doing nothing", GST_PAD_NAME (pad));
     } else {
       GST_LOG_OBJECT (qtmux,
           "Pad %s has no previous buffer stored, storing now",
-          GST_PAD_NAME (pad->collect.pad));
+          GST_PAD_NAME (pad));
     }
 #endif
     goto exit;
@@ -4601,12 +4596,11 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
       GST_DEBUG ("setting first_ts to %" G_GUINT64_FORMAT, pad->first_ts);
     } else {
       GST_WARNING_OBJECT (qtmux, "First buffer for pad %s has no timestamp, "
-          "using 0 as first timestamp", GST_PAD_NAME (pad->collect.pad));
+          "using 0 as first timestamp", GST_PAD_NAME (pad));
       pad->first_ts = pad->first_dts = 0;
     }
     GST_DEBUG_OBJECT (qtmux, "Stored first timestamp for pad %s %"
-        GST_TIME_FORMAT, GST_PAD_NAME (pad->collect.pad),
-        GST_TIME_ARGS (pad->first_ts));
+        GST_TIME_FORMAT, GST_PAD_NAME (pad), GST_TIME_ARGS (pad->first_ts));
   }
 
   if (buf && GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) &&
@@ -4669,7 +4663,7 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
     GST_DEBUG_OBJECT (qtmux,
         "Switching to next chunk for pad %s:%s: offset %" G_GUINT64_FORMAT
         ", size %" G_GUINT64_FORMAT ", duration %" GST_TIME_FORMAT,
-        GST_DEBUG_PAD_NAME (pad->collect.pad), qtmux->current_chunk_offset,
+        GST_DEBUG_PAD_NAME (pad), qtmux->current_chunk_offset,
         qtmux->current_chunk_size,
         GST_TIME_ARGS (qtmux->current_chunk_duration));
     qtmux->current_pad = pad;
@@ -4760,7 +4754,7 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
 
   GST_LOG_OBJECT (qtmux,
       "Pad (%s) dts updated to %" GST_TIME_FORMAT,
-      GST_PAD_NAME (pad->collect.pad), GST_TIME_ARGS (pad->last_dts));
+      GST_PAD_NAME (pad), GST_TIME_ARGS (pad->last_dts));
   GST_LOG_OBJECT (qtmux,
       "Adding %d samples to track, duration: %" G_GUINT64_FORMAT
       " size: %" G_GUINT32_FORMAT " chunk offset: %" G_GUINT64_FORMAT,
@@ -4770,7 +4764,7 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
   if (pad->sync &&
       !GST_BUFFER_FLAG_IS_SET (last_buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
     GST_LOG_OBJECT (qtmux, "Adding new sync sample entry for track of pad %s",
-        GST_PAD_NAME (pad->collect.pad));
+        GST_PAD_NAME (pad));
     sync = TRUE;
   }
 
@@ -4796,8 +4790,7 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
           || !GST_CLOCK_TIME_IS_VALID (qtmux->longest_chunk))) {
     GST_DEBUG_OBJECT (qtmux,
         "New longest chunk found: %" GST_TIME_FORMAT ", pad %s",
-        GST_TIME_ARGS (qtmux->current_chunk_duration),
-        GST_PAD_NAME (pad->collect.pad));
+        GST_TIME_ARGS (qtmux->current_chunk_duration), GST_PAD_NAME (pad));
     qtmux->longest_chunk = qtmux->current_chunk_duration;
   }
 
@@ -4878,7 +4871,7 @@ gst_qt_mux_add_buffer (GstQTMux * qtmux, GstQTPad * pad, GstBuffer * buf)
       g_assert_not_reached ();
       GST_WARNING_OBJECT (qtmux,
           "no empty buffer creation function found for pad %s",
-          GST_PAD_NAME (pad->collect.pad));
+          GST_PAD_NAME (pad));
     }
   }
 
@@ -4919,7 +4912,7 @@ not_negotiated:
   {
     GST_ELEMENT_ERROR (qtmux, CORE, NEGOTIATION, (NULL),
         ("format wasn't negotiated before buffer flow on pad %s",
-            GST_PAD_NAME (pad->collect.pad)));
+            GST_PAD_NAME (pad)));
     if (buf)
       gst_buffer_unref (buf);
     return GST_FLOW_NOT_NEGOTIATED;
@@ -4936,17 +4929,17 @@ sample_error:
  * MP4 however, thus we need to offset DTS so that it starts from 0.
  */
 static void
-gst_qt_pad_adjust_buffer_dts (GstQTMux * qtmux, GstQTPad * pad,
-    GstCollectData * cdata, GstBuffer ** buf)
+gst_qt_pad_adjust_buffer_dts (GstQTMux * qtmux, GstQTMuxPad * pad,
+    GstBuffer ** buf)
 {
   GstClockTime pts;
   gint64 dts;
 
   pts = GST_BUFFER_PTS (*buf);
-  dts = GST_COLLECT_PADS_DTS (cdata);
+  dts = pad->dts;
 
   GST_LOG_OBJECT (qtmux, "selected pad %s with PTS %" GST_TIME_FORMAT
-      " and DTS %" GST_STIME_FORMAT, GST_PAD_NAME (cdata->pad),
+      " and DTS %" GST_STIME_FORMAT, GST_PAD_NAME (pad),
       GST_TIME_ARGS (pts), GST_STIME_ARGS (dts));
 
   if (!GST_CLOCK_TIME_IS_VALID (pad->dts_adjustment)) {
@@ -4982,24 +4975,23 @@ gst_qt_pad_adjust_buffer_dts (GstQTMux * qtmux, GstQTPad * pad,
   }
 }
 
-static GstQTPad *
-find_best_pad (GstQTMux * qtmux, GstCollectPads * pads)
+static GstQTMuxPad *
+find_best_pad (GstQTMux * qtmux)
 {
-  GSList *walk;
-  GstQTPad *best_pad = NULL;
+  GList *l;
+  GstQTMuxPad *best_pad = NULL;
 
   if (qtmux->mux_mode == GST_QT_MUX_MODE_ROBUST_RECORDING_PREFILL) {
     guint64 smallest_offset = G_MAXUINT64;
     guint64 chunk_offset = 0;
 
-    for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-      GstCollectData *cdata = (GstCollectData *) walk->data;
-      GstQTPad *qtpad = (GstQTPad *) cdata;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
       const TrakBufferEntryInfo *sample_entry;
       guint64 block_idx, current_block_idx;
       guint64 chunk_offset_offset = 0;
       GstBuffer *tmp_buf =
-          gst_collect_pads_peek (pads, (GstCollectData *) qtpad);
+          gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (qtpad));
 
       /* Check for EOS pads and just skip them */
       if (!tmp_buf && !qtpad->last_buf && (!qtpad->raw_audio_adapter
@@ -5052,36 +5044,36 @@ find_best_pad (GstQTMux * qtmux, GstCollectPads * pads)
       && qtmux->mux_mode != GST_QT_MUX_MODE_FRAGMENTED
       && qtmux->mux_mode != GST_QT_MUX_MODE_FRAGMENTED_STREAMABLE) {
     GstBuffer *tmp_buf =
-        gst_collect_pads_peek (pads, (GstCollectData *) qtmux->current_pad);
+        gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD
+        (qtmux->current_pad));
 
     if (tmp_buf || qtmux->current_pad->last_buf) {
       best_pad = qtmux->current_pad;
       if (tmp_buf)
         gst_buffer_unref (tmp_buf);
       GST_DEBUG_OBJECT (qtmux, "Reusing pad %s:%s",
-          GST_DEBUG_PAD_NAME (best_pad->collect.pad));
+          GST_DEBUG_PAD_NAME (best_pad));
     }
-  } else if (qtmux->collect->data->next) {
+  } else if (GST_ELEMENT (qtmux)->sinkpads->next) {
     /* Only switch pads if we have more than one, otherwise
      * we can just put everything into a single chunk and save
      * a few bytes of offsets
      */
     if (qtmux->current_pad)
       GST_DEBUG_OBJECT (qtmux, "Switching from pad %s:%s",
-          GST_DEBUG_PAD_NAME (qtmux->current_pad->collect.pad));
+          GST_DEBUG_PAD_NAME (qtmux->current_pad));
     best_pad = qtmux->current_pad = NULL;
   }
 
   if (!best_pad) {
     GstClockTime best_time = GST_CLOCK_TIME_NONE;
 
-    for (walk = qtmux->collect->data; walk; walk = g_slist_next (walk)) {
-      GstCollectData *cdata = (GstCollectData *) walk->data;
-      GstQTPad *qtpad = (GstQTPad *) cdata;
+    for (l = GST_ELEMENT_CAST (qtmux)->sinkpads; l; l = l->next) {
+      GstQTMuxPad *qtpad = (GstQTMuxPad *) l->data;
       GstBuffer *tmp_buf;
       GstClockTime timestamp;
 
-      tmp_buf = gst_collect_pads_peek (pads, cdata);
+      tmp_buf = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (qtpad));;
       if (!tmp_buf) {
         /* This one is newly EOS now, finish it for real */
         if (qtpad->last_buf) {
@@ -5108,7 +5100,7 @@ find_best_pad (GstQTMux * qtmux, GstCollectPads * pads)
 
     if (best_pad) {
       GST_DEBUG_OBJECT (qtmux, "Choosing pad %s:%s",
-          GST_DEBUG_PAD_NAME (best_pad->collect.pad));
+          GST_DEBUG_PAD_NAME (best_pad));
     } else {
       GST_DEBUG_OBJECT (qtmux, "No best pad: EOS");
     }
@@ -5117,12 +5109,24 @@ find_best_pad (GstQTMux * qtmux, GstCollectPads * pads)
   return best_pad;
 }
 
+static gboolean
+gst_qt_mux_are_all_pads_eos (GstQTMux * mux)
+{
+  GList *l;
+
+  for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
+    if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (l->data)))
+      return FALSE;
+  }
+  return TRUE;
+}
+
 static GstFlowReturn
-gst_qt_mux_collected (GstCollectPads * pads, gpointer user_data)
+gst_qt_mux_aggregate (GstAggregator * agg, gboolean timeout)
 {
   GstFlowReturn ret = GST_FLOW_OK;
-  GstQTMux *qtmux = GST_QT_MUX_CAST (user_data);
-  GstQTPad *best_pad = NULL;
+  GstQTMux *qtmux = GST_QT_MUX_CAST (agg);
+  GstQTMuxPad *best_pad = NULL;
 
   if (G_UNLIKELY (qtmux->state == GST_QT_MUX_STATE_STARTED)) {
     if ((ret = gst_qt_mux_start_file (qtmux)) != GST_FLOW_OK)
@@ -5134,7 +5138,7 @@ gst_qt_mux_collected (GstCollectPads * pads, gpointer user_data)
   if (G_UNLIKELY (qtmux->state == GST_QT_MUX_STATE_EOS))
     return GST_FLOW_EOS;
 
-  best_pad = find_best_pad (qtmux, pads);
+  best_pad = find_best_pad (qtmux);
 
   /* clipping already converted to running time */
   if (best_pad != NULL) {
@@ -5149,22 +5153,21 @@ gst_qt_mux_collected (GstCollectPads * pads, gpointer user_data)
     if (qtmux->mux_mode != GST_QT_MUX_MODE_ROBUST_RECORDING_PREFILL ||
         best_pad->raw_audio_adapter == NULL ||
         best_pad->raw_audio_adapter_pts == GST_CLOCK_TIME_NONE)
-      buf = gst_collect_pads_pop (pads, (GstCollectData *) best_pad);
+      buf = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best_pad));
 
     g_assert (buf || best_pad->last_buf || (best_pad->raw_audio_adapter
             && gst_adapter_available (best_pad->raw_audio_adapter) > 0));
 
     if (buf)
-      gst_qt_pad_adjust_buffer_dts (qtmux, best_pad,
-          (GstCollectData *) best_pad, &buf);
+      gst_qt_pad_adjust_buffer_dts (qtmux, best_pad, &buf);
 
     ret = gst_qt_mux_add_buffer (qtmux, best_pad, buf);
-  } else {
+  } else if (gst_qt_mux_are_all_pads_eos (qtmux)) {
+
     qtmux->state = GST_QT_MUX_STATE_EOS;
     ret = gst_qt_mux_stop_file (qtmux);
     if (ret == GST_FLOW_OK) {
-      GST_DEBUG_OBJECT (qtmux, "Pushing eos");
-      gst_pad_push_event (qtmux->srcpad, gst_event_new_eos ());
+      GST_DEBUG_OBJECT (qtmux, "We are eos");
       ret = GST_FLOW_EOS;
     } else {
       GST_WARNING_OBJECT (qtmux, "Failed to stop file: %s",
@@ -5226,9 +5229,9 @@ gst_qt_mux_can_renegotiate (GstQTMux * qtmux, GstPad * pad, GstCaps * caps)
 }
 
 static gboolean
-gst_qt_mux_audio_sink_set_caps (GstQTPad * qtpad, GstCaps * caps)
+gst_qt_mux_audio_sink_set_caps (GstQTMuxPad * qtpad, GstCaps * caps)
 {
-  GstPad *pad = qtpad->collect.pad;
+  GstPad *pad = GST_PAD (qtpad);
   GstQTMux *qtmux = GST_QT_MUX_CAST (gst_pad_get_parent (pad));
   GstQTMuxClass *qtmux_klass = (GstQTMuxClass *) (G_OBJECT_GET_CLASS (qtmux));
   GstStructure *structure;
@@ -5573,9 +5576,9 @@ refuse_caps:
 }
 
 static gboolean
-gst_qt_mux_video_sink_set_caps (GstQTPad * qtpad, GstCaps * caps)
+gst_qt_mux_video_sink_set_caps (GstQTMuxPad * qtpad, GstCaps * caps)
 {
-  GstPad *pad = qtpad->collect.pad;
+  GstPad *pad = GST_PAD (qtpad);
   GstQTMux *qtmux = GST_QT_MUX_CAST (gst_pad_get_parent (pad));
   GstQTMuxClass *qtmux_klass = (GstQTMuxClass *) (G_OBJECT_GET_CLASS (qtmux));
   GstStructure *structure;
@@ -6131,9 +6134,9 @@ refuse_caps:
 }
 
 static gboolean
-gst_qt_mux_subtitle_sink_set_caps (GstQTPad * qtpad, GstCaps * caps)
+gst_qt_mux_subtitle_sink_set_caps (GstQTMuxPad * qtpad, GstCaps * caps)
 {
-  GstPad *pad = qtpad->collect.pad;
+  GstPad *pad = GST_PAD (qtpad);
   GstQTMux *qtmux = GST_QT_MUX_CAST (gst_pad_get_parent (pad));
   GstStructure *structure;
   SubtitleSampleEntry entry = { 0, };
@@ -6184,9 +6187,9 @@ refuse_caps:
 }
 
 static gboolean
-gst_qt_mux_caption_sink_set_caps (GstQTPad * qtpad, GstCaps * caps)
+gst_qt_mux_caption_sink_set_caps (GstQTMuxPad * qtpad, GstCaps * caps)
 {
-  GstPad *pad = qtpad->collect.pad;
+  GstPad *pad = GST_PAD (qtpad);
   GstQTMux *qtmux = GST_QT_MUX_CAST (gst_pad_get_parent (pad));
   GstStructure *structure;
   guint32 fourcc_entry;
@@ -6247,29 +6250,30 @@ refuse_caps:
 }
 
 static gboolean
-gst_qt_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
-    GstEvent * event, gpointer user_data)
+gst_qt_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
+    GstEvent * event)
 {
+  GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
+  GstQTMuxPad *qtmux_pad;
   GstQTMux *qtmux;
   guint32 avg_bitrate = 0, max_bitrate = 0;
-  GstPad *pad = data->pad;
+  GstPad *pad = GST_PAD (agg_pad);
   gboolean ret = TRUE;
 
-  qtmux = GST_QT_MUX_CAST (user_data);
+  qtmux = GST_QT_MUX_CAST (agg);
+  qtmux_pad = GST_QT_MUX_PAD_CAST (agg_pad);
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_CAPS:
     {
       GstCaps *caps;
-      GstQTPad *collect_pad;
 
       gst_event_parse_caps (event, &caps);
 
       /* find stream data */
-      collect_pad = (GstQTPad *) gst_pad_get_element_private (pad);
-      g_assert (collect_pad);
-      g_assert (collect_pad->set_caps);
+      g_assert (qtmux_pad->set_caps);
 
-      ret = collect_pad->set_caps (collect_pad, caps);
+      ret = qtmux_pad->set_caps (qtmux_pad, caps);
       gst_event_unref (event);
       event = NULL;
       break;
@@ -6279,11 +6283,9 @@ gst_qt_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
       GstTagSetter *setter = GST_TAG_SETTER (qtmux);
       GstTagMergeMode mode;
       gchar *code;
-      GstQTPad *collect_pad;
 
       GST_OBJECT_LOCK (qtmux);
       mode = gst_tag_setter_get_tag_merge_mode (setter);
-      collect_pad = (GstQTPad *) gst_pad_get_element_private (pad);
 
       gst_event_parse_tag (event, &list);
       GST_DEBUG_OBJECT (qtmux, "received tag event on pad %s:%s : %"
@@ -6293,32 +6295,27 @@ gst_qt_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
         gst_tag_setter_merge_tags (setter, list, mode);
         qtmux->tags_changed = TRUE;
       } else {
-        if (!collect_pad->tags)
-          collect_pad->tags = gst_tag_list_new_empty ();
-        gst_tag_list_insert (collect_pad->tags, list, mode);
-        collect_pad->tags_changed = TRUE;
+        if (!qtmux_pad->tags)
+          qtmux_pad->tags = gst_tag_list_new_empty ();
+        gst_tag_list_insert (qtmux_pad->tags, list, mode);
+        qtmux_pad->tags_changed = TRUE;
       }
       GST_OBJECT_UNLOCK (qtmux);
 
       if (gst_tag_list_get_uint (list, GST_TAG_BITRATE, &avg_bitrate) |
           gst_tag_list_get_uint (list, GST_TAG_MAXIMUM_BITRATE, &max_bitrate)) {
-        GstQTPad *qtpad = gst_pad_get_element_private (pad);
-        g_assert (qtpad);
-
         if (avg_bitrate > 0 && avg_bitrate < G_MAXUINT32)
-          qtpad->avg_bitrate = avg_bitrate;
+          qtmux_pad->avg_bitrate = avg_bitrate;
         if (max_bitrate > 0 && max_bitrate < G_MAXUINT32)
-          qtpad->max_bitrate = max_bitrate;
+          qtmux_pad->max_bitrate = max_bitrate;
       }
 
       if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &code)) {
         const char *iso_code = gst_tag_get_language_code_iso_639_2T (code);
         if (iso_code) {
-          GstQTPad *qtpad = gst_pad_get_element_private (pad);
-          g_assert (qtpad);
-          if (qtpad->trak) {
+          if (qtmux_pad->trak) {
             /* https://developer.apple.com/library/mac/#documentation/QuickTime/QTFF/QTFFChap4/qtff4.html */
-            qtpad->trak->mdia.mdhd.language_code = language_code (iso_code);
+            qtmux_pad->trak->mdia.mdhd.language_code = language_code (iso_code);
           }
         }
         g_free (code);
@@ -6334,7 +6331,7 @@ gst_qt_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
   }
 
   if (event != NULL)
-    return gst_collect_pads_event_default (pads, data, event, FALSE);
+    ret = agg_class->sink_event (agg, agg_pad, event);
 
   return ret;
 }
@@ -6343,30 +6340,18 @@ static void
 gst_qt_mux_release_pad (GstElement * element, GstPad * pad)
 {
   GstQTMux *mux = GST_QT_MUX_CAST (element);
-  GSList *walk;
 
   GST_DEBUG_OBJECT (element, "Releasing %s:%s", GST_DEBUG_PAD_NAME (pad));
 
-  for (walk = mux->sinkpads; walk; walk = g_slist_next (walk)) {
-    GstQTPad *qtpad = (GstQTPad *) walk->data;
-    GST_DEBUG ("Checking %s:%s", GST_DEBUG_PAD_NAME (qtpad->collect.pad));
-    if (qtpad->collect.pad == pad) {
-      /* this is it, remove */
-      mux->sinkpads = g_slist_delete_link (mux->sinkpads, walk);
-      gst_element_remove_pad (element, pad);
-      break;
-    }
-  }
+  gst_element_remove_pad (element, pad);
 
-  if (mux->current_pad && mux->current_pad->collect.pad == pad) {
+  if (mux->current_pad && GST_PAD (mux->current_pad) == pad) {
     mux->current_pad = NULL;
     mux->current_chunk_size = 0;
     mux->current_chunk_duration = 0;
   }
 
-  gst_collect_pads_remove_pad (mux->collect, pad);
-
-  if (mux->sinkpads == NULL) {
+  if (GST_ELEMENT (mux)->sinkpads == NULL) {
     /* No more outstanding request pads, reset our counters */
     mux->video_pads = 0;
     mux->audio_pads = 0;
@@ -6374,18 +6359,24 @@ gst_qt_mux_release_pad (GstElement * element, GstPad * pad)
   }
 }
 
+static GstAggregatorPad *
+gst_qt_mux_create_new_pad (GstAggregator * self,
+    GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
+{
+  return g_object_new (GST_TYPE_QT_MUX_PAD, "name", req_name, "direction",
+      templ->direction, "template", templ, NULL);
+}
+
 static GstPad *
 gst_qt_mux_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
 {
   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
   GstQTMux *qtmux = GST_QT_MUX_CAST (element);
-  GstQTPad *collect_pad;
-  GstPad *newpad;
+  GstQTMuxPad *qtpad;
   GstQTPadSetCapsFunc setcaps_func;
   gchar *name;
   gint pad_id;
-  gboolean lock = TRUE;
 
   if (templ->direction != GST_PAD_SINK)
     goto wrong_direction;
@@ -6414,7 +6405,6 @@ gst_qt_mux_request_new_pad (GstElement * element,
     } else {
       name = g_strdup_printf ("subtitle_%u", qtmux->subtitle_pads++);
     }
-    lock = FALSE;
   } else if (templ == gst_element_class_get_pad_template (klass, "caption_%u")) {
     setcaps_func = gst_qt_mux_caption_sink_set_caps;
     if (req_name != NULL && sscanf (req_name, "caption_%u", &pad_id) == 1) {
@@ -6422,34 +6412,27 @@ gst_qt_mux_request_new_pad (GstElement * element,
     } else {
       name = g_strdup_printf ("caption_%u", qtmux->caption_pads++);
     }
-    lock = FALSE;
   } else
     goto wrong_template;
 
   GST_DEBUG_OBJECT (qtmux, "Requested pad: %s", name);
 
-  /* create pad and add to collections */
-  newpad =
-      g_object_new (GST_TYPE_QT_MUX_PAD, "name", name, "direction",
-      templ->direction, "template", templ, NULL);
+  qtpad = (GstQTMuxPad *)
+      GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
+      templ, name, caps);
+
   g_free (name);
-  collect_pad = (GstQTPad *)
-      gst_collect_pads_add_pad (qtmux->collect, newpad, sizeof (GstQTPad),
-      (GstCollectDataDestroyNotify) (gst_qt_mux_pad_reset), lock);
-  /* set up pad */
-  gst_qt_mux_pad_reset (collect_pad);
-  collect_pad->trak = atom_trak_new (qtmux->context);
-  atom_moov_add_trak (qtmux->moov, collect_pad->trak);
 
-  qtmux->sinkpads = g_slist_append (qtmux->sinkpads, collect_pad);
+  /* set up pad */
+  gst_qt_mux_pad_reset (qtpad);
+  qtpad->trak = atom_trak_new (qtmux->context);
+  atom_moov_add_trak (qtmux->moov, qtpad->trak);
 
   /* set up pad functions */
-  collect_pad->set_caps = setcaps_func;
+  qtpad->set_caps = setcaps_func;
+  qtpad->dts = G_MININT64;
 
-  gst_pad_set_active (newpad, TRUE);
-  gst_element_add_pad (element, newpad);
-
-  return newpad;
+  return GST_PAD (qtpad);
 
   /* ERRORS */
 wrong_direction:
@@ -6650,43 +6633,29 @@ gst_qt_mux_set_property (GObject * object,
   GST_OBJECT_UNLOCK (qtmux);
 }
 
-static GstStateChangeReturn
-gst_qt_mux_change_state (GstElement * element, GstStateChange transition)
+static gboolean
+gst_qt_mux_start (GstAggregator * agg)
 {
-  GstStateChangeReturn ret;
-  GstQTMux *qtmux = GST_QT_MUX_CAST (element);
+  GstQTMux *qtmux = GST_QT_MUX_CAST (agg);
+  GstSegment segment;
 
-  switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-      break;
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-      gst_collect_pads_start (qtmux->collect);
-      qtmux->state = GST_QT_MUX_STATE_STARTED;
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_collect_pads_stop (qtmux->collect);
-      break;
-    default:
-      break;
-  }
+  qtmux->state = GST_QT_MUX_STATE_STARTED;
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  /* let downstream know we think in BYTES and expect to do seeking later on */
+  gst_segment_init (&segment, GST_FORMAT_BYTES);
+  gst_aggregator_update_segment (agg, &segment);
 
-  switch (transition) {
-    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_qt_mux_reset (qtmux, TRUE);
-      break;
-    case GST_STATE_CHANGE_READY_TO_NULL:
-      break;
-    default:
-      break;
-  }
+  return TRUE;
+}
 
-  return ret;
+static gboolean
+gst_qt_mux_stop (GstAggregator * agg)
+{
+  GstQTMux *qtmux = GST_QT_MUX_CAST (agg);
+
+  gst_qt_mux_reset (qtmux, TRUE);
+
+  return TRUE;
 }
 
 gboolean
@@ -6750,7 +6719,8 @@ gst_qt_mux_register (GstPlugin * plugin)
     }
 
     /* create the type now */
-    type = g_type_register_static (GST_TYPE_ELEMENT, prop->type_name, &typeinfo,
+    type =
+        g_type_register_static (GST_TYPE_AGGREGATOR, prop->type_name, &typeinfo,
         0);
     g_type_set_qdata (type, GST_QT_MUX_PARAMS_QDATA, (gpointer) params);
     g_type_add_interface_static (type, GST_TYPE_TAG_SETTER, &tag_setter_info);
index 0e2fd75e9316ef7c476e6e23b117e0661d2cde61..d18871babec3b420f5c285713e592a54d4cc7c62 100644 (file)
@@ -44,7 +44,7 @@
 #define __GST_QT_MUX_H__
 
 #include <gst/gst.h>
-#include <gst/base/gstcollectpads.h>
+#include <gst/base/gstaggregator.h>
 
 #include "fourcc.h"
 #include "atoms.h"
@@ -63,7 +63,8 @@ G_BEGIN_DECLS
 
 typedef struct _GstQTMux GstQTMux;
 typedef struct _GstQTMuxClass GstQTMuxClass;
-typedef struct _GstQTPad GstQTPad;
+typedef struct _GstQTMuxPad GstQTMuxPad;
+typedef struct _GstQTMuxPadClass GstQTMuxPadClass;
 
 /*
  * GstQTPadPrepareBufferFunc
@@ -75,17 +76,31 @@ typedef struct _GstQTPad GstQTPad;
  * being muxed. (Originally added for image/x-jpc support, for which buffers
  * need to be wrapped into a isom box)
  */
-typedef GstBuffer * (*GstQTPadPrepareBufferFunc) (GstQTPad * pad,
+typedef GstBuffer * (*GstQTPadPrepareBufferFunc) (GstQTMuxPad * pad,
     GstBuffer * buf, GstQTMux * qtmux);
-
-typedef gboolean (*GstQTPadSetCapsFunc) (GstQTPad * pad, GstCaps * caps);
-typedef GstBuffer * (*GstQTPadCreateEmptyBufferFunc) (GstQTPad * pad, gint64 duration);
-
-#define QTMUX_NO_OF_TS   10
-
-struct _GstQTPad
+typedef gboolean (*GstQTPadSetCapsFunc) (GstQTMuxPad * pad, GstCaps * caps);
+typedef GstBuffer * (*GstQTPadCreateEmptyBufferFunc) (GstQTMuxPad * pad, gint64 duration);
+
+GType gst_qt_mux_pad_get_type (void);
+
+#define GST_TYPE_QT_MUX_PAD \
+  (gst_qt_mux_pad_get_type())
+#define GST_QT_MUX_PAD(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_QT_MUX_PAD, GstQTMuxPad))
+#define GST_QT_MUX_PAD_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_QT_MUX_PAD, GstQTMuxPadClass))
+#define GST_IS_QT_MUX_PAD(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_QT_MUX_PAD))
+#define GST_IS_QT_MUX_PAD_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_QT_MUX_PAD))
+#define GST_QT_MUX_PAD_CAST(obj) \
+  ((GstQTMuxPad *)(obj))
+
+struct _GstQTMuxPad
 {
-  GstCollectData collect;       /* we extend the CollectData */
+  GstAggregatorPad parent;
+
+  guint32 trak_timescale;
 
   /* fourcc id of stream */
   guint32 fourcc;
@@ -122,6 +137,8 @@ struct _GstQTPad
   GstClockTime first_ts;
   GstClockTime first_dts;
 
+  gint64 dts; /* the signed version of the DTS converted to running time. */
+
   /* all the atom and chunk book-keeping is delegated here
    * unowned/uncounted reference, parent MOOV owns */
   AtomTRAK *trak;
@@ -162,6 +179,13 @@ struct _GstQTPad
   GstFlowReturn flow_status;
 };
 
+struct _GstQTMuxPadClass
+{
+  GstAggregatorPadClass parent;
+};
+
+#define QTMUX_NO_OF_TS   10
+
 typedef enum _GstQTMuxState
 {
   GST_QT_MUX_STATE_NONE,
@@ -181,11 +205,7 @@ typedef enum _GstQtMuxMode {
 
 struct _GstQTMux
 {
-  GstElement element;
-
-  GstPad *srcpad;
-  GstCollectPads *collect;
-  GSList *sinkpads;
+  GstAggregator parent;
 
   /* state */
   GstQTMuxState state;
@@ -215,7 +235,7 @@ struct _GstQTMux
   GstClockTime last_dts;
 
   /* Last pad we used for writing the current chunk */
-  GstQTPad *current_pad;
+  GstQTMuxPad *current_pad;
   guint64 current_chunk_size;
   GstClockTime current_chunk_duration;
   guint64 current_chunk_offset;
@@ -299,7 +319,7 @@ struct _GstQTMux
 
 struct _GstQTMuxClass
 {
-  GstElementClass parent_class;
+  GstAggregatorClass parent_class;
 
   GstQTMuxFormat format;
 };
index 54fb642e49444f9ce2978ea308188f60c8d799bd..5da17acf515aea6a88d8208cfab9cc478dab2c27 100644 (file)
@@ -126,14 +126,14 @@ setup_src_pad (GstElement * element,
     sinkpad = gst_element_get_request_pad (element, sinkname);
   fail_if (sinkpad == NULL, "Could not get sink pad from %s",
       GST_ELEMENT_NAME (element));
-  /* references are owned by: 1) us, 2) qtmux, 3) collect pads */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  /* references are owned by: 1) us, 2) qtmux */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
   fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
       "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
   gst_object_unref (sinkpad);   /* because we got it higher up */
 
-  /* references are owned by: 1) qtmux, 2) collect pads */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
+  /* references are owned by: 1) qtmux */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 1);
 
   return srcpad;
 }
@@ -146,14 +146,14 @@ teardown_src_pad (GstPad * srcpad)
   /* clean up floating src pad */
   sinkpad = gst_pad_get_peer (srcpad);
   fail_if (sinkpad == NULL);
-  /* pad refs held by 1) qtmux 2) collectpads and 3) us (through _get_peer) */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  /* pad refs held by 1) qtmux 2) us (through _get_peer) */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
 
   gst_pad_unlink (srcpad, sinkpad);
 
   /* after unlinking, pad refs still held by
-   * 1) qtmux and 2) collectpads and 3) us (through _get_peer) */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+   * 1) qtmux and 2) us (through _get_peer) */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
   gst_object_unref (sinkpad);
   /* one more ref is held by element itself */
 
@@ -177,6 +177,28 @@ qtmux_sinkpad_query (GstPad * pad, GstObject * parent, GstQuery * query)
   return ret;
 }
 
+static gboolean have_eos;
+static GCond eos_cond;
+static GMutex event_mutex;
+
+static gboolean
+qtmux_sinkpad_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  gboolean res = TRUE;
+
+  g_mutex_lock (&event_mutex);
+  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+    have_eos = TRUE;
+    GST_DEBUG ("signal EOS");
+    g_cond_broadcast (&eos_cond);
+  }
+  g_mutex_unlock (&event_mutex);
+
+  gst_event_unref (event);
+
+  return res;
+}
+
 static GstElement *
 setup_qtmux (GstStaticPadTemplate * srctemplate, const gchar * sinkname,
     gboolean seekable)
@@ -184,12 +206,18 @@ setup_qtmux (GstStaticPadTemplate * srctemplate, const gchar * sinkname,
   GstElement *qtmux;
 
   GST_DEBUG ("setup_qtmux");
+
+  g_cond_init (&eos_cond);
+  g_mutex_init (&event_mutex);
+  have_eos = FALSE;
+
   qtmux = gst_check_setup_element ("qtmux");
   mysrcpad = setup_src_pad (qtmux, srctemplate, sinkname);
   mysinkpad = gst_check_setup_sink_pad (qtmux, &sinktemplate);
 
   downstream_is_seekable = seekable;
   gst_pad_set_query_function (mysinkpad, qtmux_sinkpad_query);
+  gst_pad_set_event_function (mysinkpad, qtmux_sinkpad_event);
 
   gst_pad_set_active (mysrcpad, TRUE);
   gst_pad_set_active (mysinkpad, TRUE);
@@ -197,6 +225,16 @@ setup_qtmux (GstStaticPadTemplate * srctemplate, const gchar * sinkname,
   return qtmux;
 }
 
+static void
+wait_for_eos (void)
+{
+  g_mutex_lock (&event_mutex);
+  while (!have_eos)
+    g_cond_wait (&eos_cond, &event_mutex);
+  g_mutex_unlock (&event_mutex);
+}
+
+
 static void
 cleanup_qtmux (GstElement * qtmux, const gchar * sinkname)
 {
@@ -250,6 +288,9 @@ check_qtmux_pad (GstStaticPadTemplate * srctemplate, const gchar * sinkname,
   /* send eos to have moov written */
   fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()) == TRUE);
 
+  /* Muxing occurs on the aggregate thread */
+  wait_for_eos ();
+
   num_buffers = g_list_length (buffers);
   /* at least expect ftyp, mdat header, buffer chunk and moov */
   fail_unless (num_buffers >= 4);
@@ -342,6 +383,8 @@ check_qtmux_pad_fragmented (GstStaticPadTemplate * srctemplate,
   /* send eos to have all written */
   fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()) == TRUE);
 
+  wait_for_eos ();
+
   num_buffers = g_list_length (buffers);
   /* at least expect ftyp, moov, moof, mdat header, buffer chunk
    * and optionally mfra */
@@ -833,6 +876,7 @@ test_average_bitrate_custom (const gchar * elementname,
   gint64 total_bytes = 0;
   GstClockTime total_duration = 0;
   GstSegment segment;
+  GstBus *bus;
 
   location = g_strdup_printf ("%s/%s-%d", g_get_tmp_dir (), "qtmuxtest",
       g_random_int ());
@@ -845,6 +889,9 @@ test_average_bitrate_custom (const gchar * elementname,
   fail_unless (mysrcpad != NULL);
   gst_pad_set_active (mysrcpad, TRUE);
 
+  bus = gst_bus_new ();
+  gst_element_set_bus (filesink, bus);
+  gst_object_unref (bus);
 
   fail_unless (gst_element_set_state (filesink,
           GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE,
@@ -878,9 +925,14 @@ test_average_bitrate_custom (const gchar * elementname,
   /* send eos to have moov written */
   fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_eos ()) == TRUE);
 
+  gst_message_unref (gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE,
+          GST_MESSAGE_EOS));
+
   gst_element_set_state (qtmux, GST_STATE_NULL);
   gst_element_set_state (filesink, GST_STATE_NULL);
 
+  gst_element_set_bus (filesink, NULL);
+
   gst_check_drop_buffers ();
   gst_pad_set_active (mysrcpad, FALSE);
   teardown_src_pad (mysrcpad);
@@ -1212,6 +1264,7 @@ run_muxing_test (struct TestInputData *input1, struct TestInputData *input2)
   gchar *location;
   GstElement *qtmux;
   GstElement *filesink;
+  GstBus *bus;
 
   location = g_strdup_printf ("%s/%s-%d", g_get_tmp_dir (), "qtmuxtest",
       g_random_int ());
@@ -1220,6 +1273,10 @@ run_muxing_test (struct TestInputData *input1, struct TestInputData *input2)
   g_object_set (filesink, "location", location, NULL);
   gst_element_link (qtmux, filesink);
 
+  bus = gst_bus_new ();
+  gst_element_set_bus (filesink, bus);
+  gst_object_unref (bus);
+
   input1->srcpad = setup_src_pad (qtmux, &srcvideorawtemplate, "video_%u");
   fail_unless (input1->srcpad != NULL);
   gst_pad_set_active (input1->srcpad, TRUE);
@@ -1240,16 +1297,19 @@ run_muxing_test (struct TestInputData *input1, struct TestInputData *input2)
   input2->thread =
       g_thread_new ("test-push-data-2", test_input_push_data, input2);
 
-  /* FIXME set a mainloop and wait for EOS */
-
   g_thread_join (input1->thread);
   g_thread_join (input2->thread);
   input1->thread = NULL;
   input2->thread = NULL;
 
+  gst_message_unref (gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE,
+          GST_MESSAGE_EOS));
+
   gst_element_set_state (qtmux, GST_STATE_NULL);
   gst_element_set_state (filesink, GST_STATE_NULL);
 
+  gst_element_set_bus (filesink, NULL);
+
   check_output (location, input1, input2);
 
   gst_object_unref (filesink);