78e7cbd5688bbce1cf575966489779138fc02550
[platform/upstream/gstreamer.git] / gst / mpegtsmux / mpegtsmux.c
1 /* 
2  * Copyright 2006, 2007, 2008, 2009, 2010 Fluendo S.A. 
3  *  Authors: Jan Schmidt <jan@fluendo.com>
4  *           Kapil Agrawal <kapil@fluendo.com>
5  *           Julien Moutte <julien@fluendo.com>
6  *
7  * Copyright (C) 2011 Jan Schmidt <thaytan@noraisin.net>
8  *
9  * This library is licensed under 4 different licenses and you
10  * can choose to use it under the terms of any one of them. The
11  * four licenses are the MPL 1.1, the LGPL, the GPL and the MIT
12  * license.
13  *
14  * MPL:
15  * 
16  * The contents of this file are subject to the Mozilla Public License
17  * Version 1.1 (the "License"); you may not use this file except in
18  * compliance with the License. You may obtain a copy of the License at
19  * http://www.mozilla.org/MPL/.
20  *
21  * Software distributed under the License is distributed on an "AS IS"
22  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
23  * License for the specific language governing rights and limitations
24  * under the License.
25  *
26  * LGPL:
27  *
28  * This library is free software; you can redistribute it and/or
29  * modify it under the terms of the GNU Library General Public
30  * License as published by the Free Software Foundation; either
31  * version 2 of the License, or (at your option) any later version.
32  *
33  * This library is distributed in the hope that it will be useful,
34  * but WITHOUT ANY WARRANTY; without even the implied warranty of
35  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
36  * Library General Public License for more details.
37  *
38  * You should have received a copy of the GNU Library General Public
39  * License along with this library; if not, write to the
40  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
41  * Boston, MA 02110-1301, USA.
42  *
43  * GPL:
44  *
45  * This program is free software; you can redistribute it and/or modify
46  * it under the terms of the GNU General Public License as published by
47  * the Free Software Foundation; either version 2 of the License, or
48  * (at your option) any later version.
49  *
50  * This program is distributed in the hope that it will be useful,
51  * but WITHOUT ANY WARRANTY; without even the implied warranty of
52  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
53  * GNU General Public License for more details.
54  *
55  * You should have received a copy of the GNU General Public License
56  * along with this program; if not, write to the Free Software
57  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
58  *
59  * MIT:
60  *
61  * Unless otherwise indicated, Source Code is licensed under MIT license.
62  * See further explanation attached in License Statement (distributed in the file
63  * LICENSE).
64  * 
65  * Permission is hereby granted, free of charge, to any person obtaining a copy of
66  * this software and associated documentation files (the "Software"), to deal in
67  * the Software without restriction, including without limitation the rights to
68  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
69  * of the Software, and to permit persons to whom the Software is furnished to do
70  * so, subject to the following conditions:
71  * 
72  * The above copyright notice and this permission notice shall be included in all
73  * copies or substantial portions of the Software.
74  * 
75  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
76  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
77  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
78  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
79  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
80  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
81  * SOFTWARE.
82  *
83  */
84
85 #ifdef HAVE_CONFIG_H
86 #include "config.h"
87 #endif
88 #include <stdio.h>
89 #include <string.h>
90
91 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
92  * with newer GLib versions (>= 2.31.0) */
93 #define GLIB_DISABLE_DEPRECATION_WARNINGS
94
95 #include <gst/video/video.h>
96
97 #include "mpegtsmux.h"
98
99 #include "mpegtsmux_aac.h"
100 #include "mpegtsmux_ttxt.h"
101
102 GST_DEBUG_CATEGORY (mpegtsmux_debug);
103 #define GST_CAT_DEFAULT mpegtsmux_debug
104
105 enum
106 {
107   ARG_0,
108   ARG_PROG_MAP,
109   ARG_M2TS_MODE,
110   ARG_PAT_INTERVAL,
111   ARG_PMT_INTERVAL,
112   ARG_ALIGNMENT
113 };
114
115 #define MPEGTSMUX_DEFAULT_ALIGNMENT    -1
116 #define MPEGTSMUX_DEFAULT_M2TS         FALSE
117
118 static GstStaticPadTemplate mpegtsmux_sink_factory =
119     GST_STATIC_PAD_TEMPLATE ("sink_%d",
120     GST_PAD_SINK,
121     GST_PAD_REQUEST,
122     GST_STATIC_CAPS ("video/mpeg, "
123         "mpegversion = (int) { 1, 2, 4 }, "
124         "systemstream = (boolean) false; "
125         "video/x-dirac;"
126         "video/x-h264,stream-format=(string)byte-stream;"
127         "audio/mpeg, "
128         "mpegversion = (int) { 1, 2 };"
129         "audio/mpeg, "
130         "mpegversion = (int) 4, stream-format = (string) { raw, adts };"
131         "audio/x-lpcm, "
132         "width = (int) { 16, 20, 24 }, "
133         "rate = (int) { 48000, 96000 }, "
134         "channels = (int) [ 1, 8 ], "
135         "dynamic_range = (int) [ 0, 255 ], "
136         "emphasis = (boolean) { FALSE, TRUE }, "
137         "mute = (boolean) { FALSE, TRUE }; " "audio/x-ac3;" "audio/x-dts;"
138         "subpicture/x-dvb;" "private/teletext"));
139
140 static GstStaticPadTemplate mpegtsmux_src_factory =
141 GST_STATIC_PAD_TEMPLATE ("src",
142     GST_PAD_SRC,
143     GST_PAD_ALWAYS,
144     GST_STATIC_CAPS ("video/mpegts, "
145         "systemstream = (boolean) true, " "packetsize = (int) { 188, 192} ")
146     );
147
148 static void gst_mpegtsmux_set_property (GObject * object, guint prop_id,
149     const GValue * value, GParamSpec * pspec);
150 static void gst_mpegtsmux_get_property (GObject * object, guint prop_id,
151     GValue * value, GParamSpec * pspec);
152
153 static void mpegtsmux_reset (MpegTsMux * mux, gboolean alloc);
154 static void mpegtsmux_dispose (GObject * object);
155 static void alloc_packet_cb (GstBuffer ** _buf, void *user_data);
156 static gboolean new_packet_cb (GstBuffer * buf, void *user_data,
157     gint64 new_pcr);
158 static void release_buffer_cb (guint8 * data, void *user_data);
159 static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux,
160     GstBuffer * buf);
161 static GstFlowReturn mpegtsmux_push_packets (MpegTsMux * mux, gboolean force);
162 static gboolean new_packet_m2ts (MpegTsMux * mux, GstBuffer * buf,
163     gint64 new_pcr);
164
165 static void mpegtsdemux_prepare_srcpad (MpegTsMux * mux);
166 GstFlowReturn mpegtsmux_clip_inc_running_time (GstCollectPads * pads,
167     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
168     gpointer user_data);
169 static GstFlowReturn mpegtsmux_collected_buffer (GstCollectPads * pads,
170     GstCollectData * data, GstBuffer * buf, MpegTsMux * mux);
171
172 static gboolean mpegtsmux_sink_event (GstCollectPads * pads,
173     GstCollectData * data, GstEvent * event, gpointer user_data);
174 static GstPad *mpegtsmux_request_new_pad (GstElement * element,
175     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
176 static void mpegtsmux_release_pad (GstElement * element, GstPad * pad);
177 static GstStateChangeReturn mpegtsmux_change_state (GstElement * element,
178     GstStateChange transition);
179 static void mpegtsdemux_set_header_on_caps (MpegTsMux * mux);
180 static gboolean mpegtsmux_src_event (GstPad * pad, GstObject * parent,
181     GstEvent * event);
182
183 #if 0
184 static void mpegtsmux_set_index (GstElement * element, GstIndex * index);
185 static GstIndex *mpegtsmux_get_index (GstElement * element);
186
187 static GstFormat pts_format;
188 static GstFormat spn_format;
189 #endif
190
191 typedef struct
192 {
193   GstMapInfo map_info;
194   GstBuffer *buffer;
195 } StreamData;
196
197 G_DEFINE_TYPE (MpegTsMux, mpegtsmux, GST_TYPE_ELEMENT)
198
199 /* Takes over the ref on the buffer */
200      static StreamData *stream_data_new (GstBuffer * buffer)
201 {
202   StreamData *res = g_new (StreamData, 1);
203   res->buffer = buffer;
204   gst_buffer_map (buffer, &(res->map_info), GST_MAP_READ);
205
206   return res;
207 }
208
209 static void
210 stream_data_free (StreamData * data)
211 {
212   if (data) {
213     gst_buffer_unmap (data->buffer, &data->map_info);
214     gst_buffer_unref (data->buffer);
215     g_free (data);
216   }
217 }
218
219 #define parent_class mpegtsmux_parent_class
220
221 static void
222 mpegtsmux_class_init (MpegTsMuxClass * klass)
223 {
224   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
225   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
226
227   gst_element_class_add_pad_template (gstelement_class,
228       gst_static_pad_template_get (&mpegtsmux_sink_factory));
229   gst_element_class_add_pad_template (gstelement_class,
230       gst_static_pad_template_get (&mpegtsmux_src_factory));
231
232   gst_element_class_set_static_metadata (gstelement_class,
233       "MPEG Transport Stream Muxer", "Codec/Muxer",
234       "Multiplexes media streams into an MPEG Transport Stream",
235       "Fluendo <contact@fluendo.com>");
236
237   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_mpegtsmux_set_property);
238   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_mpegtsmux_get_property);
239   gobject_class->dispose = mpegtsmux_dispose;
240
241   gstelement_class->request_new_pad = mpegtsmux_request_new_pad;
242   gstelement_class->release_pad = mpegtsmux_release_pad;
243   gstelement_class->change_state = mpegtsmux_change_state;
244
245 #if 0
246   gstelement_class->set_index = GST_DEBUG_FUNCPTR (mpegtsmux_set_index);
247   gstelement_class->get_index = GST_DEBUG_FUNCPTR (mpegtsmux_get_index);
248 #endif
249
250   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PROG_MAP,
251       g_param_spec_boxed ("prog-map", "Program map",
252           "A GstStructure specifies the mapping from elementary streams to programs",
253           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254
255   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_M2TS_MODE,
256       g_param_spec_boolean ("m2ts-mode", "M2TS(192 bytes) Mode",
257           "Set to TRUE to output Blu-Ray disc format with 192 byte packets. "
258           "FALSE for standard TS format with 188 byte packets.",
259           MPEGTSMUX_DEFAULT_M2TS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
260
261   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PAT_INTERVAL,
262       g_param_spec_uint ("pat-interval", "PAT interval",
263           "Set the interval (in ticks of the 90kHz clock) for writing out the PAT table",
264           1, G_MAXUINT, TSMUX_DEFAULT_PAT_INTERVAL,
265           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266
267   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PMT_INTERVAL,
268       g_param_spec_uint ("pmt-interval", "PMT interval",
269           "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
270           1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
271           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
272
273   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ALIGNMENT,
274       g_param_spec_int ("alignment", "packet alignment",
275           "Number of packets per buffer (padded with dummy packets on EOS) "
276           "(-1 = auto, 0 = all available packets)",
277           -1, G_MAXINT, MPEGTSMUX_DEFAULT_ALIGNMENT,
278           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
279 }
280
281 static void
282 mpegtsmux_init (MpegTsMux * mux)
283 {
284   mux->srcpad =
285       gst_pad_new_from_static_template (&mpegtsmux_src_factory, "src");
286   gst_pad_use_fixed_caps (mux->srcpad);
287   gst_pad_set_event_function (mux->srcpad,
288       GST_DEBUG_FUNCPTR (mpegtsmux_src_event));
289   gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad);
290
291   mux->collect = gst_collect_pads_new ();
292   gst_collect_pads_set_buffer_function (mux->collect,
293       (GstCollectPadsBufferFunction)
294       GST_DEBUG_FUNCPTR (mpegtsmux_collected_buffer), mux);
295
296   gst_collect_pads_set_event_function (mux->collect,
297       (GstCollectPadsEventFunction) GST_DEBUG_FUNCPTR (mpegtsmux_sink_event),
298       mux);
299   gst_collect_pads_set_clip_function (mux->collect, (GstCollectPadsClipFunction)
300       GST_DEBUG_FUNCPTR (mpegtsmux_clip_inc_running_time), mux);
301
302   mux->tsmux = tsmux_new ();
303   tsmux_set_write_func (mux->tsmux, new_packet_cb, mux);
304
305   mux->adapter = gst_adapter_new ();
306   mux->out_adapter = gst_adapter_new ();
307
308   /* properties */
309   mux->m2ts_mode = MPEGTSMUX_DEFAULT_M2TS;
310   mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
311   mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
312   mux->prog_map = NULL;
313   mux->alignment = MPEGTSMUX_DEFAULT_ALIGNMENT;
314
315   /* initial state */
316   mpegtsmux_reset (mux, TRUE);
317 }
318
319 static void
320 mpegtsmux_pad_reset (MpegTsPadData * pad_data)
321 {
322   pad_data->pid = 0;
323   pad_data->last_pts = GST_CLOCK_TIME_NONE;
324   pad_data->last_dts = GST_CLOCK_TIME_NONE;
325   pad_data->prog_id = -1;
326 #if 0
327   pad_data->prog_id = -1;
328   pad_data->element_index_writer_id = -1;
329 #endif
330
331   if (pad_data->free_func)
332     pad_data->free_func (pad_data->prepare_data);
333   pad_data->prepare_data = NULL;
334   pad_data->prepare_func = NULL;
335   pad_data->free_func = NULL;
336
337   if (pad_data->codec_data)
338     gst_buffer_replace (&pad_data->codec_data, NULL);
339
340   /* reference owned elsewhere */
341   pad_data->stream = NULL;
342   pad_data->prog = NULL;
343 }
344
345 static void
346 mpegtsmux_reset (MpegTsMux * mux, gboolean alloc)
347 {
348   GSList *walk;
349
350   mux->first = TRUE;
351   mux->last_flow_ret = GST_FLOW_OK;
352   mux->previous_pcr = -1;
353   mux->pcr_rate_num = mux->pcr_rate_den = 1;
354   mux->last_ts = 0;
355   mux->is_delta = TRUE;
356
357   mux->streamheader_sent = FALSE;
358   mux->force_key_unit_event = NULL;
359   mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
360 #if 0
361   mux->spn_count = 0;
362
363   if (mux->element_index) {
364     gst_object_unref (mux->element_index);
365     mux->element_index = NULL;
366   }
367 #endif
368   gst_adapter_clear (mux->adapter);
369   gst_adapter_clear (mux->out_adapter);
370
371   if (mux->tsmux) {
372     tsmux_free (mux->tsmux);
373     mux->tsmux = NULL;
374   }
375
376   memset (mux->programs, 0, sizeof (mux->programs));
377
378   if (mux->streamheader) {
379     GstBuffer *buf;
380     GList *sh;
381
382     sh = mux->streamheader;
383     while (sh) {
384       buf = sh->data;
385       gst_buffer_unref (buf);
386       sh = g_list_next (sh);
387     }
388     g_list_free (mux->streamheader);
389     mux->streamheader = NULL;
390   }
391   gst_event_replace (&mux->force_key_unit_event, NULL);
392   gst_buffer_replace (&mux->out_buffer, NULL);
393
394   GST_COLLECT_PADS_STREAM_LOCK (mux->collect);
395   for (walk = mux->collect->data; walk != NULL; walk = g_slist_next (walk))
396     mpegtsmux_pad_reset ((MpegTsPadData *) walk->data);
397   GST_COLLECT_PADS_STREAM_UNLOCK (mux->collect);
398
399   if (alloc) {
400     mux->tsmux = tsmux_new ();
401     tsmux_set_write_func (mux->tsmux, new_packet_cb, mux);
402     tsmux_set_alloc_func (mux->tsmux, alloc_packet_cb, mux);
403   }
404 }
405
406 static void
407 mpegtsmux_dispose (GObject * object)
408 {
409   MpegTsMux *mux = GST_MPEG_TSMUX (object);
410
411   mpegtsmux_reset (mux, FALSE);
412
413   if (mux->adapter) {
414     g_object_unref (mux->adapter);
415     mux->adapter = NULL;
416   }
417   if (mux->out_adapter) {
418     g_object_unref (mux->out_adapter);
419     mux->out_adapter = NULL;
420   }
421   if (mux->collect) {
422     gst_object_unref (mux->collect);
423     mux->collect = NULL;
424   }
425   if (mux->prog_map) {
426     gst_structure_free (mux->prog_map);
427     mux->prog_map = NULL;
428   }
429   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
430 }
431
432 static void
433 gst_mpegtsmux_set_property (GObject * object, guint prop_id,
434     const GValue * value, GParamSpec * pspec)
435 {
436   MpegTsMux *mux = GST_MPEG_TSMUX (object);
437   GSList *walk;
438
439   switch (prop_id) {
440     case ARG_M2TS_MODE:
441       /*set incase if the output stream need to be of 192 bytes */
442       mux->m2ts_mode = g_value_get_boolean (value);
443       break;
444     case ARG_PROG_MAP:
445     {
446       const GstStructure *s = gst_value_get_structure (value);
447       if (mux->prog_map) {
448         gst_structure_free (mux->prog_map);
449       }
450       if (s)
451         mux->prog_map = gst_structure_copy (s);
452       else
453         mux->prog_map = NULL;
454       break;
455     }
456     case ARG_PAT_INTERVAL:
457       mux->pat_interval = g_value_get_uint (value);
458       if (mux->tsmux)
459         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
460       break;
461     case ARG_PMT_INTERVAL:
462       walk = mux->collect->data;
463       mux->pmt_interval = g_value_get_uint (value);
464
465       while (walk) {
466         MpegTsPadData *ts_data = (MpegTsPadData *) walk->data;
467
468         tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
469         walk = g_slist_next (walk);
470       }
471       break;
472     case ARG_ALIGNMENT:
473       mux->alignment = g_value_get_int (value);
474       break;
475     default:
476       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
477       break;
478   }
479 }
480
481 static void
482 gst_mpegtsmux_get_property (GObject * object, guint prop_id,
483     GValue * value, GParamSpec * pspec)
484 {
485   MpegTsMux *mux = GST_MPEG_TSMUX (object);
486
487   switch (prop_id) {
488     case ARG_M2TS_MODE:
489       g_value_set_boolean (value, mux->m2ts_mode);
490       break;
491     case ARG_PROG_MAP:
492       gst_value_set_structure (value, mux->prog_map);
493       break;
494     case ARG_PAT_INTERVAL:
495       g_value_set_uint (value, mux->pat_interval);
496       break;
497     case ARG_PMT_INTERVAL:
498       g_value_set_uint (value, mux->pmt_interval);
499       break;
500     case ARG_ALIGNMENT:
501       g_value_set_int (value, mux->alignment);
502       break;
503     default:
504       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
505       break;
506   }
507 }
508
509 #if 0
510 static void
511 mpegtsmux_set_index (GstElement * element, GstIndex * index)
512 {
513   MpegTsMux *mux = GST_MPEG_TSMUX (element);
514
515   GST_OBJECT_LOCK (mux);
516   if (mux->element_index)
517     gst_object_unref (mux->element_index);
518   mux->element_index = index ? gst_object_ref (index) : NULL;
519   GST_OBJECT_UNLOCK (mux);
520
521   GST_DEBUG_OBJECT (mux, "Set index %" GST_PTR_FORMAT, mux->element_index);
522 }
523
524 static GstIndex *
525 mpegtsmux_get_index (GstElement * element)
526 {
527   GstIndex *result = NULL;
528   MpegTsMux *mux = GST_MPEG_TSMUX (element);
529
530   GST_OBJECT_LOCK (mux);
531   if (mux->element_index)
532     result = gst_object_ref (mux->element_index);
533   GST_OBJECT_UNLOCK (mux);
534
535   GST_DEBUG_OBJECT (mux, "Returning index %" GST_PTR_FORMAT, result);
536
537   return result;
538 }
539 #endif
540
541 static void
542 release_buffer_cb (guint8 * data, void *user_data)
543 {
544   stream_data_free (user_data);
545 }
546
547 static GstFlowReturn
548 mpegtsmux_create_stream (MpegTsMux * mux, MpegTsPadData * ts_data)
549 {
550   GstFlowReturn ret = GST_FLOW_ERROR;
551   GstCaps *caps;
552   GstStructure *s;
553   GstPad *pad;
554   TsMuxStreamType st = TSMUX_ST_RESERVED;
555   const gchar *mt;
556   const GValue *value = NULL;
557   GstBuffer *codec_data = NULL;
558
559   pad = ts_data->collect.pad;
560   caps = gst_pad_get_current_caps (pad);
561   if (caps == NULL)
562     goto not_negotiated;
563
564   GST_DEBUG_OBJECT (pad, "Creating stream with PID 0x%04x for caps %"
565       GST_PTR_FORMAT, ts_data->pid, caps);
566
567   s = gst_caps_get_structure (caps, 0);
568   g_return_val_if_fail (s != NULL, FALSE);
569
570   mt = gst_structure_get_name (s);
571   value = gst_structure_get_value (s, "codec_data");
572   if (value != NULL)
573     codec_data = gst_value_get_buffer (value);
574
575   if (strcmp (mt, "video/x-dirac") == 0) {
576     st = TSMUX_ST_VIDEO_DIRAC;
577   } else if (strcmp (mt, "audio/x-ac3") == 0) {
578     st = TSMUX_ST_PS_AUDIO_AC3;
579   } else if (strcmp (mt, "audio/x-dts") == 0) {
580     st = TSMUX_ST_PS_AUDIO_DTS;
581   } else if (strcmp (mt, "audio/x-lpcm") == 0) {
582     st = TSMUX_ST_PS_AUDIO_LPCM;
583   } else if (strcmp (mt, "video/x-h264") == 0) {
584     st = TSMUX_ST_VIDEO_H264;
585   } else if (strcmp (mt, "audio/mpeg") == 0) {
586     gint mpegversion;
587
588     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
589       GST_ERROR_OBJECT (pad, "caps missing mpegversion");
590       goto not_negotiated;
591     }
592
593     switch (mpegversion) {
594       case 1:
595         st = TSMUX_ST_AUDIO_MPEG1;
596         break;
597       case 2:
598         st = TSMUX_ST_AUDIO_MPEG2;
599         break;
600       case 4:
601       {
602         st = TSMUX_ST_AUDIO_AAC;
603         if (codec_data) {       /* TODO - Check stream format - codec data should only come with RAW stream */
604           GST_DEBUG_OBJECT (pad,
605               "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
606               gst_buffer_get_size (codec_data));
607           ts_data->codec_data = gst_buffer_ref (codec_data);
608           ts_data->prepare_func = mpegtsmux_prepare_aac;
609         } else {
610           ts_data->codec_data = NULL;
611         }
612         break;
613       }
614       default:
615         GST_WARNING_OBJECT (pad, "unsupported mpegversion %d", mpegversion);
616         goto not_negotiated;
617     }
618   } else if (strcmp (mt, "video/mpeg") == 0) {
619     gint mpegversion;
620
621     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
622       GST_ERROR_OBJECT (pad, "caps missing mpegversion");
623       goto not_negotiated;
624     }
625
626     switch (mpegversion) {
627       case 1:
628         st = TSMUX_ST_VIDEO_MPEG1;
629         break;
630       case 2:
631         st = TSMUX_ST_VIDEO_MPEG2;
632         break;
633       case 4:
634         st = TSMUX_ST_VIDEO_MPEG4;
635         break;
636       default:
637         GST_WARNING_OBJECT (pad, "unsupported mpegversion %d", mpegversion);
638         goto not_negotiated;
639     }
640   } else if (strcmp (mt, "subpicture/x-dvb") == 0) {
641     st = TSMUX_ST_PS_DVB_SUBPICTURE;
642   } else if (strcmp (mt, "private/teletext") == 0) {
643     st = TSMUX_ST_PS_TELETEXT;
644     /* needs a particularly sized layout */
645     ts_data->prepare_func = mpegtsmux_prepare_teletext;
646   }
647
648   if (st != TSMUX_ST_RESERVED) {
649     ts_data->stream = tsmux_create_stream (mux->tsmux, st, ts_data->pid);
650   } else {
651     GST_DEBUG_OBJECT (pad, "Failed to determine stream type");
652   }
653
654   if (ts_data->stream != NULL) {
655     gst_structure_get_int (s, "rate", &ts_data->stream->audio_sampling);
656     gst_structure_get_int (s, "channels", &ts_data->stream->audio_channels);
657     gst_structure_get_int (s, "bitrate", &ts_data->stream->audio_bitrate);
658
659     tsmux_stream_set_buffer_release_func (ts_data->stream, release_buffer_cb);
660     tsmux_program_add_stream (ts_data->prog, ts_data->stream);
661
662     ret = GST_FLOW_OK;
663   }
664 #if 0
665   GST_OBJECT_LOCK (mux);
666   if (mux->element_index) {
667     gboolean parsed = FALSE;
668
669     if (ts_data->stream->is_video_stream) {
670       if (gst_structure_get_boolean (s, "parsed", &parsed) && parsed) {
671         if (ts_data->element_index_writer_id == -1) {
672           gst_index_get_writer_id (mux->element_index, GST_OBJECT (mux),
673               &ts_data->element_index_writer_id);
674           GST_DEBUG_OBJECT (mux, "created GstIndex writer_id = %d for stream",
675               ts_data->element_index_writer_id);
676           gst_index_add_format (mux->element_index,
677               ts_data->element_index_writer_id, pts_format);
678           gst_index_add_format (mux->element_index,
679               ts_data->element_index_writer_id, spn_format);
680         }
681       } else {
682         GST_WARNING_OBJECT (pad, "no indexing for (unparsed) stream !");
683       }
684     }
685   }
686   GST_OBJECT_UNLOCK (mux);
687 #endif
688
689   gst_caps_unref (caps);
690   return ret;
691
692   /* ERRORS */
693 not_negotiated:
694   {
695     GST_DEBUG_OBJECT (pad, "Sink pad caps were not set before pushing");
696     if (caps)
697       gst_caps_unref (caps);
698     return GST_FLOW_NOT_NEGOTIATED;
699   }
700 }
701
702 static GstFlowReturn
703 mpegtsmux_create_streams (MpegTsMux * mux)
704 {
705   GstFlowReturn ret = GST_FLOW_OK;
706   GSList *walk = mux->collect->data;
707
708   /* Create the streams */
709   while (walk) {
710     GstCollectData *c_data = (GstCollectData *) walk->data;
711     MpegTsPadData *ts_data = (MpegTsPadData *) walk->data;
712     gchar *name = NULL;
713
714     walk = g_slist_next (walk);
715
716     if (ts_data->prog_id == -1) {
717       name = GST_PAD_NAME (c_data->pad);
718       if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map,
719               name)) {
720         gint idx;
721         gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
722         if (!ret) {
723           GST_ELEMENT_ERROR (mux, STREAM, MUX,
724               ("Reading program map failed. Assuming default"), (NULL));
725           idx = DEFAULT_PROG_ID;
726         }
727         if (idx < 0 || idx >= MAX_PROG_NUMBER) {
728           GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s out "
729               "of range (max = %d); DEFAULT_PROGRAM = %d is used instead",
730               idx, name, MAX_PROG_NUMBER, DEFAULT_PROG_ID);
731           idx = DEFAULT_PROG_ID;
732         }
733         ts_data->prog_id = idx;
734       } else {
735         ts_data->prog_id = DEFAULT_PROG_ID;
736       }
737     }
738
739     ts_data->prog = mux->programs[ts_data->prog_id];
740     if (ts_data->prog == NULL) {
741       ts_data->prog = tsmux_program_new (mux->tsmux);
742       if (ts_data->prog == NULL)
743         goto no_program;
744       tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
745       mux->programs[ts_data->prog_id] = ts_data->prog;
746     }
747
748     if (ts_data->stream == NULL) {
749       ret = mpegtsmux_create_stream (mux, ts_data);
750       if (ret != GST_FLOW_OK)
751         goto no_stream;
752     }
753   }
754
755   return GST_FLOW_OK;
756
757   /* ERRORS */
758 no_program:
759   {
760     GST_ELEMENT_ERROR (mux, STREAM, MUX,
761         ("Could not create new program"), (NULL));
762     return GST_FLOW_ERROR;
763   }
764 no_stream:
765   {
766     GST_ELEMENT_ERROR (mux, STREAM, MUX,
767         ("Could not create handler for stream"), (NULL));
768     return ret;
769   }
770 }
771
772
773 #define COLLECT_DATA_PAD(collect_data) (((GstCollectData *)(collect_data))->pad)
774
775 static gboolean
776 mpegtsmux_sink_event (GstCollectPads * pads, GstCollectData * data,
777     GstEvent * event, gpointer user_data)
778 {
779   MpegTsMux *mux = GST_MPEG_TSMUX (user_data);
780   gboolean res = FALSE;
781   gboolean forward = TRUE;
782   GstPad *pad;
783
784   pad = data->pad;
785
786   switch (GST_EVENT_TYPE (event)) {
787     case GST_EVENT_CUSTOM_DOWNSTREAM:
788     {
789       GstClockTime timestamp, stream_time, running_time;
790       gboolean all_headers;
791       guint count;
792
793       if (!gst_video_event_is_force_key_unit (event))
794         goto out;
795
796       res = TRUE;
797       forward = FALSE;
798
799       gst_video_event_parse_downstream_force_key_unit (event,
800           &timestamp, &stream_time, &running_time, &all_headers, &count);
801       GST_INFO_OBJECT (mux, "have downstream force-key-unit event on pad %s, "
802           "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
803           gst_pad_get_name (pad), gst_event_get_seqnum (event),
804           GST_TIME_ARGS (running_time), count);
805
806       if (mux->force_key_unit_event != NULL) {
807         GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
808             "as an upstream force key unit is already queued");
809         goto out;
810       }
811
812       if (!all_headers)
813         goto out;
814
815       mux->pending_key_unit_ts = running_time;
816       gst_event_replace (&mux->force_key_unit_event, event);
817       break;
818     }
819     default:
820       break;
821   }
822
823 out:
824   if (!forward)
825     gst_event_unref (event);
826   else
827     res = gst_collect_pads_event_default (pads, data, event, FALSE);
828
829   return res;
830 }
831
832 static gboolean
833 mpegtsmux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
834 {
835   MpegTsMux *mux = GST_MPEG_TSMUX (gst_pad_get_parent (pad));
836   gboolean res = TRUE, forward = TRUE;
837
838   switch (GST_EVENT_TYPE (event)) {
839     case GST_EVENT_CUSTOM_UPSTREAM:
840     {
841       GstIterator *iter;
842       GstIteratorResult iter_ret;
843       GstPad *sinkpad;
844       GValue sinkpad_value = G_VALUE_INIT;
845       GstClockTime running_time;
846       gboolean all_headers, done;
847       guint count;
848
849       if (!gst_video_event_is_force_key_unit (event))
850         break;
851
852       forward = FALSE;
853
854       gst_video_event_parse_upstream_force_key_unit (event,
855           &running_time, &all_headers, &count);
856
857       GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
858           "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
859           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
860           all_headers, count);
861
862       if (!all_headers)
863         break;
864
865       mux->pending_key_unit_ts = running_time;
866       gst_event_replace (&mux->force_key_unit_event, event);
867
868       iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
869       done = FALSE;
870       while (!done) {
871         gboolean res = FALSE, tmp;
872         iter_ret = gst_iterator_next (iter, &sinkpad_value);
873         sinkpad = g_value_get_object (&sinkpad_value);
874
875         switch (iter_ret) {
876           case GST_ITERATOR_DONE:
877             done = TRUE;
878             break;
879           case GST_ITERATOR_OK:
880             GST_INFO_OBJECT (mux, "forwarding to %s",
881                 gst_pad_get_name (sinkpad));
882             tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
883             GST_INFO_OBJECT (mux, "result %d", tmp);
884             /* succeed if at least one pad succeeds */
885             res |= tmp;
886             break;
887           case GST_ITERATOR_ERROR:
888             done = TRUE;
889             break;
890           case GST_ITERATOR_RESYNC:
891             break;
892         }
893         g_value_reset (&sinkpad_value);
894       }
895       g_value_unset (&sinkpad_value);
896       gst_iterator_free (iter);
897       break;
898     }
899     default:
900       break;
901   }
902
903   if (forward)
904     res = gst_pad_event_default (pad, parent, event);
905   else
906     gst_event_unref (event);
907
908   gst_object_unref (mux);
909   return res;
910 }
911
912 static GstEvent *
913 check_pending_key_unit_event (GstEvent * pending_event, GstSegment * segment,
914     GstClockTime timestamp, guint flags, GstClockTime pending_key_unit_ts)
915 {
916   GstClockTime running_time, stream_time;
917   gboolean all_headers;
918   guint count;
919   GstEvent *event = NULL;
920
921   g_return_val_if_fail (pending_event != NULL, NULL);
922   g_return_val_if_fail (segment != NULL, NULL);
923
924   if (pending_event == NULL)
925     goto out;
926
927   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
928       timestamp == GST_CLOCK_TIME_NONE)
929     goto out;
930
931   running_time = gst_segment_to_running_time (segment,
932       GST_FORMAT_TIME, timestamp);
933
934   GST_INFO ("now %" GST_TIME_FORMAT " wanted %" GST_TIME_FORMAT,
935       GST_TIME_ARGS (running_time), GST_TIME_ARGS (pending_key_unit_ts));
936   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
937       running_time < pending_key_unit_ts)
938     goto out;
939
940   if (flags & GST_BUFFER_FLAG_DELTA_UNIT) {
941     GST_INFO ("pending force key unit, waiting for keyframe");
942     goto out;
943   }
944
945   stream_time = gst_segment_to_stream_time (segment,
946       GST_FORMAT_TIME, timestamp);
947
948   gst_video_event_parse_upstream_force_key_unit (pending_event,
949       NULL, &all_headers, &count);
950
951   event =
952       gst_video_event_new_downstream_force_key_unit (timestamp, stream_time,
953       running_time, all_headers, count);
954   gst_event_set_seqnum (event, gst_event_get_seqnum (pending_event));
955
956 out:
957   return event;
958 }
959
960 GstFlowReturn
961 mpegtsmux_clip_inc_running_time (GstCollectPads * pads,
962     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
963     gpointer user_data)
964 {
965   MpegTsPadData *pad_data = (MpegTsPadData *) cdata;
966   GstClockTime time;
967
968   *outbuf = buf;
969
970   /* PTS */
971   time = GST_BUFFER_TIMESTAMP (buf);
972
973   /* invalid left alone and passed */
974   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
975     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
976     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
977       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
978       gst_buffer_unref (buf);
979       *outbuf = NULL;
980     } else {
981       GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
982           GST_TIME_FORMAT " running time",
983           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
984       pad_data->last_pts = time;
985       buf = *outbuf = gst_buffer_make_writable (buf);
986       GST_BUFFER_TIMESTAMP (*outbuf) = time;
987     }
988   }
989
990   /* DTS */
991   time = GST_BUFFER_DTS (buf);
992
993   /* invalid left alone and passed */
994   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
995     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
996     /* may have to decode out-of-segment, so pass INVALID */
997     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
998       GST_DEBUG_OBJECT (cdata->pad, "running dts outside segment");
999     } else {
1000       GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
1001           GST_TIME_FORMAT " running time",
1002           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
1003       if (GST_CLOCK_TIME_IS_VALID (pad_data->last_dts) &&
1004           time < pad_data->last_dts) {
1005         /* FIXME DTS/PTS mess again;
1006          * probably needs a whole lot more subtle handling (cf qtmux) */
1007         GST_WARNING_OBJECT (cdata->pad, "ignoring DTS going backward");
1008         time = pad_data->last_dts;
1009       } else {
1010         pad_data->last_dts = time;
1011       }
1012       buf = *outbuf = gst_buffer_make_writable (buf);
1013       GST_BUFFER_DTS (*outbuf) = time;
1014     }
1015   }
1016
1017   buf = *outbuf;
1018   if (pad_data->prepare_func) {
1019     MpegTsMux *mux = (MpegTsMux *) user_data;
1020
1021     buf = pad_data->prepare_func (buf, pad_data, mux);
1022     if (buf)
1023       gst_buffer_replace (outbuf, buf);
1024   }
1025
1026   return GST_FLOW_OK;
1027 }
1028
1029 static GstFlowReturn
1030 mpegtsmux_collected_buffer (GstCollectPads * pads, GstCollectData * data,
1031     GstBuffer * buf, MpegTsMux * mux)
1032 {
1033   GstFlowReturn ret = GST_FLOW_OK;
1034   MpegTsPadData *best = (MpegTsPadData *) data;
1035   TsMuxProgram *prog;
1036   gint64 pts = -1;
1037   guint64 dts = -1;
1038   gboolean delta = TRUE;
1039   StreamData *stream_data;
1040
1041   GST_DEBUG_OBJECT (mux, "Pads collected");
1042
1043   if (G_UNLIKELY (mux->first)) {
1044     ret = mpegtsmux_create_streams (mux);
1045     if (G_UNLIKELY (ret != GST_FLOW_OK))
1046       return ret;
1047
1048     mpegtsdemux_prepare_srcpad (mux);
1049
1050     mux->first = FALSE;
1051   }
1052
1053   if (G_UNLIKELY (best == NULL)) {
1054     /* EOS */
1055     /* drain some possibly cached data */
1056     new_packet_m2ts (mux, NULL, -1);
1057     mpegtsmux_push_packets (mux, TRUE);
1058     gst_pad_push_event (mux->srcpad, gst_event_new_eos ());
1059
1060     return GST_FLOW_OK;
1061   }
1062
1063   prog = best->prog;
1064   if (prog == NULL)
1065     goto no_program;
1066
1067   g_assert (buf != NULL);
1068
1069   if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
1070     GstEvent *event;
1071
1072     event = check_pending_key_unit_event (mux->force_key_unit_event,
1073         &best->collect.segment, GST_BUFFER_TIMESTAMP (buf),
1074         GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
1075     if (event) {
1076       GstClockTime running_time;
1077       guint count;
1078       GList *cur;
1079
1080       mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
1081       gst_event_replace (&mux->force_key_unit_event, NULL);
1082
1083       gst_video_event_parse_downstream_force_key_unit (event,
1084           NULL, NULL, &running_time, NULL, &count);
1085
1086       GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
1087           "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
1088           GST_TIME_ARGS (running_time), count);
1089       gst_pad_push_event (mux->srcpad, event);
1090
1091       /* output PAT */
1092       mux->tsmux->last_pat_ts = -1;
1093
1094       /* output PMT for each program */
1095       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1096         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1097
1098         program->last_pmt_ts = -1;
1099       }
1100       tsmux_program_set_pcr_stream (prog, NULL);
1101     }
1102   }
1103
1104   if (G_UNLIKELY (prog->pcr_stream == NULL)) {
1105     /* Take the first data stream for the PCR */
1106     GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
1107         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
1108         MPEG_TS_PAD_DATA (best)->pid, MPEG_TS_PAD_DATA (best)->prog_id);
1109
1110     /* Set the chosen PCR stream */
1111     tsmux_program_set_pcr_stream (prog, best->stream);
1112   }
1113
1114   GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
1115       "Chose stream for output (PID: 0x%04x)", best->pid);
1116
1117   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf)) &&
1118       GST_CLOCK_TIME_IS_VALID (best->last_pts)) {
1119     pts = GSTTIME_TO_MPEGTIME (best->last_pts);
1120     GST_DEBUG_OBJECT (mux, "Buffer has PTS %" GST_TIME_FORMAT " pts %"
1121         G_GINT64_FORMAT, GST_TIME_ARGS (best->last_pts), pts);
1122   }
1123
1124   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) &&
1125       GST_CLOCK_TIME_IS_VALID (best->last_dts)) {
1126     dts = GSTTIME_TO_MPEGTIME (best->last_dts);
1127     GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_TIME_FORMAT " dts %"
1128         G_GINT64_FORMAT, GST_TIME_ARGS (best->last_dts), dts);
1129   }
1130
1131   /* should not have a DTS without PTS */
1132   if (pts == -1 && dts != -1) {
1133     GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
1134     pts = dts;
1135   }
1136
1137   if (best->stream->is_video_stream) {
1138     delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1139 #if 0
1140     GST_OBJECT_LOCK (mux);
1141     if (mux->element_index && !delta && best->element_index_writer_id != -1) {
1142       gst_index_add_association (mux->element_index,
1143           best->element_index_writer_id,
1144           GST_ASSOCIATION_FLAG_KEY_UNIT, spn_format, mux->spn_count,
1145           pts_format, pts, NULL);
1146     }
1147     GST_OBJECT_UNLOCK (mux);
1148 #endif
1149   }
1150   GST_DEBUG_OBJECT (mux, "delta: %d", delta);
1151
1152   stream_data = stream_data_new (buf);
1153   tsmux_stream_add_data (best->stream, stream_data->map_info.data,
1154       stream_data->map_info.size, stream_data, pts, dts, !delta);
1155
1156   /* outgoing ts follows ts of PCR program stream */
1157   if (prog->pcr_stream == best->stream) {
1158     /* prefer DTS if present for PCR as it should be monotone */
1159     mux->last_ts =
1160         GST_CLOCK_TIME_IS_VALID (best->last_dts) ? best->last_dts : best->
1161         last_pts;
1162   }
1163
1164   mux->is_delta = delta;
1165   while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
1166     if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
1167       /* Failed writing data for some reason. Set appropriate error */
1168       GST_DEBUG_OBJECT (mux, "Failed to write data packet");
1169       GST_ELEMENT_ERROR (mux, STREAM, MUX,
1170           ("Failed writing output data to stream %04x", best->stream->id),
1171           (NULL));
1172       goto write_fail;
1173     }
1174   }
1175   /* flush packet cache */
1176   return mpegtsmux_push_packets (mux, FALSE);
1177
1178   /* ERRORS */
1179 write_fail:
1180   {
1181     return mux->last_flow_ret;
1182   }
1183 no_program:
1184   {
1185     GST_ELEMENT_ERROR (mux, STREAM, MUX,
1186         ("Stream on pad %" GST_PTR_FORMAT
1187             " is not associated with any program", COLLECT_DATA_PAD (best)),
1188         (NULL));
1189     return GST_FLOW_ERROR;
1190   }
1191 }
1192
1193 static GstPad *
1194 mpegtsmux_request_new_pad (GstElement * element, GstPadTemplate * templ,
1195     const gchar * name, const GstCaps * caps)
1196 {
1197   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1198   gint pid = -1;
1199   gchar *pad_name = NULL;
1200   GstPad *pad = NULL;
1201   MpegTsPadData *pad_data = NULL;
1202
1203   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
1204     if (tsmux_find_stream (mux->tsmux, pid))
1205       goto stream_exists;
1206   } else {
1207     pid = tsmux_get_new_pid (mux->tsmux);
1208   }
1209
1210   pad_name = g_strdup_printf ("sink_%d", pid);
1211   pad = gst_pad_new_from_template (templ, pad_name);
1212   g_free (pad_name);
1213
1214   pad_data = (MpegTsPadData *)
1215       gst_collect_pads_add_pad (mux->collect, pad, sizeof (MpegTsPadData),
1216       (GstCollectDataDestroyNotify) (mpegtsmux_pad_reset), TRUE);
1217   if (pad_data == NULL)
1218     goto pad_failure;
1219
1220   mpegtsmux_pad_reset (pad_data);
1221   pad_data->pid = pid;
1222
1223   if (G_UNLIKELY (!gst_element_add_pad (element, pad)))
1224     goto could_not_add;
1225
1226   return pad;
1227
1228   /* ERRORS */
1229 stream_exists:
1230   {
1231     GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"),
1232         (NULL));
1233     return NULL;
1234   }
1235 could_not_add:
1236   {
1237     GST_ELEMENT_ERROR (element, STREAM, FAILED,
1238         ("Internal data stream error."), ("Could not add pad to element"));
1239     gst_collect_pads_remove_pad (mux->collect, pad);
1240     gst_object_unref (pad);
1241     return NULL;
1242   }
1243 pad_failure:
1244   {
1245     GST_ELEMENT_ERROR (element, STREAM, FAILED,
1246         ("Internal data stream error."), ("Could not add pad to collectpads"));
1247     gst_object_unref (pad);
1248     return NULL;
1249   }
1250 }
1251
1252 static void
1253 mpegtsmux_release_pad (GstElement * element, GstPad * pad)
1254 {
1255   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1256
1257   GST_DEBUG_OBJECT (mux, "Pad %" GST_PTR_FORMAT " being released", pad);
1258
1259   if (mux->collect) {
1260     gst_collect_pads_remove_pad (mux->collect, pad);
1261   }
1262
1263   /* chain up */
1264   gst_element_remove_pad (element, pad);
1265 }
1266
1267 static void
1268 new_packet_common_init (MpegTsMux * mux, GstBuffer * buf, guint8 * data,
1269     guint len)
1270 {
1271   /* Packets should be at least 188 bytes, but check anyway */
1272   g_return_if_fail (len >= 2 || !data);
1273
1274   if (!mux->streamheader_sent && data) {
1275     guint pid = ((data[1] & 0x1f) << 8) | data[2];
1276     /* if it's a PAT or a PMT */
1277     if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
1278       GstBuffer *hbuf;
1279
1280       if (!buf) {
1281         hbuf = gst_buffer_new_and_alloc (len);
1282         gst_buffer_fill (hbuf, 0, data, len);
1283       } else {
1284         hbuf = gst_buffer_copy (buf);
1285       }
1286       mux->streamheader = g_list_append (mux->streamheader, hbuf);
1287     } else if (mux->streamheader) {
1288       mpegtsdemux_set_header_on_caps (mux);
1289       mux->streamheader_sent = TRUE;
1290     }
1291   }
1292
1293   if (buf) {
1294     if (mux->is_delta) {
1295       GST_LOG_OBJECT (mux, "marking as delta unit");
1296       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1297     } else {
1298       GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
1299       mux->is_delta = TRUE;
1300     }
1301   }
1302 }
1303
1304 static GstFlowReturn
1305 mpegtsmux_push_packets (MpegTsMux * mux, gboolean force)
1306 {
1307   gint align = mux->alignment;
1308   gint av, packet_size;
1309   GstBuffer *buf;
1310   GstFlowReturn ret = GST_FLOW_OK;
1311   GstClockTime ts;
1312
1313   if (mux->m2ts_mode) {
1314     packet_size = M2TS_PACKET_LENGTH;
1315     if (align < 0)
1316       align = 32;
1317   } else {
1318     packet_size = NORMAL_TS_PACKET_LENGTH;
1319     if (align < 0)
1320       align = 0;
1321   }
1322
1323   av = gst_adapter_available (mux->out_adapter);
1324   GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
1325
1326   if (!align)
1327     align = av;
1328   else
1329     align *= packet_size;
1330
1331   /* FIXME: what about DTS here? */
1332   GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
1333   if (G_LIKELY ((align <= av) && av)) {
1334     GST_LOG_OBJECT (mux, "pushing %d aligned bytes", av - (av % align));
1335     ts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1336     buf = gst_adapter_take_buffer (mux->out_adapter, av - (av % align));
1337     g_assert (buf);
1338     GST_BUFFER_PTS (buf) = ts;
1339
1340     ret = gst_pad_push (mux->srcpad, buf);
1341     av = av % align;
1342   }
1343
1344   if (av && force) {
1345     guint8 *data;
1346     guint32 header;
1347     gint dummy;
1348     GstMapInfo map;
1349
1350     GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
1351     buf = gst_buffer_new_and_alloc (align);
1352     gst_buffer_map (buf, &map, GST_MAP_READ);
1353     data = map.data;
1354     ts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1355
1356     gst_adapter_copy (mux->out_adapter, data, 0, av);
1357     gst_adapter_clear (mux->out_adapter);
1358     GST_BUFFER_PTS (buf) = ts;
1359
1360     data += av;
1361     header = GST_READ_UINT32_BE (data - packet_size);
1362
1363     dummy = (map.size - av) / packet_size;
1364     GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
1365
1366     for (; dummy > 0; dummy--) {
1367       gint offset;
1368
1369       if (packet_size > NORMAL_TS_PACKET_LENGTH) {
1370         GST_WRITE_UINT32_BE (data, header);
1371         /* simply increase header a bit and never mind too much */
1372         header++;
1373         offset = 4;
1374       } else {
1375         offset = 0;
1376       }
1377       GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
1378       /* null packet PID */
1379       GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
1380       /* no adaptation field exists | continuity counter undefined */
1381       GST_WRITE_UINT8 (data + offset + 3, 0x10);
1382       /* payload */
1383       memset (data + offset + 4, 0, NORMAL_TS_PACKET_LENGTH - 4);
1384       data += packet_size;
1385     }
1386
1387     gst_buffer_unmap (buf, &map);
1388
1389     ret = gst_pad_push (mux->srcpad, buf);
1390   }
1391
1392   return ret;
1393 }
1394
1395 static GstFlowReturn
1396 mpegtsmux_collect_packet (MpegTsMux * mux, GstBuffer * buf)
1397 {
1398   GST_LOG_OBJECT (mux, "collecting packet size %" G_GSIZE_FORMAT,
1399       gst_buffer_get_size (buf));
1400   gst_adapter_push (mux->out_adapter, buf);
1401
1402   return GST_FLOW_OK;
1403 }
1404
1405 static gboolean
1406 new_packet_m2ts (MpegTsMux * mux, GstBuffer * buf, gint64 new_pcr)
1407 {
1408   GstBuffer *out_buf;
1409   int chunk_bytes;
1410   GstMapInfo map;
1411
1412   GST_LOG_OBJECT (mux, "Have buffer %p with new_pcr=%" G_GINT64_FORMAT,
1413       buf, new_pcr);
1414
1415   chunk_bytes = gst_adapter_available (mux->adapter);
1416
1417   if (G_LIKELY (buf)) {
1418     if (new_pcr < 0) {
1419       /* If there is no pcr in current ts packet then just add the packet
1420          to the adapter for later output when we see a PCR */
1421       GST_LOG_OBJECT (mux, "Accumulating non-PCR packet");
1422       gst_adapter_push (mux->adapter, buf);
1423       goto exit;
1424     }
1425
1426     /* no first interpolation point yet, then this is the one,
1427      * otherwise it is the second interpolation point */
1428     if (mux->previous_pcr < 0 && chunk_bytes) {
1429       mux->previous_pcr = new_pcr;
1430       mux->previous_offset = chunk_bytes;
1431       GST_LOG_OBJECT (mux, "Accumulating non-PCR packet");
1432       gst_adapter_push (mux->adapter, buf);
1433       goto exit;
1434     }
1435   } else {
1436     g_assert (new_pcr == -1);
1437   }
1438
1439   /* interpolate if needed, and 2 points available */
1440   if (chunk_bytes && (new_pcr != mux->previous_pcr)) {
1441     gint64 offset = 0;
1442
1443     GST_LOG_OBJECT (mux, "Processing pending packets; "
1444         "previous pcr %" G_GINT64_FORMAT ", previous offset %d, "
1445         "current pcr %" G_GINT64_FORMAT ", current offset %d",
1446         mux->previous_pcr, (gint) mux->previous_offset,
1447         new_pcr, (gint) chunk_bytes);
1448
1449     g_assert (chunk_bytes > mux->previous_offset);
1450     /* if draining, use previous rate */
1451     if (G_LIKELY (new_pcr > 0)) {
1452       mux->pcr_rate_num = new_pcr - mux->previous_pcr;
1453       mux->pcr_rate_den = chunk_bytes - mux->previous_offset;
1454     }
1455
1456     while (offset < chunk_bytes) {
1457       guint64 cur_pcr, ts;
1458
1459       /* Loop, pulling packets of the adapter, updating their 4 byte
1460        * timestamp header and pushing */
1461
1462       /* interpolate PCR */
1463       if (G_LIKELY (offset >= mux->previous_offset))
1464         cur_pcr = mux->previous_pcr +
1465             gst_util_uint64_scale (offset - mux->previous_offset,
1466             mux->pcr_rate_num, mux->pcr_rate_den);
1467       else
1468         cur_pcr = mux->previous_pcr -
1469             gst_util_uint64_scale (mux->previous_offset - offset,
1470             mux->pcr_rate_num, mux->pcr_rate_den);
1471
1472       /* FIXME: what about DTS here? */
1473       ts = gst_adapter_prev_pts (mux->adapter, NULL);
1474       out_buf = gst_adapter_take_buffer (mux->adapter, M2TS_PACKET_LENGTH);
1475       g_assert (out_buf);
1476       offset += M2TS_PACKET_LENGTH;
1477
1478       GST_BUFFER_PTS (out_buf) = ts;
1479
1480       gst_buffer_map (out_buf, &map, GST_MAP_WRITE);
1481
1482       /* The header is the bottom 30 bits of the PCR, apparently not
1483        * encoded into base + ext as in the packets themselves */
1484       GST_WRITE_UINT32_BE (map.data, cur_pcr & 0x3FFFFFFF);
1485       gst_buffer_unmap (out_buf, &map);
1486
1487       GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
1488           G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, cur_pcr);
1489       mpegtsmux_collect_packet (mux, out_buf);
1490     }
1491   }
1492
1493   if (G_UNLIKELY (!buf))
1494     goto exit;
1495
1496   gst_buffer_map (buf, &map, GST_MAP_WRITE);
1497
1498   /* Finally, output the passed in packet */
1499   /* Only write the bottom 30 bits of the PCR */
1500   GST_WRITE_UINT32_BE (map.data, new_pcr & 0x3FFFFFFF);
1501
1502   gst_buffer_unmap (buf, &map);
1503
1504   GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
1505       G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, new_pcr);
1506   mpegtsmux_collect_packet (mux, buf);
1507
1508   if (new_pcr != mux->previous_pcr) {
1509     mux->previous_pcr = new_pcr;
1510     mux->previous_offset = -M2TS_PACKET_LENGTH;
1511   }
1512
1513 exit:
1514   return TRUE;
1515 }
1516
1517 /* Called when the TsMux has prepared a packet for output. Return FALSE
1518  * on error */
1519 static gboolean
1520 new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
1521 {
1522   MpegTsMux *mux = (MpegTsMux *) user_data;
1523   gint offset = 0;
1524   GstMapInfo map;
1525
1526 #if 0
1527   GST_LOG_OBJECT (mux, "handling packet %d", mux->spn_count);
1528   mux->spn_count++;
1529 #endif
1530
1531   if (mux->m2ts_mode) {
1532     offset = 4;
1533     gst_buffer_set_size (buf, NORMAL_TS_PACKET_LENGTH + offset);
1534   }
1535
1536   gst_buffer_map (buf, &map, GST_MAP_READWRITE);
1537
1538   if (offset) {
1539     /* there should be a better way to do this */
1540     memmove (map.data + offset, map.data, map.size - offset);
1541   }
1542
1543   GST_BUFFER_PTS (buf) = mux->last_ts;
1544   /* do common init (flags and streamheaders) */
1545   new_packet_common_init (mux, buf, map.data + offset, map.size);
1546
1547   gst_buffer_unmap (buf, &map);
1548
1549   /* all is meant for downstream, including any prefix */
1550   if (offset)
1551     return new_packet_m2ts (mux, buf, new_pcr);
1552   else
1553     mpegtsmux_collect_packet (mux, buf);
1554
1555   return TRUE;
1556 }
1557
1558 /* called when TsMux needs new packet to write into */
1559 static void
1560 alloc_packet_cb (GstBuffer ** _buf, void *user_data)
1561 {
1562   MpegTsMux *mux = (MpegTsMux *) user_data;
1563   GstBuffer *buf;
1564   gint offset = 0;
1565
1566   if (mux->m2ts_mode == TRUE)
1567     offset = 4;
1568
1569   buf = gst_buffer_new_and_alloc (NORMAL_TS_PACKET_LENGTH + offset);
1570   gst_buffer_set_size (buf, NORMAL_TS_PACKET_LENGTH);
1571
1572   *_buf = buf;
1573 }
1574
1575 static void
1576 mpegtsdemux_set_header_on_caps (MpegTsMux * mux)
1577 {
1578   GstBuffer *buf;
1579   GstStructure *structure;
1580   GValue array = { 0 };
1581   GValue value = { 0 };
1582   GstCaps *caps;
1583   GList *sh;
1584
1585   caps = gst_caps_make_writable (gst_pad_get_current_caps (mux->srcpad));
1586   structure = gst_caps_get_structure (caps, 0);
1587
1588   g_value_init (&array, GST_TYPE_ARRAY);
1589
1590   sh = mux->streamheader;
1591   while (sh) {
1592     buf = sh->data;
1593     g_value_init (&value, GST_TYPE_BUFFER);
1594     gst_value_take_buffer (&value, buf);
1595     gst_value_array_append_value (&array, &value);
1596     g_value_unset (&value);
1597     sh = g_list_next (sh);
1598   }
1599
1600   g_list_free (mux->streamheader);
1601   mux->streamheader = NULL;
1602
1603   gst_structure_set_value (structure, "streamheader", &array);
1604   gst_pad_set_caps (mux->srcpad, caps);
1605   g_value_unset (&array);
1606   gst_caps_unref (caps);
1607 }
1608
1609 static void
1610 mpegtsdemux_prepare_srcpad (MpegTsMux * mux)
1611 {
1612   GstSegment seg;
1613   /* we are not going to seek */
1614   GstEvent *new_seg;
1615   GstCaps *caps = gst_caps_new_simple ("video/mpegts",
1616       "systemstream", G_TYPE_BOOLEAN, TRUE,
1617       "packetsize", G_TYPE_INT,
1618       (mux->m2ts_mode ? M2TS_PACKET_LENGTH : NORMAL_TS_PACKET_LENGTH),
1619       NULL);
1620
1621   gst_segment_init (&seg, GST_FORMAT_TIME);
1622   new_seg = gst_event_new_segment (&seg);
1623
1624   /* Set caps on src pad from our template and push new segment */
1625   gst_pad_set_caps (mux->srcpad, caps);
1626   gst_caps_unref (caps);
1627
1628   if (!gst_pad_push_event (mux->srcpad, new_seg)) {
1629     GST_WARNING_OBJECT (mux, "New segment event was not handled downstream");
1630   }
1631 }
1632
1633 static GstStateChangeReturn
1634 mpegtsmux_change_state (GstElement * element, GstStateChange transition)
1635 {
1636   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1637   GstStateChangeReturn ret;
1638
1639   switch (transition) {
1640     case GST_STATE_CHANGE_NULL_TO_READY:
1641       break;
1642     case GST_STATE_CHANGE_READY_TO_PAUSED:
1643       gst_collect_pads_start (mux->collect);
1644       break;
1645     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1646       break;
1647     case GST_STATE_CHANGE_PAUSED_TO_READY:
1648       gst_collect_pads_stop (mux->collect);
1649       break;
1650     case GST_STATE_CHANGE_READY_TO_NULL:
1651       break;
1652     default:
1653       break;
1654   }
1655
1656   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1657
1658   switch (transition) {
1659     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1660       break;
1661     case GST_STATE_CHANGE_PAUSED_TO_READY:
1662       mpegtsmux_reset (mux, TRUE);
1663       break;
1664     case GST_STATE_CHANGE_READY_TO_NULL:
1665       break;
1666     default:
1667       break;
1668   }
1669
1670   return ret;
1671 }
1672
1673 static gboolean
1674 plugin_init (GstPlugin * plugin)
1675 {
1676   if (!gst_element_register (plugin, "mpegtsmux", GST_RANK_PRIMARY,
1677           mpegtsmux_get_type ()))
1678     return FALSE;
1679
1680   GST_DEBUG_CATEGORY_INIT (mpegtsmux_debug, "mpegtsmux", 0,
1681       "MPEG Transport Stream muxer");
1682
1683   return TRUE;
1684 }
1685
1686 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR,
1687     mpegtsmux, "MPEG-TS muxer",
1688     plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);