multifile: attempt to fix docs build issue on build bot
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * is required, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * The splitting process is driven by the video stream contents, and
37  * the video stream must contain closed GOPs for the output file parts
38  * to be played individually correctly.
39  *
40  * <refsect2>
41  * <title>Example pipelines</title>
42  * |[
43  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
44  * ]|
45  * Records a video stream captured from a v4l2 device and muxes it into
46  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
47  * and 1MB maximum size.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <string.h>
56 #include "gstsplitmuxsink.h"
57
58 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
59 #define GST_CAT_DEFAULT splitmux_debug
60
61 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
62 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
63 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
64 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
65
66 enum
67 {
68   PROP_0,
69   PROP_LOCATION,
70   PROP_MAX_SIZE_TIME,
71   PROP_MAX_SIZE_BYTES,
72   PROP_MUXER_OVERHEAD,
73   PROP_MUXER,
74   PROP_SINK
75 };
76
77 #define DEFAULT_MAX_SIZE_TIME       0
78 #define DEFAULT_MAX_SIZE_BYTES      0
79 #define DEFAULT_MUXER_OVERHEAD      0.02
80 #define DEFAULT_MUXER "mp4mux"
81 #define DEFAULT_SINK "filesink"
82
83 static GstStaticPadTemplate video_sink_template =
84 GST_STATIC_PAD_TEMPLATE ("video",
85     GST_PAD_SINK,
86     GST_PAD_REQUEST,
87     GST_STATIC_CAPS_ANY);
88 static GstStaticPadTemplate audio_sink_template =
89 GST_STATIC_PAD_TEMPLATE ("audio_%u",
90     GST_PAD_SINK,
91     GST_PAD_REQUEST,
92     GST_STATIC_CAPS_ANY);
93 static GstStaticPadTemplate subtitle_sink_template =
94 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
95     GST_PAD_SINK,
96     GST_PAD_REQUEST,
97     GST_STATIC_CAPS_ANY);
98
99 static GQuark PAD_CONTEXT;
100
101 static void
102 _do_init (void)
103 {
104   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
105 }
106
107 #define gst_splitmux_sink_parent_class parent_class
108 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
109     _do_init ());
110
111 static gboolean create_elements (GstSplitMuxSink * splitmux);
112 static gboolean create_sink (GstSplitMuxSink * splitmux);
113 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
114     const GValue * value, GParamSpec * pspec);
115 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
116     GValue * value, GParamSpec * pspec);
117 static void gst_splitmux_sink_dispose (GObject * object);
118 static void gst_splitmux_sink_finalize (GObject * object);
119
120 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
121     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
122 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
123
124 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
125     element, GstStateChange transition);
126
127 static void bus_handler (GstBin * bin, GstMessage * msg);
128 static void set_next_filename (GstSplitMuxSink * splitmux);
129 static void start_next_fragment (GstSplitMuxSink * splitmux);
130 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
131 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
132
133 static MqStreamBuf *
134 mq_stream_buf_new (void)
135 {
136   return g_slice_new0 (MqStreamBuf);
137 }
138
139 static void
140 mq_stream_buf_free (MqStreamBuf * data)
141 {
142   g_slice_free (MqStreamBuf, data);
143 }
144
145 static void
146 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
147 {
148   GObjectClass *gobject_class = (GObjectClass *) klass;
149   GstElementClass *gstelement_class = (GstElementClass *) klass;
150   GstBinClass *gstbin_class = (GstBinClass *) klass;
151
152   gobject_class->set_property = gst_splitmux_sink_set_property;
153   gobject_class->get_property = gst_splitmux_sink_get_property;
154   gobject_class->dispose = gst_splitmux_sink_dispose;
155   gobject_class->finalize = gst_splitmux_sink_finalize;
156
157   gst_element_class_set_static_metadata (gstelement_class,
158       "Split Muxing Bin", "Generic/Bin/Muxer",
159       "Convenience bin that muxes incoming streams into multiple time/size limited files",
160       "Jan Schmidt <jan@centricular.com>");
161
162   gst_element_class_add_pad_template (gstelement_class,
163       gst_static_pad_template_get (&video_sink_template));
164   gst_element_class_add_pad_template (gstelement_class,
165       gst_static_pad_template_get (&audio_sink_template));
166   gst_element_class_add_pad_template (gstelement_class,
167       gst_static_pad_template_get (&subtitle_sink_template));
168
169   gstelement_class->change_state =
170       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
171   gstelement_class->request_new_pad =
172       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
173   gstelement_class->release_pad =
174       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
175
176   gstbin_class->handle_message = bus_handler;
177
178   g_object_class_install_property (gobject_class, PROP_LOCATION,
179       g_param_spec_string ("location", "File Output Pattern",
180           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
181           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
183       g_param_spec_double ("mux-overhead", "Muxing Overhead",
184           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
185           DEFAULT_MUXER_OVERHEAD,
186           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
187
188   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
189       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
190           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
191           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
193       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
194           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
195           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_MUXER,
198       g_param_spec_object ("muxer", "Muxer",
199           "The muxer element to use (NULL = default mp4mux)",
200           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201   g_object_class_install_property (gobject_class, PROP_SINK,
202       g_param_spec_object ("sink", "Sink",
203           "The sink element (or element chain) to use (NULL = default filesink)",
204           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
205 }
206
207 static void
208 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
209 {
210   g_mutex_init (&splitmux->lock);
211   g_cond_init (&splitmux->data_cond);
212
213   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
214   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
215   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
216
217   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
218 }
219
220 static void
221 gst_splitmux_reset (GstSplitMuxSink * splitmux)
222 {
223   if (splitmux->mq)
224     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
225   if (splitmux->muxer)
226     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
227   if (splitmux->active_sink)
228     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
229
230   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
231       NULL;
232 }
233
234 static void
235 gst_splitmux_sink_dispose (GObject * object)
236 {
237   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
238
239   G_OBJECT_CLASS (parent_class)->dispose (object);
240
241   /* Calling parent dispose invalidates all child pointers */
242   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
243       NULL;
244 }
245
246 static void
247 gst_splitmux_sink_finalize (GObject * object)
248 {
249   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
250   g_cond_clear (&splitmux->data_cond);
251   if (splitmux->provided_sink)
252     gst_object_unref (splitmux->provided_sink);
253   if (splitmux->provided_muxer)
254     gst_object_unref (splitmux->provided_muxer);
255
256   g_free (splitmux->location);
257
258   /* Make sure to free any un-released contexts */
259   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
260   g_list_free (splitmux->contexts);
261
262   G_OBJECT_CLASS (parent_class)->finalize (object);
263 }
264
265 static void
266 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
267     const GValue * value, GParamSpec * pspec)
268 {
269   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
270
271   switch (prop_id) {
272     case PROP_LOCATION:{
273       GST_OBJECT_LOCK (splitmux);
274       g_free (splitmux->location);
275       splitmux->location = g_value_dup_string (value);
276       GST_OBJECT_UNLOCK (splitmux);
277       break;
278     }
279     case PROP_MAX_SIZE_BYTES:
280       GST_OBJECT_LOCK (splitmux);
281       splitmux->threshold_bytes = g_value_get_uint64 (value);
282       GST_OBJECT_UNLOCK (splitmux);
283       break;
284     case PROP_MAX_SIZE_TIME:
285       GST_OBJECT_LOCK (splitmux);
286       splitmux->threshold_time = g_value_get_uint64 (value);
287       GST_OBJECT_UNLOCK (splitmux);
288       break;
289     case PROP_MUXER_OVERHEAD:
290       GST_OBJECT_LOCK (splitmux);
291       splitmux->mux_overhead = g_value_get_double (value);
292       GST_OBJECT_UNLOCK (splitmux);
293       break;
294     case PROP_SINK:
295       GST_SPLITMUX_LOCK (splitmux);
296       if (splitmux->provided_sink)
297         gst_object_unref (splitmux->provided_sink);
298       splitmux->provided_sink = g_value_dup_object (value);
299       GST_SPLITMUX_UNLOCK (splitmux);
300       break;
301     case PROP_MUXER:
302       GST_SPLITMUX_LOCK (splitmux);
303       if (splitmux->provided_muxer)
304         gst_object_unref (splitmux->provided_muxer);
305       splitmux->provided_muxer = g_value_dup_object (value);
306       GST_SPLITMUX_UNLOCK (splitmux);
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
310       break;
311   }
312 }
313
314 static void
315 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
316     GValue * value, GParamSpec * pspec)
317 {
318   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
319
320   switch (prop_id) {
321     case PROP_LOCATION:
322       GST_OBJECT_LOCK (splitmux);
323       g_value_set_string (value, splitmux->location);
324       GST_OBJECT_UNLOCK (splitmux);
325       break;
326     case PROP_MAX_SIZE_BYTES:
327       GST_OBJECT_LOCK (splitmux);
328       g_value_set_uint64 (value, splitmux->threshold_bytes);
329       GST_OBJECT_UNLOCK (splitmux);
330       break;
331     case PROP_MAX_SIZE_TIME:
332       GST_OBJECT_LOCK (splitmux);
333       g_value_set_uint64 (value, splitmux->threshold_time);
334       GST_OBJECT_UNLOCK (splitmux);
335       break;
336     case PROP_MUXER_OVERHEAD:
337       GST_OBJECT_LOCK (splitmux);
338       g_value_set_double (value, splitmux->mux_overhead);
339       GST_OBJECT_UNLOCK (splitmux);
340       break;
341     case PROP_SINK:
342       GST_SPLITMUX_LOCK (splitmux);
343       g_value_set_object (value, splitmux->provided_sink);
344       GST_SPLITMUX_UNLOCK (splitmux);
345       break;
346     case PROP_MUXER:
347       GST_SPLITMUX_LOCK (splitmux);
348       g_value_set_object (value, splitmux->provided_muxer);
349       GST_SPLITMUX_UNLOCK (splitmux);
350       break;
351     default:
352       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
353       break;
354   }
355 }
356
357 static GstPad *
358 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
359 {
360   gchar *tmp, *sinkname, *srcname;
361   GstPad *mq_src;
362
363   sinkname = gst_pad_get_name (sink_pad);
364   tmp = sinkname + 5;
365   srcname = g_strdup_printf ("src_%s", tmp);
366
367   mq_src = gst_element_get_static_pad (mq, srcname);
368
369   g_free (sinkname);
370   g_free (srcname);
371
372   return mq_src;
373 }
374
375 static gboolean
376 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
377     GstPad ** src_pad)
378 {
379   GstPad *mq_sink;
380   GstPad *mq_src;
381
382   /* Request a pad from multiqueue, then connect this one, then
383    * discover the corresponding output pad and return both */
384   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
385   if (mq_sink == NULL)
386     return FALSE;
387
388   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
389   if (mq_src == NULL)
390     goto fail;
391
392   *sink_pad = mq_sink;
393   *src_pad = mq_src;
394
395   return TRUE;
396
397 fail:
398   gst_element_release_request_pad (splitmux->mq, mq_sink);
399   return FALSE;
400 }
401
402 static MqStreamCtx *
403 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
404 {
405   MqStreamCtx *ctx;
406
407   ctx = g_new0 (MqStreamCtx, 1);
408   g_atomic_int_set (&ctx->refcount, 1);
409   ctx->splitmux = splitmux;
410   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
411   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
412   ctx->in_running_time = ctx->out_running_time = 0;
413   g_queue_init (&ctx->queued_bufs);
414   return ctx;
415 }
416
417 static void
418 mq_stream_ctx_free (MqStreamCtx * ctx)
419 {
420   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
421   g_queue_clear (&ctx->queued_bufs);
422   g_free (ctx);
423 }
424
425 static void
426 mq_stream_ctx_unref (MqStreamCtx * ctx)
427 {
428   if (g_atomic_int_dec_and_test (&ctx->refcount))
429     mq_stream_ctx_free (ctx);
430 }
431
432 static void
433 mq_stream_ctx_ref (MqStreamCtx * ctx)
434 {
435   g_atomic_int_inc (&ctx->refcount);
436 }
437
438 static void
439 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
440 {
441   ctx->sink_pad_block_id = 0;
442   mq_stream_ctx_unref (ctx);
443 }
444
445 static void
446 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
447 {
448   ctx->src_pad_block_id = 0;
449   mq_stream_ctx_unref (ctx);
450 }
451
452 /* Called with lock held, drops the lock to send EOS to the
453  * pad
454  */
455 static void
456 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
457 {
458   GstEvent *eos;
459   GstPad *pad;
460
461   eos = gst_event_new_eos ();
462   pad = gst_pad_get_peer (ctx->srcpad);
463
464   ctx->out_eos = TRUE;
465
466   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
467   GST_SPLITMUX_UNLOCK (splitmux);
468   gst_pad_send_event (pad, eos);
469   GST_SPLITMUX_LOCK (splitmux);
470
471   gst_object_unref (pad);
472 }
473
474 /* Called with splitmux lock held to check if this output
475  * context needs to sleep to wait for the release of the
476  * next GOP, or to send EOS to close out the current file
477  */
478 static void
479 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
480 {
481   do {
482
483     GST_LOG_OBJECT (ctx->srcpad,
484         "Checking running time %" GST_TIME_FORMAT " against max %"
485         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
486         GST_TIME_ARGS (splitmux->max_out_running_time));
487
488     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
489         ctx->out_running_time < splitmux->max_out_running_time)
490       return;
491
492     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
493       return;
494
495     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
496       if (ctx->out_eos == FALSE) {
497         send_eos (splitmux, ctx);
498         continue;
499       }
500     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
501       start_next_fragment (splitmux);
502       continue;
503     }
504
505     GST_INFO_OBJECT (ctx->srcpad,
506         "Sleeping for running time %"
507         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
508         GST_TIME_ARGS (ctx->out_running_time),
509         GST_TIME_ARGS (splitmux->max_out_running_time));
510     ctx->out_blocked = TRUE;
511     /* Expand the mq if needed before sleeping */
512     check_queue_length (splitmux, ctx);
513     GST_SPLITMUX_WAIT (splitmux);
514     ctx->out_blocked = FALSE;
515     GST_INFO_OBJECT (ctx->srcpad,
516         "Woken for new max running time %" GST_TIME_FORMAT,
517         GST_TIME_ARGS (splitmux->max_out_running_time));
518   } while (1);
519 }
520
521 static GstPadProbeReturn
522 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
523 {
524   GstSplitMuxSink *splitmux = ctx->splitmux;
525   MqStreamBuf *buf_info = NULL;
526
527   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
528
529   /* FIXME: Handle buffer lists, until then make it clear they won't work */
530   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
531     g_warning ("Buffer list handling not implemented");
532     return GST_PAD_PROBE_DROP;
533   }
534   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
535     GstEvent *event = gst_pad_probe_info_get_event (info);
536
537     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
538
539     switch (GST_EVENT_TYPE (event)) {
540       case GST_EVENT_SEGMENT:
541         gst_event_copy_segment (event, &ctx->out_segment);
542         break;
543       case GST_EVENT_FLUSH_STOP:
544         GST_SPLITMUX_LOCK (splitmux);
545         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
546         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
547         g_queue_clear (&ctx->queued_bufs);
548         ctx->flushing = FALSE;
549         GST_SPLITMUX_UNLOCK (splitmux);
550         break;
551       case GST_EVENT_FLUSH_START:
552         GST_SPLITMUX_LOCK (splitmux);
553         GST_LOG_OBJECT (pad, "Flush start");
554         ctx->flushing = TRUE;
555         GST_SPLITMUX_BROADCAST (splitmux);
556         GST_SPLITMUX_UNLOCK (splitmux);
557         break;
558       case GST_EVENT_EOS:
559         GST_SPLITMUX_LOCK (splitmux);
560         if (splitmux->state == SPLITMUX_STATE_STOPPED)
561           goto beach;
562         ctx->out_eos = TRUE;
563         GST_SPLITMUX_UNLOCK (splitmux);
564         break;
565       case GST_EVENT_GAP:{
566         GstClockTime gap_ts;
567
568         gst_event_parse_gap (event, &gap_ts, NULL);
569         if (gap_ts == GST_CLOCK_TIME_NONE)
570           break;
571
572         GST_SPLITMUX_LOCK (splitmux);
573
574         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
575             GST_FORMAT_TIME, gap_ts);
576
577         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
578             GST_TIME_ARGS (gap_ts));
579
580         if (splitmux->state == SPLITMUX_STATE_STOPPED)
581           goto beach;
582         ctx->out_running_time = gap_ts;
583         complete_or_wait_on_out (splitmux, ctx);
584         GST_SPLITMUX_UNLOCK (splitmux);
585         break;
586       }
587       default:
588         break;
589     }
590     return GST_PAD_PROBE_PASS;
591   }
592
593   /* Allow everything through until the configured next stopping point */
594   GST_SPLITMUX_LOCK (splitmux);
595
596   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
597   if (buf_info == NULL)
598     /* Can only happen due to a poorly timed flush */
599     goto beach;
600
601   /* If we have popped a keyframe, decrement the queued_gop count */
602   if (buf_info->keyframe && splitmux->queued_gops > 0)
603     splitmux->queued_gops--;
604
605   ctx->out_running_time = buf_info->run_ts;
606
607   GST_LOG_OBJECT (splitmux,
608       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
609       " size %" G_GSIZE_FORMAT,
610       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
611
612   complete_or_wait_on_out (splitmux, ctx);
613
614   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
615       splitmux->muxed_out_time < buf_info->run_ts)
616     splitmux->muxed_out_time = buf_info->run_ts;
617
618   splitmux->muxed_out_bytes += buf_info->buf_size;
619
620 #ifndef GST_DISABLE_GST_DEBUG
621   {
622     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
623     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
624         " run ts %" GST_TIME_FORMAT, buf,
625         GST_TIME_ARGS (ctx->out_running_time));
626   }
627 #endif
628
629   GST_SPLITMUX_UNLOCK (splitmux);
630
631   mq_stream_buf_free (buf_info);
632
633   return GST_PAD_PROBE_PASS;
634
635 beach:
636   GST_SPLITMUX_UNLOCK (splitmux);
637   return GST_PAD_PROBE_DROP;
638 }
639
640 static gboolean
641 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
642 {
643   return gst_pad_send_event (peer, gst_event_ref (*event));
644 }
645
646 static void
647 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
648 {
649   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
650
651   gst_pad_sticky_events_foreach (ctx->srcpad,
652       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
653
654   /* Clear EOS flag */
655   ctx->out_eos = FALSE;
656
657   gst_object_unref (peer);
658 }
659
660 /* Called with lock held when a fragment
661  * reaches EOS and it is time to restart
662  * a new fragment
663  */
664 static void
665 start_next_fragment (GstSplitMuxSink * splitmux)
666 {
667   /* 1 change to new file */
668   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
669   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
670
671   set_next_filename (splitmux);
672
673   gst_element_sync_state_with_parent (splitmux->active_sink);
674   gst_element_sync_state_with_parent (splitmux->muxer);
675
676   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
677
678   /* Switch state and go back to processing */
679   splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
680   if (!splitmux->video_ctx->in_eos)
681     splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
682   else
683     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
684
685   /* Store the overflow parameters as the basis for the next fragment */
686   splitmux->mux_start_time = splitmux->muxed_out_time;
687   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
688
689   GST_DEBUG_OBJECT (splitmux,
690       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
691       GST_TIME_ARGS (splitmux->max_out_running_time));
692
693   GST_SPLITMUX_BROADCAST (splitmux);
694 }
695
696 static void
697 bus_handler (GstBin * bin, GstMessage * message)
698 {
699   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
700
701   switch (GST_MESSAGE_TYPE (message)) {
702     case GST_MESSAGE_EOS:
703       /* If the state is draining out the current file, drop this EOS */
704       GST_SPLITMUX_LOCK (splitmux);
705       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
706           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
707         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
708         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
709         GST_SPLITMUX_BROADCAST (splitmux);
710
711         gst_message_unref (message);
712         GST_SPLITMUX_UNLOCK (splitmux);
713         return;
714       }
715       GST_SPLITMUX_UNLOCK (splitmux);
716       break;
717     default:
718       break;
719   }
720
721   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
722 }
723
724 /* Called with splitmux lock held */
725 /* Called when entering ProcessingCompleteGop state
726  * Assess if mq contents overflowed the current file
727  *   -> If yes, need to switch to new file
728  *   -> if no, set max_out_running_time to let this GOP in and
729  *      go to COLLECTING_GOP_START state
730  */
731 static void
732 handle_gathered_gop (GstSplitMuxSink * splitmux)
733 {
734   GList *cur;
735   gsize queued_bytes = 0;
736   GstClockTime queued_time = 0;
737
738   /* Assess if the multiqueue contents overflowed the current file */
739   for (cur = g_list_first (splitmux->contexts);
740       cur != NULL; cur = g_list_next (cur)) {
741     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
742     if (tmpctx->in_running_time > queued_time)
743       queued_time = tmpctx->in_running_time;
744     queued_bytes += tmpctx->in_bytes;
745   }
746
747   g_assert (queued_bytes >= splitmux->mux_start_bytes);
748   g_assert (queued_time >= splitmux->mux_start_time);
749
750   queued_bytes -= splitmux->mux_start_bytes;
751   queued_time -= splitmux->mux_start_time;
752
753   /* Expand queued bytes estimate by muxer overhead */
754   queued_bytes += (queued_bytes * splitmux->mux_overhead);
755
756   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
757       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
758
759   /* Check for overrun - have we output at least one byte and overrun
760    * either threshold? */
761   if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) &&
762       ((splitmux->threshold_bytes > 0 &&
763               queued_bytes >= splitmux->threshold_bytes) ||
764           (splitmux->threshold_time > 0 &&
765               queued_time >= splitmux->threshold_time))) {
766
767     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
768
769     GST_INFO_OBJECT (splitmux,
770         "mq overflowed since last, draining out. max out TS is %"
771         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
772     GST_SPLITMUX_BROADCAST (splitmux);
773
774   } else {
775     /* No overflow */
776     GST_LOG_OBJECT (splitmux,
777         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
778         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
779         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
780         queued_bytes, GST_TIME_ARGS (queued_time));
781
782     /* Wake everyone up to push this one GOP, then sleep */
783     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
784     if (!splitmux->video_ctx->in_eos)
785       splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
786     else
787       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
788
789     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
790         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
791     GST_SPLITMUX_BROADCAST (splitmux);
792   }
793
794 }
795
796 /* Called with splitmux lock held */
797 /* Called from each input pad when it is has all the pieces
798  * for a GOP or EOS, starting with the video pad which has set the
799  * splitmux->max_in_running_time
800  */
801 static void
802 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
803 {
804   GList *cur;
805   gboolean ready = TRUE;
806
807   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
808     /* Iterate each pad, and check that the input running time is at least
809      * up to the video runnning time, and if so handle the collected GOP */
810     GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
811     for (cur = g_list_first (splitmux->contexts);
812         cur != NULL; cur = g_list_next (cur)) {
813       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
814
815       GST_LOG_OBJECT (splitmux,
816           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
817           " EOS %d", tmpctx, tmpctx->srcpad,
818           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
819
820       if (tmpctx->in_running_time < splitmux->max_in_running_time &&
821           !tmpctx->in_eos) {
822         GST_LOG_OBJECT (splitmux,
823             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
824             tmpctx, tmpctx->srcpad);
825         ready = FALSE;
826         break;
827       }
828     }
829     if (ready) {
830       GST_DEBUG_OBJECT (splitmux,
831           "Collected GOP is complete. Processing (ctx %p)", ctx);
832       /* All pads have a complete GOP, release it into the multiqueue */
833       handle_gathered_gop (splitmux);
834     }
835   }
836
837   /* Some pad is not yet ready, or GOP is being pushed
838    * either way, sleep and wait to get woken */
839   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
840           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
841       !ctx->flushing) {
842
843     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
844         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
845         "GOP complete" : "EOF draining", ctx);
846     GST_SPLITMUX_WAIT (splitmux);
847
848     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
849   }
850 }
851
852 static void
853 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
854 {
855   GList *cur;
856   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
857
858   GST_DEBUG_OBJECT (ctx->sinkpad,
859       "Checking queue length len %u cur_max %u queued gops %u",
860       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
861
862   if (cur_len >= splitmux->mq_max_buffers) {
863     gboolean allow_grow = FALSE;
864
865     /* If collecting a GOP and this pad might block,
866      * and there isn't already a pending GOP in the queue
867      * then grow
868      */
869     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
870         ctx->in_running_time < splitmux->max_in_running_time &&
871         splitmux->queued_gops <= 1) {
872       allow_grow = TRUE;
873     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
874         ctx->is_video) {
875       allow_grow = TRUE;
876     }
877
878     if (!allow_grow) {
879       for (cur = g_list_first (splitmux->contexts);
880           cur != NULL; cur = g_list_next (cur)) {
881         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
882         GST_DEBUG_OBJECT (tmpctx->sinkpad,
883             " len %u out_blocked %d",
884             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
885         /* If another stream is starving, grow */
886         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
887           allow_grow = TRUE;
888         }
889       }
890     }
891
892     if (allow_grow) {
893       splitmux->mq_max_buffers = cur_len + 1;
894
895       GST_INFO_OBJECT (splitmux,
896           "Multiqueue overrun - enlarging to %u buffers ctx %p",
897           splitmux->mq_max_buffers, ctx);
898
899       g_object_set (splitmux->mq, "max-size-buffers",
900           splitmux->mq_max_buffers, NULL);
901     }
902   }
903 }
904
905 static GstPadProbeReturn
906 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
907 {
908   GstSplitMuxSink *splitmux = ctx->splitmux;
909   GstBuffer *buf;
910   MqStreamBuf *buf_info = NULL;
911   GstClockTime ts;
912   gboolean loop_again;
913   gboolean keyframe = FALSE;
914
915   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
916
917   /* FIXME: Handle buffer lists, until then make it clear they won't work */
918   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
919     g_warning ("Buffer list handling not implemented");
920     return GST_PAD_PROBE_DROP;
921   }
922   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
923     GstEvent *event = gst_pad_probe_info_get_event (info);
924     switch (GST_EVENT_TYPE (event)) {
925       case GST_EVENT_SEGMENT:
926         gst_event_copy_segment (event, &ctx->in_segment);
927         break;
928       case GST_EVENT_FLUSH_STOP:
929         GST_SPLITMUX_LOCK (splitmux);
930         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
931         ctx->in_eos = FALSE;
932         ctx->in_bytes = 0;
933         ctx->in_running_time = 0;
934         GST_SPLITMUX_UNLOCK (splitmux);
935         break;
936       case GST_EVENT_EOS:
937         GST_SPLITMUX_LOCK (splitmux);
938         ctx->in_eos = TRUE;
939
940         if (splitmux->state == SPLITMUX_STATE_STOPPED)
941           goto beach;
942
943         if (ctx->is_video) {
944           GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
945           /* Act as if this is a new keyframe with infinite timestamp */
946           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
947           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
948           /* Wake up other input pads to collect this GOP */
949           GST_SPLITMUX_BROADCAST (splitmux);
950           check_completed_gop (splitmux, ctx);
951         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
952           /* If we are waiting for a GOP to be completed (ie, for aux
953            * pads to catch up), then this pad is complete, so check
954            * if the whole GOP is.
955            */
956           check_completed_gop (splitmux, ctx);
957         }
958         GST_SPLITMUX_UNLOCK (splitmux);
959         break;
960       default:
961         break;
962     }
963     return GST_PAD_PROBE_PASS;
964   }
965
966   buf = gst_pad_probe_info_get_buffer (info);
967   ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
968       GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
969   buf_info = mq_stream_buf_new ();
970
971   if (GST_BUFFER_PTS_IS_VALID (buf))
972     ts = GST_BUFFER_PTS (buf);
973   else
974     ts = GST_BUFFER_DTS (buf);
975
976   GST_SPLITMUX_LOCK (splitmux);
977
978   if (splitmux->state == SPLITMUX_STATE_STOPPED)
979     goto beach;
980
981   /* If this buffer has a timestamp, advance the input timestamp of the
982    * stream */
983   if (GST_CLOCK_TIME_IS_VALID (ts)) {
984     GstClockTime running_time =
985         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
986         GST_BUFFER_TIMESTAMP (buf));
987
988     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
989         (ctx->in_running_time == GST_CLOCK_TIME_NONE
990             || running_time > ctx->in_running_time))
991       ctx->in_running_time = running_time;
992   }
993
994   /* Try to make sure we have a valid running time */
995   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
996     ctx->in_running_time =
997         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
998         ctx->in_segment.start);
999   }
1000
1001   buf_info->run_ts = ctx->in_running_time;
1002   buf_info->buf_size = gst_buffer_get_size (buf);
1003
1004   /* Update total input byte counter for overflow detect */
1005   ctx->in_bytes += buf_info->buf_size;
1006
1007   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1008       " total in_bytes %" G_GSIZE_FORMAT,
1009       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1010
1011   loop_again = TRUE;
1012   do {
1013     if (ctx->flushing)
1014       break;
1015
1016     switch (splitmux->state) {
1017       case SPLITMUX_STATE_COLLECTING_GOP_START:
1018         if (ctx->is_video) {
1019           /* If a keyframe, we have a complete GOP */
1020           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1021               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1022               splitmux->max_in_running_time >= ctx->in_running_time) {
1023             /* Pass this buffer through */
1024             loop_again = FALSE;
1025             break;
1026           }
1027           GST_INFO_OBJECT (pad,
1028               "Have keyframe with running time %" GST_TIME_FORMAT,
1029               GST_TIME_ARGS (ctx->in_running_time));
1030           keyframe = TRUE;
1031           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1032           splitmux->max_in_running_time = ctx->in_running_time;
1033           /* Wake up other input pads to collect this GOP */
1034           GST_SPLITMUX_BROADCAST (splitmux);
1035           check_completed_gop (splitmux, ctx);
1036         } else {
1037           /* We're still waiting for a keyframe on the video pad, sleep */
1038           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1039           GST_SPLITMUX_WAIT (splitmux);
1040           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1041               splitmux->state);
1042         }
1043         break;
1044       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1045         /* After a GOP start is found, this buffer might complete the GOP */
1046         /* If we overran the target timestamp, it might be time to process
1047          * the GOP, otherwise bail out for more data
1048          */
1049         GST_LOG_OBJECT (pad,
1050             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1051             GST_TIME_ARGS (ctx->in_running_time),
1052             GST_TIME_ARGS (splitmux->max_in_running_time));
1053
1054         if (ctx->in_running_time < splitmux->max_in_running_time) {
1055           loop_again = FALSE;
1056           break;
1057         }
1058
1059         GST_LOG_OBJECT (pad,
1060             "Collected last packet of GOP. Checking other pads");
1061         check_completed_gop (splitmux, ctx);
1062         break;
1063       case SPLITMUX_STATE_ENDING_FILE:
1064       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1065         /* A fragment is ending, wait until that's done before continuing */
1066         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1067         GST_SPLITMUX_WAIT (splitmux);
1068         GST_DEBUG_OBJECT (pad,
1069             "Done sleeping for fragment restart state now %d", splitmux->state);
1070         break;
1071       default:
1072         loop_again = FALSE;
1073         break;
1074     }
1075   } while (loop_again);
1076
1077   if (keyframe) {
1078     splitmux->queued_gops++;
1079     buf_info->keyframe = TRUE;
1080   }
1081
1082   /* Now add this buffer to the queue just before returning */
1083   g_queue_push_head (&ctx->queued_bufs, buf_info);
1084
1085   /* Check the buffer will fit in the mq */
1086   check_queue_length (splitmux, ctx);
1087
1088   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1089       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1090
1091   GST_SPLITMUX_UNLOCK (splitmux);
1092   return GST_PAD_PROBE_PASS;
1093
1094 beach:
1095   GST_SPLITMUX_UNLOCK (splitmux);
1096   if (buf_info)
1097     mq_stream_buf_free (buf_info);
1098   return GST_PAD_PROBE_PASS;
1099 }
1100
1101 static GstPad *
1102 gst_splitmux_sink_request_new_pad (GstElement * element,
1103     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1104 {
1105   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1106   GstPadTemplate *mux_template = NULL;
1107   GstPad *res = NULL;
1108   GstPad *mq_sink, *mq_src;
1109   gchar *gname;
1110   gboolean is_video = FALSE;
1111   MqStreamCtx *ctx;
1112
1113   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1114
1115   GST_SPLITMUX_LOCK (splitmux);
1116   if (!create_elements (splitmux))
1117     goto fail;
1118
1119   if (templ->name_template) {
1120     if (g_str_equal (templ->name_template, "video")) {
1121       /* FIXME: Look for a pad template with matching caps, rather than by name */
1122       mux_template =
1123           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1124           (splitmux->muxer), "video_%u");
1125       is_video = TRUE;
1126       name = NULL;
1127     } else {
1128       mux_template =
1129           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1130           (splitmux->muxer), templ->name_template);
1131     }
1132   }
1133
1134   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1135   if (res == NULL)
1136     goto fail;
1137
1138   if (is_video)
1139     gname = g_strdup ("video");
1140   else if (name == NULL)
1141     gname = gst_pad_get_name (res);
1142   else
1143     gname = g_strdup (name);
1144
1145   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1146     gst_element_release_request_pad (splitmux->muxer, res);
1147     gst_object_unref (GST_OBJECT (res));
1148     goto fail;
1149   }
1150
1151   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1152     gst_element_release_request_pad (splitmux->muxer, res);
1153     gst_object_unref (GST_OBJECT (res));
1154     gst_element_release_request_pad (splitmux->mq, mq_sink);
1155     gst_object_unref (GST_OBJECT (mq_sink));
1156     goto fail;
1157   }
1158
1159   gst_object_unref (GST_OBJECT (res));
1160
1161   ctx = mq_stream_ctx_new (splitmux);
1162   ctx->is_video = is_video;
1163   ctx->srcpad = mq_src;
1164   ctx->sinkpad = mq_sink;
1165
1166   mq_stream_ctx_ref (ctx);
1167   ctx->src_pad_block_id =
1168       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1169       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1170       _pad_block_destroy_src_notify);
1171   if (is_video)
1172     splitmux->video_ctx = ctx;
1173
1174   res = gst_ghost_pad_new (gname, mq_sink);
1175   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1176
1177   mq_stream_ctx_ref (ctx);
1178   ctx->sink_pad_block_id =
1179       gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1180       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1181       _pad_block_destroy_sink_notify);
1182
1183   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1184       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1185
1186   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1187
1188   g_free (gname);
1189
1190   gst_object_unref (mq_sink);
1191   gst_object_unref (mq_src);
1192
1193   gst_pad_set_active (res, TRUE);
1194   gst_element_add_pad (element, res);
1195   GST_SPLITMUX_UNLOCK (splitmux);
1196
1197   return res;
1198 fail:
1199   GST_SPLITMUX_UNLOCK (splitmux);
1200   return NULL;
1201 }
1202
1203 static void
1204 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1205 {
1206   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1207   GstPad *mqsink, *mqsrc, *muxpad;
1208   MqStreamCtx *ctx =
1209       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1210
1211   GST_SPLITMUX_LOCK (splitmux);
1212
1213   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1214     goto fail;                  /* Elements don't exist yet - nothing to release */
1215
1216   GST_INFO_OBJECT (pad, "releasing request pad");
1217
1218   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1219   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1220   muxpad = gst_pad_get_peer (mqsrc);
1221
1222   /* Remove the context from our consideration */
1223   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1224
1225   if (ctx->sink_pad_block_id)
1226     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1227
1228   if (ctx->src_pad_block_id)
1229     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1230
1231   /* Can release the context now */
1232   mq_stream_ctx_unref (ctx);
1233
1234   /* Release and free the mq input */
1235   gst_element_release_request_pad (splitmux->mq, mqsink);
1236
1237   /* Release and free the muxer input */
1238   gst_element_release_request_pad (splitmux->muxer, muxpad);
1239
1240   gst_object_unref (mqsink);
1241   gst_object_unref (mqsrc);
1242   gst_object_unref (muxpad);
1243
1244   gst_element_remove_pad (element, pad);
1245
1246 fail:
1247   GST_SPLITMUX_UNLOCK (splitmux);
1248 }
1249
1250 static GstElement *
1251 create_element (GstSplitMuxSink * splitmux,
1252     const gchar * factory, const gchar * name)
1253 {
1254   GstElement *ret = gst_element_factory_make (factory, name);
1255   if (ret == NULL) {
1256     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1257     return NULL;
1258   }
1259
1260   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1261     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1262     gst_object_unref (ret);
1263     return NULL;
1264   }
1265
1266   return ret;
1267 }
1268
1269 static gboolean
1270 create_elements (GstSplitMuxSink * splitmux)
1271 {
1272   /* Create internal elements */
1273   if (splitmux->mq == NULL) {
1274     if ((splitmux->mq =
1275             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1276       goto fail;
1277
1278     splitmux->mq_max_buffers = 5;
1279     /* No bytes or time limit, we limit buffers manually */
1280     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1281         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1282   }
1283
1284   if (splitmux->muxer == NULL) {
1285     if (splitmux->provided_muxer == NULL) {
1286       if ((splitmux->muxer =
1287               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1288         goto fail;
1289     } else {
1290       splitmux->muxer = splitmux->provided_muxer;
1291       if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) {
1292         g_warning ("Could not add muxer element - splitmuxsink will not work");
1293         goto fail;
1294       }
1295     }
1296   }
1297
1298   return TRUE;
1299 fail:
1300   return FALSE;
1301 }
1302
1303 static GstElement *
1304 find_sink (GstElement * e)
1305 {
1306   GstElement *res = NULL;
1307   GstIterator *iter;
1308   gboolean done = FALSE;
1309   GValue data = { 0, };
1310
1311   if (!GST_IS_BIN (e))
1312     return e;
1313
1314   iter = gst_bin_iterate_sinks (GST_BIN (e));
1315   while (!done) {
1316     switch (gst_iterator_next (iter, &data)) {
1317       case GST_ITERATOR_OK:
1318       {
1319         GstElement *child = g_value_get_object (&data);
1320         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1321                 "location") != NULL) {
1322           res = child;
1323           done = TRUE;
1324         }
1325         g_value_reset (&data);
1326         break;
1327       }
1328       case GST_ITERATOR_RESYNC:
1329         gst_iterator_resync (iter);
1330         break;
1331       case GST_ITERATOR_DONE:
1332         done = TRUE;
1333         break;
1334       case GST_ITERATOR_ERROR:
1335         g_assert_not_reached ();
1336         break;
1337     }
1338   }
1339   g_value_unset (&data);
1340   gst_iterator_free (iter);
1341
1342   return res;
1343 }
1344
1345 static gboolean
1346 create_sink (GstSplitMuxSink * splitmux)
1347 {
1348   g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1349
1350   if (splitmux->provided_sink == NULL) {
1351     if ((splitmux->sink =
1352             create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1353       goto fail;
1354     splitmux->active_sink = splitmux->sink;
1355   } else {
1356     if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) {
1357       g_warning ("Could not add sink elements - splitmuxsink will not work");
1358       goto fail;
1359     }
1360
1361     splitmux->active_sink = splitmux->provided_sink;
1362
1363     /* Find the sink element */
1364     splitmux->sink = find_sink (splitmux->active_sink);
1365     if (splitmux->sink == NULL) {
1366       g_warning
1367           ("Could not locate sink element in provided sink - splitmuxsink will not work");
1368       goto fail;
1369     }
1370   }
1371
1372   if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1373     g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1374     goto fail;
1375   }
1376
1377   return TRUE;
1378 fail:
1379   return FALSE;
1380 }
1381
1382 #ifdef __GNUC__
1383 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1384 #endif
1385 static void
1386 set_next_filename (GstSplitMuxSink * splitmux)
1387 {
1388   if (splitmux->location) {
1389     gchar *fname;
1390
1391     fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
1392     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1393     g_object_set (splitmux->sink, "location", fname, NULL);
1394     g_free (fname);
1395
1396     splitmux->fragment_id++;
1397   }
1398 }
1399
1400 static GstStateChangeReturn
1401 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1402 {
1403   GstStateChangeReturn ret;
1404   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1405
1406   switch (transition) {
1407     case GST_STATE_CHANGE_NULL_TO_READY:{
1408       GST_SPLITMUX_LOCK (splitmux);
1409       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1410         ret = GST_STATE_CHANGE_FAILURE;
1411         GST_SPLITMUX_UNLOCK (splitmux);
1412         goto beach;
1413       }
1414       GST_SPLITMUX_UNLOCK (splitmux);
1415       splitmux->fragment_id = 0;
1416       set_next_filename (splitmux);
1417       break;
1418     }
1419     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1420       GST_SPLITMUX_LOCK (splitmux);
1421       /* Start by collecting one input on each pad */
1422       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1423       splitmux->max_in_running_time = 0;
1424       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1425       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1426       GST_SPLITMUX_UNLOCK (splitmux);
1427       break;
1428     }
1429     case GST_STATE_CHANGE_PAUSED_TO_READY:
1430     case GST_STATE_CHANGE_READY_TO_NULL:
1431       GST_SPLITMUX_LOCK (splitmux);
1432       splitmux->state = SPLITMUX_STATE_STOPPED;
1433       /* Wake up any blocked threads */
1434       GST_LOG_OBJECT (splitmux,
1435           "State change -> NULL or READY. Waking threads");
1436       GST_SPLITMUX_BROADCAST (splitmux);
1437       GST_SPLITMUX_UNLOCK (splitmux);
1438       break;
1439     default:
1440       break;
1441   }
1442
1443   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1444   if (ret == GST_STATE_CHANGE_FAILURE)
1445     goto beach;
1446
1447   switch (transition) {
1448     case GST_STATE_CHANGE_READY_TO_NULL:
1449       GST_SPLITMUX_LOCK (splitmux);
1450       splitmux->fragment_id = 0;
1451       gst_splitmux_reset (splitmux);
1452       GST_SPLITMUX_UNLOCK (splitmux);
1453       break;
1454     default:
1455       break;
1456   }
1457
1458 beach:
1459
1460   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1461       ret == GST_STATE_CHANGE_FAILURE) {
1462     /* Cleanup elements on failed transition out of NULL */
1463     gst_splitmux_reset (splitmux);
1464   }
1465   return ret;
1466 }
1467
1468 gboolean
1469 register_splitmuxsink (GstPlugin * plugin)
1470 {
1471   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1472       "Split File Muxing Sink");
1473
1474   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1475       GST_TYPE_SPLITMUX_SINK);
1476 }