test: rtpbin_buffer_list: check if the chain_list function has been called
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * In the async-finalize mode, when the threshold is crossed, the old muxer
42  * and sink is disconnected from the pipeline and left to finish the file
43  * asynchronously, and a new muxer and sink is created to continue with the
44  * next fragment. For that reason, instead of muxer and sink objects, the
45  * muxer-factory and sink-factory properties are used to construct the new
46  * objects, together with muxer-properties and sink-properties.
47  *
48  * <refsect2>
49  * <title>Example pipelines</title>
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  * </refsect2>
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69
70 #include <string.h>
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
74
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
77
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
82
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
85
86 static void split_now (GstSplitMuxSink * splitmux);
87 static void split_after (GstSplitMuxSink * splitmux);
88 static void split_at_running_time (GstSplitMuxSink * splitmux,
89     GstClockTime split_time);
90
91 enum
92 {
93   PROP_0,
94   PROP_LOCATION,
95   PROP_MAX_SIZE_TIME,
96   PROP_MAX_SIZE_BYTES,
97   PROP_MAX_SIZE_TIMECODE,
98   PROP_SEND_KEYFRAME_REQUESTS,
99   PROP_MAX_FILES,
100   PROP_MUXER_OVERHEAD,
101   PROP_USE_ROBUST_MUXING,
102   PROP_ALIGNMENT_THRESHOLD,
103   PROP_MUXER,
104   PROP_SINK,
105   PROP_RESET_MUXER,
106   PROP_ASYNC_FINALIZE,
107   PROP_MUXER_FACTORY,
108   PROP_MUXER_PROPERTIES,
109   PROP_SINK_FACTORY,
110   PROP_SINK_PROPERTIES
111 };
112
113 #define DEFAULT_MAX_SIZE_TIME       0
114 #define DEFAULT_MAX_SIZE_BYTES      0
115 #define DEFAULT_MAX_FILES           0
116 #define DEFAULT_MUXER_OVERHEAD      0.02
117 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
118 #define DEFAULT_ALIGNMENT_THRESHOLD 0
119 #define DEFAULT_MUXER "mp4mux"
120 #define DEFAULT_SINK "filesink"
121 #define DEFAULT_USE_ROBUST_MUXING FALSE
122 #define DEFAULT_RESET_MUXER TRUE
123 #define DEFAULT_ASYNC_FINALIZE FALSE
124
125 typedef struct _AsyncEosHelper
126 {
127   MqStreamCtx *ctx;
128   GstPad *pad;
129 } AsyncEosHelper;
130
131 enum
132 {
133   SIGNAL_FORMAT_LOCATION,
134   SIGNAL_FORMAT_LOCATION_FULL,
135   SIGNAL_SPLIT_NOW,
136   SIGNAL_SPLIT_AFTER,
137   SIGNAL_SPLIT_AT_RUNNING_TIME,
138   SIGNAL_MUXER_ADDED,
139   SIGNAL_SINK_ADDED,
140   SIGNAL_LAST
141 };
142
143 static guint signals[SIGNAL_LAST];
144
145 static GstStaticPadTemplate video_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("video",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate audio_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("audio_%u",
152     GST_PAD_SINK,
153     GST_PAD_REQUEST,
154     GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate subtitle_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
157     GST_PAD_SINK,
158     GST_PAD_REQUEST,
159     GST_STATIC_CAPS_ANY);
160 static GstStaticPadTemplate caption_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("caption_%u",
162     GST_PAD_SINK,
163     GST_PAD_REQUEST,
164     GST_STATIC_CAPS_ANY);
165
166 static GQuark PAD_CONTEXT;
167 static GQuark EOS_FROM_US;
168 static GQuark RUNNING_TIME;
169 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
170  * to forward an incoming EOS message, but we cannot rely on the state of the
171  * splitmux anymore, so we set this qdata on the sink instead.
172  * The muxer and sink must be destroyed after both of these things have
173  * finished:
174  * 1) The EOS message has been sent when the fragment is ending
175  * 2) The muxer has been unlinked and relinked
176  * Therefore, EOS_FROM_US can have these two values:
177  * 0: EOS was not requested from us. Forward the message. The muxer and the
178  * sink will be destroyed together with the rest of the bin.
179  * 1: EOS was requested from us, but the other of the two tasks hasn't
180  * finished. Set EOS_FROM_US to 2 and do your stuff.
181  * 2: EOS was requested from us and the other of the two tasks has finished.
182  * Now we can destroy the muxer and the sink.
183  */
184
185 static void
186 _do_init (void)
187 {
188   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
189   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
190   RUNNING_TIME = g_quark_from_static_string ("running-time");
191 }
192
193 #define gst_splitmux_sink_parent_class parent_class
194 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
195     _do_init ());
196
197 static gboolean create_muxer (GstSplitMuxSink * splitmux);
198 static gboolean create_sink (GstSplitMuxSink * splitmux);
199 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
200     const GValue * value, GParamSpec * pspec);
201 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
202     GValue * value, GParamSpec * pspec);
203 static void gst_splitmux_sink_dispose (GObject * object);
204 static void gst_splitmux_sink_finalize (GObject * object);
205
206 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
207     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
208 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
209
210 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
211     element, GstStateChange transition);
212
213 static void bus_handler (GstBin * bin, GstMessage * msg);
214 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
215 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
216 static void mq_stream_ctx_free (MqStreamCtx * ctx);
217 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
218
219 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
220 static GstElement *create_element (GstSplitMuxSink * splitmux,
221     const gchar * factory, const gchar * name, gboolean locked);
222
223 static void do_async_done (GstSplitMuxSink * splitmux);
224
225 static MqStreamBuf *
226 mq_stream_buf_new (void)
227 {
228   return g_slice_new0 (MqStreamBuf);
229 }
230
231 static void
232 mq_stream_buf_free (MqStreamBuf * data)
233 {
234   g_slice_free (MqStreamBuf, data);
235 }
236
237 static SplitMuxOutputCommand *
238 out_cmd_buf_new (void)
239 {
240   return g_slice_new0 (SplitMuxOutputCommand);
241 }
242
243 static void
244 out_cmd_buf_free (SplitMuxOutputCommand * data)
245 {
246   g_slice_free (SplitMuxOutputCommand, data);
247 }
248
249 static void
250 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
251 {
252   GObjectClass *gobject_class = (GObjectClass *) klass;
253   GstElementClass *gstelement_class = (GstElementClass *) klass;
254   GstBinClass *gstbin_class = (GstBinClass *) klass;
255
256   gobject_class->set_property = gst_splitmux_sink_set_property;
257   gobject_class->get_property = gst_splitmux_sink_get_property;
258   gobject_class->dispose = gst_splitmux_sink_dispose;
259   gobject_class->finalize = gst_splitmux_sink_finalize;
260
261   gst_element_class_set_static_metadata (gstelement_class,
262       "Split Muxing Bin", "Generic/Bin/Muxer",
263       "Convenience bin that muxes incoming streams into multiple time/size limited files",
264       "Jan Schmidt <jan@centricular.com>");
265
266   gst_element_class_add_static_pad_template (gstelement_class,
267       &video_sink_template);
268   gst_element_class_add_static_pad_template (gstelement_class,
269       &audio_sink_template);
270   gst_element_class_add_static_pad_template (gstelement_class,
271       &subtitle_sink_template);
272   gst_element_class_add_static_pad_template (gstelement_class,
273       &caption_sink_template);
274
275   gstelement_class->change_state =
276       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
277   gstelement_class->request_new_pad =
278       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
279   gstelement_class->release_pad =
280       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
281
282   gstbin_class->handle_message = bus_handler;
283
284   g_object_class_install_property (gobject_class, PROP_LOCATION,
285       g_param_spec_string ("location", "File Output Pattern",
286           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
287           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
289       g_param_spec_double ("mux-overhead", "Muxing Overhead",
290           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
291           DEFAULT_MUXER_OVERHEAD,
292           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
293
294   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
295       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
296           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
297           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
300           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
301           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
303       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
304           "Maximum difference in timecode between first and last frame. "
305           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
306           "Will only be effective if a timecode track is present.",
307           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
309       g_param_spec_boolean ("send-keyframe-requests",
310           "Request keyframes at max-size-time",
311           "Request a keyframe every max-size-time ns to try splitting at that point. "
312           "Needs max-size-bytes to be 0 in order to be effective.",
313           DEFAULT_SEND_KEYFRAME_REQUESTS,
314           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
316       g_param_spec_uint ("max-files", "Max files",
317           "Maximum number of files to keep on disk. Once the maximum is reached,"
318           "old files start to be deleted to make room for new ones.", 0,
319           G_MAXUINT, DEFAULT_MAX_FILES,
320           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
322       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
323           "Allow non-reference streams to be that many ns before the reference"
324           " stream",
325           0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327
328   g_object_class_install_property (gobject_class, PROP_MUXER,
329       g_param_spec_object ("muxer", "Muxer",
330           "The muxer element to use (NULL = default mp4mux). "
331           "Valid only for async-finalize = FALSE",
332           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333   g_object_class_install_property (gobject_class, PROP_SINK,
334       g_param_spec_object ("sink", "Sink",
335           "The sink element (or element chain) to use (NULL = default filesink). "
336           "Valid only for async-finalize = FALSE",
337           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338
339   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
340       g_param_spec_boolean ("use-robust-muxing",
341           "Support robust-muxing mode of some muxers",
342           "Check if muxers support robust muxing via the reserved-max-duration and "
343           "reserved-duration-remaining properties and use them if so. "
344           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
345           " create new fragments if the reserved header space is about to overflow. "
346           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
347           "manually by the app to a non-zero value for robust muxing to have an effect.",
348           DEFAULT_USE_ROBUST_MUXING,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
352       g_param_spec_boolean ("reset-muxer",
353           "Reset Muxer",
354           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
355           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
358       g_param_spec_boolean ("async-finalize",
359           "Finalize fragments asynchronously",
360           "Finalize each fragment asynchronously and start a new one",
361           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
363       g_param_spec_string ("muxer-factory", "Muxer factory",
364           "The muxer element factory to use (default = mp4mux). "
365           "Valid only for async-finalize = TRUE",
366           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
368       g_param_spec_boxed ("muxer-properties", "Muxer properties",
369           "The muxer element properties to use. "
370           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
371           "Valid only for async-finalize = TRUE",
372           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
374       g_param_spec_string ("sink-factory", "Sink factory",
375           "The sink element factory to use (default = filesink). "
376           "Valid only for async-finalize = TRUE",
377           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
379       g_param_spec_boxed ("sink-properties", "Sink properties",
380           "The sink element properties to use. "
381           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
382           "Valid only for async-finalize = TRUE",
383           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
384
385   /**
386    * GstSplitMuxSink::format-location:
387    * @splitmux: the #GstSplitMuxSink
388    * @fragment_id: the sequence number of the file to be created
389    *
390    * Returns: the location to be used for the next output file
391    */
392   signals[SIGNAL_FORMAT_LOCATION] =
393       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
394       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
395
396   /**
397    * GstSplitMuxSink::format-location-full:
398    * @splitmux: the #GstSplitMuxSink
399    * @fragment_id: the sequence number of the file to be created
400    * @first_sample: A #GstSample containing the first buffer
401    *   from the reference stream in the new file
402    *
403    * Returns: the location to be used for the next output file
404    */
405   signals[SIGNAL_FORMAT_LOCATION_FULL] =
406       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
407       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
408       GST_TYPE_SAMPLE);
409
410   /**
411    * GstSplitMuxSink::split-now:
412    * @splitmux: the #GstSplitMuxSink
413    *
414    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
415    * The current GOP will be output to the new file.
416    *
417    * Since: 1.14
418    */
419   signals[SIGNAL_SPLIT_NOW] =
420       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
422           split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
423
424   /**
425    * GstSplitMuxSink::split-after:
426    * @splitmux: the #GstSplitMuxSink
427    *
428    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
429    * The current GOP will be output to the old file.
430    *
431    * Since: 1.16
432    */
433   signals[SIGNAL_SPLIT_AFTER] =
434       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
435       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
436           split_after), NULL, NULL, NULL, G_TYPE_NONE, 0);
437
438   /**
439    * GstSplitMuxSink::split-now:
440    * @splitmux: the #GstSplitMuxSink
441    *
442    * When called by the user, this action signal splits the video file (and
443    * begins a new one) as soon as the given running time is reached. If this
444    * action signal is called multiple times, running times are queued up and
445    * processed in the order they were given.
446    *
447    * Note that this is prone to race conditions, where said running time is
448    * reached and surpassed before we had a chance to split. The file will
449    * still split immediately, but in order to make sure that the split doesn't
450    * happen too late, it is recommended to call this action signal from
451    * something that will prevent further buffers from flowing into
452    * splitmuxsink before the split is completed, such as a pad probe before
453    * splitmuxsink.
454    *
455    *
456    * Since: 1.16
457    */
458   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
459       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
460       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
461           split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1,
462       G_TYPE_UINT64);
463
464   /**
465    * GstSplitMuxSink::muxer-added:
466    * @splitmux: the #GstSplitMuxSink
467    * @muxer: the newly added muxer element
468    *
469    * Since: 1.14
470    */
471   signals[SIGNAL_MUXER_ADDED] =
472       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
473       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
474
475   /**
476    * GstSplitMuxSink::sink-added:
477    * @splitmux: the #GstSplitMuxSink
478    * @sink: the newly added sink element
479    *
480    * Since: 1.14
481    */
482   signals[SIGNAL_SINK_ADDED] =
483       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
484       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
485
486   klass->split_now = split_now;
487   klass->split_after = split_after;
488   klass->split_at_running_time = split_at_running_time;
489 }
490
491 static void
492 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
493 {
494   g_mutex_init (&splitmux->lock);
495   g_cond_init (&splitmux->input_cond);
496   g_cond_init (&splitmux->output_cond);
497   g_queue_init (&splitmux->out_cmd_q);
498
499   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
500   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
501   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
502   splitmux->max_files = DEFAULT_MAX_FILES;
503   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
504   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
505   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
506   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
507   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
508
509   splitmux->threshold_timecode_str = NULL;
510
511   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
512   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
513   splitmux->muxer_properties = NULL;
514   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
515   splitmux->sink_properties = NULL;
516
517   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
518   splitmux->split_requested = FALSE;
519   splitmux->do_split_next_gop = FALSE;
520   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
521 }
522
523 static void
524 gst_splitmux_reset (GstSplitMuxSink * splitmux)
525 {
526   if (splitmux->muxer) {
527     gst_element_set_locked_state (splitmux->muxer, TRUE);
528     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
529     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
530   }
531   if (splitmux->active_sink) {
532     gst_element_set_locked_state (splitmux->active_sink, TRUE);
533     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
534     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
535   }
536
537   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
538 }
539
540 static void
541 gst_splitmux_sink_dispose (GObject * object)
542 {
543   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
544
545   /* Calling parent dispose invalidates all child pointers */
546   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
547
548   G_OBJECT_CLASS (parent_class)->dispose (object);
549 }
550
551 static void
552 gst_splitmux_sink_finalize (GObject * object)
553 {
554   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
555   g_cond_clear (&splitmux->input_cond);
556   g_cond_clear (&splitmux->output_cond);
557   g_mutex_clear (&splitmux->lock);
558   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
559   g_queue_clear (&splitmux->out_cmd_q);
560
561   if (splitmux->provided_sink)
562     gst_object_unref (splitmux->provided_sink);
563   if (splitmux->provided_muxer)
564     gst_object_unref (splitmux->provided_muxer);
565
566   if (splitmux->muxer_factory)
567     g_free (splitmux->muxer_factory);
568   if (splitmux->muxer_properties)
569     gst_structure_free (splitmux->muxer_properties);
570   if (splitmux->sink_factory)
571     g_free (splitmux->sink_factory);
572   if (splitmux->sink_properties)
573     gst_structure_free (splitmux->sink_properties);
574
575   if (splitmux->threshold_timecode_str)
576     g_free (splitmux->threshold_timecode_str);
577
578   if (splitmux->times_to_split)
579     gst_queue_array_free (splitmux->times_to_split);
580
581   g_free (splitmux->location);
582
583   /* Make sure to free any un-released contexts. There should not be any,
584    * because the dispose will have freed all request pads though */
585   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
586   g_list_free (splitmux->contexts);
587
588   G_OBJECT_CLASS (parent_class)->finalize (object);
589 }
590
591 /*
592  * Set any time threshold to the muxer, if it has
593  * reserved-max-duration and reserved-duration-remaining
594  * properties. Called when creating/claiming the muxer
595  * in create_elements() */
596 static void
597 update_muxer_properties (GstSplitMuxSink * sink)
598 {
599   GObjectClass *klass;
600   GstClockTime threshold_time;
601
602   sink->muxer_has_reserved_props = FALSE;
603   if (sink->muxer == NULL)
604     return;
605   klass = G_OBJECT_GET_CLASS (sink->muxer);
606   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
607     return;
608   if (g_object_class_find_property (klass,
609           "reserved-duration-remaining") == NULL)
610     return;
611   sink->muxer_has_reserved_props = TRUE;
612
613   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
614       GST_TIME_ARGS (sink->threshold_time));
615   GST_OBJECT_LOCK (sink);
616   threshold_time = sink->threshold_time;
617   GST_OBJECT_UNLOCK (sink);
618
619   if (threshold_time > 0) {
620     /* Tell the muxer how much space to reserve */
621     GstClockTime muxer_threshold = threshold_time;
622     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
623   }
624 }
625
626 static void
627 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
628     const GValue * value, GParamSpec * pspec)
629 {
630   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
631
632   switch (prop_id) {
633     case PROP_LOCATION:{
634       GST_OBJECT_LOCK (splitmux);
635       g_free (splitmux->location);
636       splitmux->location = g_value_dup_string (value);
637       GST_OBJECT_UNLOCK (splitmux);
638       break;
639     }
640     case PROP_MAX_SIZE_BYTES:
641       GST_OBJECT_LOCK (splitmux);
642       splitmux->threshold_bytes = g_value_get_uint64 (value);
643       GST_OBJECT_UNLOCK (splitmux);
644       break;
645     case PROP_MAX_SIZE_TIME:
646       GST_OBJECT_LOCK (splitmux);
647       splitmux->threshold_time = g_value_get_uint64 (value);
648       GST_OBJECT_UNLOCK (splitmux);
649       break;
650     case PROP_MAX_SIZE_TIMECODE:
651       GST_OBJECT_LOCK (splitmux);
652       splitmux->threshold_timecode_str = g_value_dup_string (value);
653       GST_OBJECT_UNLOCK (splitmux);
654       break;
655     case PROP_SEND_KEYFRAME_REQUESTS:
656       GST_OBJECT_LOCK (splitmux);
657       splitmux->send_keyframe_requests = g_value_get_boolean (value);
658       GST_OBJECT_UNLOCK (splitmux);
659       break;
660     case PROP_MAX_FILES:
661       GST_OBJECT_LOCK (splitmux);
662       splitmux->max_files = g_value_get_uint (value);
663       GST_OBJECT_UNLOCK (splitmux);
664       break;
665     case PROP_MUXER_OVERHEAD:
666       GST_OBJECT_LOCK (splitmux);
667       splitmux->mux_overhead = g_value_get_double (value);
668       GST_OBJECT_UNLOCK (splitmux);
669       break;
670     case PROP_USE_ROBUST_MUXING:
671       GST_OBJECT_LOCK (splitmux);
672       splitmux->use_robust_muxing = g_value_get_boolean (value);
673       GST_OBJECT_UNLOCK (splitmux);
674       if (splitmux->use_robust_muxing)
675         update_muxer_properties (splitmux);
676       break;
677     case PROP_ALIGNMENT_THRESHOLD:
678       GST_OBJECT_LOCK (splitmux);
679       splitmux->alignment_threshold = g_value_get_uint64 (value);
680       GST_OBJECT_UNLOCK (splitmux);
681       break;
682     case PROP_SINK:
683       GST_OBJECT_LOCK (splitmux);
684       if (splitmux->provided_sink)
685         gst_object_unref (splitmux->provided_sink);
686       splitmux->provided_sink = g_value_get_object (value);
687       gst_object_ref_sink (splitmux->provided_sink);
688       GST_OBJECT_UNLOCK (splitmux);
689       break;
690     case PROP_MUXER:
691       GST_OBJECT_LOCK (splitmux);
692       if (splitmux->provided_muxer)
693         gst_object_unref (splitmux->provided_muxer);
694       splitmux->provided_muxer = g_value_get_object (value);
695       gst_object_ref_sink (splitmux->provided_muxer);
696       GST_OBJECT_UNLOCK (splitmux);
697       break;
698     case PROP_RESET_MUXER:
699       GST_OBJECT_LOCK (splitmux);
700       splitmux->reset_muxer = g_value_get_boolean (value);
701       GST_OBJECT_UNLOCK (splitmux);
702       break;
703     case PROP_ASYNC_FINALIZE:
704       GST_OBJECT_LOCK (splitmux);
705       splitmux->async_finalize = g_value_get_boolean (value);
706       GST_OBJECT_UNLOCK (splitmux);
707       break;
708     case PROP_MUXER_FACTORY:
709       GST_OBJECT_LOCK (splitmux);
710       if (splitmux->muxer_factory)
711         g_free (splitmux->muxer_factory);
712       splitmux->muxer_factory = g_value_dup_string (value);
713       GST_OBJECT_UNLOCK (splitmux);
714       break;
715     case PROP_MUXER_PROPERTIES:
716       GST_OBJECT_LOCK (splitmux);
717       if (splitmux->muxer_properties)
718         gst_structure_free (splitmux->muxer_properties);
719       if (gst_value_get_structure (value))
720         splitmux->muxer_properties =
721             gst_structure_copy (gst_value_get_structure (value));
722       else
723         splitmux->muxer_properties = NULL;
724       GST_OBJECT_UNLOCK (splitmux);
725       break;
726     case PROP_SINK_FACTORY:
727       GST_OBJECT_LOCK (splitmux);
728       if (splitmux->sink_factory)
729         g_free (splitmux->sink_factory);
730       splitmux->sink_factory = g_value_dup_string (value);
731       GST_OBJECT_UNLOCK (splitmux);
732       break;
733     case PROP_SINK_PROPERTIES:
734       GST_OBJECT_LOCK (splitmux);
735       if (splitmux->sink_properties)
736         gst_structure_free (splitmux->sink_properties);
737       if (gst_value_get_structure (value))
738         splitmux->sink_properties =
739             gst_structure_copy (gst_value_get_structure (value));
740       else
741         splitmux->sink_properties = NULL;
742       GST_OBJECT_UNLOCK (splitmux);
743       break;
744     default:
745       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
746       break;
747   }
748 }
749
750 static void
751 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
752     GValue * value, GParamSpec * pspec)
753 {
754   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755
756   switch (prop_id) {
757     case PROP_LOCATION:
758       GST_OBJECT_LOCK (splitmux);
759       g_value_set_string (value, splitmux->location);
760       GST_OBJECT_UNLOCK (splitmux);
761       break;
762     case PROP_MAX_SIZE_BYTES:
763       GST_OBJECT_LOCK (splitmux);
764       g_value_set_uint64 (value, splitmux->threshold_bytes);
765       GST_OBJECT_UNLOCK (splitmux);
766       break;
767     case PROP_MAX_SIZE_TIME:
768       GST_OBJECT_LOCK (splitmux);
769       g_value_set_uint64 (value, splitmux->threshold_time);
770       GST_OBJECT_UNLOCK (splitmux);
771       break;
772     case PROP_MAX_SIZE_TIMECODE:
773       GST_OBJECT_LOCK (splitmux);
774       g_value_set_string (value, splitmux->threshold_timecode_str);
775       GST_OBJECT_UNLOCK (splitmux);
776       break;
777     case PROP_SEND_KEYFRAME_REQUESTS:
778       GST_OBJECT_LOCK (splitmux);
779       g_value_set_boolean (value, splitmux->send_keyframe_requests);
780       GST_OBJECT_UNLOCK (splitmux);
781       break;
782     case PROP_MAX_FILES:
783       GST_OBJECT_LOCK (splitmux);
784       g_value_set_uint (value, splitmux->max_files);
785       GST_OBJECT_UNLOCK (splitmux);
786       break;
787     case PROP_MUXER_OVERHEAD:
788       GST_OBJECT_LOCK (splitmux);
789       g_value_set_double (value, splitmux->mux_overhead);
790       GST_OBJECT_UNLOCK (splitmux);
791       break;
792     case PROP_USE_ROBUST_MUXING:
793       GST_OBJECT_LOCK (splitmux);
794       g_value_set_boolean (value, splitmux->use_robust_muxing);
795       GST_OBJECT_UNLOCK (splitmux);
796       break;
797     case PROP_ALIGNMENT_THRESHOLD:
798       GST_OBJECT_LOCK (splitmux);
799       g_value_set_uint64 (value, splitmux->alignment_threshold);
800       GST_OBJECT_UNLOCK (splitmux);
801       break;
802     case PROP_SINK:
803       GST_OBJECT_LOCK (splitmux);
804       g_value_set_object (value, splitmux->provided_sink);
805       GST_OBJECT_UNLOCK (splitmux);
806       break;
807     case PROP_MUXER:
808       GST_OBJECT_LOCK (splitmux);
809       g_value_set_object (value, splitmux->provided_muxer);
810       GST_OBJECT_UNLOCK (splitmux);
811       break;
812     case PROP_RESET_MUXER:
813       GST_OBJECT_LOCK (splitmux);
814       g_value_set_boolean (value, splitmux->reset_muxer);
815       GST_OBJECT_UNLOCK (splitmux);
816       break;
817     case PROP_ASYNC_FINALIZE:
818       GST_OBJECT_LOCK (splitmux);
819       g_value_set_boolean (value, splitmux->async_finalize);
820       GST_OBJECT_UNLOCK (splitmux);
821       break;
822     case PROP_MUXER_FACTORY:
823       GST_OBJECT_LOCK (splitmux);
824       g_value_set_string (value, splitmux->muxer_factory);
825       GST_OBJECT_UNLOCK (splitmux);
826       break;
827     case PROP_MUXER_PROPERTIES:
828       GST_OBJECT_LOCK (splitmux);
829       gst_value_set_structure (value, splitmux->muxer_properties);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK_FACTORY:
833       GST_OBJECT_LOCK (splitmux);
834       g_value_set_string (value, splitmux->sink_factory);
835       GST_OBJECT_UNLOCK (splitmux);
836       break;
837     case PROP_SINK_PROPERTIES:
838       GST_OBJECT_LOCK (splitmux);
839       gst_value_set_structure (value, splitmux->sink_properties);
840       GST_OBJECT_UNLOCK (splitmux);
841       break;
842     default:
843       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
844       break;
845   }
846 }
847
848 /* Convenience function */
849 static inline GstClockTimeDiff
850 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
851 {
852   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
853
854   if (GST_CLOCK_TIME_IS_VALID (val)) {
855     gboolean sign =
856         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
857     if (sign > 0)
858       res = val;
859     else if (sign < 0)
860       res = -val;
861   }
862   return res;
863 }
864
865 static MqStreamCtx *
866 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
867 {
868   MqStreamCtx *ctx;
869
870   ctx = g_new0 (MqStreamCtx, 1);
871   ctx->splitmux = splitmux;
872   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
873   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
874   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
875   g_queue_init (&ctx->queued_bufs);
876   return ctx;
877 }
878
879 static void
880 mq_stream_ctx_free (MqStreamCtx * ctx)
881 {
882   if (ctx->q) {
883     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
884
885     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
886
887     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
888       gst_element_set_locked_state (ctx->q, TRUE);
889       gst_element_set_state (ctx->q, GST_STATE_NULL);
890       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
891       gst_object_unref (parent);
892     }
893     gst_object_unref (ctx->q);
894   }
895   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
896   gst_object_unref (ctx->sinkpad);
897   gst_object_unref (ctx->srcpad);
898   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
899   g_queue_clear (&ctx->queued_bufs);
900   g_free (ctx);
901 }
902
903 static void
904 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
905     GstElement * sink)
906 {
907   gchar *location = NULL;
908   GstMessage *msg;
909   const gchar *msg_name = opened ?
910       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
911   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
912
913   if (!opened) {
914     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
915     if (rtime)
916       running_time = *rtime;
917   }
918
919   g_object_get (sink, "location", &location, NULL);
920
921   /* If it's in the middle of a teardown, the reference_ctc might have become
922    * NULL */
923   if (splitmux->reference_ctx) {
924     msg = gst_message_new_element (GST_OBJECT (splitmux),
925         gst_structure_new (msg_name,
926             "location", G_TYPE_STRING, location,
927             "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
928     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
929   }
930
931   g_free (location);
932 }
933
934 static void
935 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
936 {
937   GstEvent *eos;
938   GstPad *pad;
939   MqStreamCtx *ctx;
940
941   eos = gst_event_new_eos ();
942   pad = helper->pad;
943   ctx = helper->ctx;
944
945   GST_SPLITMUX_LOCK (splitmux);
946   if (!pad)
947     pad = gst_pad_get_peer (ctx->srcpad);
948   GST_SPLITMUX_UNLOCK (splitmux);
949
950   gst_pad_send_event (pad, eos);
951   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
952
953   gst_object_unref (pad);
954   g_free (helper);
955 }
956
957 /* Called with lock held, drops the lock to send EOS to the
958  * pad
959  */
960 static void
961 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
962 {
963   GstEvent *eos;
964   GstPad *pad;
965
966   eos = gst_event_new_eos ();
967   pad = gst_pad_get_peer (ctx->srcpad);
968
969   ctx->out_eos = TRUE;
970
971   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
972   GST_SPLITMUX_UNLOCK (splitmux);
973   gst_pad_send_event (pad, eos);
974   GST_SPLITMUX_LOCK (splitmux);
975
976   gst_object_unref (pad);
977 }
978
979 /* Called with lock held. Schedules an EOS event to the ctx pad
980  * to happen in another thread */
981 static void
982 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
983 {
984   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
985   GstPad *srcpad, *sinkpad;
986
987   srcpad = ctx->srcpad;
988   sinkpad = gst_pad_get_peer (srcpad);
989
990   helper->ctx = ctx;
991   helper->pad = sinkpad;        /* Takes the reference */
992
993   ctx->out_eos_async_done = TRUE;
994   /* HACK: Here, we explicitly unset the SINK flag on the target sink element
995    * that's about to be asynchronously disposed, so that it no longer
996    * participates in GstBin EOS logic. This fixes a race where if
997    * splitmuxsink really reaches EOS before an asynchronous background
998    * element has finished, then the bin won't actually send EOS to the
999    * pipeline. Even after finishing and removing the old element, the
1000    * bin doesn't re-check EOS status on removing a SINK element. This
1001    * should be fixed in core, making this hack unnecessary. */
1002   GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
1003
1004   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1005       sinkpad, ctx);
1006
1007   g_assert_nonnull (helper->pad);
1008   gst_element_call_async (GST_ELEMENT (splitmux),
1009       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1010 }
1011
1012 /* Called with lock held. TRUE iff all contexts have a
1013  * pending (or delivered) async eos event */
1014 static gboolean
1015 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1016 {
1017   gboolean ret = TRUE;
1018   GList *item;
1019
1020   for (item = splitmux->contexts; item; item = item->next) {
1021     MqStreamCtx *ctx = item->data;
1022     ret &= ctx->out_eos_async_done;
1023   }
1024   return ret;
1025 }
1026
1027 /* Called with splitmux lock held to check if this output
1028  * context needs to sleep to wait for the release of the
1029  * next GOP, or to send EOS to close out the current file
1030  */
1031 static void
1032 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1033 {
1034   if (ctx->caps_change)
1035     return;
1036
1037   do {
1038     /* When first starting up, the reference stream has to output
1039      * the first buffer to prepare the muxer and sink */
1040     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1041     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1042
1043     if (!(splitmux->max_out_running_time == 0 ||
1044             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1045             splitmux->alignment_threshold == 0 ||
1046             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1047       my_max_out_running_time -= splitmux->alignment_threshold;
1048       GST_LOG_OBJECT (ctx->srcpad,
1049           "Max out running time currently %" GST_STIME_FORMAT
1050           ", with threshold applied it is %" GST_STIME_FORMAT,
1051           GST_STIME_ARGS (splitmux->max_out_running_time),
1052           GST_STIME_ARGS (my_max_out_running_time));
1053     }
1054
1055     if (ctx->flushing
1056         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1057       return;
1058
1059     GST_LOG_OBJECT (ctx->srcpad,
1060         "Checking running time %" GST_STIME_FORMAT " against max %"
1061         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1062         GST_STIME_ARGS (my_max_out_running_time));
1063
1064     if (can_output) {
1065       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1066           ctx->out_running_time < my_max_out_running_time) {
1067         return;
1068       }
1069
1070       switch (splitmux->output_state) {
1071         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1072           /* We only get here if we've finished outputting a GOP and need to know
1073            * what to do next */
1074           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1075           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1076           continue;
1077
1078         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1079           /* We've reached the max out running_time to get here, so end this file now */
1080           if (ctx->out_eos == FALSE) {
1081             if (splitmux->async_finalize) {
1082               /* We must set EOS asynchronously at this point. We cannot defer
1083                * it, because we need all contexts to wake up, for the
1084                * reference context to eventually give us something at
1085                * START_NEXT_FILE. Otherwise, collectpads might choose another
1086                * context to give us the first buffer, and format-location-full
1087                * will not contain a valid sample. */
1088               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1089                   GINT_TO_POINTER (1));
1090               eos_context_async (ctx, splitmux);
1091               if (all_contexts_are_async_eos (splitmux)) {
1092                 GST_INFO_OBJECT (splitmux,
1093                     "All contexts are async_eos. Moving to the next file.");
1094                 /* We can start the next file once we've asked each pad to go EOS */
1095                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1096                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1097                 continue;
1098               }
1099             } else {
1100               send_eos (splitmux, ctx);
1101               continue;
1102             }
1103           } else {
1104             GST_INFO_OBJECT (splitmux,
1105                 "At end-of-file state, but context %p is already EOS", ctx);
1106           }
1107           break;
1108         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1109           if (ctx->is_reference) {
1110             /* Special handling on the reference ctx to start new fragments
1111              * and collect commands from the command queue */
1112             /* drops the splitmux lock briefly: */
1113             /* We must have reference ctx in order for format-location-full to
1114              * have a sample */
1115             start_next_fragment (splitmux, ctx);
1116             continue;
1117           }
1118           break;
1119         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1120           do {
1121             SplitMuxOutputCommand *cmd =
1122                 g_queue_pop_tail (&splitmux->out_cmd_q);
1123             if (cmd != NULL) {
1124               /* If we pop the last command, we need to make our queues bigger */
1125               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1126                 grow_blocked_queues (splitmux);
1127
1128               if (cmd->start_new_fragment) {
1129                 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1130                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1131               } else {
1132                 GST_DEBUG_OBJECT (splitmux,
1133                     "Got new output cmd for time %" GST_STIME_FORMAT,
1134                     GST_STIME_ARGS (cmd->max_output_ts));
1135
1136                 /* Extend the output range immediately */
1137                 splitmux->max_out_running_time = cmd->max_output_ts;
1138                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1139               }
1140               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1141
1142               out_cmd_buf_free (cmd);
1143               break;
1144             } else {
1145               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1146             }
1147           } while (splitmux->output_state ==
1148               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1149           /* loop and re-check the state */
1150           continue;
1151         }
1152         case SPLITMUX_OUTPUT_STATE_STOPPED:
1153           return;
1154       }
1155     }
1156
1157     GST_INFO_OBJECT (ctx->srcpad,
1158         "Sleeping for running time %"
1159         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1160         GST_STIME_ARGS (ctx->out_running_time),
1161         GST_STIME_ARGS (splitmux->max_out_running_time));
1162     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1163     GST_INFO_OBJECT (ctx->srcpad,
1164         "Woken for new max running time %" GST_STIME_FORMAT,
1165         GST_STIME_ARGS (splitmux->max_out_running_time));
1166   }
1167   while (1);
1168 }
1169
1170 static GstClockTime
1171 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1172     const GstVideoTimeCode * cur_tc)
1173 {
1174   GstVideoTimeCode *target_tc;
1175   GstVideoTimeCodeInterval *tc_inter;
1176   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1177
1178   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1179     return GST_CLOCK_TIME_NONE;
1180
1181   tc_inter =
1182       gst_video_time_code_interval_new_from_string
1183       (splitmux->threshold_timecode_str);
1184   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1185   gst_video_time_code_interval_free (tc_inter);
1186
1187   /* Convert to ns */
1188   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1189   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1190
1191   /* Add fragment_start_time, accounting for wraparound */
1192   if (target_tc_time >= cur_tc_time) {
1193     next_max_tc_time =
1194         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1195   } else {
1196     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1197
1198     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1199         (cur_tc->config.fps_d == 1001)) {
1200       /* Checking fps_d is probably unneeded, but better safe than sorry
1201        * (e.g. someone accidentally set a flag) */
1202       GstVideoTimeCode *tc_for_offset;
1203
1204       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1205        * but slightly less. Calculate that duration from a fake timecode. The
1206        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1207        * is to add one frame to 23:59:59;29 */
1208       tc_for_offset =
1209           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1210           NULL, cur_tc->config.flags, 23, 59, 59,
1211           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1212       day_in_ns =
1213           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1214           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1215           cur_tc->config.fps_n);
1216       gst_video_time_code_free (tc_for_offset);
1217     }
1218     next_max_tc_time =
1219         day_in_ns - cur_tc_time + target_tc_time +
1220         splitmux->fragment_start_time;
1221   }
1222
1223   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1224       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1225       GST_TIME_ARGS (cur_tc_time));
1226   gst_video_time_code_free (target_tc);
1227
1228   return next_max_tc_time;
1229 }
1230
1231 static gboolean
1232 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1233 {
1234   GstEvent *ev;
1235   GstClockTime target_time;
1236   gboolean timecode_based = FALSE;
1237
1238   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1239   if (splitmux->threshold_timecode_str) {
1240     GstVideoTimeCodeMeta *tc_meta;
1241
1242     if (buffer != NULL) {
1243       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1244       if (tc_meta) {
1245         splitmux->next_max_tc_time =
1246             calculate_next_max_timecode (splitmux, &tc_meta->tc);
1247         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1248       }
1249     } else {
1250       /* This can happen in the presence of GAP events that trigger
1251        * a new fragment start */
1252       GST_WARNING_OBJECT (splitmux,
1253           "No buffer available to calculate next timecode");
1254     }
1255   }
1256
1257   if (splitmux->send_keyframe_requests == FALSE
1258       || (splitmux->threshold_time == 0 && !timecode_based)
1259       || splitmux->threshold_bytes != 0)
1260     return TRUE;
1261
1262   if (timecode_based) {
1263     /* We might have rounding errors: aim slightly earlier */
1264     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1265   } else {
1266     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1267   }
1268   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1269   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1270       GST_TIME_ARGS (target_time));
1271   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1272 }
1273
1274 static GstPadProbeReturn
1275 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1276 {
1277   GstSplitMuxSink *splitmux = ctx->splitmux;
1278   MqStreamBuf *buf_info = NULL;
1279
1280   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1281
1282   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1283   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1284     g_warning ("Buffer list handling not implemented");
1285     return GST_PAD_PROBE_DROP;
1286   }
1287   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1288       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1289     GstEvent *event = gst_pad_probe_info_get_event (info);
1290     gboolean locked = FALSE;
1291
1292     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1293
1294     switch (GST_EVENT_TYPE (event)) {
1295       case GST_EVENT_SEGMENT:
1296         gst_event_copy_segment (event, &ctx->out_segment);
1297         break;
1298       case GST_EVENT_FLUSH_STOP:
1299         GST_SPLITMUX_LOCK (splitmux);
1300         locked = TRUE;
1301         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1302         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1303         g_queue_clear (&ctx->queued_bufs);
1304         ctx->flushing = FALSE;
1305         break;
1306       case GST_EVENT_FLUSH_START:
1307         GST_SPLITMUX_LOCK (splitmux);
1308         locked = TRUE;
1309         GST_LOG_OBJECT (pad, "Flush start");
1310         ctx->flushing = TRUE;
1311         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1312         break;
1313       case GST_EVENT_EOS:
1314         GST_SPLITMUX_LOCK (splitmux);
1315         locked = TRUE;
1316         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1317           goto beach;
1318         ctx->out_eos = TRUE;
1319         GST_INFO_OBJECT (splitmux,
1320             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1321         break;
1322       case GST_EVENT_GAP:{
1323         GstClockTime gap_ts;
1324         GstClockTimeDiff rtime;
1325
1326         gst_event_parse_gap (event, &gap_ts, NULL);
1327         if (gap_ts == GST_CLOCK_TIME_NONE)
1328           break;
1329
1330         GST_SPLITMUX_LOCK (splitmux);
1331         locked = TRUE;
1332
1333         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1334           goto beach;
1335
1336         /* When we get a gap event on the
1337          * reference stream and we're trying to open a
1338          * new file, we need to store it until we get
1339          * the buffer afterwards
1340          */
1341         if (ctx->is_reference &&
1342             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1343           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1344           gst_event_replace (&ctx->pending_gap, event);
1345           GST_SPLITMUX_UNLOCK (splitmux);
1346           return GST_PAD_PROBE_HANDLED;
1347         }
1348
1349         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1350
1351         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1352             GST_STIME_ARGS (rtime));
1353
1354         if (rtime != GST_CLOCK_STIME_NONE) {
1355           ctx->out_running_time = rtime;
1356           complete_or_wait_on_out (splitmux, ctx);
1357         }
1358         break;
1359       }
1360       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1361         const GstStructure *s;
1362         GstClockTimeDiff ts = 0;
1363
1364         s = gst_event_get_structure (event);
1365         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1366           break;
1367
1368         gst_structure_get_int64 (s, "timestamp", &ts);
1369
1370         GST_SPLITMUX_LOCK (splitmux);
1371         locked = TRUE;
1372
1373         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1374           goto beach;
1375         ctx->out_running_time = ts;
1376         if (!ctx->is_reference)
1377           complete_or_wait_on_out (splitmux, ctx);
1378         GST_SPLITMUX_UNLOCK (splitmux);
1379         return GST_PAD_PROBE_DROP;
1380       }
1381       case GST_EVENT_CAPS:{
1382         GstPad *peer;
1383
1384         if (!ctx->is_reference)
1385           break;
1386
1387         peer = gst_pad_get_peer (pad);
1388         if (peer) {
1389           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1390
1391           gst_object_unref (peer);
1392
1393           if (ok)
1394             break;
1395
1396         } else {
1397           break;
1398         }
1399         /* This is in the case the muxer doesn't allow this change of caps */
1400         GST_SPLITMUX_LOCK (splitmux);
1401         locked = TRUE;
1402         ctx->caps_change = TRUE;
1403
1404         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1405           GST_DEBUG_OBJECT (splitmux,
1406               "New caps were not accepted. Switching output file");
1407           if (ctx->out_eos == FALSE) {
1408             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1409             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1410           }
1411         }
1412
1413         /* Lets it fall through, if it fails again, then the muxer just can't
1414          * support this format, but at least we have a closed file.
1415          */
1416         break;
1417       }
1418       default:
1419         break;
1420     }
1421
1422     /* We need to make sure events aren't passed
1423      * until the muxer / sink are ready for it */
1424     if (!locked)
1425       GST_SPLITMUX_LOCK (splitmux);
1426     if (!ctx->is_reference)
1427       complete_or_wait_on_out (splitmux, ctx);
1428     GST_SPLITMUX_UNLOCK (splitmux);
1429
1430     /* Don't try to forward sticky events before the next buffer is there
1431      * because it would cause a new file to be created without the first
1432      * buffer being available.
1433      */
1434     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1435       gst_event_unref (event);
1436       return GST_PAD_PROBE_HANDLED;
1437     } else
1438       return GST_PAD_PROBE_PASS;
1439   }
1440
1441   /* Allow everything through until the configured next stopping point */
1442   GST_SPLITMUX_LOCK (splitmux);
1443
1444   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1445   if (buf_info == NULL)
1446     /* Can only happen due to a poorly timed flush */
1447     goto beach;
1448
1449   /* If we have popped a keyframe, decrement the queued_gop count */
1450   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1451     splitmux->queued_keyframes--;
1452
1453   ctx->out_running_time = buf_info->run_ts;
1454   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1455
1456   GST_LOG_OBJECT (splitmux,
1457       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1458       " size %" G_GUINT64_FORMAT,
1459       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1460
1461   ctx->caps_change = FALSE;
1462
1463   complete_or_wait_on_out (splitmux, ctx);
1464
1465   splitmux->muxed_out_bytes += buf_info->buf_size;
1466
1467 #ifndef GST_DISABLE_GST_DEBUG
1468   {
1469     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1470     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1471         " run ts %" GST_STIME_FORMAT, buf,
1472         GST_STIME_ARGS (ctx->out_running_time));
1473   }
1474 #endif
1475
1476   ctx->cur_out_buffer = NULL;
1477   GST_SPLITMUX_UNLOCK (splitmux);
1478
1479   /* pending_gap is protected by the STREAM lock */
1480   if (ctx->pending_gap) {
1481     /* If we previously stored a gap event, send it now */
1482     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1483
1484     GST_DEBUG_OBJECT (splitmux,
1485         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1486
1487     gst_pad_send_event (peer, ctx->pending_gap);
1488     ctx->pending_gap = NULL;
1489
1490     gst_object_unref (peer);
1491   }
1492
1493   mq_stream_buf_free (buf_info);
1494
1495   return GST_PAD_PROBE_PASS;
1496
1497 beach:
1498   GST_SPLITMUX_UNLOCK (splitmux);
1499   return GST_PAD_PROBE_DROP;
1500 }
1501
1502 static gboolean
1503 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1504 {
1505   return gst_pad_send_event (peer, gst_event_ref (*event));
1506 }
1507
1508 static void
1509 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1510 {
1511   if (ctx->fragment_block_id > 0) {
1512     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1513     ctx->fragment_block_id = 0;
1514   }
1515 }
1516
1517 static void
1518 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1519 {
1520   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1521
1522   gst_pad_sticky_events_foreach (ctx->srcpad,
1523       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1524
1525   /* Clear EOS flag if not actually EOS */
1526   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1527   ctx->out_eos_async_done = ctx->out_eos;
1528
1529   gst_object_unref (peer);
1530 }
1531
1532 static void
1533 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1534 {
1535   GstPad *sinkpad, *srcpad, *newpad;
1536   GstPadTemplate *templ;
1537
1538   srcpad = ctx->srcpad;
1539   sinkpad = gst_pad_get_peer (srcpad);
1540
1541   templ = sinkpad->padtemplate;
1542   newpad =
1543       gst_element_request_pad (splitmux->muxer, templ,
1544       GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1545
1546   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1547       newpad);
1548   if (!gst_pad_unlink (srcpad, sinkpad)) {
1549     gst_object_unref (sinkpad);
1550     goto fail;
1551   }
1552   if (gst_pad_link_full (srcpad, newpad,
1553           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1554     gst_element_release_request_pad (splitmux->muxer, newpad);
1555     gst_object_unref (sinkpad);
1556     gst_object_unref (newpad);
1557     goto fail;
1558   }
1559   gst_object_unref (newpad);
1560   gst_object_unref (sinkpad);
1561   return;
1562
1563 fail:
1564   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1565       ("Could not create the new muxer/sink"), NULL);
1566 }
1567
1568 static GstPadProbeReturn
1569 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1570 {
1571   return GST_PAD_PROBE_OK;
1572 }
1573
1574 static void
1575 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1576 {
1577   ctx->fragment_block_id =
1578       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1579       NULL, NULL);
1580 }
1581
1582 static gboolean
1583 _set_property_from_structure (GQuark field_id, const GValue * value,
1584     gpointer user_data)
1585 {
1586   const gchar *property_name = g_quark_to_string (field_id);
1587   GObject *element = G_OBJECT (user_data);
1588
1589   g_object_set_property (element, property_name, value);
1590
1591   return TRUE;
1592 }
1593
1594 static void
1595 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1596 {
1597   gst_element_set_locked_state (element, TRUE);
1598   gst_element_set_state (element, GST_STATE_NULL);
1599   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1600   gst_bin_remove (GST_BIN (splitmux), element);
1601 }
1602
1603
1604 static void
1605 _send_event (const GValue * value, gpointer user_data)
1606 {
1607   GstPad *pad = g_value_get_object (value);
1608   GstEvent *ev = user_data;
1609
1610   gst_pad_send_event (pad, gst_event_ref (ev));
1611 }
1612
1613 /* Called with lock held when a fragment
1614  * reaches EOS and it is time to restart
1615  * a new fragment
1616  */
1617 static void
1618 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1619 {
1620   GstElement *muxer, *sink;
1621
1622   g_assert (ctx->is_reference);
1623
1624   /* 1 change to new file */
1625   splitmux->switching_fragment = TRUE;
1626
1627   /* We need to drop the splitmux lock to acquire the state lock
1628    * here and ensure there's no racy state change going on elsewhere */
1629   muxer = gst_object_ref (splitmux->muxer);
1630   sink = gst_object_ref (splitmux->active_sink);
1631
1632   GST_SPLITMUX_UNLOCK (splitmux);
1633   GST_STATE_LOCK (splitmux);
1634
1635   if (splitmux->async_finalize) {
1636     if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1637       gchar *newname;
1638       GstElement *new_sink, *new_muxer;
1639
1640       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1641           splitmux->fragment_id);
1642       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1643       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1644       GST_SPLITMUX_LOCK (splitmux);
1645       if ((splitmux->sink =
1646               create_element (splitmux, splitmux->sink_factory, newname,
1647                   TRUE)) == NULL)
1648         goto fail;
1649       if (splitmux->sink_properties)
1650         gst_structure_foreach (splitmux->sink_properties,
1651             _set_property_from_structure, splitmux->sink);
1652       splitmux->active_sink = splitmux->sink;
1653       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1654       g_free (newname);
1655       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1656       if ((splitmux->muxer =
1657               create_element (splitmux, splitmux->muxer_factory, newname,
1658                   TRUE)) == NULL)
1659         goto fail;
1660       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1661               "async") != NULL) {
1662         /* async child elements are causing state change races and weird
1663          * failures, so let's try and turn that off */
1664         g_object_set (splitmux->sink, "async", FALSE, NULL);
1665       }
1666       if (splitmux->muxer_properties)
1667         gst_structure_foreach (splitmux->muxer_properties,
1668             _set_property_from_structure, splitmux->muxer);
1669       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1670       g_free (newname);
1671       new_sink = splitmux->sink;
1672       new_muxer = splitmux->muxer;
1673       GST_SPLITMUX_UNLOCK (splitmux);
1674       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1675       gst_element_link (new_muxer, new_sink);
1676
1677       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1678         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1679                     EOS_FROM_US)) == 2) {
1680           _lock_and_set_to_null (muxer, splitmux);
1681           _lock_and_set_to_null (sink, splitmux);
1682         } else {
1683           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1684               GINT_TO_POINTER (2));
1685         }
1686       }
1687       gst_object_unref (muxer);
1688       gst_object_unref (sink);
1689       muxer = new_muxer;
1690       sink = new_sink;
1691       gst_object_ref (muxer);
1692       gst_object_ref (sink);
1693     }
1694   } else {
1695
1696     gst_element_set_locked_state (muxer, TRUE);
1697     gst_element_set_locked_state (sink, TRUE);
1698     gst_element_set_state (sink, GST_STATE_NULL);
1699
1700     if (splitmux->reset_muxer) {
1701       gst_element_set_state (muxer, GST_STATE_NULL);
1702     } else {
1703       GstIterator *it = gst_element_iterate_sink_pads (muxer);
1704       GstEvent *ev;
1705
1706       ev = gst_event_new_flush_start ();
1707       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1708       gst_event_unref (ev);
1709
1710       gst_iterator_resync (it);
1711
1712       ev = gst_event_new_flush_stop (TRUE);
1713       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1714       gst_event_unref (ev);
1715
1716       gst_iterator_free (it);
1717     }
1718   }
1719
1720   GST_SPLITMUX_LOCK (splitmux);
1721   if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1722     set_next_filename (splitmux, ctx);
1723   splitmux->muxed_out_bytes = 0;
1724   GST_SPLITMUX_UNLOCK (splitmux);
1725
1726   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1727   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1728   gst_element_set_locked_state (muxer, FALSE);
1729   gst_element_set_locked_state (sink, FALSE);
1730
1731   gst_object_unref (sink);
1732   gst_object_unref (muxer);
1733
1734   GST_SPLITMUX_LOCK (splitmux);
1735   GST_STATE_UNLOCK (splitmux);
1736   splitmux->switching_fragment = FALSE;
1737   do_async_done (splitmux);
1738
1739   splitmux->ready_for_output = TRUE;
1740
1741   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1742   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1743
1744   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1745
1746   /* FIXME: Is this always the correct next state? */
1747   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1748   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1749   return;
1750
1751 fail:
1752   GST_STATE_UNLOCK (splitmux);
1753   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1754       ("Could not create the new muxer/sink"), NULL);
1755 }
1756
1757 static void
1758 bus_handler (GstBin * bin, GstMessage * message)
1759 {
1760   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1761
1762   switch (GST_MESSAGE_TYPE (message)) {
1763     case GST_MESSAGE_EOS:{
1764       /* If the state is draining out the current file, drop this EOS */
1765       GstElement *sink;
1766
1767       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1768       GST_SPLITMUX_LOCK (splitmux);
1769
1770       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1771
1772       if (splitmux->async_finalize) {
1773
1774         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1775           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1776                       EOS_FROM_US)) == 2) {
1777             GstElement *muxer;
1778             GstPad *sinksink, *muxersrc;
1779
1780             sinksink = gst_element_get_static_pad (sink, "sink");
1781             muxersrc = gst_pad_get_peer (sinksink);
1782             muxer = gst_pad_get_parent_element (muxersrc);
1783             gst_object_unref (sinksink);
1784             gst_object_unref (muxersrc);
1785
1786             gst_element_call_async (muxer,
1787                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1788                 gst_object_ref (splitmux), gst_object_unref);
1789             gst_element_call_async (sink,
1790                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1791                 gst_object_ref (splitmux), gst_object_unref);
1792             gst_object_unref (muxer);
1793           } else {
1794             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1795                 GINT_TO_POINTER (2));
1796           }
1797           GST_DEBUG_OBJECT (splitmux,
1798               "Caught async EOS from previous muxer+sink. Dropping.");
1799           /* We forward the EOS so that it gets aggregated as normal. If the sink
1800            * finishes and is removed before the end, it will be de-aggregated */
1801           gst_message_unref (message);
1802           GST_SPLITMUX_UNLOCK (splitmux);
1803           return;
1804         }
1805       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1806         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1807         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1808         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1809
1810         gst_message_unref (message);
1811         GST_SPLITMUX_UNLOCK (splitmux);
1812         return;
1813       } else {
1814         GST_DEBUG_OBJECT (splitmux,
1815             "Passing EOS message. Output state %d max_out_running_time %"
1816             GST_STIME_FORMAT, splitmux->output_state,
1817             GST_STIME_ARGS (splitmux->max_out_running_time));
1818       }
1819       GST_SPLITMUX_UNLOCK (splitmux);
1820       break;
1821     }
1822     case GST_MESSAGE_ASYNC_START:
1823     case GST_MESSAGE_ASYNC_DONE:
1824       /* Ignore state changes from our children while switching */
1825       GST_SPLITMUX_LOCK (splitmux);
1826       if (splitmux->switching_fragment) {
1827         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1828             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1829           GST_LOG_OBJECT (splitmux,
1830               "Ignoring state change from child %" GST_PTR_FORMAT
1831               " while switching", GST_MESSAGE_SRC (message));
1832           gst_message_unref (message);
1833           GST_SPLITMUX_UNLOCK (splitmux);
1834           return;
1835         }
1836       }
1837       GST_SPLITMUX_UNLOCK (splitmux);
1838       break;
1839     case GST_MESSAGE_WARNING:
1840     {
1841       GError *gerror = NULL;
1842
1843       gst_message_parse_warning (message, &gerror, NULL);
1844
1845       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1846         GList *item;
1847         gboolean caps_change = FALSE;
1848
1849         GST_SPLITMUX_LOCK (splitmux);
1850
1851         for (item = splitmux->contexts; item; item = item->next) {
1852           MqStreamCtx *ctx = item->data;
1853
1854           if (ctx->caps_change) {
1855             caps_change = TRUE;
1856             break;
1857           }
1858         }
1859
1860         GST_SPLITMUX_UNLOCK (splitmux);
1861
1862         if (caps_change) {
1863           GST_LOG_OBJECT (splitmux,
1864               "Ignoring warning change from child %" GST_PTR_FORMAT
1865               " while switching caps", GST_MESSAGE_SRC (message));
1866           gst_message_unref (message);
1867           return;
1868         }
1869       }
1870       break;
1871     }
1872     default:
1873       break;
1874   }
1875
1876   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1877 }
1878
1879 static void
1880 ctx_set_unblock (MqStreamCtx * ctx)
1881 {
1882   ctx->need_unblock = TRUE;
1883 }
1884
1885 static gboolean
1886 need_new_fragment (GstSplitMuxSink * splitmux,
1887     GstClockTime queued_time, GstClockTime queued_gop_time,
1888     guint64 queued_bytes)
1889 {
1890   guint64 thresh_bytes;
1891   GstClockTime thresh_time;
1892   gboolean check_robust_muxing;
1893   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
1894   GstClockTime *ptr_to_time;
1895
1896   GST_OBJECT_LOCK (splitmux);
1897   thresh_bytes = splitmux->threshold_bytes;
1898   thresh_time = splitmux->threshold_time;
1899   ptr_to_time = (GstClockTime *)
1900       gst_queue_array_peek_head_struct (splitmux->times_to_split);
1901   if (ptr_to_time)
1902     time_to_split = *ptr_to_time;
1903   check_robust_muxing = splitmux->use_robust_muxing
1904       && splitmux->muxer_has_reserved_props;
1905   GST_OBJECT_UNLOCK (splitmux);
1906
1907   /* Have we muxed anything into the new file at all? */
1908   if (splitmux->fragment_total_bytes <= 0)
1909     return FALSE;
1910
1911   /* User told us to split now */
1912   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE)
1913     return TRUE;
1914
1915   /* User told us to split at this running time */
1916   if (splitmux->reference_ctx->in_running_time > time_to_split) {
1917     GST_OBJECT_LOCK (splitmux);
1918     /* Dequeue running time */
1919     gst_queue_array_pop_head_struct (splitmux->times_to_split);
1920     /* Empty any running times after this that are past now */
1921     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1922     while (ptr_to_time) {
1923       time_to_split = *ptr_to_time;
1924       if (splitmux->reference_ctx->in_running_time <= time_to_split) {
1925         break;
1926       }
1927       gst_queue_array_pop_head_struct (splitmux->times_to_split);
1928       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1929     }
1930     GST_OBJECT_UNLOCK (splitmux);
1931     return TRUE;
1932   }
1933
1934   if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1935     return TRUE;                /* Would overrun byte limit */
1936
1937   if (thresh_time > 0 && queued_time > thresh_time)
1938     return TRUE;                /* Would overrun byte limit */
1939
1940   /* Timecode-based threshold accounts for possible rounding errors:
1941    * 5us should be bigger than all possible rounding errors but nowhere near
1942    * big enough to skip to another frame */
1943   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1944       splitmux->reference_ctx->in_running_time >
1945       splitmux->next_max_tc_time + 5 * GST_USECOND)
1946     return TRUE;                /* Timecode threshold */
1947
1948   if (check_robust_muxing) {
1949     GstClockTime mux_reserved_remain;
1950
1951     g_object_get (splitmux->muxer,
1952         "reserved-duration-remaining", &mux_reserved_remain, NULL);
1953
1954     GST_LOG_OBJECT (splitmux,
1955         "Muxer robust muxing report - %" G_GUINT64_FORMAT
1956         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1957         mux_reserved_remain, queued_gop_time);
1958
1959     if (queued_gop_time >= mux_reserved_remain) {
1960       GST_INFO_OBJECT (splitmux,
1961           "File is about to run out of header room - %" G_GUINT64_FORMAT
1962           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1963           ". Switching to new file", mux_reserved_remain, queued_gop_time);
1964       return TRUE;
1965     }
1966   }
1967
1968   /* Continue and mux this GOP */
1969   return FALSE;
1970 }
1971
1972 /* Called with splitmux lock held */
1973 /* Called when entering ProcessingCompleteGop state
1974  * Assess if mq contents overflowed the current file
1975  *   -> If yes, need to switch to new file
1976  *   -> if no, set max_out_running_time to let this GOP in and
1977  *      go to COLLECTING_GOP_START state
1978  */
1979 static void
1980 handle_gathered_gop (GstSplitMuxSink * splitmux)
1981 {
1982   guint64 queued_bytes;
1983   GstClockTimeDiff queued_time = 0;
1984   GstClockTimeDiff queued_gop_time = 0;
1985   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1986   SplitMuxOutputCommand *cmd;
1987
1988   /* Assess if the multiqueue contents overflowed the current file */
1989   /* When considering if a newly gathered GOP overflows
1990    * the time limit for the file, only consider the running time of the
1991    * reference stream. Other streams might have run ahead a little bit,
1992    * but extra pieces won't be released to the muxer beyond the reference
1993    * stream cut-off anyway - so it forms the limit. */
1994   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1995   queued_time = splitmux->reference_ctx->in_running_time;
1996   /* queued_gop_time tracks how much unwritten data there is waiting to
1997    * be written to this fragment including this GOP */
1998   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
1999     queued_gop_time =
2000         splitmux->reference_ctx->in_running_time -
2001         splitmux->reference_ctx->out_running_time;
2002   else
2003     queued_gop_time =
2004         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2005
2006   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2007
2008   g_assert (queued_gop_time >= 0);
2009   g_assert (queued_time >= splitmux->fragment_start_time);
2010
2011   queued_time -= splitmux->fragment_start_time;
2012   if (queued_time < queued_gop_time)
2013     queued_gop_time = queued_time;
2014
2015   /* Expand queued bytes estimate by muxer overhead */
2016   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2017
2018   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2019       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
2020   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
2021     GST_LOG_OBJECT (splitmux,
2022         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
2023         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
2024         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
2025   }
2026
2027   /* Check for overrun - have we output at least one byte and overrun
2028    * either threshold? */
2029   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2030     GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2031     *sink_running_time = splitmux->reference_ctx->out_running_time;
2032     g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2033         RUNNING_TIME, sink_running_time, g_free);
2034     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2035     /* Tell the output side to start a new fragment */
2036     GST_INFO_OBJECT (splitmux,
2037         "This GOP (dur %" GST_STIME_FORMAT
2038         ") would overflow the fragment, Sending start_new_fragment cmd",
2039         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2040             splitmux->gop_start_time));
2041     cmd = out_cmd_buf_new ();
2042     cmd->start_new_fragment = TRUE;
2043     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2044     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2045
2046     new_out_ts = splitmux->reference_ctx->in_running_time;
2047     splitmux->fragment_start_time = splitmux->gop_start_time;
2048     splitmux->fragment_total_bytes = 0;
2049
2050     if (request_next_keyframe (splitmux,
2051             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
2052       GST_WARNING_OBJECT (splitmux,
2053           "Could not request a keyframe. Files may not split at the exact location they should");
2054     }
2055     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2056   }
2057
2058   /* And set up to collect the next GOP */
2059   if (!splitmux->reference_ctx->in_eos) {
2060     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2061     splitmux->gop_start_time = new_out_ts;
2062   } else {
2063     /* This is probably already the current state, but just in case: */
2064     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2065     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
2066   }
2067
2068   /* And wake all input contexts to send a wake-up event */
2069   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2070   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2071
2072   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2073   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2074
2075   if (splitmux->gop_total_bytes > 0) {
2076     GST_LOG_OBJECT (splitmux,
2077         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2078         " time %" GST_STIME_FORMAT,
2079         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2080
2081     /* Send this GOP to the output command queue */
2082     cmd = out_cmd_buf_new ();
2083     cmd->start_new_fragment = FALSE;
2084     cmd->max_output_ts = new_out_ts;
2085     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2086         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2087     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2088
2089     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2090   }
2091
2092   splitmux->gop_total_bytes = 0;
2093 }
2094
2095 /* Called with splitmux lock held */
2096 /* Called from each input pad when it is has all the pieces
2097  * for a GOP or EOS, starting with the reference pad which has set the
2098  * splitmux->max_in_running_time
2099  */
2100 static void
2101 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2102 {
2103   GList *cur;
2104   GstEvent *event;
2105
2106   /* On ENDING_FILE, the reference stream sends a command to start a new
2107    * fragment, then releases the GOP for output in the new fragment.
2108    *  If somes streams received no buffer during the last GOP that overran,
2109    * because its next buffer has a timestamp bigger than
2110    * ctx->max_in_running_time, its queue is empty. In that case the only
2111    * way to wakeup the output thread is by injecting an event in the
2112    * queue. This usually happen with subtitle streams.
2113    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2114   if (ctx->need_unblock) {
2115     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2116     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2117         GST_EVENT_TYPE_SERIALIZED,
2118         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2119             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2120
2121     GST_SPLITMUX_UNLOCK (splitmux);
2122     gst_pad_send_event (ctx->sinkpad, event);
2123     GST_SPLITMUX_LOCK (splitmux);
2124
2125     ctx->need_unblock = FALSE;
2126     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2127     /* state may have changed while we were unlocked. Loop again if so */
2128     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2129       return;
2130   }
2131
2132   if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2133     gboolean ready = TRUE;
2134
2135     /* Iterate each pad, and check that the input running time is at least
2136      * up to the reference running time, and if so handle the collected GOP */
2137     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2138         GST_STIME_FORMAT " ctx %p",
2139         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2140     for (cur = g_list_first (splitmux->contexts); cur != NULL;
2141         cur = g_list_next (cur)) {
2142       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2143
2144       GST_LOG_OBJECT (splitmux,
2145           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2146           " EOS %d", tmpctx, tmpctx->srcpad,
2147           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2148
2149       if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2150           tmpctx->in_running_time < splitmux->max_in_running_time &&
2151           !tmpctx->in_eos) {
2152         GST_LOG_OBJECT (splitmux,
2153             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2154             tmpctx, tmpctx->srcpad);
2155         ready = FALSE;
2156         break;
2157       }
2158     }
2159     if (ready) {
2160       GST_DEBUG_OBJECT (splitmux,
2161           "Collected GOP is complete. Processing (ctx %p)", ctx);
2162       /* All pads have a complete GOP, release it into the multiqueue */
2163       handle_gathered_gop (splitmux);
2164
2165       /* The user has requested a split, we can split now that the previous GOP
2166        * has been collected to the correct location */
2167       if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
2168               FALSE)) {
2169         g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2170       }
2171     }
2172   }
2173
2174   /* If upstream reached EOS we are not expecting more data, no need to wait
2175    * here. */
2176   if (ctx->in_eos)
2177     return;
2178
2179   /* Some pad is not yet ready, or GOP is being pushed
2180    * either way, sleep and wait to get woken */
2181   while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2182       !ctx->flushing &&
2183       (ctx->in_running_time >= splitmux->max_in_running_time) &&
2184       (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2185
2186     GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2187     GST_SPLITMUX_WAIT_INPUT (splitmux);
2188     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2189   }
2190 }
2191
2192 static GstPadProbeReturn
2193 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2194 {
2195   GstSplitMuxSink *splitmux = ctx->splitmux;
2196   GstBuffer *buf;
2197   MqStreamBuf *buf_info = NULL;
2198   GstClockTime ts;
2199   gboolean loop_again;
2200   gboolean keyframe = FALSE;
2201
2202   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2203
2204   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2205   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2206     g_warning ("Buffer list handling not implemented");
2207     return GST_PAD_PROBE_DROP;
2208   }
2209   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2210       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2211     GstEvent *event = gst_pad_probe_info_get_event (info);
2212
2213     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2214
2215     switch (GST_EVENT_TYPE (event)) {
2216       case GST_EVENT_SEGMENT:
2217         gst_event_copy_segment (event, &ctx->in_segment);
2218         break;
2219       case GST_EVENT_FLUSH_STOP:
2220         GST_SPLITMUX_LOCK (splitmux);
2221         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2222         ctx->in_eos = FALSE;
2223         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2224         GST_SPLITMUX_UNLOCK (splitmux);
2225         break;
2226       case GST_EVENT_EOS:
2227         GST_SPLITMUX_LOCK (splitmux);
2228         ctx->in_eos = TRUE;
2229
2230         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2231           goto beach;
2232
2233         if (ctx->is_reference) {
2234           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2235           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2236           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2237           /* Wake up other input pads to collect this GOP */
2238           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2239           check_completed_gop (splitmux, ctx);
2240         } else if (splitmux->input_state ==
2241             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2242           /* If we are waiting for a GOP to be completed (ie, for aux
2243            * pads to catch up), then this pad is complete, so check
2244            * if the whole GOP is.
2245            */
2246           check_completed_gop (splitmux, ctx);
2247         }
2248         GST_SPLITMUX_UNLOCK (splitmux);
2249         break;
2250       case GST_EVENT_GAP:{
2251         GstClockTime gap_ts;
2252         GstClockTimeDiff rtime;
2253
2254         gst_event_parse_gap (event, &gap_ts, NULL);
2255         if (gap_ts == GST_CLOCK_TIME_NONE)
2256           break;
2257
2258         GST_SPLITMUX_LOCK (splitmux);
2259
2260         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2261           goto beach;
2262         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2263
2264         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2265             GST_STIME_ARGS (rtime));
2266
2267         if (ctx->is_reference
2268             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2269           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2270           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2271               GST_STIME_ARGS (splitmux->fragment_start_time));
2272           /* Also take this as the first start time when starting up,
2273            * so that we start counting overflow from the first frame */
2274           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2275             splitmux->max_in_running_time = splitmux->fragment_start_time;
2276         }
2277
2278         GST_SPLITMUX_UNLOCK (splitmux);
2279         break;
2280       }
2281       default:
2282         break;
2283     }
2284     return GST_PAD_PROBE_PASS;
2285   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2286     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2287       case GST_QUERY_ALLOCATION:
2288         return GST_PAD_PROBE_DROP;
2289       default:
2290         return GST_PAD_PROBE_PASS;
2291     }
2292   }
2293
2294   buf = gst_pad_probe_info_get_buffer (info);
2295   buf_info = mq_stream_buf_new ();
2296
2297   if (GST_BUFFER_PTS_IS_VALID (buf))
2298     ts = GST_BUFFER_PTS (buf);
2299   else
2300     ts = GST_BUFFER_DTS (buf);
2301
2302   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2303
2304   GST_SPLITMUX_LOCK (splitmux);
2305
2306   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2307     goto beach;
2308
2309   /* If this buffer has a timestamp, advance the input timestamp of the
2310    * stream */
2311   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2312     GstClockTimeDiff running_time =
2313         my_segment_to_running_time (&ctx->in_segment, ts);
2314
2315     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2316         GST_STIME_ARGS (running_time));
2317
2318     if (GST_CLOCK_STIME_IS_VALID (running_time)
2319         && running_time > ctx->in_running_time)
2320       ctx->in_running_time = running_time;
2321   }
2322
2323   /* Try to make sure we have a valid running time */
2324   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2325     ctx->in_running_time =
2326         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2327   }
2328
2329   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2330       GST_STIME_ARGS (ctx->in_running_time));
2331
2332   buf_info->run_ts = ctx->in_running_time;
2333   buf_info->buf_size = gst_buffer_get_size (buf);
2334   buf_info->duration = GST_BUFFER_DURATION (buf);
2335
2336   /* initialize fragment_start_time */
2337   if (ctx->is_reference
2338       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2339     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2340     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2341         GST_STIME_ARGS (splitmux->fragment_start_time));
2342     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2343
2344     /* Also take this as the first start time when starting up,
2345      * so that we start counting overflow from the first frame */
2346     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2347       splitmux->max_in_running_time = splitmux->fragment_start_time;
2348     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2349       GST_WARNING_OBJECT (splitmux,
2350           "Could not request a keyframe. Files may not split at the exact location they should");
2351     }
2352     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2353   }
2354
2355   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2356       " total GOP bytes %" G_GUINT64_FORMAT,
2357       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2358
2359   loop_again = TRUE;
2360   do {
2361     if (ctx->flushing)
2362       break;
2363
2364     switch (splitmux->input_state) {
2365       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2366         if (ctx->is_reference) {
2367           /* This is the reference context. If it's a keyframe,
2368            * it marks the start of a new GOP and we should wait in
2369            * check_completed_gop before continuing, but either way
2370            * (keyframe or no, we'll pass this buffer through after
2371            * so set loop_again to FALSE */
2372           loop_again = FALSE;
2373
2374           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2375             /* Allow other input pads to catch up to here too */
2376             splitmux->max_in_running_time = ctx->in_running_time;
2377             GST_LOG_OBJECT (splitmux,
2378                 "Max in running time now %" GST_TIME_FORMAT,
2379                 GST_TIME_ARGS (splitmux->max_in_running_time));
2380             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2381             break;
2382           }
2383           GST_INFO_OBJECT (pad,
2384               "Have keyframe with running time %" GST_STIME_FORMAT,
2385               GST_STIME_ARGS (ctx->in_running_time));
2386           keyframe = TRUE;
2387           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2388           splitmux->max_in_running_time = ctx->in_running_time;
2389           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2390               GST_TIME_ARGS (splitmux->max_in_running_time));
2391           /* Wake up other input pads to collect this GOP */
2392           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2393           check_completed_gop (splitmux, ctx);
2394           /* Store this new keyframe to remember the start of GOP */
2395           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2396         } else {
2397           /* Pass this buffer if the reference ctx is far enough ahead */
2398           if (ctx->in_running_time < splitmux->max_in_running_time) {
2399             loop_again = FALSE;
2400             break;
2401           }
2402
2403           /* We're still waiting for a keyframe on the reference pad, sleep */
2404           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2405           GST_SPLITMUX_WAIT_INPUT (splitmux);
2406           GST_LOG_OBJECT (pad,
2407               "Done sleeping for GOP start input state now %d",
2408               splitmux->input_state);
2409         }
2410         break;
2411       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2412         /* We're collecting a GOP. If this is the reference context,
2413          * we need to check if this is a keyframe that marks the start
2414          * of the next GOP. If it is, it marks the end of the GOP we're
2415          * collecting, so sleep and wait until all the other pads also
2416          * reach that timestamp - at which point, we have an entire GOP
2417          * and either go to ENDING_FILE or release this GOP to the muxer and
2418          * go back to COLLECT_GOP_START. */
2419
2420         /* If we overran the target timestamp, it might be time to process
2421          * the GOP, otherwise bail out for more data
2422          */
2423         GST_LOG_OBJECT (pad,
2424             "Checking TS %" GST_STIME_FORMAT " against max %"
2425             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2426             GST_STIME_ARGS (splitmux->max_in_running_time));
2427
2428         if (ctx->in_running_time < splitmux->max_in_running_time) {
2429           loop_again = FALSE;
2430           break;
2431         }
2432
2433         GST_LOG_OBJECT (pad,
2434             "Collected last packet of GOP. Checking other pads");
2435         check_completed_gop (splitmux, ctx);
2436         break;
2437       }
2438       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2439         loop_again = FALSE;
2440         break;
2441       default:
2442         loop_again = FALSE;
2443         break;
2444     }
2445   }
2446   while (loop_again);
2447
2448   if (keyframe) {
2449     splitmux->queued_keyframes++;
2450     buf_info->keyframe = TRUE;
2451   }
2452
2453   /* Update total input byte counter for overflow detect */
2454   splitmux->gop_total_bytes += buf_info->buf_size;
2455
2456   /* Now add this buffer to the queue just before returning */
2457   g_queue_push_head (&ctx->queued_bufs, buf_info);
2458
2459   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2460       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2461
2462   GST_SPLITMUX_UNLOCK (splitmux);
2463   return GST_PAD_PROBE_PASS;
2464
2465 beach:
2466   GST_SPLITMUX_UNLOCK (splitmux);
2467   if (buf_info)
2468     mq_stream_buf_free (buf_info);
2469   return GST_PAD_PROBE_PASS;
2470 }
2471
2472 static void
2473 grow_blocked_queues (GstSplitMuxSink * splitmux)
2474 {
2475   GList *cur;
2476
2477   /* Scan other queues for full-ness and grow them */
2478   for (cur = g_list_first (splitmux->contexts);
2479       cur != NULL; cur = g_list_next (cur)) {
2480     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2481     guint cur_limit;
2482     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2483
2484     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2485     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2486
2487     if (cur_len >= cur_limit) {
2488       cur_limit = cur_len + 1;
2489       GST_DEBUG_OBJECT (tmpctx->q,
2490           "Queue overflowed and needs enlarging. Growing to %u buffers",
2491           cur_limit);
2492       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2493     }
2494   }
2495 }
2496
2497 static void
2498 handle_q_underrun (GstElement * q, gpointer user_data)
2499 {
2500   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2501   GstSplitMuxSink *splitmux = ctx->splitmux;
2502
2503   GST_SPLITMUX_LOCK (splitmux);
2504   GST_DEBUG_OBJECT (q,
2505       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2506       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2507   grow_blocked_queues (splitmux);
2508   GST_SPLITMUX_UNLOCK (splitmux);
2509 }
2510
2511 static void
2512 handle_q_overrun (GstElement * q, gpointer user_data)
2513 {
2514   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2515   GstSplitMuxSink *splitmux = ctx->splitmux;
2516   gboolean allow_grow = FALSE;
2517
2518   GST_SPLITMUX_LOCK (splitmux);
2519   GST_DEBUG_OBJECT (q,
2520       "Queue reported overrun with %d keyframes and %d cmds enqueued",
2521       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2522
2523   if (splitmux->queued_keyframes < 2) {
2524     /* Less than a full GOP queued, grow the queue */
2525     allow_grow = TRUE;
2526   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2527     allow_grow = TRUE;
2528   } else {
2529     /* If another queue is starved, grow */
2530     GList *cur;
2531     for (cur = g_list_first (splitmux->contexts);
2532         cur != NULL; cur = g_list_next (cur)) {
2533       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2534       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2535         allow_grow = TRUE;
2536       }
2537     }
2538   }
2539   GST_SPLITMUX_UNLOCK (splitmux);
2540
2541   if (allow_grow) {
2542     guint cur_limit;
2543
2544     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2545     cur_limit++;
2546
2547     GST_DEBUG_OBJECT (q,
2548         "Queue overflowed and needs enlarging. Growing to %u buffers",
2549         cur_limit);
2550
2551     g_object_set (q, "max-size-buffers", cur_limit, NULL);
2552   }
2553 }
2554
2555 static GstPad *
2556 gst_splitmux_sink_request_new_pad (GstElement * element,
2557     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2558 {
2559   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2560   GstPadTemplate *mux_template = NULL;
2561   GstPad *res = NULL;
2562   GstElement *q;
2563   GstPad *q_sink = NULL, *q_src = NULL;
2564   gchar *gname;
2565   gboolean is_video = FALSE;
2566   MqStreamCtx *ctx;
2567
2568   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2569
2570   GST_SPLITMUX_LOCK (splitmux);
2571   if (!create_muxer (splitmux))
2572     goto fail;
2573   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2574
2575   if (templ->name_template) {
2576     if (g_str_equal (templ->name_template, "video")) {
2577       if (splitmux->have_video)
2578         goto already_have_video;
2579
2580       /* FIXME: Look for a pad template with matching caps, rather than by name */
2581       GST_DEBUG_OBJECT (element,
2582           "searching for pad-template with name 'video_%%u'");
2583       mux_template =
2584           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2585           (splitmux->muxer), "video_%u");
2586
2587       /* Fallback to find sink pad templates named 'video' (flvmux) */
2588       if (!mux_template) {
2589         GST_DEBUG_OBJECT (element,
2590             "searching for pad-template with name 'video'");
2591         mux_template =
2592             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2593             (splitmux->muxer), "video");
2594       }
2595       is_video = TRUE;
2596       name = NULL;
2597     } else {
2598       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
2599           templ->name_template);
2600       mux_template =
2601           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2602           (splitmux->muxer), templ->name_template);
2603
2604       /* Fallback to find sink pad templates named 'audio' (flvmux) */
2605       if (!mux_template) {
2606         GST_DEBUG_OBJECT (element,
2607             "searching for pad-template with name 'audio'");
2608         mux_template =
2609             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2610             (splitmux->muxer), "audio");
2611         name = NULL;
2612       }
2613     }
2614
2615     if (mux_template == NULL) {
2616       GST_DEBUG_OBJECT (element,
2617           "searching for pad-template with name 'sink_%%d'");
2618       mux_template =
2619           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2620           (splitmux->muxer), "sink_%d");
2621       name = NULL;
2622     }
2623     if (mux_template == NULL) {
2624       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
2625       mux_template =
2626           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2627           (splitmux->muxer), "sink");
2628       name = NULL;
2629     }
2630   }
2631
2632   if (mux_template == NULL) {
2633     GST_ERROR_OBJECT (element,
2634         "unable to find a suitable sink pad-template on the muxer");
2635
2636     goto fail;
2637   }
2638   GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
2639       mux_template->name_template);
2640
2641   if (mux_template->presence == GST_PAD_REQUEST) {
2642     GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
2643
2644     res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2645     if (res == NULL)
2646       goto fail;
2647   } else if (mux_template->presence == GST_PAD_ALWAYS) {
2648     GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
2649
2650     res =
2651         gst_element_get_static_pad (splitmux->muxer,
2652         mux_template->name_template);
2653     if (res == NULL)
2654       goto fail;
2655   } else {
2656     GST_ERROR_OBJECT (element,
2657         "unexpected pad presence %d", mux_template->presence);
2658
2659     goto fail;
2660   }
2661
2662   if (is_video)
2663     gname = g_strdup ("video");
2664   else if (name == NULL)
2665     gname = gst_pad_get_name (res);
2666   else
2667     gname = g_strdup (name);
2668
2669   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2670     goto fail;
2671
2672   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2673
2674   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2675       "max-size-buffers", 5, NULL);
2676
2677   q_sink = gst_element_get_static_pad (q, "sink");
2678   q_src = gst_element_get_static_pad (q, "src");
2679
2680   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2681     gst_element_release_request_pad (splitmux->muxer, res);
2682     gst_object_unref (GST_OBJECT (res));
2683     goto fail;
2684   }
2685
2686   gst_object_unref (GST_OBJECT (res));
2687
2688   ctx = mq_stream_ctx_new (splitmux);
2689   /* Context holds a ref: */
2690   ctx->q = gst_object_ref (q);
2691   ctx->srcpad = q_src;
2692   ctx->sinkpad = q_sink;
2693   ctx->q_overrun_id =
2694       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2695   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2696
2697   ctx->src_pad_block_id =
2698       gst_pad_add_probe (q_src,
2699       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2700       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2701   if (is_video && splitmux->reference_ctx != NULL) {
2702     splitmux->reference_ctx->is_reference = FALSE;
2703     splitmux->reference_ctx = NULL;
2704   }
2705   if (splitmux->reference_ctx == NULL) {
2706     splitmux->reference_ctx = ctx;
2707     ctx->is_reference = TRUE;
2708   }
2709
2710   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2711   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2712
2713   ctx->sink_pad_block_id =
2714       gst_pad_add_probe (q_sink,
2715       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2716       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2717       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2718
2719   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2720       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2721
2722   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2723
2724   g_free (gname);
2725
2726   if (is_video)
2727     splitmux->have_video = TRUE;
2728
2729   gst_pad_set_active (res, TRUE);
2730   gst_element_add_pad (element, res);
2731
2732   GST_SPLITMUX_UNLOCK (splitmux);
2733
2734   return res;
2735 fail:
2736   GST_SPLITMUX_UNLOCK (splitmux);
2737
2738   if (q_sink)
2739     gst_object_unref (q_sink);
2740   if (q_src)
2741     gst_object_unref (q_src);
2742   return NULL;
2743 already_have_video:
2744   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2745   GST_SPLITMUX_UNLOCK (splitmux);
2746   return NULL;
2747 }
2748
2749 static void
2750 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2751 {
2752   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2753   GstPad *muxpad = NULL;
2754   MqStreamCtx *ctx =
2755       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2756
2757   GST_SPLITMUX_LOCK (splitmux);
2758
2759   if (splitmux->muxer == NULL)
2760     goto fail;                  /* Elements don't exist yet - nothing to release */
2761
2762   GST_INFO_OBJECT (pad, "releasing request pad");
2763
2764   muxpad = gst_pad_get_peer (ctx->srcpad);
2765
2766   /* Remove the context from our consideration */
2767   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2768
2769   if (ctx->sink_pad_block_id)
2770     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2771
2772   if (ctx->src_pad_block_id)
2773     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2774
2775   /* Can release the context now */
2776   mq_stream_ctx_free (ctx);
2777   if (ctx == splitmux->reference_ctx)
2778     splitmux->reference_ctx = NULL;
2779
2780   /* Release and free the muxer input */
2781   if (muxpad) {
2782     gst_element_release_request_pad (splitmux->muxer, muxpad);
2783     gst_object_unref (muxpad);
2784   }
2785
2786   if (GST_PAD_PAD_TEMPLATE (pad) &&
2787       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2788               (pad)), "video"))
2789     splitmux->have_video = FALSE;
2790
2791   gst_element_remove_pad (element, pad);
2792
2793   /* Reset the internal elements only after all request pads are released */
2794   if (splitmux->contexts == NULL)
2795     gst_splitmux_reset (splitmux);
2796
2797 fail:
2798   GST_SPLITMUX_UNLOCK (splitmux);
2799 }
2800
2801 static GstElement *
2802 create_element (GstSplitMuxSink * splitmux,
2803     const gchar * factory, const gchar * name, gboolean locked)
2804 {
2805   GstElement *ret = gst_element_factory_make (factory, name);
2806   if (ret == NULL) {
2807     g_warning ("Failed to create %s - splitmuxsink will not work", name);
2808     return NULL;
2809   }
2810
2811   if (locked) {
2812     /* Ensure the sink starts in locked state and NULL - it will be changed
2813      * by the filename setting code */
2814     gst_element_set_locked_state (ret, TRUE);
2815     gst_element_set_state (ret, GST_STATE_NULL);
2816   }
2817
2818   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2819     g_warning ("Could not add %s element - splitmuxsink will not work", name);
2820     gst_object_unref (ret);
2821     return NULL;
2822   }
2823
2824   return ret;
2825 }
2826
2827 static gboolean
2828 create_muxer (GstSplitMuxSink * splitmux)
2829 {
2830   /* Create internal elements */
2831   if (splitmux->muxer == NULL) {
2832     GstElement *provided_muxer = NULL;
2833
2834     GST_OBJECT_LOCK (splitmux);
2835     if (splitmux->provided_muxer != NULL)
2836       provided_muxer = gst_object_ref (splitmux->provided_muxer);
2837     GST_OBJECT_UNLOCK (splitmux);
2838
2839     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2840         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2841       if ((splitmux->muxer =
2842               create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2843         goto fail;
2844     } else if (splitmux->async_finalize) {
2845       if ((splitmux->muxer =
2846               create_element (splitmux, splitmux->muxer_factory, "muxer",
2847                   FALSE)) == NULL)
2848         goto fail;
2849       if (splitmux->muxer_properties)
2850         gst_structure_foreach (splitmux->muxer_properties,
2851             _set_property_from_structure, splitmux->muxer);
2852     } else {
2853       /* Ensure it's not in locked state (we might be reusing an old element) */
2854       gst_element_set_locked_state (provided_muxer, FALSE);
2855       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2856         g_warning ("Could not add muxer element - splitmuxsink will not work");
2857         gst_object_unref (provided_muxer);
2858         goto fail;
2859       }
2860
2861       splitmux->muxer = provided_muxer;
2862       gst_object_unref (provided_muxer);
2863     }
2864
2865     if (splitmux->use_robust_muxing) {
2866       update_muxer_properties (splitmux);
2867     }
2868   }
2869
2870   return TRUE;
2871 fail:
2872   return FALSE;
2873 }
2874
2875 static GstElement *
2876 find_sink (GstElement * e)
2877 {
2878   GstElement *res = NULL;
2879   GstIterator *iter;
2880   gboolean done = FALSE;
2881   GValue data = { 0, };
2882
2883   if (!GST_IS_BIN (e))
2884     return e;
2885
2886   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2887     return e;
2888
2889   iter = gst_bin_iterate_sinks (GST_BIN (e));
2890   while (!done) {
2891     switch (gst_iterator_next (iter, &data)) {
2892       case GST_ITERATOR_OK:
2893       {
2894         GstElement *child = g_value_get_object (&data);
2895         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2896                 "location") != NULL) {
2897           res = child;
2898           done = TRUE;
2899         }
2900         g_value_reset (&data);
2901         break;
2902       }
2903       case GST_ITERATOR_RESYNC:
2904         gst_iterator_resync (iter);
2905         break;
2906       case GST_ITERATOR_DONE:
2907         done = TRUE;
2908         break;
2909       case GST_ITERATOR_ERROR:
2910         g_assert_not_reached ();
2911         break;
2912     }
2913   }
2914   g_value_unset (&data);
2915   gst_iterator_free (iter);
2916
2917   return res;
2918 }
2919
2920 static gboolean
2921 create_sink (GstSplitMuxSink * splitmux)
2922 {
2923   GstElement *provided_sink = NULL;
2924
2925   if (splitmux->active_sink == NULL) {
2926
2927     GST_OBJECT_LOCK (splitmux);
2928     if (splitmux->provided_sink != NULL)
2929       provided_sink = gst_object_ref (splitmux->provided_sink);
2930     GST_OBJECT_UNLOCK (splitmux);
2931
2932     if ((!splitmux->async_finalize && provided_sink == NULL) ||
2933         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2934       if ((splitmux->sink =
2935               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2936         goto fail;
2937       splitmux->active_sink = splitmux->sink;
2938     } else if (splitmux->async_finalize) {
2939       if ((splitmux->sink =
2940               create_element (splitmux, splitmux->sink_factory, "sink",
2941                   TRUE)) == NULL)
2942         goto fail;
2943       if (splitmux->sink_properties)
2944         gst_structure_foreach (splitmux->sink_properties,
2945             _set_property_from_structure, splitmux->sink);
2946       splitmux->active_sink = splitmux->sink;
2947     } else {
2948       /* Ensure the sink starts in locked state and NULL - it will be changed
2949        * by the filename setting code */
2950       gst_element_set_locked_state (provided_sink, TRUE);
2951       gst_element_set_state (provided_sink, GST_STATE_NULL);
2952       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2953         g_warning ("Could not add sink elements - splitmuxsink will not work");
2954         gst_object_unref (provided_sink);
2955         goto fail;
2956       }
2957
2958       splitmux->active_sink = provided_sink;
2959
2960       /* The bin holds a ref now, we can drop our tmp ref */
2961       gst_object_unref (provided_sink);
2962
2963       /* Find the sink element */
2964       splitmux->sink = find_sink (splitmux->active_sink);
2965       if (splitmux->sink == NULL) {
2966         g_warning
2967             ("Could not locate sink element in provided sink - splitmuxsink will not work");
2968         goto fail;
2969       }
2970     }
2971
2972 #if 1
2973     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2974             "async") != NULL) {
2975       /* async child elements are causing state change races and weird
2976        * failures, so let's try and turn that off */
2977       g_object_set (splitmux->sink, "async", FALSE, NULL);
2978     }
2979 #endif
2980
2981     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2982       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2983       goto fail;
2984     }
2985   }
2986
2987   return TRUE;
2988 fail:
2989   return FALSE;
2990 }
2991
2992 #ifdef __GNUC__
2993 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2994 #endif
2995 static void
2996 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2997 {
2998   gchar *fname = NULL;
2999   GstSample *sample;
3000   GstCaps *caps;
3001
3002   gst_splitmux_sink_ensure_max_files (splitmux);
3003
3004   if (ctx->cur_out_buffer == NULL) {
3005     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3006   }
3007
3008   caps = gst_pad_get_current_caps (ctx->srcpad);
3009   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3010   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3011       splitmux->fragment_id, sample, &fname);
3012   gst_sample_unref (sample);
3013   if (caps)
3014     gst_caps_unref (caps);
3015
3016   if (fname == NULL) {
3017     /* Fallback to the old signal if the new one returned nothing */
3018     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3019         splitmux->fragment_id, &fname);
3020   }
3021
3022   if (!fname)
3023     fname = splitmux->location ?
3024         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3025
3026   if (fname) {
3027     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3028     g_object_set (splitmux->sink, "location", fname, NULL);
3029     g_free (fname);
3030
3031     splitmux->fragment_id++;
3032   }
3033 }
3034
3035 static void
3036 do_async_start (GstSplitMuxSink * splitmux)
3037 {
3038   GstMessage *message;
3039
3040   if (!splitmux->need_async_start) {
3041     GST_INFO_OBJECT (splitmux, "no async_start needed");
3042     return;
3043   }
3044
3045   splitmux->async_pending = TRUE;
3046
3047   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3048   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3049   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3050       (splitmux), message);
3051 }
3052
3053 static void
3054 do_async_done (GstSplitMuxSink * splitmux)
3055 {
3056   GstMessage *message;
3057
3058   if (splitmux->async_pending) {
3059     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3060     message =
3061         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3062         GST_CLOCK_TIME_NONE);
3063     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3064         (splitmux), message);
3065
3066     splitmux->async_pending = FALSE;
3067   }
3068
3069   splitmux->need_async_start = FALSE;
3070 }
3071
3072 static GstStateChangeReturn
3073 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3074 {
3075   GstStateChangeReturn ret;
3076   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3077
3078   switch (transition) {
3079     case GST_STATE_CHANGE_NULL_TO_READY:{
3080       GST_SPLITMUX_LOCK (splitmux);
3081       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3082         ret = GST_STATE_CHANGE_FAILURE;
3083         GST_SPLITMUX_UNLOCK (splitmux);
3084         goto beach;
3085       }
3086       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3087       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3088       GST_SPLITMUX_UNLOCK (splitmux);
3089       splitmux->fragment_id = 0;
3090       break;
3091     }
3092     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3093       GST_SPLITMUX_LOCK (splitmux);
3094       /* Start by collecting one input on each pad */
3095       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3096       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3097       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3098       splitmux->gop_start_time = splitmux->fragment_start_time =
3099           GST_CLOCK_STIME_NONE;
3100       splitmux->muxed_out_bytes = 0;
3101       splitmux->ready_for_output = FALSE;
3102       GST_SPLITMUX_UNLOCK (splitmux);
3103       break;
3104     }
3105     case GST_STATE_CHANGE_PAUSED_TO_READY:
3106       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3107       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3108     case GST_STATE_CHANGE_READY_TO_NULL:
3109       GST_SPLITMUX_LOCK (splitmux);
3110       gst_queue_array_clear (splitmux->times_to_split);
3111       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3112       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3113       /* Wake up any blocked threads */
3114       GST_LOG_OBJECT (splitmux,
3115           "State change -> NULL or READY. Waking threads");
3116       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3117       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3118       GST_SPLITMUX_UNLOCK (splitmux);
3119       break;
3120     default:
3121       break;
3122   }
3123
3124   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3125   if (ret == GST_STATE_CHANGE_FAILURE)
3126     goto beach;
3127
3128   switch (transition) {
3129     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3130       splitmux->need_async_start = TRUE;
3131       break;
3132     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3133       /* Change state async, because our child sink might not
3134        * be ready to do that for us yet if it's state is still locked */
3135
3136       splitmux->need_async_start = TRUE;
3137       /* we want to go async to PAUSED until we managed to configure and add the
3138        * sink */
3139       GST_SPLITMUX_LOCK (splitmux);
3140       do_async_start (splitmux);
3141       GST_SPLITMUX_UNLOCK (splitmux);
3142       ret = GST_STATE_CHANGE_ASYNC;
3143       break;
3144     }
3145     case GST_STATE_CHANGE_READY_TO_NULL:
3146       GST_SPLITMUX_LOCK (splitmux);
3147       splitmux->fragment_id = 0;
3148       /* Reset internal elements only if no pad contexts are using them */
3149       if (splitmux->contexts == NULL)
3150         gst_splitmux_reset (splitmux);
3151       do_async_done (splitmux);
3152       GST_SPLITMUX_UNLOCK (splitmux);
3153       break;
3154     default:
3155       break;
3156   }
3157
3158 beach:
3159   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3160       ret == GST_STATE_CHANGE_FAILURE) {
3161     /* Cleanup elements on failed transition out of NULL */
3162     gst_splitmux_reset (splitmux);
3163     GST_SPLITMUX_LOCK (splitmux);
3164     do_async_done (splitmux);
3165     GST_SPLITMUX_UNLOCK (splitmux);
3166   }
3167   return ret;
3168 }
3169
3170 gboolean
3171 register_splitmuxsink (GstPlugin * plugin)
3172 {
3173   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3174       "Split File Muxing Sink");
3175
3176   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3177       GST_TYPE_SPLITMUX_SINK);
3178 }
3179
3180 static void
3181 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3182 {
3183   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3184     splitmux->fragment_id = 0;
3185   }
3186 }
3187
3188 static void
3189 split_now (GstSplitMuxSink * splitmux)
3190 {
3191   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3192 }
3193
3194 static void
3195 split_after (GstSplitMuxSink * splitmux)
3196 {
3197   g_atomic_int_set (&(splitmux->split_requested), TRUE);
3198 }
3199
3200 static void
3201 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3202 {
3203   gboolean send_keyframe_requests;
3204
3205   GST_SPLITMUX_LOCK (splitmux);
3206   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3207   send_keyframe_requests = splitmux->send_keyframe_requests;
3208   GST_SPLITMUX_UNLOCK (splitmux);
3209
3210   if (send_keyframe_requests) {
3211     GstEvent *ev =
3212         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3213     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3214         GST_TIME_ARGS (split_time));
3215     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3216       GST_WARNING_OBJECT (splitmux,
3217           "Could not request keyframe at %" GST_TIME_FORMAT,
3218           GST_TIME_ARGS (split_time));
3219     }
3220   }
3221 }