gst: remove unnecessary GLIB_DISABLE_DEPRECATION_WARNINGS
[platform/upstream/gstreamer.git] / gst / mpegtsmux / mpegtsmux.c
1 /* 
2  * Copyright 2006, 2007, 2008, 2009, 2010 Fluendo S.A. 
3  *  Authors: Jan Schmidt <jan@fluendo.com>
4  *           Kapil Agrawal <kapil@fluendo.com>
5  *           Julien Moutte <julien@fluendo.com>
6  *
7  * Copyright (C) 2011 Jan Schmidt <thaytan@noraisin.net>
8  *
9  * This library is licensed under 4 different licenses and you
10  * can choose to use it under the terms of any one of them. The
11  * four licenses are the MPL 1.1, the LGPL, the GPL and the MIT
12  * license.
13  *
14  * MPL:
15  * 
16  * The contents of this file are subject to the Mozilla Public License
17  * Version 1.1 (the "License"); you may not use this file except in
18  * compliance with the License. You may obtain a copy of the License at
19  * http://www.mozilla.org/MPL/.
20  *
21  * Software distributed under the License is distributed on an "AS IS"
22  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
23  * License for the specific language governing rights and limitations
24  * under the License.
25  *
26  * LGPL:
27  *
28  * This library is free software; you can redistribute it and/or
29  * modify it under the terms of the GNU Library General Public
30  * License as published by the Free Software Foundation; either
31  * version 2 of the License, or (at your option) any later version.
32  *
33  * This library is distributed in the hope that it will be useful,
34  * but WITHOUT ANY WARRANTY; without even the implied warranty of
35  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
36  * Library General Public License for more details.
37  *
38  * You should have received a copy of the GNU Library General Public
39  * License along with this library; if not, write to the
40  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
41  * Boston, MA 02110-1301, USA.
42  *
43  * GPL:
44  *
45  * This program is free software; you can redistribute it and/or modify
46  * it under the terms of the GNU General Public License as published by
47  * the Free Software Foundation; either version 2 of the License, or
48  * (at your option) any later version.
49  *
50  * This program is distributed in the hope that it will be useful,
51  * but WITHOUT ANY WARRANTY; without even the implied warranty of
52  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
53  * GNU General Public License for more details.
54  *
55  * You should have received a copy of the GNU General Public License
56  * along with this program; if not, write to the Free Software
57  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
58  *
59  * MIT:
60  *
61  * Unless otherwise indicated, Source Code is licensed under MIT license.
62  * See further explanation attached in License Statement (distributed in the file
63  * LICENSE).
64  * 
65  * Permission is hereby granted, free of charge, to any person obtaining a copy of
66  * this software and associated documentation files (the "Software"), to deal in
67  * the Software without restriction, including without limitation the rights to
68  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
69  * of the Software, and to permit persons to whom the Software is furnished to do
70  * so, subject to the following conditions:
71  * 
72  * The above copyright notice and this permission notice shall be included in all
73  * copies or substantial portions of the Software.
74  * 
75  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
76  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
77  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
78  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
79  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
80  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
81  * SOFTWARE.
82  *
83  */
84
85 #ifdef HAVE_CONFIG_H
86 #include "config.h"
87 #endif
88 #include <stdio.h>
89 #include <string.h>
90
91 #include <gst/tag/tag.h>
92 #include <gst/video/video.h>
93 #include <gst/mpegts/mpegts.h>
94
95 #include "mpegtsmux.h"
96
97 #include "mpegtsmux_aac.h"
98 #include "mpegtsmux_ttxt.h"
99
100 GST_DEBUG_CATEGORY (mpegtsmux_debug);
101 #define GST_CAT_DEFAULT mpegtsmux_debug
102
103 enum
104 {
105   ARG_0,
106   ARG_PROG_MAP,
107   ARG_M2TS_MODE,
108   ARG_PAT_INTERVAL,
109   ARG_PMT_INTERVAL,
110   ARG_ALIGNMENT,
111   ARG_SI_INTERVAL
112 };
113
114 #define MPEGTSMUX_DEFAULT_ALIGNMENT    -1
115 #define MPEGTSMUX_DEFAULT_M2TS         FALSE
116
117 static GstStaticPadTemplate mpegtsmux_sink_factory =
118     GST_STATIC_PAD_TEMPLATE ("sink_%d",
119     GST_PAD_SINK,
120     GST_PAD_REQUEST,
121     GST_STATIC_CAPS ("video/mpeg, "
122         "parsed = (boolean) TRUE, "
123         "mpegversion = (int) { 1, 2, 4 }, "
124         "systemstream = (boolean) false; "
125         "video/x-dirac;"
126         "video/x-h264,stream-format=(string)byte-stream,"
127         "alignment=(string){au, nal}; "
128         "audio/mpeg, "
129         "parsed = (boolean) TRUE, "
130         "mpegversion = (int) { 1, 2 };"
131         "audio/mpeg, "
132         "framed = (boolean) TRUE, "
133         "mpegversion = (int) 4, stream-format = (string) { raw, adts };"
134         "audio/x-lpcm, "
135         "width = (int) { 16, 20, 24 }, "
136         "rate = (int) { 48000, 96000 }, "
137         "channels = (int) [ 1, 8 ], "
138         "dynamic_range = (int) [ 0, 255 ], "
139         "emphasis = (boolean) { FALSE, TRUE }, "
140         "mute = (boolean) { FALSE, TRUE }; "
141         "audio/x-ac3, framed = (boolean) TRUE;"
142         "audio/x-dts, framed = (boolean) TRUE;"
143         "subpicture/x-dvb;" "application/x-teletext"));
144
145 static GstStaticPadTemplate mpegtsmux_src_factory =
146 GST_STATIC_PAD_TEMPLATE ("src",
147     GST_PAD_SRC,
148     GST_PAD_ALWAYS,
149     GST_STATIC_CAPS ("video/mpegts, "
150         "systemstream = (boolean) true, " "packetsize = (int) { 188, 192} ")
151     );
152
153 static void gst_mpegtsmux_set_property (GObject * object, guint prop_id,
154     const GValue * value, GParamSpec * pspec);
155 static void gst_mpegtsmux_get_property (GObject * object, guint prop_id,
156     GValue * value, GParamSpec * pspec);
157
158 static void mpegtsmux_reset (MpegTsMux * mux, gboolean alloc);
159 static void mpegtsmux_dispose (GObject * object);
160 static void alloc_packet_cb (GstBuffer ** _buf, void *user_data);
161 static gboolean new_packet_cb (GstBuffer * buf, void *user_data,
162     gint64 new_pcr);
163 static void release_buffer_cb (guint8 * data, void *user_data);
164 static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux,
165     GstBuffer * buf);
166 static GstFlowReturn mpegtsmux_push_packets (MpegTsMux * mux, gboolean force);
167 static gboolean new_packet_m2ts (MpegTsMux * mux, GstBuffer * buf,
168     gint64 new_pcr);
169
170 static void mpegtsdemux_prepare_srcpad (MpegTsMux * mux);
171 GstFlowReturn mpegtsmux_clip_inc_running_time (GstCollectPads * pads,
172     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
173     gpointer user_data);
174 static GstFlowReturn mpegtsmux_collected_buffer (GstCollectPads * pads,
175     GstCollectData * data, GstBuffer * buf, MpegTsMux * mux);
176
177 static gboolean mpegtsmux_sink_event (GstCollectPads * pads,
178     GstCollectData * data, GstEvent * event, gpointer user_data);
179 static GstPad *mpegtsmux_request_new_pad (GstElement * element,
180     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
181 static void mpegtsmux_release_pad (GstElement * element, GstPad * pad);
182 static GstStateChangeReturn mpegtsmux_change_state (GstElement * element,
183     GstStateChange transition);
184 static gboolean mpegtsmux_send_event (GstElement * element, GstEvent * event);
185 static void mpegtsdemux_set_header_on_caps (MpegTsMux * mux);
186 static gboolean mpegtsmux_src_event (GstPad * pad, GstObject * parent,
187     GstEvent * event);
188
189 #if 0
190 static void mpegtsmux_set_index (GstElement * element, GstIndex * index);
191 static GstIndex *mpegtsmux_get_index (GstElement * element);
192
193 static GstFormat pts_format;
194 static GstFormat spn_format;
195 #endif
196
197 typedef struct
198 {
199   GstMapInfo map_info;
200   GstBuffer *buffer;
201 } StreamData;
202
203 G_DEFINE_TYPE (MpegTsMux, mpegtsmux, GST_TYPE_ELEMENT)
204
205 /* Takes over the ref on the buffer */
206      static StreamData *stream_data_new (GstBuffer * buffer)
207 {
208   StreamData *res = g_new (StreamData, 1);
209   res->buffer = buffer;
210   gst_buffer_map (buffer, &(res->map_info), GST_MAP_READ);
211
212   return res;
213 }
214
215 static void
216 stream_data_free (StreamData * data)
217 {
218   if (data) {
219     gst_buffer_unmap (data->buffer, &data->map_info);
220     gst_buffer_unref (data->buffer);
221     g_free (data);
222   }
223 }
224
225 #define parent_class mpegtsmux_parent_class
226
227 static void
228 mpegtsmux_class_init (MpegTsMuxClass * klass)
229 {
230   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
231   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
232
233   gst_element_class_add_pad_template (gstelement_class,
234       gst_static_pad_template_get (&mpegtsmux_sink_factory));
235   gst_element_class_add_pad_template (gstelement_class,
236       gst_static_pad_template_get (&mpegtsmux_src_factory));
237
238   gst_element_class_set_static_metadata (gstelement_class,
239       "MPEG Transport Stream Muxer", "Codec/Muxer",
240       "Multiplexes media streams into an MPEG Transport Stream",
241       "Fluendo <contact@fluendo.com>");
242
243   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_mpegtsmux_set_property);
244   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_mpegtsmux_get_property);
245   gobject_class->dispose = mpegtsmux_dispose;
246
247   gstelement_class->request_new_pad = mpegtsmux_request_new_pad;
248   gstelement_class->release_pad = mpegtsmux_release_pad;
249   gstelement_class->change_state = mpegtsmux_change_state;
250   gstelement_class->send_event = mpegtsmux_send_event;
251
252 #if 0
253   gstelement_class->set_index = GST_DEBUG_FUNCPTR (mpegtsmux_set_index);
254   gstelement_class->get_index = GST_DEBUG_FUNCPTR (mpegtsmux_get_index);
255 #endif
256
257   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PROG_MAP,
258       g_param_spec_boxed ("prog-map", "Program map",
259           "A GstStructure specifies the mapping from elementary streams to programs",
260           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261
262   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_M2TS_MODE,
263       g_param_spec_boolean ("m2ts-mode", "M2TS(192 bytes) Mode",
264           "Set to TRUE to output Blu-Ray disc format with 192 byte packets. "
265           "FALSE for standard TS format with 188 byte packets.",
266           MPEGTSMUX_DEFAULT_M2TS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267
268   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PAT_INTERVAL,
269       g_param_spec_uint ("pat-interval", "PAT interval",
270           "Set the interval (in ticks of the 90kHz clock) for writing out the PAT table",
271           1, G_MAXUINT, TSMUX_DEFAULT_PAT_INTERVAL,
272           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273
274   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PMT_INTERVAL,
275       g_param_spec_uint ("pmt-interval", "PMT interval",
276           "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
277           1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
278           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
279
280   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ALIGNMENT,
281       g_param_spec_int ("alignment", "packet alignment",
282           "Number of packets per buffer (padded with dummy packets on EOS) "
283           "(-1 = auto, 0 = all available packets)",
284           -1, G_MAXINT, MPEGTSMUX_DEFAULT_ALIGNMENT,
285           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286
287   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SI_INTERVAL,
288       g_param_spec_uint ("si-interval", "SI interval",
289           "Set the interval (in ticks of the 90kHz clock) for writing out the Service"
290           "Information tables", 1, G_MAXUINT, TSMUX_DEFAULT_SI_INTERVAL,
291           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 }
293
294 static void
295 mpegtsmux_init (MpegTsMux * mux)
296 {
297   mux->srcpad =
298       gst_pad_new_from_static_template (&mpegtsmux_src_factory, "src");
299   gst_pad_use_fixed_caps (mux->srcpad);
300   gst_pad_set_event_function (mux->srcpad,
301       GST_DEBUG_FUNCPTR (mpegtsmux_src_event));
302   gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad);
303
304   mux->collect = gst_collect_pads_new ();
305   gst_collect_pads_set_buffer_function (mux->collect,
306       (GstCollectPadsBufferFunction)
307       GST_DEBUG_FUNCPTR (mpegtsmux_collected_buffer), mux);
308
309   gst_collect_pads_set_event_function (mux->collect,
310       (GstCollectPadsEventFunction) GST_DEBUG_FUNCPTR (mpegtsmux_sink_event),
311       mux);
312   gst_collect_pads_set_clip_function (mux->collect, (GstCollectPadsClipFunction)
313       GST_DEBUG_FUNCPTR (mpegtsmux_clip_inc_running_time), mux);
314
315   mux->tsmux = tsmux_new ();
316   tsmux_set_write_func (mux->tsmux, new_packet_cb, mux);
317
318   mux->adapter = gst_adapter_new ();
319   mux->out_adapter = gst_adapter_new ();
320
321   /* properties */
322   mux->m2ts_mode = MPEGTSMUX_DEFAULT_M2TS;
323   mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
324   mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
325   mux->si_interval = TSMUX_DEFAULT_SI_INTERVAL;
326   mux->prog_map = NULL;
327   mux->alignment = MPEGTSMUX_DEFAULT_ALIGNMENT;
328
329   /* initial state */
330   mpegtsmux_reset (mux, TRUE);
331 }
332
333 static void
334 mpegtsmux_pad_reset (MpegTsPadData * pad_data)
335 {
336   pad_data->pid = 0;
337   pad_data->min_dts = GST_CLOCK_TIME_NONE;
338   pad_data->prog_id = -1;
339 #if 0
340   pad_data->prog_id = -1;
341   pad_data->element_index_writer_id = -1;
342 #endif
343
344   if (pad_data->free_func)
345     pad_data->free_func (pad_data->prepare_data);
346   pad_data->prepare_data = NULL;
347   pad_data->prepare_func = NULL;
348   pad_data->free_func = NULL;
349
350   if (pad_data->codec_data)
351     gst_buffer_replace (&pad_data->codec_data, NULL);
352
353   /* reference owned elsewhere */
354   pad_data->stream = NULL;
355   pad_data->prog = NULL;
356
357   if (pad_data->language) {
358     g_free (pad_data->language);
359     pad_data->language = NULL;
360   }
361
362 }
363
364 static void
365 mpegtsmux_reset (MpegTsMux * mux, gboolean alloc)
366 {
367   GSList *walk;
368
369   mux->first = TRUE;
370   mux->last_flow_ret = GST_FLOW_OK;
371   mux->previous_pcr = -1;
372   mux->pcr_rate_num = mux->pcr_rate_den = 1;
373   mux->last_ts = 0;
374   mux->is_delta = TRUE;
375
376   mux->streamheader_sent = FALSE;
377   mux->force_key_unit_event = NULL;
378   mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
379 #if 0
380   mux->spn_count = 0;
381
382   if (mux->element_index) {
383     gst_object_unref (mux->element_index);
384     mux->element_index = NULL;
385   }
386 #endif
387   if (mux->adapter)
388     gst_adapter_clear (mux->adapter);
389   if (mux->out_adapter)
390     gst_adapter_clear (mux->out_adapter);
391
392   if (mux->tsmux) {
393     tsmux_free (mux->tsmux);
394     mux->tsmux = NULL;
395   }
396
397   memset (mux->programs, 0, sizeof (mux->programs));
398
399   if (mux->streamheader) {
400     GstBuffer *buf;
401     GList *sh;
402
403     sh = mux->streamheader;
404     while (sh) {
405       buf = sh->data;
406       gst_buffer_unref (buf);
407       sh = g_list_next (sh);
408     }
409     g_list_free (mux->streamheader);
410     mux->streamheader = NULL;
411   }
412   gst_event_replace (&mux->force_key_unit_event, NULL);
413   gst_buffer_replace (&mux->out_buffer, NULL);
414
415   if (mux->collect) {
416     GST_COLLECT_PADS_STREAM_LOCK (mux->collect);
417     for (walk = mux->collect->data; walk != NULL; walk = g_slist_next (walk))
418       mpegtsmux_pad_reset ((MpegTsPadData *) walk->data);
419     GST_COLLECT_PADS_STREAM_UNLOCK (mux->collect);
420   }
421
422   if (alloc) {
423     mux->tsmux = tsmux_new ();
424     tsmux_set_write_func (mux->tsmux, new_packet_cb, mux);
425     tsmux_set_alloc_func (mux->tsmux, alloc_packet_cb, mux);
426   }
427 }
428
429 static void
430 mpegtsmux_dispose (GObject * object)
431 {
432   MpegTsMux *mux = GST_MPEG_TSMUX (object);
433
434   mpegtsmux_reset (mux, FALSE);
435
436   if (mux->adapter) {
437     g_object_unref (mux->adapter);
438     mux->adapter = NULL;
439   }
440   if (mux->out_adapter) {
441     g_object_unref (mux->out_adapter);
442     mux->out_adapter = NULL;
443   }
444   if (mux->collect) {
445     gst_object_unref (mux->collect);
446     mux->collect = NULL;
447   }
448   if (mux->prog_map) {
449     gst_structure_free (mux->prog_map);
450     mux->prog_map = NULL;
451   }
452   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
453 }
454
455 static void
456 gst_mpegtsmux_set_property (GObject * object, guint prop_id,
457     const GValue * value, GParamSpec * pspec)
458 {
459   MpegTsMux *mux = GST_MPEG_TSMUX (object);
460   GSList *walk;
461
462   switch (prop_id) {
463     case ARG_M2TS_MODE:
464       /*set incase if the output stream need to be of 192 bytes */
465       mux->m2ts_mode = g_value_get_boolean (value);
466       break;
467     case ARG_PROG_MAP:
468     {
469       const GstStructure *s = gst_value_get_structure (value);
470       if (mux->prog_map) {
471         gst_structure_free (mux->prog_map);
472       }
473       if (s)
474         mux->prog_map = gst_structure_copy (s);
475       else
476         mux->prog_map = NULL;
477       break;
478     }
479     case ARG_PAT_INTERVAL:
480       mux->pat_interval = g_value_get_uint (value);
481       if (mux->tsmux)
482         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
483       break;
484     case ARG_PMT_INTERVAL:
485       walk = mux->collect->data;
486       mux->pmt_interval = g_value_get_uint (value);
487
488       while (walk) {
489         MpegTsPadData *ts_data = (MpegTsPadData *) walk->data;
490
491         tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
492         walk = g_slist_next (walk);
493       }
494       break;
495     case ARG_ALIGNMENT:
496       mux->alignment = g_value_get_int (value);
497       break;
498     case ARG_SI_INTERVAL:
499       mux->si_interval = g_value_get_uint (value);
500       tsmux_set_si_interval (mux->tsmux, mux->si_interval);
501       break;
502     default:
503       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
504       break;
505   }
506 }
507
508 static void
509 gst_mpegtsmux_get_property (GObject * object, guint prop_id,
510     GValue * value, GParamSpec * pspec)
511 {
512   MpegTsMux *mux = GST_MPEG_TSMUX (object);
513
514   switch (prop_id) {
515     case ARG_M2TS_MODE:
516       g_value_set_boolean (value, mux->m2ts_mode);
517       break;
518     case ARG_PROG_MAP:
519       gst_value_set_structure (value, mux->prog_map);
520       break;
521     case ARG_PAT_INTERVAL:
522       g_value_set_uint (value, mux->pat_interval);
523       break;
524     case ARG_PMT_INTERVAL:
525       g_value_set_uint (value, mux->pmt_interval);
526       break;
527     case ARG_ALIGNMENT:
528       g_value_set_int (value, mux->alignment);
529       break;
530     case ARG_SI_INTERVAL:
531       g_value_set_uint (value, mux->si_interval);
532       break;
533     default:
534       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
535       break;
536   }
537 }
538
539 #if 0
540 static void
541 mpegtsmux_set_index (GstElement * element, GstIndex * index)
542 {
543   MpegTsMux *mux = GST_MPEG_TSMUX (element);
544
545   GST_OBJECT_LOCK (mux);
546   if (mux->element_index)
547     gst_object_unref (mux->element_index);
548   mux->element_index = index ? gst_object_ref (index) : NULL;
549   GST_OBJECT_UNLOCK (mux);
550
551   GST_DEBUG_OBJECT (mux, "Set index %" GST_PTR_FORMAT, mux->element_index);
552 }
553
554 static GstIndex *
555 mpegtsmux_get_index (GstElement * element)
556 {
557   GstIndex *result = NULL;
558   MpegTsMux *mux = GST_MPEG_TSMUX (element);
559
560   GST_OBJECT_LOCK (mux);
561   if (mux->element_index)
562     result = gst_object_ref (mux->element_index);
563   GST_OBJECT_UNLOCK (mux);
564
565   GST_DEBUG_OBJECT (mux, "Returning index %" GST_PTR_FORMAT, result);
566
567   return result;
568 }
569 #endif
570
571 static void
572 release_buffer_cb (guint8 * data, void *user_data)
573 {
574   stream_data_free (user_data);
575 }
576
577 static GstFlowReturn
578 mpegtsmux_create_stream (MpegTsMux * mux, MpegTsPadData * ts_data)
579 {
580   GstFlowReturn ret = GST_FLOW_ERROR;
581   GstCaps *caps;
582   GstStructure *s;
583   GstPad *pad;
584   TsMuxStreamType st = TSMUX_ST_RESERVED;
585   const gchar *mt;
586   const GValue *value = NULL;
587   GstBuffer *codec_data = NULL;
588
589   pad = ts_data->collect.pad;
590   caps = gst_pad_get_current_caps (pad);
591   if (caps == NULL)
592     goto not_negotiated;
593
594   GST_DEBUG_OBJECT (pad, "Creating stream with PID 0x%04x for caps %"
595       GST_PTR_FORMAT, ts_data->pid, caps);
596
597   s = gst_caps_get_structure (caps, 0);
598   g_return_val_if_fail (s != NULL, FALSE);
599
600   mt = gst_structure_get_name (s);
601   value = gst_structure_get_value (s, "codec_data");
602   if (value != NULL)
603     codec_data = gst_value_get_buffer (value);
604
605   if (strcmp (mt, "video/x-dirac") == 0) {
606     st = TSMUX_ST_VIDEO_DIRAC;
607   } else if (strcmp (mt, "audio/x-ac3") == 0) {
608     st = TSMUX_ST_PS_AUDIO_AC3;
609   } else if (strcmp (mt, "audio/x-dts") == 0) {
610     st = TSMUX_ST_PS_AUDIO_DTS;
611   } else if (strcmp (mt, "audio/x-lpcm") == 0) {
612     st = TSMUX_ST_PS_AUDIO_LPCM;
613   } else if (strcmp (mt, "video/x-h264") == 0) {
614     st = TSMUX_ST_VIDEO_H264;
615   } else if (strcmp (mt, "audio/mpeg") == 0) {
616     gint mpegversion;
617
618     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
619       GST_ERROR_OBJECT (pad, "caps missing mpegversion");
620       goto not_negotiated;
621     }
622
623     switch (mpegversion) {
624       case 1:
625         st = TSMUX_ST_AUDIO_MPEG1;
626         break;
627       case 2:
628         st = TSMUX_ST_AUDIO_MPEG2;
629         break;
630       case 4:
631       {
632         st = TSMUX_ST_AUDIO_AAC;
633         if (codec_data) {       /* TODO - Check stream format - codec data should only come with RAW stream */
634           GST_DEBUG_OBJECT (pad,
635               "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
636               gst_buffer_get_size (codec_data));
637           ts_data->codec_data = gst_buffer_ref (codec_data);
638           ts_data->prepare_func = mpegtsmux_prepare_aac;
639         } else {
640           ts_data->codec_data = NULL;
641         }
642         break;
643       }
644       default:
645         GST_WARNING_OBJECT (pad, "unsupported mpegversion %d", mpegversion);
646         goto not_negotiated;
647     }
648   } else if (strcmp (mt, "video/mpeg") == 0) {
649     gint mpegversion;
650
651     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
652       GST_ERROR_OBJECT (pad, "caps missing mpegversion");
653       goto not_negotiated;
654     }
655
656     switch (mpegversion) {
657       case 1:
658         st = TSMUX_ST_VIDEO_MPEG1;
659         break;
660       case 2:
661         st = TSMUX_ST_VIDEO_MPEG2;
662         break;
663       case 4:
664         st = TSMUX_ST_VIDEO_MPEG4;
665         break;
666       default:
667         GST_WARNING_OBJECT (pad, "unsupported mpegversion %d", mpegversion);
668         goto not_negotiated;
669     }
670   } else if (strcmp (mt, "subpicture/x-dvb") == 0) {
671     st = TSMUX_ST_PS_DVB_SUBPICTURE;
672   } else if (strcmp (mt, "application/x-teletext") == 0) {
673     st = TSMUX_ST_PS_TELETEXT;
674     /* needs a particularly sized layout */
675     ts_data->prepare_func = mpegtsmux_prepare_teletext;
676   }
677
678   if (st != TSMUX_ST_RESERVED) {
679     ts_data->stream = tsmux_create_stream (mux->tsmux, st, ts_data->pid,
680         ts_data->language);
681   } else {
682     GST_DEBUG_OBJECT (pad, "Failed to determine stream type");
683   }
684
685   if (ts_data->stream != NULL) {
686     gst_structure_get_int (s, "rate", &ts_data->stream->audio_sampling);
687     gst_structure_get_int (s, "channels", &ts_data->stream->audio_channels);
688     gst_structure_get_int (s, "bitrate", &ts_data->stream->audio_bitrate);
689
690     tsmux_stream_set_buffer_release_func (ts_data->stream, release_buffer_cb);
691     tsmux_program_add_stream (ts_data->prog, ts_data->stream);
692
693     ret = GST_FLOW_OK;
694   }
695 #if 0
696   GST_OBJECT_LOCK (mux);
697   if (mux->element_index) {
698     gboolean parsed = FALSE;
699
700     if (ts_data->stream->is_video_stream) {
701       if (gst_structure_get_boolean (s, "parsed", &parsed) && parsed) {
702         if (ts_data->element_index_writer_id == -1) {
703           gst_index_get_writer_id (mux->element_index, GST_OBJECT (mux),
704               &ts_data->element_index_writer_id);
705           GST_DEBUG_OBJECT (mux, "created GstIndex writer_id = %d for stream",
706               ts_data->element_index_writer_id);
707           gst_index_add_format (mux->element_index,
708               ts_data->element_index_writer_id, pts_format);
709           gst_index_add_format (mux->element_index,
710               ts_data->element_index_writer_id, spn_format);
711         }
712       } else {
713         GST_WARNING_OBJECT (pad, "no indexing for (unparsed) stream !");
714       }
715     }
716   }
717   GST_OBJECT_UNLOCK (mux);
718 #endif
719
720   gst_caps_unref (caps);
721   return ret;
722
723   /* ERRORS */
724 not_negotiated:
725   {
726     GST_DEBUG_OBJECT (pad, "Sink pad caps were not set before pushing");
727     if (caps)
728       gst_caps_unref (caps);
729     return GST_FLOW_NOT_NEGOTIATED;
730   }
731 }
732
733 static GstFlowReturn
734 mpegtsmux_create_streams (MpegTsMux * mux)
735 {
736   GstFlowReturn ret = GST_FLOW_OK;
737   GSList *walk = mux->collect->data;
738
739   /* Create the streams */
740   while (walk) {
741     GstCollectData *c_data = (GstCollectData *) walk->data;
742     MpegTsPadData *ts_data = (MpegTsPadData *) walk->data;
743     gchar *name = NULL;
744
745     walk = g_slist_next (walk);
746
747     if (ts_data->prog_id == -1) {
748       name = GST_PAD_NAME (c_data->pad);
749       if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map,
750               name)) {
751         gint idx;
752         gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
753         if (!ret) {
754           GST_ELEMENT_ERROR (mux, STREAM, MUX,
755               ("Reading program map failed. Assuming default"), (NULL));
756           idx = DEFAULT_PROG_ID;
757         }
758         if (idx < 0 || idx >= MAX_PROG_NUMBER) {
759           GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s out "
760               "of range (max = %d); DEFAULT_PROGRAM = %d is used instead",
761               idx, name, MAX_PROG_NUMBER, DEFAULT_PROG_ID);
762           idx = DEFAULT_PROG_ID;
763         }
764         ts_data->prog_id = idx;
765       } else {
766         ts_data->prog_id = DEFAULT_PROG_ID;
767       }
768     }
769
770     ts_data->prog = mux->programs[ts_data->prog_id];
771     if (ts_data->prog == NULL) {
772       ts_data->prog = tsmux_program_new (mux->tsmux, ts_data->prog_id);
773       if (ts_data->prog == NULL)
774         goto no_program;
775       tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval);
776       mux->programs[ts_data->prog_id] = ts_data->prog;
777     }
778
779     if (ts_data->stream == NULL) {
780       ret = mpegtsmux_create_stream (mux, ts_data);
781       if (ret != GST_FLOW_OK)
782         goto no_stream;
783     }
784   }
785
786   return GST_FLOW_OK;
787
788   /* ERRORS */
789 no_program:
790   {
791     GST_ELEMENT_ERROR (mux, STREAM, MUX,
792         ("Could not create new program"), (NULL));
793     return GST_FLOW_ERROR;
794   }
795 no_stream:
796   {
797     GST_ELEMENT_ERROR (mux, STREAM, MUX,
798         ("Could not create handler for stream"), (NULL));
799     return ret;
800   }
801 }
802
803
804 #define COLLECT_DATA_PAD(collect_data) (((GstCollectData *)(collect_data))->pad)
805
806 static gboolean
807 mpegtsmux_sink_event (GstCollectPads * pads, GstCollectData * data,
808     GstEvent * event, gpointer user_data)
809 {
810   MpegTsMux *mux = GST_MPEG_TSMUX (user_data);
811   gboolean res = FALSE;
812   gboolean forward = TRUE;
813   MpegTsPadData *pad_data = (MpegTsPadData *) data;
814
815 #ifndef GST_DISABLE_GST_DEBUG
816   GstPad *pad;
817
818   pad = data->pad;
819 #endif
820
821   switch (GST_EVENT_TYPE (event)) {
822     case GST_EVENT_CUSTOM_DOWNSTREAM:
823     {
824       GstClockTime timestamp, stream_time, running_time;
825       gboolean all_headers;
826       guint count;
827
828       if (!gst_video_event_is_force_key_unit (event))
829         goto out;
830
831       res = TRUE;
832       forward = FALSE;
833
834       gst_video_event_parse_downstream_force_key_unit (event,
835           &timestamp, &stream_time, &running_time, &all_headers, &count);
836       GST_INFO_OBJECT (pad, "have downstream force-key-unit event, "
837           "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
838           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
839
840       if (mux->force_key_unit_event != NULL) {
841         GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
842             "as an upstream force key unit is already queued");
843         goto out;
844       }
845
846       if (!all_headers)
847         goto out;
848
849       mux->pending_key_unit_ts = running_time;
850       gst_event_replace (&mux->force_key_unit_event, event);
851       break;
852     }
853     case GST_EVENT_TAG:{
854       GstTagList *list;
855       gchar *lang = NULL;
856
857       GST_DEBUG_OBJECT (mux, "received tag event");
858       gst_event_parse_tag (event, &list);
859
860       /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
861       if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
862         const gchar *lang_code;
863
864         lang_code = gst_tag_get_language_code_iso_639_2B (lang);
865         if (lang_code) {
866           GST_DEBUG_OBJECT (pad, "Setting language to '%s'", lang_code);
867           pad_data->language = g_strdup (lang_code);
868         } else {
869           GST_WARNING_OBJECT (pad, "Did not get language code for '%s'", lang);
870         }
871         g_free (lang);
872       }
873
874       /* handled this, don't want collectpads to forward it downstream */
875       res = TRUE;
876       forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
877       break;
878     }
879     default:
880       break;
881   }
882
883 out:
884   if (!forward)
885     gst_event_unref (event);
886   else
887     res = gst_collect_pads_event_default (pads, data, event, FALSE);
888
889   return res;
890 }
891
892 static gboolean
893 mpegtsmux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
894 {
895   MpegTsMux *mux = GST_MPEG_TSMUX (gst_pad_get_parent (pad));
896   gboolean res = TRUE, forward = TRUE;
897
898   switch (GST_EVENT_TYPE (event)) {
899     case GST_EVENT_CUSTOM_UPSTREAM:
900     {
901       GstIterator *iter;
902       GstIteratorResult iter_ret;
903       GstPad *sinkpad;
904       GValue sinkpad_value = G_VALUE_INIT;
905       GstClockTime running_time;
906       gboolean all_headers, done;
907       guint count;
908
909       if (!gst_video_event_is_force_key_unit (event))
910         break;
911
912       forward = FALSE;
913
914       gst_video_event_parse_upstream_force_key_unit (event,
915           &running_time, &all_headers, &count);
916
917       GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
918           "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
919           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
920           all_headers, count);
921
922       if (!all_headers)
923         break;
924
925       mux->pending_key_unit_ts = running_time;
926       gst_event_replace (&mux->force_key_unit_event, event);
927
928       iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
929       done = FALSE;
930       while (!done) {
931         gboolean res = FALSE, tmp;
932         iter_ret = gst_iterator_next (iter, &sinkpad_value);
933         sinkpad = g_value_get_object (&sinkpad_value);
934
935         switch (iter_ret) {
936           case GST_ITERATOR_DONE:
937             done = TRUE;
938             break;
939           case GST_ITERATOR_OK:
940             GST_INFO_OBJECT (pad, "forwarding");
941             tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
942             GST_INFO_OBJECT (mux, "result %d", tmp);
943             /* succeed if at least one pad succeeds */
944             res |= tmp;
945             break;
946           case GST_ITERATOR_ERROR:
947             done = TRUE;
948             break;
949           case GST_ITERATOR_RESYNC:
950             break;
951         }
952         g_value_reset (&sinkpad_value);
953       }
954       g_value_unset (&sinkpad_value);
955       gst_iterator_free (iter);
956       break;
957     }
958     default:
959       break;
960   }
961
962   if (forward)
963     res = gst_pad_event_default (pad, parent, event);
964   else
965     gst_event_unref (event);
966
967   gst_object_unref (mux);
968   return res;
969 }
970
971 static GstEvent *
972 check_pending_key_unit_event (GstEvent * pending_event, GstSegment * segment,
973     GstClockTime timestamp, guint flags, GstClockTime pending_key_unit_ts)
974 {
975   GstClockTime running_time, stream_time;
976   gboolean all_headers;
977   guint count;
978   GstEvent *event = NULL;
979
980   g_return_val_if_fail (pending_event != NULL, NULL);
981   g_return_val_if_fail (segment != NULL, NULL);
982
983   if (pending_event == NULL)
984     goto out;
985
986   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
987       timestamp == GST_CLOCK_TIME_NONE)
988     goto out;
989
990   running_time = gst_segment_to_running_time (segment,
991       GST_FORMAT_TIME, timestamp);
992
993   GST_INFO ("now %" GST_TIME_FORMAT " wanted %" GST_TIME_FORMAT,
994       GST_TIME_ARGS (running_time), GST_TIME_ARGS (pending_key_unit_ts));
995   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
996       running_time < pending_key_unit_ts)
997     goto out;
998
999   if (flags & GST_BUFFER_FLAG_DELTA_UNIT) {
1000     GST_INFO ("pending force key unit, waiting for keyframe");
1001     goto out;
1002   }
1003
1004   stream_time = gst_segment_to_stream_time (segment,
1005       GST_FORMAT_TIME, timestamp);
1006
1007   gst_video_event_parse_upstream_force_key_unit (pending_event,
1008       NULL, &all_headers, &count);
1009
1010   event =
1011       gst_video_event_new_downstream_force_key_unit (timestamp, stream_time,
1012       running_time, all_headers, count);
1013   gst_event_set_seqnum (event, gst_event_get_seqnum (pending_event));
1014
1015 out:
1016   return event;
1017 }
1018
1019 GstFlowReturn
1020 mpegtsmux_clip_inc_running_time (GstCollectPads * pads,
1021     GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
1022     gpointer user_data)
1023 {
1024   MpegTsPadData *pad_data = (MpegTsPadData *) cdata;
1025   GstClockTime time;
1026
1027   *outbuf = buf;
1028
1029   /* PTS */
1030   time = GST_BUFFER_TIMESTAMP (buf);
1031
1032   /* invalid left alone and passed */
1033   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
1034     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
1035     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
1036       GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
1037       gst_buffer_unref (buf);
1038       *outbuf = NULL;
1039       goto beach;
1040     } else {
1041       GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %"
1042           GST_TIME_FORMAT " running time",
1043           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
1044       buf = *outbuf = gst_buffer_make_writable (buf);
1045       GST_BUFFER_TIMESTAMP (*outbuf) = time;
1046     }
1047   }
1048
1049   /* DTS */
1050   time = GST_BUFFER_DTS (buf);
1051
1052   /* invalid left alone and passed */
1053   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
1054     time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
1055     /* may have to decode out-of-segment, so pass INVALID */
1056     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
1057       GST_DEBUG_OBJECT (cdata->pad, "running dts outside segment");
1058     } else {
1059       GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %"
1060           GST_TIME_FORMAT " running time",
1061           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
1062       if (GST_CLOCK_TIME_IS_VALID (pad_data->min_dts) &&
1063           time < pad_data->min_dts) {
1064         /* Ignore DTS going backward */
1065         GST_WARNING_OBJECT (cdata->pad, "ignoring DTS going backward");
1066         time = pad_data->min_dts;
1067       }
1068       buf = *outbuf = gst_buffer_make_writable (buf);
1069       GST_BUFFER_DTS (*outbuf) = time;
1070     }
1071   }
1072
1073   buf = *outbuf;
1074   if (pad_data->prepare_func) {
1075     MpegTsMux *mux = (MpegTsMux *) user_data;
1076
1077     *outbuf = pad_data->prepare_func (buf, pad_data, mux);
1078     g_assert (*outbuf);
1079     gst_buffer_unref (buf);
1080   }
1081
1082 beach:
1083   return GST_FLOW_OK;
1084 }
1085
1086 static GstFlowReturn
1087 mpegtsmux_collected_buffer (GstCollectPads * pads, GstCollectData * data,
1088     GstBuffer * buf, MpegTsMux * mux)
1089 {
1090   GstFlowReturn ret = GST_FLOW_OK;
1091   MpegTsPadData *best = (MpegTsPadData *) data;
1092   TsMuxProgram *prog;
1093   gint64 pts = -1;
1094   guint64 dts = -1;
1095   gboolean delta = TRUE;
1096   StreamData *stream_data;
1097
1098   GST_DEBUG_OBJECT (mux, "Pads collected");
1099
1100   if (G_UNLIKELY (mux->first)) {
1101     ret = mpegtsmux_create_streams (mux);
1102     if (G_UNLIKELY (ret != GST_FLOW_OK))
1103       return ret;
1104
1105     mpegtsdemux_prepare_srcpad (mux);
1106
1107     mux->first = FALSE;
1108   }
1109
1110   if (G_UNLIKELY (best == NULL)) {
1111     /* EOS */
1112     /* drain some possibly cached data */
1113     new_packet_m2ts (mux, NULL, -1);
1114     mpegtsmux_push_packets (mux, TRUE);
1115     gst_pad_push_event (mux->srcpad, gst_event_new_eos ());
1116
1117     return GST_FLOW_OK;
1118   }
1119
1120   prog = best->prog;
1121   if (prog == NULL)
1122     goto no_program;
1123
1124   g_assert (buf != NULL);
1125
1126   if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
1127     GstEvent *event;
1128
1129     event = check_pending_key_unit_event (mux->force_key_unit_event,
1130         &best->collect.segment, GST_BUFFER_TIMESTAMP (buf),
1131         GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
1132     if (event) {
1133       GstClockTime running_time;
1134       guint count;
1135       GList *cur;
1136
1137       mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
1138       gst_event_replace (&mux->force_key_unit_event, NULL);
1139
1140       gst_video_event_parse_downstream_force_key_unit (event,
1141           NULL, NULL, &running_time, NULL, &count);
1142
1143       GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
1144           "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
1145           GST_TIME_ARGS (running_time), count);
1146       gst_pad_push_event (mux->srcpad, event);
1147
1148       /* output PAT */
1149       mux->tsmux->last_pat_ts = -1;
1150
1151       /* output PMT for each program */
1152       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1153         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1154
1155         program->last_pmt_ts = -1;
1156       }
1157       tsmux_program_set_pcr_stream (prog, NULL);
1158     }
1159   }
1160
1161   if (G_UNLIKELY (prog->pcr_stream == NULL)) {
1162     /* Take the first data stream for the PCR */
1163     GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
1164         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
1165         MPEG_TS_PAD_DATA (best)->pid, MPEG_TS_PAD_DATA (best)->prog_id);
1166
1167     /* Set the chosen PCR stream */
1168     tsmux_program_set_pcr_stream (prog, best->stream);
1169   }
1170
1171   GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best),
1172       "Chose stream for output (PID: 0x%04x)", best->pid);
1173
1174   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1175     pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
1176     GST_DEBUG_OBJECT (mux, "Buffer has PTS %" GST_TIME_FORMAT " pts %"
1177         G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts);
1178   }
1179
1180   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf))) {
1181     dts = GSTTIME_TO_MPEGTIME (GST_BUFFER_DTS (buf));
1182     GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_TIME_FORMAT " dts %"
1183         G_GINT64_FORMAT, GST_TIME_ARGS (GST_BUFFER_DTS (buf)), dts);
1184   }
1185
1186   /* should not have a DTS without PTS */
1187   if (pts == -1 && dts != -1) {
1188     GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
1189     pts = dts;
1190   }
1191
1192   if (best->stream->is_video_stream) {
1193     delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1194 #if 0
1195     GST_OBJECT_LOCK (mux);
1196     if (mux->element_index && !delta && best->element_index_writer_id != -1) {
1197       gst_index_add_association (mux->element_index,
1198           best->element_index_writer_id,
1199           GST_ASSOCIATION_FLAG_KEY_UNIT, spn_format, mux->spn_count,
1200           pts_format, pts, NULL);
1201     }
1202     GST_OBJECT_UNLOCK (mux);
1203 #endif
1204   }
1205   GST_DEBUG_OBJECT (mux, "delta: %d", delta);
1206
1207   stream_data = stream_data_new (buf);
1208   tsmux_stream_add_data (best->stream, stream_data->map_info.data,
1209       stream_data->map_info.size, stream_data, pts, dts, !delta);
1210
1211   /* outgoing ts follows ts of PCR program stream */
1212   if (prog->pcr_stream == best->stream) {
1213     /* prefer DTS if present for PCR as it should be monotone */
1214     mux->last_ts =
1215         GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
1216         GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
1217   }
1218
1219   mux->is_delta = delta;
1220   while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
1221     if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
1222       /* Failed writing data for some reason. Set appropriate error */
1223       GST_DEBUG_OBJECT (mux, "Failed to write data packet");
1224       GST_ELEMENT_ERROR (mux, STREAM, MUX,
1225           ("Failed writing output data to stream %04x", best->stream->id),
1226           (NULL));
1227       goto write_fail;
1228     }
1229   }
1230   /* flush packet cache */
1231   return mpegtsmux_push_packets (mux, FALSE);
1232
1233   /* ERRORS */
1234 write_fail:
1235   {
1236     return mux->last_flow_ret;
1237   }
1238 no_program:
1239   {
1240     GST_ELEMENT_ERROR (mux, STREAM, MUX,
1241         ("Stream on pad %" GST_PTR_FORMAT
1242             " is not associated with any program", COLLECT_DATA_PAD (best)),
1243         (NULL));
1244     return GST_FLOW_ERROR;
1245   }
1246 }
1247
1248 static GstPad *
1249 mpegtsmux_request_new_pad (GstElement * element, GstPadTemplate * templ,
1250     const gchar * name, const GstCaps * caps)
1251 {
1252   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1253   gint pid = -1;
1254   gchar *pad_name = NULL;
1255   GstPad *pad = NULL;
1256   MpegTsPadData *pad_data = NULL;
1257
1258   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
1259     if (tsmux_find_stream (mux->tsmux, pid))
1260       goto stream_exists;
1261   } else {
1262     pid = tsmux_get_new_pid (mux->tsmux);
1263   }
1264
1265   pad_name = g_strdup_printf ("sink_%d", pid);
1266   pad = gst_pad_new_from_template (templ, pad_name);
1267   g_free (pad_name);
1268
1269   pad_data = (MpegTsPadData *)
1270       gst_collect_pads_add_pad (mux->collect, pad, sizeof (MpegTsPadData),
1271       (GstCollectDataDestroyNotify) (mpegtsmux_pad_reset), TRUE);
1272   if (pad_data == NULL)
1273     goto pad_failure;
1274
1275   mpegtsmux_pad_reset (pad_data);
1276   pad_data->pid = pid;
1277
1278   if (G_UNLIKELY (!gst_element_add_pad (element, pad)))
1279     goto could_not_add;
1280
1281   return pad;
1282
1283   /* ERRORS */
1284 stream_exists:
1285   {
1286     GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"),
1287         (NULL));
1288     return NULL;
1289   }
1290 could_not_add:
1291   {
1292     GST_ELEMENT_ERROR (element, STREAM, FAILED,
1293         ("Internal data stream error."), ("Could not add pad to element"));
1294     gst_collect_pads_remove_pad (mux->collect, pad);
1295     gst_object_unref (pad);
1296     return NULL;
1297   }
1298 pad_failure:
1299   {
1300     GST_ELEMENT_ERROR (element, STREAM, FAILED,
1301         ("Internal data stream error."), ("Could not add pad to collectpads"));
1302     gst_object_unref (pad);
1303     return NULL;
1304   }
1305 }
1306
1307 static void
1308 mpegtsmux_release_pad (GstElement * element, GstPad * pad)
1309 {
1310   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1311
1312   GST_DEBUG_OBJECT (mux, "Pad %" GST_PTR_FORMAT " being released", pad);
1313
1314   if (mux->collect) {
1315     gst_collect_pads_remove_pad (mux->collect, pad);
1316   }
1317
1318   /* chain up */
1319   gst_element_remove_pad (element, pad);
1320 }
1321
1322 static void
1323 new_packet_common_init (MpegTsMux * mux, GstBuffer * buf, guint8 * data,
1324     guint len)
1325 {
1326   /* Packets should be at least 188 bytes, but check anyway */
1327   g_return_if_fail (len >= 2 || !data);
1328
1329   if (!mux->streamheader_sent && data) {
1330     guint pid = ((data[1] & 0x1f) << 8) | data[2];
1331     /* if it's a PAT or a PMT */
1332     if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
1333       GstBuffer *hbuf;
1334
1335       if (!buf) {
1336         hbuf = gst_buffer_new_and_alloc (len);
1337         gst_buffer_fill (hbuf, 0, data, len);
1338       } else {
1339         hbuf = gst_buffer_copy (buf);
1340       }
1341       mux->streamheader = g_list_append (mux->streamheader, hbuf);
1342     } else if (mux->streamheader) {
1343       mpegtsdemux_set_header_on_caps (mux);
1344       mux->streamheader_sent = TRUE;
1345     }
1346   }
1347
1348   if (buf) {
1349     if (mux->is_delta) {
1350       GST_LOG_OBJECT (mux, "marking as delta unit");
1351       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1352     } else {
1353       GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
1354       mux->is_delta = TRUE;
1355     }
1356   }
1357 }
1358
1359 static GstFlowReturn
1360 mpegtsmux_push_packets (MpegTsMux * mux, gboolean force)
1361 {
1362   gint align = mux->alignment;
1363   gint av, packet_size;
1364   GstBuffer *buf;
1365   GstFlowReturn ret = GST_FLOW_OK;
1366   GstClockTime ts;
1367
1368   if (mux->m2ts_mode) {
1369     packet_size = M2TS_PACKET_LENGTH;
1370     if (align < 0)
1371       align = 32;
1372   } else {
1373     packet_size = NORMAL_TS_PACKET_LENGTH;
1374     if (align < 0)
1375       align = 0;
1376   }
1377
1378   av = gst_adapter_available (mux->out_adapter);
1379   GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
1380
1381   if (!align)
1382     align = av;
1383   else
1384     align *= packet_size;
1385
1386   /* FIXME: what about DTS here? */
1387   GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
1388   if (G_LIKELY ((align <= av) && av)) {
1389     GST_LOG_OBJECT (mux, "pushing %d aligned bytes", av - (av % align));
1390     ts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1391     buf = gst_adapter_take_buffer (mux->out_adapter, av - (av % align));
1392     g_assert (buf);
1393     GST_BUFFER_PTS (buf) = ts;
1394
1395     ret = gst_pad_push (mux->srcpad, buf);
1396     av = av % align;
1397   }
1398
1399   if (av && force) {
1400     guint8 *data;
1401     guint32 header;
1402     gint dummy;
1403     GstMapInfo map;
1404
1405     GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
1406     buf = gst_buffer_new_and_alloc (align);
1407     gst_buffer_map (buf, &map, GST_MAP_READ);
1408     data = map.data;
1409     ts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1410
1411     gst_adapter_copy (mux->out_adapter, data, 0, av);
1412     gst_adapter_clear (mux->out_adapter);
1413     GST_BUFFER_PTS (buf) = ts;
1414
1415     data += av;
1416     header = GST_READ_UINT32_BE (data - packet_size);
1417
1418     dummy = (map.size - av) / packet_size;
1419     GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
1420
1421     for (; dummy > 0; dummy--) {
1422       gint offset;
1423
1424       if (packet_size > NORMAL_TS_PACKET_LENGTH) {
1425         GST_WRITE_UINT32_BE (data, header);
1426         /* simply increase header a bit and never mind too much */
1427         header++;
1428         offset = 4;
1429       } else {
1430         offset = 0;
1431       }
1432       GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
1433       /* null packet PID */
1434       GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
1435       /* no adaptation field exists | continuity counter undefined */
1436       GST_WRITE_UINT8 (data + offset + 3, 0x10);
1437       /* payload */
1438       memset (data + offset + 4, 0, NORMAL_TS_PACKET_LENGTH - 4);
1439       data += packet_size;
1440     }
1441
1442     gst_buffer_unmap (buf, &map);
1443
1444     ret = gst_pad_push (mux->srcpad, buf);
1445   }
1446
1447   return ret;
1448 }
1449
1450 static GstFlowReturn
1451 mpegtsmux_collect_packet (MpegTsMux * mux, GstBuffer * buf)
1452 {
1453   GST_LOG_OBJECT (mux, "collecting packet size %" G_GSIZE_FORMAT,
1454       gst_buffer_get_size (buf));
1455   gst_adapter_push (mux->out_adapter, buf);
1456
1457   return GST_FLOW_OK;
1458 }
1459
1460 static gboolean
1461 new_packet_m2ts (MpegTsMux * mux, GstBuffer * buf, gint64 new_pcr)
1462 {
1463   GstBuffer *out_buf;
1464   int chunk_bytes;
1465   GstMapInfo map;
1466
1467   GST_LOG_OBJECT (mux, "Have buffer %p with new_pcr=%" G_GINT64_FORMAT,
1468       buf, new_pcr);
1469
1470   chunk_bytes = gst_adapter_available (mux->adapter);
1471
1472   if (G_LIKELY (buf)) {
1473     if (new_pcr < 0) {
1474       /* If there is no pcr in current ts packet then just add the packet
1475          to the adapter for later output when we see a PCR */
1476       GST_LOG_OBJECT (mux, "Accumulating non-PCR packet");
1477       gst_adapter_push (mux->adapter, buf);
1478       goto exit;
1479     }
1480
1481     /* no first interpolation point yet, then this is the one,
1482      * otherwise it is the second interpolation point */
1483     if (mux->previous_pcr < 0 && chunk_bytes) {
1484       mux->previous_pcr = new_pcr;
1485       mux->previous_offset = chunk_bytes;
1486       GST_LOG_OBJECT (mux, "Accumulating non-PCR packet");
1487       gst_adapter_push (mux->adapter, buf);
1488       goto exit;
1489     }
1490   } else {
1491     g_assert (new_pcr == -1);
1492   }
1493
1494   /* interpolate if needed, and 2 points available */
1495   if (chunk_bytes && (new_pcr != mux->previous_pcr)) {
1496     gint64 offset = 0;
1497
1498     GST_LOG_OBJECT (mux, "Processing pending packets; "
1499         "previous pcr %" G_GINT64_FORMAT ", previous offset %d, "
1500         "current pcr %" G_GINT64_FORMAT ", current offset %d",
1501         mux->previous_pcr, (gint) mux->previous_offset,
1502         new_pcr, (gint) chunk_bytes);
1503
1504     g_assert (chunk_bytes > mux->previous_offset);
1505     /* if draining, use previous rate */
1506     if (G_LIKELY (new_pcr > 0)) {
1507       mux->pcr_rate_num = new_pcr - mux->previous_pcr;
1508       mux->pcr_rate_den = chunk_bytes - mux->previous_offset;
1509     }
1510
1511     while (offset < chunk_bytes) {
1512       guint64 cur_pcr, ts;
1513
1514       /* Loop, pulling packets of the adapter, updating their 4 byte
1515        * timestamp header and pushing */
1516
1517       /* interpolate PCR */
1518       if (G_LIKELY (offset >= mux->previous_offset))
1519         cur_pcr = mux->previous_pcr +
1520             gst_util_uint64_scale (offset - mux->previous_offset,
1521             mux->pcr_rate_num, mux->pcr_rate_den);
1522       else
1523         cur_pcr = mux->previous_pcr -
1524             gst_util_uint64_scale (mux->previous_offset - offset,
1525             mux->pcr_rate_num, mux->pcr_rate_den);
1526
1527       /* FIXME: what about DTS here? */
1528       ts = gst_adapter_prev_pts (mux->adapter, NULL);
1529       out_buf = gst_adapter_take_buffer (mux->adapter, M2TS_PACKET_LENGTH);
1530       g_assert (out_buf);
1531       offset += M2TS_PACKET_LENGTH;
1532
1533       GST_BUFFER_PTS (out_buf) = ts;
1534
1535       gst_buffer_map (out_buf, &map, GST_MAP_WRITE);
1536
1537       /* The header is the bottom 30 bits of the PCR, apparently not
1538        * encoded into base + ext as in the packets themselves */
1539       GST_WRITE_UINT32_BE (map.data, cur_pcr & 0x3FFFFFFF);
1540       gst_buffer_unmap (out_buf, &map);
1541
1542       GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
1543           G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, cur_pcr);
1544       mpegtsmux_collect_packet (mux, out_buf);
1545     }
1546   }
1547
1548   if (G_UNLIKELY (!buf))
1549     goto exit;
1550
1551   gst_buffer_map (buf, &map, GST_MAP_WRITE);
1552
1553   /* Finally, output the passed in packet */
1554   /* Only write the bottom 30 bits of the PCR */
1555   GST_WRITE_UINT32_BE (map.data, new_pcr & 0x3FFFFFFF);
1556
1557   gst_buffer_unmap (buf, &map);
1558
1559   GST_LOG_OBJECT (mux, "Outputting a packet of length %d PCR %"
1560       G_GUINT64_FORMAT, M2TS_PACKET_LENGTH, new_pcr);
1561   mpegtsmux_collect_packet (mux, buf);
1562
1563   if (new_pcr != mux->previous_pcr) {
1564     mux->previous_pcr = new_pcr;
1565     mux->previous_offset = -M2TS_PACKET_LENGTH;
1566   }
1567
1568 exit:
1569   return TRUE;
1570 }
1571
1572 /* Called when the TsMux has prepared a packet for output. Return FALSE
1573  * on error */
1574 static gboolean
1575 new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
1576 {
1577   MpegTsMux *mux = (MpegTsMux *) user_data;
1578   gint offset = 0;
1579   GstMapInfo map;
1580
1581 #if 0
1582   GST_LOG_OBJECT (mux, "handling packet %d", mux->spn_count);
1583   mux->spn_count++;
1584 #endif
1585
1586   if (mux->m2ts_mode) {
1587     offset = 4;
1588     gst_buffer_set_size (buf, NORMAL_TS_PACKET_LENGTH + offset);
1589   }
1590
1591   gst_buffer_map (buf, &map, GST_MAP_READWRITE);
1592
1593   if (offset) {
1594     /* there should be a better way to do this */
1595     memmove (map.data + offset, map.data, map.size - offset);
1596   }
1597
1598   GST_BUFFER_PTS (buf) = mux->last_ts;
1599   /* do common init (flags and streamheaders) */
1600   new_packet_common_init (mux, buf, map.data + offset, map.size);
1601
1602   gst_buffer_unmap (buf, &map);
1603
1604   /* all is meant for downstream, including any prefix */
1605   if (offset)
1606     return new_packet_m2ts (mux, buf, new_pcr);
1607   else
1608     mpegtsmux_collect_packet (mux, buf);
1609
1610   return TRUE;
1611 }
1612
1613 /* called when TsMux needs new packet to write into */
1614 static void
1615 alloc_packet_cb (GstBuffer ** _buf, void *user_data)
1616 {
1617   MpegTsMux *mux = (MpegTsMux *) user_data;
1618   GstBuffer *buf;
1619   gint offset = 0;
1620
1621   if (mux->m2ts_mode == TRUE)
1622     offset = 4;
1623
1624   buf = gst_buffer_new_and_alloc (NORMAL_TS_PACKET_LENGTH + offset);
1625   gst_buffer_set_size (buf, NORMAL_TS_PACKET_LENGTH);
1626
1627   *_buf = buf;
1628 }
1629
1630 static void
1631 mpegtsdemux_set_header_on_caps (MpegTsMux * mux)
1632 {
1633   GstBuffer *buf;
1634   GstStructure *structure;
1635   GValue array = { 0 };
1636   GValue value = { 0 };
1637   GstCaps *caps;
1638   GList *sh;
1639
1640   caps = gst_caps_make_writable (gst_pad_get_current_caps (mux->srcpad));
1641   structure = gst_caps_get_structure (caps, 0);
1642
1643   g_value_init (&array, GST_TYPE_ARRAY);
1644
1645   sh = mux->streamheader;
1646   while (sh) {
1647     buf = sh->data;
1648     g_value_init (&value, GST_TYPE_BUFFER);
1649     gst_value_take_buffer (&value, buf);
1650     gst_value_array_append_value (&array, &value);
1651     g_value_unset (&value);
1652     sh = g_list_next (sh);
1653   }
1654
1655   g_list_free (mux->streamheader);
1656   mux->streamheader = NULL;
1657
1658   gst_structure_set_value (structure, "streamheader", &array);
1659   gst_pad_set_caps (mux->srcpad, caps);
1660   g_value_unset (&array);
1661   gst_caps_unref (caps);
1662 }
1663
1664 static void
1665 mpegtsdemux_prepare_srcpad (MpegTsMux * mux)
1666 {
1667   GstSegment seg;
1668   /* we are not going to seek */
1669   GstEvent *new_seg;
1670   gchar s_id[32];
1671   GstCaps *caps = gst_caps_new_simple ("video/mpegts",
1672       "systemstream", G_TYPE_BOOLEAN, TRUE,
1673       "packetsize", G_TYPE_INT,
1674       (mux->m2ts_mode ? M2TS_PACKET_LENGTH : NORMAL_TS_PACKET_LENGTH),
1675       NULL);
1676
1677   /* stream-start (FIXME: create id based on input ids) */
1678   g_snprintf (s_id, sizeof (s_id), "mpegtsmux-%08x", g_random_int ());
1679   gst_pad_push_event (mux->srcpad, gst_event_new_stream_start (s_id));
1680
1681   gst_segment_init (&seg, GST_FORMAT_TIME);
1682   new_seg = gst_event_new_segment (&seg);
1683
1684   /* Set caps on src pad from our template and push new segment */
1685   gst_pad_set_caps (mux->srcpad, caps);
1686   gst_caps_unref (caps);
1687
1688   if (!gst_pad_push_event (mux->srcpad, new_seg)) {
1689     GST_WARNING_OBJECT (mux, "New segment event was not handled downstream");
1690   }
1691 }
1692
1693 static GstStateChangeReturn
1694 mpegtsmux_change_state (GstElement * element, GstStateChange transition)
1695 {
1696   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1697   GstStateChangeReturn ret;
1698
1699   switch (transition) {
1700     case GST_STATE_CHANGE_NULL_TO_READY:
1701       break;
1702     case GST_STATE_CHANGE_READY_TO_PAUSED:
1703       gst_collect_pads_start (mux->collect);
1704       break;
1705     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1706       break;
1707     case GST_STATE_CHANGE_PAUSED_TO_READY:
1708       gst_collect_pads_stop (mux->collect);
1709       break;
1710     case GST_STATE_CHANGE_READY_TO_NULL:
1711       break;
1712     default:
1713       break;
1714   }
1715
1716   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1717
1718   switch (transition) {
1719     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1720       break;
1721     case GST_STATE_CHANGE_PAUSED_TO_READY:
1722       mpegtsmux_reset (mux, TRUE);
1723       break;
1724     case GST_STATE_CHANGE_READY_TO_NULL:
1725       break;
1726     default:
1727       break;
1728   }
1729
1730   return ret;
1731 }
1732
1733 static gboolean
1734 mpegtsmux_send_event (GstElement * element, GstEvent * event)
1735 {
1736   GstMpegtsSection *section;
1737   MpegTsMux *mux = GST_MPEG_TSMUX (element);
1738
1739   g_return_val_if_fail (event != NULL, FALSE);
1740
1741   section = gst_event_parse_mpegts_section (event);
1742   gst_event_unref (event);
1743
1744   if (section) {
1745     GST_DEBUG ("Received event with mpegts section");
1746
1747     /* TODO: Check that the section type is supported */
1748     tsmux_add_mpegts_si_section (mux->tsmux, section);
1749
1750     return TRUE;
1751   }
1752
1753   return FALSE;
1754 }
1755
1756 static gboolean
1757 plugin_init (GstPlugin * plugin)
1758 {
1759   gst_mpegts_initialize ();
1760   if (!gst_element_register (plugin, "mpegtsmux", GST_RANK_PRIMARY,
1761           mpegtsmux_get_type ()))
1762     return FALSE;
1763
1764   GST_DEBUG_CATEGORY_INIT (mpegtsmux_debug, "mpegtsmux", 0,
1765       "MPEG Transport Stream muxer");
1766
1767   return TRUE;
1768 }
1769
1770 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR,
1771     mpegtsmux, "MPEG-TS muxer",
1772     plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);