splitmuxsink: Remove unused muxed_out_time
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * <refsect2>
42  * <title>Example pipelines</title>
43  * |[
44  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
45  * ]|
46  * Records a video stream captured from a v4l2 device and muxes it into
47  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48  * and 1MB maximum size.
49  * </refsect2>
50  */
51
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55
56 #include <string.h>
57 #include <glib/gstdio.h>
58 #include <gst/video/video.h>
59 #include "gstsplitmuxsink.h"
60
61 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
62 #define GST_CAT_DEFAULT splitmux_debug
63
64 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
65 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
66 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
68
69 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
70 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
71
72 enum
73 {
74   PROP_0,
75   PROP_LOCATION,
76   PROP_MAX_SIZE_TIME,
77   PROP_MAX_SIZE_BYTES,
78   PROP_MAX_SIZE_TIMECODE,
79   PROP_SEND_KEYFRAME_REQUESTS,
80   PROP_MAX_FILES,
81   PROP_MUXER_OVERHEAD,
82   PROP_MUXER,
83   PROP_SINK
84 };
85
86 #define DEFAULT_MAX_SIZE_TIME       0
87 #define DEFAULT_MAX_SIZE_BYTES      0
88 #define DEFAULT_MAX_FILES           0
89 #define DEFAULT_MUXER_OVERHEAD      0.02
90 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
91 #define DEFAULT_MUXER "mp4mux"
92 #define DEFAULT_SINK "filesink"
93
94 enum
95 {
96   SIGNAL_FORMAT_LOCATION,
97   SIGNAL_FORMAT_LOCATION_FULL,
98   SIGNAL_LAST
99 };
100
101 static guint signals[SIGNAL_LAST];
102
103 static GstStaticPadTemplate video_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("video",
105     GST_PAD_SINK,
106     GST_PAD_REQUEST,
107     GST_STATIC_CAPS_ANY);
108 static GstStaticPadTemplate audio_sink_template =
109 GST_STATIC_PAD_TEMPLATE ("audio_%u",
110     GST_PAD_SINK,
111     GST_PAD_REQUEST,
112     GST_STATIC_CAPS_ANY);
113 static GstStaticPadTemplate subtitle_sink_template =
114 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
115     GST_PAD_SINK,
116     GST_PAD_REQUEST,
117     GST_STATIC_CAPS_ANY);
118
119 static GQuark PAD_CONTEXT;
120
121 static void
122 _do_init (void)
123 {
124   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
125 }
126
127 #define gst_splitmux_sink_parent_class parent_class
128 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
129     _do_init ());
130
131 static gboolean create_muxer (GstSplitMuxSink * splitmux);
132 static gboolean create_sink (GstSplitMuxSink * splitmux);
133 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
134     const GValue * value, GParamSpec * pspec);
135 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
136     GValue * value, GParamSpec * pspec);
137 static void gst_splitmux_sink_dispose (GObject * object);
138 static void gst_splitmux_sink_finalize (GObject * object);
139
140 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
141     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
142 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
143
144 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
145     element, GstStateChange transition);
146
147 static void bus_handler (GstBin * bin, GstMessage * msg);
148 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
149 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
150 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
151 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
152
153 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
154 static GstElement *create_element (GstSplitMuxSink * splitmux,
155     const gchar * factory, const gchar * name, gboolean locked);
156
157 static void do_async_done (GstSplitMuxSink * splitmux);
158
159 static MqStreamBuf *
160 mq_stream_buf_new (void)
161 {
162   return g_slice_new0 (MqStreamBuf);
163 }
164
165 static void
166 mq_stream_buf_free (MqStreamBuf * data)
167 {
168   g_slice_free (MqStreamBuf, data);
169 }
170
171 static SplitMuxOutputCommand *
172 out_cmd_buf_new (void)
173 {
174   return g_slice_new0 (SplitMuxOutputCommand);
175 }
176
177 static void
178 out_cmd_buf_free (SplitMuxOutputCommand * data)
179 {
180   g_slice_free (SplitMuxOutputCommand, data);
181 }
182
183 static void
184 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
185 {
186   GObjectClass *gobject_class = (GObjectClass *) klass;
187   GstElementClass *gstelement_class = (GstElementClass *) klass;
188   GstBinClass *gstbin_class = (GstBinClass *) klass;
189
190   gobject_class->set_property = gst_splitmux_sink_set_property;
191   gobject_class->get_property = gst_splitmux_sink_get_property;
192   gobject_class->dispose = gst_splitmux_sink_dispose;
193   gobject_class->finalize = gst_splitmux_sink_finalize;
194
195   gst_element_class_set_static_metadata (gstelement_class,
196       "Split Muxing Bin", "Generic/Bin/Muxer",
197       "Convenience bin that muxes incoming streams into multiple time/size limited files",
198       "Jan Schmidt <jan@centricular.com>");
199
200   gst_element_class_add_static_pad_template (gstelement_class,
201       &video_sink_template);
202   gst_element_class_add_static_pad_template (gstelement_class,
203       &audio_sink_template);
204   gst_element_class_add_static_pad_template (gstelement_class,
205       &subtitle_sink_template);
206
207   gstelement_class->change_state =
208       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
209   gstelement_class->request_new_pad =
210       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
211   gstelement_class->release_pad =
212       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
213
214   gstbin_class->handle_message = bus_handler;
215
216   g_object_class_install_property (gobject_class, PROP_LOCATION,
217       g_param_spec_string ("location", "File Output Pattern",
218           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
219           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
221       g_param_spec_double ("mux-overhead", "Muxing Overhead",
222           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
223           DEFAULT_MUXER_OVERHEAD,
224           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
225
226   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
227       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
228           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
229           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
231       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
232           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
233           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
235       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
236           "Maximum difference in timecode between first and last frame. "
237           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
238           "Will only be effective if a timecode track is present.",
239           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
241       g_param_spec_boolean ("send-keyframe-requests",
242           "Request keyframes at max-size-time",
243           "Request a keyframe every max-size-time ns to try splitting at that point. "
244           "Needs max-size-bytes to be 0 in order to be effective.",
245           DEFAULT_SEND_KEYFRAME_REQUESTS,
246           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
248       g_param_spec_uint ("max-files", "Max files",
249           "Maximum number of files to keep on disk. Once the maximum is reached,"
250           "old files start to be deleted to make room for new ones.", 0,
251           G_MAXUINT, DEFAULT_MAX_FILES,
252           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
253
254
255   g_object_class_install_property (gobject_class, PROP_MUXER,
256       g_param_spec_object ("muxer", "Muxer",
257           "The muxer element to use (NULL = default mp4mux)",
258           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259   g_object_class_install_property (gobject_class, PROP_SINK,
260       g_param_spec_object ("sink", "Sink",
261           "The sink element (or element chain) to use (NULL = default filesink)",
262           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263
264   /**
265    * GstSplitMuxSink::format-location:
266    * @splitmux: the #GstSplitMuxSink
267    * @fragment_id: the sequence number of the file to be created
268    *
269    * Returns: the location to be used for the next output file
270    */
271   signals[SIGNAL_FORMAT_LOCATION] =
272       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
273       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
274
275   /**
276    * GstSplitMuxSink::format-location-full:
277    * @splitmux: the #GstSplitMuxSink
278    * @fragment_id: the sequence number of the file to be created
279    * @first_sample: A #GstSample containing the first buffer
280    *   from the reference stream in the new file
281    *
282    * Returns: the location to be used for the next output file
283    */
284   signals[SIGNAL_FORMAT_LOCATION_FULL] =
285       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
286       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
287       GST_TYPE_SAMPLE);
288 }
289
290 static void
291 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
292 {
293   g_mutex_init (&splitmux->lock);
294   g_cond_init (&splitmux->input_cond);
295   g_cond_init (&splitmux->output_cond);
296   g_queue_init (&splitmux->out_cmd_q);
297
298   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
299   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
300   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
301   splitmux->max_files = DEFAULT_MAX_FILES;
302   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
303   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
304
305   splitmux->threshold_timecode_str = NULL;
306
307   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
308 }
309
310 static void
311 gst_splitmux_reset (GstSplitMuxSink * splitmux)
312 {
313   if (splitmux->muxer) {
314     gst_element_set_locked_state (splitmux->muxer, TRUE);
315     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
316     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
317   }
318   if (splitmux->active_sink) {
319     gst_element_set_locked_state (splitmux->active_sink, TRUE);
320     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
321     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
322   }
323
324   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
325 }
326
327 static void
328 gst_splitmux_sink_dispose (GObject * object)
329 {
330   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
331
332   /* Calling parent dispose invalidates all child pointers */
333   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
334
335   G_OBJECT_CLASS (parent_class)->dispose (object);
336 }
337
338 static void
339 gst_splitmux_sink_finalize (GObject * object)
340 {
341   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
342   g_cond_clear (&splitmux->input_cond);
343   g_cond_clear (&splitmux->output_cond);
344   g_mutex_clear (&splitmux->lock);
345   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
346   g_queue_clear (&splitmux->out_cmd_q);
347
348   if (splitmux->provided_sink)
349     gst_object_unref (splitmux->provided_sink);
350   if (splitmux->provided_muxer)
351     gst_object_unref (splitmux->provided_muxer);
352
353   if (splitmux->threshold_timecode_str)
354     g_free (splitmux->threshold_timecode_str);
355
356   g_free (splitmux->location);
357
358   /* Make sure to free any un-released contexts */
359   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
360   g_list_free (splitmux->contexts);
361
362   G_OBJECT_CLASS (parent_class)->finalize (object);
363 }
364
365 static void
366 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
367     const GValue * value, GParamSpec * pspec)
368 {
369   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
370
371   switch (prop_id) {
372     case PROP_LOCATION:{
373       GST_OBJECT_LOCK (splitmux);
374       g_free (splitmux->location);
375       splitmux->location = g_value_dup_string (value);
376       GST_OBJECT_UNLOCK (splitmux);
377       break;
378     }
379     case PROP_MAX_SIZE_BYTES:
380       GST_OBJECT_LOCK (splitmux);
381       splitmux->threshold_bytes = g_value_get_uint64 (value);
382       GST_OBJECT_UNLOCK (splitmux);
383       break;
384     case PROP_MAX_SIZE_TIME:
385       GST_OBJECT_LOCK (splitmux);
386       splitmux->threshold_time = g_value_get_uint64 (value);
387       GST_OBJECT_UNLOCK (splitmux);
388       break;
389     case PROP_MAX_SIZE_TIMECODE:
390       GST_OBJECT_LOCK (splitmux);
391       splitmux->threshold_timecode_str = g_value_dup_string (value);
392       GST_OBJECT_UNLOCK (splitmux);
393       break;
394     case PROP_SEND_KEYFRAME_REQUESTS:
395       GST_OBJECT_LOCK (splitmux);
396       splitmux->send_keyframe_requests = g_value_get_boolean (value);
397       GST_OBJECT_UNLOCK (splitmux);
398       break;
399     case PROP_MAX_FILES:
400       GST_OBJECT_LOCK (splitmux);
401       splitmux->max_files = g_value_get_uint (value);
402       GST_OBJECT_UNLOCK (splitmux);
403       break;
404     case PROP_MUXER_OVERHEAD:
405       GST_OBJECT_LOCK (splitmux);
406       splitmux->mux_overhead = g_value_get_double (value);
407       GST_OBJECT_UNLOCK (splitmux);
408       break;
409     case PROP_SINK:
410       GST_OBJECT_LOCK (splitmux);
411       if (splitmux->provided_sink)
412         gst_object_unref (splitmux->provided_sink);
413       splitmux->provided_sink = g_value_get_object (value);
414       gst_object_ref_sink (splitmux->provided_sink);
415       GST_OBJECT_UNLOCK (splitmux);
416       break;
417     case PROP_MUXER:
418       GST_OBJECT_LOCK (splitmux);
419       if (splitmux->provided_muxer)
420         gst_object_unref (splitmux->provided_muxer);
421       splitmux->provided_muxer = g_value_get_object (value);
422       gst_object_ref_sink (splitmux->provided_muxer);
423       GST_OBJECT_UNLOCK (splitmux);
424       break;
425     default:
426       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
427       break;
428   }
429 }
430
431 static void
432 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
433     GValue * value, GParamSpec * pspec)
434 {
435   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
436
437   switch (prop_id) {
438     case PROP_LOCATION:
439       GST_OBJECT_LOCK (splitmux);
440       g_value_set_string (value, splitmux->location);
441       GST_OBJECT_UNLOCK (splitmux);
442       break;
443     case PROP_MAX_SIZE_BYTES:
444       GST_OBJECT_LOCK (splitmux);
445       g_value_set_uint64 (value, splitmux->threshold_bytes);
446       GST_OBJECT_UNLOCK (splitmux);
447       break;
448     case PROP_MAX_SIZE_TIME:
449       GST_OBJECT_LOCK (splitmux);
450       g_value_set_uint64 (value, splitmux->threshold_time);
451       GST_OBJECT_UNLOCK (splitmux);
452       break;
453     case PROP_MAX_SIZE_TIMECODE:
454       GST_OBJECT_LOCK (splitmux);
455       g_value_set_string (value, splitmux->threshold_timecode_str);
456       GST_OBJECT_UNLOCK (splitmux);
457       break;
458     case PROP_SEND_KEYFRAME_REQUESTS:
459       GST_OBJECT_LOCK (splitmux);
460       g_value_set_boolean (value, splitmux->send_keyframe_requests);
461       GST_OBJECT_UNLOCK (splitmux);
462       break;
463     case PROP_MAX_FILES:
464       GST_OBJECT_LOCK (splitmux);
465       g_value_set_uint (value, splitmux->max_files);
466       GST_OBJECT_UNLOCK (splitmux);
467       break;
468     case PROP_MUXER_OVERHEAD:
469       GST_OBJECT_LOCK (splitmux);
470       g_value_set_double (value, splitmux->mux_overhead);
471       GST_OBJECT_UNLOCK (splitmux);
472       break;
473     case PROP_SINK:
474       GST_OBJECT_LOCK (splitmux);
475       g_value_set_object (value, splitmux->provided_sink);
476       GST_OBJECT_UNLOCK (splitmux);
477       break;
478     case PROP_MUXER:
479       GST_OBJECT_LOCK (splitmux);
480       g_value_set_object (value, splitmux->provided_muxer);
481       GST_OBJECT_UNLOCK (splitmux);
482       break;
483     default:
484       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
485       break;
486   }
487 }
488
489 /* Convenience function */
490 static inline GstClockTimeDiff
491 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
492 {
493   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
494
495   if (GST_CLOCK_TIME_IS_VALID (val)) {
496     gboolean sign =
497         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
498     if (sign > 0)
499       res = val;
500     else if (sign < 0)
501       res = -val;
502   }
503   return res;
504 }
505
506 static MqStreamCtx *
507 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
508 {
509   MqStreamCtx *ctx;
510
511   ctx = g_new0 (MqStreamCtx, 1);
512   g_atomic_int_set (&ctx->refcount, 1);
513   ctx->splitmux = splitmux;
514   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
515   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
516   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
517   g_queue_init (&ctx->queued_bufs);
518   return ctx;
519 }
520
521 static void
522 mq_stream_ctx_free (MqStreamCtx * ctx)
523 {
524   if (ctx->q) {
525     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
526     gst_element_set_locked_state (ctx->q, TRUE);
527     gst_element_set_state (ctx->q, GST_STATE_NULL);
528     gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
529     gst_object_unref (ctx->q);
530   }
531   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
532   gst_object_unref (ctx->sinkpad);
533   gst_object_unref (ctx->srcpad);
534   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
535   g_queue_clear (&ctx->queued_bufs);
536   g_free (ctx);
537 }
538
539 static void
540 mq_stream_ctx_unref (MqStreamCtx * ctx)
541 {
542   if (g_atomic_int_dec_and_test (&ctx->refcount))
543     mq_stream_ctx_free (ctx);
544 }
545
546 static void
547 mq_stream_ctx_ref (MqStreamCtx * ctx)
548 {
549   g_atomic_int_inc (&ctx->refcount);
550 }
551
552 static void
553 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
554 {
555   ctx->sink_pad_block_id = 0;
556   mq_stream_ctx_unref (ctx);
557 }
558
559 static void
560 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
561 {
562   ctx->src_pad_block_id = 0;
563   mq_stream_ctx_unref (ctx);
564 }
565
566 static void
567 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
568 {
569   gchar *location = NULL;
570   GstMessage *msg;
571   const gchar *msg_name = opened ?
572       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
573
574   g_object_get (splitmux->sink, "location", &location, NULL);
575
576   msg = gst_message_new_element (GST_OBJECT (splitmux),
577       gst_structure_new (msg_name,
578           "location", G_TYPE_STRING, location,
579           "running-time", GST_TYPE_CLOCK_TIME,
580           splitmux->reference_ctx->out_running_time, NULL));
581   gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
582
583   g_free (location);
584 }
585
586 /* Called with lock held, drops the lock to send EOS to the
587  * pad
588  */
589 static void
590 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
591 {
592   GstEvent *eos;
593   GstPad *pad;
594
595   eos = gst_event_new_eos ();
596   pad = gst_pad_get_peer (ctx->srcpad);
597
598   ctx->out_eos = TRUE;
599
600   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
601   GST_SPLITMUX_UNLOCK (splitmux);
602   gst_pad_send_event (pad, eos);
603   GST_SPLITMUX_LOCK (splitmux);
604
605   gst_object_unref (pad);
606 }
607
608 /* Called with splitmux lock held to check if this output
609  * context needs to sleep to wait for the release of the
610  * next GOP, or to send EOS to close out the current file
611  */
612 static void
613 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
614 {
615   do {
616     /* When first starting up, the reference stream has to output
617      * the first buffer to prepare the muxer and sink */
618     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
619
620     if (ctx->flushing
621         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
622       return;
623
624     GST_LOG_OBJECT (ctx->srcpad,
625         "Checking running time %" GST_STIME_FORMAT " against max %"
626         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
627         GST_STIME_ARGS (splitmux->max_out_running_time));
628
629     if (can_output) {
630       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
631           ctx->out_running_time < splitmux->max_out_running_time) {
632         return;
633       }
634
635       switch (splitmux->output_state) {
636         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
637           /* We only get here if we've finished outputting a GOP and need to know
638            * what to do next */
639           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
640           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
641           continue;
642
643         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
644           /* We've reached the max out running_time to get here, so end this file now */
645           if (ctx->out_eos == FALSE) {
646             send_eos (splitmux, ctx);
647             continue;
648           }
649           break;
650         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
651           if (ctx->is_reference) {
652             /* Special handling on the reference ctx to start new fragments
653              * and collect commands from the command queue */
654             /* drops the splitmux lock briefly: */
655             start_next_fragment (splitmux, ctx);
656             continue;
657           }
658           break;
659
660         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
661           do {
662             SplitMuxOutputCommand *cmd =
663                 g_queue_pop_tail (&splitmux->out_cmd_q);
664             if (cmd != NULL) {
665               /* If we pop the last command, we need to make our queues bigger */
666               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
667                 grow_blocked_queues (splitmux);
668
669               if (cmd->start_new_fragment) {
670                 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
671                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
672               } else {
673                 GST_DEBUG_OBJECT (splitmux,
674                     "Got new output cmd for time %" GST_STIME_FORMAT,
675                     GST_STIME_ARGS (cmd->max_output_ts));
676
677                 /* Extend the output range immediately */
678                 splitmux->max_out_running_time = cmd->max_output_ts;
679                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
680               }
681               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
682
683               out_cmd_buf_free (cmd);
684               break;
685             } else {
686               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
687             }
688           } while (splitmux->output_state ==
689               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
690           /* loop and re-check the state */
691           continue;
692         }
693         case SPLITMUX_OUTPUT_STATE_STOPPED:
694           return;
695       }
696     }
697
698     GST_INFO_OBJECT (ctx->srcpad,
699         "Sleeping for running time %"
700         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
701         GST_STIME_ARGS (ctx->out_running_time),
702         GST_STIME_ARGS (splitmux->max_out_running_time));
703     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
704     GST_INFO_OBJECT (ctx->srcpad,
705         "Woken for new max running time %" GST_STIME_FORMAT,
706         GST_STIME_ARGS (splitmux->max_out_running_time));
707   }
708   while (1);
709 }
710
711 static GstClockTime
712 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
713     const GstVideoTimeCode * cur_tc)
714 {
715   GstVideoTimeCode *target_tc;
716   GstVideoTimeCodeInterval *tc_inter;
717   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
718
719   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
720     return GST_CLOCK_TIME_NONE;
721
722   tc_inter =
723       gst_video_time_code_interval_new_from_string
724       (splitmux->threshold_timecode_str);
725   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
726   gst_video_time_code_interval_free (tc_inter);
727
728   /* Convert to ns */
729   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
730   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
731
732   /* Add fragment_start_time, accounting for wraparound */
733   if (target_tc_time >= cur_tc_time) {
734     next_max_tc_time =
735         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
736   } else {
737     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
738
739     next_max_tc_time =
740         day_in_ns - cur_tc_time + target_tc_time +
741         splitmux->fragment_start_time;
742   }
743   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
744       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
745       GST_TIME_ARGS (cur_tc_time));
746   gst_video_time_code_free (target_tc);
747
748   return next_max_tc_time;
749 }
750
751 static gboolean
752 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
753 {
754   GstEvent *ev;
755   GstClockTime target_time;
756   gboolean timecode_based = FALSE;
757
758   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
759   if (splitmux->threshold_timecode_str) {
760     GstVideoTimeCodeMeta *tc_meta;
761
762     if (buffer != NULL) {
763       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
764       if (tc_meta) {
765         splitmux->next_max_tc_time =
766             calculate_next_max_timecode (splitmux, &tc_meta->tc);
767         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
768       }
769     } else {
770       /* This can happen in the presence of GAP events that trigger
771        * a new fragment start */
772       GST_WARNING_OBJECT (splitmux,
773           "No buffer available to calculate next timecode");
774     }
775   }
776
777   if (splitmux->send_keyframe_requests == FALSE
778       || (splitmux->threshold_time == 0 && !timecode_based)
779       || splitmux->threshold_bytes != 0)
780     return TRUE;
781
782   if (timecode_based) {
783     /* We might have rounding errors: aim slightly earlier */
784     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
785   } else {
786     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
787   }
788   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
789   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
790       GST_TIME_ARGS (target_time));
791   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
792 }
793
794 static GstPadProbeReturn
795 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
796 {
797   GstSplitMuxSink *splitmux = ctx->splitmux;
798   MqStreamBuf *buf_info = NULL;
799
800   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
801
802   /* FIXME: Handle buffer lists, until then make it clear they won't work */
803   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
804     g_warning ("Buffer list handling not implemented");
805     return GST_PAD_PROBE_DROP;
806   }
807   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
808     GstEvent *event = gst_pad_probe_info_get_event (info);
809     gboolean locked = FALSE;
810
811     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
812
813     switch (GST_EVENT_TYPE (event)) {
814       case GST_EVENT_SEGMENT:
815         gst_event_copy_segment (event, &ctx->out_segment);
816         break;
817       case GST_EVENT_FLUSH_STOP:
818         GST_SPLITMUX_LOCK (splitmux);
819         locked = TRUE;
820         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
821         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
822         g_queue_clear (&ctx->queued_bufs);
823         ctx->flushing = FALSE;
824         break;
825       case GST_EVENT_FLUSH_START:
826         GST_SPLITMUX_LOCK (splitmux);
827         locked = TRUE;
828         GST_LOG_OBJECT (pad, "Flush start");
829         ctx->flushing = TRUE;
830         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
831         break;
832       case GST_EVENT_EOS:
833         GST_SPLITMUX_LOCK (splitmux);
834         locked = TRUE;
835         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
836           goto beach;
837         ctx->out_eos = TRUE;
838         break;
839       case GST_EVENT_GAP:{
840         GstClockTime gap_ts;
841         GstClockTimeDiff rtime;
842
843         gst_event_parse_gap (event, &gap_ts, NULL);
844         if (gap_ts == GST_CLOCK_TIME_NONE)
845           break;
846
847         GST_SPLITMUX_LOCK (splitmux);
848         locked = TRUE;
849
850         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
851           goto beach;
852
853         /* When we get a gap event on the
854          * reference stream and we're trying to open a
855          * new file, we need to store it until we get
856          * the buffer afterwards
857          */
858         if (ctx->is_reference &&
859             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
860           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
861           gst_event_replace (&ctx->pending_gap, event);
862           GST_SPLITMUX_UNLOCK (splitmux);
863           return GST_PAD_PROBE_HANDLED;
864         }
865
866         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
867
868         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
869             GST_STIME_ARGS (rtime));
870
871         if (rtime != GST_CLOCK_STIME_NONE) {
872           ctx->out_running_time = rtime;
873           complete_or_wait_on_out (splitmux, ctx);
874         }
875         break;
876       }
877       case GST_EVENT_CUSTOM_DOWNSTREAM:{
878         const GstStructure *s;
879         GstClockTimeDiff ts = 0;
880
881         s = gst_event_get_structure (event);
882         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
883           break;
884
885         gst_structure_get_int64 (s, "timestamp", &ts);
886
887         GST_SPLITMUX_LOCK (splitmux);
888         locked = TRUE;
889
890         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
891           goto beach;
892         ctx->out_running_time = ts;
893         if (!ctx->is_reference)
894           complete_or_wait_on_out (splitmux, ctx);
895         GST_SPLITMUX_UNLOCK (splitmux);
896         return GST_PAD_PROBE_DROP;
897       }
898       default:
899         break;
900     }
901
902     /* We need to make sure events aren't passed
903      * until the muxer / sink are ready for it */
904     if (!locked)
905       GST_SPLITMUX_LOCK (splitmux);
906     if (!ctx->is_reference)
907       complete_or_wait_on_out (splitmux, ctx);
908     GST_SPLITMUX_UNLOCK (splitmux);
909
910     return GST_PAD_PROBE_PASS;
911   }
912
913   /* Allow everything through until the configured next stopping point */
914   GST_SPLITMUX_LOCK (splitmux);
915
916   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
917   if (buf_info == NULL)
918     /* Can only happen due to a poorly timed flush */
919     goto beach;
920
921   /* If we have popped a keyframe, decrement the queued_gop count */
922   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
923     splitmux->queued_keyframes--;
924
925   ctx->out_running_time = buf_info->run_ts;
926   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
927
928   GST_LOG_OBJECT (splitmux,
929       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
930       " size %" G_GUINT64_FORMAT,
931       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
932
933   complete_or_wait_on_out (splitmux, ctx);
934
935   splitmux->muxed_out_bytes += buf_info->buf_size;
936
937 #ifndef GST_DISABLE_GST_DEBUG
938   {
939     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
940     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
941         " run ts %" GST_STIME_FORMAT, buf,
942         GST_STIME_ARGS (ctx->out_running_time));
943   }
944 #endif
945
946   ctx->cur_out_buffer = NULL;
947   GST_SPLITMUX_UNLOCK (splitmux);
948
949   /* pending_gap is protected by the STREAM lock */
950   if (ctx->pending_gap) {
951     /* If we previously stored a gap event, send it now */
952     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
953
954     GST_DEBUG_OBJECT (splitmux,
955         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
956
957     gst_pad_send_event (peer, ctx->pending_gap);
958     ctx->pending_gap = NULL;
959
960     gst_object_unref (peer);
961   }
962
963   mq_stream_buf_free (buf_info);
964
965   return GST_PAD_PROBE_PASS;
966
967 beach:
968   GST_SPLITMUX_UNLOCK (splitmux);
969   return GST_PAD_PROBE_DROP;
970 }
971
972 static gboolean
973 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
974 {
975   return gst_pad_send_event (peer, gst_event_ref (*event));
976 }
977
978 static void
979 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
980 {
981   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
982
983   gst_pad_sticky_events_foreach (ctx->srcpad,
984       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
985
986   /* Clear EOS flag if not actually EOS */
987   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
988
989   gst_object_unref (peer);
990 }
991
992 /* Called with lock held when a fragment
993  * reaches EOS and it is time to restart
994  * a new fragment
995  */
996 static void
997 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
998 {
999   GstElement *muxer, *sink;
1000
1001   /* 1 change to new file */
1002   splitmux->switching_fragment = TRUE;
1003
1004   /* We need to drop the splitmux lock to acquire the state lock
1005    * here and ensure there's no racy state change going on elsewhere */
1006   muxer = gst_object_ref (splitmux->muxer);
1007   sink = gst_object_ref (splitmux->active_sink);
1008
1009   GST_SPLITMUX_UNLOCK (splitmux);
1010   GST_STATE_LOCK (splitmux);
1011
1012   gst_element_set_locked_state (muxer, TRUE);
1013   gst_element_set_locked_state (sink, TRUE);
1014   gst_element_set_state (muxer, GST_STATE_NULL);
1015   gst_element_set_state (sink, GST_STATE_NULL);
1016
1017   GST_SPLITMUX_LOCK (splitmux);
1018   set_next_filename (splitmux, ctx);
1019   GST_SPLITMUX_UNLOCK (splitmux);
1020
1021   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1022   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1023   gst_element_set_locked_state (muxer, FALSE);
1024   gst_element_set_locked_state (sink, FALSE);
1025
1026   gst_object_unref (sink);
1027   gst_object_unref (muxer);
1028
1029   GST_SPLITMUX_LOCK (splitmux);
1030   GST_STATE_UNLOCK (splitmux);
1031   splitmux->switching_fragment = FALSE;
1032   do_async_done (splitmux);
1033
1034   splitmux->ready_for_output = TRUE;
1035
1036   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1037
1038   send_fragment_opened_closed_msg (splitmux, TRUE);
1039
1040   /* FIXME: Is this always the correct next state? */
1041   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1042   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1043 }
1044
1045 static void
1046 bus_handler (GstBin * bin, GstMessage * message)
1047 {
1048   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1049
1050   switch (GST_MESSAGE_TYPE (message)) {
1051     case GST_MESSAGE_EOS:
1052       /* If the state is draining out the current file, drop this EOS */
1053       GST_SPLITMUX_LOCK (splitmux);
1054
1055       send_fragment_opened_closed_msg (splitmux, FALSE);
1056
1057       if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1058         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1059         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1060         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1061
1062         gst_message_unref (message);
1063         GST_SPLITMUX_UNLOCK (splitmux);
1064         return;
1065       } else {
1066         GST_DEBUG_OBJECT (splitmux,
1067             "Passing EOS message. Output state %d max_out_running_time %"
1068             GST_STIME_FORMAT, splitmux->output_state,
1069             GST_STIME_ARGS (splitmux->max_out_running_time));
1070       }
1071       GST_SPLITMUX_UNLOCK (splitmux);
1072       break;
1073     case GST_MESSAGE_ASYNC_START:
1074     case GST_MESSAGE_ASYNC_DONE:
1075       /* Ignore state changes from our children while switching */
1076       if (splitmux->switching_fragment) {
1077         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1078             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1079           GST_LOG_OBJECT (splitmux,
1080               "Ignoring state change from child %" GST_PTR_FORMAT
1081               " while switching", GST_MESSAGE_SRC (message));
1082           gst_message_unref (message);
1083           return;
1084         }
1085       }
1086       break;
1087     default:
1088       break;
1089   }
1090
1091   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1092 }
1093
1094 static void
1095 ctx_set_unblock (MqStreamCtx * ctx)
1096 {
1097   ctx->need_unblock = TRUE;
1098 }
1099
1100 /* Called with splitmux lock held */
1101 /* Called when entering ProcessingCompleteGop state
1102  * Assess if mq contents overflowed the current file
1103  *   -> If yes, need to switch to new file
1104  *   -> if no, set max_out_running_time to let this GOP in and
1105  *      go to COLLECTING_GOP_START state
1106  */
1107 static void
1108 handle_gathered_gop (GstSplitMuxSink * splitmux)
1109 {
1110   guint64 queued_bytes;
1111   GstClockTimeDiff queued_time = 0;
1112   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1113   SplitMuxOutputCommand *cmd;
1114
1115   /* Assess if the multiqueue contents overflowed the current file */
1116   /* When considering if a newly gathered GOP overflows
1117    * the time limit for the file, only consider the running time of the
1118    * reference stream. Other streams might have run ahead a little bit,
1119    * but extra pieces won't be released to the muxer beyond the reference
1120    * stream cut-off anyway - so it forms the limit. */
1121   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1122   queued_time = splitmux->reference_ctx->in_running_time;
1123
1124   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
1125
1126   g_assert (queued_time >= splitmux->fragment_start_time);
1127
1128   queued_time -= splitmux->fragment_start_time;
1129
1130   /* Expand queued bytes estimate by muxer overhead */
1131   queued_bytes += (queued_bytes * splitmux->mux_overhead);
1132
1133   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1134       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1135   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1136     GST_LOG_OBJECT (splitmux,
1137         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1138         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1139         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1140   }
1141
1142   /* Check for overrun - have we output at least one byte and overrun
1143    * either threshold? */
1144   /* Timecode-based threshold accounts for possible rounding errors:
1145    * 5us should be bigger than all possible rounding errors but nowhere near
1146    * big enough to skip to another frame */
1147   if ((splitmux->fragment_total_bytes > 0 &&
1148           ((splitmux->threshold_bytes > 0 &&
1149                   queued_bytes > splitmux->threshold_bytes) ||
1150               (splitmux->threshold_time > 0 &&
1151                   queued_time > splitmux->threshold_time) ||
1152               (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1153                   splitmux->reference_ctx->in_running_time >
1154                   splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
1155
1156     /* Tell the output side to start a new fragment */
1157     GST_INFO_OBJECT (splitmux,
1158         "This GOP (dur %" GST_STIME_FORMAT
1159         ") would overflow the fragment, Sending start_new_fragment cmd",
1160         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1161             splitmux->gop_start_time));
1162     cmd = out_cmd_buf_new ();
1163     cmd->start_new_fragment = TRUE;
1164     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1165     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1166
1167     new_out_ts = splitmux->reference_ctx->in_running_time;
1168     splitmux->fragment_start_time = splitmux->gop_start_time;
1169     splitmux->fragment_total_bytes = 0;
1170
1171     if (request_next_keyframe (splitmux,
1172             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1173       GST_WARNING_OBJECT (splitmux,
1174           "Could not request a keyframe. Files may not split at the exact location they should");
1175     }
1176     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1177   }
1178
1179   /* And set up to collect the next GOP */
1180   if (!splitmux->reference_ctx->in_eos) {
1181     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1182     splitmux->gop_start_time = new_out_ts;
1183   } else {
1184     /* This is probably already the current state, but just in case: */
1185     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1186     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
1187   }
1188
1189   /* And wake all input contexts to send a wake-up event */
1190   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1191   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1192
1193   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1194   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1195
1196   if (splitmux->gop_total_bytes > 0) {
1197     GST_LOG_OBJECT (splitmux,
1198         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1199         " time %" GST_STIME_FORMAT,
1200         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1201
1202     /* Send this GOP to the output command queue */
1203     cmd = out_cmd_buf_new ();
1204     cmd->start_new_fragment = FALSE;
1205     cmd->max_output_ts = new_out_ts;
1206     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1207         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1208     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1209
1210     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1211   }
1212
1213   splitmux->gop_total_bytes = 0;
1214 }
1215
1216 /* Called with splitmux lock held */
1217 /* Called from each input pad when it is has all the pieces
1218  * for a GOP or EOS, starting with the reference pad which has set the
1219  * splitmux->max_in_running_time
1220  */
1221 static void
1222 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1223 {
1224   GList *cur;
1225   GstEvent *event;
1226
1227   /* On ENDING_FILE, the reference stream sends a command to start a new
1228    * fragment, then releases the GOP for output in the new fragment.
1229    *  If somes streams received no buffer during the last GOP that overran,
1230    * because its next buffer has a timestamp bigger than
1231    * ctx->max_in_running_time, its queue is empty. In that case the only
1232    * way to wakeup the output thread is by injecting an event in the
1233    * queue. This usually happen with subtitle streams.
1234    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1235   if (ctx->need_unblock) {
1236     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
1237     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1238         GST_EVENT_TYPE_SERIALIZED,
1239         gst_structure_new ("splitmuxsink-unblock", "timestamp",
1240             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1241
1242     GST_SPLITMUX_UNLOCK (splitmux);
1243     gst_pad_send_event (ctx->sinkpad, event);
1244     GST_SPLITMUX_LOCK (splitmux);
1245
1246     ctx->need_unblock = FALSE;
1247     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1248     /* state may have changed while we were unlocked. Loop again if so */
1249     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
1250       return;
1251   }
1252
1253   if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1254     gboolean ready = TRUE;
1255
1256     /* Iterate each pad, and check that the input running time is at least
1257      * up to the reference running time, and if so handle the collected GOP */
1258     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
1259         GST_STIME_FORMAT " ctx %p",
1260         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
1261     for (cur = g_list_first (splitmux->contexts); cur != NULL;
1262         cur = g_list_next (cur)) {
1263       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1264
1265       GST_LOG_OBJECT (splitmux,
1266           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
1267           " EOS %d", tmpctx, tmpctx->srcpad,
1268           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
1269
1270       if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
1271           tmpctx->in_running_time < splitmux->max_in_running_time &&
1272           !tmpctx->in_eos) {
1273         GST_LOG_OBJECT (splitmux,
1274             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
1275             tmpctx, tmpctx->srcpad);
1276         ready = FALSE;
1277         break;
1278       }
1279     }
1280     if (ready) {
1281       GST_DEBUG_OBJECT (splitmux,
1282           "Collected GOP is complete. Processing (ctx %p)", ctx);
1283       /* All pads have a complete GOP, release it into the multiqueue */
1284       handle_gathered_gop (splitmux);
1285     }
1286   }
1287
1288   /* If upstream reached EOS we are not expecting more data, no need to wait
1289    * here. */
1290   if (ctx->in_eos)
1291     return;
1292
1293   /* Some pad is not yet ready, or GOP is being pushed
1294    * either way, sleep and wait to get woken */
1295   while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
1296       !ctx->flushing &&
1297       (ctx->in_running_time >= splitmux->max_in_running_time) &&
1298       (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
1299
1300     GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
1301     GST_SPLITMUX_WAIT_INPUT (splitmux);
1302     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
1303   }
1304 }
1305
1306 static GstPadProbeReturn
1307 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1308 {
1309   GstSplitMuxSink *splitmux = ctx->splitmux;
1310   GstBuffer *buf;
1311   MqStreamBuf *buf_info = NULL;
1312   GstClockTime ts;
1313   gboolean loop_again;
1314   gboolean keyframe = FALSE;
1315
1316   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1317
1318   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1319   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1320     g_warning ("Buffer list handling not implemented");
1321     return GST_PAD_PROBE_DROP;
1322   }
1323   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1324     GstEvent *event = gst_pad_probe_info_get_event (info);
1325
1326     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1327
1328     switch (GST_EVENT_TYPE (event)) {
1329       case GST_EVENT_SEGMENT:
1330         gst_event_copy_segment (event, &ctx->in_segment);
1331         break;
1332       case GST_EVENT_FLUSH_STOP:
1333         GST_SPLITMUX_LOCK (splitmux);
1334         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1335         ctx->in_eos = FALSE;
1336         ctx->in_running_time = GST_CLOCK_STIME_NONE;
1337         GST_SPLITMUX_UNLOCK (splitmux);
1338         break;
1339       case GST_EVENT_EOS:
1340         GST_SPLITMUX_LOCK (splitmux);
1341         ctx->in_eos = TRUE;
1342
1343         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1344           goto beach;
1345
1346         if (ctx->is_reference) {
1347           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1348           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
1349           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1350           /* Wake up other input pads to collect this GOP */
1351           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1352           check_completed_gop (splitmux, ctx);
1353         } else if (splitmux->input_state ==
1354             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1355           /* If we are waiting for a GOP to be completed (ie, for aux
1356            * pads to catch up), then this pad is complete, so check
1357            * if the whole GOP is.
1358            */
1359           check_completed_gop (splitmux, ctx);
1360         }
1361         GST_SPLITMUX_UNLOCK (splitmux);
1362         break;
1363       case GST_EVENT_GAP:{
1364         GstClockTime gap_ts;
1365         GstClockTimeDiff rtime;
1366
1367         gst_event_parse_gap (event, &gap_ts, NULL);
1368         if (gap_ts == GST_CLOCK_TIME_NONE)
1369           break;
1370
1371         GST_SPLITMUX_LOCK (splitmux);
1372
1373         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1374           goto beach;
1375         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
1376
1377         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1378             GST_STIME_ARGS (rtime));
1379
1380         if (ctx->is_reference
1381             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1382           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
1383           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1384               GST_STIME_ARGS (splitmux->fragment_start_time));
1385           /* Also take this as the first start time when starting up,
1386            * so that we start counting overflow from the first frame */
1387           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1388             splitmux->max_in_running_time = splitmux->fragment_start_time;
1389         }
1390
1391         GST_SPLITMUX_UNLOCK (splitmux);
1392         break;
1393       }
1394       default:
1395         break;
1396     }
1397     return GST_PAD_PROBE_PASS;
1398   }
1399
1400   buf = gst_pad_probe_info_get_buffer (info);
1401   buf_info = mq_stream_buf_new ();
1402
1403   if (GST_BUFFER_PTS_IS_VALID (buf))
1404     ts = GST_BUFFER_PTS (buf);
1405   else
1406     ts = GST_BUFFER_DTS (buf);
1407
1408   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1409
1410   GST_SPLITMUX_LOCK (splitmux);
1411
1412   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1413     goto beach;
1414
1415   /* If this buffer has a timestamp, advance the input timestamp of the
1416    * stream */
1417   if (GST_CLOCK_TIME_IS_VALID (ts)) {
1418     GstClockTimeDiff running_time =
1419         my_segment_to_running_time (&ctx->in_segment, ts);
1420
1421     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1422         GST_STIME_ARGS (running_time));
1423
1424     if (GST_CLOCK_STIME_IS_VALID (running_time)
1425         && running_time > ctx->in_running_time)
1426       ctx->in_running_time = running_time;
1427   }
1428
1429   /* Try to make sure we have a valid running time */
1430   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1431     ctx->in_running_time =
1432         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1433   }
1434
1435   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1436       GST_STIME_ARGS (ctx->in_running_time));
1437
1438   buf_info->run_ts = ctx->in_running_time;
1439   buf_info->buf_size = gst_buffer_get_size (buf);
1440   buf_info->duration = GST_BUFFER_DURATION (buf);
1441
1442   /* initialize fragment_start_time */
1443   if (ctx->is_reference
1444       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1445     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
1446     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1447         GST_STIME_ARGS (splitmux->fragment_start_time));
1448     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1449
1450     /* Also take this as the first start time when starting up,
1451      * so that we start counting overflow from the first frame */
1452     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1453       splitmux->max_in_running_time = splitmux->fragment_start_time;
1454     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
1455       GST_WARNING_OBJECT (splitmux,
1456           "Could not request a keyframe. Files may not split at the exact location they should");
1457     }
1458     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1459   }
1460
1461   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1462       " total GOP bytes %" G_GUINT64_FORMAT,
1463       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
1464
1465   loop_again = TRUE;
1466   do {
1467     if (ctx->flushing)
1468       break;
1469
1470     switch (splitmux->input_state) {
1471       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
1472         if (ctx->is_reference) {
1473           /* If a keyframe, we have a complete GOP */
1474           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1475               !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1476               splitmux->max_in_running_time >= ctx->in_running_time) {
1477             /* Pass this buffer through */
1478             loop_again = FALSE;
1479             /* Allow other input pads to catch up to here too */
1480             splitmux->max_in_running_time = ctx->in_running_time;
1481             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1482             break;
1483           }
1484           GST_INFO_OBJECT (pad,
1485               "Have keyframe with running time %" GST_STIME_FORMAT,
1486               GST_STIME_ARGS (ctx->in_running_time));
1487           keyframe = TRUE;
1488           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1489           splitmux->max_in_running_time = ctx->in_running_time;
1490           /* Wake up other input pads to collect this GOP */
1491           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1492           check_completed_gop (splitmux, ctx);
1493           /* Store this new keyframe to remember the start of GOP */
1494           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1495         } else {
1496           /* Pass this buffer if the reference ctx is far enough ahead */
1497           if (ctx->in_running_time < splitmux->max_in_running_time) {
1498             loop_again = FALSE;
1499             break;
1500           }
1501
1502           /* We're still waiting for a keyframe on the reference pad, sleep */
1503           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1504           GST_SPLITMUX_WAIT_INPUT (splitmux);
1505           GST_LOG_OBJECT (pad,
1506               "Done sleeping for GOP start input state now %d",
1507               splitmux->input_state);
1508         }
1509         break;
1510       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
1511         /* We're collecting a GOP. If this is the reference context,
1512          * we need to check if this is a keyframe that marks the start
1513          * of the next GOP. If it is, it marks the end of the GOP we're
1514          * collecting, so sleep and wait until all the other pads also
1515          * reach that timestamp - at which point, we have an entire GOP
1516          * and either go to ENDING_FILE or release this GOP to the muxer and
1517          * go back to COLLECT_GOP_START. */
1518
1519         /* If we overran the target timestamp, it might be time to process
1520          * the GOP, otherwise bail out for more data
1521          */
1522         GST_LOG_OBJECT (pad,
1523             "Checking TS %" GST_STIME_FORMAT " against max %"
1524             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
1525             GST_STIME_ARGS (splitmux->max_in_running_time));
1526
1527         if (ctx->in_running_time < splitmux->max_in_running_time) {
1528           loop_again = FALSE;
1529           break;
1530         }
1531
1532         GST_LOG_OBJECT (pad,
1533             "Collected last packet of GOP. Checking other pads");
1534         check_completed_gop (splitmux, ctx);
1535         break;
1536       }
1537       case SPLITMUX_INPUT_STATE_FINISHING_UP:
1538         loop_again = FALSE;
1539         break;
1540       default:
1541         loop_again = FALSE;
1542         break;
1543     }
1544   }
1545   while (loop_again);
1546
1547   if (keyframe) {
1548     splitmux->queued_keyframes++;
1549     buf_info->keyframe = TRUE;
1550   }
1551
1552   /* Update total input byte counter for overflow detect */
1553   splitmux->gop_total_bytes += buf_info->buf_size;
1554
1555   /* Now add this buffer to the queue just before returning */
1556   g_queue_push_head (&ctx->queued_bufs, buf_info);
1557
1558   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1559       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1560
1561   GST_SPLITMUX_UNLOCK (splitmux);
1562   return GST_PAD_PROBE_PASS;
1563
1564 beach:
1565   GST_SPLITMUX_UNLOCK (splitmux);
1566   if (buf_info)
1567     mq_stream_buf_free (buf_info);
1568   return GST_PAD_PROBE_PASS;
1569 }
1570
1571 static void
1572 grow_blocked_queues (GstSplitMuxSink * splitmux)
1573 {
1574   GList *cur;
1575
1576   /* Scan other queues for full-ness and grow them */
1577   for (cur = g_list_first (splitmux->contexts);
1578       cur != NULL; cur = g_list_next (cur)) {
1579     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1580     guint cur_limit;
1581     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
1582
1583     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
1584     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
1585
1586     if (cur_len >= cur_limit) {
1587       cur_limit = cur_len + 1;
1588       GST_DEBUG_OBJECT (tmpctx->q,
1589           "Queue overflowed and needs enlarging. Growing to %u buffers",
1590           cur_limit);
1591       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
1592     }
1593   }
1594 }
1595
1596 static void
1597 handle_q_underrun (GstElement * q, gpointer user_data)
1598 {
1599   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1600   GstSplitMuxSink *splitmux = ctx->splitmux;
1601
1602   GST_SPLITMUX_LOCK (splitmux);
1603   GST_DEBUG_OBJECT (q,
1604       "Queue reported underrun with %d keyframes and %d cmds enqueued",
1605       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1606   grow_blocked_queues (splitmux);
1607   GST_SPLITMUX_UNLOCK (splitmux);
1608 }
1609
1610 static void
1611 handle_q_overrun (GstElement * q, gpointer user_data)
1612 {
1613   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1614   GstSplitMuxSink *splitmux = ctx->splitmux;
1615   gboolean allow_grow = FALSE;
1616
1617   GST_SPLITMUX_LOCK (splitmux);
1618   GST_DEBUG_OBJECT (q,
1619       "Queue reported overrun with %d keyframes and %d cmds enqueued",
1620       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1621
1622   if (splitmux->queued_keyframes < 2) {
1623     /* Less than a full GOP queued, grow the queue */
1624     allow_grow = TRUE;
1625   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
1626     allow_grow = TRUE;
1627   } else {
1628     /* If another queue is starved, grow */
1629     GList *cur;
1630     for (cur = g_list_first (splitmux->contexts);
1631         cur != NULL; cur = g_list_next (cur)) {
1632       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1633       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1634         allow_grow = TRUE;
1635       }
1636     }
1637   }
1638   GST_SPLITMUX_UNLOCK (splitmux);
1639
1640   if (allow_grow) {
1641     guint cur_limit;
1642
1643     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
1644     cur_limit++;
1645
1646     GST_DEBUG_OBJECT (q,
1647         "Queue overflowed and needs enlarging. Growing to %u buffers",
1648         cur_limit);
1649
1650     g_object_set (q, "max-size-buffers", cur_limit, NULL);
1651   }
1652 }
1653
1654 static GstPad *
1655 gst_splitmux_sink_request_new_pad (GstElement * element,
1656     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1657 {
1658   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1659   GstPadTemplate *mux_template = NULL;
1660   GstPad *res = NULL;
1661   GstElement *q;
1662   GstPad *q_sink = NULL, *q_src = NULL;
1663   gchar *gname;
1664   gboolean is_video = FALSE;
1665   MqStreamCtx *ctx;
1666
1667   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1668
1669   GST_SPLITMUX_LOCK (splitmux);
1670   if (!create_muxer (splitmux))
1671     goto fail;
1672
1673   if (templ->name_template) {
1674     if (g_str_equal (templ->name_template, "video")) {
1675       if (splitmux->have_video)
1676         goto already_have_video;
1677
1678       /* FIXME: Look for a pad template with matching caps, rather than by name */
1679       mux_template =
1680           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1681           (splitmux->muxer), "video_%u");
1682
1683       /* Fallback to find sink pad templates named 'video' (flvmux) */
1684       if (!mux_template) {
1685         mux_template =
1686             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1687             (splitmux->muxer), "video");
1688       }
1689       is_video = TRUE;
1690       name = NULL;
1691     } else {
1692       mux_template =
1693           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1694           (splitmux->muxer), templ->name_template);
1695
1696       /* Fallback to find sink pad templates named 'audio' (flvmux) */
1697       if (!mux_template) {
1698         mux_template =
1699             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1700             (splitmux->muxer), "audio");
1701       }
1702     }
1703     if (mux_template == NULL) {
1704       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1705       mux_template =
1706           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1707           (splitmux->muxer), "sink_%d");
1708     }
1709   }
1710
1711   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1712   if (res == NULL)
1713     goto fail;
1714
1715   if (is_video)
1716     gname = g_strdup ("video");
1717   else if (name == NULL)
1718     gname = gst_pad_get_name (res);
1719   else
1720     gname = g_strdup (name);
1721
1722   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
1723     goto fail;
1724
1725   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
1726
1727   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
1728       "max-size-buffers", 5, NULL);
1729
1730   q_sink = gst_element_get_static_pad (q, "sink");
1731   q_src = gst_element_get_static_pad (q, "src");
1732
1733   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
1734     gst_element_release_request_pad (splitmux->muxer, res);
1735     gst_object_unref (GST_OBJECT (res));
1736     goto fail;
1737   }
1738
1739   gst_object_unref (GST_OBJECT (res));
1740
1741   ctx = mq_stream_ctx_new (splitmux);
1742   /* Context holds a ref: */
1743   ctx->q = gst_object_ref (q);
1744   ctx->srcpad = q_src;
1745   ctx->sinkpad = q_sink;
1746   ctx->q_overrun_id =
1747       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
1748   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
1749
1750   mq_stream_ctx_ref (ctx);
1751   ctx->src_pad_block_id =
1752       gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1753       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1754       _pad_block_destroy_src_notify);
1755   if (is_video && splitmux->reference_ctx != NULL) {
1756     splitmux->reference_ctx->is_reference = FALSE;
1757     splitmux->reference_ctx = NULL;
1758   }
1759   if (splitmux->reference_ctx == NULL) {
1760     splitmux->reference_ctx = ctx;
1761     ctx->is_reference = TRUE;
1762   }
1763
1764   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
1765   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1766
1767   mq_stream_ctx_ref (ctx);
1768   ctx->sink_pad_block_id =
1769       gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1770       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1771       _pad_block_destroy_sink_notify);
1772
1773   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1774       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
1775
1776   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1777
1778   g_free (gname);
1779
1780   if (is_video)
1781     splitmux->have_video = TRUE;
1782
1783   gst_pad_set_active (res, TRUE);
1784   gst_element_add_pad (element, res);
1785
1786   GST_SPLITMUX_UNLOCK (splitmux);
1787
1788   return res;
1789 fail:
1790   GST_SPLITMUX_UNLOCK (splitmux);
1791
1792   if (q_sink)
1793     gst_object_unref (q_sink);
1794   if (q_src)
1795     gst_object_unref (q_src);
1796   return NULL;
1797 already_have_video:
1798   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
1799   GST_SPLITMUX_UNLOCK (splitmux);
1800   return NULL;
1801 }
1802
1803 static void
1804 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1805 {
1806   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1807   GstPad *muxpad = NULL;
1808   MqStreamCtx *ctx =
1809       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1810
1811   GST_SPLITMUX_LOCK (splitmux);
1812
1813   if (splitmux->muxer == NULL)
1814     goto fail;                  /* Elements don't exist yet - nothing to release */
1815
1816   GST_INFO_OBJECT (pad, "releasing request pad");
1817
1818   muxpad = gst_pad_get_peer (ctx->srcpad);
1819
1820   /* Remove the context from our consideration */
1821   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1822
1823   if (ctx->sink_pad_block_id)
1824     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1825
1826   if (ctx->src_pad_block_id)
1827     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1828
1829   /* Can release the context now */
1830   mq_stream_ctx_unref (ctx);
1831   if (ctx == splitmux->reference_ctx)
1832     splitmux->reference_ctx = NULL;
1833
1834   /* Release and free the muxer input */
1835   if (muxpad) {
1836     gst_element_release_request_pad (splitmux->muxer, muxpad);
1837     gst_object_unref (muxpad);
1838   }
1839
1840   if (GST_PAD_PAD_TEMPLATE (pad) &&
1841       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
1842               (pad)), "video"))
1843     splitmux->have_video = FALSE;
1844
1845   gst_element_remove_pad (element, pad);
1846
1847   /* Reset the internal elements only after all request pads are released */
1848   if (splitmux->contexts == NULL)
1849     gst_splitmux_reset (splitmux);
1850
1851 fail:
1852   GST_SPLITMUX_UNLOCK (splitmux);
1853 }
1854
1855 static GstElement *
1856 create_element (GstSplitMuxSink * splitmux,
1857     const gchar * factory, const gchar * name, gboolean locked)
1858 {
1859   GstElement *ret = gst_element_factory_make (factory, name);
1860   if (ret == NULL) {
1861     g_warning ("Failed to create %s - splitmuxsink will not work", name);
1862     return NULL;
1863   }
1864
1865   if (locked) {
1866     /* Ensure the sink starts in locked state and NULL - it will be changed
1867      * by the filename setting code */
1868     gst_element_set_locked_state (ret, TRUE);
1869     gst_element_set_state (ret, GST_STATE_NULL);
1870   }
1871
1872   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1873     g_warning ("Could not add %s element - splitmuxsink will not work", name);
1874     gst_object_unref (ret);
1875     return NULL;
1876   }
1877
1878   return ret;
1879 }
1880
1881 static gboolean
1882 create_muxer (GstSplitMuxSink * splitmux)
1883 {
1884   /* Create internal elements */
1885   if (splitmux->muxer == NULL) {
1886     GstElement *provided_muxer = NULL;
1887
1888     GST_OBJECT_LOCK (splitmux);
1889     if (splitmux->provided_muxer != NULL)
1890       provided_muxer = gst_object_ref (splitmux->provided_muxer);
1891     GST_OBJECT_UNLOCK (splitmux);
1892
1893     if (provided_muxer == NULL) {
1894       if ((splitmux->muxer =
1895               create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
1896         goto fail;
1897     } else {
1898       /* Ensure it's not in locked state (we might be reusing an old element) */
1899       gst_element_set_locked_state (provided_muxer, FALSE);
1900       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1901         g_warning ("Could not add muxer element - splitmuxsink will not work");
1902         gst_object_unref (provided_muxer);
1903         goto fail;
1904       }
1905
1906       splitmux->muxer = provided_muxer;
1907       gst_object_unref (provided_muxer);
1908     }
1909   }
1910
1911   return TRUE;
1912 fail:
1913   return FALSE;
1914 }
1915
1916 static GstElement *
1917 find_sink (GstElement * e)
1918 {
1919   GstElement *res = NULL;
1920   GstIterator *iter;
1921   gboolean done = FALSE;
1922   GValue data = { 0, };
1923
1924   if (!GST_IS_BIN (e))
1925     return e;
1926
1927   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
1928     return e;
1929
1930   iter = gst_bin_iterate_sinks (GST_BIN (e));
1931   while (!done) {
1932     switch (gst_iterator_next (iter, &data)) {
1933       case GST_ITERATOR_OK:
1934       {
1935         GstElement *child = g_value_get_object (&data);
1936         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1937                 "location") != NULL) {
1938           res = child;
1939           done = TRUE;
1940         }
1941         g_value_reset (&data);
1942         break;
1943       }
1944       case GST_ITERATOR_RESYNC:
1945         gst_iterator_resync (iter);
1946         break;
1947       case GST_ITERATOR_DONE:
1948         done = TRUE;
1949         break;
1950       case GST_ITERATOR_ERROR:
1951         g_assert_not_reached ();
1952         break;
1953     }
1954   }
1955   g_value_unset (&data);
1956   gst_iterator_free (iter);
1957
1958   return res;
1959 }
1960
1961 static gboolean
1962 create_sink (GstSplitMuxSink * splitmux)
1963 {
1964   GstElement *provided_sink = NULL;
1965
1966   if (splitmux->active_sink == NULL) {
1967
1968     GST_OBJECT_LOCK (splitmux);
1969     if (splitmux->provided_sink != NULL)
1970       provided_sink = gst_object_ref (splitmux->provided_sink);
1971     GST_OBJECT_UNLOCK (splitmux);
1972
1973     if (provided_sink == NULL) {
1974       if ((splitmux->sink =
1975               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
1976         goto fail;
1977       splitmux->active_sink = splitmux->sink;
1978     } else {
1979       /* Ensure the sink starts in locked state and NULL - it will be changed
1980        * by the filename setting code */
1981       gst_element_set_locked_state (provided_sink, TRUE);
1982       gst_element_set_state (provided_sink, GST_STATE_NULL);
1983       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1984         g_warning ("Could not add sink elements - splitmuxsink will not work");
1985         gst_object_unref (provided_sink);
1986         goto fail;
1987       }
1988
1989       splitmux->active_sink = provided_sink;
1990
1991       /* The bin holds a ref now, we can drop our tmp ref */
1992       gst_object_unref (provided_sink);
1993
1994       /* Find the sink element */
1995       splitmux->sink = find_sink (splitmux->active_sink);
1996       if (splitmux->sink == NULL) {
1997         g_warning
1998             ("Could not locate sink element in provided sink - splitmuxsink will not work");
1999         goto fail;
2000       }
2001     }
2002
2003 #if 1
2004     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2005             "async") != NULL) {
2006       /* async child elements are causing state change races and weird
2007        * failures, so let's try and turn that off */
2008       g_object_set (splitmux->sink, "async", FALSE, NULL);
2009     }
2010 #endif
2011
2012     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2013       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2014       goto fail;
2015     }
2016   }
2017
2018   return TRUE;
2019 fail:
2020   return FALSE;
2021 }
2022
2023 #ifdef __GNUC__
2024 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2025 #endif
2026 static void
2027 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2028 {
2029   gchar *fname = NULL;
2030   GstSample *sample;
2031   GstCaps *caps;
2032
2033   gst_splitmux_sink_ensure_max_files (splitmux);
2034
2035   if (ctx->cur_out_buffer == NULL) {
2036     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2037   }
2038
2039   caps = gst_pad_get_current_caps (ctx->srcpad);
2040   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2041   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2042       splitmux->fragment_id, sample, &fname);
2043   gst_sample_unref (sample);
2044   if (caps)
2045     gst_caps_unref (caps);
2046
2047   if (fname == NULL) {
2048     /* Fallback to the old signal if the new one returned nothing */
2049     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2050         splitmux->fragment_id, &fname);
2051   }
2052
2053   if (!fname)
2054     fname = splitmux->location ?
2055         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2056
2057   if (fname) {
2058     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2059     g_object_set (splitmux->sink, "location", fname, NULL);
2060     g_free (fname);
2061
2062     splitmux->fragment_id++;
2063   }
2064 }
2065
2066 static void
2067 do_async_start (GstSplitMuxSink * splitmux)
2068 {
2069   GstMessage *message;
2070
2071   if (!splitmux->need_async_start) {
2072     GST_INFO_OBJECT (splitmux, "no async_start needed");
2073     return;
2074   }
2075
2076   splitmux->async_pending = TRUE;
2077
2078   GST_INFO_OBJECT (splitmux, "Sending async_start message");
2079   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2080   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2081       (splitmux), message);
2082 }
2083
2084 static void
2085 do_async_done (GstSplitMuxSink * splitmux)
2086 {
2087   GstMessage *message;
2088
2089   if (splitmux->async_pending) {
2090     GST_INFO_OBJECT (splitmux, "Sending async_done message");
2091     message =
2092         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2093         GST_CLOCK_TIME_NONE);
2094     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2095         (splitmux), message);
2096
2097     splitmux->async_pending = FALSE;
2098   }
2099
2100   splitmux->need_async_start = FALSE;
2101 }
2102
2103 static GstStateChangeReturn
2104 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2105 {
2106   GstStateChangeReturn ret;
2107   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2108
2109   switch (transition) {
2110     case GST_STATE_CHANGE_NULL_TO_READY:{
2111       GST_SPLITMUX_LOCK (splitmux);
2112       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2113         ret = GST_STATE_CHANGE_FAILURE;
2114         GST_SPLITMUX_UNLOCK (splitmux);
2115         goto beach;
2116       }
2117       GST_SPLITMUX_UNLOCK (splitmux);
2118       splitmux->fragment_id = 0;
2119       break;
2120     }
2121     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2122       GST_SPLITMUX_LOCK (splitmux);
2123       /* Start by collecting one input on each pad */
2124       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2125       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2126       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2127       splitmux->gop_start_time = splitmux->fragment_start_time =
2128           GST_CLOCK_STIME_NONE;
2129       splitmux->muxed_out_bytes = 0;
2130       GST_SPLITMUX_UNLOCK (splitmux);
2131       break;
2132     }
2133     case GST_STATE_CHANGE_PAUSED_TO_READY:
2134     case GST_STATE_CHANGE_READY_TO_NULL:
2135       GST_SPLITMUX_LOCK (splitmux);
2136       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2137       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2138       /* Wake up any blocked threads */
2139       GST_LOG_OBJECT (splitmux,
2140           "State change -> NULL or READY. Waking threads");
2141       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2142       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2143       GST_SPLITMUX_UNLOCK (splitmux);
2144       break;
2145     default:
2146       break;
2147   }
2148
2149   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2150   if (ret == GST_STATE_CHANGE_FAILURE)
2151     goto beach;
2152
2153   switch (transition) {
2154     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2155       splitmux->need_async_start = TRUE;
2156       break;
2157     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2158       /* Change state async, because our child sink might not
2159        * be ready to do that for us yet if it's state is still locked */
2160
2161       splitmux->need_async_start = TRUE;
2162       /* we want to go async to PAUSED until we managed to configure and add the
2163        * sink */
2164       GST_SPLITMUX_LOCK (splitmux);
2165       do_async_start (splitmux);
2166       GST_SPLITMUX_UNLOCK (splitmux);
2167       ret = GST_STATE_CHANGE_ASYNC;
2168       break;
2169     }
2170     case GST_STATE_CHANGE_READY_TO_NULL:
2171       GST_SPLITMUX_LOCK (splitmux);
2172       splitmux->fragment_id = 0;
2173       /* Reset internal elements only if no pad contexts are using them */
2174       if (splitmux->contexts == NULL)
2175         gst_splitmux_reset (splitmux);
2176       do_async_done (splitmux);
2177       GST_SPLITMUX_UNLOCK (splitmux);
2178       break;
2179     default:
2180       break;
2181   }
2182
2183 beach:
2184   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
2185       ret == GST_STATE_CHANGE_FAILURE) {
2186     /* Cleanup elements on failed transition out of NULL */
2187     gst_splitmux_reset (splitmux);
2188     GST_SPLITMUX_LOCK (splitmux);
2189     do_async_done (splitmux);
2190     GST_SPLITMUX_UNLOCK (splitmux);
2191   }
2192   return ret;
2193 }
2194
2195 gboolean
2196 register_splitmuxsink (GstPlugin * plugin)
2197 {
2198   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
2199       "Split File Muxing Sink");
2200
2201   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
2202       GST_TYPE_SPLITMUX_SINK);
2203 }
2204
2205 static void
2206 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
2207 {
2208   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
2209     splitmux->fragment_id = 0;
2210   }
2211 }