Revert "splitmuxsink: Mask async-start/done while switching files."
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * is required, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * The splitting process is driven by the video stream contents, and
37  * the video stream must contain closed GOPs for the output file parts
38  * to be played individually correctly.
39  *
40  * <refsect2>
41  * <title>Example pipelines</title>
42  * |[
43  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
44  * ]|
45  * Records a video stream captured from a v4l2 device and muxes it into
46  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
47  * and 1MB maximum size.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <string.h>
56 #include "gstsplitmuxsink.h"
57
58 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
59 #define GST_CAT_DEFAULT splitmux_debug
60
61 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
62 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
63 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
64 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
65
66 enum
67 {
68   PROP_0,
69   PROP_LOCATION,
70   PROP_MAX_SIZE_TIME,
71   PROP_MAX_SIZE_BYTES,
72   PROP_MUXER_OVERHEAD,
73   PROP_MUXER,
74   PROP_SINK
75 };
76
77 #define DEFAULT_MAX_SIZE_TIME       0
78 #define DEFAULT_MAX_SIZE_BYTES      0
79 #define DEFAULT_MUXER_OVERHEAD      0.02
80 #define DEFAULT_MUXER "mp4mux"
81 #define DEFAULT_SINK "filesink"
82
83 static GstStaticPadTemplate video_sink_template =
84 GST_STATIC_PAD_TEMPLATE ("video",
85     GST_PAD_SINK,
86     GST_PAD_REQUEST,
87     GST_STATIC_CAPS_ANY);
88 static GstStaticPadTemplate audio_sink_template =
89 GST_STATIC_PAD_TEMPLATE ("audio_%u",
90     GST_PAD_SINK,
91     GST_PAD_REQUEST,
92     GST_STATIC_CAPS_ANY);
93 static GstStaticPadTemplate subtitle_sink_template =
94 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
95     GST_PAD_SINK,
96     GST_PAD_REQUEST,
97     GST_STATIC_CAPS_ANY);
98
99 static GQuark PAD_CONTEXT;
100
101 static void
102 _do_init (void)
103 {
104   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
105 }
106
107 #define gst_splitmux_sink_parent_class parent_class
108 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
109     _do_init ());
110
111 static gboolean create_elements (GstSplitMuxSink * splitmux);
112 static gboolean create_sink (GstSplitMuxSink * splitmux);
113 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
114     const GValue * value, GParamSpec * pspec);
115 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
116     GValue * value, GParamSpec * pspec);
117 static void gst_splitmux_sink_dispose (GObject * object);
118 static void gst_splitmux_sink_finalize (GObject * object);
119
120 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
121     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
122 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
123
124 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
125     element, GstStateChange transition);
126
127 static void bus_handler (GstBin * bin, GstMessage * msg);
128 static void set_next_filename (GstSplitMuxSink * splitmux);
129 static void start_next_fragment (GstSplitMuxSink * splitmux);
130 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
131 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
132
133 static MqStreamBuf *
134 mq_stream_buf_new (void)
135 {
136   return g_slice_new0 (MqStreamBuf);
137 }
138
139 static void
140 mq_stream_buf_free (MqStreamBuf * data)
141 {
142   g_slice_free (MqStreamBuf, data);
143 }
144
145 static void
146 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
147 {
148   GObjectClass *gobject_class = (GObjectClass *) klass;
149   GstElementClass *gstelement_class = (GstElementClass *) klass;
150   GstBinClass *gstbin_class = (GstBinClass *) klass;
151
152   gobject_class->set_property = gst_splitmux_sink_set_property;
153   gobject_class->get_property = gst_splitmux_sink_get_property;
154   gobject_class->dispose = gst_splitmux_sink_dispose;
155   gobject_class->finalize = gst_splitmux_sink_finalize;
156
157   gst_element_class_set_static_metadata (gstelement_class,
158       "Split Muxing Bin", "Generic/Bin/Muxer",
159       "Convenience bin that muxes incoming streams into multiple time/size limited files",
160       "Jan Schmidt <jan@centricular.com>");
161
162   gst_element_class_add_pad_template (gstelement_class,
163       gst_static_pad_template_get (&video_sink_template));
164   gst_element_class_add_pad_template (gstelement_class,
165       gst_static_pad_template_get (&audio_sink_template));
166   gst_element_class_add_pad_template (gstelement_class,
167       gst_static_pad_template_get (&subtitle_sink_template));
168
169   gstelement_class->change_state =
170       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
171   gstelement_class->request_new_pad =
172       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
173   gstelement_class->release_pad =
174       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
175
176   gstbin_class->handle_message = bus_handler;
177
178   g_object_class_install_property (gobject_class, PROP_LOCATION,
179       g_param_spec_string ("location", "File Output Pattern",
180           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
181           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
183       g_param_spec_double ("mux-overhead", "Muxing Overhead",
184           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
185           DEFAULT_MUXER_OVERHEAD,
186           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
187
188   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
189       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
190           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
191           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
193       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
194           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
195           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_MUXER,
198       g_param_spec_object ("muxer", "Muxer",
199           "The muxer element to use (NULL = default mp4mux)",
200           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201   g_object_class_install_property (gobject_class, PROP_SINK,
202       g_param_spec_object ("sink", "Sink",
203           "The sink element (or element chain) to use (NULL = default filesink)",
204           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
205 }
206
207 static void
208 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
209 {
210   g_mutex_init (&splitmux->lock);
211   g_cond_init (&splitmux->data_cond);
212
213   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
214   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
215   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
216
217   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
218 }
219
220 static void
221 gst_splitmux_reset (GstSplitMuxSink * splitmux)
222 {
223   if (splitmux->mq)
224     gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
225   if (splitmux->muxer)
226     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
227   if (splitmux->active_sink)
228     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
229
230   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
231       NULL;
232 }
233
234 static void
235 gst_splitmux_sink_dispose (GObject * object)
236 {
237   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
238
239   G_OBJECT_CLASS (parent_class)->dispose (object);
240
241   /* Calling parent dispose invalidates all child pointers */
242   splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
243       NULL;
244 }
245
246 static void
247 gst_splitmux_sink_finalize (GObject * object)
248 {
249   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
250   g_cond_clear (&splitmux->data_cond);
251   g_mutex_clear (&splitmux->lock);
252   if (splitmux->provided_sink)
253     gst_object_unref (splitmux->provided_sink);
254   if (splitmux->provided_muxer)
255     gst_object_unref (splitmux->provided_muxer);
256
257   g_free (splitmux->location);
258
259   /* Make sure to free any un-released contexts */
260   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
261   g_list_free (splitmux->contexts);
262
263   G_OBJECT_CLASS (parent_class)->finalize (object);
264 }
265
266 static void
267 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
268     const GValue * value, GParamSpec * pspec)
269 {
270   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
271
272   switch (prop_id) {
273     case PROP_LOCATION:{
274       GST_OBJECT_LOCK (splitmux);
275       g_free (splitmux->location);
276       splitmux->location = g_value_dup_string (value);
277       GST_OBJECT_UNLOCK (splitmux);
278       break;
279     }
280     case PROP_MAX_SIZE_BYTES:
281       GST_OBJECT_LOCK (splitmux);
282       splitmux->threshold_bytes = g_value_get_uint64 (value);
283       GST_OBJECT_UNLOCK (splitmux);
284       break;
285     case PROP_MAX_SIZE_TIME:
286       GST_OBJECT_LOCK (splitmux);
287       splitmux->threshold_time = g_value_get_uint64 (value);
288       GST_OBJECT_UNLOCK (splitmux);
289       break;
290     case PROP_MUXER_OVERHEAD:
291       GST_OBJECT_LOCK (splitmux);
292       splitmux->mux_overhead = g_value_get_double (value);
293       GST_OBJECT_UNLOCK (splitmux);
294       break;
295     case PROP_SINK:
296       GST_OBJECT_LOCK (splitmux);
297       if (splitmux->provided_sink)
298         gst_object_unref (splitmux->provided_sink);
299       splitmux->provided_sink = g_value_dup_object (value);
300       GST_OBJECT_UNLOCK (splitmux);
301       break;
302     case PROP_MUXER:
303       GST_OBJECT_LOCK (splitmux);
304       if (splitmux->provided_muxer)
305         gst_object_unref (splitmux->provided_muxer);
306       splitmux->provided_muxer = g_value_dup_object (value);
307       GST_OBJECT_UNLOCK (splitmux);
308       break;
309     default:
310       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
311       break;
312   }
313 }
314
315 static void
316 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
317     GValue * value, GParamSpec * pspec)
318 {
319   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
320
321   switch (prop_id) {
322     case PROP_LOCATION:
323       GST_OBJECT_LOCK (splitmux);
324       g_value_set_string (value, splitmux->location);
325       GST_OBJECT_UNLOCK (splitmux);
326       break;
327     case PROP_MAX_SIZE_BYTES:
328       GST_OBJECT_LOCK (splitmux);
329       g_value_set_uint64 (value, splitmux->threshold_bytes);
330       GST_OBJECT_UNLOCK (splitmux);
331       break;
332     case PROP_MAX_SIZE_TIME:
333       GST_OBJECT_LOCK (splitmux);
334       g_value_set_uint64 (value, splitmux->threshold_time);
335       GST_OBJECT_UNLOCK (splitmux);
336       break;
337     case PROP_MUXER_OVERHEAD:
338       GST_OBJECT_LOCK (splitmux);
339       g_value_set_double (value, splitmux->mux_overhead);
340       GST_OBJECT_UNLOCK (splitmux);
341       break;
342     case PROP_SINK:
343       GST_OBJECT_LOCK (splitmux);
344       g_value_set_object (value, splitmux->provided_sink);
345       GST_OBJECT_UNLOCK (splitmux);
346       break;
347     case PROP_MUXER:
348       GST_OBJECT_LOCK (splitmux);
349       g_value_set_object (value, splitmux->provided_muxer);
350       GST_OBJECT_UNLOCK (splitmux);
351       break;
352     default:
353       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
354       break;
355   }
356 }
357
358 static GstPad *
359 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
360 {
361   gchar *tmp, *sinkname, *srcname;
362   GstPad *mq_src;
363
364   sinkname = gst_pad_get_name (sink_pad);
365   tmp = sinkname + 5;
366   srcname = g_strdup_printf ("src_%s", tmp);
367
368   mq_src = gst_element_get_static_pad (mq, srcname);
369
370   g_free (sinkname);
371   g_free (srcname);
372
373   return mq_src;
374 }
375
376 static gboolean
377 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
378     GstPad ** src_pad)
379 {
380   GstPad *mq_sink;
381   GstPad *mq_src;
382
383   /* Request a pad from multiqueue, then connect this one, then
384    * discover the corresponding output pad and return both */
385   mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
386   if (mq_sink == NULL)
387     return FALSE;
388
389   mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
390   if (mq_src == NULL)
391     goto fail;
392
393   *sink_pad = mq_sink;
394   *src_pad = mq_src;
395
396   return TRUE;
397
398 fail:
399   gst_element_release_request_pad (splitmux->mq, mq_sink);
400   return FALSE;
401 }
402
403 static MqStreamCtx *
404 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
405 {
406   MqStreamCtx *ctx;
407
408   ctx = g_new0 (MqStreamCtx, 1);
409   g_atomic_int_set (&ctx->refcount, 1);
410   ctx->splitmux = splitmux;
411   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
412   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
413   ctx->in_running_time = ctx->out_running_time = 0;
414   g_queue_init (&ctx->queued_bufs);
415   return ctx;
416 }
417
418 static void
419 mq_stream_ctx_free (MqStreamCtx * ctx)
420 {
421   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
422   g_queue_clear (&ctx->queued_bufs);
423   g_free (ctx);
424 }
425
426 static void
427 mq_stream_ctx_unref (MqStreamCtx * ctx)
428 {
429   if (g_atomic_int_dec_and_test (&ctx->refcount))
430     mq_stream_ctx_free (ctx);
431 }
432
433 static void
434 mq_stream_ctx_ref (MqStreamCtx * ctx)
435 {
436   g_atomic_int_inc (&ctx->refcount);
437 }
438
439 static void
440 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
441 {
442   ctx->sink_pad_block_id = 0;
443   mq_stream_ctx_unref (ctx);
444 }
445
446 static void
447 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
448 {
449   ctx->src_pad_block_id = 0;
450   mq_stream_ctx_unref (ctx);
451 }
452
453 /* Called with lock held, drops the lock to send EOS to the
454  * pad
455  */
456 static void
457 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
458 {
459   GstEvent *eos;
460   GstPad *pad;
461
462   eos = gst_event_new_eos ();
463   pad = gst_pad_get_peer (ctx->srcpad);
464
465   ctx->out_eos = TRUE;
466
467   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
468   GST_SPLITMUX_UNLOCK (splitmux);
469   gst_pad_send_event (pad, eos);
470   GST_SPLITMUX_LOCK (splitmux);
471
472   gst_object_unref (pad);
473 }
474
475 /* Called with splitmux lock held to check if this output
476  * context needs to sleep to wait for the release of the
477  * next GOP, or to send EOS to close out the current file
478  */
479 static void
480 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
481 {
482   do {
483
484     GST_LOG_OBJECT (ctx->srcpad,
485         "Checking running time %" GST_TIME_FORMAT " against max %"
486         GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
487         GST_TIME_ARGS (splitmux->max_out_running_time));
488
489     if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
490         ctx->out_running_time < splitmux->max_out_running_time) {
491       splitmux->have_muxed_something = TRUE;
492       return;
493     }
494
495     if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
496       return;
497
498     if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
499       if (ctx->out_eos == FALSE) {
500         send_eos (splitmux, ctx);
501         continue;
502       }
503     } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
504       start_next_fragment (splitmux);
505       continue;
506     }
507
508     GST_INFO_OBJECT (ctx->srcpad,
509         "Sleeping for running time %"
510         GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
511         GST_TIME_ARGS (ctx->out_running_time),
512         GST_TIME_ARGS (splitmux->max_out_running_time));
513     ctx->out_blocked = TRUE;
514     /* Expand the mq if needed before sleeping */
515     check_queue_length (splitmux, ctx);
516     GST_SPLITMUX_WAIT (splitmux);
517     ctx->out_blocked = FALSE;
518     GST_INFO_OBJECT (ctx->srcpad,
519         "Woken for new max running time %" GST_TIME_FORMAT,
520         GST_TIME_ARGS (splitmux->max_out_running_time));
521   } while (1);
522 }
523
524 static GstPadProbeReturn
525 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
526 {
527   GstSplitMuxSink *splitmux = ctx->splitmux;
528   MqStreamBuf *buf_info = NULL;
529
530   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
531
532   /* FIXME: Handle buffer lists, until then make it clear they won't work */
533   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
534     g_warning ("Buffer list handling not implemented");
535     return GST_PAD_PROBE_DROP;
536   }
537   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
538     GstEvent *event = gst_pad_probe_info_get_event (info);
539
540     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
541
542     switch (GST_EVENT_TYPE (event)) {
543       case GST_EVENT_SEGMENT:
544         gst_event_copy_segment (event, &ctx->out_segment);
545         break;
546       case GST_EVENT_FLUSH_STOP:
547         GST_SPLITMUX_LOCK (splitmux);
548         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
549         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
550         g_queue_clear (&ctx->queued_bufs);
551         ctx->flushing = FALSE;
552         GST_SPLITMUX_UNLOCK (splitmux);
553         break;
554       case GST_EVENT_FLUSH_START:
555         GST_SPLITMUX_LOCK (splitmux);
556         GST_LOG_OBJECT (pad, "Flush start");
557         ctx->flushing = TRUE;
558         GST_SPLITMUX_BROADCAST (splitmux);
559         GST_SPLITMUX_UNLOCK (splitmux);
560         break;
561       case GST_EVENT_EOS:
562         GST_SPLITMUX_LOCK (splitmux);
563         if (splitmux->state == SPLITMUX_STATE_STOPPED)
564           goto beach;
565         ctx->out_eos = TRUE;
566         GST_SPLITMUX_UNLOCK (splitmux);
567         break;
568       case GST_EVENT_GAP:{
569         GstClockTime gap_ts;
570
571         gst_event_parse_gap (event, &gap_ts, NULL);
572         if (gap_ts == GST_CLOCK_TIME_NONE)
573           break;
574
575         GST_SPLITMUX_LOCK (splitmux);
576
577         gap_ts = gst_segment_to_running_time (&ctx->out_segment,
578             GST_FORMAT_TIME, gap_ts);
579
580         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
581             GST_TIME_ARGS (gap_ts));
582
583         if (splitmux->state == SPLITMUX_STATE_STOPPED)
584           goto beach;
585         ctx->out_running_time = gap_ts;
586         complete_or_wait_on_out (splitmux, ctx);
587         GST_SPLITMUX_UNLOCK (splitmux);
588         break;
589       }
590       default:
591         break;
592     }
593     return GST_PAD_PROBE_PASS;
594   }
595
596   /* Allow everything through until the configured next stopping point */
597   GST_SPLITMUX_LOCK (splitmux);
598
599   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
600   if (buf_info == NULL)
601     /* Can only happen due to a poorly timed flush */
602     goto beach;
603
604   /* If we have popped a keyframe, decrement the queued_gop count */
605   if (buf_info->keyframe && splitmux->queued_gops > 0)
606     splitmux->queued_gops--;
607
608   ctx->out_running_time = buf_info->run_ts;
609
610   GST_LOG_OBJECT (splitmux,
611       "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
612       " size %" G_GSIZE_FORMAT,
613       pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
614
615   complete_or_wait_on_out (splitmux, ctx);
616
617   if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
618       splitmux->muxed_out_time < buf_info->run_ts)
619     splitmux->muxed_out_time = buf_info->run_ts;
620
621   splitmux->muxed_out_bytes += buf_info->buf_size;
622
623 #ifndef GST_DISABLE_GST_DEBUG
624   {
625     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
626     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
627         " run ts %" GST_TIME_FORMAT, buf,
628         GST_TIME_ARGS (ctx->out_running_time));
629   }
630 #endif
631
632   GST_SPLITMUX_UNLOCK (splitmux);
633
634   mq_stream_buf_free (buf_info);
635
636   return GST_PAD_PROBE_PASS;
637
638 beach:
639   GST_SPLITMUX_UNLOCK (splitmux);
640   return GST_PAD_PROBE_DROP;
641 }
642
643 static gboolean
644 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
645 {
646   return gst_pad_send_event (peer, gst_event_ref (*event));
647 }
648
649 static void
650 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
651 {
652   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
653
654   gst_pad_sticky_events_foreach (ctx->srcpad,
655       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
656
657   /* Clear EOS flag */
658   ctx->out_eos = FALSE;
659
660   gst_object_unref (peer);
661 }
662
663 /* Called with lock held when a fragment
664  * reaches EOS and it is time to restart
665  * a new fragment
666  */
667 static void
668 start_next_fragment (GstSplitMuxSink * splitmux)
669 {
670   /* 1 change to new file */
671   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
672   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
673
674   set_next_filename (splitmux);
675
676   gst_element_sync_state_with_parent (splitmux->active_sink);
677   gst_element_sync_state_with_parent (splitmux->muxer);
678
679   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
680
681   /* Switch state and go back to processing */
682   splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
683
684   if (!splitmux->video_ctx->in_eos) {
685     splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
686   } else {
687     splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
688     splitmux->have_muxed_something = FALSE;
689   }
690   splitmux->have_muxed_something =
691       (splitmux->video_ctx->in_running_time > splitmux->muxed_out_time);
692
693   /* Store the overflow parameters as the basis for the next fragment */
694   splitmux->mux_start_time = splitmux->muxed_out_time;
695   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
696
697   GST_DEBUG_OBJECT (splitmux,
698       "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
699       GST_TIME_ARGS (splitmux->max_out_running_time));
700
701   GST_SPLITMUX_BROADCAST (splitmux);
702 }
703
704 static void
705 bus_handler (GstBin * bin, GstMessage * message)
706 {
707   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
708
709   switch (GST_MESSAGE_TYPE (message)) {
710     case GST_MESSAGE_EOS:
711       /* If the state is draining out the current file, drop this EOS */
712       GST_SPLITMUX_LOCK (splitmux);
713       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
714           splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
715         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
716         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
717         GST_SPLITMUX_BROADCAST (splitmux);
718
719         gst_message_unref (message);
720         GST_SPLITMUX_UNLOCK (splitmux);
721         return;
722       }
723       GST_SPLITMUX_UNLOCK (splitmux);
724       break;
725     default:
726       break;
727   }
728
729   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
730 }
731
732 /* Called with splitmux lock held */
733 /* Called when entering ProcessingCompleteGop state
734  * Assess if mq contents overflowed the current file
735  *   -> If yes, need to switch to new file
736  *   -> if no, set max_out_running_time to let this GOP in and
737  *      go to COLLECTING_GOP_START state
738  */
739 static void
740 handle_gathered_gop (GstSplitMuxSink * splitmux)
741 {
742   GList *cur;
743   gsize queued_bytes = 0;
744   GstClockTime queued_time = 0;
745
746   /* Assess if the multiqueue contents overflowed the current file */
747   for (cur = g_list_first (splitmux->contexts);
748       cur != NULL; cur = g_list_next (cur)) {
749     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
750     if (tmpctx->in_running_time > queued_time)
751       queued_time = tmpctx->in_running_time;
752     queued_bytes += tmpctx->in_bytes;
753   }
754
755   g_assert (queued_bytes >= splitmux->mux_start_bytes);
756   g_assert (queued_time >= splitmux->mux_start_time);
757
758   queued_bytes -= splitmux->mux_start_bytes;
759   queued_time -= splitmux->mux_start_time;
760
761   /* Expand queued bytes estimate by muxer overhead */
762   queued_bytes += (queued_bytes * splitmux->mux_overhead);
763
764   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
765       " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
766
767   /* Check for overrun - have we output at least one byte and overrun
768    * either threshold? */
769   if (splitmux->have_muxed_something &&
770       ((splitmux->threshold_bytes > 0 &&
771               queued_bytes >= splitmux->threshold_bytes) ||
772           (splitmux->threshold_time > 0 &&
773               queued_time >= splitmux->threshold_time))) {
774
775     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
776
777     GST_INFO_OBJECT (splitmux,
778         "mq overflowed since last, draining out. max out TS is %"
779         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
780     GST_SPLITMUX_BROADCAST (splitmux);
781
782   } else {
783     /* No overflow */
784     GST_LOG_OBJECT (splitmux,
785         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
786         " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
787         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
788         queued_bytes, GST_TIME_ARGS (queued_time));
789
790     /* Wake everyone up to push this one GOP, then sleep */
791     splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
792     splitmux->have_muxed_something = TRUE;
793
794     if (!splitmux->video_ctx->in_eos)
795       splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
796     else
797       splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
798
799     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
800         GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
801     GST_SPLITMUX_BROADCAST (splitmux);
802   }
803
804 }
805
806 /* Called with splitmux lock held */
807 /* Called from each input pad when it is has all the pieces
808  * for a GOP or EOS, starting with the video pad which has set the
809  * splitmux->max_in_running_time
810  */
811 static void
812 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
813 {
814   GList *cur;
815   gboolean ready = TRUE;
816
817   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
818     /* Iterate each pad, and check that the input running time is at least
819      * up to the video runnning time, and if so handle the collected GOP */
820     GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
821     for (cur = g_list_first (splitmux->contexts);
822         cur != NULL; cur = g_list_next (cur)) {
823       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
824
825       GST_LOG_OBJECT (splitmux,
826           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
827           " EOS %d", tmpctx, tmpctx->srcpad,
828           GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
829
830       if (tmpctx->in_running_time < splitmux->max_in_running_time &&
831           !tmpctx->in_eos) {
832         GST_LOG_OBJECT (splitmux,
833             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
834             tmpctx, tmpctx->srcpad);
835         ready = FALSE;
836         break;
837       }
838     }
839     if (ready) {
840       GST_DEBUG_OBJECT (splitmux,
841           "Collected GOP is complete. Processing (ctx %p)", ctx);
842       /* All pads have a complete GOP, release it into the multiqueue */
843       handle_gathered_gop (splitmux);
844     }
845   }
846
847   /* Some pad is not yet ready, or GOP is being pushed
848    * either way, sleep and wait to get woken */
849   while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
850           splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
851       !ctx->flushing) {
852
853     GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
854         splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
855         "GOP complete" : "EOF draining", ctx);
856     GST_SPLITMUX_WAIT (splitmux);
857
858     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
859   }
860 }
861
862 static void
863 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
864 {
865   GList *cur;
866   guint cur_len = g_queue_get_length (&ctx->queued_bufs);
867
868   GST_DEBUG_OBJECT (ctx->sinkpad,
869       "Checking queue length len %u cur_max %u queued gops %u",
870       cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
871
872   if (cur_len >= splitmux->mq_max_buffers) {
873     gboolean allow_grow = FALSE;
874
875     /* If collecting a GOP and this pad might block,
876      * and there isn't already a pending GOP in the queue
877      * then grow
878      */
879     if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
880         ctx->in_running_time < splitmux->max_in_running_time &&
881         splitmux->queued_gops <= 1) {
882       allow_grow = TRUE;
883     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
884         ctx->is_video && splitmux->queued_gops <= 1) {
885       allow_grow = TRUE;
886     }
887
888     if (!allow_grow) {
889       for (cur = g_list_first (splitmux->contexts);
890           cur != NULL; cur = g_list_next (cur)) {
891         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
892         GST_DEBUG_OBJECT (tmpctx->sinkpad,
893             " len %u out_blocked %d",
894             g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
895         /* If another stream is starving, grow */
896         if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
897           allow_grow = TRUE;
898         }
899       }
900     }
901
902     if (allow_grow) {
903       splitmux->mq_max_buffers = cur_len + 1;
904
905       GST_INFO_OBJECT (splitmux,
906           "Multiqueue overrun - enlarging to %u buffers ctx %p",
907           splitmux->mq_max_buffers, ctx);
908
909       g_object_set (splitmux->mq, "max-size-buffers",
910           splitmux->mq_max_buffers, NULL);
911     }
912   }
913 }
914
915 static GstPadProbeReturn
916 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
917 {
918   GstSplitMuxSink *splitmux = ctx->splitmux;
919   GstBuffer *buf;
920   MqStreamBuf *buf_info = NULL;
921   GstClockTime ts;
922   gboolean loop_again;
923   gboolean keyframe = FALSE;
924
925   GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
926
927   /* FIXME: Handle buffer lists, until then make it clear they won't work */
928   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
929     g_warning ("Buffer list handling not implemented");
930     return GST_PAD_PROBE_DROP;
931   }
932   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
933     GstEvent *event = gst_pad_probe_info_get_event (info);
934     switch (GST_EVENT_TYPE (event)) {
935       case GST_EVENT_SEGMENT:
936         gst_event_copy_segment (event, &ctx->in_segment);
937         break;
938       case GST_EVENT_FLUSH_STOP:
939         GST_SPLITMUX_LOCK (splitmux);
940         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
941         ctx->in_eos = FALSE;
942         ctx->in_bytes = 0;
943         ctx->in_running_time = 0;
944         GST_SPLITMUX_UNLOCK (splitmux);
945         break;
946       case GST_EVENT_EOS:
947         GST_SPLITMUX_LOCK (splitmux);
948         ctx->in_eos = TRUE;
949
950         if (splitmux->state == SPLITMUX_STATE_STOPPED)
951           goto beach;
952
953         if (ctx->is_video) {
954           GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
955           /* Act as if this is a new keyframe with infinite timestamp */
956           splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
957           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
958           /* Wake up other input pads to collect this GOP */
959           GST_SPLITMUX_BROADCAST (splitmux);
960           check_completed_gop (splitmux, ctx);
961         } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
962           /* If we are waiting for a GOP to be completed (ie, for aux
963            * pads to catch up), then this pad is complete, so check
964            * if the whole GOP is.
965            */
966           check_completed_gop (splitmux, ctx);
967         }
968         GST_SPLITMUX_UNLOCK (splitmux);
969         break;
970       default:
971         break;
972     }
973     return GST_PAD_PROBE_PASS;
974   }
975
976   buf = gst_pad_probe_info_get_buffer (info);
977   ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
978       GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
979   buf_info = mq_stream_buf_new ();
980
981   if (GST_BUFFER_PTS_IS_VALID (buf))
982     ts = GST_BUFFER_PTS (buf);
983   else
984     ts = GST_BUFFER_DTS (buf);
985
986   GST_SPLITMUX_LOCK (splitmux);
987
988   if (splitmux->state == SPLITMUX_STATE_STOPPED)
989     goto beach;
990
991   /* If this buffer has a timestamp, advance the input timestamp of the
992    * stream */
993   if (GST_CLOCK_TIME_IS_VALID (ts)) {
994     GstClockTime running_time =
995         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
996         GST_BUFFER_TIMESTAMP (buf));
997
998     if (GST_CLOCK_TIME_IS_VALID (running_time) &&
999         (ctx->in_running_time == GST_CLOCK_TIME_NONE
1000             || running_time > ctx->in_running_time))
1001       ctx->in_running_time = running_time;
1002   }
1003
1004   /* Try to make sure we have a valid running time */
1005   if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1006     ctx->in_running_time =
1007         gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1008         ctx->in_segment.start);
1009   }
1010
1011   buf_info->run_ts = ctx->in_running_time;
1012   buf_info->buf_size = gst_buffer_get_size (buf);
1013
1014   /* Update total input byte counter for overflow detect */
1015   ctx->in_bytes += buf_info->buf_size;
1016
1017   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1018       " total in_bytes %" G_GSIZE_FORMAT,
1019       GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1020
1021   loop_again = TRUE;
1022   do {
1023     if (ctx->flushing)
1024       break;
1025
1026     switch (splitmux->state) {
1027       case SPLITMUX_STATE_COLLECTING_GOP_START:
1028         if (ctx->is_video) {
1029           /* If a keyframe, we have a complete GOP */
1030           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1031               !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1032               splitmux->max_in_running_time >= ctx->in_running_time) {
1033             /* Pass this buffer through */
1034             loop_again = FALSE;
1035             break;
1036           }
1037           GST_INFO_OBJECT (pad,
1038               "Have keyframe with running time %" GST_TIME_FORMAT,
1039               GST_TIME_ARGS (ctx->in_running_time));
1040           keyframe = TRUE;
1041           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1042           splitmux->max_in_running_time = ctx->in_running_time;
1043           /* Wake up other input pads to collect this GOP */
1044           GST_SPLITMUX_BROADCAST (splitmux);
1045           check_completed_gop (splitmux, ctx);
1046         } else {
1047           /* We're still waiting for a keyframe on the video pad, sleep */
1048           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1049           GST_SPLITMUX_WAIT (splitmux);
1050           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1051               splitmux->state);
1052         }
1053         break;
1054       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1055         /* After a GOP start is found, this buffer might complete the GOP */
1056         /* If we overran the target timestamp, it might be time to process
1057          * the GOP, otherwise bail out for more data
1058          */
1059         GST_LOG_OBJECT (pad,
1060             "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1061             GST_TIME_ARGS (ctx->in_running_time),
1062             GST_TIME_ARGS (splitmux->max_in_running_time));
1063
1064         if (ctx->in_running_time < splitmux->max_in_running_time) {
1065           loop_again = FALSE;
1066           break;
1067         }
1068
1069         GST_LOG_OBJECT (pad,
1070             "Collected last packet of GOP. Checking other pads");
1071         check_completed_gop (splitmux, ctx);
1072         break;
1073       case SPLITMUX_STATE_ENDING_FILE:
1074       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1075         /* A fragment is ending, wait until that's done before continuing */
1076         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1077         GST_SPLITMUX_WAIT (splitmux);
1078         GST_DEBUG_OBJECT (pad,
1079             "Done sleeping for fragment restart state now %d", splitmux->state);
1080         break;
1081       default:
1082         loop_again = FALSE;
1083         break;
1084     }
1085   } while (loop_again);
1086
1087   if (keyframe) {
1088     splitmux->queued_gops++;
1089     buf_info->keyframe = TRUE;
1090   }
1091
1092   /* Now add this buffer to the queue just before returning */
1093   g_queue_push_head (&ctx->queued_bufs, buf_info);
1094
1095   /* Check the buffer will fit in the mq */
1096   check_queue_length (splitmux, ctx);
1097
1098   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1099       " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1100
1101   GST_SPLITMUX_UNLOCK (splitmux);
1102   return GST_PAD_PROBE_PASS;
1103
1104 beach:
1105   GST_SPLITMUX_UNLOCK (splitmux);
1106   if (buf_info)
1107     mq_stream_buf_free (buf_info);
1108   return GST_PAD_PROBE_PASS;
1109 }
1110
1111 static GstPad *
1112 gst_splitmux_sink_request_new_pad (GstElement * element,
1113     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1114 {
1115   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1116   GstPadTemplate *mux_template = NULL;
1117   GstPad *res = NULL;
1118   GstPad *mq_sink, *mq_src;
1119   gchar *gname;
1120   gboolean is_video = FALSE;
1121   MqStreamCtx *ctx;
1122
1123   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1124
1125   GST_SPLITMUX_LOCK (splitmux);
1126   if (!create_elements (splitmux))
1127     goto fail;
1128
1129   if (templ->name_template) {
1130     if (g_str_equal (templ->name_template, "video")) {
1131       /* FIXME: Look for a pad template with matching caps, rather than by name */
1132       mux_template =
1133           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1134           (splitmux->muxer), "video_%u");
1135       is_video = TRUE;
1136       name = NULL;
1137     } else {
1138       mux_template =
1139           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1140           (splitmux->muxer), templ->name_template);
1141     }
1142   }
1143
1144   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1145   if (res == NULL)
1146     goto fail;
1147
1148   if (is_video)
1149     gname = g_strdup ("video");
1150   else if (name == NULL)
1151     gname = gst_pad_get_name (res);
1152   else
1153     gname = g_strdup (name);
1154
1155   if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1156     gst_element_release_request_pad (splitmux->muxer, res);
1157     gst_object_unref (GST_OBJECT (res));
1158     goto fail;
1159   }
1160
1161   if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1162     gst_element_release_request_pad (splitmux->muxer, res);
1163     gst_object_unref (GST_OBJECT (res));
1164     gst_element_release_request_pad (splitmux->mq, mq_sink);
1165     gst_object_unref (GST_OBJECT (mq_sink));
1166     goto fail;
1167   }
1168
1169   gst_object_unref (GST_OBJECT (res));
1170
1171   ctx = mq_stream_ctx_new (splitmux);
1172   ctx->is_video = is_video;
1173   ctx->srcpad = mq_src;
1174   ctx->sinkpad = mq_sink;
1175
1176   mq_stream_ctx_ref (ctx);
1177   ctx->src_pad_block_id =
1178       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1179       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1180       _pad_block_destroy_src_notify);
1181   if (is_video)
1182     splitmux->video_ctx = ctx;
1183
1184   res = gst_ghost_pad_new (gname, mq_sink);
1185   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1186
1187   mq_stream_ctx_ref (ctx);
1188   ctx->sink_pad_block_id =
1189       gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1190       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1191       _pad_block_destroy_sink_notify);
1192
1193   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1194       " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1195
1196   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1197
1198   g_free (gname);
1199
1200   gst_object_unref (mq_sink);
1201   gst_object_unref (mq_src);
1202
1203   gst_pad_set_active (res, TRUE);
1204   gst_element_add_pad (element, res);
1205   GST_SPLITMUX_UNLOCK (splitmux);
1206
1207   return res;
1208 fail:
1209   GST_SPLITMUX_UNLOCK (splitmux);
1210   return NULL;
1211 }
1212
1213 static void
1214 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1215 {
1216   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1217   GstPad *mqsink, *mqsrc, *muxpad;
1218   MqStreamCtx *ctx =
1219       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1220
1221   GST_SPLITMUX_LOCK (splitmux);
1222
1223   if (splitmux->muxer == NULL || splitmux->mq == NULL)
1224     goto fail;                  /* Elements don't exist yet - nothing to release */
1225
1226   GST_INFO_OBJECT (pad, "releasing request pad");
1227
1228   mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1229   mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1230   muxpad = gst_pad_get_peer (mqsrc);
1231
1232   /* Remove the context from our consideration */
1233   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1234
1235   if (ctx->sink_pad_block_id)
1236     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1237
1238   if (ctx->src_pad_block_id)
1239     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1240
1241   /* Can release the context now */
1242   mq_stream_ctx_unref (ctx);
1243
1244   /* Release and free the mq input */
1245   gst_element_release_request_pad (splitmux->mq, mqsink);
1246
1247   /* Release and free the muxer input */
1248   gst_element_release_request_pad (splitmux->muxer, muxpad);
1249
1250   gst_object_unref (mqsink);
1251   gst_object_unref (mqsrc);
1252   gst_object_unref (muxpad);
1253
1254   gst_element_remove_pad (element, pad);
1255
1256 fail:
1257   GST_SPLITMUX_UNLOCK (splitmux);
1258 }
1259
1260 static GstElement *
1261 create_element (GstSplitMuxSink * splitmux,
1262     const gchar * factory, const gchar * name)
1263 {
1264   GstElement *ret = gst_element_factory_make (factory, name);
1265   if (ret == NULL) {
1266     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1267     return NULL;
1268   }
1269
1270   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1271     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1272     gst_object_unref (ret);
1273     return NULL;
1274   }
1275
1276   return ret;
1277 }
1278
1279 static gboolean
1280 create_elements (GstSplitMuxSink * splitmux)
1281 {
1282   /* Create internal elements */
1283   if (splitmux->mq == NULL) {
1284     if ((splitmux->mq =
1285             create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1286       goto fail;
1287
1288     splitmux->mq_max_buffers = 5;
1289     /* No bytes or time limit, we limit buffers manually */
1290     g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1291         (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1292   }
1293
1294   if (splitmux->muxer == NULL) {
1295     GstElement *provided_muxer = NULL;
1296
1297     GST_OBJECT_LOCK (splitmux);
1298     if (splitmux->provided_muxer != NULL)
1299       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1300     GST_OBJECT_UNLOCK (splitmux);
1301
1302     if (provided_muxer == NULL) {
1303       if ((splitmux->muxer =
1304               create_element (splitmux, "mp4mux", "muxer")) == NULL)
1305         goto fail;
1306     } else {
1307       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1308         g_warning ("Could not add muxer element - splitmuxsink will not work");
1309         gst_object_unref (provided_muxer);
1310         goto fail;
1311       }
1312
1313       splitmux->muxer = provided_muxer;
1314       gst_object_unref (provided_muxer);
1315     }
1316   }
1317
1318   return TRUE;
1319 fail:
1320   return FALSE;
1321 }
1322
1323 static GstElement *
1324 find_sink (GstElement * e)
1325 {
1326   GstElement *res = NULL;
1327   GstIterator *iter;
1328   gboolean done = FALSE;
1329   GValue data = { 0, };
1330
1331   if (!GST_IS_BIN (e))
1332     return e;
1333
1334   iter = gst_bin_iterate_sinks (GST_BIN (e));
1335   while (!done) {
1336     switch (gst_iterator_next (iter, &data)) {
1337       case GST_ITERATOR_OK:
1338       {
1339         GstElement *child = g_value_get_object (&data);
1340         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1341                 "location") != NULL) {
1342           res = child;
1343           done = TRUE;
1344         }
1345         g_value_reset (&data);
1346         break;
1347       }
1348       case GST_ITERATOR_RESYNC:
1349         gst_iterator_resync (iter);
1350         break;
1351       case GST_ITERATOR_DONE:
1352         done = TRUE;
1353         break;
1354       case GST_ITERATOR_ERROR:
1355         g_assert_not_reached ();
1356         break;
1357     }
1358   }
1359   g_value_unset (&data);
1360   gst_iterator_free (iter);
1361
1362   return res;
1363 }
1364
1365 static gboolean
1366 create_sink (GstSplitMuxSink * splitmux)
1367 {
1368   GstElement *provided_sink = NULL;
1369
1370   g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1371
1372   GST_OBJECT_LOCK (splitmux);
1373   if (splitmux->provided_sink != NULL)
1374     provided_sink = gst_object_ref (splitmux->provided_sink);
1375   GST_OBJECT_UNLOCK (splitmux);
1376
1377   if (provided_sink == NULL) {
1378     if ((splitmux->sink =
1379             create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1380       goto fail;
1381     splitmux->active_sink = splitmux->sink;
1382   } else {
1383     if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1384       g_warning ("Could not add sink elements - splitmuxsink will not work");
1385       gst_object_unref (provided_sink);
1386       goto fail;
1387     }
1388
1389     splitmux->active_sink = provided_sink;
1390
1391     /* The bin holds a ref now, we can drop our tmp ref */
1392     gst_object_unref (provided_sink);
1393
1394     /* Find the sink element */
1395     splitmux->sink = find_sink (splitmux->active_sink);
1396     if (splitmux->sink == NULL) {
1397       g_warning
1398           ("Could not locate sink element in provided sink - splitmuxsink will not work");
1399       goto fail;
1400     }
1401   }
1402
1403   if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1404     g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1405     goto fail;
1406   }
1407
1408   return TRUE;
1409 fail:
1410   return FALSE;
1411 }
1412
1413 #ifdef __GNUC__
1414 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1415 #endif
1416 static void
1417 set_next_filename (GstSplitMuxSink * splitmux)
1418 {
1419   if (splitmux->location) {
1420     gchar *fname;
1421
1422     fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
1423     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1424     g_object_set (splitmux->sink, "location", fname, NULL);
1425     g_free (fname);
1426
1427     splitmux->fragment_id++;
1428   }
1429 }
1430
1431 static GstStateChangeReturn
1432 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1433 {
1434   GstStateChangeReturn ret;
1435   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1436
1437   switch (transition) {
1438     case GST_STATE_CHANGE_NULL_TO_READY:{
1439       GST_SPLITMUX_LOCK (splitmux);
1440       if (!create_elements (splitmux) || !create_sink (splitmux)) {
1441         ret = GST_STATE_CHANGE_FAILURE;
1442         GST_SPLITMUX_UNLOCK (splitmux);
1443         goto beach;
1444       }
1445       GST_SPLITMUX_UNLOCK (splitmux);
1446       splitmux->fragment_id = 0;
1447       set_next_filename (splitmux);
1448       break;
1449     }
1450     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1451       GST_SPLITMUX_LOCK (splitmux);
1452       /* Start by collecting one input on each pad */
1453       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1454       splitmux->max_in_running_time = 0;
1455       splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1456       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1457       GST_SPLITMUX_UNLOCK (splitmux);
1458       break;
1459     }
1460     case GST_STATE_CHANGE_PAUSED_TO_READY:
1461     case GST_STATE_CHANGE_READY_TO_NULL:
1462       GST_SPLITMUX_LOCK (splitmux);
1463       splitmux->state = SPLITMUX_STATE_STOPPED;
1464       /* Wake up any blocked threads */
1465       GST_LOG_OBJECT (splitmux,
1466           "State change -> NULL or READY. Waking threads");
1467       GST_SPLITMUX_BROADCAST (splitmux);
1468       GST_SPLITMUX_UNLOCK (splitmux);
1469       break;
1470     default:
1471       break;
1472   }
1473
1474   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1475   if (ret == GST_STATE_CHANGE_FAILURE)
1476     goto beach;
1477
1478   switch (transition) {
1479     case GST_STATE_CHANGE_READY_TO_NULL:
1480       GST_SPLITMUX_LOCK (splitmux);
1481       splitmux->fragment_id = 0;
1482       gst_splitmux_reset (splitmux);
1483       GST_SPLITMUX_UNLOCK (splitmux);
1484       break;
1485     default:
1486       break;
1487   }
1488
1489 beach:
1490
1491   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1492       ret == GST_STATE_CHANGE_FAILURE) {
1493     /* Cleanup elements on failed transition out of NULL */
1494     gst_splitmux_reset (splitmux);
1495   }
1496   return ret;
1497 }
1498
1499 gboolean
1500 register_splitmuxsink (GstPlugin * plugin)
1501 {
1502   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1503       "Split File Muxing Sink");
1504
1505   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1506       GST_TYPE_SPLITMUX_SINK);
1507 }