mpegtsmux: aggregator port
authorMathieu Duponchelle <mathieu@centricular.com>
Tue, 30 Apr 2019 23:09:19 +0000 (01:09 +0200)
committerMathieu Duponchelle <mduponchelle1@gmail.com>
Sun, 19 May 2019 19:40:48 +0000 (19:40 +0000)
14 files changed:
gst/mpegtsmux/gstatscmux.c
gst/mpegtsmux/gstbasetsmux.c
gst/mpegtsmux/gstbasetsmux.h
gst/mpegtsmux/gstbasetsmuxaac.c
gst/mpegtsmux/gstbasetsmuxaac.h
gst/mpegtsmux/gstbasetsmuxjpeg2000.c
gst/mpegtsmux/gstbasetsmuxjpeg2000.h
gst/mpegtsmux/gstbasetsmuxopus.c
gst/mpegtsmux/gstbasetsmuxopus.h
gst/mpegtsmux/gstbasetsmuxttxt.c
gst/mpegtsmux/gstbasetsmuxttxt.h
gst/mpegtsmux/gstmpegtsmux.c
gst/mpegtsmux/tsmux/tsmux.c
tests/check/elements/mpegtsmux.c

index 34b55d2..732d8a6 100644 (file)
@@ -34,7 +34,7 @@ GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
     GST_STATIC_CAPS ("video/mpegts, "
-        "systemstream = (boolean) true, " "packetsize = (int) 192 ")
+        "systemstream = (boolean) true, " "packetsize = (int) 188 ")
     );
 
 static GstStaticPadTemplate gst_atsc_mux_sink_factory =
@@ -155,7 +155,7 @@ gst_atsc_mux_create_ts_mux (GstBaseTsMux * mpegtsmux)
 
 static guint
 gst_atsc_mux_handle_media_type (GstBaseTsMux * mux, const gchar * media_type,
-    GstBaseTsPadData * ts_data)
+    GstBaseTsMuxPad * pad)
 {
   guint ret = TSMUX_ST_RESERVED;
 
@@ -182,11 +182,11 @@ gst_atsc_mux_class_init (GstATSCMuxClass * klass)
   mpegtsmux_class->create_ts_mux = gst_atsc_mux_create_ts_mux;
   mpegtsmux_class->handle_media_type = gst_atsc_mux_handle_media_type;
 
-  gst_element_class_add_static_pad_template (gstelement_class,
-      &gst_atsc_mux_sink_factory);
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &gst_atsc_mux_sink_factory, GST_TYPE_BASE_TS_MUX_PAD);
 
-  gst_element_class_add_static_pad_template (gstelement_class,
-      &gst_atsc_mux_src_factory);
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &gst_atsc_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
 }
 
 static void
index ecec427..f96959f 100644 (file)
 GST_DEBUG_CATEGORY (gst_base_ts_mux_debug);
 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
 
-#define COLLECT_DATA_PAD(collect_data) (((GstCollectData *)(collect_data))->pad)
+/* GstBaseTsMuxPad */
+
+G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD);
+
+/* Internals */
+
+static void
+gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad)
+{
+  pad->dts = GST_CLOCK_STIME_NONE;
+  pad->prog_id = -1;
+
+  if (pad->free_func)
+    pad->free_func (pad->prepare_data);
+  pad->prepare_data = NULL;
+  pad->prepare_func = NULL;
+  pad->free_func = NULL;
+
+  if (pad->codec_data)
+    gst_buffer_replace (&pad->codec_data, NULL);
+
+  /* reference owned elsewhere */
+  pad->stream = NULL;
+  pad->prog = NULL;
+
+  if (pad->language) {
+    g_free (pad->language);
+    pad->language = NULL;
+  }
+}
+
+/* GstAggregatorPad implementation */
+
+static GstFlowReturn
+gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
+{
+  gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (agg_pad));
+
+  return GST_FLOW_OK;
+}
+
+/* GObject implementation */
+
+static void
+gst_base_ts_mux_pad_dispose (GObject * obj)
+{
+  GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj);
+
+  gst_base_ts_mux_pad_reset (ts_pad);
+}
+
+static void
+gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass);
+
+  gobject_class->dispose = gst_base_ts_mux_pad_dispose;
+  gstaggpad_class->flush = gst_base_ts_mux_pad_flush;
+}
+
+static void
+gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad)
+{
+}
+
+/* GstBaseTsMux */
 
 enum
 {
@@ -145,7 +211,7 @@ typedef struct
   GstBuffer *buffer;
 } StreamData;
 
-G_DEFINE_TYPE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR);
 
 /* Internals */
 
@@ -173,32 +239,6 @@ stream_data_free (StreamData * data)
 #define parent_class gst_base_ts_mux_parent_class
 
 static void
-gst_base_ts_mux_pad_reset (GstBaseTsPadData * pad_data)
-{
-  pad_data->dts = GST_CLOCK_STIME_NONE;
-  pad_data->prog_id = -1;
-
-  if (pad_data->free_func)
-    pad_data->free_func (pad_data->prepare_data);
-  pad_data->prepare_data = NULL;
-  pad_data->prepare_func = NULL;
-  pad_data->free_func = NULL;
-
-  if (pad_data->codec_data)
-    gst_buffer_replace (&pad_data->codec_data, NULL);
-
-  /* reference owned elsewhere */
-  pad_data->stream = NULL;
-  pad_data->prog = NULL;
-
-  if (pad_data->language) {
-    g_free (pad_data->language);
-    pad_data->language = NULL;
-  }
-
-}
-
-static void
 gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
 {
   GstBuffer *buf;
@@ -207,7 +247,9 @@ gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
   GValue value = { 0 };
   GstCaps *caps;
 
-  caps = gst_caps_make_writable (gst_pad_get_current_caps (mux->srcpad));
+  caps =
+      gst_caps_make_writable (gst_pad_get_current_caps (GST_AGGREGATOR_SRC_PAD
+          (mux)));
   structure = gst_caps_get_structure (caps, 0);
 
   g_value_init (&array, GST_TYPE_ARRAY);
@@ -223,44 +265,15 @@ gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
   }
 
   gst_structure_set_value (structure, "streamheader", &array);
-  gst_pad_set_caps (mux->srcpad, caps);
+  gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
   g_value_unset (&array);
   gst_caps_unref (caps);
 }
 
 static void
-gst_base_ts_mux_prepare_srcpad (GstBaseTsMux * mux)
-{
-  GstSegment seg;
-  /* we are not going to seek */
-  GstEvent *new_seg;
-  gchar s_id[32];
-  GstCaps *caps = gst_caps_new_simple ("video/mpegts",
-      "systemstream", G_TYPE_BOOLEAN, TRUE,
-      "packetsize", G_TYPE_INT, mux->packet_size,
-      NULL);
-
-  /* stream-start (FIXME: create id based on input ids) */
-  g_snprintf (s_id, sizeof (s_id), "basetsmux-%08x", g_random_int ());
-  gst_pad_push_event (mux->srcpad, gst_event_new_stream_start (s_id));
-
-  gst_segment_init (&seg, GST_FORMAT_TIME);
-  new_seg = gst_event_new_segment (&seg);
-
-  /* Set caps on src pad from our template and push new segment */
-  gst_pad_set_caps (mux->srcpad, caps);
-  gst_caps_unref (caps);
-
-  if (!gst_pad_push_event (mux->srcpad, new_seg)) {
-    GST_WARNING_OBJECT (mux, "New segment event was not handled downstream");
-  }
-}
-
-static void
 gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
 {
   GstBuffer *buf;
-  GSList *walk;
   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
 
   mux->first = TRUE;
@@ -292,13 +305,6 @@ gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
   gst_event_replace (&mux->force_key_unit_event, NULL);
   gst_buffer_replace (&mux->out_buffer, NULL);
 
-  if (mux->collect) {
-    GST_COLLECT_PADS_STREAM_LOCK (mux->collect);
-    for (walk = mux->collect->data; walk != NULL; walk = g_slist_next (walk))
-      gst_base_ts_mux_pad_reset ((GstBaseTsPadData *) walk->data);
-    GST_COLLECT_PADS_STREAM_UNLOCK (mux->collect);
-  }
-
   if (alloc) {
     g_assert (klass->create_ts_mux);
 
@@ -316,7 +322,7 @@ release_buffer_cb (guint8 * data, void *user_data)
 }
 
 static GstFlowReturn
-gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
+gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
 {
   GstFlowReturn ret = GST_FLOW_ERROR;
   GstCaps *caps;
@@ -333,13 +339,13 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
   guint8 color_spec = 0;
   j2k_private_data *private_data = NULL;
 
-  pad = ts_data->collect.pad;
+  pad = GST_PAD (ts_pad);
   caps = gst_pad_get_current_caps (pad);
   if (caps == NULL)
     goto not_negotiated;
 
   GST_DEBUG_OBJECT (pad, "Creating stream with PID 0x%04x for caps %"
-      GST_PTR_FORMAT, ts_data->pid, caps);
+      GST_PTR_FORMAT, ts_pad->pid, caps);
 
   s = gst_caps_get_structure (caps, 0);
 
@@ -382,10 +388,10 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
           GST_DEBUG_OBJECT (pad,
               "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
               gst_buffer_get_size (codec_data));
-          ts_data->codec_data = gst_buffer_ref (codec_data);
-          ts_data->prepare_func = gst_base_ts_mux_prepare_aac;
+          ts_pad->codec_data = gst_buffer_ref (codec_data);
+          ts_pad->prepare_func = gst_base_ts_mux_prepare_aac;
         } else {
-          ts_data->codec_data = NULL;
+          ts_pad->codec_data = NULL;
         }
         break;
       }
@@ -420,7 +426,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
   } else if (strcmp (mt, "application/x-teletext") == 0) {
     st = TSMUX_ST_PS_TELETEXT;
     /* needs a particularly sized layout */
-    ts_data->prepare_func = gst_base_ts_mux_prepare_teletext;
+    ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext;
   } else if (strcmp (mt, "audio/x-opus") == 0) {
     guint8 channels, mapping_family, stream_count, coupled_count;
     guint8 channel_mapping[256];
@@ -480,7 +486,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
     }
 
     st = TSMUX_ST_PS_OPUS;
-    ts_data->prepare_func = gst_base_ts_mux_prepare_opus;
+    ts_pad->prepare_func = gst_base_ts_mux_prepare_opus;
   } else if (strcmp (mt, "meta/x-klv") == 0) {
     st = TSMUX_ST_PS_KLV;
   } else if (strcmp (mt, "image/x-jpc") == 0) {
@@ -568,53 +574,53 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data)
       goto not_negotiated;
     }
     st = TSMUX_ST_VIDEO_JP2K;
-    ts_data->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
-    ts_data->prepare_data = private_data;
-    ts_data->free_func = gst_base_ts_mux_free_jpeg2000;
+    ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
+    ts_pad->prepare_data = private_data;
+    ts_pad->free_func = gst_base_ts_mux_free_jpeg2000;
   } else {
     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
 
     if (klass->handle_media_type) {
-      st = klass->handle_media_type (mux, mt, ts_data);
+      st = klass->handle_media_type (mux, mt, ts_pad);
     }
   }
 
 
   if (st != TSMUX_ST_RESERVED) {
-    ts_data->stream = tsmux_create_stream (mux->tsmux, st, ts_data->pid,
-        ts_data->language);
+    ts_pad->stream = tsmux_create_stream (mux->tsmux, st, ts_pad->pid,
+        ts_pad->language);
   } else {
     GST_DEBUG_OBJECT (pad, "Failed to determine stream type");
   }
 
-  if (ts_data->stream != NULL) {
+  if (ts_pad->stream != NULL) {
     const char *interlace_mode = gst_structure_get_string (s, "interlace-mode");
-    gst_structure_get_int (s, "rate", &ts_data->stream->audio_sampling);
-    gst_structure_get_int (s, "channels", &ts_data->stream->audio_channels);
-    gst_structure_get_int (s, "bitrate", &ts_data->stream->audio_bitrate);
+    gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling);
+    gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels);
+    gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate);
 
     /* frame rate */
-    gst_structure_get_fraction (s, "framerate", &ts_data->stream->num,
-        &ts_data->stream->den);
+    gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num,
+        &ts_pad->stream->den);
 
     /* Interlace mode */
-    ts_data->stream->interlace_mode = FALSE;
+    ts_pad->stream->interlace_mode = FALSE;
     if (interlace_mode) {
-      ts_data->stream->interlace_mode =
+      ts_pad->stream->interlace_mode =
           g_str_equal (interlace_mode, "interleaved");
     }
     /* Width and Height */
-    gst_structure_get_int (s, "width", &ts_data->stream->horizontal_size);
-    gst_structure_get_int (s, "height", &ts_data->stream->vertical_size);
+    gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size);
+    gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size);
 
-    ts_data->stream->color_spec = color_spec;
-    ts_data->stream->max_bitrate = max_rate;
-    ts_data->stream->profile_and_level = profile | main_level;
+    ts_pad->stream->color_spec = color_spec;
+    ts_pad->stream->max_bitrate = max_rate;
+    ts_pad->stream->profile_and_level = profile | main_level;
 
-    ts_data->stream->opus_channel_config_code = opus_channel_config_code;
+    ts_pad->stream->opus_channel_config_code = opus_channel_config_code;
 
-    tsmux_stream_set_buffer_release_func (ts_data->stream, release_buffer_cb);
-    tsmux_program_add_stream (ts_data->prog, ts_data->stream);
+    tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb);
+    tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
 
     ret = GST_FLOW_OK;
   }
@@ -635,19 +641,19 @@ static GstFlowReturn
 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
 {
   GstFlowReturn ret = GST_FLOW_OK;
-  GSList *walk = mux->collect->data;
+  GList *walk = GST_ELEMENT (mux)->sinkpads;
 
   /* Create the streams */
   while (walk) {
-    GstCollectData *c_data = (GstCollectData *) walk->data;
-    GstBaseTsPadData *ts_data = (GstBaseTsPadData *) walk->data;
+    GstPad *pad = GST_PAD (walk->data);
+    GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
     gchar *name = NULL;
     gchar *pcr_name;
 
-    walk = g_slist_next (walk);
+    walk = g_list_next (walk);
 
-    if (ts_data->prog_id == -1) {
-      name = GST_PAD_NAME (c_data->pad);
+    if (ts_pad->prog_id == -1) {
+      name = GST_PAD_NAME (pad);
       if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map,
               name)) {
         gint idx;
@@ -663,47 +669,47 @@ gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
               idx, name, DEFAULT_PROG_ID);
           idx = DEFAULT_PROG_ID;
         }
-        ts_data->prog_id = idx;
+        ts_pad->prog_id = idx;
       } else {
-        ts_data->prog_id = DEFAULT_PROG_ID;
+        ts_pad->prog_id = DEFAULT_PROG_ID;
       }
     }
 
-    ts_data->prog =
+    ts_pad->prog =
         (TsMuxProgram *) g_hash_table_lookup (mux->programs,
-        GINT_TO_POINTER (ts_data->prog_id));
-    if (ts_data->prog == NULL) {
-      ts_data->prog = tsmux_program_new (mux->tsmux, ts_data->prog_id);
-      if (ts_data->prog == NULL)
+        GINT_TO_POINTER (ts_pad->prog_id));
+    if (ts_pad->prog == NULL) {
+      ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
+      if (ts_pad->prog == NULL)
         goto no_program;
-      tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
+      tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
       g_hash_table_insert (mux->programs,
-          GINT_TO_POINTER (ts_data->prog_id), ts_data->prog);
+          GINT_TO_POINTER (ts_pad->prog_id), ts_pad->prog);
 
       /* Take the first stream of the program for the PCR */
-      GST_DEBUG_OBJECT (COLLECT_DATA_PAD (ts_data),
+      GST_DEBUG_OBJECT (ts_pad,
           "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
-          ts_data->pid, ts_data->prog_id);
+          ts_pad->pid, ts_pad->prog_id);
 
-      tsmux_program_set_pcr_stream (ts_data->prog, ts_data->stream);
+      tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
     }
 
-    if (ts_data->stream == NULL) {
-      ret = gst_base_ts_mux_create_stream (mux, ts_data);
+    if (ts_pad->stream == NULL) {
+      ret = gst_base_ts_mux_create_stream (mux, ts_pad);
       if (ret != GST_FLOW_OK)
         goto no_stream;
     }
 
     /* Check for user-specified PCR PID */
-    pcr_name = g_strdup_printf ("PCR_%d", ts_data->prog->pgm_number);
+    pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
     if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) {
       const gchar *sink_name =
           gst_structure_get_string (mux->prog_map, pcr_name);
 
       if (!g_strcmp0 (name, sink_name)) {
         GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
-            "program (prog_id = %d)", ts_data->pid, ts_data->prog->pgm_number);
-        tsmux_program_set_pcr_stream (ts_data->prog, ts_data->stream);
+            "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
+        tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
       }
     }
     g_free (pcr_name);
@@ -771,6 +777,29 @@ new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
 }
 
 static GstFlowReturn
+finish_buffer_list (GstBaseTsMux * mux, GstBufferList * list)
+{
+  guint i;
+  guint l = gst_buffer_list_length (list);
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  for (i = 0; i < l; i++) {
+    GstBuffer *buf = gst_buffer_list_get (list, i);
+
+    ret =
+        gst_aggregator_finish_buffer (GST_AGGREGATOR (mux),
+        gst_buffer_ref (buf));
+
+    if (ret != GST_FLOW_OK)
+      break;
+  }
+
+  gst_buffer_list_unref (list);
+
+  return ret;
+}
+
+static GstFlowReturn
 gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
 {
   GstBufferList *buffer_list;
@@ -791,7 +820,7 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
   /* no alignment, just push all available data */
   if (align == 0) {
     buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
-    return gst_pad_push_list (mux->srcpad, buffer_list);
+    return finish_buffer_list (mux, buffer_list);
   }
 
   align *= packet_size;
@@ -867,7 +896,7 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
     gst_buffer_list_add (buffer_list, buf);
   }
 
-  return gst_pad_push_list (mux->srcpad, buffer_list);
+  return finish_buffer_list (mux, buffer_list);
 }
 
 static GstFlowReturn
@@ -966,102 +995,67 @@ alloc_packet_cb (GstBuffer ** buf, void *user_data)
   klass->allocate_packet (mux, buf);
 }
 
-/* GstElement implementation */
-
-static gboolean
-gst_base_ts_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
-    GstEvent * event, gpointer user_data)
+static GstFlowReturn
+gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
+    GstAggregatorPad * agg_pad, GstBuffer * buf)
 {
-  GstBaseTsMux *mux = GST_BASE_TS_MUX (user_data);
-  gboolean res = FALSE;
-  gboolean forward = TRUE;
-  GstBaseTsPadData *pad_data = (GstBaseTsPadData *) data;
-
-#ifndef GST_DISABLE_GST_DEBUG
-  GstPad *pad;
-
-  pad = data->pad;
-#endif
-
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_CUSTOM_DOWNSTREAM:
-    {
-      GstClockTime timestamp, stream_time, running_time;
-      gboolean all_headers;
-      guint count;
-
-      if (!gst_video_event_is_force_key_unit (event))
-        goto out;
-
-      res = TRUE;
-      forward = FALSE;
-
-      gst_video_event_parse_downstream_force_key_unit (event,
-          &timestamp, &stream_time, &running_time, &all_headers, &count);
-      GST_INFO_OBJECT (pad, "have downstream force-key-unit event, "
-          "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
-          gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
-
-      if (mux->force_key_unit_event != NULL) {
-        GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
-            "as an upstream force key unit is already queued");
-        goto out;
-      }
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad);
+  TsMuxProgram *prog;
+  gint64 pts = GST_CLOCK_STIME_NONE;
+  gint64 dts = GST_CLOCK_STIME_NONE;
+  gboolean delta = TRUE, header = FALSE;
+  StreamData *stream_data;
 
-      if (!all_headers)
-        goto out;
+  GST_DEBUG_OBJECT (mux, "Pads collected");
 
-      mux->pending_key_unit_ts = running_time;
-      gst_event_replace (&mux->force_key_unit_event, event);
-      break;
+  if (G_UNLIKELY (mux->first)) {
+    ret = gst_base_ts_mux_create_streams (mux);
+    if (G_UNLIKELY (ret != GST_FLOW_OK)) {
+      if (buf)
+        gst_buffer_unref (buf);
+      return ret;
     }
-    case GST_EVENT_TAG:{
-      GstTagList *list;
-      gchar *lang = NULL;
 
-      GST_DEBUG_OBJECT (mux, "received tag event");
-      gst_event_parse_tag (event, &list);
+    mux->first = FALSE;
+  }
 
-      /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
-      if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
-        const gchar *lang_code;
+  prog = best->prog;
+  if (prog == NULL)
+    goto no_program;
 
-        lang_code = gst_tag_get_language_code_iso_639_2B (lang);
-        if (lang_code) {
-          GST_DEBUG_OBJECT (pad, "Setting language to '%s'", lang_code);
+  g_assert (buf != NULL);
 
-          g_free (pad_data->language);
-          pad_data->language = g_strdup (lang_code);
-        } else {
-          GST_WARNING_OBJECT (pad, "Did not get language code for '%s'", lang);
-        }
-        g_free (lang);
-      }
+  if (best->prepare_func) {
+    GstBuffer *tmp;
 
-      /* handled this, don't want collectpads to forward it downstream */
-      res = TRUE;
-      forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
-      break;
-    }
-    case GST_EVENT_STREAM_START:{
-      GstStreamFlags flags;
+    tmp = best->prepare_func (buf, best, mux);
+    g_assert (tmp);
+    gst_buffer_unref (buf);
+    buf = tmp;
+  }
 
-      gst_event_parse_stream_flags (event, &flags);
+  if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
+    GstEvent *event;
 
-      /* Don't wait for data on sparse inputs like metadata streams */
-      if ((flags & GST_STREAM_FLAG_SPARSE)) {
-        GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
-        gst_collect_pads_set_waiting (pads, data, FALSE);
-        GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
-      }
-      break;
-    }
-    case GST_EVENT_FLUSH_STOP:{
+    event = check_pending_key_unit_event (mux->force_key_unit_event,
+        &agg_pad->segment, GST_BUFFER_PTS (buf),
+        GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
+    if (event) {
+      GstClockTime running_time;
+      guint count;
       GList *cur;
 
-      /* Send initial segments again after a flush-stop, and also resend the
-       * header sections */
-      mux->first = TRUE;
+      mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
+      gst_event_replace (&mux->force_key_unit_event, NULL);
+
+      gst_video_event_parse_downstream_force_key_unit (event,
+          NULL, NULL, &running_time, NULL, &count);
+
+      GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
+          "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
+          GST_TIME_ARGS (running_time), count);
+      gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
 
       /* output PAT, SI tables */
       tsmux_resend_pat (mux->tsmux);
@@ -1073,109 +1067,105 @@ gst_base_ts_mux_sink_event (GstCollectPads * pads, GstCollectData * data,
 
         tsmux_resend_pmt (program);
       }
-      break;
     }
-    default:
-      break;
   }
 
-out:
-  if (!forward)
-    gst_event_unref (event);
-  else
-    res = gst_collect_pads_event_default (pads, data, event, FALSE);
+  if (G_UNLIKELY (prog->pcr_stream == NULL)) {
+    /* Take the first data stream for the PCR */
+    GST_DEBUG_OBJECT (best,
+        "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
+        best->pid, best->prog_id);
 
-  return res;
-}
+    /* Set the chosen PCR stream */
+    tsmux_program_set_pcr_stream (prog, best->stream);
+  }
 
-static gboolean
-gst_base_ts_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
-  GstBaseTsMux *mux = GST_BASE_TS_MUX (parent);
-  gboolean res = TRUE, forward = TRUE;
+  GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
 
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_CUSTOM_UPSTREAM:
-    {
-      GstIterator *iter;
-      GstIteratorResult iter_ret;
-      GstPad *sinkpad;
-      GValue sinkpad_value = G_VALUE_INIT;
-      GstClockTime running_time;
-      gboolean all_headers, done, res = FALSE;
-      guint count;
+  if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
+    pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
+    GST_DEBUG_OBJECT (mux, "Buffer has PTS  %" GST_TIME_FORMAT " pts %"
+        G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts);
+  }
 
-      if (!gst_video_event_is_force_key_unit (event))
-        break;
+  if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
+    dts = GSTTIME_TO_MPEGTIME (best->dts);
+    GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
+        G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
+  }
 
-      forward = FALSE;
+  /* should not have a DTS without PTS */
+  if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
+    GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
+    pts = dts;
+  }
 
-      gst_video_event_parse_upstream_force_key_unit (event,
-          &running_time, &all_headers, &count);
+  if (best->stream->is_video_stream) {
+    delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
+    header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
+  }
 
-      GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
-          "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
-          gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
-          all_headers, count);
+  if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
+    GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
 
-      if (!all_headers)
-        break;
+    gst_buffer_unref (buf);
+    return GST_FLOW_OK;
+  }
 
-      mux->pending_key_unit_ts = running_time;
-      gst_event_replace (&mux->force_key_unit_event, event);
+  GST_DEBUG_OBJECT (mux, "delta: %d", delta);
 
-      iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
-      done = FALSE;
-      while (!done) {
-        gboolean tmp;
+  stream_data = stream_data_new (buf);
+  tsmux_stream_add_data (best->stream, stream_data->map_info.data,
+      stream_data->map_info.size, stream_data, pts, dts, !delta);
 
-        iter_ret = gst_iterator_next (iter, &sinkpad_value);
-        sinkpad = GST_PAD (g_value_get_object (&sinkpad_value));
+  /* outgoing ts follows ts of PCR program stream */
+  if (prog->pcr_stream == best->stream) {
+    /* prefer DTS if present for PCR as it should be monotone */
+    mux->last_ts =
+        GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
+        GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
+  }
 
-        switch (iter_ret) {
-          case GST_ITERATOR_DONE:
-            done = TRUE;
-            break;
-          case GST_ITERATOR_OK:
-            GST_INFO_OBJECT (pad, "forwarding");
-            tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
-            GST_INFO_OBJECT (mux, "result %d", tmp);
-            /* succeed if at least one pad succeeds */
-            res |= tmp;
-            break;
-          case GST_ITERATOR_ERROR:
-            done = TRUE;
-            break;
-          case GST_ITERATOR_RESYNC:
-            break;
-        }
-        g_value_reset (&sinkpad_value);
-      }
-      g_value_unset (&sinkpad_value);
-      gst_iterator_free (iter);
-      break;
+  mux->is_delta = delta;
+  mux->is_header = header;
+  while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
+    if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
+      /* Failed writing data for some reason. Set appropriate error */
+      GST_DEBUG_OBJECT (mux, "Failed to write data packet");
+      GST_ELEMENT_ERROR (mux, STREAM, MUX,
+          ("Failed writing output data to stream %04x", best->stream->id),
+          (NULL));
+      goto write_fail;
     }
-    default:
-      break;
   }
+  /* flush packet cache */
+  return gst_base_ts_mux_push_packets (mux, FALSE);
 
-  if (forward)
-    res = gst_pad_event_default (pad, parent, event);
-  else
-    gst_event_unref (event);
-
-  return res;
+  /* ERRORS */
+write_fail:
+  {
+    return mux->last_flow_ret;
+  }
+no_program:
+  {
+    if (buf)
+      gst_buffer_unref (buf);
+    GST_ELEMENT_ERROR (mux, STREAM, MUX,
+        ("Stream on pad %" GST_PTR_FORMAT
+            " is not associated with any program", best), (NULL));
+    return GST_FLOW_ERROR;
+  }
 }
 
+/* GstElement implementation */
+
 static GstPad *
 gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
     const gchar * name, const GstCaps * caps)
 {
   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
   gint pid = -1;
-  gchar *pad_name = NULL;
   GstPad *pad = NULL;
-  GstBaseTsPadData *pad_data = NULL;
 
   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
     if (tsmux_find_stream (mux->tsmux, pid))
@@ -1184,21 +1174,12 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
     pid = tsmux_get_new_pid (mux->tsmux);
   }
 
-  pad_name = g_strdup_printf ("sink_%d", pid);
-  pad = gst_pad_new_from_template (templ, pad_name);
-  g_free (pad_name);
-
-  pad_data = (GstBaseTsPadData *)
-      gst_collect_pads_add_pad (mux->collect, pad, sizeof (GstBaseTsPadData),
-      (GstCollectDataDestroyNotify) (gst_base_ts_mux_pad_reset), TRUE);
-  if (pad_data == NULL)
-    goto pad_failure;
+  pad = (GstPad *)
+      GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
+      templ, name, caps);
 
-  gst_base_ts_mux_pad_reset (pad_data);
-  pad_data->pid = pid;
-
-  if (G_UNLIKELY (!gst_element_add_pad (element, pad)))
-    goto could_not_add;
+  gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad));
+  GST_BASE_TS_MUX_PAD (pad)->pid = pid;
 
   return pad;
 
@@ -1209,128 +1190,257 @@ stream_exists:
         (NULL));
     return NULL;
   }
-could_not_add:
-  {
-    GST_ELEMENT_ERROR (element, STREAM, FAILED,
-        ("Internal data stream error."), ("Could not add pad to element"));
-    gst_collect_pads_remove_pad (mux->collect, pad);
-    gst_object_unref (pad);
-    return NULL;
-  }
-pad_failure:
-  {
-    GST_ELEMENT_ERROR (element, STREAM, FAILED,
-        ("Internal data stream error."), ("Could not add pad to collectpads"));
-    gst_object_unref (pad);
-    return NULL;
-  }
 }
 
-static void
-gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
+static gboolean
+gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
 {
+  GstMpegtsSection *section;
   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
 
-  GST_DEBUG_OBJECT (mux, "Pad %" GST_PTR_FORMAT " being released", pad);
+  section = gst_event_parse_mpegts_section (event);
+  gst_event_unref (event);
+
+  if (section) {
+    GST_DEBUG ("Received event with mpegts section");
+
+    /* TODO: Check that the section type is supported */
+    tsmux_add_mpegts_si_section (mux->tsmux, section);
 
-  if (mux->collect) {
-    gst_collect_pads_remove_pad (mux->collect, pad);
+    return TRUE;
   }
 
-  /* chain up */
-  gst_element_remove_pad (element, pad);
+  return FALSE;
 }
 
-static GstStateChangeReturn
-gst_base_ts_mux_change_state (GstElement * element, GstStateChange transition)
+/* GstAggregator implementation */
+
+static gboolean
+gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
+    GstEvent * event)
 {
-  GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
-  GstStateChangeReturn ret;
+  GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+  GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad);
+  gboolean res = FALSE;
+  gboolean forward = TRUE;
 
-  switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-      break;
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-      gst_collect_pads_start (mux->collect);
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_collect_pads_stop (mux->collect);
-      break;
-    case GST_STATE_CHANGE_READY_TO_NULL:
-      break;
-    default:
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CUSTOM_DOWNSTREAM:
+    {
+      GstClockTime timestamp, stream_time, running_time;
+      gboolean all_headers;
+      guint count;
+
+      if (!gst_video_event_is_force_key_unit (event))
+        goto out;
+
+      res = TRUE;
+      forward = FALSE;
+
+      gst_video_event_parse_downstream_force_key_unit (event,
+          &timestamp, &stream_time, &running_time, &all_headers, &count);
+      GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, "
+          "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
+          gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
+
+      if (mux->force_key_unit_event != NULL) {
+        GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
+            "as an upstream force key unit is already queued");
+        goto out;
+      }
+
+      if (!all_headers)
+        goto out;
+
+      mux->pending_key_unit_ts = running_time;
+      gst_event_replace (&mux->force_key_unit_event, event);
       break;
-  }
+    }
+    case GST_EVENT_TAG:{
+      GstTagList *list;
+      gchar *lang = NULL;
+
+      GST_DEBUG_OBJECT (mux, "received tag event");
+      gst_event_parse_tag (event, &list);
+
+      /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
+      if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
+        const gchar *lang_code;
+
+        lang_code = gst_tag_get_language_code_iso_639_2B (lang);
+        if (lang_code) {
+          GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code);
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+          g_free (ts_pad->language);
+          ts_pad->language = g_strdup (lang_code);
+        } else {
+          GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'",
+              lang);
+        }
+        g_free (lang);
+      }
 
-  switch (transition) {
-    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      /* handled this, don't want collectpads to forward it downstream */
+      res = TRUE;
+      forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
       break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_base_ts_mux_reset (mux, TRUE);
+    }
+    case GST_EVENT_STREAM_START:{
+      GstStreamFlags flags;
+
+      gst_event_parse_stream_flags (event, &flags);
+
+      /* Don't wait for data on sparse inputs like metadata streams */
+      /*
+         if ((flags & GST_STREAM_FLAG_SPARSE)) {
+         GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
+         gst_collect_pads_set_waiting (pads, data, FALSE);
+         GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
+         }
+       */
       break;
-    case GST_STATE_CHANGE_READY_TO_NULL:
+    }
+    case GST_EVENT_FLUSH_STOP:{
+      GList *cur;
+
+      /* Send initial segments again after a flush-stop, and also resend the
+       * header sections */
+      mux->first = TRUE;
+
+      /* output PAT, SI tables */
+      tsmux_resend_pat (mux->tsmux);
+      tsmux_resend_si (mux->tsmux);
+
+      /* output PMT for each program */
+      for (cur = mux->tsmux->programs; cur; cur = cur->next) {
+        TsMuxProgram *program = (TsMuxProgram *) cur->data;
+
+        tsmux_resend_pmt (program);
+      }
       break;
+    }
     default:
       break;
   }
 
-  return ret;
+out:
+  if (!forward)
+    gst_event_unref (event);
+  else
+    res = agg_class->sink_event (agg, agg_pad, event);
+
+  return res;
 }
 
 static gboolean
-gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
+gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
 {
-  GstMpegtsSection *section;
-  GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
+  GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+  gboolean res = TRUE, forward = TRUE;
 
-  section = gst_event_parse_mpegts_section (event);
-  gst_event_unref (event);
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CUSTOM_UPSTREAM:
+    {
+      GstIterator *iter;
+      GstIteratorResult iter_ret;
+      GstPad *sinkpad;
+      GValue sinkpad_value = G_VALUE_INIT;
+      GstClockTime running_time;
+      gboolean all_headers, done, res = FALSE;
+      guint count;
 
-  if (section) {
-    GST_DEBUG ("Received event with mpegts section");
+      if (!gst_video_event_is_force_key_unit (event))
+        break;
 
-    /* TODO: Check that the section type is supported */
-    tsmux_add_mpegts_si_section (mux->tsmux, section);
+      forward = FALSE;
 
-    return TRUE;
+      gst_video_event_parse_upstream_force_key_unit (event,
+          &running_time, &all_headers, &count);
+
+      GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
+          "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
+          gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
+          all_headers, count);
+
+      if (!all_headers)
+        break;
+
+      mux->pending_key_unit_ts = running_time;
+      gst_event_replace (&mux->force_key_unit_event, event);
+
+      iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
+      done = FALSE;
+      while (!done) {
+        gboolean tmp;
+
+        iter_ret = gst_iterator_next (iter, &sinkpad_value);
+        sinkpad = GST_PAD (g_value_get_object (&sinkpad_value));
+
+        switch (iter_ret) {
+          case GST_ITERATOR_DONE:
+            done = TRUE;
+            break;
+          case GST_ITERATOR_OK:
+            GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
+            tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
+            GST_INFO_OBJECT (mux, "result %d", tmp);
+            /* succeed if at least one pad succeeds */
+            res |= tmp;
+            break;
+          case GST_ITERATOR_ERROR:
+            done = TRUE;
+            break;
+          case GST_ITERATOR_RESYNC:
+            break;
+        }
+        g_value_reset (&sinkpad_value);
+      }
+      g_value_unset (&sinkpad_value);
+      gst_iterator_free (iter);
+      break;
+    }
+    default:
+      break;
   }
 
-  return FALSE;
-}
+  if (forward)
+    res = agg_class->src_event (agg, event);
+  else
+    gst_event_unref (event);
 
-/* CollectPads implementation */
+  return res;
+}
 
-static GstFlowReturn
-gst_base_ts_mux_clip_inc_running_time (GstCollectPads * pads,
-    GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
-    gpointer user_data)
+static GstBuffer *
+gst_base_ts_mux_clip (GstAggregator * agg,
+    GstAggregatorPad * agg_pad, GstBuffer * buf)
 {
-  GstBaseTsPadData *pad_data = (GstBaseTsPadData *) cdata;
+  GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad);
   GstClockTime time;
+  GstBuffer *ret;
 
-  *outbuf = buf;
+  ret = buf;
 
   /* PTS */
   time = GST_BUFFER_PTS (buf);
 
   /* invalid left alone and passed */
   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
-    time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
+    time =
+        gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time);
     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
-      GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
+      GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment");
       gst_buffer_unref (buf);
-      *outbuf = NULL;
+      ret = NULL;
       goto beach;
     } else {
-      GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " ->  %"
+      GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " ->  %"
           GST_TIME_FORMAT " running time",
           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
-      buf = *outbuf = gst_buffer_make_writable (buf);
-      GST_BUFFER_PTS (*outbuf) = time;
+      buf = ret = gst_buffer_make_writable (buf);
+      GST_BUFFER_PTS (ret) = time;
     }
   }
 
@@ -1342,7 +1452,7 @@ gst_base_ts_mux_clip_inc_running_time (GstCollectPads * pads,
     gint sign;
     gint64 dts;
 
-    sign = gst_segment_to_running_time_full (&cdata->segment, GST_FORMAT_TIME,
+    sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME,
         time, &time);
 
     if (sign > 0)
@@ -1350,211 +1460,140 @@ gst_base_ts_mux_clip_inc_running_time (GstCollectPads * pads,
     else
       dts = -((gint64) time);
 
-    GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
+    GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %"
         GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
         GST_STIME_ARGS (dts));
 
-    if (GST_CLOCK_STIME_IS_VALID (pad_data->dts) && dts < pad_data->dts) {
+    if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) {
       /* Ignore DTS going backward */
-      GST_WARNING_OBJECT (cdata->pad, "ignoring DTS going backward");
-      dts = pad_data->dts;
+      GST_WARNING_OBJECT (pad, "ignoring DTS going backward");
+      dts = pad->dts;
     }
 
-    *outbuf = gst_buffer_make_writable (buf);
+    ret = gst_buffer_make_writable (buf);
     if (sign > 0)
-      GST_BUFFER_DTS (*outbuf) = time;
+      GST_BUFFER_DTS (ret) = time;
     else
-      GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE;
+      GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE;
 
-    pad_data->dts = dts;
+    pad->dts = dts;
   } else {
-    pad_data->dts = GST_CLOCK_STIME_NONE;
+    pad->dts = GST_CLOCK_STIME_NONE;
   }
 
 beach:
-  return GST_FLOW_OK;
+  return ret;
 }
 
 static GstFlowReturn
-gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data,
-    GstBuffer * buf, GstBaseTsMux * mux)
+gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps,
+    GstCaps ** ret)
 {
-  GstFlowReturn ret = GST_FLOW_OK;
-  GstBaseTsPadData *best = (GstBaseTsPadData *) data;
-  TsMuxProgram *prog;
-  gint64 pts = GST_CLOCK_STIME_NONE;
-  gint64 dts = GST_CLOCK_STIME_NONE;
-  gboolean delta = TRUE, header = FALSE;
-  StreamData *stream_data;
-
-  GST_DEBUG_OBJECT (mux, "Pads collected");
-
-  if (G_UNLIKELY (mux->first)) {
-    ret = gst_base_ts_mux_create_streams (mux);
-    if (G_UNLIKELY (ret != GST_FLOW_OK)) {
-      if (buf)
-        gst_buffer_unref (buf);
-      return ret;
-    }
-
-    gst_base_ts_mux_prepare_srcpad (mux);
-
-    mux->first = FALSE;
-  }
-
-  if (G_UNLIKELY (best == NULL)) {
-    GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
-    /* EOS */
-    GST_INFO_OBJECT (mux, "EOS");
-    /* drain some possibly cached data */
-    if (klass->drain)
-      klass->drain (mux);
-    gst_base_ts_mux_push_packets (mux, TRUE);
-    gst_pad_push_event (mux->srcpad, gst_event_new_eos ());
-
-    if (buf)
-      gst_buffer_unref (buf);
-
-    return GST_FLOW_OK;
-  }
-
-  prog = best->prog;
-  if (prog == NULL)
-    goto no_program;
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+  GstStructure *s;
 
-  g_assert (buf != NULL);
+  *ret = gst_caps_copy (caps);
+  s = gst_caps_get_structure (*ret, 0);
+  gst_structure_set (s, "packetsize", G_TYPE_INT, mux->packet_size, NULL);
 
-  if (best->prepare_func) {
-    GstBuffer *tmp;
+  return GST_FLOW_OK;
+}
 
-    tmp = best->prepare_func (buf, best, mux);
-    g_assert (tmp);
-    gst_buffer_unref (buf);
-    buf = tmp;
+static GstBaseTsMuxPad *
+gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
+{
+  GstBaseTsMuxPad *pad, *best = NULL;
+  GList *l;
+  GstBuffer *buffer;
+  GstClockTime best_ts = GST_CLOCK_TIME_NONE;
+
+  for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
+    pad = GST_BASE_TS_MUX_PAD (l->data);
+    buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (pad));
+    if (!buffer)
+      continue;
+    if (best_ts == GST_CLOCK_TIME_NONE) {
+      best = pad;
+      best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
+    } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
+      GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
+      if (t < best_ts) {
+        best = pad;
+        best_ts = t;
+      }
+    }
+    gst_buffer_unref (buffer);
   }
+  GST_DEBUG_OBJECT (aggregator,
+      "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
+      GST_TIME_ARGS (best_ts), best);
 
-  if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
-    GstEvent *event;
-
-    event = check_pending_key_unit_event (mux->force_key_unit_event,
-        &best->collect.segment, GST_BUFFER_PTS (buf),
-        GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
-    if (event) {
-      GstClockTime running_time;
-      guint count;
-      GList *cur;
-
-      mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
-      gst_event_replace (&mux->force_key_unit_event, NULL);
-
-      gst_video_event_parse_downstream_force_key_unit (event,
-          NULL, NULL, &running_time, NULL, &count);
-
-      GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
-          "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
-          GST_TIME_ARGS (running_time), count);
-      gst_pad_push_event (mux->srcpad, event);
+  return best;
+}
 
-      /* output PAT, SI tables */
-      tsmux_resend_pat (mux->tsmux);
-      tsmux_resend_si (mux->tsmux);
+static gboolean
+gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
+{
+  GList *l;
 
-      /* output PMT for each program */
-      for (cur = mux->tsmux->programs; cur; cur = cur->next) {
-        TsMuxProgram *program = (TsMuxProgram *) cur->data;
+  for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
+    GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
 
-        tsmux_resend_pmt (program);
-      }
-    }
+    if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)))
+      return FALSE;
   }
+  return TRUE;
+}
 
-  if (G_UNLIKELY (prog->pcr_stream == NULL)) {
-    /* Take the first data stream for the PCR */
-    GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
-        "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
-        best->pid, best->prog_id);
-
-    /* Set the chosen PCR stream */
-    tsmux_program_set_pcr_stream (prog, best->stream);
-  }
 
-  GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
-      "Chose stream for output (PID: 0x%04x)", best->pid);
+static GstFlowReturn
+gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
+{
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg);
 
-  if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
-    pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
-    GST_DEBUG_OBJECT (mux, "Buffer has PTS  %" GST_TIME_FORMAT " pts %"
-        G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts);
-  }
+  if (best) {
+    GstBuffer *buffer;
 
-  if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
-    dts = GSTTIME_TO_MPEGTIME (best->dts);
-    GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
-        G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
-  }
+    buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best));
 
-  /* should not have a DTS without PTS */
-  if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
-    GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
-    pts = dts;
-  }
+    ret =
+        gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
+        GST_AGGREGATOR_PAD (best), buffer);
 
-  if (best->stream->is_video_stream) {
-    delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
-    header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
+    if (ret != GST_FLOW_OK)
+      goto done;
   }
 
-  if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
-    GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
+  if (gst_base_ts_mux_are_all_pads_eos (mux)) {
+    GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
+    /* drain some possibly cached data */
+    if (klass->drain)
+      klass->drain (mux);
+    gst_base_ts_mux_push_packets (mux, TRUE);
 
-    gst_buffer_unref (buf);
-    return GST_FLOW_OK;
+    ret = GST_FLOW_EOS;
   }
 
-  GST_DEBUG_OBJECT (mux, "delta: %d", delta);
+done:
+  return ret;
+}
 
-  stream_data = stream_data_new (buf);
-  tsmux_stream_add_data (best->stream, stream_data->map_info.data,
-      stream_data->map_info.size, stream_data, pts, dts, !delta);
+static gboolean
+gst_base_ts_mux_start (GstAggregator * agg)
+{
+  gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
 
-  /* outgoing ts follows ts of PCR program stream */
-  if (prog->pcr_stream == best->stream) {
-    /* prefer DTS if present for PCR as it should be monotone */
-    mux->last_ts =
-        GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
-        GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
-  }
+  return TRUE;
+}
 
-  mux->is_delta = delta;
-  mux->is_header = header;
-  while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
-    if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
-      /* Failed writing data for some reason. Set appropriate error */
-      GST_DEBUG_OBJECT (mux, "Failed to write data packet");
-      GST_ELEMENT_ERROR (mux, STREAM, MUX,
-          ("Failed writing output data to stream %04x", best->stream->id),
-          (NULL));
-      goto write_fail;
-    }
-  }
-  /* flush packet cache */
-  return gst_base_ts_mux_push_packets (mux, FALSE);
+static gboolean
+gst_base_ts_mux_stop (GstAggregator * agg)
+{
+  gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), FALSE);
 
-  /* ERRORS */
-write_fail:
-  {
-    return mux->last_flow_ret;
-  }
-no_program:
-  {
-    if (buf)
-      gst_buffer_unref (buf);
-    GST_ELEMENT_ERROR (mux, STREAM, MUX,
-        ("Stream on pad %" GST_PTR_FORMAT
-            " is not associated with any program", COLLECT_DATA_PAD (best)),
-        (NULL));
-    return GST_FLOW_ERROR;
-  }
+  return TRUE;
 }
 
 /* GObject implementation */
@@ -1570,10 +1609,6 @@ gst_base_ts_mux_dispose (GObject * object)
     g_object_unref (mux->out_adapter);
     mux->out_adapter = NULL;
   }
-  if (mux->collect) {
-    gst_object_unref (mux->collect);
-    mux->collect = NULL;
-  }
   if (mux->prog_map) {
     gst_structure_free (mux->prog_map);
     mux->prog_map = NULL;
@@ -1599,7 +1634,7 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
-  GSList *walk;
+  GList *walk;
 
   switch (prop_id) {
     case PROP_PROG_MAP:
@@ -1620,14 +1655,14 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
       break;
     case PROP_PMT_INTERVAL:
-      walk = mux->collect->data;
+      walk = GST_ELEMENT (object)->sinkpads;
       mux->pmt_interval = g_value_get_uint (value);
 
       while (walk) {
-        GstBaseTsPadData *ts_data = (GstBaseTsPadData *) walk->data;
+        GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
 
-        tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
-        walk = g_slist_next (walk);
+        tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
+        walk = g_list_next (walk);
       }
       break;
     case PROP_ALIGNMENT:
@@ -1687,6 +1722,7 @@ gst_base_ts_mux_default_create_ts_mux (GstBaseTsMux * mux)
   TsMux *tsmux = tsmux_new ();
   tsmux_set_write_func (tsmux, new_packet_cb, mux);
   tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux);
+  tsmux_set_bitrate (tsmux, mux->bitrate);
 
   return tsmux;
 }
@@ -1729,6 +1765,7 @@ static void
 gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
 {
   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+  GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
 
   GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0,
@@ -1747,10 +1784,16 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
   gobject_class->constructed = gst_base_ts_mux_constructed;
 
   gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
-  gstelement_class->release_pad = gst_base_ts_mux_release_pad;
-  gstelement_class->change_state = gst_base_ts_mux_change_state;
   gstelement_class->send_event = gst_base_ts_mux_send_event;
 
+  gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps;
+  gstagg_class->aggregate = gst_base_ts_mux_aggregate;
+  gstagg_class->clip = gst_base_ts_mux_clip;
+  gstagg_class->sink_event = gst_base_ts_mux_sink_event;
+  gstagg_class->src_event = gst_base_ts_mux_src_event;
+  gstagg_class->start = gst_base_ts_mux_start;
+  gstagg_class->stop = gst_base_ts_mux_stop;
+
   klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
   klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;
   klass->output_packet = gst_base_ts_mux_default_output_packet;
@@ -1792,29 +1835,14 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
           " to achieve multiplex-wide constant bitrate",
           0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE,
           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
+
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
 }
 
 static void
 gst_base_ts_mux_init (GstBaseTsMux * mux)
 {
-  mux->srcpad =
-      gst_pad_new_from_static_template (&gst_base_ts_mux_src_factory, "src");
-  gst_pad_use_fixed_caps (mux->srcpad);
-  gst_pad_set_event_function (mux->srcpad,
-      GST_DEBUG_FUNCPTR (gst_base_ts_mux_src_event));
-  gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad);
-
-  mux->collect = gst_collect_pads_new ();
-  gst_collect_pads_set_buffer_function (mux->collect,
-      (GstCollectPadsBufferFunction)
-      GST_DEBUG_FUNCPTR (gst_base_ts_mux_collected_buffer), mux);
-
-  gst_collect_pads_set_event_function (mux->collect,
-      (GstCollectPadsEventFunction)
-      GST_DEBUG_FUNCPTR (gst_base_ts_mux_sink_event), mux);
-  gst_collect_pads_set_clip_function (mux->collect, (GstCollectPadsClipFunction)
-      GST_DEBUG_FUNCPTR (gst_base_ts_mux_clip_inc_running_time), mux);
-
   mux->out_adapter = gst_adapter_new ();
 
   /* properties */
index 9a3ff20..7b89225 100644 (file)
 #include <gst/gst.h>
 #include <gst/base/gstcollectpads.h>
 #include <gst/base/gstadapter.h>
-#include <glib-object.h>
+#include <gst/base/gstaggregator.h>
 
 G_BEGIN_DECLS
 
 #include "tsmux/tsmux.h"
 
-#define GST_TYPE_BASE_TS_MUX  (gst_base_ts_mux_get_type())
-#define GST_BASE_TS_MUX(obj)  (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_BASE_TS_MUX, GstBaseTsMux))
-#define GST_BASE_TS_MUX_CLASS(klass)           (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BASE_TS_MUX, GstBaseTsMuxClass))
-#define GST_BASE_TS_MUX_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_BASE_TS_MUX,GstBaseTsMuxClass))
-
-#define GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH 188
-
+#define GST_TYPE_BASE_TS_MUX_PAD (gst_base_ts_mux_pad_get_type())
+#define GST_BASE_TS_MUX_PAD(obj) \
+        (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASE_TS_MUX_PAD, GstBaseTsMuxPad))
+#define GST_BASE_TS_MUX_PAD_CAST(obj) ((GstBaseTsMuxPad *)(obj))
+#define GST_BASE_TS_MUX_PAD_CLASS(klass) \
+        (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASE_TS_MUX_PAD, GstBaseTsMuxPadClass))
+#define GST_IS_BASE_TS_MUX_PAD(obj) \
+        (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASE_TS_MUX_PAD))
+#define GST_IS_BASE_TS_MUX_PAD_CLASS(klass) \
+        (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASE_TS_MUX_PAD))
+#define GST_BASE_TS_MUX_PAD_GET_CLASS(obj) \
+        (G_TYPE_INSTANCE_GET_CLASS((obj),GST_TYPE_BASE_TS_MUX_PAD,GstBaseTsMuxPadClass))
+
+typedef struct _GstBaseTsMuxPad GstBaseTsMuxPad;
+typedef struct _GstBaseTsMuxPadClass GstBaseTsMuxPadClass;
+typedef struct _GstBaseTsMuxPadPrivate GstBaseTsMuxPadPrivate;
 typedef struct GstBaseTsMux GstBaseTsMux;
 typedef struct GstBaseTsMuxClass GstBaseTsMuxClass;
 typedef struct GstBaseTsPadData GstBaseTsPadData;
 
-typedef GstBuffer * (*GstBaseTsPadDataPrepareFunction) (GstBuffer * buf,
-    GstBaseTsPadData * data, GstBaseTsMux * mux);
+typedef GstBuffer * (*GstBaseTsMuxPadPrepareFunction) (GstBuffer * buf,
+    GstBaseTsMuxPad * data, GstBaseTsMux * mux);
 
-typedef void (*GstBaseTsPadDataFreePrepareDataFunction) (gpointer prepare_data);
+typedef void (*GstBaseTsMuxPadFreePrepareDataFunction) (gpointer prepare_data);
 
-struct GstBaseTsMux {
-  GstElement parent;
+struct _GstBaseTsMuxPad
+{
+  GstAggregatorPad              parent;
+
+  gint pid;
+  TsMuxStream *stream;
+
+  /* most recent DTS */
+  gint64 dts;
+
+  /* optional codec data available in the caps */
+  GstBuffer *codec_data;
+
+  /* Opaque data pointer to a structure used by the prepare function */
+  gpointer prepare_data;
+
+  /* handler to prepare input data */
+  GstBaseTsMuxPadPrepareFunction prepare_func;
+  /* handler to free the private data */
+  GstBaseTsMuxPadFreePrepareDataFunction free_func;
 
-  GstPad *srcpad;
+  /* program id to which it is attached to (not program pid) */
+  gint prog_id;
+  /* program this stream belongs to */
+  TsMuxProgram *prog;
 
-  GstCollectPads *collect;
+  gchar *language;
+};
+
+struct _GstBaseTsMuxPadClass
+{
+  GstAggregatorPadClass parent_class;
+};
+
+GType gst_base_ts_mux_pad_get_type   (void);
+
+#define GST_TYPE_BASE_TS_MUX  (gst_base_ts_mux_get_type())
+#define GST_BASE_TS_MUX(obj)  (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_BASE_TS_MUX, GstBaseTsMux))
+#define GST_BASE_TS_MUX_CLASS(klass)           (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BASE_TS_MUX, GstBaseTsMuxClass))
+#define GST_BASE_TS_MUX_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_BASE_TS_MUX,GstBaseTsMuxClass))
+
+#define GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH 188
+
+struct GstBaseTsMux {
+  GstAggregator parent;
 
   TsMux *tsmux;
   GHashTable *programs;
@@ -166,10 +214,10 @@ struct GstBaseTsMux {
  *                 Called at EOS, if the subclass has data it needs to drain.
  */
 struct GstBaseTsMuxClass {
-  GstElementClass parent_class;
+  GstAggregatorClass parent_class;
 
   TsMux *   (*create_ts_mux) (GstBaseTsMux *mux);
-  guint     (*handle_media_type) (GstBaseTsMux *mux, const gchar *media_type, GstBaseTsPadData * ts_data);
+  guint     (*handle_media_type) (GstBaseTsMux *mux, const gchar *media_type, GstBaseTsMuxPad * pad);
   void      (*allocate_packet) (GstBaseTsMux *mux, GstBuffer **buffer);
   gboolean  (*output_packet) (GstBaseTsMux *mux, GstBuffer *buffer, gint64 new_pcr);
   void      (*reset) (GstBaseTsMux *mux);
@@ -179,6 +227,11 @@ struct GstBaseTsMuxClass {
 void gst_base_ts_mux_set_packet_size (GstBaseTsMux *mux, gsize size);
 void gst_base_ts_mux_set_automatic_alignment (GstBaseTsMux *mux, gsize alignment);
 
+typedef GstBuffer * (*GstBaseTsPadDataPrepareFunction) (GstBuffer * buf,
+    GstBaseTsPadData * data, GstBaseTsMux * mux);
+
+typedef void (*GstBaseTsPadDataFreePrepareDataFunction) (gpointer prepare_data);
+
 struct GstBaseTsPadData {
   /* parent */
   GstCollectData collect;
@@ -210,7 +263,6 @@ struct GstBaseTsPadData {
 
 GType gst_base_ts_mux_get_type (void);
 
-
 G_END_DECLS
 
 #endif
index a4e85ee..04dcd00 100644 (file)
@@ -90,7 +90,7 @@
 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
 
 GstBuffer *
-gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data,
+gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux)
 {
   guint8 adts_header[7] = { 0, };
@@ -106,7 +106,7 @@ gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data,
   gst_buffer_copy_into (out_buf, buf,
       GST_BUFFER_COPY_METADATA | GST_BUFFER_COPY_TIMESTAMPS, 0, 0);
 
-  gst_buffer_map (data->codec_data, &codec_data_map, GST_MAP_READ);
+  gst_buffer_map (pad->codec_data, &codec_data_map, GST_MAP_READ);
 
   /* Generate ADTS header */
   obj_type = GST_READ_UINT8 (codec_data_map.data) >> 3;
@@ -149,7 +149,7 @@ gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data,
   /* Now copy complete frame */
   gst_buffer_fill (out_buf, out_offset, buf_map.data, buf_map.size);
 
-  gst_buffer_unmap (data->codec_data, &codec_data_map);
+  gst_buffer_unmap (pad->codec_data, &codec_data_map);
   gst_buffer_unmap (buf, &buf_map);
 
   return out_buf;
index 2ae5091..9e60a56 100644 (file)
@@ -85,7 +85,7 @@
  
 #include "gstbasetsmux.h"
 
-GstBuffer * gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data,
+GstBuffer * gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux);
  
 #endif /* __BASETSMUX_AAC_H__ */
index fca7936..996b76a 100644 (file)
 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
 
 GstBuffer *
-gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsPadData * data,
+gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux)
 {
-  j2k_private_data *private_data = data->prepare_data;
+  j2k_private_data *private_data = pad->prepare_data;
   GstByteWriter wr;
   GstBuffer *out_buf = NULL;
   guint8 *elsm_header = NULL;
index 73fbc38..284f530 100644 (file)
@@ -55,7 +55,7 @@ typedef struct j2k_private_data
   guint8 color_spec;
 } j2k_private_data;
 
-GstBuffer *gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsPadData * data,
+GstBuffer *gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux);
 
 void gst_base_ts_mux_free_jpeg2000 (gpointer prepare_data);
index e642f09..41a8b20 100644 (file)
@@ -91,7 +91,7 @@
 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
 
 GstBuffer *
-gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsPadData * pad_data,
+gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux)
 {
   gssize insize = gst_buffer_get_size (buf);
index b1af5ce..375de52 100644 (file)
@@ -85,7 +85,7 @@
  
 #include "gstbasetsmux.h"
 
-GstBuffer * gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsPadData * data,
+GstBuffer * gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux);
  
 #endif /* __BASETSMUX_OPUS_H__ */
index 9f9d9fe..f9a16e4 100644 (file)
@@ -98,7 +98,7 @@
  */
 
 GstBuffer *
-gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsPadData * pad_data,
+gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux)
 {
   GstBuffer *out_buf;
index 4975aa1..bb2082f 100644 (file)
@@ -85,7 +85,7 @@
  
 #include "gstbasetsmux.h"
 
-GstBuffer * gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsPadData * data,
+GstBuffer * gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsMuxPad * pad,
     GstBaseTsMux * mux);
  
 #endif /* __BASETSMUX_TTXT_H__ */
index 2f244de..f7212eb 100644 (file)
@@ -395,11 +395,11 @@ gst_mpeg_ts_mux_class_init (GstMpegTsMuxClass * klass)
       "Multiplexes media streams into an MPEG Transport Stream",
       "Fluendo <contact@fluendo.com>");
 
-  gst_element_class_add_static_pad_template (gstelement_class,
-      &gst_mpeg_ts_mux_sink_factory);
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &gst_mpeg_ts_mux_sink_factory, GST_TYPE_BASE_TS_MUX_PAD);
 
-  gst_element_class_add_static_pad_template (gstelement_class,
-      &gst_mpeg_ts_mux_src_factory);
+  gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
+      &gst_mpeg_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
 
   g_object_class_install_property (gobject_class, PROP_M2TS_MODE,
       g_param_spec_boolean ("m2ts-mode", "M2TS(192 bytes) Mode",
index d3a3bb2..71550d4 100644 (file)
@@ -1085,7 +1085,7 @@ pad_stream (TsMux * mux, TsMuxStream * stream, gint64 cur_ts, gint64 * cur_pcr)
         GST_LOG ("Transport stream bitrate: %" G_GUINT64_FORMAT, bitrate);
 
         if (bitrate < mux->bitrate) {
-          GST_LOG_OBJECT (mux, "Padding transport stream");
+          GST_LOG ("Padding transport stream");
 
           if (!tsmux_get_buffer (mux, &buf)) {
             ret = FALSE;
index a8273b6..2c8a495 100644 (file)
@@ -75,14 +75,14 @@ setup_src_pad (GstElement * element,
     sinkpad = gst_element_get_request_pad (element, sinkname);
   fail_if (sinkpad == NULL, "Could not get sink pad from %s",
       GST_ELEMENT_NAME (element));
-  /* references are owned by: 1) us, 2) tsmux, 3) collect pads */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  /* references are owned by: 1) us, 2) tsmux */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
   fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
       "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
   gst_object_unref (sinkpad);   /* because we got it higher up */
 
-  /* references are owned by: 1) tsmux, 2) collect pads */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
+  /* references are owned by: 1) tsmux */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 1);
 
   if (padname)
     *padname = g_strdup (GST_PAD_NAME (sinkpad));
@@ -98,16 +98,16 @@ teardown_src_pad (GstElement * element, const gchar * sinkname)
   /* clean up floating src pad */
   if (!(sinkpad = gst_element_get_static_pad (element, sinkname)))
     sinkpad = gst_element_get_request_pad (element, sinkname);
-  /* pad refs held by 1) tsmux 2) collectpads and 3) us (through _get) */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+  /* pad refs held by 1) tsmux 2) us (through _get) */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
   srcpad = gst_pad_get_peer (sinkpad);
 
   gst_pad_unlink (srcpad, sinkpad);
   GST_DEBUG ("src %p", srcpad);
 
   /* after unlinking, pad refs still held by
-   * 1) tsmux and 2) collectpads and 3) us (through _get) */
-  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3);
+   * 1) tsmux and 2) us (through _get) */
+  ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2);
   gst_object_unref (sinkpad);
   /* one more ref is held by element itself */
 
@@ -161,6 +161,7 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate,
   gint i;
   gint pmt_pid = -1, el_pid = -1, pcr_pid = -1, packets = 0;
   gchar *padname;
+  GstQuery *drain;
 
   mux = setup_tsmux (srctemplate, sinkname, &padname);
 
@@ -200,6 +201,10 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate,
     ts += 40 * GST_MSECOND;
   }
 
+  drain = gst_query_new_drain ();
+  gst_pad_peer_query (mysrcpad, drain);
+  gst_query_unref (drain);
+
   if (check_func)
     check_func (buffers);
 
@@ -366,339 +371,6 @@ GST_START_TEST (test_audio)
 
 GST_END_TEST;
 
-
-typedef struct _TestData
-{
-  GstEvent *sink_event;
-  gint src_events;
-} TestData;
-
-typedef struct _ThreadData
-{
-  GstPad *pad;
-  GstBuffer *buffer;
-  GstFlowReturn flow_return;
-  GThread *thread;
-} ThreadData;
-
-static gboolean
-src_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
-  TestData *data = (TestData *) gst_pad_get_element_private (pad);
-
-  if (event->type == GST_EVENT_CUSTOM_UPSTREAM)
-    data->src_events += 1;
-
-  gst_event_unref (event);
-  return TRUE;
-}
-
-static gboolean
-sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
-  TestData *data = (TestData *) gst_pad_get_element_private (pad);
-
-  if (event->type == GST_EVENT_CUSTOM_DOWNSTREAM)
-    data->sink_event = event;
-
-  gst_event_unref (event);
-  return TRUE;
-}
-
-static void
-link_sinks (GstElement * mpegtsmux,
-    GstPad ** src1, GstPad ** src2, GstPad ** src3, TestData * test_data)
-{
-  GstPad *mux_sink1, *mux_sink2, *mux_sink3;
-
-  /* link 3 sink pads, 2 video 1 audio */
-  *src1 = gst_pad_new_from_static_template (&video_src_template, "src1");
-  gst_pad_set_active (*src1, TRUE);
-  gst_pad_set_element_private (*src1, test_data);
-  gst_pad_set_event_function (*src1, src_event);
-  mux_sink1 = gst_element_get_request_pad (mpegtsmux, "sink_1");
-  fail_unless (gst_pad_link (*src1, mux_sink1) == GST_PAD_LINK_OK);
-
-  *src2 = gst_pad_new_from_static_template (&video_src_template, "src2");
-  gst_pad_set_active (*src2, TRUE);
-  gst_pad_set_element_private (*src2, test_data);
-  gst_pad_set_event_function (*src2, src_event);
-  mux_sink2 = gst_element_get_request_pad (mpegtsmux, "sink_2");
-  fail_unless (gst_pad_link (*src2, mux_sink2) == GST_PAD_LINK_OK);
-
-  *src3 = gst_pad_new_from_static_template (&audio_src_template, "src3");
-  gst_pad_set_active (*src3, TRUE);
-  gst_pad_set_element_private (*src3, test_data);
-  gst_pad_set_event_function (*src3, src_event);
-  mux_sink3 = gst_element_get_request_pad (mpegtsmux, "sink_3");
-  fail_unless (gst_pad_link (*src3, mux_sink3) == GST_PAD_LINK_OK);
-
-  gst_object_unref (mux_sink1);
-  gst_object_unref (mux_sink2);
-  gst_object_unref (mux_sink3);
-}
-
-static void
-link_src (GstElement * mpegtsmux, GstPad ** sink, TestData * test_data)
-{
-  GstPad *mux_src;
-
-  mux_src = gst_element_get_static_pad (mpegtsmux, "src");
-  *sink = gst_pad_new_from_static_template (&sink_template, "sink");
-  gst_pad_set_active (*sink, TRUE);
-  gst_pad_set_event_function (*sink, sink_event);
-  gst_pad_set_element_private (*sink, test_data);
-  fail_unless (gst_pad_link (mux_src, *sink) == GST_PAD_LINK_OK);
-
-  gst_object_unref (mux_src);
-}
-
-static void
-setup_caps (GstElement * mpegtsmux, GstPad * src1, GstPad * src2, GstPad * src3)
-{
-  GstSegment segment;
-  GstCaps *caps;
-
-  gst_segment_init (&segment, GST_FORMAT_TIME);
-
-  caps = gst_caps_new_simple ("video/x-h264",
-      "stream-format", G_TYPE_STRING, "byte-stream",
-      "alignment", G_TYPE_STRING, "nal", NULL);
-  gst_pad_push_event (src1, gst_event_new_stream_start ("1"));
-  gst_pad_push_event (src1, gst_event_new_caps (caps));
-  gst_pad_push_event (src1, gst_event_new_segment (&segment));
-  gst_pad_push_event (src2, gst_event_new_stream_start ("2"));
-  gst_pad_push_event (src2, gst_event_new_caps (caps));
-  gst_pad_push_event (src2, gst_event_new_segment (&segment));
-  gst_caps_unref (caps);
-  caps = gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 4,
-      "stream-format", G_TYPE_STRING, "raw", "framed", G_TYPE_BOOLEAN, TRUE,
-      NULL);
-  gst_pad_push_event (src3, gst_event_new_stream_start ("3"));
-  gst_pad_push_event (src3, gst_event_new_caps (caps));
-  gst_pad_push_event (src3, gst_event_new_segment (&segment));
-  gst_caps_unref (caps);
-}
-
-static gpointer
-pad_push_thread (gpointer user_data)
-{
-  ThreadData *data = (ThreadData *) user_data;
-
-  data->flow_return = gst_pad_push (data->pad, data->buffer);
-
-  return NULL;
-}
-
-static ThreadData *
-pad_push (GstPad * pad, GstBuffer * buffer, GstClockTime timestamp)
-{
-  ThreadData *data;
-
-  data = g_new0 (ThreadData, 1);
-  data->pad = pad;
-  data->buffer = buffer;
-  GST_BUFFER_TIMESTAMP (buffer) = timestamp;
-  data->thread = g_thread_try_new ("gst-check", pad_push_thread, data, NULL);
-
-  return data;
-}
-
-GST_START_TEST (test_force_key_unit_event_downstream)
-{
-  GstElement *mpegtsmux;
-  GstPad *sink;
-  GstPad *src1;
-  GstPad *src2;
-  GstPad *src3;
-  GstEvent *sink_event;
-  GstClockTime timestamp, stream_time, running_time;
-  gboolean all_headers = TRUE;
-  gint count = 0;
-  ThreadData *thread_data_1, *thread_data_2, *thread_data_3, *thread_data_4;
-  TestData test_data = { 0, };
-
-  mpegtsmux = gst_check_setup_element ("mpegtsmux");
-
-  link_src (mpegtsmux, &sink, &test_data);
-  link_sinks (mpegtsmux, &src1, &src2, &src3, &test_data);
-  gst_element_set_state (mpegtsmux, GST_STATE_PLAYING);
-  setup_caps (mpegtsmux, src1, src2, src3);
-
-  /* send a force-key-unit event with running_time=2s */
-  timestamp = stream_time = running_time = 2 * GST_SECOND;
-  sink_event = gst_video_event_new_downstream_force_key_unit (timestamp,
-      stream_time, running_time, all_headers, count);
-
-  fail_unless (gst_pad_push_event (src1, sink_event));
-  fail_unless (test_data.sink_event == NULL);
-
-  /* push 4 buffers, make sure mpegtsmux handles the force-key-unit event when
-   * the buffer with the requested running time is collected */
-  thread_data_1 = pad_push (src1, gst_buffer_new (), 1 * GST_SECOND);
-  thread_data_2 = pad_push (src2, gst_buffer_new (), 2 * GST_SECOND);
-  thread_data_3 = pad_push (src3, gst_buffer_new (), 3 * GST_SECOND);
-
-  g_thread_join (thread_data_1->thread);
-  fail_unless (test_data.sink_event == NULL);
-
-  /* push again on src1 so that the buffer on src2 is collected */
-  thread_data_4 = pad_push (src1, gst_buffer_new (), 4 * GST_SECOND);
-
-  g_thread_join (thread_data_2->thread);
-  fail_unless (test_data.sink_event != NULL);
-
-  gst_element_set_state (mpegtsmux, GST_STATE_NULL);
-
-  g_thread_join (thread_data_3->thread);
-  g_thread_join (thread_data_4->thread);
-
-  g_free (thread_data_1);
-  g_free (thread_data_2);
-  g_free (thread_data_3);
-  g_free (thread_data_4);
-  gst_object_unref (src1);
-  gst_object_unref (src2);
-  gst_object_unref (src3);
-  gst_object_unref (sink);
-  gst_object_unref (mpegtsmux);
-}
-
-GST_END_TEST;
-
-GST_START_TEST (test_force_key_unit_event_upstream)
-{
-  GstElement *mpegtsmux;
-  GstPad *sink;
-  GstPad *src1;
-  GstPad *src2;
-  GstPad *src3;
-  GstClockTime timestamp, stream_time, running_time;
-  gboolean all_headers = TRUE;
-  gint count = 0;
-  TestData test_data = { 0, };
-  ThreadData *thread_data_1, *thread_data_2, *thread_data_3, *thread_data_4;
-  GstEvent *event;
-
-  mpegtsmux = gst_check_setup_element ("mpegtsmux");
-
-  link_src (mpegtsmux, &sink, &test_data);
-  link_sinks (mpegtsmux, &src1, &src2, &src3, &test_data);
-  gst_element_set_state (mpegtsmux, GST_STATE_PLAYING);
-  setup_caps (mpegtsmux, src1, src2, src3);
-
-  /* send an upstream force-key-unit event with running_time=2s */
-  timestamp = stream_time = running_time = 2 * GST_SECOND;
-  event =
-      gst_video_event_new_upstream_force_key_unit (running_time, TRUE, count);
-  fail_unless (gst_pad_push_event (sink, event));
-
-  fail_unless (test_data.sink_event == NULL);
-  fail_unless_equals_int (test_data.src_events, 3);
-
-  /* send downstream events with unrelated seqnums */
-  event = gst_video_event_new_downstream_force_key_unit (timestamp,
-      stream_time, running_time, all_headers, count);
-  fail_unless (gst_pad_push_event (src1, event));
-  event = gst_video_event_new_downstream_force_key_unit (timestamp,
-      stream_time, running_time, all_headers, count);
-  fail_unless (gst_pad_push_event (src2, event));
-
-  /* events should be skipped */
-  fail_unless (test_data.sink_event == NULL);
-
-  /* push 4 buffers, make sure mpegtsmux handles the force-key-unit event when
-   * the buffer with the requested running time is collected */
-  thread_data_1 = pad_push (src1, gst_buffer_new (), 1 * GST_SECOND);
-  thread_data_2 = pad_push (src2, gst_buffer_new (), 2 * GST_SECOND);
-  thread_data_3 = pad_push (src3, gst_buffer_new (), 3 * GST_SECOND);
-
-  g_thread_join (thread_data_1->thread);
-  fail_unless (test_data.sink_event == NULL);
-
-  /* push again on src1 so that the buffer on src2 is collected */
-  thread_data_4 = pad_push (src1, gst_buffer_new (), 4 * GST_SECOND);
-
-  g_thread_join (thread_data_2->thread);
-  fail_unless (test_data.sink_event != NULL);
-
-  gst_element_set_state (mpegtsmux, GST_STATE_NULL);
-
-  g_thread_join (thread_data_3->thread);
-  g_thread_join (thread_data_4->thread);
-
-  g_free (thread_data_1);
-  g_free (thread_data_2);
-  g_free (thread_data_3);
-  g_free (thread_data_4);
-
-  gst_object_unref (src1);
-  gst_object_unref (src2);
-  gst_object_unref (src3);
-  gst_object_unref (sink);
-  gst_object_unref (mpegtsmux);
-}
-
-GST_END_TEST;
-
-static GstFlowReturn expected_flow;
-
-static GstFlowReturn
-flow_test_stat_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buffer)
-{
-  gst_buffer_unref (buffer);
-
-  GST_INFO ("returning flow %s (%d)", gst_flow_get_name (expected_flow),
-      expected_flow);
-  return expected_flow;
-}
-
-GST_START_TEST (test_propagate_flow_status)
-{
-  GstElement *mux;
-  gchar *padname;
-  GstBuffer *inbuffer;
-  GstCaps *caps;
-  guint i;
-
-  GstFlowReturn expected[] = { GST_FLOW_OK, GST_FLOW_FLUSHING, GST_FLOW_EOS,
-    GST_FLOW_NOT_NEGOTIATED, GST_FLOW_ERROR, GST_FLOW_NOT_SUPPORTED
-  };
-
-  mux = setup_tsmux (&video_src_template, "sink_%d", &padname);
-  gst_pad_set_chain_function (mysinkpad, flow_test_stat_chain_func);
-
-  fail_unless (gst_element_set_state (mux,
-          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
-      "could not set to playing");
-
-  caps = gst_caps_from_string (VIDEO_CAPS_STRING);
-  gst_check_setup_events (mysrcpad, mux, caps, GST_FORMAT_TIME);
-  gst_caps_unref (caps);
-
-  for (i = 0; i < G_N_ELEMENTS (expected); ++i) {
-    GstFlowReturn res;
-
-    inbuffer = gst_buffer_new_and_alloc (1);
-    ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1);
-
-    expected_flow = expected[i];
-    GST_INFO ("expecting flow %s (%d)", gst_flow_get_name (expected_flow),
-        expected_flow);
-
-    GST_BUFFER_TIMESTAMP (inbuffer) = i * GST_SECOND;
-
-    res = gst_pad_push (mysrcpad, inbuffer);
-
-    fail_unless_equals_int (res, expected[i]);
-  }
-
-  cleanup_tsmux (mux, padname);
-  g_free (padname);
-}
-
-GST_END_TEST;
-
 GST_START_TEST (test_multiple_state_change)
 {
   GstElement *mux;
@@ -716,7 +388,6 @@ GST_START_TEST (test_multiple_state_change)
   size_t num_transitions_to_test = 10;
 
   mux = setup_tsmux (&video_src_template, "sink_%d", &padname);
-  gst_pad_set_chain_function (mysinkpad, flow_test_stat_chain_func);
   gst_segment_init (&segment, GST_FORMAT_TIME);
 
   caps = gst_caps_from_string (VIDEO_CAPS_STRING);
@@ -724,6 +395,7 @@ GST_START_TEST (test_multiple_state_change)
   gst_caps_unref (caps);
 
   for (i = 0; i < num_transitions_to_test; ++i) {
+    GstQuery *drain;
     GstState next_state = states[i % G_N_ELEMENTS (states)];
     fail_unless (gst_element_set_state (mux,
             next_state) == GST_STATE_CHANGE_SUCCESS,
@@ -739,9 +411,12 @@ GST_START_TEST (test_multiple_state_change)
       inbuffer = gst_buffer_new_and_alloc (1);
       ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1);
 
-      expected_flow = GST_FLOW_OK;
       GST_BUFFER_PTS (inbuffer) = 0;
       fail_unless (GST_FLOW_OK == gst_pad_push (mysrcpad, inbuffer));
+
+      drain = gst_query_new_drain ();
+      gst_pad_peer_query (mysrcpad, drain);
+      gst_query_unref (drain);
     }
   }
 
@@ -813,9 +488,6 @@ mpegtsmux_suite (void)
 
   tcase_add_test (tc_chain, test_audio);
   tcase_add_test (tc_chain, test_video);
-  tcase_add_test (tc_chain, test_force_key_unit_event_downstream);
-  tcase_add_test (tc_chain, test_force_key_unit_event_upstream);
-  tcase_add_test (tc_chain, test_propagate_flow_status);
   tcase_add_test (tc_chain, test_multiple_state_change);
   tcase_add_test (tc_chain, test_align);
   tcase_add_test (tc_chain, test_keyframe_flag_propagation);