55d80aa0651127badae2f08d5612774af0344ddf
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / mpegtsmux / gstbasetsmux.c
1 /*
2  * Copyright 2006, 2007, 2008, 2009, 2010 Fluendo S.A.
3  *  Authors: Jan Schmidt <jan@fluendo.com>
4  *           Kapil Agrawal <kapil@fluendo.com>
5  *           Julien Moutte <julien@fluendo.com>
6  *
7  * Copyright (C) 2011 Jan Schmidt <thaytan@noraisin.net>
8  *
9  * This library is licensed under 3 different licenses and you
10  * can choose to use it under the terms of any one of them. The
11  * three licenses are the MPL 1.1, the LGPL and the MIT license.
12  *
13  * MPL:
14  *
15  * The contents of this file are subject to the Mozilla Public License
16  * Version 1.1 (the "License"); you may not use this file except in
17  * compliance with the License. You may obtain a copy of the License at
18  * http://www.mozilla.org/MPL/.
19  *
20  * Software distributed under the License is distributed on an "AS IS"
21  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
22  * License for the specific language governing rights and limitations
23  * under the License.
24  *
25  * LGPL:
26  *
27  * This library is free software; you can redistribute it and/or
28  * modify it under the terms of the GNU Library General Public
29  * License as published by the Free Software Foundation; either
30  * version 2 of the License, or (at your option) any later version.
31  *
32  * This library is distributed in the hope that it will be useful,
33  * but WITHOUT ANY WARRANTY; without even the implied warranty of
34  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
35  * Library General Public License for more details.
36  *
37  * You should have received a copy of the GNU Library General Public
38  * License along with this library; if not, write to the
39  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
40  * Boston, MA 02110-1301, USA.
41  *
42  * MIT:
43  *
44  * Unless otherwise indicated, Source Code is licensed under MIT license.
45  * See further explanation attached in License Statement (distributed in the file
46  * LICENSE).
47  *
48  * Permission is hereby granted, free of charge, to any person obtaining a copy of
49  * this software and associated documentation files (the "Software"), to deal in
50  * the Software without restriction, including without limitation the rights to
51  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
52  * of the Software, and to permit persons to whom the Software is furnished to do
53  * so, subject to the following conditions:
54  *
55  * The above copyright notice and this permission notice shall be included in all
56  * copies or substantial portions of the Software.
57  *
58  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
59  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
60  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
61  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
62  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
63  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
64  * SOFTWARE.
65  *
66  * SPDX-License-Identifier: MPL-1.1 OR MIT OR LGPL-2.0-or-later
67  */
68
69 #include <stdio.h>
70 #include <string.h>
71
72 #include <gst/tag/tag.h>
73 #include <gst/video/video.h>
74 #include <gst/mpegts/mpegts.h>
75 #include <gst/pbutils/pbutils.h>
76 #include <gst/videoparsers/gstjpeg2000parse.h>
77 #include <gst/video/video-color.h>
78
79 #include "gstbasetsmux.h"
80 #include "gstbasetsmuxaac.h"
81 #include "gstbasetsmuxttxt.h"
82 #include "gstbasetsmuxopus.h"
83 #include "gstbasetsmuxjpeg2000.h"
84
85 GST_DEBUG_CATEGORY (gst_base_ts_mux_debug);
86 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
87
88 /* GstBaseTsMuxPad */
89
90 G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD);
91
92 /* Internals */
93
94 static void
95 gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad)
96 {
97   pad->dts = GST_CLOCK_STIME_NONE;
98   pad->prog_id = -1;
99
100   if (pad->free_func)
101     pad->free_func (pad->prepare_data);
102   pad->prepare_data = NULL;
103   pad->prepare_func = NULL;
104   pad->free_func = NULL;
105
106   if (pad->codec_data)
107     gst_buffer_replace (&pad->codec_data, NULL);
108
109   /* reference owned elsewhere */
110   pad->stream = NULL;
111   pad->prog = NULL;
112
113   if (pad->language) {
114     g_free (pad->language);
115     pad->language = NULL;
116   }
117 }
118
119 /* GstAggregatorPad implementation */
120
121 static GstFlowReturn
122 gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
123 {
124   GList *cur;
125   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
126
127   /* Send initial segments again after a flush-stop, and also resend the
128    * header sections */
129   mux->first = TRUE;
130
131   /* output PAT, SI tables */
132   tsmux_resend_pat (mux->tsmux);
133   tsmux_resend_si (mux->tsmux);
134
135   /* output PMT for each program */
136   for (cur = mux->tsmux->programs; cur; cur = cur->next) {
137     TsMuxProgram *program = (TsMuxProgram *) cur->data;
138
139     tsmux_resend_pmt (program);
140   }
141
142   return GST_FLOW_OK;
143 }
144
145 /* GObject implementation */
146
147 static void
148 gst_base_ts_mux_pad_dispose (GObject * obj)
149 {
150   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj);
151
152   gst_base_ts_mux_pad_reset (ts_pad);
153
154   G_OBJECT_CLASS (gst_base_ts_mux_pad_parent_class)->dispose (obj);
155 }
156
157 static void
158 gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass)
159 {
160   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
161   GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass);
162
163   gobject_class->dispose = gst_base_ts_mux_pad_dispose;
164   gstaggpad_class->flush = gst_base_ts_mux_pad_flush;
165
166   gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX, 0);
167 }
168
169 static void
170 gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad)
171 {
172 }
173
174 /* GstBaseTsMux */
175
176 enum
177 {
178   PROP_0,
179   PROP_PROG_MAP,
180   PROP_PAT_INTERVAL,
181   PROP_PMT_INTERVAL,
182   PROP_ALIGNMENT,
183   PROP_SI_INTERVAL,
184   PROP_BITRATE,
185   PROP_PCR_INTERVAL,
186   PROP_SCTE_35_PID,
187   PROP_SCTE_35_NULL_INTERVAL
188 };
189
190 #define DEFAULT_SCTE_35_PID 0
191
192 #define BASETSMUX_DEFAULT_ALIGNMENT    -1
193
194 #define CLOCK_BASE 9LL
195 #define CLOCK_FREQ (CLOCK_BASE * 10000) /* 90 kHz PTS clock */
196 #define CLOCK_FREQ_SCR (CLOCK_FREQ * 300)       /* 27 MHz SCR clock */
197
198 #define GSTTIME_TO_MPEGTIME(time) \
199     (((time) > 0 ? (gint64) 1 : (gint64) -1) * \
200     (gint64) gst_util_uint64_scale (ABS(time), CLOCK_BASE, GST_MSECOND/10))
201 /* 27 MHz SCR conversions: */
202 #define MPEG_SYS_TIME_TO_GSTTIME(time) (gst_util_uint64_scale ((time), \
203                         GST_USECOND, CLOCK_FREQ_SCR / 1000000))
204 #define GSTTIME_TO_MPEG_SYS_TIME(time) (gst_util_uint64_scale ((time), \
205                         CLOCK_FREQ_SCR / 1000000, GST_USECOND))
206
207 #define DEFAULT_PROG_ID 0
208
209 static GstStaticPadTemplate gst_base_ts_mux_src_factory =
210 GST_STATIC_PAD_TEMPLATE ("src",
211     GST_PAD_SRC,
212     GST_PAD_ALWAYS,
213     GST_STATIC_CAPS ("video/mpegts, "
214         "systemstream = (boolean) true, " "packetsize = (int) { 188, 192} ")
215     );
216
217 typedef struct
218 {
219   GstMapInfo map_info;
220   GstBuffer *buffer;
221 } StreamData;
222
223 G_DEFINE_TYPE_WITH_CODE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR,
224     gst_mpegts_initialize ());
225
226 /* Internals */
227
228 /* Takes over the ref on the buffer */
229 static StreamData *
230 stream_data_new (GstBuffer * buffer)
231 {
232   StreamData *res = g_new (StreamData, 1);
233   res->buffer = buffer;
234   gst_buffer_map (buffer, &(res->map_info), GST_MAP_READ);
235
236   return res;
237 }
238
239 static void
240 stream_data_free (StreamData * data)
241 {
242   if (data) {
243     gst_buffer_unmap (data->buffer, &data->map_info);
244     gst_buffer_unref (data->buffer);
245     g_free (data);
246   }
247 }
248
249 #define parent_class gst_base_ts_mux_parent_class
250
251 static void
252 gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
253 {
254   GstBuffer *buf;
255   GstStructure *structure;
256   GValue array = { 0 };
257   GValue value = { 0 };
258   GstCaps *caps;
259
260   caps = gst_pad_get_current_caps (GST_AGGREGATOR_SRC_PAD (mux));
261
262   /* If we have no caps, we are possibly shutting down */
263   if (!caps)
264     return;
265
266   caps = gst_caps_make_writable (caps);
267   structure = gst_caps_get_structure (caps, 0);
268
269   g_value_init (&array, GST_TYPE_ARRAY);
270
271   GST_LOG_OBJECT (mux, "setting %u packets into streamheader",
272       g_queue_get_length (&mux->streamheader));
273
274   while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader)))) {
275     g_value_init (&value, GST_TYPE_BUFFER);
276     gst_value_take_buffer (&value, buf);
277     gst_value_array_append_value (&array, &value);
278     g_value_unset (&value);
279   }
280
281   gst_structure_set_value (structure, "streamheader", &array);
282   gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
283   g_value_unset (&array);
284   gst_caps_unref (caps);
285 }
286
287 static gboolean
288 steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
289     TsMux * mux)
290 {
291   g_hash_table_insert (mux->si_sections, type, section);
292
293   return TRUE;
294 }
295
296 static void
297 gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
298 {
299   GstBuffer *buf;
300   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
301   GHashTable *si_sections = NULL;
302   GList *l;
303
304   mux->first = TRUE;
305   mux->last_flow_ret = GST_FLOW_OK;
306   mux->last_ts = 0;
307   mux->is_delta = TRUE;
308   mux->is_header = FALSE;
309
310   mux->streamheader_sent = FALSE;
311   mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
312   gst_event_replace (&mux->force_key_unit_event, NULL);
313
314   if (mux->out_adapter)
315     gst_adapter_clear (mux->out_adapter);
316   mux->output_ts_offset = GST_CLOCK_TIME_NONE;
317
318   if (mux->tsmux) {
319     if (mux->tsmux->si_sections)
320       si_sections = g_hash_table_ref (mux->tsmux->si_sections);
321
322     tsmux_free (mux->tsmux);
323     mux->tsmux = NULL;
324   }
325
326   if (mux->programs) {
327     g_hash_table_destroy (mux->programs);
328   }
329   mux->programs = g_hash_table_new (g_direct_hash, g_direct_equal);
330
331   while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader))))
332     gst_buffer_unref (buf);
333
334   gst_event_replace (&mux->force_key_unit_event, NULL);
335   gst_buffer_replace (&mux->out_buffer, NULL);
336
337   GST_OBJECT_LOCK (mux);
338
339   for (l = GST_ELEMENT (mux)->sinkpads; l; l = l->next) {
340     gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (l->data));
341   }
342
343   GST_OBJECT_UNLOCK (mux);
344
345   if (alloc) {
346     g_assert (klass->create_ts_mux);
347
348     mux->tsmux = klass->create_ts_mux (mux);
349
350     /* Preserve user-specified sections across resets */
351     if (si_sections)
352       g_hash_table_foreach_steal (si_sections, (GHRFunc) steal_si_section,
353           mux->tsmux);
354   }
355
356   if (si_sections)
357     g_hash_table_unref (si_sections);
358
359   if (klass->reset)
360     klass->reset (mux);
361 }
362
363 static void
364 release_buffer_cb (guint8 * data, void *user_data)
365 {
366   stream_data_free ((StreamData *) user_data);
367 }
368
369 static GstFlowReturn
370 gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
371 {
372   GstFlowReturn ret = GST_FLOW_ERROR;
373   GstCaps *caps;
374   GstStructure *s;
375   GstPad *pad;
376   guint st = TSMUX_ST_RESERVED;
377   const gchar *mt;
378   const GValue *value = NULL;
379   GstBuffer *codec_data = NULL;
380   guint8 opus_channel_config_code = 0;
381   guint16 profile = GST_JPEG2000_PARSE_PROFILE_NONE;
382   guint8 main_level = 0;
383   guint32 max_rate = 0;
384   guint8 color_spec = 0;
385   j2k_private_data *private_data = NULL;
386   const gchar *stream_format = NULL;
387
388   pad = GST_PAD (ts_pad);
389   caps = gst_pad_get_current_caps (pad);
390   if (caps == NULL)
391     goto not_negotiated;
392
393   GST_DEBUG_OBJECT (pad, "Creating stream with PID 0x%04x for caps %"
394       GST_PTR_FORMAT, ts_pad->pid, caps);
395
396   s = gst_caps_get_structure (caps, 0);
397
398   mt = gst_structure_get_name (s);
399   value = gst_structure_get_value (s, "codec_data");
400   if (value != NULL)
401     codec_data = gst_value_get_buffer (value);
402
403   stream_format = gst_structure_get_string (s, "stream-format");
404
405   if (strcmp (mt, "video/x-dirac") == 0) {
406     st = TSMUX_ST_VIDEO_DIRAC;
407   } else if (strcmp (mt, "audio/x-ac3") == 0) {
408     st = TSMUX_ST_PS_AUDIO_AC3;
409   } else if (strcmp (mt, "audio/x-dts") == 0) {
410     st = TSMUX_ST_PS_AUDIO_DTS;
411   } else if (strcmp (mt, "audio/x-lpcm") == 0) {
412     st = TSMUX_ST_PS_AUDIO_LPCM;
413   } else if (strcmp (mt, "video/x-h264") == 0) {
414     st = TSMUX_ST_VIDEO_H264;
415   } else if (strcmp (mt, "video/x-h265") == 0) {
416     st = TSMUX_ST_VIDEO_HEVC;
417   } else if (strcmp (mt, "audio/mpeg") == 0) {
418     gint mpegversion;
419
420     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
421       GST_ERROR_OBJECT (pad, "caps missing mpegversion");
422       goto not_negotiated;
423     }
424
425     switch (mpegversion) {
426       case 1:{
427         int mpegaudioversion = 1;       /* Assume mpegaudioversion=1 for backwards compatibility */
428         (void) gst_structure_get_int (s, "mpegaudioversion", &mpegaudioversion);
429
430         if (mpegaudioversion == 1)
431           st = TSMUX_ST_AUDIO_MPEG1;
432         else
433           st = TSMUX_ST_AUDIO_MPEG2;
434         break;
435       }
436       case 2:{
437         /* mpegversion=2 in GStreamer refers to MPEG-2 Part 7 audio,  */
438
439         st = TSMUX_ST_AUDIO_AAC;
440
441         /* Check the stream format. If raw, make dummy internal codec data from the caps */
442         if (g_strcmp0 (stream_format, "raw") == 0) {
443           ts_pad->codec_data =
444               gst_base_ts_mux_aac_mpeg2_make_codec_data (mux, caps);
445           ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg2;
446           if (ts_pad->codec_data == NULL) {
447             GST_ERROR_OBJECT (mux, "Invalid or incomplete caps for MPEG-2 AAC");
448             goto not_negotiated;
449           }
450         }
451         break;
452       }
453       case 4:
454       {
455         st = TSMUX_ST_AUDIO_AAC;
456
457         /* Check the stream format. We need codec_data with RAW streams and mpegversion=4 */
458         if (g_strcmp0 (stream_format, "raw") == 0) {
459           if (codec_data) {
460             GST_DEBUG_OBJECT (pad,
461                 "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
462                 gst_buffer_get_size (codec_data));
463             ts_pad->codec_data = gst_buffer_ref (codec_data);
464             ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg4;
465           } else {
466             ts_pad->codec_data = NULL;
467             GST_ERROR_OBJECT (mux, "Need codec_data for raw MPEG-4 AAC");
468             goto not_negotiated;
469           }
470         }
471         break;
472       }
473       default:
474         GST_WARNING_OBJECT (pad, "unsupported mpegversion %d", mpegversion);
475         goto not_negotiated;
476     }
477   } else if (strcmp (mt, "video/mpeg") == 0) {
478     gint mpegversion;
479
480     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
481       GST_ERROR_OBJECT (pad, "caps missing mpegversion");
482       goto not_negotiated;
483     }
484
485     switch (mpegversion) {
486       case 1:
487         st = TSMUX_ST_VIDEO_MPEG1;
488         break;
489       case 2:
490         st = TSMUX_ST_VIDEO_MPEG2;
491         break;
492       case 4:
493         st = TSMUX_ST_VIDEO_MPEG4;
494         break;
495       default:
496         GST_WARNING_OBJECT (pad, "unsupported mpegversion %d", mpegversion);
497         goto not_negotiated;
498     }
499   } else if (strcmp (mt, "subpicture/x-dvb") == 0) {
500     st = TSMUX_ST_PS_DVB_SUBPICTURE;
501   } else if (strcmp (mt, "application/x-teletext") == 0) {
502     st = TSMUX_ST_PS_TELETEXT;
503     /* needs a particularly sized layout */
504     ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext;
505   } else if (strcmp (mt, "audio/x-opus") == 0) {
506     guint8 channels, mapping_family, stream_count, coupled_count;
507     guint8 channel_mapping[256];
508
509     if (!gst_codec_utils_opus_parse_caps (caps, NULL, &channels,
510             &mapping_family, &stream_count, &coupled_count, channel_mapping)) {
511       GST_ERROR_OBJECT (pad, "Incomplete Opus caps");
512       goto not_negotiated;
513     }
514
515     if (channels <= 2 && mapping_family == 0) {
516       opus_channel_config_code = channels;
517     } else if (channels == 2 && mapping_family == 255 && stream_count == 1
518         && coupled_count == 1) {
519       /* Dual mono */
520       opus_channel_config_code = 0;
521     } else if (channels >= 2 && channels <= 8 && mapping_family == 1) {
522       static const guint8 coupled_stream_counts[9] = {
523         1, 0, 1, 1, 2, 2, 2, 3, 3
524       };
525       static const guint8 channel_map_a[8][8] = {
526         {0},
527         {0, 1},
528         {0, 2, 1},
529         {0, 1, 2, 3},
530         {0, 4, 1, 2, 3},
531         {0, 4, 1, 2, 3, 5},
532         {0, 4, 1, 2, 3, 5, 6},
533         {0, 6, 1, 2, 3, 4, 5, 7},
534       };
535       static const guint8 channel_map_b[8][8] = {
536         {0},
537         {0, 1},
538         {0, 1, 2},
539         {0, 1, 2, 3},
540         {0, 1, 2, 3, 4},
541         {0, 1, 2, 3, 4, 5},
542         {0, 1, 2, 3, 4, 5, 6},
543         {0, 1, 2, 3, 4, 5, 6, 7},
544       };
545
546       /* Vorbis mapping */
547       if (stream_count == channels - coupled_stream_counts[channels] &&
548           coupled_count == coupled_stream_counts[channels] &&
549           memcmp (channel_mapping, channel_map_a[channels - 1],
550               channels) == 0) {
551         opus_channel_config_code = channels;
552       } else if (stream_count == channels - coupled_stream_counts[channels] &&
553           coupled_count == coupled_stream_counts[channels] &&
554           memcmp (channel_mapping, channel_map_b[channels - 1],
555               channels) == 0) {
556         opus_channel_config_code = channels | 0x80;
557       } else {
558         GST_FIXME_OBJECT (pad, "Opus channel mapping not handled");
559         goto not_negotiated;
560       }
561     }
562
563     st = TSMUX_ST_PS_OPUS;
564     ts_pad->prepare_func = gst_base_ts_mux_prepare_opus;
565   } else if (strcmp (mt, "meta/x-klv") == 0) {
566     st = TSMUX_ST_PS_KLV;
567   } else if (strcmp (mt, "image/x-jpc") == 0) {
568     /*
569      * See this document for more details on standard:
570      *
571      * https://www.itu.int/rec/T-REC-H.222.0-201206-S/en
572      *  Annex S describes J2K details
573      *  Page 104 of this document describes J2k video descriptor
574      */
575
576     const GValue *vProfile = gst_structure_get_value (s, "profile");
577     const GValue *vMainlevel = gst_structure_get_value (s, "main-level");
578     const GValue *vFramerate = gst_structure_get_value (s, "framerate");
579     const GValue *vColorimetry = gst_structure_get_value (s, "colorimetry");
580     private_data = g_new0 (j2k_private_data, 1);
581     /* for now, we relax the condition that profile must exist and equal
582      * GST_JPEG2000_PARSE_PROFILE_BC_SINGLE */
583     if (vProfile) {
584       profile = g_value_get_int (vProfile);
585       if (profile != GST_JPEG2000_PARSE_PROFILE_BC_SINGLE) {
586         GST_LOG_OBJECT (pad, "Invalid JPEG 2000 profile %d", profile);
587         /*goto not_negotiated; */
588       }
589     }
590     /* for now, we will relax the condition that the main level must be present */
591     if (vMainlevel) {
592       main_level = g_value_get_uint (vMainlevel);
593       if (main_level > 11) {
594         GST_ERROR_OBJECT (pad, "Invalid main level %d", main_level);
595         goto not_negotiated;
596       }
597       if (main_level >= 6) {
598         max_rate = 2 ^ (main_level - 6) * 1600 * 1000000;
599       } else {
600         switch (main_level) {
601           case 0:
602           case 1:
603           case 2:
604           case 3:
605             max_rate = 200 * 1000000;
606             break;
607           case 4:
608             max_rate = 400 * 1000000;
609             break;
610           case 5:
611             max_rate = 800 * 1000000;
612             break;
613           default:
614             break;
615         }
616       }
617     } else {
618       /*GST_ERROR_OBJECT (pad, "Missing main level");
619          goto not_negotiated; */
620     }
621     /* We always mux video in J2K-over-MPEG-TS non-interlaced mode */
622     private_data->interlace = FALSE;
623     private_data->den = 0;
624     private_data->num = 0;
625     private_data->max_bitrate = max_rate;
626     private_data->color_spec = 1;
627     /* these two fields are not used, since we always mux as non-interlaced */
628     private_data->Fic = 1;
629     private_data->Fio = 0;
630
631     /* Get Framerate */
632     if (vFramerate != NULL) {
633       /* Data for ELSM header */
634       private_data->num = gst_value_get_fraction_numerator (vFramerate);
635       private_data->den = gst_value_get_fraction_denominator (vFramerate);
636     }
637     /* Get Colorimetry */
638     if (vColorimetry) {
639       const char *colorimetry = g_value_get_string (vColorimetry);
640       color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_SRGB;  /* RGB as default */
641       if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT601)) {
642         color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC601;
643       } else {
644         if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT709)
645             || g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_SMPTE240M)) {
646           color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC709;
647         }
648       }
649       private_data->color_spec = color_spec;
650     } else {
651       GST_ERROR_OBJECT (pad, "Colorimetry not present in caps");
652       goto not_negotiated;
653     }
654     st = TSMUX_ST_VIDEO_JP2K;
655     ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
656     ts_pad->prepare_data = private_data;
657     ts_pad->free_func = gst_base_ts_mux_free_jpeg2000;
658   } else {
659     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
660
661     if (klass->handle_media_type) {
662       st = klass->handle_media_type (mux, mt, ts_pad);
663     }
664   }
665
666
667   if (st != TSMUX_ST_RESERVED) {
668     ts_pad->stream = tsmux_create_stream (mux->tsmux, st, ts_pad->pid,
669         ts_pad->language);
670   } else {
671     GST_DEBUG_OBJECT (pad, "Failed to determine stream type");
672   }
673
674   if (ts_pad->stream != NULL) {
675     const char *interlace_mode = gst_structure_get_string (s, "interlace-mode");
676     gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling);
677     gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels);
678     gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate);
679
680     /* frame rate */
681     gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num,
682         &ts_pad->stream->den);
683
684     /* Interlace mode */
685     ts_pad->stream->interlace_mode = FALSE;
686     if (interlace_mode) {
687       ts_pad->stream->interlace_mode =
688           g_str_equal (interlace_mode, "interleaved");
689     }
690     /* Width and Height */
691     gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size);
692     gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size);
693
694     ts_pad->stream->color_spec = color_spec;
695     ts_pad->stream->max_bitrate = max_rate;
696     ts_pad->stream->profile_and_level = profile | main_level;
697
698     ts_pad->stream->opus_channel_config_code = opus_channel_config_code;
699
700     tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb);
701     tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
702
703     ret = GST_FLOW_OK;
704   }
705   gst_caps_unref (caps);
706   return ret;
707   /* ERRORS */
708 not_negotiated:
709   {
710     g_free (private_data);
711     GST_DEBUG_OBJECT (pad, "Sink pad caps were not set before pushing");
712     if (caps)
713       gst_caps_unref (caps);
714     return GST_FLOW_NOT_NEGOTIATED;
715   }
716 }
717
718 static gboolean
719 is_valid_pmt_pid (guint16 pmt_pid)
720 {
721   if (pmt_pid < 0x0010 || pmt_pid > 0x1ffe)
722     return FALSE;
723   return TRUE;
724 }
725
726 static GstFlowReturn
727 gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
728 {
729   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
730   gchar *name = NULL;
731   gchar *prop_name;
732   GstFlowReturn ret = GST_FLOW_OK;
733
734   if (ts_pad->prog_id == -1) {
735     name = GST_PAD_NAME (pad);
736     if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) {
737       gint idx;
738       gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
739       if (!ret) {
740         GST_ELEMENT_ERROR (mux, STREAM, MUX,
741             ("Reading program map failed. Assuming default"), (NULL));
742         idx = DEFAULT_PROG_ID;
743       }
744       if (idx < 0) {
745         GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
746             "than zero; DEFAULT_PROGRAM = %d is used instead",
747             idx, name, DEFAULT_PROG_ID);
748         idx = DEFAULT_PROG_ID;
749       }
750       ts_pad->prog_id = idx;
751     } else {
752       ts_pad->prog_id = DEFAULT_PROG_ID;
753     }
754   }
755
756   ts_pad->prog =
757       (TsMuxProgram *) g_hash_table_lookup (mux->programs,
758       GINT_TO_POINTER (ts_pad->prog_id));
759   if (ts_pad->prog == NULL) {
760     ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
761     if (ts_pad->prog == NULL)
762       goto no_program;
763     tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
764     tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
765     tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval);
766     g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
767         ts_pad->prog);
768
769     /* Check for user-specified PMT PID */
770     prop_name = g_strdup_printf ("PMT_%d", ts_pad->prog->pgm_number);
771     if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
772       guint pmt_pid;
773
774       if (gst_structure_get_uint (mux->prog_map, prop_name, &pmt_pid)) {
775         if (is_valid_pmt_pid (pmt_pid)) {
776           GST_DEBUG_OBJECT (mux, "User specified pid=%u as PMT for "
777               "program (prog_id = %d)", pmt_pid, ts_pad->prog->pgm_number);
778           tsmux_program_set_pmt_pid (ts_pad->prog, pmt_pid);
779         } else {
780           GST_ELEMENT_WARNING (mux, LIBRARY, SETTINGS,
781               ("User specified PMT pid %u for program %d is not valid.",
782                   pmt_pid, ts_pad->prog->pgm_number), (NULL));
783         }
784       }
785     }
786     g_free (prop_name);
787   }
788
789   if (ts_pad->stream == NULL) {
790     ret = gst_base_ts_mux_create_stream (mux, ts_pad);
791     if (ret != GST_FLOW_OK)
792       goto no_stream;
793   }
794
795   if (ts_pad->prog->pcr_stream == NULL) {
796     /* Take the first stream of the program for the PCR */
797     GST_DEBUG_OBJECT (ts_pad,
798         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
799         ts_pad->pid, ts_pad->prog_id);
800
801     tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
802   }
803
804   /* Check for user-specified PCR PID */
805   prop_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
806   if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
807     const gchar *sink_name =
808         gst_structure_get_string (mux->prog_map, prop_name);
809
810     if (!g_strcmp0 (name, sink_name)) {
811       GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
812           "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
813       tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
814     }
815   }
816   g_free (prop_name);
817
818   return ret;
819
820   /* ERRORS */
821 no_program:
822   {
823     GST_ELEMENT_ERROR (mux, STREAM, MUX,
824         ("Could not create new program"), (NULL));
825     return GST_FLOW_ERROR;
826   }
827 no_stream:
828   {
829     GST_ELEMENT_ERROR (mux, STREAM, MUX,
830         ("Could not create handler for stream"), (NULL));
831     return ret;
832   }
833 }
834
835 static gboolean
836 gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
837     gpointer user_data)
838 {
839   GstFlowReturn *ret = user_data;
840
841   *ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
842
843   return *ret == GST_FLOW_OK;
844 }
845
846 static GstFlowReturn
847 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
848 {
849   GstFlowReturn ret = GST_FLOW_OK;
850
851   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (mux),
852       gst_base_ts_mux_create_pad_stream_func, &ret);
853
854   return ret;
855 }
856
857 static void
858 new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
859     guint len)
860 {
861   /* Packets should be at least 188 bytes, but check anyway */
862   g_assert (len >= 2 || !data);
863
864   if (!mux->streamheader_sent && data) {
865     guint pid = ((data[1] & 0x1f) << 8) | data[2];
866     /* if it's a PAT or a PMT */
867     if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
868       GstBuffer *hbuf;
869
870       if (!buf) {
871         hbuf = gst_buffer_new_and_alloc (len);
872         gst_buffer_fill (hbuf, 0, data, len);
873       } else {
874         hbuf = gst_buffer_copy (buf);
875       }
876       GST_LOG_OBJECT (mux,
877           "Collecting packet with pid 0x%04x into streamheaders", pid);
878
879       g_queue_push_tail (&mux->streamheader, hbuf);
880     } else if (!g_queue_is_empty (&mux->streamheader)) {
881       gst_base_ts_mux_set_header_on_caps (mux);
882       mux->streamheader_sent = TRUE;
883     }
884   }
885
886   if (buf) {
887     if (mux->is_header) {
888       GST_LOG_OBJECT (mux, "marking as header buffer");
889       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER);
890     }
891     if (mux->is_delta) {
892       GST_LOG_OBJECT (mux, "marking as delta unit");
893       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
894     } else {
895       GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
896       mux->is_delta = TRUE;
897     }
898   }
899 }
900
901 static GstFlowReturn
902 gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
903 {
904   GstBufferList *buffer_list;
905   gint align = mux->alignment;
906   gint av, packet_size;
907
908   packet_size = mux->packet_size;
909
910   if (align < 0)
911     align = mux->automatic_alignment;
912
913   av = gst_adapter_available (mux->out_adapter);
914   GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
915
916   if (av == 0)
917     return GST_FLOW_OK;
918
919   /* no alignment, just push all available data */
920   if (align == 0) {
921     buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
922     return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux),
923         buffer_list);
924   }
925
926   align *= packet_size;
927
928   if (!force && align > av)
929     return GST_FLOW_OK;
930
931   buffer_list = gst_buffer_list_new_sized ((av / align) + 1);
932
933   GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
934   while (align <= av) {
935     GstBuffer *buf;
936     GstClockTime pts;
937
938     pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
939     buf = gst_adapter_take_buffer (mux->out_adapter, align);
940
941     GST_BUFFER_PTS (buf) = pts;
942
943     gst_buffer_list_add (buffer_list, buf);
944     av -= align;
945   }
946
947   if (av > 0 && force) {
948     GstBuffer *buf;
949     GstClockTime pts;
950     guint8 *data;
951     guint32 header;
952     gint dummy;
953     GstMapInfo map;
954
955     GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
956
957     pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
958     buf = gst_buffer_new_and_alloc (align);
959
960     GST_BUFFER_PTS (buf) = pts;
961
962     gst_buffer_map (buf, &map, GST_MAP_READ);
963     data = map.data;
964
965     gst_adapter_copy (mux->out_adapter, data, 0, av);
966     gst_adapter_clear (mux->out_adapter);
967
968     data += av;
969     header = GST_READ_UINT32_BE (data - packet_size);
970
971     dummy = (map.size - av) / packet_size;
972     GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
973
974     for (; dummy > 0; dummy--) {
975       gint offset;
976
977       if (packet_size > GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH) {
978         GST_WRITE_UINT32_BE (data, header);
979         /* simply increase header a bit and never mind too much */
980         header++;
981         offset = 4;
982       } else {
983         offset = 0;
984       }
985       GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
986       /* null packet PID */
987       GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
988       /* no adaptation field exists | continuity counter undefined */
989       GST_WRITE_UINT8 (data + offset + 3, 0x10);
990       /* payload */
991       memset (data + offset + 4, 0, GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH - 4);
992       data += packet_size;
993     }
994
995     gst_buffer_unmap (buf, &map);
996     gst_buffer_list_add (buffer_list, buf);
997   }
998
999   return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux), buffer_list);
1000 }
1001
1002 static GstFlowReturn
1003 gst_base_ts_mux_collect_packet (GstBaseTsMux * mux, GstBuffer * buf)
1004 {
1005   GST_LOG_OBJECT (mux, "collecting packet size %" G_GSIZE_FORMAT,
1006       gst_buffer_get_size (buf));
1007   gst_adapter_push (mux->out_adapter, buf);
1008
1009   return GST_FLOW_OK;
1010 }
1011
1012 static GstEvent *
1013 check_pending_key_unit_event (GstEvent * pending_event, GstSegment * segment,
1014     GstClockTime timestamp, guint flags, GstClockTime pending_key_unit_ts)
1015 {
1016   GstClockTime running_time, stream_time;
1017   gboolean all_headers;
1018   guint count;
1019   GstEvent *event = NULL;
1020
1021   g_assert (segment != NULL);
1022
1023   if (pending_event == NULL)
1024     goto out;
1025
1026   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1027       timestamp == GST_CLOCK_TIME_NONE)
1028     goto out;
1029
1030   running_time = timestamp;
1031
1032   GST_INFO ("now %" GST_TIME_FORMAT " wanted %" GST_TIME_FORMAT,
1033       GST_TIME_ARGS (running_time), GST_TIME_ARGS (pending_key_unit_ts));
1034   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1035       running_time < pending_key_unit_ts)
1036     goto out;
1037
1038   if (flags & GST_BUFFER_FLAG_DELTA_UNIT) {
1039     GST_INFO ("pending force key unit, waiting for keyframe");
1040     goto out;
1041   }
1042
1043   stream_time = gst_segment_to_stream_time (segment,
1044       GST_FORMAT_TIME, timestamp);
1045
1046   if (GST_EVENT_TYPE (pending_event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
1047     gst_video_event_parse_downstream_force_key_unit (pending_event,
1048         NULL, NULL, NULL, &all_headers, &count);
1049   } else {
1050     gst_video_event_parse_upstream_force_key_unit (pending_event, NULL,
1051         &all_headers, &count);
1052   }
1053
1054   event =
1055       gst_video_event_new_downstream_force_key_unit (timestamp, stream_time,
1056       running_time, all_headers, count);
1057   gst_event_set_seqnum (event, gst_event_get_seqnum (pending_event));
1058
1059 out:
1060   return event;
1061 }
1062
1063 /* Called when the TsMux has prepared a packet for output. Return FALSE
1064  * on error */
1065 static gboolean
1066 new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
1067 {
1068   GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1069   GstAggregator *agg = GST_AGGREGATOR (mux);
1070   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1071   GstMapInfo map;
1072   GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1073
1074   g_assert (klass->output_packet);
1075
1076   gst_buffer_map (buf, &map, GST_MAP_READWRITE);
1077
1078   if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1079     /* tsmux isn't generating timestamps. Use the input times */
1080     GST_BUFFER_PTS (buf) = mux->last_ts;
1081   }
1082
1083   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1084     if (!GST_CLOCK_TIME_IS_VALID (mux->output_ts_offset)) {
1085       GstClockTime output_start_time = agg_segment->position;
1086       if (agg_segment->position == -1
1087           || agg_segment->position < agg_segment->start) {
1088         output_start_time = agg_segment->start;
1089       }
1090
1091       mux->output_ts_offset =
1092           GST_CLOCK_DIFF (GST_BUFFER_PTS (buf), output_start_time);
1093
1094       GST_DEBUG_OBJECT (mux, "New output ts offset %" GST_STIME_FORMAT,
1095           GST_STIME_ARGS (mux->output_ts_offset));
1096     }
1097
1098     if (GST_CLOCK_TIME_IS_VALID (mux->output_ts_offset)) {
1099       GST_BUFFER_PTS (buf) += mux->output_ts_offset;
1100     }
1101   }
1102
1103   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1104     agg_segment->position = GST_BUFFER_PTS (buf);
1105   }
1106
1107   /* do common init (flags and streamheaders) */
1108   new_packet_common_init (mux, buf, map.data, map.size);
1109
1110   gst_buffer_unmap (buf, &map);
1111
1112   return klass->output_packet (mux, buf, new_pcr);
1113 }
1114
1115 /* called when TsMux needs new packet to write into */
1116 static void
1117 alloc_packet_cb (GstBuffer ** buf, void *user_data)
1118 {
1119   GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1120   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1121
1122   g_assert (klass->allocate_packet);
1123
1124   klass->allocate_packet (mux, buf);
1125 }
1126
1127 static GstFlowReturn
1128 gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
1129     GstAggregatorPad * agg_pad, GstBuffer * buf)
1130 {
1131   GstFlowReturn ret = GST_FLOW_OK;
1132   GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad);
1133   TsMuxProgram *prog;
1134   gint64 pts = GST_CLOCK_STIME_NONE;
1135   gint64 dts = GST_CLOCK_STIME_NONE;
1136   gboolean delta = TRUE, header = FALSE;
1137   StreamData *stream_data;
1138   GstMpegtsSection *scte_section = NULL;
1139
1140   GST_DEBUG_OBJECT (mux, "Pads collected");
1141
1142   if (buf && gst_buffer_get_size (buf) == 0
1143       && GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)) {
1144     gst_buffer_unref (buf);
1145     return GST_FLOW_OK;
1146   }
1147
1148   if (G_UNLIKELY (mux->first)) {
1149     ret = gst_base_ts_mux_create_streams (mux);
1150     if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1151       if (buf)
1152         gst_buffer_unref (buf);
1153       return ret;
1154     }
1155
1156     mux->first = FALSE;
1157   }
1158
1159   prog = best->prog;
1160   if (prog == NULL) {
1161     GList *cur;
1162
1163     gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
1164     tsmux_resend_pat (mux->tsmux);
1165     tsmux_resend_si (mux->tsmux);
1166     prog = best->prog;
1167     g_assert_nonnull (prog);
1168
1169     /* output PMT for each program */
1170     for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1171       TsMuxProgram *program = (TsMuxProgram *) cur->data;
1172
1173       tsmux_resend_pmt (program);
1174     }
1175   }
1176
1177   g_assert (buf != NULL);
1178
1179   if (best->prepare_func) {
1180     GstBuffer *tmp;
1181
1182     tmp = best->prepare_func (buf, best, mux);
1183     g_assert (tmp);
1184     gst_buffer_unref (buf);
1185     buf = tmp;
1186   }
1187
1188   if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
1189     GstEvent *event;
1190
1191     event = check_pending_key_unit_event (mux->force_key_unit_event,
1192         &agg_pad->segment, GST_BUFFER_PTS (buf),
1193         GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
1194     if (event) {
1195       GstClockTime running_time;
1196       guint count;
1197       GList *cur;
1198
1199       mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
1200       gst_event_replace (&mux->force_key_unit_event, NULL);
1201
1202       gst_video_event_parse_downstream_force_key_unit (event,
1203           NULL, NULL, &running_time, NULL, &count);
1204
1205       GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
1206           "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
1207           GST_TIME_ARGS (running_time), count);
1208       gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
1209
1210       /* output PAT, SI tables */
1211       tsmux_resend_pat (mux->tsmux);
1212       tsmux_resend_si (mux->tsmux);
1213
1214       /* output PMT for each program */
1215       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1216         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1217
1218         tsmux_resend_pmt (program);
1219       }
1220     }
1221   }
1222
1223   if (G_UNLIKELY (prog->pcr_stream == NULL)) {
1224     /* Take the first data stream for the PCR */
1225     GST_DEBUG_OBJECT (best,
1226         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
1227         best->pid, best->prog_id);
1228
1229     /* Set the chosen PCR stream */
1230     tsmux_program_set_pcr_stream (prog, best->stream);
1231   }
1232
1233   GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
1234
1235   GST_OBJECT_LOCK (mux);
1236   scte_section = mux->pending_scte35_section;
1237   mux->pending_scte35_section = NULL;
1238   GST_OBJECT_UNLOCK (mux);
1239   if (G_UNLIKELY (scte_section)) {
1240     GST_DEBUG_OBJECT (mux, "Sending pending SCTE section");
1241     if (!tsmux_send_section (mux->tsmux, scte_section))
1242       GST_ERROR_OBJECT (mux, "Error sending SCTE section !");
1243   }
1244
1245   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1246     pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
1247     GST_DEBUG_OBJECT (mux, "Buffer has PTS  %" GST_TIME_FORMAT " pts %"
1248         G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts);
1249   }
1250
1251   if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
1252     dts = GSTTIME_TO_MPEGTIME (best->dts);
1253     GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
1254         G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
1255   }
1256
1257   /* should not have a DTS without PTS */
1258   if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
1259     GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
1260     pts = dts;
1261   }
1262
1263   if (best->stream->is_video_stream) {
1264     delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1265     header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
1266   }
1267
1268   if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
1269     GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
1270
1271     gst_buffer_unref (buf);
1272     return GST_FLOW_OK;
1273   }
1274
1275   GST_DEBUG_OBJECT (mux, "delta: %d", delta);
1276
1277   stream_data = stream_data_new (buf);
1278   tsmux_stream_add_data (best->stream, stream_data->map_info.data,
1279       stream_data->map_info.size, stream_data, pts, dts, !delta);
1280
1281   /* outgoing ts follows ts of PCR program stream */
1282   if (prog->pcr_stream == best->stream) {
1283     /* prefer DTS if present for PCR as it should be monotone */
1284     mux->last_ts =
1285         GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
1286         GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
1287   }
1288
1289   mux->is_delta = delta;
1290   mux->is_header = header;
1291   while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
1292     if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
1293       /* Failed writing data for some reason. Set appropriate error */
1294       GST_DEBUG_OBJECT (mux, "Failed to write data packet");
1295       GST_ELEMENT_ERROR (mux, STREAM, MUX,
1296           ("Failed writing output data to stream %04x", best->stream->id),
1297           (NULL));
1298       goto write_fail;
1299     }
1300   }
1301   /* flush packet cache */
1302   return gst_base_ts_mux_push_packets (mux, FALSE);
1303
1304   /* ERRORS */
1305 write_fail:
1306   {
1307     return mux->last_flow_ret;
1308   }
1309 }
1310
1311 /* GstElement implementation */
1312 static gboolean
1313 gst_base_ts_mux_has_pad_with_pid (GstBaseTsMux * mux, guint16 pid)
1314 {
1315   GList *l;
1316   gboolean res = FALSE;
1317
1318   GST_OBJECT_LOCK (mux);
1319
1320   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1321     GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
1322
1323     if (tpad->pid == pid) {
1324       res = TRUE;
1325       break;
1326     }
1327   }
1328
1329   GST_OBJECT_UNLOCK (mux);
1330   return res;
1331 }
1332
1333 static GstPad *
1334 gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
1335     const gchar * name, const GstCaps * caps)
1336 {
1337   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1338   gint pid = -1;
1339   GstPad *pad = NULL;
1340   gchar *free_name = NULL;
1341
1342   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
1343     if (tsmux_find_stream (mux->tsmux, pid))
1344       goto stream_exists;
1345     /* Make sure we don't use reserved PID.
1346      * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
1347     if (pid < TSMUX_START_ES_PID)
1348       goto invalid_stream_pid;
1349   } else {
1350     do {
1351       pid = tsmux_get_new_pid (mux->tsmux);
1352     } while (gst_base_ts_mux_has_pad_with_pid (mux, pid));
1353
1354     /* Name the pad correctly after the selected pid */
1355     name = free_name = g_strdup_printf ("sink_%d", pid);
1356   }
1357
1358   pad = (GstPad *)
1359       GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
1360       templ, name, caps);
1361
1362   gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad));
1363   GST_BASE_TS_MUX_PAD (pad)->pid = pid;
1364
1365   g_free (free_name);
1366
1367   return pad;
1368
1369   /* ERRORS */
1370 stream_exists:
1371   {
1372     GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"),
1373         (NULL));
1374     return NULL;
1375   }
1376
1377 invalid_stream_pid:
1378   {
1379     GST_ELEMENT_ERROR (element, STREAM, MUX,
1380         ("Invalid Elementary stream PID (0x%02u < 0x40)", pid), (NULL));
1381     return NULL;
1382   }
1383 }
1384
1385 static void
1386 gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
1387 {
1388   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1389
1390   if (mux->tsmux) {
1391     GList *cur;
1392     GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
1393     gint pid = ts_pad->pid;
1394
1395     if (ts_pad->prog) {
1396       if (ts_pad->prog->pcr_stream == ts_pad->stream) {
1397         tsmux_program_set_pcr_stream (ts_pad->prog, NULL);
1398       }
1399       if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) {
1400         g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id));
1401       }
1402     }
1403
1404     tsmux_resend_pat (mux->tsmux);
1405     tsmux_resend_si (mux->tsmux);
1406
1407     /* output PMT for each program */
1408     for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1409       TsMuxProgram *program = (TsMuxProgram *) cur->data;
1410
1411       tsmux_resend_pmt (program);
1412     }
1413   }
1414
1415   GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
1416 }
1417
1418 static gboolean
1419 gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
1420 {
1421   GstMpegtsSection *section;
1422   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1423
1424   section = gst_event_parse_mpegts_section (event);
1425
1426   if (section) {
1427     GST_DEBUG ("Received event with mpegts section");
1428
1429     if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
1430       /* Will be sent from the streaming threads */
1431       GST_DEBUG_OBJECT (mux, "Storing SCTE event");
1432       GST_OBJECT_LOCK (element);
1433       if (mux->pending_scte35_section)
1434         gst_mpegts_section_unref (mux->pending_scte35_section);
1435       mux->pending_scte35_section = section;
1436       GST_OBJECT_UNLOCK (element);
1437     } else {
1438       /* TODO: Check that the section type is supported */
1439       tsmux_add_mpegts_si_section (mux->tsmux, section);
1440     }
1441
1442     gst_event_unref (event);
1443
1444     return TRUE;
1445   }
1446
1447   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1448 }
1449
1450 /* GstAggregator implementation */
1451
1452 static gboolean
1453 gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
1454     GstEvent * event)
1455 {
1456   GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
1457   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1458   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad);
1459   gboolean res = FALSE;
1460   gboolean forward = TRUE;
1461
1462   switch (GST_EVENT_TYPE (event)) {
1463     case GST_EVENT_CUSTOM_DOWNSTREAM:
1464     {
1465       GstClockTime timestamp, stream_time, running_time;
1466       gboolean all_headers;
1467       guint count;
1468
1469       if (!gst_video_event_is_force_key_unit (event))
1470         goto out;
1471
1472       res = TRUE;
1473       forward = FALSE;
1474
1475       gst_video_event_parse_downstream_force_key_unit (event,
1476           &timestamp, &stream_time, &running_time, &all_headers, &count);
1477       GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, "
1478           "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
1479           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
1480
1481       if (mux->force_key_unit_event != NULL) {
1482         GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
1483             "as an upstream force key unit is already queued");
1484         goto out;
1485       }
1486
1487       if (!all_headers)
1488         goto out;
1489
1490       mux->pending_key_unit_ts = running_time;
1491       gst_event_replace (&mux->force_key_unit_event, event);
1492       break;
1493     }
1494     case GST_EVENT_TAG:{
1495       GstTagList *list;
1496       gchar *lang = NULL;
1497
1498       GST_DEBUG_OBJECT (mux, "received tag event");
1499       gst_event_parse_tag (event, &list);
1500
1501       /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
1502       if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
1503         const gchar *lang_code;
1504
1505         lang_code = gst_tag_get_language_code_iso_639_2B (lang);
1506         if (lang_code) {
1507           GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code);
1508
1509           g_free (ts_pad->language);
1510           ts_pad->language = g_strdup (lang_code);
1511         } else {
1512           GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'",
1513               lang);
1514         }
1515         g_free (lang);
1516       }
1517
1518       /* handled this, don't want collectpads to forward it downstream */
1519       res = TRUE;
1520       forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
1521       break;
1522     }
1523     case GST_EVENT_STREAM_START:{
1524       GstStreamFlags flags;
1525
1526       gst_event_parse_stream_flags (event, &flags);
1527
1528       /* Don't wait for data on sparse inputs like metadata streams */
1529       /*
1530          if ((flags & GST_STREAM_FLAG_SPARSE)) {
1531          GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
1532          gst_collect_pads_set_waiting (pads, data, FALSE);
1533          GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
1534          }
1535        */
1536       break;
1537     }
1538     default:
1539       break;
1540   }
1541
1542 out:
1543   if (!forward)
1544     gst_event_unref (event);
1545   else
1546     res = agg_class->sink_event (agg, agg_pad, event);
1547
1548   return res;
1549 }
1550
1551 static gboolean
1552 gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
1553 {
1554   GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
1555   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1556   gboolean res = TRUE, forward = TRUE;
1557
1558   switch (GST_EVENT_TYPE (event)) {
1559     case GST_EVENT_CUSTOM_UPSTREAM:
1560     {
1561       GstIterator *iter;
1562       GValue sinkpad_value = G_VALUE_INIT;
1563       GstClockTime running_time;
1564       gboolean all_headers, done = FALSE, res = FALSE;
1565       guint count;
1566
1567       if (!gst_video_event_is_force_key_unit (event))
1568         break;
1569
1570       forward = FALSE;
1571
1572       gst_video_event_parse_upstream_force_key_unit (event,
1573           &running_time, &all_headers, &count);
1574
1575       GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
1576           "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
1577           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
1578           all_headers, count);
1579
1580       if (!all_headers)
1581         break;
1582
1583       mux->pending_key_unit_ts = running_time;
1584       gst_event_replace (&mux->force_key_unit_event, event);
1585
1586       iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
1587
1588       while (!done) {
1589         switch (gst_iterator_next (iter, &sinkpad_value)) {
1590           case GST_ITERATOR_OK:{
1591             GstPad *sinkpad = g_value_get_object (&sinkpad_value);
1592             gboolean tmp;
1593
1594             GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
1595             tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
1596             GST_INFO_OBJECT (mux, "result %d", tmp);
1597             /* succeed if at least one pad succeeds */
1598             res |= tmp;
1599             break;
1600           }
1601           case GST_ITERATOR_DONE:
1602             done = TRUE;
1603             break;
1604           case GST_ITERATOR_RESYNC:
1605             gst_iterator_resync (iter);
1606             break;
1607           case GST_ITERATOR_ERROR:
1608             g_assert_not_reached ();
1609             break;
1610         }
1611         g_value_reset (&sinkpad_value);
1612       }
1613       g_value_unset (&sinkpad_value);
1614       gst_iterator_free (iter);
1615       break;
1616     }
1617     default:
1618       break;
1619   }
1620
1621   if (forward)
1622     res = agg_class->src_event (agg, event);
1623   else
1624     gst_event_unref (event);
1625
1626   return res;
1627 }
1628
1629 static GstBuffer *
1630 gst_base_ts_mux_clip (GstAggregator * agg,
1631     GstAggregatorPad * agg_pad, GstBuffer * buf)
1632 {
1633   GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad);
1634   GstClockTime time;
1635   GstBuffer *ret;
1636
1637   ret = buf;
1638
1639   /* PTS */
1640   time = GST_BUFFER_PTS (buf);
1641
1642   /* invalid left alone and passed */
1643   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
1644     time =
1645         gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time);
1646     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
1647       GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment");
1648       gst_buffer_unref (buf);
1649       ret = NULL;
1650       goto beach;
1651     } else {
1652       GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " ->  %"
1653           GST_TIME_FORMAT " running time",
1654           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
1655       buf = ret = gst_buffer_make_writable (buf);
1656       GST_BUFFER_PTS (ret) = time;
1657     }
1658   }
1659
1660   /* DTS */
1661   time = GST_BUFFER_DTS (buf);
1662
1663   /* invalid left alone and passed */
1664   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
1665     gint sign;
1666     gint64 dts;
1667
1668     sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME,
1669         time, &time);
1670
1671     if (sign > 0)
1672       dts = (gint64) time;
1673     else
1674       dts = -((gint64) time);
1675
1676     GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %"
1677         GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
1678         GST_STIME_ARGS (dts));
1679
1680     if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) {
1681       /* Ignore DTS going backward */
1682       GST_WARNING_OBJECT (pad, "ignoring DTS going backward");
1683       dts = pad->dts;
1684     }
1685
1686     ret = gst_buffer_make_writable (buf);
1687     if (sign > 0)
1688       GST_BUFFER_DTS (ret) = time;
1689     else
1690       GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE;
1691
1692     pad->dts = dts;
1693   } else {
1694     pad->dts = GST_CLOCK_STIME_NONE;
1695   }
1696
1697 beach:
1698   return ret;
1699 }
1700
1701 static GstFlowReturn
1702 gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps,
1703     GstCaps ** ret)
1704 {
1705   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1706   GstStructure *s;
1707
1708   *ret = gst_caps_copy (caps);
1709   s = gst_caps_get_structure (*ret, 0);
1710   gst_structure_set (s, "packetsize", G_TYPE_INT, mux->packet_size, NULL);
1711
1712   return GST_FLOW_OK;
1713 }
1714
1715 static GstBaseTsMuxPad *
1716 gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
1717 {
1718   GstBaseTsMuxPad *best = NULL;
1719   GstClockTime best_ts = GST_CLOCK_TIME_NONE;
1720   GList *l;
1721
1722   GST_OBJECT_LOCK (aggregator);
1723
1724   for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
1725     GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
1726     GstAggregatorPad *apad = GST_AGGREGATOR_PAD_CAST (tpad);
1727     GstBuffer *buffer;
1728
1729     buffer = gst_aggregator_pad_peek_buffer (apad);
1730     if (!buffer)
1731       continue;
1732     if (best_ts == GST_CLOCK_TIME_NONE) {
1733       best = tpad;
1734       best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
1735     } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
1736       GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
1737       if (t < best_ts) {
1738         best = tpad;
1739         best_ts = t;
1740       }
1741     }
1742     gst_buffer_unref (buffer);
1743   }
1744
1745   if (best)
1746     gst_object_ref (best);
1747
1748   GST_OBJECT_UNLOCK (aggregator);
1749
1750   GST_DEBUG_OBJECT (aggregator,
1751       "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
1752       GST_TIME_ARGS (best_ts), best);
1753
1754   return best;
1755 }
1756
1757 static gboolean
1758 gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
1759 {
1760   GList *l;
1761   gboolean ret = TRUE;
1762
1763   GST_OBJECT_LOCK (mux);
1764
1765   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1766     GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
1767
1768     if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
1769       ret = FALSE;
1770       break;
1771     }
1772   }
1773
1774   GST_OBJECT_UNLOCK (mux);
1775
1776   return ret;
1777 }
1778
1779
1780 static GstFlowReturn
1781 gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
1782 {
1783   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1784   GstFlowReturn ret = GST_FLOW_OK;
1785   GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg);
1786
1787   if (best) {
1788     GstBuffer *buffer;
1789
1790     buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best));
1791
1792     ret =
1793         gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
1794         GST_AGGREGATOR_PAD (best), buffer);
1795
1796     gst_object_unref (best);
1797
1798     if (ret != GST_FLOW_OK)
1799       goto done;
1800   }
1801
1802   if (gst_base_ts_mux_are_all_pads_eos (mux)) {
1803     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1804     /* drain some possibly cached data */
1805     if (klass->drain)
1806       klass->drain (mux);
1807     gst_base_ts_mux_push_packets (mux, TRUE);
1808
1809     ret = GST_FLOW_EOS;
1810   }
1811
1812 done:
1813   return ret;
1814 }
1815
1816 static gboolean
1817 gst_base_ts_mux_start (GstAggregator * agg)
1818 {
1819   gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
1820
1821   return TRUE;
1822 }
1823
1824 static gboolean
1825 gst_base_ts_mux_stop (GstAggregator * agg)
1826 {
1827   gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
1828
1829   return TRUE;
1830 }
1831
1832 /* GObject implementation */
1833
1834 static void
1835 gst_base_ts_mux_dispose (GObject * object)
1836 {
1837   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
1838
1839   gst_base_ts_mux_reset (mux, FALSE);
1840
1841   if (mux->out_adapter) {
1842     g_object_unref (mux->out_adapter);
1843     mux->out_adapter = NULL;
1844   }
1845   if (mux->prog_map) {
1846     gst_structure_free (mux->prog_map);
1847     mux->prog_map = NULL;
1848   }
1849   if (mux->programs) {
1850     g_hash_table_destroy (mux->programs);
1851     mux->programs = NULL;
1852   }
1853   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
1854 }
1855
1856 static void
1857 gst_base_ts_mux_constructed (GObject * object)
1858 {
1859   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
1860
1861   /* initial state */
1862   gst_base_ts_mux_reset (mux, TRUE);
1863 }
1864
1865 static void
1866 gst_base_ts_mux_set_property (GObject * object, guint prop_id,
1867     const GValue * value, GParamSpec * pspec)
1868 {
1869   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
1870   GList *l;
1871
1872   switch (prop_id) {
1873     case PROP_PROG_MAP:
1874     {
1875       const GstStructure *s = gst_value_get_structure (value);
1876       if (mux->prog_map) {
1877         gst_structure_free (mux->prog_map);
1878       }
1879       if (s)
1880         mux->prog_map = gst_structure_copy (s);
1881       else
1882         mux->prog_map = NULL;
1883       break;
1884     }
1885     case PROP_PAT_INTERVAL:
1886       mux->pat_interval = g_value_get_uint (value);
1887       if (mux->tsmux)
1888         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
1889       break;
1890     case PROP_PMT_INTERVAL:
1891       mux->pmt_interval = g_value_get_uint (value);
1892       GST_OBJECT_LOCK (mux);
1893       for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1894         GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
1895
1896         tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
1897       }
1898       GST_OBJECT_UNLOCK (mux);
1899       break;
1900     case PROP_ALIGNMENT:
1901       mux->alignment = g_value_get_int (value);
1902       break;
1903     case PROP_SI_INTERVAL:
1904       mux->si_interval = g_value_get_uint (value);
1905       tsmux_set_si_interval (mux->tsmux, mux->si_interval);
1906       break;
1907     case PROP_BITRATE:
1908       mux->bitrate = g_value_get_uint64 (value);
1909       if (mux->tsmux)
1910         tsmux_set_bitrate (mux->tsmux, mux->bitrate);
1911       break;
1912     case PROP_PCR_INTERVAL:
1913       mux->pcr_interval = g_value_get_uint (value);
1914       if (mux->tsmux)
1915         tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
1916       break;
1917     case PROP_SCTE_35_PID:
1918       mux->scte35_pid = g_value_get_uint (value);
1919       break;
1920     case PROP_SCTE_35_NULL_INTERVAL:
1921       mux->scte35_null_interval = g_value_get_uint (value);
1922       break;
1923     default:
1924       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1925       break;
1926   }
1927 }
1928
1929 static void
1930 gst_base_ts_mux_get_property (GObject * object, guint prop_id,
1931     GValue * value, GParamSpec * pspec)
1932 {
1933   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
1934
1935   switch (prop_id) {
1936     case PROP_PROG_MAP:
1937       gst_value_set_structure (value, mux->prog_map);
1938       break;
1939     case PROP_PAT_INTERVAL:
1940       g_value_set_uint (value, mux->pat_interval);
1941       break;
1942     case PROP_PMT_INTERVAL:
1943       g_value_set_uint (value, mux->pmt_interval);
1944       break;
1945     case PROP_ALIGNMENT:
1946       g_value_set_int (value, mux->alignment);
1947       break;
1948     case PROP_SI_INTERVAL:
1949       g_value_set_uint (value, mux->si_interval);
1950       break;
1951     case PROP_BITRATE:
1952       g_value_set_uint64 (value, mux->bitrate);
1953       break;
1954     case PROP_PCR_INTERVAL:
1955       g_value_set_uint (value, mux->pcr_interval);
1956       break;
1957     case PROP_SCTE_35_PID:
1958       g_value_set_uint (value, mux->scte35_pid);
1959       break;
1960     case PROP_SCTE_35_NULL_INTERVAL:
1961       g_value_set_uint (value, mux->scte35_null_interval);
1962       break;
1963     default:
1964       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1965       break;
1966   }
1967 }
1968
1969 /* Default vmethods implementation */
1970
1971 static TsMux *
1972 gst_base_ts_mux_default_create_ts_mux (GstBaseTsMux * mux)
1973 {
1974   TsMux *tsmux = tsmux_new ();
1975   tsmux_set_write_func (tsmux, new_packet_cb, mux);
1976   tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux);
1977   tsmux_set_pat_interval (tsmux, mux->pat_interval);
1978   tsmux_set_si_interval (tsmux, mux->si_interval);
1979   tsmux_set_bitrate (tsmux, mux->bitrate);
1980   tsmux_set_pcr_interval (tsmux, mux->pcr_interval);
1981
1982   return tsmux;
1983 }
1984
1985 static void
1986 gst_base_ts_mux_default_allocate_packet (GstBaseTsMux * mux,
1987     GstBuffer ** buffer)
1988 {
1989   GstBuffer *buf;
1990
1991   buf = gst_buffer_new_and_alloc (mux->packet_size);
1992
1993   *buffer = buf;
1994 }
1995
1996 static gboolean
1997 gst_base_ts_mux_default_output_packet (GstBaseTsMux * mux, GstBuffer * buffer,
1998     gint64 new_pcr)
1999 {
2000   gst_base_ts_mux_collect_packet (mux, buffer);
2001
2002   return TRUE;
2003 }
2004
2005 /* Subclass API */
2006
2007 void
2008 gst_base_ts_mux_set_packet_size (GstBaseTsMux * mux, gsize size)
2009 {
2010   mux->packet_size = size;
2011 }
2012
2013 void
2014 gst_base_ts_mux_set_automatic_alignment (GstBaseTsMux * mux, gsize alignment)
2015 {
2016   mux->automatic_alignment = alignment;
2017 }
2018
2019 static void
2020 gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
2021 {
2022   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
2023   GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
2024   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
2025
2026   GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0,
2027       "MPEG Transport Stream muxer");
2028
2029   gst_element_class_set_static_metadata (gstelement_class,
2030       "MPEG Transport Stream Muxer", "Codec/Muxer",
2031       "Multiplexes media streams into an MPEG Transport Stream",
2032       "Fluendo <contact@fluendo.com>");
2033
2034   gobject_class->set_property =
2035       GST_DEBUG_FUNCPTR (gst_base_ts_mux_set_property);
2036   gobject_class->get_property =
2037       GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
2038   gobject_class->dispose = gst_base_ts_mux_dispose;
2039   gobject_class->constructed = gst_base_ts_mux_constructed;
2040
2041   gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
2042   gstelement_class->release_pad = gst_base_ts_mux_release_pad;
2043   gstelement_class->send_event = gst_base_ts_mux_send_event;
2044
2045   gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps;
2046   gstagg_class->aggregate = gst_base_ts_mux_aggregate;
2047   gstagg_class->clip = gst_base_ts_mux_clip;
2048   gstagg_class->sink_event = gst_base_ts_mux_sink_event;
2049   gstagg_class->src_event = gst_base_ts_mux_src_event;
2050   gstagg_class->start = gst_base_ts_mux_start;
2051   gstagg_class->stop = gst_base_ts_mux_stop;
2052
2053   klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
2054   klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;
2055   klass->output_packet = gst_base_ts_mux_default_output_packet;
2056
2057   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PROG_MAP,
2058       g_param_spec_boxed ("prog-map", "Program map",
2059           "A GstStructure specifies the mapping from elementary streams to programs",
2060           GST_TYPE_STRUCTURE,
2061           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2062
2063   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PAT_INTERVAL,
2064       g_param_spec_uint ("pat-interval", "PAT interval",
2065           "Set the interval (in ticks of the 90kHz clock) for writing out the PAT table",
2066           1, G_MAXUINT, TSMUX_DEFAULT_PAT_INTERVAL,
2067           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2068
2069   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PMT_INTERVAL,
2070       g_param_spec_uint ("pmt-interval", "PMT interval",
2071           "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
2072           1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
2073           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2074
2075   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_ALIGNMENT,
2076       g_param_spec_int ("alignment", "packet alignment",
2077           "Number of packets per buffer (padded with dummy packets on EOS) "
2078           "(-1 = auto, 0 = all available packets, 7 for UDP streaming)",
2079           -1, G_MAXINT, BASETSMUX_DEFAULT_ALIGNMENT,
2080           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2081
2082   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SI_INTERVAL,
2083       g_param_spec_uint ("si-interval", "SI interval",
2084           "Set the interval (in ticks of the 90kHz clock) for writing out the Service"
2085           "Information tables", 1, G_MAXUINT, TSMUX_DEFAULT_SI_INTERVAL,
2086           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2087
2088   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BITRATE,
2089       g_param_spec_uint64 ("bitrate", "Bitrate (in bits per second)",
2090           "Set the target bitrate, will insert null packets as padding "
2091           " to achieve multiplex-wide constant bitrate (0 means no padding)",
2092           0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE,
2093           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2094
2095   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PCR_INTERVAL,
2096       g_param_spec_uint ("pcr-interval", "PCR interval",
2097           "Set the interval (in ticks of the 90kHz clock) for writing PCR",
2098           1, G_MAXUINT, TSMUX_DEFAULT_PCR_INTERVAL,
2099           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2100
2101   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SCTE_35_PID,
2102       g_param_spec_uint ("scte-35-pid", "SCTE-35 PID",
2103           "PID to use for inserting SCTE-35 packets (0: unused)",
2104           0, G_MAXUINT, DEFAULT_SCTE_35_PID,
2105           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2106
2107   g_object_class_install_property (G_OBJECT_CLASS (klass),
2108       PROP_SCTE_35_NULL_INTERVAL, g_param_spec_uint ("scte-35-null-interval",
2109           "SCTE-35 NULL packet interval",
2110           "Set the interval (in ticks of the 90kHz clock) for writing SCTE-35 NULL (heartbeat) packets."
2111           " (only valid if scte-35-pid is different from 0)", 1, G_MAXUINT,
2112           TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL,
2113           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2114
2115   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
2116       &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
2117
2118   gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX_PAD, 0);
2119 }
2120
2121 static void
2122 gst_base_ts_mux_init (GstBaseTsMux * mux)
2123 {
2124   mux->out_adapter = gst_adapter_new ();
2125
2126   /* properties */
2127   mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
2128   mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
2129   mux->si_interval = TSMUX_DEFAULT_SI_INTERVAL;
2130   mux->pcr_interval = TSMUX_DEFAULT_PCR_INTERVAL;
2131   mux->prog_map = NULL;
2132   mux->alignment = BASETSMUX_DEFAULT_ALIGNMENT;
2133   mux->bitrate = TSMUX_DEFAULT_BITRATE;
2134   mux->scte35_pid = DEFAULT_SCTE_35_PID;
2135   mux->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
2136
2137   mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
2138   mux->automatic_alignment = 0;
2139 }