mpegtsmux: Use GST_CLOCK_STIME_NONE for output_ts_offset
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / mpegtsmux / gstbasetsmux.c
1 /*
2  * Copyright 2006, 2007, 2008, 2009, 2010 Fluendo S.A.
3  *  Authors: Jan Schmidt <jan@fluendo.com>
4  *           Kapil Agrawal <kapil@fluendo.com>
5  *           Julien Moutte <julien@fluendo.com>
6  *
7  * Copyright (C) 2011 Jan Schmidt <thaytan@noraisin.net>
8  *
9  * This library is licensed under 3 different licenses and you
10  * can choose to use it under the terms of any one of them. The
11  * three licenses are the MPL 1.1, the LGPL and the MIT license.
12  *
13  * MPL:
14  *
15  * The contents of this file are subject to the Mozilla Public License
16  * Version 1.1 (the "License"); you may not use this file except in
17  * compliance with the License. You may obtain a copy of the License at
18  * http://www.mozilla.org/MPL/.
19  *
20  * Software distributed under the License is distributed on an "AS IS"
21  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
22  * License for the specific language governing rights and limitations
23  * under the License.
24  *
25  * LGPL:
26  *
27  * This library is free software; you can redistribute it and/or
28  * modify it under the terms of the GNU Library General Public
29  * License as published by the Free Software Foundation; either
30  * version 2 of the License, or (at your option) any later version.
31  *
32  * This library is distributed in the hope that it will be useful,
33  * but WITHOUT ANY WARRANTY; without even the implied warranty of
34  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
35  * Library General Public License for more details.
36  *
37  * You should have received a copy of the GNU Library General Public
38  * License along with this library; if not, write to the
39  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
40  * Boston, MA 02110-1301, USA.
41  *
42  * MIT:
43  *
44  * Unless otherwise indicated, Source Code is licensed under MIT license.
45  * See further explanation attached in License Statement (distributed in the file
46  * LICENSE).
47  *
48  * Permission is hereby granted, free of charge, to any person obtaining a copy of
49  * this software and associated documentation files (the "Software"), to deal in
50  * the Software without restriction, including without limitation the rights to
51  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
52  * of the Software, and to permit persons to whom the Software is furnished to do
53  * so, subject to the following conditions:
54  *
55  * The above copyright notice and this permission notice shall be included in all
56  * copies or substantial portions of the Software.
57  *
58  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
59  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
60  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
61  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
62  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
63  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
64  * SOFTWARE.
65  *
66  * SPDX-License-Identifier: MPL-1.1 OR MIT OR LGPL-2.0-or-later
67  */
68 #ifdef HAVE_CONFIG_H
69 #include "config.h"
70 #endif
71
72 #include <stdio.h>
73 #include <string.h>
74
75 #include <gst/tag/tag.h>
76 #include <gst/video/video.h>
77 #include <gst/mpegts/mpegts.h>
78 #include <gst/pbutils/pbutils.h>
79 #include <gst/videoparsers/gstjpeg2000parse.h>
80 #include <gst/video/video-color.h>
81
82 #include "gstbasetsmux.h"
83 #include "gstbasetsmuxaac.h"
84 #include "gstbasetsmuxttxt.h"
85 #include "gstbasetsmuxopus.h"
86 #include "gstbasetsmuxjpeg2000.h"
87
88 GST_DEBUG_CATEGORY (gst_base_ts_mux_debug);
89 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
90
91 /* GstBaseTsMuxPad */
92
93 G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD);
94
95 /* Internals */
96
97 static void
98 gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad)
99 {
100   pad->dts = GST_CLOCK_STIME_NONE;
101   pad->prog_id = -1;
102
103   if (pad->free_func)
104     pad->free_func (pad->prepare_data);
105   pad->prepare_data = NULL;
106   pad->prepare_func = NULL;
107   pad->free_func = NULL;
108
109   if (pad->codec_data)
110     gst_buffer_replace (&pad->codec_data, NULL);
111
112   /* reference owned elsewhere */
113   pad->stream = NULL;
114   pad->prog = NULL;
115
116   if (pad->language) {
117     g_free (pad->language);
118     pad->language = NULL;
119   }
120 }
121
122 /* GstAggregatorPad implementation */
123
124 static GstFlowReturn
125 gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
126 {
127   GList *cur;
128   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
129
130   /* Send initial segments again after a flush-stop, and also resend the
131    * header sections */
132   g_mutex_lock (&mux->lock);
133   mux->first = TRUE;
134
135   /* output PAT, SI tables */
136   tsmux_resend_pat (mux->tsmux);
137   tsmux_resend_si (mux->tsmux);
138
139   /* output PMT for each program */
140   for (cur = mux->tsmux->programs; cur; cur = cur->next) {
141     TsMuxProgram *program = (TsMuxProgram *) cur->data;
142
143     tsmux_resend_pmt (program);
144   }
145   g_mutex_unlock (&mux->lock);
146
147   return GST_FLOW_OK;
148 }
149
150 /* GObject implementation */
151
152 static void
153 gst_base_ts_mux_pad_dispose (GObject * obj)
154 {
155   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj);
156
157   gst_base_ts_mux_pad_reset (ts_pad);
158
159   G_OBJECT_CLASS (gst_base_ts_mux_pad_parent_class)->dispose (obj);
160 }
161
162 static void
163 gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass)
164 {
165   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
166   GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass);
167
168   gobject_class->dispose = gst_base_ts_mux_pad_dispose;
169   gstaggpad_class->flush = gst_base_ts_mux_pad_flush;
170
171   gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX, 0);
172 }
173
174 static void
175 gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad)
176 {
177 }
178
179 /* GstBaseTsMux */
180
181 enum
182 {
183   PROP_0,
184   PROP_PROG_MAP,
185   PROP_PAT_INTERVAL,
186   PROP_PMT_INTERVAL,
187   PROP_ALIGNMENT,
188   PROP_SI_INTERVAL,
189   PROP_BITRATE,
190   PROP_PCR_INTERVAL,
191   PROP_SCTE_35_PID,
192   PROP_SCTE_35_NULL_INTERVAL
193 };
194
195 #define DEFAULT_SCTE_35_PID 0
196
197 #define BASETSMUX_DEFAULT_ALIGNMENT    -1
198
199 #define CLOCK_BASE 9LL
200 #define CLOCK_FREQ (CLOCK_BASE * 10000) /* 90 kHz PTS clock */
201 #define CLOCK_FREQ_SCR (CLOCK_FREQ * 300)       /* 27 MHz SCR clock */
202 #define TS_MUX_CLOCK_BASE (TSMUX_CLOCK_FREQ * 10 * 360)
203
204 #define GSTTIME_TO_MPEGTIME(time) \
205     (((time) > 0 ? (gint64) 1 : (gint64) -1) * \
206     (gint64) gst_util_uint64_scale (ABS(time), CLOCK_BASE, GST_MSECOND/10))
207 /* 27 MHz SCR conversions: */
208 #define MPEG_SYS_TIME_TO_GSTTIME(time) (gst_util_uint64_scale ((time), \
209                         GST_USECOND, CLOCK_FREQ_SCR / 1000000))
210 #define GSTTIME_TO_MPEG_SYS_TIME(time) (gst_util_uint64_scale ((time), \
211                         CLOCK_FREQ_SCR / 1000000, GST_USECOND))
212
213 #define DEFAULT_PROG_ID 0
214
215 static GstStaticPadTemplate gst_base_ts_mux_src_factory =
216 GST_STATIC_PAD_TEMPLATE ("src",
217     GST_PAD_SRC,
218     GST_PAD_ALWAYS,
219     GST_STATIC_CAPS ("video/mpegts, "
220         "systemstream = (boolean) true, " "packetsize = (int) { 188, 192} ")
221     );
222
223 typedef struct
224 {
225   GstMapInfo map_info;
226   GstBuffer *buffer;
227 } StreamData;
228
229 G_DEFINE_TYPE_WITH_CODE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR,
230     gst_mpegts_initialize ());
231
232 /* Internals */
233
234 /* Takes over the ref on the buffer */
235 static StreamData *
236 stream_data_new (GstBuffer * buffer)
237 {
238   StreamData *res = g_new (StreamData, 1);
239   res->buffer = buffer;
240   gst_buffer_map (buffer, &(res->map_info), GST_MAP_READ);
241
242   return res;
243 }
244
245 static void
246 stream_data_free (StreamData * data)
247 {
248   if (data) {
249     gst_buffer_unmap (data->buffer, &data->map_info);
250     gst_buffer_unref (data->buffer);
251     g_free (data);
252   }
253 }
254
255 #define parent_class gst_base_ts_mux_parent_class
256
257 static void
258 gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
259 {
260   GstBuffer *buf;
261   GstStructure *structure;
262   GValue array = { 0 };
263   GValue value = { 0 };
264   GstCaps *caps;
265
266   caps = gst_pad_get_current_caps (GST_AGGREGATOR_SRC_PAD (mux));
267
268   /* If we have no caps, we are possibly shutting down */
269   if (!caps)
270     return;
271
272   caps = gst_caps_make_writable (caps);
273   structure = gst_caps_get_structure (caps, 0);
274
275   g_value_init (&array, GST_TYPE_ARRAY);
276
277   GST_LOG_OBJECT (mux, "setting %u packets into streamheader",
278       g_queue_get_length (&mux->streamheader));
279
280   while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader)))) {
281     g_value_init (&value, GST_TYPE_BUFFER);
282     gst_value_take_buffer (&value, buf);
283     gst_value_array_append_value (&array, &value);
284     g_value_unset (&value);
285   }
286
287   gst_structure_set_value (structure, "streamheader", &array);
288   gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
289   g_value_unset (&array);
290   gst_caps_unref (caps);
291 }
292
293 static gboolean
294 steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
295     TsMux * mux)
296 {
297   g_hash_table_insert (mux->si_sections, type, section);
298
299   return TRUE;
300 }
301
302 /* Must be called with mux->lock held */
303 static void
304 gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
305 {
306   GstBuffer *buf;
307   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
308   GHashTable *si_sections = NULL;
309   GList *l;
310
311   mux->first = TRUE;
312   mux->last_flow_ret = GST_FLOW_OK;
313   mux->last_ts = 0;
314   mux->is_delta = TRUE;
315   mux->is_header = FALSE;
316
317   mux->streamheader_sent = FALSE;
318   mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
319   gst_event_replace (&mux->force_key_unit_event, NULL);
320
321   if (mux->out_adapter)
322     gst_adapter_clear (mux->out_adapter);
323   mux->output_ts_offset = GST_CLOCK_STIME_NONE;
324
325   if (mux->tsmux) {
326     if (mux->tsmux->si_sections)
327       si_sections = g_hash_table_ref (mux->tsmux->si_sections);
328
329     tsmux_free (mux->tsmux);
330     mux->tsmux = NULL;
331   }
332
333   if (mux->programs) {
334     g_hash_table_destroy (mux->programs);
335   }
336   mux->programs = g_hash_table_new (g_direct_hash, g_direct_equal);
337
338   while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader))))
339     gst_buffer_unref (buf);
340
341   gst_event_replace (&mux->force_key_unit_event, NULL);
342   gst_buffer_replace (&mux->out_buffer, NULL);
343
344   GST_OBJECT_LOCK (mux);
345
346   for (l = GST_ELEMENT (mux)->sinkpads; l; l = l->next) {
347     gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (l->data));
348   }
349
350   GST_OBJECT_UNLOCK (mux);
351
352   if (alloc) {
353     g_assert (klass->create_ts_mux);
354
355     mux->tsmux = klass->create_ts_mux (mux);
356
357     /* Preserve user-specified sections across resets */
358     if (si_sections)
359       g_hash_table_foreach_steal (si_sections, (GHRFunc) steal_si_section,
360           mux->tsmux);
361   }
362
363   if (si_sections)
364     g_hash_table_unref (si_sections);
365
366   mux->last_scte35_event_seqnum = GST_SEQNUM_INVALID;
367
368   if (klass->reset)
369     klass->reset (mux);
370 }
371
372 static void
373 release_buffer_cb (guint8 * data, void *user_data)
374 {
375   stream_data_free ((StreamData *) user_data);
376 }
377
378 /* Must be called with mux->lock held */
379 static GstFlowReturn
380 gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux,
381     GstBaseTsMuxPad * ts_pad, GstCaps * caps)
382 {
383   GstStructure *s;
384   guint st = TSMUX_ST_RESERVED;
385   const gchar *mt;
386   const GValue *value = NULL;
387   GstBuffer *codec_data = NULL;
388   guint8 opus_channel_config_code = 0;
389   guint16 profile = GST_JPEG2000_PARSE_PROFILE_NONE;
390   guint8 main_level = 0;
391   guint32 max_rate = 0;
392   guint8 color_spec = 0;
393   const gchar *stream_format = NULL;
394   const char *interlace_mode = NULL;
395   gchar *pmt_name;
396
397   GST_DEBUG_OBJECT (ts_pad,
398       "%s stream with PID 0x%04x for caps %" GST_PTR_FORMAT,
399       ts_pad->stream ? "Recreating" : "Creating", ts_pad->pid, caps);
400
401   s = gst_caps_get_structure (caps, 0);
402
403   mt = gst_structure_get_name (s);
404   value = gst_structure_get_value (s, "codec_data");
405   if (value != NULL)
406     codec_data = gst_value_get_buffer (value);
407
408   g_clear_pointer (&ts_pad->codec_data, gst_buffer_unref);
409   ts_pad->prepare_func = NULL;
410
411   stream_format = gst_structure_get_string (s, "stream-format");
412
413   if (strcmp (mt, "video/x-dirac") == 0) {
414     st = TSMUX_ST_VIDEO_DIRAC;
415   } else if (strcmp (mt, "audio/x-ac3") == 0) {
416     st = TSMUX_ST_PS_AUDIO_AC3;
417   } else if (strcmp (mt, "audio/x-dts") == 0) {
418     st = TSMUX_ST_PS_AUDIO_DTS;
419   } else if (strcmp (mt, "audio/x-lpcm") == 0) {
420     st = TSMUX_ST_PS_AUDIO_LPCM;
421   } else if (strcmp (mt, "video/x-h264") == 0) {
422     st = TSMUX_ST_VIDEO_H264;
423   } else if (strcmp (mt, "video/x-h265") == 0) {
424     st = TSMUX_ST_VIDEO_HEVC;
425   } else if (strcmp (mt, "audio/mpeg") == 0) {
426     gint mpegversion;
427
428     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
429       GST_ERROR_OBJECT (ts_pad, "caps missing mpegversion");
430       goto not_negotiated;
431     }
432
433     switch (mpegversion) {
434       case 1:{
435         int mpegaudioversion = 1;       /* Assume mpegaudioversion=1 for backwards compatibility */
436         (void) gst_structure_get_int (s, "mpegaudioversion", &mpegaudioversion);
437
438         if (mpegaudioversion == 1)
439           st = TSMUX_ST_AUDIO_MPEG1;
440         else
441           st = TSMUX_ST_AUDIO_MPEG2;
442         break;
443       }
444       case 2:{
445         /* mpegversion=2 in GStreamer refers to MPEG-2 Part 7 audio,  */
446
447         st = TSMUX_ST_AUDIO_AAC;
448
449         /* Check the stream format. If raw, make dummy internal codec data from the caps */
450         if (g_strcmp0 (stream_format, "raw") == 0) {
451           ts_pad->codec_data =
452               gst_base_ts_mux_aac_mpeg2_make_codec_data (mux, caps);
453           ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg2;
454           if (ts_pad->codec_data == NULL) {
455             GST_ERROR_OBJECT (mux, "Invalid or incomplete caps for MPEG-2 AAC");
456             goto not_negotiated;
457           }
458         }
459         break;
460       }
461       case 4:
462       {
463         st = TSMUX_ST_AUDIO_AAC;
464
465         /* Check the stream format. We need codec_data with RAW streams and mpegversion=4 */
466         if (g_strcmp0 (stream_format, "raw") == 0) {
467           if (codec_data) {
468             GST_DEBUG_OBJECT (ts_pad,
469                 "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
470                 gst_buffer_get_size (codec_data));
471             ts_pad->codec_data = gst_buffer_ref (codec_data);
472             ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg4;
473           } else {
474             ts_pad->codec_data = NULL;
475             GST_ERROR_OBJECT (mux, "Need codec_data for raw MPEG-4 AAC");
476             goto not_negotiated;
477           }
478         } else if (codec_data) {
479           ts_pad->codec_data = gst_buffer_ref (codec_data);
480         } else {
481           ts_pad->codec_data = NULL;
482         }
483         break;
484       }
485       default:
486         GST_WARNING_OBJECT (ts_pad, "unsupported mpegversion %d", mpegversion);
487         goto not_negotiated;
488     }
489   } else if (strcmp (mt, "video/mpeg") == 0) {
490     gint mpegversion;
491
492     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
493       GST_ERROR_OBJECT (ts_pad, "caps missing mpegversion");
494       goto not_negotiated;
495     }
496
497     switch (mpegversion) {
498       case 1:
499         st = TSMUX_ST_VIDEO_MPEG1;
500         break;
501       case 2:
502         st = TSMUX_ST_VIDEO_MPEG2;
503         break;
504       case 4:
505         st = TSMUX_ST_VIDEO_MPEG4;
506         break;
507       default:
508         GST_WARNING_OBJECT (ts_pad, "unsupported mpegversion %d", mpegversion);
509         goto not_negotiated;
510     }
511   } else if (strcmp (mt, "subpicture/x-dvb") == 0) {
512     st = TSMUX_ST_PS_DVB_SUBPICTURE;
513   } else if (strcmp (mt, "application/x-teletext") == 0) {
514     st = TSMUX_ST_PS_TELETEXT;
515     /* needs a particularly sized layout */
516     ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext;
517   } else if (strcmp (mt, "audio/x-opus") == 0) {
518     guint8 channels, mapping_family, stream_count, coupled_count;
519     guint8 channel_mapping[256];
520
521     if (!gst_codec_utils_opus_parse_caps (caps, NULL, &channels,
522             &mapping_family, &stream_count, &coupled_count, channel_mapping)) {
523       GST_ERROR_OBJECT (ts_pad, "Incomplete Opus caps");
524       goto not_negotiated;
525     }
526
527     if (channels <= 2 && mapping_family == 0) {
528       opus_channel_config_code = channels;
529     } else if (channels == 2 && mapping_family == 255 && stream_count == 1
530         && coupled_count == 1) {
531       /* Dual mono */
532       opus_channel_config_code = 0;
533     } else if (channels >= 2 && channels <= 8 && mapping_family == 1) {
534       static const guint8 coupled_stream_counts[9] = {
535         1, 0, 1, 1, 2, 2, 2, 3, 3
536       };
537       static const guint8 channel_map_a[8][8] = {
538         {0},
539         {0, 1},
540         {0, 2, 1},
541         {0, 1, 2, 3},
542         {0, 4, 1, 2, 3},
543         {0, 4, 1, 2, 3, 5},
544         {0, 4, 1, 2, 3, 5, 6},
545         {0, 6, 1, 2, 3, 4, 5, 7},
546       };
547       static const guint8 channel_map_b[8][8] = {
548         {0},
549         {0, 1},
550         {0, 1, 2},
551         {0, 1, 2, 3},
552         {0, 1, 2, 3, 4},
553         {0, 1, 2, 3, 4, 5},
554         {0, 1, 2, 3, 4, 5, 6},
555         {0, 1, 2, 3, 4, 5, 6, 7},
556       };
557
558       /* Vorbis mapping */
559       if (stream_count == channels - coupled_stream_counts[channels] &&
560           coupled_count == coupled_stream_counts[channels] &&
561           memcmp (channel_mapping, channel_map_a[channels - 1],
562               channels) == 0) {
563         opus_channel_config_code = channels;
564       } else if (stream_count == channels - coupled_stream_counts[channels] &&
565           coupled_count == coupled_stream_counts[channels] &&
566           memcmp (channel_mapping, channel_map_b[channels - 1],
567               channels) == 0) {
568         opus_channel_config_code = channels | 0x80;
569       } else {
570         GST_FIXME_OBJECT (ts_pad, "Opus channel mapping not handled");
571         goto not_negotiated;
572       }
573     }
574
575     st = TSMUX_ST_PS_OPUS;
576     ts_pad->prepare_func = gst_base_ts_mux_prepare_opus;
577   } else if (strcmp (mt, "meta/x-klv") == 0) {
578     st = TSMUX_ST_PS_KLV;
579   } else if (strcmp (mt, "image/x-jpc") == 0) {
580     /*
581      * See this document for more details on standard:
582      *
583      * https://www.itu.int/rec/T-REC-H.222.0-201206-S/en
584      *  Annex S describes J2K details
585      *  Page 104 of this document describes J2k video descriptor
586      */
587
588     const GValue *vProfile = gst_structure_get_value (s, "profile");
589     const GValue *vMainlevel = gst_structure_get_value (s, "main-level");
590     const GValue *vFramerate = gst_structure_get_value (s, "framerate");
591     const GValue *vColorimetry = gst_structure_get_value (s, "colorimetry");
592     j2k_private_data *private_data;
593
594     /* for now, we relax the condition that profile must exist and equal
595      * GST_JPEG2000_PARSE_PROFILE_BC_SINGLE */
596     if (vProfile) {
597       profile = g_value_get_int (vProfile);
598       if (profile != GST_JPEG2000_PARSE_PROFILE_BC_SINGLE) {
599         GST_LOG_OBJECT (ts_pad, "Invalid JPEG 2000 profile %d", profile);
600         /* goto not_negotiated; */
601       }
602     }
603     /* for now, we will relax the condition that the main level must be present */
604     if (vMainlevel) {
605       main_level = g_value_get_uint (vMainlevel);
606       if (main_level > 11) {
607         GST_ERROR_OBJECT (ts_pad, "Invalid main level %d", main_level);
608         goto not_negotiated;
609       }
610       if (main_level >= 6) {
611         max_rate = 2 ^ (main_level - 6) * 1600 * 1000000;
612       } else {
613         switch (main_level) {
614           case 0:
615           case 1:
616           case 2:
617           case 3:
618             max_rate = 200 * 1000000;
619             break;
620           case 4:
621             max_rate = 400 * 1000000;
622             break;
623           case 5:
624             max_rate = 800 * 1000000;
625             break;
626           default:
627             break;
628         }
629       }
630     } else {
631       /* GST_ERROR_OBJECT (ts_pad, "Missing main level");
632        * goto not_negotiated; */
633     }
634
635     /* We always mux video in J2K-over-MPEG-TS non-interlaced mode */
636     private_data = g_new0 (j2k_private_data, 1);
637     private_data->interlace = FALSE;
638     private_data->den = 0;
639     private_data->num = 0;
640     private_data->max_bitrate = max_rate;
641     private_data->color_spec = 1;
642     /* these two fields are not used, since we always mux as non-interlaced */
643     private_data->Fic = 1;
644     private_data->Fio = 0;
645
646     /* Get Framerate */
647     if (vFramerate != NULL) {
648       /* Data for ELSM header */
649       private_data->num = gst_value_get_fraction_numerator (vFramerate);
650       private_data->den = gst_value_get_fraction_denominator (vFramerate);
651     }
652     /* Get Colorimetry */
653     if (vColorimetry) {
654       const char *colorimetry = g_value_get_string (vColorimetry);
655       color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_SRGB;  /* RGB as default */
656       if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT601)) {
657         color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC601;
658       } else {
659         if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT709)
660             || g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_SMPTE240M)) {
661           color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC709;
662         }
663       }
664       private_data->color_spec = color_spec;
665     } else {
666       GST_ERROR_OBJECT (ts_pad, "Colorimetry not present in caps");
667       g_free (private_data);
668       goto not_negotiated;
669     }
670     st = TSMUX_ST_VIDEO_JP2K;
671     ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
672     ts_pad->prepare_data = private_data;
673     ts_pad->free_func = gst_base_ts_mux_free_jpeg2000;
674   } else {
675     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
676
677     if (klass->handle_media_type) {
678       st = klass->handle_media_type (mux, mt, ts_pad);
679     }
680   }
681
682   if (st == TSMUX_ST_RESERVED) {
683     GST_ERROR_OBJECT (ts_pad, "Failed to determine stream type");
684     goto error;
685   }
686
687   if (ts_pad->stream && st != ts_pad->stream->stream_type) {
688     GST_ELEMENT_ERROR (mux, STREAM, MUX,
689         ("Stream type change from %02x to %02x not supported",
690             ts_pad->stream->stream_type, st), NULL);
691     goto error;
692   }
693
694   if (ts_pad->stream == NULL) {
695     ts_pad->stream =
696         tsmux_create_stream (mux->tsmux, st, ts_pad->pid, ts_pad->language);
697     if (ts_pad->stream == NULL)
698       goto error;
699   }
700
701   pmt_name = g_strdup_printf ("PMT_%d", ts_pad->pid);
702   if (mux->prog_map && gst_structure_has_field (mux->prog_map, pmt_name)) {
703     gst_structure_get_int (mux->prog_map, pmt_name, &ts_pad->stream->pmt_index);
704   }
705   g_free (pmt_name);
706
707   interlace_mode = gst_structure_get_string (s, "interlace-mode");
708   gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling);
709   gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels);
710   gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate);
711
712   /* frame rate */
713   gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num,
714       &ts_pad->stream->den);
715
716   /* Interlace mode */
717   ts_pad->stream->interlace_mode = FALSE;
718   if (interlace_mode) {
719     ts_pad->stream->interlace_mode =
720         g_str_equal (interlace_mode, "interleaved");
721   }
722
723   /* Width and Height */
724   gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size);
725   gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size);
726
727   ts_pad->stream->color_spec = color_spec;
728   ts_pad->stream->max_bitrate = max_rate;
729   ts_pad->stream->profile_and_level = profile | main_level;
730
731   ts_pad->stream->opus_channel_config_code = opus_channel_config_code;
732
733   tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb);
734
735   return GST_FLOW_OK;
736
737   /* ERRORS */
738 not_negotiated:
739   return GST_FLOW_NOT_NEGOTIATED;
740
741 error:
742   return GST_FLOW_ERROR;
743 }
744
745 static gboolean
746 is_valid_pmt_pid (guint16 pmt_pid)
747 {
748   if (pmt_pid < 0x0010 || pmt_pid > 0x1ffe)
749     return FALSE;
750   return TRUE;
751 }
752
753 /* Must be called with mux->lock held */
754 static GstFlowReturn
755 gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
756 {
757   GstCaps *caps = gst_pad_get_current_caps (GST_PAD (ts_pad));
758   GstFlowReturn ret;
759
760   if (caps == NULL) {
761     GST_DEBUG_OBJECT (ts_pad, "Sink pad caps were not set before pushing");
762     return GST_FLOW_NOT_NEGOTIATED;
763   }
764
765   ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
766   gst_caps_unref (caps);
767
768   if (ret == GST_FLOW_OK) {
769     tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
770   }
771
772   return ret;
773 }
774
775 /* Must be called with mux->lock held */
776 static GstFlowReturn
777 gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
778 {
779   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
780   gchar *name = NULL;
781   gchar *prop_name;
782   GstFlowReturn ret = GST_FLOW_OK;
783
784   if (ts_pad->prog_id == -1) {
785     name = GST_PAD_NAME (pad);
786     if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) {
787       gint idx;
788       gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
789       if (!ret) {
790         GST_ELEMENT_ERROR (mux, STREAM, MUX,
791             ("Reading program map failed. Assuming default"), (NULL));
792         idx = DEFAULT_PROG_ID;
793       }
794       if (idx < 0) {
795         GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
796             "than zero; DEFAULT_PROGRAM = %d is used instead",
797             idx, name, DEFAULT_PROG_ID);
798         idx = DEFAULT_PROG_ID;
799       }
800       ts_pad->prog_id = idx;
801     } else {
802       ts_pad->prog_id = DEFAULT_PROG_ID;
803     }
804   }
805
806   ts_pad->prog =
807       (TsMuxProgram *) g_hash_table_lookup (mux->programs,
808       GINT_TO_POINTER (ts_pad->prog_id));
809   if (ts_pad->prog == NULL) {
810     ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
811     if (ts_pad->prog == NULL)
812       goto no_program;
813     tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
814     tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
815     tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval);
816     g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
817         ts_pad->prog);
818
819     /* Check for user-specified PMT PID */
820     prop_name = g_strdup_printf ("PMT_%d", ts_pad->prog->pgm_number);
821     if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
822       guint pmt_pid;
823
824       if (gst_structure_get_uint (mux->prog_map, prop_name, &pmt_pid)) {
825         if (is_valid_pmt_pid (pmt_pid)) {
826           GST_DEBUG_OBJECT (mux, "User specified pid=%u as PMT for "
827               "program (prog_id = %d)", pmt_pid, ts_pad->prog->pgm_number);
828           tsmux_program_set_pmt_pid (ts_pad->prog, pmt_pid);
829         } else {
830           GST_ELEMENT_WARNING (mux, LIBRARY, SETTINGS,
831               ("User specified PMT pid %u for program %d is not valid.",
832                   pmt_pid, ts_pad->prog->pgm_number), (NULL));
833         }
834       }
835     }
836     g_free (prop_name);
837   }
838
839   if (ts_pad->stream == NULL) {
840     ret = gst_base_ts_mux_create_stream (mux, ts_pad);
841     if (ret != GST_FLOW_OK)
842       goto no_stream;
843   }
844
845   if (ts_pad->prog->pcr_stream == NULL) {
846     /* Take the first stream of the program for the PCR */
847     GST_DEBUG_OBJECT (ts_pad,
848         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
849         ts_pad->pid, ts_pad->prog_id);
850
851     tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
852   }
853
854   /* Check for user-specified PCR PID */
855   prop_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
856   if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
857     const gchar *sink_name =
858         gst_structure_get_string (mux->prog_map, prop_name);
859
860     if (!g_strcmp0 (name, sink_name)) {
861       GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
862           "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
863       tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
864     }
865   }
866   g_free (prop_name);
867
868   return ret;
869
870   /* ERRORS */
871 no_program:
872   {
873     GST_ELEMENT_ERROR (mux, STREAM, MUX,
874         ("Could not create new program"), (NULL));
875     return GST_FLOW_ERROR;
876   }
877 no_stream:
878   {
879     GST_ELEMENT_ERROR (mux, STREAM, MUX,
880         ("Could not create handler for stream"), (NULL));
881     return ret;
882   }
883 }
884
885 /* Must be called with mux->lock held */
886 static gboolean
887 gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
888     gpointer user_data)
889 {
890   GstFlowReturn *ret = user_data;
891
892   *ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
893
894   return *ret == GST_FLOW_OK;
895 }
896
897 /* Must be called with mux->lock held */
898 static GstFlowReturn
899 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
900 {
901   GstFlowReturn ret = GST_FLOW_OK;
902
903   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (mux),
904       gst_base_ts_mux_create_pad_stream_func, &ret);
905
906   return ret;
907 }
908
909 static void
910 new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
911     guint len)
912 {
913   /* Packets should be at least 188 bytes, but check anyway */
914   g_assert (len >= 2 || !data);
915
916   if (!mux->streamheader_sent && data) {
917     guint pid = ((data[1] & 0x1f) << 8) | data[2];
918     /* if it's a PAT or a PMT */
919     if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
920       GstBuffer *hbuf;
921
922       if (!buf) {
923         hbuf = gst_buffer_new_and_alloc (len);
924         gst_buffer_fill (hbuf, 0, data, len);
925       } else {
926         hbuf = gst_buffer_copy (buf);
927       }
928       GST_LOG_OBJECT (mux,
929           "Collecting packet with pid 0x%04x into streamheaders", pid);
930
931       g_queue_push_tail (&mux->streamheader, hbuf);
932     } else if (!g_queue_is_empty (&mux->streamheader)) {
933       gst_base_ts_mux_set_header_on_caps (mux);
934       mux->streamheader_sent = TRUE;
935     }
936   }
937
938   if (buf) {
939     if (mux->is_header) {
940       GST_LOG_OBJECT (mux, "marking as header buffer");
941       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER);
942     }
943     if (mux->is_delta) {
944       GST_LOG_OBJECT (mux, "marking as delta unit");
945       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
946     } else {
947       GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
948       mux->is_delta = TRUE;
949     }
950   }
951 }
952
953 static GstFlowReturn
954 gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
955 {
956   GstBufferList *buffer_list;
957   gint align = mux->alignment;
958   gint av, packet_size;
959
960   packet_size = mux->packet_size;
961
962   if (align < 0)
963     align = mux->automatic_alignment;
964
965   av = gst_adapter_available (mux->out_adapter);
966   GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
967
968   if (av == 0)
969     return GST_FLOW_OK;
970
971   /* no alignment, just push all available data */
972   if (align == 0) {
973     buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
974     return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux),
975         buffer_list);
976   }
977
978   align *= packet_size;
979
980   if (!force && align > av)
981     return GST_FLOW_OK;
982
983   buffer_list = gst_buffer_list_new_sized ((av / align) + 1);
984
985   GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
986   while (align <= av) {
987     GstBuffer *buf;
988     GstClockTime pts;
989
990     pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
991     buf = gst_adapter_take_buffer (mux->out_adapter, align);
992
993     GST_BUFFER_PTS (buf) = pts;
994
995     gst_buffer_list_add (buffer_list, buf);
996     av -= align;
997   }
998
999   if (av > 0 && force) {
1000     GstBuffer *buf;
1001     GstClockTime pts;
1002     guint8 *data;
1003     guint32 header;
1004     gint dummy;
1005     GstMapInfo map;
1006
1007     GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
1008
1009     pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1010     buf = gst_buffer_new_and_alloc (align);
1011
1012     GST_BUFFER_PTS (buf) = pts;
1013
1014     gst_buffer_map (buf, &map, GST_MAP_READ);
1015     data = map.data;
1016
1017     gst_adapter_copy (mux->out_adapter, data, 0, av);
1018     gst_adapter_clear (mux->out_adapter);
1019
1020     data += av;
1021     header = GST_READ_UINT32_BE (data - packet_size);
1022
1023     dummy = (map.size - av) / packet_size;
1024     GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
1025
1026     for (; dummy > 0; dummy--) {
1027       gint offset;
1028
1029       if (packet_size > GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH) {
1030         GST_WRITE_UINT32_BE (data, header);
1031         /* simply increase header a bit and never mind too much */
1032         header++;
1033         offset = 4;
1034       } else {
1035         offset = 0;
1036       }
1037       GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
1038       /* null packet PID */
1039       GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
1040       /* no adaptation field exists | continuity counter undefined */
1041       GST_WRITE_UINT8 (data + offset + 3, 0x10);
1042       /* payload */
1043       memset (data + offset + 4, 0, GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH - 4);
1044       data += packet_size;
1045     }
1046
1047     gst_buffer_unmap (buf, &map);
1048     gst_buffer_list_add (buffer_list, buf);
1049   }
1050
1051   return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux), buffer_list);
1052 }
1053
1054 static GstFlowReturn
1055 gst_base_ts_mux_collect_packet (GstBaseTsMux * mux, GstBuffer * buf)
1056 {
1057   GST_LOG_OBJECT (mux, "collecting packet size %" G_GSIZE_FORMAT,
1058       gst_buffer_get_size (buf));
1059   gst_adapter_push (mux->out_adapter, buf);
1060
1061   return GST_FLOW_OK;
1062 }
1063
1064 static GstEvent *
1065 check_pending_key_unit_event (GstEvent * pending_event, GstSegment * segment,
1066     GstClockTime timestamp, guint flags, GstClockTime pending_key_unit_ts)
1067 {
1068   GstClockTime running_time, stream_time;
1069   gboolean all_headers;
1070   guint count;
1071   GstEvent *event = NULL;
1072
1073   g_assert (segment != NULL);
1074
1075   if (pending_event == NULL)
1076     goto out;
1077
1078   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1079       timestamp == GST_CLOCK_TIME_NONE)
1080     goto out;
1081
1082   running_time = timestamp;
1083
1084   GST_INFO ("now %" GST_TIME_FORMAT " wanted %" GST_TIME_FORMAT,
1085       GST_TIME_ARGS (running_time), GST_TIME_ARGS (pending_key_unit_ts));
1086   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1087       running_time < pending_key_unit_ts)
1088     goto out;
1089
1090   if (flags & GST_BUFFER_FLAG_DELTA_UNIT) {
1091     GST_INFO ("pending force key unit, waiting for keyframe");
1092     goto out;
1093   }
1094
1095   stream_time = gst_segment_to_stream_time (segment,
1096       GST_FORMAT_TIME, timestamp);
1097
1098   if (GST_EVENT_TYPE (pending_event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
1099     gst_video_event_parse_downstream_force_key_unit (pending_event,
1100         NULL, NULL, NULL, &all_headers, &count);
1101   } else {
1102     gst_video_event_parse_upstream_force_key_unit (pending_event, NULL,
1103         &all_headers, &count);
1104   }
1105
1106   event =
1107       gst_video_event_new_downstream_force_key_unit (timestamp, stream_time,
1108       running_time, all_headers, count);
1109   gst_event_set_seqnum (event, gst_event_get_seqnum (pending_event));
1110
1111 out:
1112   return event;
1113 }
1114
1115 /* Called when the TsMux has prepared a packet for output. Return FALSE
1116  * on error */
1117 static gboolean
1118 new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
1119 {
1120   GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1121   GstAggregator *agg = GST_AGGREGATOR (mux);
1122   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1123   GstMapInfo map;
1124   GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1125
1126   g_assert (klass->output_packet);
1127
1128   gst_buffer_map (buf, &map, GST_MAP_READWRITE);
1129
1130   if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1131     /* tsmux isn't generating timestamps. Use the input times */
1132     GST_BUFFER_PTS (buf) = mux->last_ts;
1133   }
1134
1135   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1136     if (!GST_CLOCK_STIME_IS_VALID (mux->output_ts_offset)) {
1137       GstClockTime output_start_time = agg_segment->position;
1138       if (agg_segment->position == -1
1139           || agg_segment->position < agg_segment->start) {
1140         output_start_time = agg_segment->start;
1141       }
1142
1143       mux->output_ts_offset =
1144           GST_CLOCK_DIFF (GST_BUFFER_PTS (buf), output_start_time);
1145
1146       GST_DEBUG_OBJECT (mux, "New output ts offset %" GST_STIME_FORMAT,
1147           GST_STIME_ARGS (mux->output_ts_offset));
1148     }
1149
1150     GST_BUFFER_PTS (buf) += mux->output_ts_offset;
1151   }
1152
1153   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1154     agg_segment->position = GST_BUFFER_PTS (buf);
1155   }
1156
1157   /* do common init (flags and streamheaders) */
1158   new_packet_common_init (mux, buf, map.data, map.size);
1159
1160   gst_buffer_unmap (buf, &map);
1161
1162   return klass->output_packet (mux, buf, new_pcr);
1163 }
1164
1165 /* called when TsMux needs new packet to write into */
1166 static void
1167 alloc_packet_cb (GstBuffer ** buf, void *user_data)
1168 {
1169   GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1170   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1171
1172   g_assert (klass->allocate_packet);
1173
1174   klass->allocate_packet (mux, buf);
1175 }
1176
1177 static GstFlowReturn
1178 gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
1179     GstAggregatorPad * agg_pad, GstBuffer * buf)
1180 {
1181   GstFlowReturn ret = GST_FLOW_OK;
1182   GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad);
1183   TsMuxProgram *prog;
1184   gint64 pts = GST_CLOCK_STIME_NONE;
1185   gint64 dts = GST_CLOCK_STIME_NONE;
1186   gboolean delta = TRUE, header = FALSE;
1187   StreamData *stream_data;
1188   GstMpegtsSection *scte_section = NULL;
1189
1190   GST_DEBUG_OBJECT (mux, "Pads collected");
1191
1192   if (buf && gst_buffer_get_size (buf) == 0
1193       && GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)) {
1194     gst_buffer_unref (buf);
1195     return GST_FLOW_OK;
1196   }
1197
1198   g_mutex_lock (&mux->lock);
1199   if (G_UNLIKELY (mux->first)) {
1200     ret = gst_base_ts_mux_create_streams (mux);
1201     if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1202       if (buf)
1203         gst_buffer_unref (buf);
1204       g_mutex_unlock (&mux->lock);
1205       return ret;
1206     }
1207
1208     mux->first = FALSE;
1209   }
1210
1211   prog = best->prog;
1212   if (prog == NULL) {
1213     GList *cur;
1214
1215     gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
1216     tsmux_resend_pat (mux->tsmux);
1217     tsmux_resend_si (mux->tsmux);
1218     prog = best->prog;
1219     g_assert_nonnull (prog);
1220
1221     /* output PMT for each program */
1222     for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1223       TsMuxProgram *program = (TsMuxProgram *) cur->data;
1224
1225       tsmux_resend_pmt (program);
1226     }
1227   }
1228
1229   g_assert (buf != NULL);
1230
1231   if (best->prepare_func) {
1232     GstBuffer *tmp;
1233
1234     tmp = best->prepare_func (buf, best, mux);
1235     g_assert (tmp);
1236     gst_buffer_unref (buf);
1237     buf = tmp;
1238   }
1239
1240   if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
1241     GstEvent *event;
1242
1243     g_mutex_unlock (&mux->lock);
1244     event = check_pending_key_unit_event (mux->force_key_unit_event,
1245         &agg_pad->segment, GST_BUFFER_PTS (buf),
1246         GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
1247     if (event) {
1248       GstClockTime running_time;
1249       guint count;
1250       GList *cur;
1251
1252       mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
1253       gst_event_replace (&mux->force_key_unit_event, NULL);
1254
1255       gst_video_event_parse_downstream_force_key_unit (event,
1256           NULL, NULL, &running_time, NULL, &count);
1257
1258       GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
1259           "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
1260           GST_TIME_ARGS (running_time), count);
1261       gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
1262
1263       g_mutex_lock (&mux->lock);
1264       /* output PAT, SI tables */
1265       tsmux_resend_pat (mux->tsmux);
1266       tsmux_resend_si (mux->tsmux);
1267
1268       /* output PMT for each program */
1269       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1270         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1271
1272         tsmux_resend_pmt (program);
1273       }
1274     } else {
1275       g_mutex_lock (&mux->lock);
1276     }
1277   }
1278
1279   if (G_UNLIKELY (prog->pcr_stream == NULL)) {
1280     /* Take the first data stream for the PCR */
1281     GST_DEBUG_OBJECT (best,
1282         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
1283         best->pid, best->prog_id);
1284
1285     /* Set the chosen PCR stream */
1286     tsmux_program_set_pcr_stream (prog, best->stream);
1287   }
1288
1289   GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
1290
1291   GST_OBJECT_LOCK (mux);
1292   scte_section = mux->pending_scte35_section;
1293   mux->pending_scte35_section = NULL;
1294   GST_OBJECT_UNLOCK (mux);
1295   if (G_UNLIKELY (scte_section)) {
1296     GST_DEBUG_OBJECT (mux, "Sending pending SCTE section");
1297     if (!tsmux_send_section (mux->tsmux, scte_section))
1298       GST_ERROR_OBJECT (mux, "Error sending SCTE section !");
1299   }
1300
1301   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1302     pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
1303     GST_DEBUG_OBJECT (mux, "Buffer has PTS  %" GST_TIME_FORMAT " pts %"
1304         G_GINT64_FORMAT "%s", GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts,
1305         !GST_BUFFER_FLAG_IS_SET (buf,
1306             GST_BUFFER_FLAG_DELTA_UNIT) ? " (keyframe)" : "");
1307   }
1308
1309   if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
1310     dts = GSTTIME_TO_MPEGTIME (best->dts);
1311     GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
1312         G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
1313   }
1314
1315   /* should not have a DTS without PTS */
1316   if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
1317     GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
1318     pts = dts;
1319   }
1320
1321   if (best->stream->is_video_stream) {
1322     delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1323     header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
1324   }
1325
1326   if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
1327     GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
1328
1329     gst_buffer_unref (buf);
1330     g_mutex_unlock (&mux->lock);
1331     return GST_FLOW_OK;
1332   }
1333
1334   GST_DEBUG_OBJECT (mux, "delta: %d", delta);
1335
1336   if (gst_buffer_get_size (buf) > 0) {
1337     stream_data = stream_data_new (buf);
1338     tsmux_stream_add_data (best->stream, stream_data->map_info.data,
1339         stream_data->map_info.size, stream_data, pts, dts, !delta);
1340   }
1341
1342   /* outgoing ts follows ts of PCR program stream */
1343   if (prog->pcr_stream == best->stream) {
1344     /* prefer DTS if present for PCR as it should be monotone */
1345     mux->last_ts =
1346         GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
1347         GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
1348   }
1349
1350   mux->is_delta = delta;
1351   mux->is_header = header;
1352   while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
1353     if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
1354       /* Failed writing data for some reason. Set appropriate error */
1355       GST_DEBUG_OBJECT (mux, "Failed to write data packet");
1356       GST_ELEMENT_ERROR (mux, STREAM, MUX,
1357           ("Failed writing output data to stream %04x", best->stream->id),
1358           (NULL));
1359       goto write_fail;
1360     }
1361   }
1362   g_mutex_unlock (&mux->lock);
1363   /* flush packet cache */
1364   return gst_base_ts_mux_push_packets (mux, FALSE);
1365
1366   /* ERRORS */
1367 write_fail:
1368   {
1369     return mux->last_flow_ret;
1370   }
1371 }
1372
1373 /* GstElement implementation */
1374 static gboolean
1375 gst_base_ts_mux_has_pad_with_pid (GstBaseTsMux * mux, guint16 pid)
1376 {
1377   GList *l;
1378   gboolean res = FALSE;
1379
1380   GST_OBJECT_LOCK (mux);
1381
1382   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1383     GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
1384
1385     if (tpad->pid == pid) {
1386       res = TRUE;
1387       break;
1388     }
1389   }
1390
1391   GST_OBJECT_UNLOCK (mux);
1392   return res;
1393 }
1394
1395 static GstPad *
1396 gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
1397     const gchar * name, const GstCaps * caps)
1398 {
1399   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1400   gint pid = -1;
1401   GstPad *pad = NULL;
1402   gchar *free_name = NULL;
1403
1404   g_mutex_lock (&mux->lock);
1405   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
1406     if (tsmux_find_stream (mux->tsmux, pid)) {
1407       g_mutex_unlock (&mux->lock);
1408       goto stream_exists;
1409     }
1410     /* Make sure we don't use reserved PID.
1411      * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
1412     if (pid < TSMUX_START_ES_PID)
1413       goto invalid_stream_pid;
1414   } else {
1415     do {
1416       pid = tsmux_get_new_pid (mux->tsmux);
1417     } while (gst_base_ts_mux_has_pad_with_pid (mux, pid));
1418
1419     /* Name the pad correctly after the selected pid */
1420     name = free_name = g_strdup_printf ("sink_%d", pid);
1421   }
1422   g_mutex_unlock (&mux->lock);
1423
1424   pad = (GstPad *)
1425       GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
1426       templ, name, caps);
1427
1428   gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad));
1429   GST_BASE_TS_MUX_PAD (pad)->pid = pid;
1430
1431   g_free (free_name);
1432
1433   return pad;
1434
1435   /* ERRORS */
1436 stream_exists:
1437   {
1438     GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"),
1439         (NULL));
1440     return NULL;
1441   }
1442
1443 invalid_stream_pid:
1444   {
1445     GST_ELEMENT_ERROR (element, STREAM, MUX,
1446         ("Invalid Elementary stream PID (0x%02u < 0x40)", pid), (NULL));
1447     return NULL;
1448   }
1449 }
1450
1451 static void
1452 gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
1453 {
1454   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1455
1456   g_mutex_lock (&mux->lock);
1457   if (mux->tsmux) {
1458     GList *cur;
1459     GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
1460     gint pid = ts_pad->pid;
1461
1462     if (ts_pad->prog) {
1463       if (ts_pad->prog->pcr_stream == ts_pad->stream) {
1464         tsmux_program_set_pcr_stream (ts_pad->prog, NULL);
1465       }
1466       if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) {
1467         g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id));
1468       }
1469     }
1470
1471     tsmux_resend_pat (mux->tsmux);
1472     tsmux_resend_si (mux->tsmux);
1473
1474     /* output PMT for each program */
1475     for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1476       TsMuxProgram *program = (TsMuxProgram *) cur->data;
1477
1478       tsmux_resend_pmt (program);
1479     }
1480   }
1481   g_mutex_unlock (&mux->lock);
1482
1483   GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
1484 }
1485
1486 /* GstAggregator implementation */
1487
1488 static void
1489 request_keyframe (GstBaseTsMux * mux, GstClockTime running_time)
1490 {
1491   GList *l;
1492   GST_OBJECT_LOCK (mux);
1493
1494   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1495     gst_pad_push_event (GST_PAD (l->data),
1496         gst_video_event_new_upstream_force_key_unit (running_time, TRUE, 0));
1497   }
1498
1499   GST_OBJECT_UNLOCK (mux);
1500 }
1501
1502 static const guint32 crc_tab[256] = {
1503   0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b,
1504   0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
1505   0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, 0x4c11db70, 0x48d0c6c7,
1506   0x4593e01e, 0x4152fda9, 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
1507   0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, 0x791d4014, 0x7ddc5da3,
1508   0x709f7b7a, 0x745e66cd, 0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
1509   0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, 0xbe2b5b58, 0xbaea46ef,
1510   0xb7a96036, 0xb3687d81, 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
1511   0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49, 0xc7361b4c, 0xc3f706fb,
1512   0xceb42022, 0xca753d95, 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
1513   0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, 0x34867077, 0x30476dc0,
1514   0x3d044b19, 0x39c556ae, 0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
1515   0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, 0x018aeb13, 0x054bf6a4,
1516   0x0808d07d, 0x0cc9cdca, 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
1517   0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02, 0x5e9f46bf, 0x5a5e5b08,
1518   0x571d7dd1, 0x53dc6066, 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
1519   0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, 0xbfa1b04b, 0xbb60adfc,
1520   0xb6238b25, 0xb2e29692, 0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
1521   0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, 0xe0b41de7, 0xe4750050,
1522   0xe9362689, 0xedf73b3e, 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
1523   0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686, 0xd5b88683, 0xd1799b34,
1524   0xdc3abded, 0xd8fba05a, 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
1525   0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, 0x4f040d56, 0x4bc510e1,
1526   0x46863638, 0x42472b8f, 0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
1527   0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, 0x36194d42, 0x32d850f5,
1528   0x3f9b762c, 0x3b5a6b9b, 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
1529   0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623, 0xf12f560e, 0xf5ee4bb9,
1530   0xf8ad6d60, 0xfc6c70d7, 0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
1531   0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, 0xc423cd6a, 0xc0e2d0dd,
1532   0xcda1f604, 0xc960ebb3, 0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
1533   0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, 0x9b3660c6, 0x9ff77d71,
1534   0x92b45ba8, 0x9675461f, 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
1535   0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640, 0x4e8ee645, 0x4a4ffbf2,
1536   0x470cdd2b, 0x43cdc09c, 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
1537   0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, 0x119b4be9, 0x155a565e,
1538   0x18197087, 0x1cd86d30, 0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
1539   0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, 0x2497d08d, 0x2056cd3a,
1540   0x2d15ebe3, 0x29d4f654, 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
1541   0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c, 0xe3a1cbc1, 0xe760d676,
1542   0xea23f0af, 0xeee2ed18, 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
1543   0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662,
1544   0x933eb0bb, 0x97ffad0c, 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
1545   0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
1546 };
1547
1548 static guint32
1549 _calc_crc32 (const guint8 * data, guint datalen)
1550 {
1551   gint i;
1552   guint32 crc = 0xffffffff;
1553
1554   for (i = 0; i < datalen; i++) {
1555     crc = (crc << 8) ^ crc_tab[((crc >> 24) ^ *data++) & 0xff];
1556   }
1557   return crc;
1558 }
1559
1560 #define MPEGTIME_TO_GSTTIME(t) ((t) * (guint64)100000 / 9)
1561
1562 static GstMpegtsSCTESpliceEvent *
1563 copy_splice (GstMpegtsSCTESpliceEvent * splice)
1564 {
1565   return g_boxed_copy (GST_TYPE_MPEGTS_SCTE_SPLICE_EVENT, splice);
1566 }
1567
1568 static void
1569 free_splice (GstMpegtsSCTESpliceEvent * splice)
1570 {
1571   g_boxed_free (GST_TYPE_MPEGTS_SCTE_SPLICE_EVENT, splice);
1572 }
1573
1574 /* FIXME: get rid of this when depending on glib >= 2.62 */
1575
1576 static GPtrArray *
1577 _g_ptr_array_copy (GPtrArray * array,
1578     GCopyFunc func, GFreeFunc free_func, gpointer user_data)
1579 {
1580   GPtrArray *new_array;
1581
1582   g_return_val_if_fail (array != NULL, NULL);
1583
1584   new_array = g_ptr_array_new_with_free_func (free_func);
1585
1586   g_ptr_array_set_size (new_array, array->len);
1587
1588   if (func != NULL) {
1589     guint i;
1590
1591     for (i = 0; i < array->len; i++)
1592       new_array->pdata[i] = func (array->pdata[i], user_data);
1593   } else if (array->len > 0) {
1594     memcpy (new_array->pdata, array->pdata,
1595         array->len * sizeof (*array->pdata));
1596   }
1597
1598   new_array->len = array->len;
1599
1600   return new_array;
1601 }
1602
1603 static GstMpegtsSCTESIT *
1604 deep_copy_sit (const GstMpegtsSCTESIT * sit)
1605 {
1606   GstMpegtsSCTESIT *sit_copy = g_boxed_copy (GST_TYPE_MPEGTS_SCTE_SIT, sit);
1607   GPtrArray *splices_copy =
1608       _g_ptr_array_copy (sit_copy->splices, (GCopyFunc) copy_splice,
1609       (GFreeFunc) free_splice, NULL);
1610
1611   g_ptr_array_unref (sit_copy->splices);
1612   sit_copy->splices = splices_copy;
1613
1614   return sit_copy;
1615 }
1616
1617 /* Takes ownership of @section.
1618  *
1619  * This function is a bit complex because the SCTE sections can
1620  * have various origins:
1621  *
1622  * * Sections created by the application with the gst_mpegts_scte_*_new()
1623  *   API. The splice times / durations contained by these are expressed
1624  *   in the GStreamer running time domain, and must be translated to
1625  *   our local PES time domain. In this case, we will packetize the section
1626  *   ourselves.
1627  *
1628  * * Sections passed through from tsdemux: this case is complicated as
1629  *   splice times in the incoming stream may be encrypted, with pts_adjustment
1630  *   being the only timing field guaranteed *not* to be encrypted. In this
1631  *   case, the original binary data (section->data) will be reinjected as is
1632  *   in the output stream, with pts_adjustment adjusted. tsdemux provides us
1633  *   with the pts_offset it introduces, the difference between the original
1634  *   PES PTSs and the running times it outputs.
1635  *
1636  * Additionally, in either of these cases when the splice times aren't encrypted
1637  * we want to make use of those to request keyframes. For the passthrough case,
1638  * as the splice times are left untouched tsdemux provides us with the running
1639  * times the section originally referred to. We cannot calculate it locally
1640  * because we would need to have access to the information that the timestamps
1641  * in the original PES domain have wrapped around, and how many times they have
1642  * done so. While we could probably make educated guesses, tsdemux (more specifically
1643  * mpegtspacketizer) already keeps track of that, and it seemed more logical to
1644  * perform the calculation there and forward it alongside the downstream events.
1645  *
1646  * Finally, while we can't request keyframes at splice points in the encrypted
1647  * case, if the input stream was compliant in that regard and no reencoding took
1648  * place the splice times will still match with valid splice points, it is up
1649  * to the application to ensure that that is the case.
1650  */
1651 static void
1652 handle_scte35_section (GstBaseTsMux * mux, GstEvent * event,
1653     GstMpegtsSection * section, guint64 mpeg_pts_offset,
1654     GstStructure * rtime_map)
1655 {
1656   GstMpegtsSCTESIT *sit;
1657   guint i;
1658   gboolean forward = TRUE;
1659   guint64 pts_adjust;
1660   guint8 *section_data;
1661   guint8 *crc;
1662   gboolean translate = FALSE;
1663
1664   sit = (GstMpegtsSCTESIT *) gst_mpegts_section_get_scte_sit (section);
1665
1666   /* When the application injects manually constructed splice events,
1667    * their time domain is the GStreamer running time, we receive them
1668    * unpacketized and translate the fields in the SIT to local PTS.
1669    *
1670    * We make a copy of the SIT in order to make sure we can rewrite it.
1671    */
1672   if (sit->is_running_time) {
1673     sit = deep_copy_sit (sit);
1674     translate = TRUE;
1675   }
1676
1677   switch (sit->splice_command_type) {
1678     case GST_MTS_SCTE_SPLICE_COMMAND_NULL:
1679       /* We implement heartbeating ourselves */
1680       forward = FALSE;
1681       break;
1682     case GST_MTS_SCTE_SPLICE_COMMAND_SCHEDULE:
1683       /* No need to request keyframes at this point, splice_insert
1684        * messages will precede the future splice points and we
1685        * can request keyframes then. Only translate if needed.
1686        */
1687       if (translate) {
1688         for (i = 0; i < sit->splices->len; i++) {
1689           GstMpegtsSCTESpliceEvent *sevent =
1690               g_ptr_array_index (sit->splices, i);
1691
1692           if (sevent->program_splice_time_specified)
1693             sevent->program_splice_time =
1694                 GSTTIME_TO_MPEGTIME (sevent->program_splice_time) +
1695                 TS_MUX_CLOCK_BASE;
1696
1697           if (sevent->duration_flag)
1698             sevent->break_duration =
1699                 GSTTIME_TO_MPEGTIME (sevent->break_duration);
1700         }
1701       }
1702       break;
1703     case GST_MTS_SCTE_SPLICE_COMMAND_INSERT:
1704       /* We want keyframes at splice points */
1705       if (sit->fully_parsed && (rtime_map || translate)) {
1706
1707         for (i = 0; i < sit->splices->len; i++) {
1708           guint64 running_time = GST_CLOCK_TIME_NONE;
1709
1710           GstMpegtsSCTESpliceEvent *sevent =
1711               g_ptr_array_index (sit->splices, i);
1712           if (sevent->program_splice_time_specified) {
1713             if (rtime_map) {
1714               gchar *field_name = g_strdup_printf ("event-%u-splice-time",
1715                   sevent->splice_event_id);
1716               if (gst_structure_get_uint64 (rtime_map, field_name,
1717                       &running_time)) {
1718                 GST_DEBUG_OBJECT (mux,
1719                     "Requesting keyframe for splice point at %" GST_TIME_FORMAT,
1720                     GST_TIME_ARGS (running_time));
1721                 request_keyframe (mux, running_time);
1722               }
1723               g_free (field_name);
1724             } else {
1725               g_assert (translate == TRUE);
1726               running_time = sevent->program_splice_time;
1727               GST_DEBUG_OBJECT (mux,
1728                   "Requesting keyframe for splice point at %" GST_TIME_FORMAT,
1729                   GST_TIME_ARGS (running_time));
1730               request_keyframe (mux, running_time);
1731               sevent->program_splice_time =
1732                   GSTTIME_TO_MPEGTIME (running_time) + TS_MUX_CLOCK_BASE;
1733             }
1734           } else {
1735             GST_DEBUG_OBJECT (mux,
1736                 "Requesting keyframe for immediate splice point");
1737             request_keyframe (mux, GST_CLOCK_TIME_NONE);
1738           }
1739
1740           if (sevent->duration_flag) {
1741             if (translate) {
1742               sevent->break_duration =
1743                   GSTTIME_TO_MPEGTIME (sevent->break_duration);
1744             }
1745
1746             /* Even if auto_return is FALSE, when a break_duration is specified it
1747              * is intended as a redundancy mechanism in case the follow-up
1748              * splice insert goes missing.
1749              *
1750              * Schedule a keyframe at that point (if we can calculate its position
1751              * accurately).
1752              */
1753             if (GST_CLOCK_TIME_IS_VALID (running_time)) {
1754               running_time += MPEGTIME_TO_GSTTIME (sevent->break_duration);
1755               GST_DEBUG_OBJECT (mux,
1756                   "Requesting keyframe for end of break at %" GST_TIME_FORMAT,
1757                   GST_TIME_ARGS (running_time));
1758               request_keyframe (mux, running_time);
1759             }
1760           }
1761         }
1762       }
1763       break;
1764     case GST_MTS_SCTE_SPLICE_COMMAND_TIME:{
1765       /* Adjust timestamps and potentially request keyframes */
1766       gboolean do_request_keyframes = FALSE;
1767
1768       /* TODO: we can probably be a little more fine-tuned about determining
1769        * whether a keyframe is actually needed, but this at least takes care
1770        * of the requirement in 10.3.4 that a keyframe should not be created
1771        * when the signal contains only a time_descriptor.
1772        */
1773       if (sit->fully_parsed && (rtime_map || translate)) {
1774         for (i = 0; i < sit->descriptors->len; i++) {
1775           GstMpegtsDescriptor *descriptor =
1776               g_ptr_array_index (sit->descriptors, i);
1777
1778           switch (descriptor->tag) {
1779             case GST_MTS_SCTE_DESC_AVAIL:
1780             case GST_MTS_SCTE_DESC_DTMF:
1781             case GST_MTS_SCTE_DESC_SEGMENTATION:
1782               do_request_keyframes = TRUE;
1783               break;
1784             case GST_MTS_SCTE_DESC_TIME:
1785             case GST_MTS_SCTE_DESC_AUDIO:
1786               break;
1787           }
1788
1789           if (do_request_keyframes)
1790             break;
1791         }
1792
1793         if (sit->splice_time_specified) {
1794           GstClockTime running_time = GST_CLOCK_TIME_NONE;
1795
1796           if (rtime_map) {
1797             if (do_request_keyframes
1798                 && gst_structure_get_uint64 (rtime_map, "splice-time",
1799                     &running_time)) {
1800               GST_DEBUG_OBJECT (mux,
1801                   "Requesting keyframe for time signal at %" GST_TIME_FORMAT,
1802                   GST_TIME_ARGS (running_time));
1803               request_keyframe (mux, running_time);
1804             }
1805           } else {
1806             g_assert (translate);
1807             running_time = sit->splice_time;
1808             sit->splice_time =
1809                 GSTTIME_TO_MPEGTIME (running_time) + TS_MUX_CLOCK_BASE;
1810             if (do_request_keyframes) {
1811               GST_DEBUG_OBJECT (mux,
1812                   "Requesting keyframe for time signal at %" GST_TIME_FORMAT,
1813                   GST_TIME_ARGS (running_time));
1814               request_keyframe (mux, running_time);
1815             }
1816           }
1817         } else if (do_request_keyframes) {
1818           GST_DEBUG_OBJECT (mux,
1819               "Requesting keyframe for immediate time signal");
1820           request_keyframe (mux, GST_CLOCK_TIME_NONE);
1821         }
1822       }
1823       break;
1824     }
1825     case GST_MTS_SCTE_SPLICE_COMMAND_BANDWIDTH:
1826     case GST_MTS_SCTE_SPLICE_COMMAND_PRIVATE:
1827       /* Just let those go through untouched, none of our business */
1828       break;
1829     default:
1830       break;
1831   }
1832
1833   if (!forward) {
1834     gst_mpegts_section_unref (section);
1835     return;
1836   }
1837
1838   if (!translate) {
1839     g_assert (section->data);
1840     /* Calculate the final adjustment, as a sum of:
1841      * - The adjustment in the original packet
1842      * - The offset introduced between the original local PTS
1843      *   and the GStreamer PTS output by tsdemux
1844      * - Our own 1-hour offset
1845      */
1846     pts_adjust = sit->pts_adjustment + mpeg_pts_offset + TS_MUX_CLOCK_BASE;
1847
1848     /* Account for offsets potentially introduced between the demuxer and us */
1849     pts_adjust +=
1850         GSTTIME_TO_MPEGTIME (gst_event_get_running_time_offset (event));
1851
1852     pts_adjust &= 0x1ffffffff;
1853     section_data = g_memdup2 (section->data, section->section_length);
1854     section_data[4] |= pts_adjust >> 32;
1855     section_data[5] = pts_adjust >> 24;
1856     section_data[6] = pts_adjust >> 16;
1857     section_data[7] = pts_adjust >> 8;
1858     section_data[8] = pts_adjust;
1859
1860     /* Now rewrite our checksum */
1861     crc = section_data + section->section_length - 4;
1862     GST_WRITE_UINT32_BE (crc, _calc_crc32 (section_data, crc - section_data));
1863
1864     GST_OBJECT_LOCK (mux);
1865     GST_DEBUG_OBJECT (mux, "Storing SCTE section");
1866     if (mux->pending_scte35_section)
1867       gst_mpegts_section_unref (mux->pending_scte35_section);
1868     mux->pending_scte35_section =
1869         gst_mpegts_section_new (mux->scte35_pid, section_data,
1870         section->section_length);
1871     GST_OBJECT_UNLOCK (mux);
1872
1873     gst_mpegts_section_unref (section);
1874   } else {
1875     GST_OBJECT_LOCK (mux);
1876     GST_DEBUG_OBJECT (mux, "Storing SCTE section");
1877     gst_mpegts_section_unref (section);
1878     if (mux->pending_scte35_section)
1879       gst_mpegts_section_unref (mux->pending_scte35_section);
1880     mux->pending_scte35_section =
1881         gst_mpegts_section_from_scte_sit (sit, mux->scte35_pid);;
1882     GST_OBJECT_UNLOCK (mux);
1883   }
1884 }
1885
1886 static gboolean
1887 gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
1888 {
1889   GstMpegtsSection *section;
1890   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1891
1892   section = gst_event_parse_mpegts_section (event);
1893
1894   if (section) {
1895     GST_DEBUG ("Received event with mpegts section");
1896
1897     if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
1898       handle_scte35_section (mux, event, section, 0, NULL);
1899     } else {
1900       g_mutex_lock (&mux->lock);
1901       /* TODO: Check that the section type is supported */
1902       tsmux_add_mpegts_si_section (mux->tsmux, section);
1903       g_mutex_unlock (&mux->lock);
1904     }
1905
1906     gst_event_unref (event);
1907
1908     return TRUE;
1909   }
1910
1911   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1912 }
1913
1914 /* GstAggregator implementation */
1915
1916 static gboolean
1917 gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
1918     GstEvent * event)
1919 {
1920   GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
1921   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1922   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad);
1923   gboolean res = FALSE;
1924   gboolean forward = TRUE;
1925
1926   switch (GST_EVENT_TYPE (event)) {
1927     case GST_EVENT_CAPS:
1928     {
1929       GstCaps *caps;
1930       GstFlowReturn ret;
1931       GList *cur;
1932
1933       g_mutex_lock (&mux->lock);
1934       if (ts_pad->stream == NULL) {
1935         g_mutex_unlock (&mux->lock);
1936         break;
1937       }
1938
1939       forward = FALSE;
1940
1941       gst_event_parse_caps (event, &caps);
1942       if (!caps || !gst_caps_is_fixed (caps)) {
1943         g_mutex_unlock (&mux->lock);
1944         break;
1945       }
1946
1947       ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
1948       if (ret != GST_FLOW_OK) {
1949         g_mutex_unlock (&mux->lock);
1950         break;
1951       }
1952
1953       mux->tsmux->pat_changed = TRUE;
1954       mux->tsmux->si_changed = TRUE;
1955       tsmux_resend_pat (mux->tsmux);
1956       tsmux_resend_si (mux->tsmux);
1957
1958       /* output PMT for each program */
1959       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1960         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1961
1962         program->pmt_changed = TRUE;
1963         tsmux_resend_pmt (program);
1964       }
1965       g_mutex_unlock (&mux->lock);
1966
1967       res = TRUE;
1968       break;
1969     }
1970     case GST_EVENT_CUSTOM_DOWNSTREAM:
1971     {
1972       GstClockTime timestamp, stream_time, running_time;
1973       gboolean all_headers;
1974       guint count;
1975       const GstStructure *s;
1976
1977       s = gst_event_get_structure (event);
1978
1979       if (gst_structure_has_name (s, "scte-sit") && mux->scte35_pid != 0) {
1980
1981         /* When operating downstream of tsdemux, tsdemux will send out events
1982          * on all its source pads for each splice table it encounters. If we
1983          * are remuxing multiple streams it has demuxed, this means we could
1984          * unnecessarily repeat the same table multiple times, we avoid that
1985          * by deduplicating thanks to the event sequm
1986          */
1987         if (gst_event_get_seqnum (event) != mux->last_scte35_event_seqnum) {
1988           GstMpegtsSection *section;
1989
1990           gst_structure_get (s, "section", GST_TYPE_MPEGTS_SECTION, &section,
1991               NULL);
1992           if (section) {
1993             guint64 mpeg_pts_offset = 0;
1994             GstStructure *rtime_map = NULL;
1995
1996             gst_structure_get (s, "running-time-map", GST_TYPE_STRUCTURE,
1997                 &rtime_map, NULL);
1998             gst_structure_get_uint64 (s, "mpeg-pts-offset", &mpeg_pts_offset);
1999
2000             handle_scte35_section (mux, event, section, mpeg_pts_offset,
2001                 rtime_map);
2002             if (rtime_map)
2003               gst_structure_free (rtime_map);
2004             mux->last_scte35_event_seqnum = gst_event_get_seqnum (event);
2005           } else {
2006             GST_WARNING_OBJECT (ts_pad,
2007                 "Ignoring scte-sit event without a section");
2008           }
2009         } else {
2010           GST_DEBUG_OBJECT (ts_pad, "Ignoring duplicate scte-sit event");
2011         }
2012         res = TRUE;
2013         forward = FALSE;
2014         goto out;
2015       }
2016
2017       if (!gst_video_event_is_force_key_unit (event))
2018         goto out;
2019
2020       res = TRUE;
2021       forward = FALSE;
2022
2023       gst_video_event_parse_downstream_force_key_unit (event,
2024           &timestamp, &stream_time, &running_time, &all_headers, &count);
2025       GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, "
2026           "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
2027           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
2028
2029       if (mux->force_key_unit_event != NULL) {
2030         GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
2031             "as an upstream force key unit is already queued");
2032         goto out;
2033       }
2034
2035       if (!all_headers)
2036         goto out;
2037
2038       mux->pending_key_unit_ts = running_time;
2039       gst_event_replace (&mux->force_key_unit_event, event);
2040       break;
2041     }
2042     case GST_EVENT_TAG:{
2043       GstTagList *list;
2044       gchar *lang = NULL;
2045
2046       GST_DEBUG_OBJECT (mux, "received tag event");
2047       gst_event_parse_tag (event, &list);
2048
2049       /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
2050       if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
2051         const gchar *lang_code;
2052
2053         lang_code = gst_tag_get_language_code_iso_639_2B (lang);
2054         if (lang_code) {
2055           GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code);
2056
2057           g_free (ts_pad->language);
2058           ts_pad->language = g_strdup (lang_code);
2059         } else {
2060           GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'",
2061               lang);
2062         }
2063         g_free (lang);
2064       }
2065
2066       /* handled this, don't want collectpads to forward it downstream */
2067       res = TRUE;
2068       forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
2069       break;
2070     }
2071     case GST_EVENT_STREAM_START:{
2072       GstStreamFlags flags;
2073
2074       gst_event_parse_stream_flags (event, &flags);
2075
2076       /* Don't wait for data on sparse inputs like metadata streams */
2077       /*
2078          if ((flags & GST_STREAM_FLAG_SPARSE)) {
2079          GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
2080          gst_collect_pads_set_waiting (pads, data, FALSE);
2081          GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
2082          }
2083        */
2084       break;
2085     }
2086     default:
2087       break;
2088   }
2089
2090 out:
2091   if (!forward)
2092     gst_event_unref (event);
2093   else
2094     res = agg_class->sink_event (agg, agg_pad, event);
2095
2096   return res;
2097 }
2098
2099 static gboolean
2100 gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
2101 {
2102   GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
2103   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2104   gboolean res = TRUE, forward = TRUE;
2105
2106   switch (GST_EVENT_TYPE (event)) {
2107     case GST_EVENT_CUSTOM_UPSTREAM:
2108     {
2109       GstIterator *iter;
2110       GValue sinkpad_value = G_VALUE_INIT;
2111       GstClockTime running_time;
2112       gboolean all_headers, done = FALSE, res = FALSE;
2113       guint count;
2114
2115       if (!gst_video_event_is_force_key_unit (event))
2116         break;
2117
2118       forward = FALSE;
2119
2120       gst_video_event_parse_upstream_force_key_unit (event,
2121           &running_time, &all_headers, &count);
2122
2123       GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
2124           "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
2125           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
2126           all_headers, count);
2127
2128       if (!all_headers)
2129         break;
2130
2131       mux->pending_key_unit_ts = running_time;
2132       gst_event_replace (&mux->force_key_unit_event, event);
2133
2134       iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
2135
2136       while (!done) {
2137         switch (gst_iterator_next (iter, &sinkpad_value)) {
2138           case GST_ITERATOR_OK:{
2139             GstPad *sinkpad = g_value_get_object (&sinkpad_value);
2140             gboolean tmp;
2141
2142             GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
2143             tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
2144             GST_INFO_OBJECT (mux, "result %d", tmp);
2145             /* succeed if at least one pad succeeds */
2146             res |= tmp;
2147             break;
2148           }
2149           case GST_ITERATOR_DONE:
2150             done = TRUE;
2151             break;
2152           case GST_ITERATOR_RESYNC:
2153             gst_iterator_resync (iter);
2154             break;
2155           case GST_ITERATOR_ERROR:
2156             g_assert_not_reached ();
2157             break;
2158         }
2159         g_value_reset (&sinkpad_value);
2160       }
2161       g_value_unset (&sinkpad_value);
2162       gst_iterator_free (iter);
2163       break;
2164     }
2165     default:
2166       break;
2167   }
2168
2169   if (forward)
2170     res = agg_class->src_event (agg, event);
2171   else
2172     gst_event_unref (event);
2173
2174   return res;
2175 }
2176
2177 static GstBuffer *
2178 gst_base_ts_mux_clip (GstAggregator * agg,
2179     GstAggregatorPad * agg_pad, GstBuffer * buf)
2180 {
2181   GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad);
2182   GstClockTime time;
2183   GstBuffer *ret;
2184
2185   ret = buf;
2186
2187   /* PTS */
2188   time = GST_BUFFER_PTS (buf);
2189
2190   /* invalid left alone and passed */
2191   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
2192     time =
2193         gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time);
2194     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
2195       GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment");
2196       gst_buffer_unref (buf);
2197       ret = NULL;
2198       goto beach;
2199     } else {
2200       GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " ->  %"
2201           GST_TIME_FORMAT " running time",
2202           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
2203       buf = ret = gst_buffer_make_writable (buf);
2204       GST_BUFFER_PTS (ret) = time;
2205     }
2206   }
2207
2208   /* DTS */
2209   time = GST_BUFFER_DTS (buf);
2210
2211   /* invalid left alone and passed */
2212   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
2213     gint sign;
2214     gint64 dts;
2215
2216     sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME,
2217         time, &time);
2218
2219     if (sign > 0)
2220       dts = (gint64) time;
2221     else
2222       dts = -((gint64) time);
2223
2224     GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %"
2225         GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
2226         GST_STIME_ARGS (dts));
2227
2228     if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) {
2229       /* Ignore DTS going backward */
2230       GST_WARNING_OBJECT (pad, "ignoring DTS going backward");
2231       dts = pad->dts;
2232     }
2233
2234     ret = gst_buffer_make_writable (buf);
2235     if (sign > 0)
2236       GST_BUFFER_DTS (ret) = time;
2237     else
2238       GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE;
2239
2240     pad->dts = dts;
2241   } else {
2242     pad->dts = GST_CLOCK_STIME_NONE;
2243   }
2244
2245 beach:
2246   return ret;
2247 }
2248
2249 static GstFlowReturn
2250 gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps,
2251     GstCaps ** ret)
2252 {
2253   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2254   GstStructure *s;
2255
2256   *ret = gst_caps_copy (caps);
2257   s = gst_caps_get_structure (*ret, 0);
2258   gst_structure_set (s, "packetsize", G_TYPE_INT, mux->packet_size, NULL);
2259
2260   return GST_FLOW_OK;
2261 }
2262
2263 static GstBaseTsMuxPad *
2264 gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
2265 {
2266   GstBaseTsMuxPad *best = NULL;
2267   GstClockTime best_ts = GST_CLOCK_TIME_NONE;
2268   GList *l;
2269
2270   GST_OBJECT_LOCK (aggregator);
2271
2272   for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
2273     GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
2274     GstAggregatorPad *apad = GST_AGGREGATOR_PAD_CAST (tpad);
2275     GstBuffer *buffer;
2276
2277     buffer = gst_aggregator_pad_peek_buffer (apad);
2278     if (!buffer)
2279       continue;
2280     if (best_ts == GST_CLOCK_TIME_NONE) {
2281       best = tpad;
2282       best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
2283     } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
2284       GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
2285       if (t < best_ts) {
2286         best = tpad;
2287         best_ts = t;
2288       }
2289     }
2290     gst_buffer_unref (buffer);
2291   }
2292
2293   if (best)
2294     gst_object_ref (best);
2295
2296   GST_OBJECT_UNLOCK (aggregator);
2297
2298   GST_DEBUG_OBJECT (aggregator,
2299       "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
2300       GST_TIME_ARGS (best_ts), best);
2301
2302   return best;
2303 }
2304
2305 static gboolean
2306 gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
2307 {
2308   GList *l;
2309   gboolean ret = TRUE;
2310
2311   GST_OBJECT_LOCK (mux);
2312
2313   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
2314     GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
2315
2316     if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
2317       ret = FALSE;
2318       break;
2319     }
2320   }
2321
2322   GST_OBJECT_UNLOCK (mux);
2323
2324   return ret;
2325 }
2326
2327
2328 static GstFlowReturn
2329 gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
2330 {
2331   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2332   GstFlowReturn ret = GST_FLOW_OK;
2333   GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg);
2334
2335   if (best) {
2336     GstBuffer *buffer;
2337
2338     buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best));
2339     if (!buffer) {
2340       /* We might have gotten a flush event after we picked the pad */
2341       goto done;
2342     }
2343
2344     ret =
2345         gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
2346         GST_AGGREGATOR_PAD (best), buffer);
2347
2348     gst_object_unref (best);
2349
2350     if (ret != GST_FLOW_OK)
2351       goto done;
2352   }
2353
2354   if (gst_base_ts_mux_are_all_pads_eos (mux)) {
2355     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
2356     /* drain some possibly cached data */
2357     if (klass->drain)
2358       klass->drain (mux);
2359     gst_base_ts_mux_push_packets (mux, TRUE);
2360
2361     ret = GST_FLOW_EOS;
2362   }
2363
2364 done:
2365   return ret;
2366 }
2367
2368 static gboolean
2369 gst_base_ts_mux_start (GstAggregator * agg)
2370 {
2371   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2372
2373   g_mutex_lock (&mux->lock);
2374   gst_base_ts_mux_reset (mux, TRUE);
2375   g_mutex_unlock (&mux->lock);
2376
2377   return TRUE;
2378 }
2379
2380 static gboolean
2381 gst_base_ts_mux_stop (GstAggregator * agg)
2382 {
2383   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2384
2385   g_mutex_lock (&mux->lock);
2386   gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
2387   g_mutex_unlock (&mux->lock);
2388
2389   return TRUE;
2390 }
2391
2392 /* GObject implementation */
2393
2394 static void
2395 gst_base_ts_mux_dispose (GObject * object)
2396 {
2397   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2398
2399   g_mutex_lock (&mux->lock);
2400   gst_base_ts_mux_reset (mux, FALSE);
2401
2402   if (mux->out_adapter) {
2403     g_object_unref (mux->out_adapter);
2404     mux->out_adapter = NULL;
2405   }
2406   if (mux->prog_map) {
2407     gst_structure_free (mux->prog_map);
2408     mux->prog_map = NULL;
2409   }
2410   if (mux->programs) {
2411     g_hash_table_destroy (mux->programs);
2412     mux->programs = NULL;
2413   }
2414   g_mutex_unlock (&mux->lock);
2415   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
2416 }
2417
2418 static void
2419 gst_base_ts_mux_finalize (GObject * object)
2420 {
2421   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2422
2423   g_mutex_clear (&mux->lock);
2424   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
2425 }
2426
2427 static void
2428 gst_base_ts_mux_constructed (GObject * object)
2429 {
2430   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2431
2432   /* initial state */
2433   g_mutex_lock (&mux->lock);
2434   gst_base_ts_mux_reset (mux, TRUE);
2435   g_mutex_unlock (&mux->lock);
2436 }
2437
2438 static void
2439 gst_base_ts_mux_set_property (GObject * object, guint prop_id,
2440     const GValue * value, GParamSpec * pspec)
2441 {
2442   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2443   GList *l;
2444
2445   switch (prop_id) {
2446     case PROP_PROG_MAP:
2447     {
2448       const GstStructure *s = gst_value_get_structure (value);
2449       if (mux->prog_map) {
2450         gst_structure_free (mux->prog_map);
2451       }
2452       if (s)
2453         mux->prog_map = gst_structure_copy (s);
2454       else
2455         mux->prog_map = NULL;
2456       break;
2457     }
2458     case PROP_PAT_INTERVAL:
2459       mux->pat_interval = g_value_get_uint (value);
2460       g_mutex_lock (&mux->lock);
2461       if (mux->tsmux)
2462         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
2463       g_mutex_unlock (&mux->lock);
2464       break;
2465     case PROP_PMT_INTERVAL:
2466       mux->pmt_interval = g_value_get_uint (value);
2467       GST_OBJECT_LOCK (mux);
2468       for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
2469         GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
2470
2471         g_mutex_lock (&mux->lock);
2472         tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
2473         g_mutex_unlock (&mux->lock);
2474       }
2475       GST_OBJECT_UNLOCK (mux);
2476       break;
2477     case PROP_ALIGNMENT:
2478       mux->alignment = g_value_get_int (value);
2479       break;
2480     case PROP_SI_INTERVAL:
2481       mux->si_interval = g_value_get_uint (value);
2482       g_mutex_lock (&mux->lock);
2483       tsmux_set_si_interval (mux->tsmux, mux->si_interval);
2484       g_mutex_unlock (&mux->lock);
2485       break;
2486     case PROP_BITRATE:
2487       mux->bitrate = g_value_get_uint64 (value);
2488       g_mutex_lock (&mux->lock);
2489       if (mux->tsmux)
2490         tsmux_set_bitrate (mux->tsmux, mux->bitrate);
2491       g_mutex_unlock (&mux->lock);
2492       break;
2493     case PROP_PCR_INTERVAL:
2494       mux->pcr_interval = g_value_get_uint (value);
2495       g_mutex_lock (&mux->lock);
2496       if (mux->tsmux)
2497         tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
2498       g_mutex_unlock (&mux->lock);
2499       break;
2500     case PROP_SCTE_35_PID:
2501       mux->scte35_pid = g_value_get_uint (value);
2502       break;
2503     case PROP_SCTE_35_NULL_INTERVAL:
2504       mux->scte35_null_interval = g_value_get_uint (value);
2505       break;
2506     default:
2507       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2508       break;
2509   }
2510 }
2511
2512 static void
2513 gst_base_ts_mux_get_property (GObject * object, guint prop_id,
2514     GValue * value, GParamSpec * pspec)
2515 {
2516   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2517
2518   switch (prop_id) {
2519     case PROP_PROG_MAP:
2520       gst_value_set_structure (value, mux->prog_map);
2521       break;
2522     case PROP_PAT_INTERVAL:
2523       g_value_set_uint (value, mux->pat_interval);
2524       break;
2525     case PROP_PMT_INTERVAL:
2526       g_value_set_uint (value, mux->pmt_interval);
2527       break;
2528     case PROP_ALIGNMENT:
2529       g_value_set_int (value, mux->alignment);
2530       break;
2531     case PROP_SI_INTERVAL:
2532       g_value_set_uint (value, mux->si_interval);
2533       break;
2534     case PROP_BITRATE:
2535       g_value_set_uint64 (value, mux->bitrate);
2536       break;
2537     case PROP_PCR_INTERVAL:
2538       g_value_set_uint (value, mux->pcr_interval);
2539       break;
2540     case PROP_SCTE_35_PID:
2541       g_value_set_uint (value, mux->scte35_pid);
2542       break;
2543     case PROP_SCTE_35_NULL_INTERVAL:
2544       g_value_set_uint (value, mux->scte35_null_interval);
2545       break;
2546     default:
2547       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2548       break;
2549   }
2550 }
2551
2552 /* Default vmethods implementation */
2553
2554 static TsMux *
2555 gst_base_ts_mux_default_create_ts_mux (GstBaseTsMux * mux)
2556 {
2557   TsMux *tsmux = tsmux_new ();
2558   tsmux_set_write_func (tsmux, new_packet_cb, mux);
2559   tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux);
2560   tsmux_set_pat_interval (tsmux, mux->pat_interval);
2561   tsmux_set_si_interval (tsmux, mux->si_interval);
2562   tsmux_set_bitrate (tsmux, mux->bitrate);
2563   tsmux_set_pcr_interval (tsmux, mux->pcr_interval);
2564
2565   return tsmux;
2566 }
2567
2568 static void
2569 gst_base_ts_mux_default_allocate_packet (GstBaseTsMux * mux,
2570     GstBuffer ** buffer)
2571 {
2572   GstBuffer *buf;
2573
2574   buf = gst_buffer_new_and_alloc (mux->packet_size);
2575
2576   *buffer = buf;
2577 }
2578
2579 static gboolean
2580 gst_base_ts_mux_default_output_packet (GstBaseTsMux * mux, GstBuffer * buffer,
2581     gint64 new_pcr)
2582 {
2583   gst_base_ts_mux_collect_packet (mux, buffer);
2584
2585   return TRUE;
2586 }
2587
2588 /* Subclass API */
2589
2590 void
2591 gst_base_ts_mux_set_packet_size (GstBaseTsMux * mux, gsize size)
2592 {
2593   mux->packet_size = size;
2594 }
2595
2596 void
2597 gst_base_ts_mux_set_automatic_alignment (GstBaseTsMux * mux, gsize alignment)
2598 {
2599   mux->automatic_alignment = alignment;
2600 }
2601
2602 static void
2603 gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
2604 {
2605   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
2606   GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
2607   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
2608
2609   GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0,
2610       "MPEG Transport Stream muxer");
2611
2612   gst_element_class_set_static_metadata (gstelement_class,
2613       "MPEG Transport Stream Muxer", "Codec/Muxer",
2614       "Multiplexes media streams into an MPEG Transport Stream",
2615       "Fluendo <contact@fluendo.com>");
2616
2617   gobject_class->set_property =
2618       GST_DEBUG_FUNCPTR (gst_base_ts_mux_set_property);
2619   gobject_class->get_property =
2620       GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
2621   gobject_class->dispose = gst_base_ts_mux_dispose;
2622   gobject_class->finalize = gst_base_ts_mux_finalize;
2623   gobject_class->constructed = gst_base_ts_mux_constructed;
2624
2625   gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
2626   gstelement_class->release_pad = gst_base_ts_mux_release_pad;
2627   gstelement_class->send_event = gst_base_ts_mux_send_event;
2628
2629   gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps;
2630   gstagg_class->aggregate = gst_base_ts_mux_aggregate;
2631   gstagg_class->clip = gst_base_ts_mux_clip;
2632   gstagg_class->sink_event = gst_base_ts_mux_sink_event;
2633   gstagg_class->src_event = gst_base_ts_mux_src_event;
2634   gstagg_class->start = gst_base_ts_mux_start;
2635   gstagg_class->stop = gst_base_ts_mux_stop;
2636
2637   klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
2638   klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;
2639   klass->output_packet = gst_base_ts_mux_default_output_packet;
2640
2641   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PROG_MAP,
2642       g_param_spec_boxed ("prog-map", "Program map",
2643           "A GstStructure specifies the mapping from elementary streams to programs",
2644           GST_TYPE_STRUCTURE,
2645           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2646
2647   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PAT_INTERVAL,
2648       g_param_spec_uint ("pat-interval", "PAT interval",
2649           "Set the interval (in ticks of the 90kHz clock) for writing out the PAT table",
2650           1, G_MAXUINT, TSMUX_DEFAULT_PAT_INTERVAL,
2651           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2652
2653   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PMT_INTERVAL,
2654       g_param_spec_uint ("pmt-interval", "PMT interval",
2655           "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
2656           1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
2657           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2658
2659   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_ALIGNMENT,
2660       g_param_spec_int ("alignment", "packet alignment",
2661           "Number of packets per buffer (padded with dummy packets on EOS) "
2662           "(-1 = auto, 0 = all available packets, 7 for UDP streaming)",
2663           -1, G_MAXINT, BASETSMUX_DEFAULT_ALIGNMENT,
2664           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2665
2666   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SI_INTERVAL,
2667       g_param_spec_uint ("si-interval", "SI interval",
2668           "Set the interval (in ticks of the 90kHz clock) for writing out the Service"
2669           "Information tables", 1, G_MAXUINT, TSMUX_DEFAULT_SI_INTERVAL,
2670           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2671
2672   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BITRATE,
2673       g_param_spec_uint64 ("bitrate", "Bitrate (in bits per second)",
2674           "Set the target bitrate, will insert null packets as padding "
2675           " to achieve multiplex-wide constant bitrate (0 means no padding)",
2676           0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE,
2677           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2678
2679   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PCR_INTERVAL,
2680       g_param_spec_uint ("pcr-interval", "PCR interval",
2681           "Set the interval (in ticks of the 90kHz clock) for writing PCR",
2682           1, G_MAXUINT, TSMUX_DEFAULT_PCR_INTERVAL,
2683           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2684
2685   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SCTE_35_PID,
2686       g_param_spec_uint ("scte-35-pid", "SCTE-35 PID",
2687           "PID to use for inserting SCTE-35 packets (0: unused)",
2688           0, G_MAXUINT, DEFAULT_SCTE_35_PID,
2689           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2690
2691   g_object_class_install_property (G_OBJECT_CLASS (klass),
2692       PROP_SCTE_35_NULL_INTERVAL, g_param_spec_uint ("scte-35-null-interval",
2693           "SCTE-35 NULL packet interval",
2694           "Set the interval (in ticks of the 90kHz clock) for writing SCTE-35 NULL (heartbeat) packets."
2695           " (only valid if scte-35-pid is different from 0)", 1, G_MAXUINT,
2696           TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL,
2697           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2698
2699   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
2700       &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
2701
2702   gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX_PAD, 0);
2703 }
2704
2705 static void
2706 gst_base_ts_mux_init (GstBaseTsMux * mux)
2707 {
2708   mux->out_adapter = gst_adapter_new ();
2709
2710   /* properties */
2711   mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
2712   mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
2713   mux->si_interval = TSMUX_DEFAULT_SI_INTERVAL;
2714   mux->pcr_interval = TSMUX_DEFAULT_PCR_INTERVAL;
2715   mux->prog_map = NULL;
2716   mux->alignment = BASETSMUX_DEFAULT_ALIGNMENT;
2717   mux->bitrate = TSMUX_DEFAULT_BITRATE;
2718   mux->scte35_pid = DEFAULT_SCTE_35_PID;
2719   mux->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
2720
2721   mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
2722   mux->automatic_alignment = 0;
2723
2724   g_mutex_init (&mux->lock);
2725 }