Send seek event to baseparse when aacparse seek failed in push mode
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * In the async-finalize mode, when the threshold is crossed, the old muxer
42  * and sink is disconnected from the pipeline and left to finish the file
43  * asynchronously, and a new muxer and sink is created to continue with the
44  * next fragment. For that reason, instead of muxer and sink objects, the
45  * muxer-factory and sink-factory properties are used to construct the new
46  * objects, together with muxer-properties and sink-properties.
47  *
48  * <refsect2>
49  * <title>Example pipelines</title>
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  * </refsect2>
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69
70 #include <string.h>
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
74
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
77
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
82
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
85
86 static void split_now (GstSplitMuxSink * splitmux);
87 static void split_after (GstSplitMuxSink * splitmux);
88 static void split_at_running_time (GstSplitMuxSink * splitmux,
89     GstClockTime split_time);
90
91 enum
92 {
93   PROP_0,
94   PROP_LOCATION,
95   PROP_MAX_SIZE_TIME,
96   PROP_MAX_SIZE_BYTES,
97   PROP_MAX_SIZE_TIMECODE,
98   PROP_SEND_KEYFRAME_REQUESTS,
99   PROP_MAX_FILES,
100   PROP_MUXER_OVERHEAD,
101   PROP_USE_ROBUST_MUXING,
102   PROP_ALIGNMENT_THRESHOLD,
103   PROP_MUXER,
104   PROP_SINK,
105   PROP_RESET_MUXER,
106   PROP_ASYNC_FINALIZE,
107   PROP_MUXER_FACTORY,
108   PROP_MUXER_PROPERTIES,
109   PROP_SINK_FACTORY,
110   PROP_SINK_PROPERTIES
111 };
112
113 #define DEFAULT_MAX_SIZE_TIME       0
114 #define DEFAULT_MAX_SIZE_BYTES      0
115 #define DEFAULT_MAX_FILES           0
116 #define DEFAULT_MUXER_OVERHEAD      0.02
117 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
118 #define DEFAULT_ALIGNMENT_THRESHOLD 0
119 #define DEFAULT_MUXER "mp4mux"
120 #define DEFAULT_SINK "filesink"
121 #define DEFAULT_USE_ROBUST_MUXING FALSE
122 #define DEFAULT_RESET_MUXER TRUE
123 #define DEFAULT_ASYNC_FINALIZE FALSE
124
125 typedef struct _AsyncEosHelper
126 {
127   MqStreamCtx *ctx;
128   GstPad *pad;
129 } AsyncEosHelper;
130
131 enum
132 {
133   SIGNAL_FORMAT_LOCATION,
134   SIGNAL_FORMAT_LOCATION_FULL,
135   SIGNAL_SPLIT_NOW,
136   SIGNAL_SPLIT_AFTER,
137   SIGNAL_SPLIT_AT_RUNNING_TIME,
138   SIGNAL_MUXER_ADDED,
139   SIGNAL_SINK_ADDED,
140   SIGNAL_LAST
141 };
142
143 static guint signals[SIGNAL_LAST];
144
145 static GstStaticPadTemplate video_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("video",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate audio_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("audio_%u",
152     GST_PAD_SINK,
153     GST_PAD_REQUEST,
154     GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate subtitle_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
157     GST_PAD_SINK,
158     GST_PAD_REQUEST,
159     GST_STATIC_CAPS_ANY);
160 static GstStaticPadTemplate caption_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("caption_%u",
162     GST_PAD_SINK,
163     GST_PAD_REQUEST,
164     GST_STATIC_CAPS_ANY);
165
166 static GQuark PAD_CONTEXT;
167 static GQuark EOS_FROM_US;
168 static GQuark RUNNING_TIME;
169 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
170  * to forward an incoming EOS message, but we cannot rely on the state of the
171  * splitmux anymore, so we set this qdata on the sink instead.
172  * The muxer and sink must be destroyed after both of these things have
173  * finished:
174  * 1) The EOS message has been sent when the fragment is ending
175  * 2) The muxer has been unlinked and relinked
176  * Therefore, EOS_FROM_US can have these two values:
177  * 0: EOS was not requested from us. Forward the message. The muxer and the
178  * sink will be destroyed together with the rest of the bin.
179  * 1: EOS was requested from us, but the other of the two tasks hasn't
180  * finished. Set EOS_FROM_US to 2 and do your stuff.
181  * 2: EOS was requested from us and the other of the two tasks has finished.
182  * Now we can destroy the muxer and the sink.
183  */
184
185 static void
186 _do_init (void)
187 {
188   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
189   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
190   RUNNING_TIME = g_quark_from_static_string ("running-time");
191 }
192
193 #define gst_splitmux_sink_parent_class parent_class
194 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
195     _do_init ());
196
197 static gboolean create_muxer (GstSplitMuxSink * splitmux);
198 static gboolean create_sink (GstSplitMuxSink * splitmux);
199 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
200     const GValue * value, GParamSpec * pspec);
201 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
202     GValue * value, GParamSpec * pspec);
203 static void gst_splitmux_sink_dispose (GObject * object);
204 static void gst_splitmux_sink_finalize (GObject * object);
205
206 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
207     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
208 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
209
210 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
211     element, GstStateChange transition);
212
213 static void bus_handler (GstBin * bin, GstMessage * msg);
214 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
215 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
216 static void mq_stream_ctx_free (MqStreamCtx * ctx);
217 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
218
219 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
220 static GstElement *create_element (GstSplitMuxSink * splitmux,
221     const gchar * factory, const gchar * name, gboolean locked);
222
223 static void do_async_done (GstSplitMuxSink * splitmux);
224
225 static MqStreamBuf *
226 mq_stream_buf_new (void)
227 {
228   return g_slice_new0 (MqStreamBuf);
229 }
230
231 static void
232 mq_stream_buf_free (MqStreamBuf * data)
233 {
234   g_slice_free (MqStreamBuf, data);
235 }
236
237 static SplitMuxOutputCommand *
238 out_cmd_buf_new (void)
239 {
240   return g_slice_new0 (SplitMuxOutputCommand);
241 }
242
243 static void
244 out_cmd_buf_free (SplitMuxOutputCommand * data)
245 {
246   g_slice_free (SplitMuxOutputCommand, data);
247 }
248
249 static void
250 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
251 {
252   GObjectClass *gobject_class = (GObjectClass *) klass;
253   GstElementClass *gstelement_class = (GstElementClass *) klass;
254   GstBinClass *gstbin_class = (GstBinClass *) klass;
255
256   gobject_class->set_property = gst_splitmux_sink_set_property;
257   gobject_class->get_property = gst_splitmux_sink_get_property;
258   gobject_class->dispose = gst_splitmux_sink_dispose;
259   gobject_class->finalize = gst_splitmux_sink_finalize;
260
261   gst_element_class_set_static_metadata (gstelement_class,
262       "Split Muxing Bin", "Generic/Bin/Muxer",
263       "Convenience bin that muxes incoming streams into multiple time/size limited files",
264       "Jan Schmidt <jan@centricular.com>");
265
266   gst_element_class_add_static_pad_template (gstelement_class,
267       &video_sink_template);
268   gst_element_class_add_static_pad_template (gstelement_class,
269       &audio_sink_template);
270   gst_element_class_add_static_pad_template (gstelement_class,
271       &subtitle_sink_template);
272   gst_element_class_add_static_pad_template (gstelement_class,
273       &caption_sink_template);
274
275   gstelement_class->change_state =
276       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
277   gstelement_class->request_new_pad =
278       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
279   gstelement_class->release_pad =
280       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
281
282   gstbin_class->handle_message = bus_handler;
283
284   g_object_class_install_property (gobject_class, PROP_LOCATION,
285       g_param_spec_string ("location", "File Output Pattern",
286           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
287           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
289       g_param_spec_double ("mux-overhead", "Muxing Overhead",
290           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
291           DEFAULT_MUXER_OVERHEAD,
292           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
293
294   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
295       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
296           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
297           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
300           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
301           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
303       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
304           "Maximum difference in timecode between first and last frame. "
305           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
306           "Will only be effective if a timecode track is present.",
307           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
309       g_param_spec_boolean ("send-keyframe-requests",
310           "Request keyframes at max-size-time",
311           "Request a keyframe every max-size-time ns to try splitting at that point. "
312           "Needs max-size-bytes to be 0 in order to be effective.",
313           DEFAULT_SEND_KEYFRAME_REQUESTS,
314           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
316       g_param_spec_uint ("max-files", "Max files",
317           "Maximum number of files to keep on disk. Once the maximum is reached,"
318           "old files start to be deleted to make room for new ones.", 0,
319           G_MAXUINT, DEFAULT_MAX_FILES,
320           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
322       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
323           "Allow non-reference streams to be that many ns before the reference"
324           " stream",
325           0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327
328   g_object_class_install_property (gobject_class, PROP_MUXER,
329       g_param_spec_object ("muxer", "Muxer",
330           "The muxer element to use (NULL = default mp4mux). "
331           "Valid only for async-finalize = FALSE",
332           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333   g_object_class_install_property (gobject_class, PROP_SINK,
334       g_param_spec_object ("sink", "Sink",
335           "The sink element (or element chain) to use (NULL = default filesink). "
336           "Valid only for async-finalize = FALSE",
337           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338
339   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
340       g_param_spec_boolean ("use-robust-muxing",
341           "Support robust-muxing mode of some muxers",
342           "Check if muxers support robust muxing via the reserved-max-duration and "
343           "reserved-duration-remaining properties and use them if so. "
344           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
345           " create new fragments if the reserved header space is about to overflow. "
346           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
347           "manually by the app to a non-zero value for robust muxing to have an effect.",
348           DEFAULT_USE_ROBUST_MUXING,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
352       g_param_spec_boolean ("reset-muxer",
353           "Reset Muxer",
354           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
355           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
358       g_param_spec_boolean ("async-finalize",
359           "Finalize fragments asynchronously",
360           "Finalize each fragment asynchronously and start a new one",
361           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
363       g_param_spec_string ("muxer-factory", "Muxer factory",
364           "The muxer element factory to use (default = mp4mux). "
365           "Valid only for async-finalize = TRUE",
366           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
368       g_param_spec_boxed ("muxer-properties", "Muxer properties",
369           "The muxer element properties to use. "
370           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
371           "Valid only for async-finalize = TRUE",
372           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
374       g_param_spec_string ("sink-factory", "Sink factory",
375           "The sink element factory to use (default = filesink). "
376           "Valid only for async-finalize = TRUE",
377           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
379       g_param_spec_boxed ("sink-properties", "Sink properties",
380           "The sink element properties to use. "
381           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
382           "Valid only for async-finalize = TRUE",
383           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
384
385   /**
386    * GstSplitMuxSink::format-location:
387    * @splitmux: the #GstSplitMuxSink
388    * @fragment_id: the sequence number of the file to be created
389    *
390    * Returns: the location to be used for the next output file
391    */
392   signals[SIGNAL_FORMAT_LOCATION] =
393       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
394       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
395
396   /**
397    * GstSplitMuxSink::format-location-full:
398    * @splitmux: the #GstSplitMuxSink
399    * @fragment_id: the sequence number of the file to be created
400    * @first_sample: A #GstSample containing the first buffer
401    *   from the reference stream in the new file
402    *
403    * Returns: the location to be used for the next output file
404    */
405   signals[SIGNAL_FORMAT_LOCATION_FULL] =
406       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
407       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
408       GST_TYPE_SAMPLE);
409
410   /**
411    * GstSplitMuxSink::split-now:
412    * @splitmux: the #GstSplitMuxSink
413    *
414    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
415    * The current GOP will be output to the new file.
416    *
417    * Since: 1.14
418    */
419   signals[SIGNAL_SPLIT_NOW] =
420       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
422           split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
423
424   /**
425    * GstSplitMuxSink::split-after:
426    * @splitmux: the #GstSplitMuxSink
427    *
428    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
429    * The current GOP will be output to the old file.
430    *
431    * Since: 1.16
432    */
433   signals[SIGNAL_SPLIT_AFTER] =
434       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
435       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
436           split_after), NULL, NULL, NULL, G_TYPE_NONE, 0);
437
438   /**
439    * GstSplitMuxSink::split-now:
440    * @splitmux: the #GstSplitMuxSink
441    *
442    * When called by the user, this action signal splits the video file (and
443    * begins a new one) as soon as the given running time is reached. If this
444    * action signal is called multiple times, running times are queued up and
445    * processed in the order they were given.
446    *
447    * Note that this is prone to race conditions, where said running time is
448    * reached and surpassed before we had a chance to split. The file will
449    * still split immediately, but in order to make sure that the split doesn't
450    * happen too late, it is recommended to call this action signal from
451    * something that will prevent further buffers from flowing into
452    * splitmuxsink before the split is completed, such as a pad probe before
453    * splitmuxsink.
454    *
455    *
456    * Since: 1.16
457    */
458   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
459       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
460       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
461           split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1,
462       G_TYPE_UINT64);
463
464   /**
465    * GstSplitMuxSink::muxer-added:
466    * @splitmux: the #GstSplitMuxSink
467    * @muxer: the newly added muxer element
468    *
469    * Since: 1.14
470    */
471   signals[SIGNAL_MUXER_ADDED] =
472       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
473       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
474
475   /**
476    * GstSplitMuxSink::sink-added:
477    * @splitmux: the #GstSplitMuxSink
478    * @sink: the newly added sink element
479    *
480    * Since: 1.14
481    */
482   signals[SIGNAL_SINK_ADDED] =
483       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
484       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
485
486   klass->split_now = split_now;
487   klass->split_after = split_after;
488   klass->split_at_running_time = split_at_running_time;
489 }
490
491 static void
492 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
493 {
494   g_mutex_init (&splitmux->lock);
495   g_cond_init (&splitmux->input_cond);
496   g_cond_init (&splitmux->output_cond);
497   g_queue_init (&splitmux->out_cmd_q);
498
499   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
500   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
501   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
502   splitmux->max_files = DEFAULT_MAX_FILES;
503   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
504   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
505   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
506   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
507   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
508
509   splitmux->threshold_timecode_str = NULL;
510
511   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
512   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
513   splitmux->muxer_properties = NULL;
514   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
515   splitmux->sink_properties = NULL;
516
517   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
518   splitmux->split_requested = FALSE;
519   splitmux->do_split_next_gop = FALSE;
520   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
521 }
522
523 static void
524 gst_splitmux_reset (GstSplitMuxSink * splitmux)
525 {
526   if (splitmux->muxer) {
527     gst_element_set_locked_state (splitmux->muxer, TRUE);
528     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
529     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
530   }
531   if (splitmux->active_sink) {
532     gst_element_set_locked_state (splitmux->active_sink, TRUE);
533     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
534     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
535   }
536
537   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
538 }
539
540 static void
541 gst_splitmux_sink_dispose (GObject * object)
542 {
543   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
544
545   /* Calling parent dispose invalidates all child pointers */
546   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
547
548   G_OBJECT_CLASS (parent_class)->dispose (object);
549 }
550
551 static void
552 gst_splitmux_sink_finalize (GObject * object)
553 {
554   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
555   g_cond_clear (&splitmux->input_cond);
556   g_cond_clear (&splitmux->output_cond);
557   g_mutex_clear (&splitmux->lock);
558   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
559   g_queue_clear (&splitmux->out_cmd_q);
560
561   if (splitmux->provided_sink)
562     gst_object_unref (splitmux->provided_sink);
563   if (splitmux->provided_muxer)
564     gst_object_unref (splitmux->provided_muxer);
565
566   if (splitmux->muxer_factory)
567     g_free (splitmux->muxer_factory);
568   if (splitmux->muxer_properties)
569     gst_structure_free (splitmux->muxer_properties);
570   if (splitmux->sink_factory)
571     g_free (splitmux->sink_factory);
572   if (splitmux->sink_properties)
573     gst_structure_free (splitmux->sink_properties);
574
575   if (splitmux->threshold_timecode_str)
576     g_free (splitmux->threshold_timecode_str);
577
578   if (splitmux->times_to_split)
579     gst_queue_array_free (splitmux->times_to_split);
580
581   g_free (splitmux->location);
582
583   /* Make sure to free any un-released contexts. There should not be any,
584    * because the dispose will have freed all request pads though */
585   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
586   g_list_free (splitmux->contexts);
587
588   G_OBJECT_CLASS (parent_class)->finalize (object);
589 }
590
591 /*
592  * Set any time threshold to the muxer, if it has
593  * reserved-max-duration and reserved-duration-remaining
594  * properties. Called when creating/claiming the muxer
595  * in create_elements() */
596 static void
597 update_muxer_properties (GstSplitMuxSink * sink)
598 {
599   GObjectClass *klass;
600   GstClockTime threshold_time;
601
602   sink->muxer_has_reserved_props = FALSE;
603   if (sink->muxer == NULL)
604     return;
605   klass = G_OBJECT_GET_CLASS (sink->muxer);
606   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
607     return;
608   if (g_object_class_find_property (klass,
609           "reserved-duration-remaining") == NULL)
610     return;
611   sink->muxer_has_reserved_props = TRUE;
612
613   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
614       GST_TIME_ARGS (sink->threshold_time));
615   GST_OBJECT_LOCK (sink);
616   threshold_time = sink->threshold_time;
617   GST_OBJECT_UNLOCK (sink);
618
619   if (threshold_time > 0) {
620     /* Tell the muxer how much space to reserve */
621     GstClockTime muxer_threshold = threshold_time;
622     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
623   }
624 }
625
626 static void
627 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
628     const GValue * value, GParamSpec * pspec)
629 {
630   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
631
632   switch (prop_id) {
633     case PROP_LOCATION:{
634       GST_OBJECT_LOCK (splitmux);
635       g_free (splitmux->location);
636       splitmux->location = g_value_dup_string (value);
637       GST_OBJECT_UNLOCK (splitmux);
638       break;
639     }
640     case PROP_MAX_SIZE_BYTES:
641       GST_OBJECT_LOCK (splitmux);
642       splitmux->threshold_bytes = g_value_get_uint64 (value);
643       GST_OBJECT_UNLOCK (splitmux);
644       break;
645     case PROP_MAX_SIZE_TIME:
646       GST_OBJECT_LOCK (splitmux);
647       splitmux->threshold_time = g_value_get_uint64 (value);
648       GST_OBJECT_UNLOCK (splitmux);
649       break;
650     case PROP_MAX_SIZE_TIMECODE:
651       GST_OBJECT_LOCK (splitmux);
652       splitmux->threshold_timecode_str = g_value_dup_string (value);
653       GST_OBJECT_UNLOCK (splitmux);
654       break;
655     case PROP_SEND_KEYFRAME_REQUESTS:
656       GST_OBJECT_LOCK (splitmux);
657       splitmux->send_keyframe_requests = g_value_get_boolean (value);
658       GST_OBJECT_UNLOCK (splitmux);
659       break;
660     case PROP_MAX_FILES:
661       GST_OBJECT_LOCK (splitmux);
662       splitmux->max_files = g_value_get_uint (value);
663       GST_OBJECT_UNLOCK (splitmux);
664       break;
665     case PROP_MUXER_OVERHEAD:
666       GST_OBJECT_LOCK (splitmux);
667       splitmux->mux_overhead = g_value_get_double (value);
668       GST_OBJECT_UNLOCK (splitmux);
669       break;
670     case PROP_USE_ROBUST_MUXING:
671       GST_OBJECT_LOCK (splitmux);
672       splitmux->use_robust_muxing = g_value_get_boolean (value);
673       GST_OBJECT_UNLOCK (splitmux);
674       if (splitmux->use_robust_muxing)
675         update_muxer_properties (splitmux);
676       break;
677     case PROP_ALIGNMENT_THRESHOLD:
678       GST_OBJECT_LOCK (splitmux);
679       splitmux->alignment_threshold = g_value_get_uint64 (value);
680       GST_OBJECT_UNLOCK (splitmux);
681       break;
682     case PROP_SINK:
683       GST_OBJECT_LOCK (splitmux);
684       if (splitmux->provided_sink)
685         gst_object_unref (splitmux->provided_sink);
686       splitmux->provided_sink = g_value_get_object (value);
687       gst_object_ref_sink (splitmux->provided_sink);
688       GST_OBJECT_UNLOCK (splitmux);
689       break;
690     case PROP_MUXER:
691       GST_OBJECT_LOCK (splitmux);
692       if (splitmux->provided_muxer)
693         gst_object_unref (splitmux->provided_muxer);
694       splitmux->provided_muxer = g_value_get_object (value);
695       gst_object_ref_sink (splitmux->provided_muxer);
696       GST_OBJECT_UNLOCK (splitmux);
697       break;
698     case PROP_RESET_MUXER:
699       GST_OBJECT_LOCK (splitmux);
700       splitmux->reset_muxer = g_value_get_boolean (value);
701       GST_OBJECT_UNLOCK (splitmux);
702       break;
703     case PROP_ASYNC_FINALIZE:
704       GST_OBJECT_LOCK (splitmux);
705       splitmux->async_finalize = g_value_get_boolean (value);
706       GST_OBJECT_UNLOCK (splitmux);
707       break;
708     case PROP_MUXER_FACTORY:
709       GST_OBJECT_LOCK (splitmux);
710       if (splitmux->muxer_factory)
711         g_free (splitmux->muxer_factory);
712       splitmux->muxer_factory = g_value_dup_string (value);
713       GST_OBJECT_UNLOCK (splitmux);
714       break;
715     case PROP_MUXER_PROPERTIES:
716       GST_OBJECT_LOCK (splitmux);
717       if (splitmux->muxer_properties)
718         gst_structure_free (splitmux->muxer_properties);
719       if (gst_value_get_structure (value))
720         splitmux->muxer_properties =
721             gst_structure_copy (gst_value_get_structure (value));
722       else
723         splitmux->muxer_properties = NULL;
724       GST_OBJECT_UNLOCK (splitmux);
725       break;
726     case PROP_SINK_FACTORY:
727       GST_OBJECT_LOCK (splitmux);
728       if (splitmux->sink_factory)
729         g_free (splitmux->sink_factory);
730       splitmux->sink_factory = g_value_dup_string (value);
731       GST_OBJECT_UNLOCK (splitmux);
732       break;
733     case PROP_SINK_PROPERTIES:
734       GST_OBJECT_LOCK (splitmux);
735       if (splitmux->sink_properties)
736         gst_structure_free (splitmux->sink_properties);
737       if (gst_value_get_structure (value))
738         splitmux->sink_properties =
739             gst_structure_copy (gst_value_get_structure (value));
740       else
741         splitmux->sink_properties = NULL;
742       GST_OBJECT_UNLOCK (splitmux);
743       break;
744     default:
745       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
746       break;
747   }
748 }
749
750 static void
751 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
752     GValue * value, GParamSpec * pspec)
753 {
754   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755
756   switch (prop_id) {
757     case PROP_LOCATION:
758       GST_OBJECT_LOCK (splitmux);
759       g_value_set_string (value, splitmux->location);
760       GST_OBJECT_UNLOCK (splitmux);
761       break;
762     case PROP_MAX_SIZE_BYTES:
763       GST_OBJECT_LOCK (splitmux);
764       g_value_set_uint64 (value, splitmux->threshold_bytes);
765       GST_OBJECT_UNLOCK (splitmux);
766       break;
767     case PROP_MAX_SIZE_TIME:
768       GST_OBJECT_LOCK (splitmux);
769       g_value_set_uint64 (value, splitmux->threshold_time);
770       GST_OBJECT_UNLOCK (splitmux);
771       break;
772     case PROP_MAX_SIZE_TIMECODE:
773       GST_OBJECT_LOCK (splitmux);
774       g_value_set_string (value, splitmux->threshold_timecode_str);
775       GST_OBJECT_UNLOCK (splitmux);
776       break;
777     case PROP_SEND_KEYFRAME_REQUESTS:
778       GST_OBJECT_LOCK (splitmux);
779       g_value_set_boolean (value, splitmux->send_keyframe_requests);
780       GST_OBJECT_UNLOCK (splitmux);
781       break;
782     case PROP_MAX_FILES:
783       GST_OBJECT_LOCK (splitmux);
784       g_value_set_uint (value, splitmux->max_files);
785       GST_OBJECT_UNLOCK (splitmux);
786       break;
787     case PROP_MUXER_OVERHEAD:
788       GST_OBJECT_LOCK (splitmux);
789       g_value_set_double (value, splitmux->mux_overhead);
790       GST_OBJECT_UNLOCK (splitmux);
791       break;
792     case PROP_USE_ROBUST_MUXING:
793       GST_OBJECT_LOCK (splitmux);
794       g_value_set_boolean (value, splitmux->use_robust_muxing);
795       GST_OBJECT_UNLOCK (splitmux);
796       break;
797     case PROP_ALIGNMENT_THRESHOLD:
798       GST_OBJECT_LOCK (splitmux);
799       g_value_set_uint64 (value, splitmux->alignment_threshold);
800       GST_OBJECT_UNLOCK (splitmux);
801       break;
802     case PROP_SINK:
803       GST_OBJECT_LOCK (splitmux);
804       g_value_set_object (value, splitmux->provided_sink);
805       GST_OBJECT_UNLOCK (splitmux);
806       break;
807     case PROP_MUXER:
808       GST_OBJECT_LOCK (splitmux);
809       g_value_set_object (value, splitmux->provided_muxer);
810       GST_OBJECT_UNLOCK (splitmux);
811       break;
812     case PROP_RESET_MUXER:
813       GST_OBJECT_LOCK (splitmux);
814       g_value_set_boolean (value, splitmux->reset_muxer);
815       GST_OBJECT_UNLOCK (splitmux);
816       break;
817     case PROP_ASYNC_FINALIZE:
818       GST_OBJECT_LOCK (splitmux);
819       g_value_set_boolean (value, splitmux->async_finalize);
820       GST_OBJECT_UNLOCK (splitmux);
821       break;
822     case PROP_MUXER_FACTORY:
823       GST_OBJECT_LOCK (splitmux);
824       g_value_set_string (value, splitmux->muxer_factory);
825       GST_OBJECT_UNLOCK (splitmux);
826       break;
827     case PROP_MUXER_PROPERTIES:
828       GST_OBJECT_LOCK (splitmux);
829       gst_value_set_structure (value, splitmux->muxer_properties);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK_FACTORY:
833       GST_OBJECT_LOCK (splitmux);
834       g_value_set_string (value, splitmux->sink_factory);
835       GST_OBJECT_UNLOCK (splitmux);
836       break;
837     case PROP_SINK_PROPERTIES:
838       GST_OBJECT_LOCK (splitmux);
839       gst_value_set_structure (value, splitmux->sink_properties);
840       GST_OBJECT_UNLOCK (splitmux);
841       break;
842     default:
843       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
844       break;
845   }
846 }
847
848 /* Convenience function */
849 static inline GstClockTimeDiff
850 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
851 {
852   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
853
854   if (GST_CLOCK_TIME_IS_VALID (val)) {
855     gboolean sign =
856         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
857     if (sign > 0)
858       res = val;
859     else if (sign < 0)
860       res = -val;
861   }
862   return res;
863 }
864
865 static MqStreamCtx *
866 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
867 {
868   MqStreamCtx *ctx;
869
870   ctx = g_new0 (MqStreamCtx, 1);
871   ctx->splitmux = splitmux;
872   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
873   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
874   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
875   g_queue_init (&ctx->queued_bufs);
876   return ctx;
877 }
878
879 static void
880 mq_stream_ctx_free (MqStreamCtx * ctx)
881 {
882   if (ctx->q) {
883     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
884
885     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
886
887     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
888       gst_element_set_locked_state (ctx->q, TRUE);
889       gst_element_set_state (ctx->q, GST_STATE_NULL);
890       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
891       gst_object_unref (parent);
892     }
893     gst_object_unref (ctx->q);
894   }
895   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
896   gst_object_unref (ctx->sinkpad);
897   gst_object_unref (ctx->srcpad);
898   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
899   g_queue_clear (&ctx->queued_bufs);
900   g_free (ctx);
901 }
902
903 static void
904 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
905     GstElement * sink)
906 {
907   gchar *location = NULL;
908   GstMessage *msg;
909   const gchar *msg_name = opened ?
910       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
911   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
912
913   if (!opened) {
914     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
915     if (rtime)
916       running_time = *rtime;
917   }
918
919   g_object_get (sink, "location", &location, NULL);
920
921   /* If it's in the middle of a teardown, the reference_ctc might have become
922    * NULL */
923   if (splitmux->reference_ctx) {
924     msg = gst_message_new_element (GST_OBJECT (splitmux),
925         gst_structure_new (msg_name,
926             "location", G_TYPE_STRING, location,
927             "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
928     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
929   }
930
931   g_free (location);
932 }
933
934 static void
935 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
936 {
937   GstEvent *eos;
938   GstPad *pad;
939   MqStreamCtx *ctx;
940
941   eos = gst_event_new_eos ();
942   pad = helper->pad;
943   ctx = helper->ctx;
944
945   GST_SPLITMUX_LOCK (splitmux);
946   if (!pad)
947     pad = gst_pad_get_peer (ctx->srcpad);
948   GST_SPLITMUX_UNLOCK (splitmux);
949
950   gst_pad_send_event (pad, eos);
951   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
952
953   gst_object_unref (pad);
954   g_free (helper);
955 }
956
957 /* Called with lock held, drops the lock to send EOS to the
958  * pad
959  */
960 static void
961 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
962 {
963   GstEvent *eos;
964   GstPad *pad;
965
966   eos = gst_event_new_eos ();
967   pad = gst_pad_get_peer (ctx->srcpad);
968
969   ctx->out_eos = TRUE;
970
971   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
972   GST_SPLITMUX_UNLOCK (splitmux);
973   gst_pad_send_event (pad, eos);
974   GST_SPLITMUX_LOCK (splitmux);
975
976   gst_object_unref (pad);
977 }
978
979 /* Called with lock held. Schedules an EOS event to the ctx pad
980  * to happen in another thread */
981 static void
982 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
983 {
984   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
985   GstPad *srcpad, *sinkpad;
986
987   srcpad = ctx->srcpad;
988   sinkpad = gst_pad_get_peer (srcpad);
989
990   helper->ctx = ctx;
991   helper->pad = sinkpad;        /* Takes the reference */
992
993   ctx->out_eos_async_done = TRUE;
994   /* HACK: Here, we explicitly unset the SINK flag on the target sink element
995    * that's about to be asynchronously disposed, so that it no longer
996    * participates in GstBin EOS logic. This fixes a race where if
997    * splitmuxsink really reaches EOS before an asynchronous background
998    * element has finished, then the bin won't actually send EOS to the
999    * pipeline. Even after finishing and removing the old element, the
1000    * bin doesn't re-check EOS status on removing a SINK element. This
1001    * should be fixed in core, making this hack unnecessary. */
1002   GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
1003
1004   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1005       sinkpad, ctx);
1006
1007   g_assert_nonnull (helper->pad);
1008   gst_element_call_async (GST_ELEMENT (splitmux),
1009       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1010 }
1011
1012 /* Called with lock held. TRUE iff all contexts have a
1013  * pending (or delivered) async eos event */
1014 static gboolean
1015 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1016 {
1017   gboolean ret = TRUE;
1018   GList *item;
1019
1020   for (item = splitmux->contexts; item; item = item->next) {
1021     MqStreamCtx *ctx = item->data;
1022     ret &= ctx->out_eos_async_done;
1023   }
1024   return ret;
1025 }
1026
1027 /* Called with splitmux lock held to check if this output
1028  * context needs to sleep to wait for the release of the
1029  * next GOP, or to send EOS to close out the current file
1030  */
1031 static void
1032 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1033 {
1034   if (ctx->caps_change)
1035     return;
1036
1037   do {
1038     /* When first starting up, the reference stream has to output
1039      * the first buffer to prepare the muxer and sink */
1040     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1041     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1042
1043     if (!(splitmux->max_out_running_time == 0 ||
1044             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1045             splitmux->alignment_threshold == 0 ||
1046             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1047       my_max_out_running_time -= splitmux->alignment_threshold;
1048       GST_LOG_OBJECT (ctx->srcpad,
1049           "Max out running time currently %" GST_STIME_FORMAT
1050           ", with threshold applied it is %" GST_STIME_FORMAT,
1051           GST_STIME_ARGS (splitmux->max_out_running_time),
1052           GST_STIME_ARGS (my_max_out_running_time));
1053     }
1054
1055     if (ctx->flushing
1056         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1057       return;
1058
1059     GST_LOG_OBJECT (ctx->srcpad,
1060         "Checking running time %" GST_STIME_FORMAT " against max %"
1061         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1062         GST_STIME_ARGS (my_max_out_running_time));
1063
1064     if (can_output) {
1065       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1066           ctx->out_running_time < my_max_out_running_time) {
1067         return;
1068       }
1069
1070       switch (splitmux->output_state) {
1071         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1072           /* We only get here if we've finished outputting a GOP and need to know
1073            * what to do next */
1074           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1075           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1076           continue;
1077
1078         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1079           /* We've reached the max out running_time to get here, so end this file now */
1080           if (ctx->out_eos == FALSE) {
1081             if (splitmux->async_finalize) {
1082               /* We must set EOS asynchronously at this point. We cannot defer
1083                * it, because we need all contexts to wake up, for the
1084                * reference context to eventually give us something at
1085                * START_NEXT_FILE. Otherwise, collectpads might choose another
1086                * context to give us the first buffer, and format-location-full
1087                * will not contain a valid sample. */
1088               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1089                   GINT_TO_POINTER (1));
1090               eos_context_async (ctx, splitmux);
1091               if (all_contexts_are_async_eos (splitmux)) {
1092                 GST_INFO_OBJECT (splitmux,
1093                     "All contexts are async_eos. Moving to the next file.");
1094                 /* We can start the next file once we've asked each pad to go EOS */
1095                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1096                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1097                 continue;
1098               }
1099             } else {
1100               send_eos (splitmux, ctx);
1101               continue;
1102             }
1103           } else {
1104             GST_INFO_OBJECT (splitmux,
1105                 "At end-of-file state, but context %p is already EOS", ctx);
1106           }
1107           break;
1108         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1109           if (ctx->is_reference) {
1110             /* Special handling on the reference ctx to start new fragments
1111              * and collect commands from the command queue */
1112             /* drops the splitmux lock briefly: */
1113             /* We must have reference ctx in order for format-location-full to
1114              * have a sample */
1115             start_next_fragment (splitmux, ctx);
1116             continue;
1117           }
1118           break;
1119         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1120           do {
1121             SplitMuxOutputCommand *cmd =
1122                 g_queue_pop_tail (&splitmux->out_cmd_q);
1123             if (cmd != NULL) {
1124               /* If we pop the last command, we need to make our queues bigger */
1125               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1126                 grow_blocked_queues (splitmux);
1127
1128               if (cmd->start_new_fragment) {
1129                 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1130                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1131               } else {
1132                 GST_DEBUG_OBJECT (splitmux,
1133                     "Got new output cmd for time %" GST_STIME_FORMAT,
1134                     GST_STIME_ARGS (cmd->max_output_ts));
1135
1136                 /* Extend the output range immediately */
1137                 splitmux->max_out_running_time = cmd->max_output_ts;
1138                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1139               }
1140               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1141
1142               out_cmd_buf_free (cmd);
1143               break;
1144             } else {
1145               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1146             }
1147           } while (splitmux->output_state ==
1148               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1149           /* loop and re-check the state */
1150           continue;
1151         }
1152         case SPLITMUX_OUTPUT_STATE_STOPPED:
1153           return;
1154       }
1155     }
1156
1157     GST_INFO_OBJECT (ctx->srcpad,
1158         "Sleeping for running time %"
1159         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1160         GST_STIME_ARGS (ctx->out_running_time),
1161         GST_STIME_ARGS (splitmux->max_out_running_time));
1162     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1163     GST_INFO_OBJECT (ctx->srcpad,
1164         "Woken for new max running time %" GST_STIME_FORMAT,
1165         GST_STIME_ARGS (splitmux->max_out_running_time));
1166   }
1167   while (1);
1168 }
1169
1170 static GstClockTime
1171 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1172     const GstVideoTimeCode * cur_tc)
1173 {
1174   GstVideoTimeCode *target_tc;
1175   GstVideoTimeCodeInterval *tc_inter;
1176   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1177
1178   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1179     return GST_CLOCK_TIME_NONE;
1180
1181   tc_inter =
1182       gst_video_time_code_interval_new_from_string
1183       (splitmux->threshold_timecode_str);
1184   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1185   gst_video_time_code_interval_free (tc_inter);
1186
1187   /* Convert to ns */
1188   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1189   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1190
1191   /* Add fragment_start_time, accounting for wraparound */
1192   if (target_tc_time >= cur_tc_time) {
1193     next_max_tc_time =
1194         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1195   } else {
1196     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1197
1198     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1199         (cur_tc->config.fps_d == 1001)) {
1200       /* Checking fps_d is probably unneeded, but better safe than sorry
1201        * (e.g. someone accidentally set a flag) */
1202       GstVideoTimeCode *tc_for_offset;
1203
1204       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1205        * but slightly less. Calculate that duration from a fake timecode. The
1206        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1207        * is to add one frame to 23:59:59;29 */
1208       tc_for_offset =
1209           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1210           NULL, cur_tc->config.flags, 23, 59, 59,
1211           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1212       day_in_ns =
1213           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1214           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1215           cur_tc->config.fps_n);
1216       gst_video_time_code_free (tc_for_offset);
1217     }
1218     next_max_tc_time =
1219         day_in_ns - cur_tc_time + target_tc_time +
1220         splitmux->fragment_start_time;
1221   }
1222
1223   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1224       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1225       GST_TIME_ARGS (cur_tc_time));
1226   gst_video_time_code_free (target_tc);
1227
1228   return next_max_tc_time;
1229 }
1230
1231 static gboolean
1232 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1233 {
1234   GstEvent *ev;
1235   GstClockTime target_time;
1236   gboolean timecode_based = FALSE;
1237
1238   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1239   if (splitmux->threshold_timecode_str) {
1240     GstVideoTimeCodeMeta *tc_meta;
1241
1242     if (buffer != NULL) {
1243       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1244       if (tc_meta) {
1245         splitmux->next_max_tc_time =
1246             calculate_next_max_timecode (splitmux, &tc_meta->tc);
1247         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1248       }
1249     } else {
1250       /* This can happen in the presence of GAP events that trigger
1251        * a new fragment start */
1252       GST_WARNING_OBJECT (splitmux,
1253           "No buffer available to calculate next timecode");
1254     }
1255   }
1256
1257   if (splitmux->send_keyframe_requests == FALSE
1258       || (splitmux->threshold_time == 0 && !timecode_based)
1259       || splitmux->threshold_bytes != 0)
1260     return TRUE;
1261
1262   if (timecode_based) {
1263     /* We might have rounding errors: aim slightly earlier */
1264     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1265   } else {
1266     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1267   }
1268   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1269   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1270       GST_TIME_ARGS (target_time));
1271   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1272 }
1273
1274 static GstPadProbeReturn
1275 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1276 {
1277   GstSplitMuxSink *splitmux = ctx->splitmux;
1278   MqStreamBuf *buf_info = NULL;
1279
1280   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1281
1282   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1283   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1284     g_warning ("Buffer list handling not implemented");
1285     return GST_PAD_PROBE_DROP;
1286   }
1287   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1288       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1289     GstEvent *event = gst_pad_probe_info_get_event (info);
1290     gboolean locked = FALSE;
1291
1292     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1293
1294     switch (GST_EVENT_TYPE (event)) {
1295       case GST_EVENT_SEGMENT:
1296         gst_event_copy_segment (event, &ctx->out_segment);
1297         break;
1298       case GST_EVENT_FLUSH_STOP:
1299         GST_SPLITMUX_LOCK (splitmux);
1300         locked = TRUE;
1301         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1302         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1303         g_queue_clear (&ctx->queued_bufs);
1304         ctx->flushing = FALSE;
1305         break;
1306       case GST_EVENT_FLUSH_START:
1307         GST_SPLITMUX_LOCK (splitmux);
1308         locked = TRUE;
1309         GST_LOG_OBJECT (pad, "Flush start");
1310         ctx->flushing = TRUE;
1311         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1312         break;
1313       case GST_EVENT_EOS:
1314         GST_SPLITMUX_LOCK (splitmux);
1315         locked = TRUE;
1316         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1317           goto beach;
1318         ctx->out_eos = TRUE;
1319         GST_INFO_OBJECT (splitmux,
1320             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1321         break;
1322       case GST_EVENT_GAP:{
1323         GstClockTime gap_ts;
1324         GstClockTimeDiff rtime;
1325
1326         gst_event_parse_gap (event, &gap_ts, NULL);
1327         if (gap_ts == GST_CLOCK_TIME_NONE)
1328           break;
1329
1330         GST_SPLITMUX_LOCK (splitmux);
1331         locked = TRUE;
1332
1333         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1334           goto beach;
1335
1336         /* When we get a gap event on the
1337          * reference stream and we're trying to open a
1338          * new file, we need to store it until we get
1339          * the buffer afterwards
1340          */
1341         if (ctx->is_reference &&
1342             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1343           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1344           gst_event_replace (&ctx->pending_gap, event);
1345           GST_SPLITMUX_UNLOCK (splitmux);
1346           return GST_PAD_PROBE_HANDLED;
1347         }
1348
1349         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1350
1351         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1352             GST_STIME_ARGS (rtime));
1353
1354         if (rtime != GST_CLOCK_STIME_NONE) {
1355           ctx->out_running_time = rtime;
1356           complete_or_wait_on_out (splitmux, ctx);
1357         }
1358         break;
1359       }
1360       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1361         const GstStructure *s;
1362         GstClockTimeDiff ts = 0;
1363
1364         s = gst_event_get_structure (event);
1365         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1366           break;
1367
1368         gst_structure_get_int64 (s, "timestamp", &ts);
1369
1370         GST_SPLITMUX_LOCK (splitmux);
1371         locked = TRUE;
1372
1373         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1374           goto beach;
1375         ctx->out_running_time = ts;
1376         if (!ctx->is_reference)
1377           complete_or_wait_on_out (splitmux, ctx);
1378         GST_SPLITMUX_UNLOCK (splitmux);
1379         return GST_PAD_PROBE_DROP;
1380       }
1381       case GST_EVENT_CAPS:{
1382         GstPad *peer;
1383
1384         if (!ctx->is_reference)
1385           break;
1386
1387         peer = gst_pad_get_peer (pad);
1388         if (peer) {
1389           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1390
1391           gst_object_unref (peer);
1392
1393           if (ok)
1394             break;
1395
1396         } else {
1397           break;
1398         }
1399         /* This is in the case the muxer doesn't allow this change of caps */
1400         GST_SPLITMUX_LOCK (splitmux);
1401         locked = TRUE;
1402         ctx->caps_change = TRUE;
1403
1404         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1405           GST_DEBUG_OBJECT (splitmux,
1406               "New caps were not accepted. Switching output file");
1407           if (ctx->out_eos == FALSE) {
1408             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1409             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1410           }
1411         }
1412
1413         /* Lets it fall through, if it fails again, then the muxer just can't
1414          * support this format, but at least we have a closed file.
1415          */
1416         break;
1417       }
1418       default:
1419         break;
1420     }
1421
1422     /* We need to make sure events aren't passed
1423      * until the muxer / sink are ready for it */
1424     if (!locked)
1425       GST_SPLITMUX_LOCK (splitmux);
1426     if (!ctx->is_reference)
1427       complete_or_wait_on_out (splitmux, ctx);
1428     GST_SPLITMUX_UNLOCK (splitmux);
1429
1430     /* Don't try to forward sticky events before the next buffer is there
1431      * because it would cause a new file to be created without the first
1432      * buffer being available.
1433      */
1434     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1435       gst_event_unref (event);
1436       return GST_PAD_PROBE_HANDLED;
1437     } else
1438       return GST_PAD_PROBE_PASS;
1439   }
1440
1441   /* Allow everything through until the configured next stopping point */
1442   GST_SPLITMUX_LOCK (splitmux);
1443
1444   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1445   if (buf_info == NULL)
1446     /* Can only happen due to a poorly timed flush */
1447     goto beach;
1448
1449   /* If we have popped a keyframe, decrement the queued_gop count */
1450   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1451     splitmux->queued_keyframes--;
1452
1453   ctx->out_running_time = buf_info->run_ts;
1454   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1455
1456   GST_LOG_OBJECT (splitmux,
1457       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1458       " size %" G_GUINT64_FORMAT,
1459       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1460
1461   ctx->caps_change = FALSE;
1462
1463   complete_or_wait_on_out (splitmux, ctx);
1464
1465   splitmux->muxed_out_bytes += buf_info->buf_size;
1466
1467 #ifndef GST_DISABLE_GST_DEBUG
1468   {
1469     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1470     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1471         " run ts %" GST_STIME_FORMAT, buf,
1472         GST_STIME_ARGS (ctx->out_running_time));
1473   }
1474 #endif
1475
1476   ctx->cur_out_buffer = NULL;
1477   GST_SPLITMUX_UNLOCK (splitmux);
1478
1479   /* pending_gap is protected by the STREAM lock */
1480   if (ctx->pending_gap) {
1481     /* If we previously stored a gap event, send it now */
1482     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1483
1484     GST_DEBUG_OBJECT (splitmux,
1485         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1486
1487     gst_pad_send_event (peer, ctx->pending_gap);
1488     ctx->pending_gap = NULL;
1489
1490     gst_object_unref (peer);
1491   }
1492
1493   mq_stream_buf_free (buf_info);
1494
1495   return GST_PAD_PROBE_PASS;
1496
1497 beach:
1498   GST_SPLITMUX_UNLOCK (splitmux);
1499   return GST_PAD_PROBE_DROP;
1500 }
1501
1502 static gboolean
1503 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1504 {
1505   return gst_pad_send_event (peer, gst_event_ref (*event));
1506 }
1507
1508 static void
1509 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1510 {
1511   if (ctx->fragment_block_id > 0) {
1512     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1513     ctx->fragment_block_id = 0;
1514   }
1515 }
1516
1517 static void
1518 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1519 {
1520   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1521
1522   gst_pad_sticky_events_foreach (ctx->srcpad,
1523       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1524
1525   /* Clear EOS flag if not actually EOS */
1526   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1527   ctx->out_eos_async_done = ctx->out_eos;
1528
1529   gst_object_unref (peer);
1530 }
1531
1532 static void
1533 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1534 {
1535   GstPad *sinkpad, *srcpad, *newpad;
1536   GstPadTemplate *templ;
1537
1538   srcpad = ctx->srcpad;
1539   sinkpad = gst_pad_get_peer (srcpad);
1540
1541   templ = sinkpad->padtemplate;
1542   newpad =
1543       gst_element_request_pad (splitmux->muxer, templ,
1544       GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1545
1546   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1547       newpad);
1548   if (!gst_pad_unlink (srcpad, sinkpad)) {
1549     gst_object_unref (sinkpad);
1550     goto fail;
1551   }
1552   if (gst_pad_link_full (srcpad, newpad,
1553           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1554     gst_element_release_request_pad (splitmux->muxer, newpad);
1555     gst_object_unref (sinkpad);
1556     gst_object_unref (newpad);
1557     goto fail;
1558   }
1559   gst_object_unref (newpad);
1560   gst_object_unref (sinkpad);
1561   return;
1562
1563 fail:
1564   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1565       ("Could not create the new muxer/sink"), NULL);
1566 }
1567
1568 static GstPadProbeReturn
1569 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1570 {
1571   return GST_PAD_PROBE_OK;
1572 }
1573
1574 static void
1575 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1576 {
1577   ctx->fragment_block_id =
1578       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1579       NULL, NULL);
1580 }
1581
1582 static gboolean
1583 _set_property_from_structure (GQuark field_id, const GValue * value,
1584     gpointer user_data)
1585 {
1586   const gchar *property_name = g_quark_to_string (field_id);
1587   GObject *element = G_OBJECT (user_data);
1588
1589   g_object_set_property (element, property_name, value);
1590
1591   return TRUE;
1592 }
1593
1594 static void
1595 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1596 {
1597   gst_element_set_locked_state (element, TRUE);
1598   gst_element_set_state (element, GST_STATE_NULL);
1599   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1600   gst_bin_remove (GST_BIN (splitmux), element);
1601 }
1602
1603
1604 static void
1605 _send_event (const GValue * value, gpointer user_data)
1606 {
1607   GstPad *pad = g_value_get_object (value);
1608   GstEvent *ev = user_data;
1609
1610   gst_pad_send_event (pad, gst_event_ref (ev));
1611 }
1612
1613 /* Called with lock held when a fragment
1614  * reaches EOS and it is time to restart
1615  * a new fragment
1616  */
1617 static void
1618 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1619 {
1620   GstElement *muxer, *sink;
1621
1622   g_assert (ctx->is_reference);
1623
1624   /* 1 change to new file */
1625   splitmux->switching_fragment = TRUE;
1626
1627   /* We need to drop the splitmux lock to acquire the state lock
1628    * here and ensure there's no racy state change going on elsewhere */
1629   muxer = gst_object_ref (splitmux->muxer);
1630   sink = gst_object_ref (splitmux->active_sink);
1631
1632   GST_SPLITMUX_UNLOCK (splitmux);
1633   GST_STATE_LOCK (splitmux);
1634
1635   if (splitmux->async_finalize) {
1636     if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1637       gchar *newname;
1638       GstElement *new_sink, *new_muxer;
1639
1640       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1641           splitmux->fragment_id);
1642       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1643       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1644       GST_SPLITMUX_LOCK (splitmux);
1645       if ((splitmux->sink =
1646               create_element (splitmux, splitmux->sink_factory, newname,
1647                   TRUE)) == NULL)
1648         goto fail;
1649       if (splitmux->sink_properties)
1650         gst_structure_foreach (splitmux->sink_properties,
1651             _set_property_from_structure, splitmux->sink);
1652       splitmux->active_sink = splitmux->sink;
1653       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1654       g_free (newname);
1655       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1656       if ((splitmux->muxer =
1657               create_element (splitmux, splitmux->muxer_factory, newname,
1658                   TRUE)) == NULL)
1659         goto fail;
1660       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1661               "async") != NULL) {
1662         /* async child elements are causing state change races and weird
1663          * failures, so let's try and turn that off */
1664         g_object_set (splitmux->sink, "async", FALSE, NULL);
1665       }
1666       if (splitmux->muxer_properties)
1667         gst_structure_foreach (splitmux->muxer_properties,
1668             _set_property_from_structure, splitmux->muxer);
1669       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1670       g_free (newname);
1671       new_sink = splitmux->sink;
1672       new_muxer = splitmux->muxer;
1673       GST_SPLITMUX_UNLOCK (splitmux);
1674       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1675       gst_element_link (new_muxer, new_sink);
1676
1677       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1678         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1679                     EOS_FROM_US)) == 2) {
1680           _lock_and_set_to_null (muxer, splitmux);
1681           _lock_and_set_to_null (sink, splitmux);
1682         } else {
1683           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1684               GINT_TO_POINTER (2));
1685         }
1686       }
1687       gst_object_unref (muxer);
1688       gst_object_unref (sink);
1689       muxer = new_muxer;
1690       sink = new_sink;
1691       gst_object_ref (muxer);
1692       gst_object_ref (sink);
1693     }
1694   } else {
1695
1696     gst_element_set_locked_state (muxer, TRUE);
1697     gst_element_set_locked_state (sink, TRUE);
1698     gst_element_set_state (sink, GST_STATE_NULL);
1699
1700     if (splitmux->reset_muxer) {
1701       gst_element_set_state (muxer, GST_STATE_NULL);
1702     } else {
1703       GstIterator *it = gst_element_iterate_sink_pads (muxer);
1704       GstEvent *ev;
1705
1706       ev = gst_event_new_flush_start ();
1707       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1708       gst_event_unref (ev);
1709
1710       gst_iterator_resync (it);
1711
1712       ev = gst_event_new_flush_stop (TRUE);
1713       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1714       gst_event_unref (ev);
1715
1716       gst_iterator_free (it);
1717     }
1718   }
1719
1720   GST_SPLITMUX_LOCK (splitmux);
1721   if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1722     set_next_filename (splitmux, ctx);
1723   splitmux->muxed_out_bytes = 0;
1724   GST_SPLITMUX_UNLOCK (splitmux);
1725
1726   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1727   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1728   gst_element_set_locked_state (muxer, FALSE);
1729   gst_element_set_locked_state (sink, FALSE);
1730
1731   gst_object_unref (sink);
1732   gst_object_unref (muxer);
1733
1734   GST_SPLITMUX_LOCK (splitmux);
1735   GST_STATE_UNLOCK (splitmux);
1736   splitmux->switching_fragment = FALSE;
1737   do_async_done (splitmux);
1738
1739   splitmux->ready_for_output = TRUE;
1740
1741   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1742   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1743
1744   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1745
1746   /* FIXME: Is this always the correct next state? */
1747   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1748   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1749   return;
1750
1751 fail:
1752   GST_STATE_UNLOCK (splitmux);
1753   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1754       ("Could not create the new muxer/sink"), NULL);
1755 }
1756
1757 static void
1758 bus_handler (GstBin * bin, GstMessage * message)
1759 {
1760   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1761
1762   switch (GST_MESSAGE_TYPE (message)) {
1763     case GST_MESSAGE_EOS:{
1764       /* If the state is draining out the current file, drop this EOS */
1765       GstElement *sink;
1766
1767       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1768       GST_SPLITMUX_LOCK (splitmux);
1769
1770       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1771
1772       if (splitmux->async_finalize) {
1773
1774         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1775           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1776                       EOS_FROM_US)) == 2) {
1777             GstElement *muxer;
1778             GstPad *sinksink, *muxersrc;
1779
1780             sinksink = gst_element_get_static_pad (sink, "sink");
1781             muxersrc = gst_pad_get_peer (sinksink);
1782             muxer = gst_pad_get_parent_element (muxersrc);
1783             gst_object_unref (sinksink);
1784             gst_object_unref (muxersrc);
1785
1786             gst_element_call_async (muxer,
1787                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1788                 gst_object_ref (splitmux), gst_object_unref);
1789             gst_element_call_async (sink,
1790                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1791                 gst_object_ref (splitmux), gst_object_unref);
1792             gst_object_unref (muxer);
1793           } else {
1794             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1795                 GINT_TO_POINTER (2));
1796           }
1797           GST_DEBUG_OBJECT (splitmux,
1798               "Caught async EOS from previous muxer+sink. Dropping.");
1799           /* We forward the EOS so that it gets aggregated as normal. If the sink
1800            * finishes and is removed before the end, it will be de-aggregated */
1801           gst_message_unref (message);
1802           GST_SPLITMUX_UNLOCK (splitmux);
1803           return;
1804         }
1805       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1806         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1807         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1808         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1809
1810         gst_message_unref (message);
1811         GST_SPLITMUX_UNLOCK (splitmux);
1812         return;
1813       } else {
1814         GST_DEBUG_OBJECT (splitmux,
1815             "Passing EOS message. Output state %d max_out_running_time %"
1816             GST_STIME_FORMAT, splitmux->output_state,
1817             GST_STIME_ARGS (splitmux->max_out_running_time));
1818       }
1819       GST_SPLITMUX_UNLOCK (splitmux);
1820       break;
1821     }
1822     case GST_MESSAGE_ASYNC_START:
1823     case GST_MESSAGE_ASYNC_DONE:
1824       /* Ignore state changes from our children while switching */
1825       GST_SPLITMUX_LOCK (splitmux);
1826       if (splitmux->switching_fragment) {
1827         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1828             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1829           GST_LOG_OBJECT (splitmux,
1830               "Ignoring state change from child %" GST_PTR_FORMAT
1831               " while switching", GST_MESSAGE_SRC (message));
1832           gst_message_unref (message);
1833           GST_SPLITMUX_UNLOCK (splitmux);
1834           return;
1835         }
1836       }
1837       GST_SPLITMUX_UNLOCK (splitmux);
1838       break;
1839     case GST_MESSAGE_WARNING:
1840     {
1841       GError *gerror = NULL;
1842
1843       gst_message_parse_warning (message, &gerror, NULL);
1844
1845       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1846         GList *item;
1847         gboolean caps_change = FALSE;
1848
1849         GST_SPLITMUX_LOCK (splitmux);
1850
1851         for (item = splitmux->contexts; item; item = item->next) {
1852           MqStreamCtx *ctx = item->data;
1853
1854           if (ctx->caps_change) {
1855             caps_change = TRUE;
1856             break;
1857           }
1858         }
1859
1860         GST_SPLITMUX_UNLOCK (splitmux);
1861
1862         if (caps_change) {
1863           GST_LOG_OBJECT (splitmux,
1864               "Ignoring warning change from child %" GST_PTR_FORMAT
1865               " while switching caps", GST_MESSAGE_SRC (message));
1866           gst_message_unref (message);
1867           return;
1868         }
1869       }
1870       break;
1871     }
1872     default:
1873       break;
1874   }
1875
1876   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1877 }
1878
1879 static void
1880 ctx_set_unblock (MqStreamCtx * ctx)
1881 {
1882   ctx->need_unblock = TRUE;
1883 }
1884
1885 static gboolean
1886 need_new_fragment (GstSplitMuxSink * splitmux,
1887     GstClockTime queued_time, GstClockTime queued_gop_time,
1888     guint64 queued_bytes)
1889 {
1890   guint64 thresh_bytes;
1891   GstClockTime thresh_time;
1892   gboolean check_robust_muxing;
1893   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
1894   GstClockTime *ptr_to_time;
1895
1896   GST_OBJECT_LOCK (splitmux);
1897   thresh_bytes = splitmux->threshold_bytes;
1898   thresh_time = splitmux->threshold_time;
1899   ptr_to_time = (GstClockTime *)
1900       gst_queue_array_peek_head_struct (splitmux->times_to_split);
1901   if (ptr_to_time)
1902     time_to_split = *ptr_to_time;
1903   check_robust_muxing = splitmux->use_robust_muxing
1904       && splitmux->muxer_has_reserved_props;
1905   GST_OBJECT_UNLOCK (splitmux);
1906
1907   /* Have we muxed anything into the new file at all? */
1908   if (splitmux->fragment_total_bytes <= 0)
1909     return FALSE;
1910
1911   /* User told us to split now */
1912   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE)
1913     return TRUE;
1914
1915   /* User told us to split at this running time */
1916   if (splitmux->reference_ctx->in_running_time > time_to_split) {
1917     GST_OBJECT_LOCK (splitmux);
1918     /* Dequeue running time */
1919     gst_queue_array_pop_head_struct (splitmux->times_to_split);
1920     /* Empty any running times after this that are past now */
1921     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1922     while (ptr_to_time) {
1923       time_to_split = *ptr_to_time;
1924       if (splitmux->reference_ctx->in_running_time <= time_to_split) {
1925         break;
1926       }
1927       gst_queue_array_pop_head_struct (splitmux->times_to_split);
1928       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1929     }
1930     GST_OBJECT_UNLOCK (splitmux);
1931     return TRUE;
1932   }
1933
1934   if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1935     return TRUE;                /* Would overrun byte limit */
1936
1937   if (thresh_time > 0 && queued_time > thresh_time)
1938     return TRUE;                /* Would overrun byte limit */
1939
1940   /* Timecode-based threshold accounts for possible rounding errors:
1941    * 5us should be bigger than all possible rounding errors but nowhere near
1942    * big enough to skip to another frame */
1943   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1944       splitmux->reference_ctx->in_running_time >
1945       splitmux->next_max_tc_time + 5 * GST_USECOND)
1946     return TRUE;                /* Timecode threshold */
1947
1948   if (check_robust_muxing) {
1949     GstClockTime mux_reserved_remain;
1950
1951     g_object_get (splitmux->muxer,
1952         "reserved-duration-remaining", &mux_reserved_remain, NULL);
1953
1954     GST_LOG_OBJECT (splitmux,
1955         "Muxer robust muxing report - %" G_GUINT64_FORMAT
1956         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1957         mux_reserved_remain, queued_gop_time);
1958
1959     if (queued_gop_time >= mux_reserved_remain) {
1960       GST_INFO_OBJECT (splitmux,
1961           "File is about to run out of header room - %" G_GUINT64_FORMAT
1962           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1963           ". Switching to new file", mux_reserved_remain, queued_gop_time);
1964       return TRUE;
1965     }
1966   }
1967
1968   /* Continue and mux this GOP */
1969   return FALSE;
1970 }
1971
1972 /* Called with splitmux lock held */
1973 /* Called when entering ProcessingCompleteGop state
1974  * Assess if mq contents overflowed the current file
1975  *   -> If yes, need to switch to new file
1976  *   -> if no, set max_out_running_time to let this GOP in and
1977  *      go to COLLECTING_GOP_START state
1978  */
1979 static void
1980 handle_gathered_gop (GstSplitMuxSink * splitmux)
1981 {
1982   guint64 queued_bytes;
1983   GstClockTimeDiff queued_time = 0;
1984   GstClockTimeDiff queued_gop_time = 0;
1985   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1986   SplitMuxOutputCommand *cmd;
1987
1988   /* Assess if the multiqueue contents overflowed the current file */
1989   /* When considering if a newly gathered GOP overflows
1990    * the time limit for the file, only consider the running time of the
1991    * reference stream. Other streams might have run ahead a little bit,
1992    * but extra pieces won't be released to the muxer beyond the reference
1993    * stream cut-off anyway - so it forms the limit. */
1994   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1995   queued_time = splitmux->reference_ctx->in_running_time;
1996   /* queued_gop_time tracks how much unwritten data there is waiting to
1997    * be written to this fragment including this GOP */
1998   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
1999     queued_gop_time =
2000         splitmux->reference_ctx->in_running_time -
2001         splitmux->reference_ctx->out_running_time;
2002   else
2003     queued_gop_time =
2004         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2005
2006   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2007
2008   g_assert (queued_gop_time >= 0);
2009   g_assert (queued_time >= splitmux->fragment_start_time);
2010
2011   queued_time -= splitmux->fragment_start_time;
2012   if (queued_time < queued_gop_time)
2013     queued_gop_time = queued_time;
2014
2015   /* Expand queued bytes estimate by muxer overhead */
2016   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2017
2018   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2019       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
2020   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
2021     GST_LOG_OBJECT (splitmux,
2022         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
2023         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
2024         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
2025   }
2026
2027   /* Check for overrun - have we output at least one byte and overrun
2028    * either threshold? */
2029   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2030     if (splitmux->async_finalize) {
2031       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2032       *sink_running_time = splitmux->reference_ctx->out_running_time;
2033       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2034           RUNNING_TIME, sink_running_time, g_free);
2035     }
2036     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2037     /* Tell the output side to start a new fragment */
2038     GST_INFO_OBJECT (splitmux,
2039         "This GOP (dur %" GST_STIME_FORMAT
2040         ") would overflow the fragment, Sending start_new_fragment cmd",
2041         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2042             splitmux->gop_start_time));
2043     cmd = out_cmd_buf_new ();
2044     cmd->start_new_fragment = TRUE;
2045     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2046     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2047
2048     new_out_ts = splitmux->reference_ctx->in_running_time;
2049     splitmux->fragment_start_time = splitmux->gop_start_time;
2050     splitmux->fragment_total_bytes = 0;
2051
2052     if (request_next_keyframe (splitmux,
2053             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
2054       GST_WARNING_OBJECT (splitmux,
2055           "Could not request a keyframe. Files may not split at the exact location they should");
2056     }
2057     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2058   }
2059
2060   /* And set up to collect the next GOP */
2061   if (!splitmux->reference_ctx->in_eos) {
2062     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2063     splitmux->gop_start_time = new_out_ts;
2064   } else {
2065     /* This is probably already the current state, but just in case: */
2066     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2067     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
2068   }
2069
2070   /* And wake all input contexts to send a wake-up event */
2071   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2072   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2073
2074   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2075   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2076
2077   if (splitmux->gop_total_bytes > 0) {
2078     GST_LOG_OBJECT (splitmux,
2079         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2080         " time %" GST_STIME_FORMAT,
2081         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2082
2083     /* Send this GOP to the output command queue */
2084     cmd = out_cmd_buf_new ();
2085     cmd->start_new_fragment = FALSE;
2086     cmd->max_output_ts = new_out_ts;
2087     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2088         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2089     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2090
2091     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2092   }
2093
2094   splitmux->gop_total_bytes = 0;
2095 }
2096
2097 /* Called with splitmux lock held */
2098 /* Called from each input pad when it is has all the pieces
2099  * for a GOP or EOS, starting with the reference pad which has set the
2100  * splitmux->max_in_running_time
2101  */
2102 static void
2103 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2104 {
2105   GList *cur;
2106   GstEvent *event;
2107
2108   /* On ENDING_FILE, the reference stream sends a command to start a new
2109    * fragment, then releases the GOP for output in the new fragment.
2110    *  If somes streams received no buffer during the last GOP that overran,
2111    * because its next buffer has a timestamp bigger than
2112    * ctx->max_in_running_time, its queue is empty. In that case the only
2113    * way to wakeup the output thread is by injecting an event in the
2114    * queue. This usually happen with subtitle streams.
2115    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2116   if (ctx->need_unblock) {
2117     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2118     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2119         GST_EVENT_TYPE_SERIALIZED,
2120         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2121             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2122
2123     GST_SPLITMUX_UNLOCK (splitmux);
2124     gst_pad_send_event (ctx->sinkpad, event);
2125     GST_SPLITMUX_LOCK (splitmux);
2126
2127     ctx->need_unblock = FALSE;
2128     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2129     /* state may have changed while we were unlocked. Loop again if so */
2130     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2131       return;
2132   }
2133
2134   if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2135     gboolean ready = TRUE;
2136
2137     /* Iterate each pad, and check that the input running time is at least
2138      * up to the reference running time, and if so handle the collected GOP */
2139     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2140         GST_STIME_FORMAT " ctx %p",
2141         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2142     for (cur = g_list_first (splitmux->contexts); cur != NULL;
2143         cur = g_list_next (cur)) {
2144       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2145
2146       GST_LOG_OBJECT (splitmux,
2147           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2148           " EOS %d", tmpctx, tmpctx->srcpad,
2149           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2150
2151       if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2152           tmpctx->in_running_time < splitmux->max_in_running_time &&
2153           !tmpctx->in_eos) {
2154         GST_LOG_OBJECT (splitmux,
2155             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2156             tmpctx, tmpctx->srcpad);
2157         ready = FALSE;
2158         break;
2159       }
2160     }
2161     if (ready) {
2162       GST_DEBUG_OBJECT (splitmux,
2163           "Collected GOP is complete. Processing (ctx %p)", ctx);
2164       /* All pads have a complete GOP, release it into the multiqueue */
2165       handle_gathered_gop (splitmux);
2166
2167       /* The user has requested a split, we can split now that the previous GOP
2168        * has been collected to the correct location */
2169       if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
2170               FALSE)) {
2171         g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2172       }
2173     }
2174   }
2175
2176   /* If upstream reached EOS we are not expecting more data, no need to wait
2177    * here. */
2178   if (ctx->in_eos)
2179     return;
2180
2181   /* Some pad is not yet ready, or GOP is being pushed
2182    * either way, sleep and wait to get woken */
2183   while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2184       !ctx->flushing &&
2185       (ctx->in_running_time >= splitmux->max_in_running_time) &&
2186       (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2187
2188     GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2189     GST_SPLITMUX_WAIT_INPUT (splitmux);
2190     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2191   }
2192 }
2193
2194 static GstPadProbeReturn
2195 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2196 {
2197   GstSplitMuxSink *splitmux = ctx->splitmux;
2198   GstBuffer *buf;
2199   MqStreamBuf *buf_info = NULL;
2200   GstClockTime ts;
2201   gboolean loop_again;
2202   gboolean keyframe = FALSE;
2203
2204   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2205
2206   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2207   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2208     g_warning ("Buffer list handling not implemented");
2209     return GST_PAD_PROBE_DROP;
2210   }
2211   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2212       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2213     GstEvent *event = gst_pad_probe_info_get_event (info);
2214
2215     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2216
2217     switch (GST_EVENT_TYPE (event)) {
2218       case GST_EVENT_SEGMENT:
2219         gst_event_copy_segment (event, &ctx->in_segment);
2220         break;
2221       case GST_EVENT_FLUSH_STOP:
2222         GST_SPLITMUX_LOCK (splitmux);
2223         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2224         ctx->in_eos = FALSE;
2225         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2226         GST_SPLITMUX_UNLOCK (splitmux);
2227         break;
2228       case GST_EVENT_EOS:
2229         GST_SPLITMUX_LOCK (splitmux);
2230         ctx->in_eos = TRUE;
2231
2232         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2233           goto beach;
2234
2235         if (ctx->is_reference) {
2236           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2237           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2238           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2239           /* Wake up other input pads to collect this GOP */
2240           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2241           check_completed_gop (splitmux, ctx);
2242         } else if (splitmux->input_state ==
2243             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2244           /* If we are waiting for a GOP to be completed (ie, for aux
2245            * pads to catch up), then this pad is complete, so check
2246            * if the whole GOP is.
2247            */
2248           check_completed_gop (splitmux, ctx);
2249         }
2250         GST_SPLITMUX_UNLOCK (splitmux);
2251         break;
2252       case GST_EVENT_GAP:{
2253         GstClockTime gap_ts;
2254         GstClockTimeDiff rtime;
2255
2256         gst_event_parse_gap (event, &gap_ts, NULL);
2257         if (gap_ts == GST_CLOCK_TIME_NONE)
2258           break;
2259
2260         GST_SPLITMUX_LOCK (splitmux);
2261
2262         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2263           goto beach;
2264         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2265
2266         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2267             GST_STIME_ARGS (rtime));
2268
2269         if (ctx->is_reference
2270             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2271           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2272           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2273               GST_STIME_ARGS (splitmux->fragment_start_time));
2274           /* Also take this as the first start time when starting up,
2275            * so that we start counting overflow from the first frame */
2276           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2277             splitmux->max_in_running_time = splitmux->fragment_start_time;
2278         }
2279
2280         GST_SPLITMUX_UNLOCK (splitmux);
2281         break;
2282       }
2283       default:
2284         break;
2285     }
2286     return GST_PAD_PROBE_PASS;
2287   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2288     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2289       case GST_QUERY_ALLOCATION:
2290         return GST_PAD_PROBE_DROP;
2291       default:
2292         return GST_PAD_PROBE_PASS;
2293     }
2294   }
2295
2296   buf = gst_pad_probe_info_get_buffer (info);
2297   buf_info = mq_stream_buf_new ();
2298
2299   if (GST_BUFFER_PTS_IS_VALID (buf))
2300     ts = GST_BUFFER_PTS (buf);
2301   else
2302     ts = GST_BUFFER_DTS (buf);
2303
2304   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2305
2306   GST_SPLITMUX_LOCK (splitmux);
2307
2308   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2309     goto beach;
2310
2311   /* If this buffer has a timestamp, advance the input timestamp of the
2312    * stream */
2313   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2314     GstClockTimeDiff running_time =
2315         my_segment_to_running_time (&ctx->in_segment, ts);
2316
2317     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2318         GST_STIME_ARGS (running_time));
2319
2320     if (GST_CLOCK_STIME_IS_VALID (running_time)
2321         && running_time > ctx->in_running_time)
2322       ctx->in_running_time = running_time;
2323   }
2324
2325   /* Try to make sure we have a valid running time */
2326   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2327     ctx->in_running_time =
2328         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2329   }
2330
2331   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2332       GST_STIME_ARGS (ctx->in_running_time));
2333
2334   buf_info->run_ts = ctx->in_running_time;
2335   buf_info->buf_size = gst_buffer_get_size (buf);
2336   buf_info->duration = GST_BUFFER_DURATION (buf);
2337
2338   /* initialize fragment_start_time */
2339   if (ctx->is_reference
2340       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2341     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2342     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2343         GST_STIME_ARGS (splitmux->fragment_start_time));
2344     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2345
2346     /* Also take this as the first start time when starting up,
2347      * so that we start counting overflow from the first frame */
2348     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2349       splitmux->max_in_running_time = splitmux->fragment_start_time;
2350     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2351       GST_WARNING_OBJECT (splitmux,
2352           "Could not request a keyframe. Files may not split at the exact location they should");
2353     }
2354     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2355   }
2356
2357   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2358       " total GOP bytes %" G_GUINT64_FORMAT,
2359       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2360
2361   loop_again = TRUE;
2362   do {
2363     if (ctx->flushing)
2364       break;
2365
2366     switch (splitmux->input_state) {
2367       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2368         if (ctx->is_reference) {
2369           /* This is the reference context. If it's a keyframe,
2370            * it marks the start of a new GOP and we should wait in
2371            * check_completed_gop before continuing, but either way
2372            * (keyframe or no, we'll pass this buffer through after
2373            * so set loop_again to FALSE */
2374           loop_again = FALSE;
2375
2376           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2377             /* Allow other input pads to catch up to here too */
2378             splitmux->max_in_running_time = ctx->in_running_time;
2379             GST_LOG_OBJECT (splitmux,
2380                 "Max in running time now %" GST_TIME_FORMAT,
2381                 GST_TIME_ARGS (splitmux->max_in_running_time));
2382             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2383             break;
2384           }
2385           GST_INFO_OBJECT (pad,
2386               "Have keyframe with running time %" GST_STIME_FORMAT,
2387               GST_STIME_ARGS (ctx->in_running_time));
2388           keyframe = TRUE;
2389           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2390           splitmux->max_in_running_time = ctx->in_running_time;
2391           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2392               GST_TIME_ARGS (splitmux->max_in_running_time));
2393           /* Wake up other input pads to collect this GOP */
2394           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2395           check_completed_gop (splitmux, ctx);
2396           /* Store this new keyframe to remember the start of GOP */
2397           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2398         } else {
2399           /* Pass this buffer if the reference ctx is far enough ahead */
2400           if (ctx->in_running_time < splitmux->max_in_running_time) {
2401             loop_again = FALSE;
2402             break;
2403           }
2404
2405           /* We're still waiting for a keyframe on the reference pad, sleep */
2406           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2407           GST_SPLITMUX_WAIT_INPUT (splitmux);
2408           GST_LOG_OBJECT (pad,
2409               "Done sleeping for GOP start input state now %d",
2410               splitmux->input_state);
2411         }
2412         break;
2413       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2414         /* We're collecting a GOP. If this is the reference context,
2415          * we need to check if this is a keyframe that marks the start
2416          * of the next GOP. If it is, it marks the end of the GOP we're
2417          * collecting, so sleep and wait until all the other pads also
2418          * reach that timestamp - at which point, we have an entire GOP
2419          * and either go to ENDING_FILE or release this GOP to the muxer and
2420          * go back to COLLECT_GOP_START. */
2421
2422         /* If we overran the target timestamp, it might be time to process
2423          * the GOP, otherwise bail out for more data
2424          */
2425         GST_LOG_OBJECT (pad,
2426             "Checking TS %" GST_STIME_FORMAT " against max %"
2427             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2428             GST_STIME_ARGS (splitmux->max_in_running_time));
2429
2430         if (ctx->in_running_time < splitmux->max_in_running_time) {
2431           loop_again = FALSE;
2432           break;
2433         }
2434
2435         GST_LOG_OBJECT (pad,
2436             "Collected last packet of GOP. Checking other pads");
2437         check_completed_gop (splitmux, ctx);
2438         break;
2439       }
2440       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2441         loop_again = FALSE;
2442         break;
2443       default:
2444         loop_again = FALSE;
2445         break;
2446     }
2447   }
2448   while (loop_again);
2449
2450   if (keyframe) {
2451     splitmux->queued_keyframes++;
2452     buf_info->keyframe = TRUE;
2453   }
2454
2455   /* Update total input byte counter for overflow detect */
2456   splitmux->gop_total_bytes += buf_info->buf_size;
2457
2458   /* Now add this buffer to the queue just before returning */
2459   g_queue_push_head (&ctx->queued_bufs, buf_info);
2460
2461   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2462       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2463
2464   GST_SPLITMUX_UNLOCK (splitmux);
2465   return GST_PAD_PROBE_PASS;
2466
2467 beach:
2468   GST_SPLITMUX_UNLOCK (splitmux);
2469   if (buf_info)
2470     mq_stream_buf_free (buf_info);
2471   return GST_PAD_PROBE_PASS;
2472 }
2473
2474 static void
2475 grow_blocked_queues (GstSplitMuxSink * splitmux)
2476 {
2477   GList *cur;
2478
2479   /* Scan other queues for full-ness and grow them */
2480   for (cur = g_list_first (splitmux->contexts);
2481       cur != NULL; cur = g_list_next (cur)) {
2482     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2483     guint cur_limit;
2484     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2485
2486     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2487     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2488
2489     if (cur_len >= cur_limit) {
2490       cur_limit = cur_len + 1;
2491       GST_DEBUG_OBJECT (tmpctx->q,
2492           "Queue overflowed and needs enlarging. Growing to %u buffers",
2493           cur_limit);
2494       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2495     }
2496   }
2497 }
2498
2499 static void
2500 handle_q_underrun (GstElement * q, gpointer user_data)
2501 {
2502   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2503   GstSplitMuxSink *splitmux = ctx->splitmux;
2504
2505   GST_SPLITMUX_LOCK (splitmux);
2506   GST_DEBUG_OBJECT (q,
2507       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2508       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2509   grow_blocked_queues (splitmux);
2510   GST_SPLITMUX_UNLOCK (splitmux);
2511 }
2512
2513 static void
2514 handle_q_overrun (GstElement * q, gpointer user_data)
2515 {
2516   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2517   GstSplitMuxSink *splitmux = ctx->splitmux;
2518   gboolean allow_grow = FALSE;
2519
2520   GST_SPLITMUX_LOCK (splitmux);
2521   GST_DEBUG_OBJECT (q,
2522       "Queue reported overrun with %d keyframes and %d cmds enqueued",
2523       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2524
2525   if (splitmux->queued_keyframes < 2) {
2526     /* Less than a full GOP queued, grow the queue */
2527     allow_grow = TRUE;
2528   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2529     allow_grow = TRUE;
2530   } else {
2531     /* If another queue is starved, grow */
2532     GList *cur;
2533     for (cur = g_list_first (splitmux->contexts);
2534         cur != NULL; cur = g_list_next (cur)) {
2535       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2536       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2537         allow_grow = TRUE;
2538       }
2539     }
2540   }
2541   GST_SPLITMUX_UNLOCK (splitmux);
2542
2543   if (allow_grow) {
2544     guint cur_limit;
2545
2546     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2547     cur_limit++;
2548
2549     GST_DEBUG_OBJECT (q,
2550         "Queue overflowed and needs enlarging. Growing to %u buffers",
2551         cur_limit);
2552
2553     g_object_set (q, "max-size-buffers", cur_limit, NULL);
2554   }
2555 }
2556
2557 static GstPad *
2558 gst_splitmux_sink_request_new_pad (GstElement * element,
2559     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2560 {
2561   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2562   GstPadTemplate *mux_template = NULL;
2563   GstPad *res = NULL;
2564   GstElement *q;
2565   GstPad *q_sink = NULL, *q_src = NULL;
2566   gchar *gname;
2567   gboolean is_video = FALSE;
2568   MqStreamCtx *ctx;
2569
2570   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2571
2572   GST_SPLITMUX_LOCK (splitmux);
2573   if (!create_muxer (splitmux))
2574     goto fail;
2575   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2576
2577   if (templ->name_template) {
2578     if (g_str_equal (templ->name_template, "video")) {
2579       if (splitmux->have_video)
2580         goto already_have_video;
2581
2582       /* FIXME: Look for a pad template with matching caps, rather than by name */
2583       GST_DEBUG_OBJECT (element,
2584           "searching for pad-template with name 'video_%%u'");
2585       mux_template =
2586           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2587           (splitmux->muxer), "video_%u");
2588
2589       /* Fallback to find sink pad templates named 'video' (flvmux) */
2590       if (!mux_template) {
2591         GST_DEBUG_OBJECT (element,
2592             "searching for pad-template with name 'video'");
2593         mux_template =
2594             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2595             (splitmux->muxer), "video");
2596       }
2597       is_video = TRUE;
2598       name = NULL;
2599     } else {
2600       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
2601           templ->name_template);
2602       mux_template =
2603           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2604           (splitmux->muxer), templ->name_template);
2605
2606       /* Fallback to find sink pad templates named 'audio' (flvmux) */
2607       if (!mux_template) {
2608         GST_DEBUG_OBJECT (element,
2609             "searching for pad-template with name 'audio'");
2610         mux_template =
2611             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2612             (splitmux->muxer), "audio");
2613         name = NULL;
2614       }
2615     }
2616
2617     if (mux_template == NULL) {
2618       GST_DEBUG_OBJECT (element,
2619           "searching for pad-template with name 'sink_%%d'");
2620       mux_template =
2621           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2622           (splitmux->muxer), "sink_%d");
2623       name = NULL;
2624     }
2625     if (mux_template == NULL) {
2626       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
2627       mux_template =
2628           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2629           (splitmux->muxer), "sink");
2630       name = NULL;
2631     }
2632   }
2633
2634   if (mux_template == NULL) {
2635     GST_ERROR_OBJECT (element,
2636         "unable to find a suitable sink pad-template on the muxer");
2637
2638     goto fail;
2639   }
2640   GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
2641       mux_template->name_template);
2642
2643   if (mux_template->presence == GST_PAD_REQUEST) {
2644     GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
2645
2646     res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2647     if (res == NULL)
2648       goto fail;
2649   } else if (mux_template->presence == GST_PAD_ALWAYS) {
2650     GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
2651
2652     res =
2653         gst_element_get_static_pad (splitmux->muxer,
2654         mux_template->name_template);
2655     if (res == NULL)
2656       goto fail;
2657   } else {
2658     GST_ERROR_OBJECT (element,
2659         "unexpected pad presence %d", mux_template->presence);
2660
2661     goto fail;
2662   }
2663
2664   if (is_video)
2665     gname = g_strdup ("video");
2666   else if (name == NULL)
2667     gname = gst_pad_get_name (res);
2668   else
2669     gname = g_strdup (name);
2670
2671   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2672     goto fail;
2673
2674   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2675
2676   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2677       "max-size-buffers", 5, NULL);
2678
2679   q_sink = gst_element_get_static_pad (q, "sink");
2680   q_src = gst_element_get_static_pad (q, "src");
2681
2682   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2683     gst_element_release_request_pad (splitmux->muxer, res);
2684     gst_object_unref (GST_OBJECT (res));
2685     goto fail;
2686   }
2687
2688   gst_object_unref (GST_OBJECT (res));
2689
2690   ctx = mq_stream_ctx_new (splitmux);
2691   /* Context holds a ref: */
2692   ctx->q = gst_object_ref (q);
2693   ctx->srcpad = q_src;
2694   ctx->sinkpad = q_sink;
2695   ctx->q_overrun_id =
2696       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2697   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2698
2699   ctx->src_pad_block_id =
2700       gst_pad_add_probe (q_src,
2701       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2702       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2703   if (is_video && splitmux->reference_ctx != NULL) {
2704     splitmux->reference_ctx->is_reference = FALSE;
2705     splitmux->reference_ctx = NULL;
2706   }
2707   if (splitmux->reference_ctx == NULL) {
2708     splitmux->reference_ctx = ctx;
2709     ctx->is_reference = TRUE;
2710   }
2711
2712   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2713   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2714
2715   ctx->sink_pad_block_id =
2716       gst_pad_add_probe (q_sink,
2717       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2718       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2719       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2720
2721   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2722       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2723
2724   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2725
2726   g_free (gname);
2727
2728   if (is_video)
2729     splitmux->have_video = TRUE;
2730
2731   gst_pad_set_active (res, TRUE);
2732   gst_element_add_pad (element, res);
2733
2734   GST_SPLITMUX_UNLOCK (splitmux);
2735
2736   return res;
2737 fail:
2738   GST_SPLITMUX_UNLOCK (splitmux);
2739
2740   if (q_sink)
2741     gst_object_unref (q_sink);
2742   if (q_src)
2743     gst_object_unref (q_src);
2744   return NULL;
2745 already_have_video:
2746   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2747   GST_SPLITMUX_UNLOCK (splitmux);
2748   return NULL;
2749 }
2750
2751 static void
2752 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2753 {
2754   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2755   GstPad *muxpad = NULL;
2756   MqStreamCtx *ctx =
2757       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2758
2759   GST_SPLITMUX_LOCK (splitmux);
2760
2761   if (splitmux->muxer == NULL)
2762     goto fail;                  /* Elements don't exist yet - nothing to release */
2763
2764   GST_INFO_OBJECT (pad, "releasing request pad");
2765
2766   muxpad = gst_pad_get_peer (ctx->srcpad);
2767
2768   /* Remove the context from our consideration */
2769   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2770
2771   if (ctx->sink_pad_block_id)
2772     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2773
2774   if (ctx->src_pad_block_id)
2775     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2776
2777   /* Can release the context now */
2778   mq_stream_ctx_free (ctx);
2779   if (ctx == splitmux->reference_ctx)
2780     splitmux->reference_ctx = NULL;
2781
2782   /* Release and free the muxer input */
2783   if (muxpad) {
2784     gst_element_release_request_pad (splitmux->muxer, muxpad);
2785     gst_object_unref (muxpad);
2786   }
2787
2788   if (GST_PAD_PAD_TEMPLATE (pad) &&
2789       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2790               (pad)), "video"))
2791     splitmux->have_video = FALSE;
2792
2793   gst_element_remove_pad (element, pad);
2794
2795   /* Reset the internal elements only after all request pads are released */
2796   if (splitmux->contexts == NULL)
2797     gst_splitmux_reset (splitmux);
2798
2799 fail:
2800   GST_SPLITMUX_UNLOCK (splitmux);
2801 }
2802
2803 static GstElement *
2804 create_element (GstSplitMuxSink * splitmux,
2805     const gchar * factory, const gchar * name, gboolean locked)
2806 {
2807   GstElement *ret = gst_element_factory_make (factory, name);
2808   if (ret == NULL) {
2809     g_warning ("Failed to create %s - splitmuxsink will not work", name);
2810     return NULL;
2811   }
2812
2813   if (locked) {
2814     /* Ensure the sink starts in locked state and NULL - it will be changed
2815      * by the filename setting code */
2816     gst_element_set_locked_state (ret, TRUE);
2817     gst_element_set_state (ret, GST_STATE_NULL);
2818   }
2819
2820   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2821     g_warning ("Could not add %s element - splitmuxsink will not work", name);
2822     gst_object_unref (ret);
2823     return NULL;
2824   }
2825
2826   return ret;
2827 }
2828
2829 static gboolean
2830 create_muxer (GstSplitMuxSink * splitmux)
2831 {
2832   /* Create internal elements */
2833   if (splitmux->muxer == NULL) {
2834     GstElement *provided_muxer = NULL;
2835
2836     GST_OBJECT_LOCK (splitmux);
2837     if (splitmux->provided_muxer != NULL)
2838       provided_muxer = gst_object_ref (splitmux->provided_muxer);
2839     GST_OBJECT_UNLOCK (splitmux);
2840
2841     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2842         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2843       if ((splitmux->muxer =
2844               create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2845         goto fail;
2846     } else if (splitmux->async_finalize) {
2847       if ((splitmux->muxer =
2848               create_element (splitmux, splitmux->muxer_factory, "muxer",
2849                   FALSE)) == NULL)
2850         goto fail;
2851       if (splitmux->muxer_properties)
2852         gst_structure_foreach (splitmux->muxer_properties,
2853             _set_property_from_structure, splitmux->muxer);
2854     } else {
2855       /* Ensure it's not in locked state (we might be reusing an old element) */
2856       gst_element_set_locked_state (provided_muxer, FALSE);
2857       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2858         g_warning ("Could not add muxer element - splitmuxsink will not work");
2859         gst_object_unref (provided_muxer);
2860         goto fail;
2861       }
2862
2863       splitmux->muxer = provided_muxer;
2864       gst_object_unref (provided_muxer);
2865     }
2866
2867     if (splitmux->use_robust_muxing) {
2868       update_muxer_properties (splitmux);
2869     }
2870   }
2871
2872   return TRUE;
2873 fail:
2874   return FALSE;
2875 }
2876
2877 static GstElement *
2878 find_sink (GstElement * e)
2879 {
2880   GstElement *res = NULL;
2881   GstIterator *iter;
2882   gboolean done = FALSE;
2883   GValue data = { 0, };
2884
2885   if (!GST_IS_BIN (e))
2886     return e;
2887
2888   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2889     return e;
2890
2891   iter = gst_bin_iterate_sinks (GST_BIN (e));
2892   while (!done) {
2893     switch (gst_iterator_next (iter, &data)) {
2894       case GST_ITERATOR_OK:
2895       {
2896         GstElement *child = g_value_get_object (&data);
2897         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2898                 "location") != NULL) {
2899           res = child;
2900           done = TRUE;
2901         }
2902         g_value_reset (&data);
2903         break;
2904       }
2905       case GST_ITERATOR_RESYNC:
2906         gst_iterator_resync (iter);
2907         break;
2908       case GST_ITERATOR_DONE:
2909         done = TRUE;
2910         break;
2911       case GST_ITERATOR_ERROR:
2912         g_assert_not_reached ();
2913         break;
2914     }
2915   }
2916   g_value_unset (&data);
2917   gst_iterator_free (iter);
2918
2919   return res;
2920 }
2921
2922 static gboolean
2923 create_sink (GstSplitMuxSink * splitmux)
2924 {
2925   GstElement *provided_sink = NULL;
2926
2927   if (splitmux->active_sink == NULL) {
2928
2929     GST_OBJECT_LOCK (splitmux);
2930     if (splitmux->provided_sink != NULL)
2931       provided_sink = gst_object_ref (splitmux->provided_sink);
2932     GST_OBJECT_UNLOCK (splitmux);
2933
2934     if ((!splitmux->async_finalize && provided_sink == NULL) ||
2935         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2936       if ((splitmux->sink =
2937               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2938         goto fail;
2939       splitmux->active_sink = splitmux->sink;
2940     } else if (splitmux->async_finalize) {
2941       if ((splitmux->sink =
2942               create_element (splitmux, splitmux->sink_factory, "sink",
2943                   TRUE)) == NULL)
2944         goto fail;
2945       if (splitmux->sink_properties)
2946         gst_structure_foreach (splitmux->sink_properties,
2947             _set_property_from_structure, splitmux->sink);
2948       splitmux->active_sink = splitmux->sink;
2949     } else {
2950       /* Ensure the sink starts in locked state and NULL - it will be changed
2951        * by the filename setting code */
2952       gst_element_set_locked_state (provided_sink, TRUE);
2953       gst_element_set_state (provided_sink, GST_STATE_NULL);
2954       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2955         g_warning ("Could not add sink elements - splitmuxsink will not work");
2956         gst_object_unref (provided_sink);
2957         goto fail;
2958       }
2959
2960       splitmux->active_sink = provided_sink;
2961
2962       /* The bin holds a ref now, we can drop our tmp ref */
2963       gst_object_unref (provided_sink);
2964
2965       /* Find the sink element */
2966       splitmux->sink = find_sink (splitmux->active_sink);
2967       if (splitmux->sink == NULL) {
2968         g_warning
2969             ("Could not locate sink element in provided sink - splitmuxsink will not work");
2970         goto fail;
2971       }
2972     }
2973
2974 #if 1
2975     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2976             "async") != NULL) {
2977       /* async child elements are causing state change races and weird
2978        * failures, so let's try and turn that off */
2979       g_object_set (splitmux->sink, "async", FALSE, NULL);
2980     }
2981 #endif
2982
2983     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2984       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2985       goto fail;
2986     }
2987   }
2988
2989   return TRUE;
2990 fail:
2991   return FALSE;
2992 }
2993
2994 #ifdef __GNUC__
2995 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2996 #endif
2997 static void
2998 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2999 {
3000   gchar *fname = NULL;
3001   GstSample *sample;
3002   GstCaps *caps;
3003
3004   gst_splitmux_sink_ensure_max_files (splitmux);
3005
3006   if (ctx->cur_out_buffer == NULL) {
3007     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3008   }
3009
3010   caps = gst_pad_get_current_caps (ctx->srcpad);
3011   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3012   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3013       splitmux->fragment_id, sample, &fname);
3014   gst_sample_unref (sample);
3015   if (caps)
3016     gst_caps_unref (caps);
3017
3018   if (fname == NULL) {
3019     /* Fallback to the old signal if the new one returned nothing */
3020     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3021         splitmux->fragment_id, &fname);
3022   }
3023
3024   if (!fname)
3025     fname = splitmux->location ?
3026         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3027
3028   if (fname) {
3029     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3030     g_object_set (splitmux->sink, "location", fname, NULL);
3031     g_free (fname);
3032
3033     splitmux->fragment_id++;
3034   }
3035 }
3036
3037 static void
3038 do_async_start (GstSplitMuxSink * splitmux)
3039 {
3040   GstMessage *message;
3041
3042   if (!splitmux->need_async_start) {
3043     GST_INFO_OBJECT (splitmux, "no async_start needed");
3044     return;
3045   }
3046
3047   splitmux->async_pending = TRUE;
3048
3049   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3050   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3051   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3052       (splitmux), message);
3053 }
3054
3055 static void
3056 do_async_done (GstSplitMuxSink * splitmux)
3057 {
3058   GstMessage *message;
3059
3060   if (splitmux->async_pending) {
3061     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3062     message =
3063         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3064         GST_CLOCK_TIME_NONE);
3065     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3066         (splitmux), message);
3067
3068     splitmux->async_pending = FALSE;
3069   }
3070
3071   splitmux->need_async_start = FALSE;
3072 }
3073
3074 static GstStateChangeReturn
3075 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3076 {
3077   GstStateChangeReturn ret;
3078   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3079
3080   switch (transition) {
3081     case GST_STATE_CHANGE_NULL_TO_READY:{
3082       GST_SPLITMUX_LOCK (splitmux);
3083       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3084         ret = GST_STATE_CHANGE_FAILURE;
3085         GST_SPLITMUX_UNLOCK (splitmux);
3086         goto beach;
3087       }
3088       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3089       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3090       GST_SPLITMUX_UNLOCK (splitmux);
3091       splitmux->fragment_id = 0;
3092       break;
3093     }
3094     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3095       GST_SPLITMUX_LOCK (splitmux);
3096       /* Start by collecting one input on each pad */
3097       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3098       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3099       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3100       splitmux->gop_start_time = splitmux->fragment_start_time =
3101           GST_CLOCK_STIME_NONE;
3102       splitmux->muxed_out_bytes = 0;
3103       splitmux->ready_for_output = FALSE;
3104       GST_SPLITMUX_UNLOCK (splitmux);
3105       break;
3106     }
3107     case GST_STATE_CHANGE_PAUSED_TO_READY:
3108       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3109       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3110     case GST_STATE_CHANGE_READY_TO_NULL:
3111       GST_SPLITMUX_LOCK (splitmux);
3112       gst_queue_array_clear (splitmux->times_to_split);
3113       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3114       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3115       /* Wake up any blocked threads */
3116       GST_LOG_OBJECT (splitmux,
3117           "State change -> NULL or READY. Waking threads");
3118       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3119       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3120       GST_SPLITMUX_UNLOCK (splitmux);
3121       break;
3122     default:
3123       break;
3124   }
3125
3126   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3127   if (ret == GST_STATE_CHANGE_FAILURE)
3128     goto beach;
3129
3130   switch (transition) {
3131     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3132       splitmux->need_async_start = TRUE;
3133       break;
3134     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3135       /* Change state async, because our child sink might not
3136        * be ready to do that for us yet if it's state is still locked */
3137
3138       splitmux->need_async_start = TRUE;
3139       /* we want to go async to PAUSED until we managed to configure and add the
3140        * sink */
3141       GST_SPLITMUX_LOCK (splitmux);
3142       do_async_start (splitmux);
3143       GST_SPLITMUX_UNLOCK (splitmux);
3144       ret = GST_STATE_CHANGE_ASYNC;
3145       break;
3146     }
3147     case GST_STATE_CHANGE_READY_TO_NULL:
3148       GST_SPLITMUX_LOCK (splitmux);
3149       splitmux->fragment_id = 0;
3150       /* Reset internal elements only if no pad contexts are using them */
3151       if (splitmux->contexts == NULL)
3152         gst_splitmux_reset (splitmux);
3153       do_async_done (splitmux);
3154       GST_SPLITMUX_UNLOCK (splitmux);
3155       break;
3156     default:
3157       break;
3158   }
3159
3160 beach:
3161   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3162       ret == GST_STATE_CHANGE_FAILURE) {
3163     /* Cleanup elements on failed transition out of NULL */
3164     gst_splitmux_reset (splitmux);
3165     GST_SPLITMUX_LOCK (splitmux);
3166     do_async_done (splitmux);
3167     GST_SPLITMUX_UNLOCK (splitmux);
3168   }
3169   return ret;
3170 }
3171
3172 gboolean
3173 register_splitmuxsink (GstPlugin * plugin)
3174 {
3175   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3176       "Split File Muxing Sink");
3177
3178   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3179       GST_TYPE_SPLITMUX_SINK);
3180 }
3181
3182 static void
3183 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3184 {
3185   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3186     splitmux->fragment_id = 0;
3187   }
3188 }
3189
3190 static void
3191 split_now (GstSplitMuxSink * splitmux)
3192 {
3193   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3194 }
3195
3196 static void
3197 split_after (GstSplitMuxSink * splitmux)
3198 {
3199   g_atomic_int_set (&(splitmux->split_requested), TRUE);
3200 }
3201
3202 static void
3203 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3204 {
3205   gboolean send_keyframe_requests;
3206
3207   GST_SPLITMUX_LOCK (splitmux);
3208   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3209   send_keyframe_requests = splitmux->send_keyframe_requests;
3210   GST_SPLITMUX_UNLOCK (splitmux);
3211
3212   if (send_keyframe_requests) {
3213     GstEvent *ev =
3214         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3215     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3216         GST_TIME_ARGS (split_time));
3217     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3218       GST_WARNING_OBJECT (splitmux,
3219           "Could not request keyframe at %" GST_TIME_FORMAT,
3220           GST_TIME_ARGS (split_time));
3221     }
3222   }
3223 }