splitmuxsink: Don't leak old muxer/sink in async mode
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * In the async-finalize mode, when the threshold is crossed, the old muxer
42  * and sink is disconnected from the pipeline and left to finish the file
43  * asynchronously, and a new muxer and sink is created to continue with the
44  * next fragment. For that reason, instead of muxer and sink objects, the
45  * muxer-factory and sink-factory properties are used to construct the new
46  * objects, together with muxer-properties and sink-properties.
47  *
48  * <refsect2>
49  * <title>Example pipelines</title>
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  * </refsect2>
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69
70 #include <string.h>
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
74
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
77
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
82
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
85
86 static void split_now (GstSplitMuxSink * splitmux);
87
88 enum
89 {
90   PROP_0,
91   PROP_LOCATION,
92   PROP_MAX_SIZE_TIME,
93   PROP_MAX_SIZE_BYTES,
94   PROP_MAX_SIZE_TIMECODE,
95   PROP_SEND_KEYFRAME_REQUESTS,
96   PROP_MAX_FILES,
97   PROP_MUXER_OVERHEAD,
98   PROP_USE_ROBUST_MUXING,
99   PROP_ALIGNMENT_THRESHOLD,
100   PROP_MUXER,
101   PROP_SINK,
102   PROP_RESET_MUXER,
103   PROP_ASYNC_FINALIZE,
104   PROP_MUXER_FACTORY,
105   PROP_MUXER_PROPERTIES,
106   PROP_SINK_FACTORY,
107   PROP_SINK_PROPERTIES
108 };
109
110 #define DEFAULT_MAX_SIZE_TIME       0
111 #define DEFAULT_MAX_SIZE_BYTES      0
112 #define DEFAULT_MAX_FILES           0
113 #define DEFAULT_MUXER_OVERHEAD      0.02
114 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
115 #define DEFAULT_ALIGNMENT_THRESHOLD 0
116 #define DEFAULT_MUXER "mp4mux"
117 #define DEFAULT_SINK "filesink"
118 #define DEFAULT_USE_ROBUST_MUXING FALSE
119 #define DEFAULT_RESET_MUXER TRUE
120 #define DEFAULT_ASYNC_FINALIZE FALSE
121
122 typedef struct _AsyncEosHelper
123 {
124   MqStreamCtx *ctx;
125   GstPad *pad;
126 } AsyncEosHelper;
127
128 enum
129 {
130   SIGNAL_FORMAT_LOCATION,
131   SIGNAL_FORMAT_LOCATION_FULL,
132   SIGNAL_SPLIT_NOW,
133   SIGNAL_MUXER_ADDED,
134   SIGNAL_SINK_ADDED,
135   SIGNAL_LAST
136 };
137
138 static guint signals[SIGNAL_LAST];
139
140 static GstStaticPadTemplate video_sink_template =
141 GST_STATIC_PAD_TEMPLATE ("video",
142     GST_PAD_SINK,
143     GST_PAD_REQUEST,
144     GST_STATIC_CAPS_ANY);
145 static GstStaticPadTemplate audio_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("audio_%u",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate subtitle_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
152     GST_PAD_SINK,
153     GST_PAD_REQUEST,
154     GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate caption_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("caption_%u",
157     GST_PAD_SINK,
158     GST_PAD_REQUEST,
159     GST_STATIC_CAPS_ANY);
160
161 static GQuark PAD_CONTEXT;
162 static GQuark EOS_FROM_US;
163 static GQuark RUNNING_TIME;
164 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
165  * to forward an incoming EOS message, but we cannot rely on the state of the
166  * splitmux anymore, so we set this qdata on the sink instead.
167  * The muxer and sink must be destroyed after both of these things have
168  * finished:
169  * 1) The EOS message has been sent when the fragment is ending
170  * 2) The muxer has been unlinked and relinked
171  * Therefore, EOS_FROM_US can have these two values:
172  * 0: EOS was not requested from us. Forward the message. The muxer and the
173  * sink will be destroyed together with the rest of the bin.
174  * 1: EOS was requested from us, but the other of the two tasks hasn't
175  * finished. Set EOS_FROM_US to 2 and do your stuff.
176  * 2: EOS was requested from us and the other of the two tasks has finished.
177  * Now we can destroy the muxer and the sink.
178  */
179
180 static void
181 _do_init (void)
182 {
183   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
184   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
185   RUNNING_TIME = g_quark_from_static_string ("running-time");
186 }
187
188 #define gst_splitmux_sink_parent_class parent_class
189 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
190     _do_init ());
191
192 static gboolean create_muxer (GstSplitMuxSink * splitmux);
193 static gboolean create_sink (GstSplitMuxSink * splitmux);
194 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198 static void gst_splitmux_sink_dispose (GObject * object);
199 static void gst_splitmux_sink_finalize (GObject * object);
200
201 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
202     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
203 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
204
205 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
206     element, GstStateChange transition);
207
208 static void bus_handler (GstBin * bin, GstMessage * msg);
209 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
210 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
211 static void mq_stream_ctx_free (MqStreamCtx * ctx);
212 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
213
214 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
215 static GstElement *create_element (GstSplitMuxSink * splitmux,
216     const gchar * factory, const gchar * name, gboolean locked);
217
218 static void do_async_done (GstSplitMuxSink * splitmux);
219
220 static MqStreamBuf *
221 mq_stream_buf_new (void)
222 {
223   return g_slice_new0 (MqStreamBuf);
224 }
225
226 static void
227 mq_stream_buf_free (MqStreamBuf * data)
228 {
229   g_slice_free (MqStreamBuf, data);
230 }
231
232 static SplitMuxOutputCommand *
233 out_cmd_buf_new (void)
234 {
235   return g_slice_new0 (SplitMuxOutputCommand);
236 }
237
238 static void
239 out_cmd_buf_free (SplitMuxOutputCommand * data)
240 {
241   g_slice_free (SplitMuxOutputCommand, data);
242 }
243
244 static void
245 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
246 {
247   GObjectClass *gobject_class = (GObjectClass *) klass;
248   GstElementClass *gstelement_class = (GstElementClass *) klass;
249   GstBinClass *gstbin_class = (GstBinClass *) klass;
250
251   gobject_class->set_property = gst_splitmux_sink_set_property;
252   gobject_class->get_property = gst_splitmux_sink_get_property;
253   gobject_class->dispose = gst_splitmux_sink_dispose;
254   gobject_class->finalize = gst_splitmux_sink_finalize;
255
256   gst_element_class_set_static_metadata (gstelement_class,
257       "Split Muxing Bin", "Generic/Bin/Muxer",
258       "Convenience bin that muxes incoming streams into multiple time/size limited files",
259       "Jan Schmidt <jan@centricular.com>");
260
261   gst_element_class_add_static_pad_template (gstelement_class,
262       &video_sink_template);
263   gst_element_class_add_static_pad_template (gstelement_class,
264       &audio_sink_template);
265   gst_element_class_add_static_pad_template (gstelement_class,
266       &subtitle_sink_template);
267   gst_element_class_add_static_pad_template (gstelement_class,
268       &caption_sink_template);
269
270   gstelement_class->change_state =
271       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
272   gstelement_class->request_new_pad =
273       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
274   gstelement_class->release_pad =
275       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
276
277   gstbin_class->handle_message = bus_handler;
278
279   g_object_class_install_property (gobject_class, PROP_LOCATION,
280       g_param_spec_string ("location", "File Output Pattern",
281           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
282           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
284       g_param_spec_double ("mux-overhead", "Muxing Overhead",
285           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
286           DEFAULT_MUXER_OVERHEAD,
287           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
288
289   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
290       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
291           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
292           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
294       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
295           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
296           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
298       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
299           "Maximum difference in timecode between first and last frame. "
300           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
301           "Will only be effective if a timecode track is present.",
302           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
304       g_param_spec_boolean ("send-keyframe-requests",
305           "Request keyframes at max-size-time",
306           "Request a keyframe every max-size-time ns to try splitting at that point. "
307           "Needs max-size-bytes to be 0 in order to be effective.",
308           DEFAULT_SEND_KEYFRAME_REQUESTS,
309           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
311       g_param_spec_uint ("max-files", "Max files",
312           "Maximum number of files to keep on disk. Once the maximum is reached,"
313           "old files start to be deleted to make room for new ones.", 0,
314           G_MAXUINT, DEFAULT_MAX_FILES,
315           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
317       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
318           "Allow non-reference streams to be that many ns before the reference"
319           " stream",
320           0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
321           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
322
323   g_object_class_install_property (gobject_class, PROP_MUXER,
324       g_param_spec_object ("muxer", "Muxer",
325           "The muxer element to use (NULL = default mp4mux). "
326           "Valid only for async-finalize = FALSE",
327           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328   g_object_class_install_property (gobject_class, PROP_SINK,
329       g_param_spec_object ("sink", "Sink",
330           "The sink element (or element chain) to use (NULL = default filesink). "
331           "Valid only for async-finalize = FALSE",
332           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333
334   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
335       g_param_spec_boolean ("use-robust-muxing",
336           "Support robust-muxing mode of some muxers",
337           "Check if muxers support robust muxing via the reserved-max-duration and "
338           "reserved-duration-remaining properties and use them if so. "
339           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
340           " create new fragments if the reserved header space is about to overflow. "
341           "Note this does not set reserved-moov-update-period - apps should do that manually",
342           DEFAULT_USE_ROBUST_MUXING,
343           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
346       g_param_spec_boolean ("reset-muxer",
347           "Reset Muxer",
348           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
349           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
352       g_param_spec_boolean ("async-finalize",
353           "Finalize fragments asynchronously",
354           "Finalize each fragment asynchronously and start a new one",
355           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
357       g_param_spec_string ("muxer-factory", "Muxer factory",
358           "The muxer element factory to use (default = mp4mux). "
359           "Valid only for async-finalize = TRUE",
360           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
362       g_param_spec_boxed ("muxer-properties", "Muxer properties",
363           "The muxer element properties to use. "
364           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
365           "Valid only for async-finalize = TRUE",
366           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
368       g_param_spec_string ("sink-factory", "Sink factory",
369           "The sink element factory to use (default = filesink). "
370           "Valid only for async-finalize = TRUE",
371           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
373       g_param_spec_boxed ("sink-properties", "Sink properties",
374           "The sink element properties to use. "
375           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
376           "Valid only for async-finalize = TRUE",
377           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378
379   /**
380    * GstSplitMuxSink::format-location:
381    * @splitmux: the #GstSplitMuxSink
382    * @fragment_id: the sequence number of the file to be created
383    *
384    * Returns: the location to be used for the next output file
385    */
386   signals[SIGNAL_FORMAT_LOCATION] =
387       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
388       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
389
390   /**
391    * GstSplitMuxSink::format-location-full:
392    * @splitmux: the #GstSplitMuxSink
393    * @fragment_id: the sequence number of the file to be created
394    * @first_sample: A #GstSample containing the first buffer
395    *   from the reference stream in the new file
396    *
397    * Returns: the location to be used for the next output file
398    */
399   signals[SIGNAL_FORMAT_LOCATION_FULL] =
400       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
401       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
402       GST_TYPE_SAMPLE);
403
404   /**
405    * GstSplitMuxSink::split-now:
406    * @splitmux: the #GstSplitMuxSink
407    *
408    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
409    *
410    *
411    * Since: 1.14
412    */
413   signals[SIGNAL_SPLIT_NOW] =
414       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
415       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
416           split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
417
418   /**
419    * GstSplitMuxSink::muxer-added:
420    * @splitmux: the #GstSplitMuxSink
421    * @muxer: the newly added muxer element
422    *
423    * Since: 1.14
424    */
425   signals[SIGNAL_MUXER_ADDED] =
426       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
427       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
428
429   /**
430    * GstSplitMuxSink::sink-added:
431    * @splitmux: the #GstSplitMuxSink
432    * @sink: the newly added sink element
433    *
434    * Since: 1.14
435    */
436   signals[SIGNAL_SINK_ADDED] =
437       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
438       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
439
440   klass->split_now = split_now;
441 }
442
443 static void
444 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
445 {
446   g_mutex_init (&splitmux->lock);
447   g_cond_init (&splitmux->input_cond);
448   g_cond_init (&splitmux->output_cond);
449   g_queue_init (&splitmux->out_cmd_q);
450
451   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
452   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
453   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
454   splitmux->max_files = DEFAULT_MAX_FILES;
455   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
456   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
457   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
458   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
459   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
460
461   splitmux->threshold_timecode_str = NULL;
462
463   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
464   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
465   splitmux->muxer_properties = NULL;
466   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
467   splitmux->sink_properties = NULL;
468
469   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
470   splitmux->split_now = FALSE;
471 }
472
473 static void
474 gst_splitmux_reset (GstSplitMuxSink * splitmux)
475 {
476   if (splitmux->muxer) {
477     gst_element_set_locked_state (splitmux->muxer, TRUE);
478     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
479     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
480   }
481   if (splitmux->active_sink) {
482     gst_element_set_locked_state (splitmux->active_sink, TRUE);
483     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
484     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
485   }
486
487   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
488 }
489
490 static void
491 gst_splitmux_sink_dispose (GObject * object)
492 {
493   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
494
495   /* Calling parent dispose invalidates all child pointers */
496   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
497
498   G_OBJECT_CLASS (parent_class)->dispose (object);
499 }
500
501 static void
502 gst_splitmux_sink_finalize (GObject * object)
503 {
504   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
505   g_cond_clear (&splitmux->input_cond);
506   g_cond_clear (&splitmux->output_cond);
507   g_mutex_clear (&splitmux->lock);
508   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
509   g_queue_clear (&splitmux->out_cmd_q);
510
511   if (splitmux->provided_sink)
512     gst_object_unref (splitmux->provided_sink);
513   if (splitmux->provided_muxer)
514     gst_object_unref (splitmux->provided_muxer);
515
516   if (splitmux->muxer_factory)
517     g_free (splitmux->muxer_factory);
518   if (splitmux->muxer_properties)
519     gst_structure_free (splitmux->muxer_properties);
520   if (splitmux->sink_factory)
521     g_free (splitmux->sink_factory);
522   if (splitmux->sink_properties)
523     gst_structure_free (splitmux->sink_properties);
524
525   if (splitmux->threshold_timecode_str)
526     g_free (splitmux->threshold_timecode_str);
527
528   g_free (splitmux->location);
529
530   /* Make sure to free any un-released contexts. There should not be any,
531    * because the dispose will have freed all request pads though */
532   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
533   g_list_free (splitmux->contexts);
534
535   G_OBJECT_CLASS (parent_class)->finalize (object);
536 }
537
538 /*
539  * Set any time threshold to the muxer, if it has
540  * reserved-max-duration and reserved-duration-remaining
541  * properties. Called when creating/claiming the muxer
542  * in create_elements() */
543 static void
544 update_muxer_properties (GstSplitMuxSink * sink)
545 {
546   GObjectClass *klass;
547   GstClockTime threshold_time;
548
549   sink->muxer_has_reserved_props = FALSE;
550   if (sink->muxer == NULL)
551     return;
552   klass = G_OBJECT_GET_CLASS (sink->muxer);
553   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
554     return;
555   if (g_object_class_find_property (klass,
556           "reserved-duration-remaining") == NULL)
557     return;
558   sink->muxer_has_reserved_props = TRUE;
559
560   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
561       GST_TIME_ARGS (sink->threshold_time));
562   GST_OBJECT_LOCK (sink);
563   threshold_time = sink->threshold_time;
564   GST_OBJECT_UNLOCK (sink);
565
566   if (threshold_time > 0) {
567     /* Tell the muxer how much space to reserve */
568     GstClockTime muxer_threshold = threshold_time;
569     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
570   }
571 }
572
573 static void
574 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
575     const GValue * value, GParamSpec * pspec)
576 {
577   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
578
579   switch (prop_id) {
580     case PROP_LOCATION:{
581       GST_OBJECT_LOCK (splitmux);
582       g_free (splitmux->location);
583       splitmux->location = g_value_dup_string (value);
584       GST_OBJECT_UNLOCK (splitmux);
585       break;
586     }
587     case PROP_MAX_SIZE_BYTES:
588       GST_OBJECT_LOCK (splitmux);
589       splitmux->threshold_bytes = g_value_get_uint64 (value);
590       GST_OBJECT_UNLOCK (splitmux);
591       break;
592     case PROP_MAX_SIZE_TIME:
593       GST_OBJECT_LOCK (splitmux);
594       splitmux->threshold_time = g_value_get_uint64 (value);
595       GST_OBJECT_UNLOCK (splitmux);
596       break;
597     case PROP_MAX_SIZE_TIMECODE:
598       GST_OBJECT_LOCK (splitmux);
599       splitmux->threshold_timecode_str = g_value_dup_string (value);
600       GST_OBJECT_UNLOCK (splitmux);
601       break;
602     case PROP_SEND_KEYFRAME_REQUESTS:
603       GST_OBJECT_LOCK (splitmux);
604       splitmux->send_keyframe_requests = g_value_get_boolean (value);
605       GST_OBJECT_UNLOCK (splitmux);
606       break;
607     case PROP_MAX_FILES:
608       GST_OBJECT_LOCK (splitmux);
609       splitmux->max_files = g_value_get_uint (value);
610       GST_OBJECT_UNLOCK (splitmux);
611       break;
612     case PROP_MUXER_OVERHEAD:
613       GST_OBJECT_LOCK (splitmux);
614       splitmux->mux_overhead = g_value_get_double (value);
615       GST_OBJECT_UNLOCK (splitmux);
616       break;
617     case PROP_USE_ROBUST_MUXING:
618       GST_OBJECT_LOCK (splitmux);
619       splitmux->use_robust_muxing = g_value_get_boolean (value);
620       GST_OBJECT_UNLOCK (splitmux);
621       if (splitmux->use_robust_muxing)
622         update_muxer_properties (splitmux);
623       break;
624     case PROP_ALIGNMENT_THRESHOLD:
625       GST_OBJECT_LOCK (splitmux);
626       splitmux->alignment_threshold = g_value_get_uint64 (value);
627       GST_OBJECT_UNLOCK (splitmux);
628       break;
629     case PROP_SINK:
630       GST_OBJECT_LOCK (splitmux);
631       if (splitmux->provided_sink)
632         gst_object_unref (splitmux->provided_sink);
633       splitmux->provided_sink = g_value_get_object (value);
634       gst_object_ref_sink (splitmux->provided_sink);
635       GST_OBJECT_UNLOCK (splitmux);
636       break;
637     case PROP_MUXER:
638       GST_OBJECT_LOCK (splitmux);
639       if (splitmux->provided_muxer)
640         gst_object_unref (splitmux->provided_muxer);
641       splitmux->provided_muxer = g_value_get_object (value);
642       gst_object_ref_sink (splitmux->provided_muxer);
643       GST_OBJECT_UNLOCK (splitmux);
644       break;
645     case PROP_RESET_MUXER:
646       GST_OBJECT_LOCK (splitmux);
647       splitmux->reset_muxer = g_value_get_boolean (value);
648       GST_OBJECT_UNLOCK (splitmux);
649       break;
650     case PROP_ASYNC_FINALIZE:
651       GST_OBJECT_LOCK (splitmux);
652       splitmux->async_finalize = g_value_get_boolean (value);
653       GST_OBJECT_UNLOCK (splitmux);
654       break;
655     case PROP_MUXER_FACTORY:
656       GST_OBJECT_LOCK (splitmux);
657       if (splitmux->muxer_factory)
658         g_free (splitmux->muxer_factory);
659       splitmux->muxer_factory = g_value_dup_string (value);
660       GST_OBJECT_UNLOCK (splitmux);
661       break;
662     case PROP_MUXER_PROPERTIES:
663       GST_OBJECT_LOCK (splitmux);
664       if (splitmux->muxer_properties)
665         gst_structure_free (splitmux->muxer_properties);
666       if (gst_value_get_structure (value))
667         splitmux->muxer_properties =
668             gst_structure_copy (gst_value_get_structure (value));
669       else
670         splitmux->muxer_properties = NULL;
671       GST_OBJECT_UNLOCK (splitmux);
672       break;
673     case PROP_SINK_FACTORY:
674       GST_OBJECT_LOCK (splitmux);
675       if (splitmux->sink_factory)
676         g_free (splitmux->sink_factory);
677       splitmux->sink_factory = g_value_dup_string (value);
678       GST_OBJECT_UNLOCK (splitmux);
679       break;
680     case PROP_SINK_PROPERTIES:
681       GST_OBJECT_LOCK (splitmux);
682       if (splitmux->sink_properties)
683         gst_structure_free (splitmux->sink_properties);
684       if (gst_value_get_structure (value))
685         splitmux->sink_properties =
686             gst_structure_copy (gst_value_get_structure (value));
687       else
688         splitmux->sink_properties = NULL;
689       GST_OBJECT_UNLOCK (splitmux);
690       break;
691     default:
692       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
693       break;
694   }
695 }
696
697 static void
698 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
699     GValue * value, GParamSpec * pspec)
700 {
701   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
702
703   switch (prop_id) {
704     case PROP_LOCATION:
705       GST_OBJECT_LOCK (splitmux);
706       g_value_set_string (value, splitmux->location);
707       GST_OBJECT_UNLOCK (splitmux);
708       break;
709     case PROP_MAX_SIZE_BYTES:
710       GST_OBJECT_LOCK (splitmux);
711       g_value_set_uint64 (value, splitmux->threshold_bytes);
712       GST_OBJECT_UNLOCK (splitmux);
713       break;
714     case PROP_MAX_SIZE_TIME:
715       GST_OBJECT_LOCK (splitmux);
716       g_value_set_uint64 (value, splitmux->threshold_time);
717       GST_OBJECT_UNLOCK (splitmux);
718       break;
719     case PROP_MAX_SIZE_TIMECODE:
720       GST_OBJECT_LOCK (splitmux);
721       g_value_set_string (value, splitmux->threshold_timecode_str);
722       GST_OBJECT_UNLOCK (splitmux);
723       break;
724     case PROP_SEND_KEYFRAME_REQUESTS:
725       GST_OBJECT_LOCK (splitmux);
726       g_value_set_boolean (value, splitmux->send_keyframe_requests);
727       GST_OBJECT_UNLOCK (splitmux);
728       break;
729     case PROP_MAX_FILES:
730       GST_OBJECT_LOCK (splitmux);
731       g_value_set_uint (value, splitmux->max_files);
732       GST_OBJECT_UNLOCK (splitmux);
733       break;
734     case PROP_MUXER_OVERHEAD:
735       GST_OBJECT_LOCK (splitmux);
736       g_value_set_double (value, splitmux->mux_overhead);
737       GST_OBJECT_UNLOCK (splitmux);
738       break;
739     case PROP_USE_ROBUST_MUXING:
740       GST_OBJECT_LOCK (splitmux);
741       g_value_set_boolean (value, splitmux->use_robust_muxing);
742       GST_OBJECT_UNLOCK (splitmux);
743       break;
744     case PROP_ALIGNMENT_THRESHOLD:
745       GST_OBJECT_LOCK (splitmux);
746       g_value_set_uint64 (value, splitmux->alignment_threshold);
747       GST_OBJECT_UNLOCK (splitmux);
748       break;
749     case PROP_SINK:
750       GST_OBJECT_LOCK (splitmux);
751       g_value_set_object (value, splitmux->provided_sink);
752       GST_OBJECT_UNLOCK (splitmux);
753       break;
754     case PROP_MUXER:
755       GST_OBJECT_LOCK (splitmux);
756       g_value_set_object (value, splitmux->provided_muxer);
757       GST_OBJECT_UNLOCK (splitmux);
758       break;
759     case PROP_RESET_MUXER:
760       GST_OBJECT_LOCK (splitmux);
761       g_value_set_boolean (value, splitmux->reset_muxer);
762       GST_OBJECT_UNLOCK (splitmux);
763       break;
764     case PROP_ASYNC_FINALIZE:
765       GST_OBJECT_LOCK (splitmux);
766       g_value_set_boolean (value, splitmux->async_finalize);
767       GST_OBJECT_UNLOCK (splitmux);
768       break;
769     case PROP_MUXER_FACTORY:
770       GST_OBJECT_LOCK (splitmux);
771       g_value_set_string (value, splitmux->muxer_factory);
772       GST_OBJECT_UNLOCK (splitmux);
773       break;
774     case PROP_MUXER_PROPERTIES:
775       GST_OBJECT_LOCK (splitmux);
776       gst_value_set_structure (value, splitmux->muxer_properties);
777       GST_OBJECT_UNLOCK (splitmux);
778       break;
779     case PROP_SINK_FACTORY:
780       GST_OBJECT_LOCK (splitmux);
781       g_value_set_string (value, splitmux->sink_factory);
782       GST_OBJECT_UNLOCK (splitmux);
783       break;
784     case PROP_SINK_PROPERTIES:
785       GST_OBJECT_LOCK (splitmux);
786       gst_value_set_structure (value, splitmux->sink_properties);
787       GST_OBJECT_UNLOCK (splitmux);
788       break;
789     default:
790       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
791       break;
792   }
793 }
794
795 /* Convenience function */
796 static inline GstClockTimeDiff
797 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
798 {
799   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
800
801   if (GST_CLOCK_TIME_IS_VALID (val)) {
802     gboolean sign =
803         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
804     if (sign > 0)
805       res = val;
806     else if (sign < 0)
807       res = -val;
808   }
809   return res;
810 }
811
812 static MqStreamCtx *
813 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
814 {
815   MqStreamCtx *ctx;
816
817   ctx = g_new0 (MqStreamCtx, 1);
818   ctx->splitmux = splitmux;
819   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
820   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
821   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
822   g_queue_init (&ctx->queued_bufs);
823   return ctx;
824 }
825
826 static void
827 mq_stream_ctx_free (MqStreamCtx * ctx)
828 {
829   if (ctx->q) {
830     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
831
832     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
833
834     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
835       gst_element_set_locked_state (ctx->q, TRUE);
836       gst_element_set_state (ctx->q, GST_STATE_NULL);
837       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
838       gst_object_unref (parent);
839     }
840     gst_object_unref (ctx->q);
841   }
842   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
843   gst_object_unref (ctx->sinkpad);
844   gst_object_unref (ctx->srcpad);
845   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
846   g_queue_clear (&ctx->queued_bufs);
847   g_free (ctx);
848 }
849
850 static void
851 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
852     GstElement * sink)
853 {
854   gchar *location = NULL;
855   GstMessage *msg;
856   const gchar *msg_name = opened ?
857       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
858   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
859
860   if (!opened) {
861     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
862     if (rtime)
863       running_time = *rtime;
864   }
865
866   g_object_get (sink, "location", &location, NULL);
867
868   /* If it's in the middle of a teardown, the reference_ctc might have become
869    * NULL */
870   if (splitmux->reference_ctx) {
871     msg = gst_message_new_element (GST_OBJECT (splitmux),
872         gst_structure_new (msg_name,
873             "location", G_TYPE_STRING, location,
874             "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
875     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
876   }
877
878   g_free (location);
879 }
880
881 static void
882 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
883 {
884   GstEvent *eos;
885   GstPad *pad;
886   MqStreamCtx *ctx;
887
888   eos = gst_event_new_eos ();
889   pad = helper->pad;
890   ctx = helper->ctx;
891
892   GST_SPLITMUX_LOCK (splitmux);
893   if (!pad)
894     pad = gst_pad_get_peer (ctx->srcpad);
895   GST_SPLITMUX_UNLOCK (splitmux);
896
897   gst_pad_send_event (pad, eos);
898   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
899
900   gst_object_unref (pad);
901   g_free (helper);
902 }
903
904 /* Called with lock held, drops the lock to send EOS to the
905  * pad
906  */
907 static void
908 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
909 {
910   GstEvent *eos;
911   GstPad *pad;
912
913   eos = gst_event_new_eos ();
914   pad = gst_pad_get_peer (ctx->srcpad);
915
916   ctx->out_eos = TRUE;
917
918   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
919   GST_SPLITMUX_UNLOCK (splitmux);
920   gst_pad_send_event (pad, eos);
921   GST_SPLITMUX_LOCK (splitmux);
922
923   gst_object_unref (pad);
924 }
925
926 /* Called with lock held. Schedules an EOS event to the ctx pad
927  * to happen in another thread */
928 static void
929 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
930 {
931   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
932   GstPad *srcpad, *sinkpad;
933
934   srcpad = ctx->srcpad;
935   sinkpad = gst_pad_get_peer (srcpad);
936
937   helper->ctx = ctx;
938   helper->pad = sinkpad;        /* Takes the reference */
939
940   ctx->out_eos_async_done = TRUE;
941   /* HACK: Here, we explicitly unset the SINK flag on the target sink element
942    * that's about to be asynchronously disposed, so that it no longer
943    * participates in GstBin EOS logic. This fixes a race where if
944    * splitmuxsink really reaches EOS before an asynchronous background
945    * element has finished, then the bin won't actually send EOS to the
946    * pipeline. Even after finishing and removing the old element, the
947    * bin doesn't re-check EOS status on removing a SINK element. This
948    * should be fixed in core, making this hack unnecessary. */
949   GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
950
951   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
952       sinkpad, ctx);
953
954   g_assert_nonnull (helper->pad);
955   gst_element_call_async (GST_ELEMENT (splitmux),
956       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
957 }
958
959 /* Called with lock held. TRUE iff all contexts have a
960  * pending (or delivered) async eos event */
961 static gboolean
962 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
963 {
964   gboolean ret = TRUE;
965   GList *item;
966
967   for (item = splitmux->contexts; item; item = item->next) {
968     MqStreamCtx *ctx = item->data;
969     ret &= ctx->out_eos_async_done;
970   }
971   return ret;
972 }
973
974 /* Called with splitmux lock held to check if this output
975  * context needs to sleep to wait for the release of the
976  * next GOP, or to send EOS to close out the current file
977  */
978 static void
979 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
980 {
981   if (ctx->caps_change)
982     return;
983
984   do {
985     /* When first starting up, the reference stream has to output
986      * the first buffer to prepare the muxer and sink */
987     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
988     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
989
990     if (!(splitmux->max_out_running_time == 0 ||
991             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
992             splitmux->alignment_threshold == 0 ||
993             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
994       my_max_out_running_time -= splitmux->alignment_threshold;
995       GST_LOG_OBJECT (ctx->srcpad,
996           "Max out running time currently %" GST_STIME_FORMAT
997           ", with threshold applied it is %" GST_STIME_FORMAT,
998           GST_STIME_ARGS (splitmux->max_out_running_time),
999           GST_STIME_ARGS (my_max_out_running_time));
1000     }
1001
1002     if (ctx->flushing
1003         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1004       return;
1005
1006     GST_LOG_OBJECT (ctx->srcpad,
1007         "Checking running time %" GST_STIME_FORMAT " against max %"
1008         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1009         GST_STIME_ARGS (my_max_out_running_time));
1010
1011     if (can_output) {
1012       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1013           ctx->out_running_time < my_max_out_running_time) {
1014         return;
1015       }
1016
1017       switch (splitmux->output_state) {
1018         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1019           /* We only get here if we've finished outputting a GOP and need to know
1020            * what to do next */
1021           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1022           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1023           continue;
1024
1025         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1026           /* We've reached the max out running_time to get here, so end this file now */
1027           if (ctx->out_eos == FALSE) {
1028             if (splitmux->async_finalize) {
1029               /* We must set EOS asynchronously at this point. We cannot defer
1030                * it, because we need all contexts to wake up, for the
1031                * reference context to eventually give us something at
1032                * START_NEXT_FILE. Otherwise, collectpads might choose another
1033                * context to give us the first buffer, and format-location-full
1034                * will not contain a valid sample. */
1035               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1036                   GINT_TO_POINTER (1));
1037               eos_context_async (ctx, splitmux);
1038               if (all_contexts_are_async_eos (splitmux)) {
1039                 GST_INFO_OBJECT (splitmux,
1040                     "All contexts are async_eos. Moving to the next file.");
1041                 /* We can start the next file once we've asked each pad to go EOS */
1042                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1043                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1044                 continue;
1045               }
1046             } else {
1047               send_eos (splitmux, ctx);
1048               continue;
1049             }
1050           } else {
1051             GST_INFO_OBJECT (splitmux,
1052                 "At end-of-file state, but context %p is already EOS", ctx);
1053           }
1054           break;
1055         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1056           if (ctx->is_reference) {
1057             /* Special handling on the reference ctx to start new fragments
1058              * and collect commands from the command queue */
1059             /* drops the splitmux lock briefly: */
1060             /* We must have reference ctx in order for format-location-full to
1061              * have a sample */
1062             start_next_fragment (splitmux, ctx);
1063             continue;
1064           }
1065           break;
1066         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1067           do {
1068             SplitMuxOutputCommand *cmd =
1069                 g_queue_pop_tail (&splitmux->out_cmd_q);
1070             if (cmd != NULL) {
1071               /* If we pop the last command, we need to make our queues bigger */
1072               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1073                 grow_blocked_queues (splitmux);
1074
1075               if (cmd->start_new_fragment) {
1076                 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1077                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1078               } else {
1079                 GST_DEBUG_OBJECT (splitmux,
1080                     "Got new output cmd for time %" GST_STIME_FORMAT,
1081                     GST_STIME_ARGS (cmd->max_output_ts));
1082
1083                 /* Extend the output range immediately */
1084                 splitmux->max_out_running_time = cmd->max_output_ts;
1085                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1086               }
1087               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1088
1089               out_cmd_buf_free (cmd);
1090               break;
1091             } else {
1092               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1093             }
1094           } while (splitmux->output_state ==
1095               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1096           /* loop and re-check the state */
1097           continue;
1098         }
1099         case SPLITMUX_OUTPUT_STATE_STOPPED:
1100           return;
1101       }
1102     }
1103
1104     GST_INFO_OBJECT (ctx->srcpad,
1105         "Sleeping for running time %"
1106         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1107         GST_STIME_ARGS (ctx->out_running_time),
1108         GST_STIME_ARGS (splitmux->max_out_running_time));
1109     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1110     GST_INFO_OBJECT (ctx->srcpad,
1111         "Woken for new max running time %" GST_STIME_FORMAT,
1112         GST_STIME_ARGS (splitmux->max_out_running_time));
1113   }
1114   while (1);
1115 }
1116
1117 static GstClockTime
1118 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1119     const GstVideoTimeCode * cur_tc)
1120 {
1121   GstVideoTimeCode *target_tc;
1122   GstVideoTimeCodeInterval *tc_inter;
1123   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1124
1125   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1126     return GST_CLOCK_TIME_NONE;
1127
1128   tc_inter =
1129       gst_video_time_code_interval_new_from_string
1130       (splitmux->threshold_timecode_str);
1131   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1132   gst_video_time_code_interval_free (tc_inter);
1133
1134   /* Convert to ns */
1135   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1136   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1137
1138   /* Add fragment_start_time, accounting for wraparound */
1139   if (target_tc_time >= cur_tc_time) {
1140     next_max_tc_time =
1141         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1142   } else {
1143     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1144
1145     next_max_tc_time =
1146         day_in_ns - cur_tc_time + target_tc_time +
1147         splitmux->fragment_start_time;
1148   }
1149   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1150       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1151       GST_TIME_ARGS (cur_tc_time));
1152   gst_video_time_code_free (target_tc);
1153
1154   return next_max_tc_time;
1155 }
1156
1157 static gboolean
1158 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1159 {
1160   GstEvent *ev;
1161   GstClockTime target_time;
1162   gboolean timecode_based = FALSE;
1163
1164   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1165   if (splitmux->threshold_timecode_str) {
1166     GstVideoTimeCodeMeta *tc_meta;
1167
1168     if (buffer != NULL) {
1169       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1170       if (tc_meta) {
1171         splitmux->next_max_tc_time =
1172             calculate_next_max_timecode (splitmux, &tc_meta->tc);
1173         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1174       }
1175     } else {
1176       /* This can happen in the presence of GAP events that trigger
1177        * a new fragment start */
1178       GST_WARNING_OBJECT (splitmux,
1179           "No buffer available to calculate next timecode");
1180     }
1181   }
1182
1183   if (splitmux->send_keyframe_requests == FALSE
1184       || (splitmux->threshold_time == 0 && !timecode_based)
1185       || splitmux->threshold_bytes != 0)
1186     return TRUE;
1187
1188   if (timecode_based) {
1189     /* We might have rounding errors: aim slightly earlier */
1190     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1191   } else {
1192     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1193   }
1194   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1195   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1196       GST_TIME_ARGS (target_time));
1197   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1198 }
1199
1200 static GstPadProbeReturn
1201 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1202 {
1203   GstSplitMuxSink *splitmux = ctx->splitmux;
1204   MqStreamBuf *buf_info = NULL;
1205
1206   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1207
1208   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1209   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1210     g_warning ("Buffer list handling not implemented");
1211     return GST_PAD_PROBE_DROP;
1212   }
1213   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1214       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1215     GstEvent *event = gst_pad_probe_info_get_event (info);
1216     gboolean locked = FALSE;
1217
1218     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1219
1220     switch (GST_EVENT_TYPE (event)) {
1221       case GST_EVENT_SEGMENT:
1222         gst_event_copy_segment (event, &ctx->out_segment);
1223         break;
1224       case GST_EVENT_FLUSH_STOP:
1225         GST_SPLITMUX_LOCK (splitmux);
1226         locked = TRUE;
1227         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1228         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1229         g_queue_clear (&ctx->queued_bufs);
1230         ctx->flushing = FALSE;
1231         break;
1232       case GST_EVENT_FLUSH_START:
1233         GST_SPLITMUX_LOCK (splitmux);
1234         locked = TRUE;
1235         GST_LOG_OBJECT (pad, "Flush start");
1236         ctx->flushing = TRUE;
1237         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1238         break;
1239       case GST_EVENT_EOS:
1240         GST_SPLITMUX_LOCK (splitmux);
1241         locked = TRUE;
1242         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1243           goto beach;
1244         ctx->out_eos = TRUE;
1245         GST_INFO_OBJECT (splitmux,
1246             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1247         break;
1248       case GST_EVENT_GAP:{
1249         GstClockTime gap_ts;
1250         GstClockTimeDiff rtime;
1251
1252         gst_event_parse_gap (event, &gap_ts, NULL);
1253         if (gap_ts == GST_CLOCK_TIME_NONE)
1254           break;
1255
1256         GST_SPLITMUX_LOCK (splitmux);
1257         locked = TRUE;
1258
1259         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1260           goto beach;
1261
1262         /* When we get a gap event on the
1263          * reference stream and we're trying to open a
1264          * new file, we need to store it until we get
1265          * the buffer afterwards
1266          */
1267         if (ctx->is_reference &&
1268             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1269           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1270           gst_event_replace (&ctx->pending_gap, event);
1271           GST_SPLITMUX_UNLOCK (splitmux);
1272           return GST_PAD_PROBE_HANDLED;
1273         }
1274
1275         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1276
1277         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1278             GST_STIME_ARGS (rtime));
1279
1280         if (rtime != GST_CLOCK_STIME_NONE) {
1281           ctx->out_running_time = rtime;
1282           complete_or_wait_on_out (splitmux, ctx);
1283         }
1284         break;
1285       }
1286       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1287         const GstStructure *s;
1288         GstClockTimeDiff ts = 0;
1289
1290         s = gst_event_get_structure (event);
1291         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1292           break;
1293
1294         gst_structure_get_int64 (s, "timestamp", &ts);
1295
1296         GST_SPLITMUX_LOCK (splitmux);
1297         locked = TRUE;
1298
1299         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1300           goto beach;
1301         ctx->out_running_time = ts;
1302         if (!ctx->is_reference)
1303           complete_or_wait_on_out (splitmux, ctx);
1304         GST_SPLITMUX_UNLOCK (splitmux);
1305         return GST_PAD_PROBE_DROP;
1306       }
1307       case GST_EVENT_CAPS:{
1308         GstPad *peer;
1309
1310         if (!ctx->is_reference)
1311           break;
1312
1313         peer = gst_pad_get_peer (pad);
1314         if (peer) {
1315           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1316
1317           gst_object_unref (peer);
1318
1319           if (ok)
1320             break;
1321
1322         } else {
1323           break;
1324         }
1325         /* This is in the case the muxer doesn't allow this change of caps */
1326         GST_SPLITMUX_LOCK (splitmux);
1327         locked = TRUE;
1328         ctx->caps_change = TRUE;
1329
1330         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1331           GST_DEBUG_OBJECT (splitmux,
1332               "New caps were not accepted. Switching output file");
1333           if (ctx->out_eos == FALSE) {
1334             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1335             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1336           }
1337         }
1338
1339         /* Lets it fall through, if it fails again, then the muxer just can't
1340          * support this format, but at least we have a closed file.
1341          */
1342         break;
1343       }
1344       default:
1345         break;
1346     }
1347
1348     /* We need to make sure events aren't passed
1349      * until the muxer / sink are ready for it */
1350     if (!locked)
1351       GST_SPLITMUX_LOCK (splitmux);
1352     if (!ctx->is_reference)
1353       complete_or_wait_on_out (splitmux, ctx);
1354     GST_SPLITMUX_UNLOCK (splitmux);
1355
1356     /* Don't try to forward sticky events before the next buffer is there
1357      * because it would cause a new file to be created without the first
1358      * buffer being available.
1359      */
1360     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1361       gst_event_unref (event);
1362       return GST_PAD_PROBE_HANDLED;
1363     } else
1364       return GST_PAD_PROBE_PASS;
1365   }
1366
1367   /* Allow everything through until the configured next stopping point */
1368   GST_SPLITMUX_LOCK (splitmux);
1369
1370   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1371   if (buf_info == NULL)
1372     /* Can only happen due to a poorly timed flush */
1373     goto beach;
1374
1375   /* If we have popped a keyframe, decrement the queued_gop count */
1376   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1377     splitmux->queued_keyframes--;
1378
1379   ctx->out_running_time = buf_info->run_ts;
1380   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1381
1382   GST_LOG_OBJECT (splitmux,
1383       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1384       " size %" G_GUINT64_FORMAT,
1385       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1386
1387   ctx->caps_change = FALSE;
1388
1389   complete_or_wait_on_out (splitmux, ctx);
1390
1391   splitmux->muxed_out_bytes += buf_info->buf_size;
1392
1393 #ifndef GST_DISABLE_GST_DEBUG
1394   {
1395     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1396     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1397         " run ts %" GST_STIME_FORMAT, buf,
1398         GST_STIME_ARGS (ctx->out_running_time));
1399   }
1400 #endif
1401
1402   ctx->cur_out_buffer = NULL;
1403   GST_SPLITMUX_UNLOCK (splitmux);
1404
1405   /* pending_gap is protected by the STREAM lock */
1406   if (ctx->pending_gap) {
1407     /* If we previously stored a gap event, send it now */
1408     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1409
1410     GST_DEBUG_OBJECT (splitmux,
1411         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1412
1413     gst_pad_send_event (peer, ctx->pending_gap);
1414     ctx->pending_gap = NULL;
1415
1416     gst_object_unref (peer);
1417   }
1418
1419   mq_stream_buf_free (buf_info);
1420
1421   return GST_PAD_PROBE_PASS;
1422
1423 beach:
1424   GST_SPLITMUX_UNLOCK (splitmux);
1425   return GST_PAD_PROBE_DROP;
1426 }
1427
1428 static gboolean
1429 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1430 {
1431   return gst_pad_send_event (peer, gst_event_ref (*event));
1432 }
1433
1434 static void
1435 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1436 {
1437   if (ctx->fragment_block_id > 0) {
1438     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1439     ctx->fragment_block_id = 0;
1440   }
1441 }
1442
1443 static void
1444 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1445 {
1446   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1447
1448   gst_pad_sticky_events_foreach (ctx->srcpad,
1449       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1450
1451   /* Clear EOS flag if not actually EOS */
1452   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1453   ctx->out_eos_async_done = ctx->out_eos;
1454
1455   gst_object_unref (peer);
1456 }
1457
1458 static void
1459 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1460 {
1461   GstPad *sinkpad, *srcpad, *newpad;
1462   GstPadTemplate *templ;
1463
1464   srcpad = ctx->srcpad;
1465   sinkpad = gst_pad_get_peer (srcpad);
1466
1467   templ = sinkpad->padtemplate;
1468   newpad =
1469       gst_element_request_pad (splitmux->muxer, templ,
1470       GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1471
1472   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1473       newpad);
1474   if (!gst_pad_unlink (srcpad, sinkpad)) {
1475     gst_object_unref (sinkpad);
1476     goto fail;
1477   }
1478   if (gst_pad_link_full (srcpad, newpad,
1479           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1480     gst_element_release_request_pad (splitmux->muxer, newpad);
1481     gst_object_unref (sinkpad);
1482     gst_object_unref (newpad);
1483     goto fail;
1484   }
1485   gst_object_unref (newpad);
1486   gst_object_unref (sinkpad);
1487   return;
1488
1489 fail:
1490   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1491       ("Could not create the new muxer/sink"), NULL);
1492 }
1493
1494 static GstPadProbeReturn
1495 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1496 {
1497   return GST_PAD_PROBE_OK;
1498 }
1499
1500 static void
1501 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1502 {
1503   ctx->fragment_block_id =
1504       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1505       NULL, NULL);
1506 }
1507
1508 static gboolean
1509 _set_property_from_structure (GQuark field_id, const GValue * value,
1510     gpointer user_data)
1511 {
1512   const gchar *property_name = g_quark_to_string (field_id);
1513   GObject *element = G_OBJECT (user_data);
1514
1515   g_object_set_property (element, property_name, value);
1516
1517   return TRUE;
1518 }
1519
1520 static void
1521 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1522 {
1523   gst_element_set_locked_state (element, TRUE);
1524   gst_element_set_state (element, GST_STATE_NULL);
1525   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1526   gst_bin_remove (GST_BIN (splitmux), element);
1527 }
1528
1529
1530 static void
1531 _send_event (const GValue * value, gpointer user_data)
1532 {
1533   GstPad *pad = g_value_get_object (value);
1534   GstEvent *ev = user_data;
1535
1536   gst_pad_send_event (pad, gst_event_ref (ev));
1537 }
1538
1539 /* Called with lock held when a fragment
1540  * reaches EOS and it is time to restart
1541  * a new fragment
1542  */
1543 static void
1544 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1545 {
1546   GstElement *muxer, *sink;
1547
1548   g_assert (ctx->is_reference);
1549
1550   /* 1 change to new file */
1551   splitmux->switching_fragment = TRUE;
1552
1553   /* We need to drop the splitmux lock to acquire the state lock
1554    * here and ensure there's no racy state change going on elsewhere */
1555   muxer = gst_object_ref (splitmux->muxer);
1556   sink = gst_object_ref (splitmux->active_sink);
1557
1558   GST_SPLITMUX_UNLOCK (splitmux);
1559   GST_STATE_LOCK (splitmux);
1560
1561   if (splitmux->async_finalize) {
1562     if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1563       gchar *newname;
1564       GstElement *new_sink, *new_muxer;
1565
1566       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1567           splitmux->fragment_id);
1568       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1569       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1570       GST_SPLITMUX_LOCK (splitmux);
1571       if ((splitmux->sink =
1572               create_element (splitmux, splitmux->sink_factory, newname,
1573                   TRUE)) == NULL)
1574         goto fail;
1575       if (splitmux->sink_properties)
1576         gst_structure_foreach (splitmux->sink_properties,
1577             _set_property_from_structure, splitmux->sink);
1578       splitmux->active_sink = splitmux->sink;
1579       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1580       g_free (newname);
1581       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1582       if ((splitmux->muxer =
1583               create_element (splitmux, splitmux->muxer_factory, newname,
1584                   TRUE)) == NULL)
1585         goto fail;
1586       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1587               "async") != NULL) {
1588         /* async child elements are causing state change races and weird
1589          * failures, so let's try and turn that off */
1590         g_object_set (splitmux->sink, "async", FALSE, NULL);
1591       }
1592       if (splitmux->muxer_properties)
1593         gst_structure_foreach (splitmux->muxer_properties,
1594             _set_property_from_structure, splitmux->muxer);
1595       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1596       g_free (newname);
1597       new_sink = splitmux->sink;
1598       new_muxer = splitmux->muxer;
1599       GST_SPLITMUX_UNLOCK (splitmux);
1600       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1601       gst_element_link (new_muxer, new_sink);
1602
1603       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1604         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1605                     EOS_FROM_US)) == 2) {
1606           _lock_and_set_to_null (muxer, splitmux);
1607           _lock_and_set_to_null (sink, splitmux);
1608         } else {
1609           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1610               GINT_TO_POINTER (2));
1611         }
1612       }
1613       gst_object_unref (muxer);
1614       gst_object_unref (sink);
1615       muxer = new_muxer;
1616       sink = new_sink;
1617       gst_object_ref (muxer);
1618       gst_object_ref (sink);
1619     }
1620   } else {
1621
1622     gst_element_set_locked_state (muxer, TRUE);
1623     gst_element_set_locked_state (sink, TRUE);
1624     gst_element_set_state (sink, GST_STATE_NULL);
1625
1626     if (splitmux->reset_muxer) {
1627       gst_element_set_state (muxer, GST_STATE_NULL);
1628     } else {
1629       GstIterator *it = gst_element_iterate_sink_pads (muxer);
1630       GstEvent *ev;
1631
1632       ev = gst_event_new_flush_start ();
1633       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1634       gst_event_unref (ev);
1635
1636       gst_iterator_resync (it);
1637
1638       ev = gst_event_new_flush_stop (TRUE);
1639       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1640       gst_event_unref (ev);
1641
1642       gst_iterator_free (it);
1643     }
1644   }
1645
1646   GST_SPLITMUX_LOCK (splitmux);
1647   if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1648     set_next_filename (splitmux, ctx);
1649   splitmux->muxed_out_bytes = 0;
1650   GST_SPLITMUX_UNLOCK (splitmux);
1651
1652   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1653   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1654   gst_element_set_locked_state (muxer, FALSE);
1655   gst_element_set_locked_state (sink, FALSE);
1656
1657   gst_object_unref (sink);
1658   gst_object_unref (muxer);
1659
1660   GST_SPLITMUX_LOCK (splitmux);
1661   GST_STATE_UNLOCK (splitmux);
1662   splitmux->switching_fragment = FALSE;
1663   do_async_done (splitmux);
1664
1665   splitmux->ready_for_output = TRUE;
1666
1667   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1668   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1669
1670   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1671
1672   /* FIXME: Is this always the correct next state? */
1673   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1674   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1675   return;
1676
1677 fail:
1678   GST_STATE_UNLOCK (splitmux);
1679   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1680       ("Could not create the new muxer/sink"), NULL);
1681 }
1682
1683 static void
1684 bus_handler (GstBin * bin, GstMessage * message)
1685 {
1686   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1687
1688   switch (GST_MESSAGE_TYPE (message)) {
1689     case GST_MESSAGE_EOS:{
1690       /* If the state is draining out the current file, drop this EOS */
1691       GstElement *sink;
1692
1693       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1694       GST_SPLITMUX_LOCK (splitmux);
1695
1696       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1697
1698       if (splitmux->async_finalize) {
1699
1700         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1701           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1702                       EOS_FROM_US)) == 2) {
1703             GstElement *muxer;
1704             GstPad *sinksink, *muxersrc;
1705
1706             sinksink = gst_element_get_static_pad (sink, "sink");
1707             muxersrc = gst_pad_get_peer (sinksink);
1708             muxer = gst_pad_get_parent_element (muxersrc);
1709             gst_object_unref (sinksink);
1710             gst_object_unref (muxersrc);
1711
1712             gst_element_call_async (muxer,
1713                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1714                 gst_object_ref (splitmux), gst_object_unref);
1715             gst_element_call_async (sink,
1716                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1717                 gst_object_ref (splitmux), gst_object_unref);
1718             gst_object_unref (muxer);
1719           } else {
1720             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1721                 GINT_TO_POINTER (2));
1722           }
1723           GST_DEBUG_OBJECT (splitmux,
1724               "Caught async EOS from previous muxer+sink. Dropping.");
1725           /* We forward the EOS so that it gets aggregated as normal. If the sink
1726            * finishes and is removed before the end, it will be de-aggregated */
1727           gst_message_unref (message);
1728           GST_SPLITMUX_UNLOCK (splitmux);
1729           return;
1730         }
1731       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1732         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1733         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1734         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1735
1736         gst_message_unref (message);
1737         GST_SPLITMUX_UNLOCK (splitmux);
1738         return;
1739       } else {
1740         GST_DEBUG_OBJECT (splitmux,
1741             "Passing EOS message. Output state %d max_out_running_time %"
1742             GST_STIME_FORMAT, splitmux->output_state,
1743             GST_STIME_ARGS (splitmux->max_out_running_time));
1744       }
1745       GST_SPLITMUX_UNLOCK (splitmux);
1746       break;
1747     }
1748     case GST_MESSAGE_ASYNC_START:
1749     case GST_MESSAGE_ASYNC_DONE:
1750       /* Ignore state changes from our children while switching */
1751       GST_SPLITMUX_LOCK (splitmux);
1752       if (splitmux->switching_fragment) {
1753         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1754             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1755           GST_LOG_OBJECT (splitmux,
1756               "Ignoring state change from child %" GST_PTR_FORMAT
1757               " while switching", GST_MESSAGE_SRC (message));
1758           gst_message_unref (message);
1759           GST_SPLITMUX_UNLOCK (splitmux);
1760           return;
1761         }
1762       }
1763       GST_SPLITMUX_UNLOCK (splitmux);
1764       break;
1765     case GST_MESSAGE_WARNING:
1766     {
1767       GError *gerror = NULL;
1768
1769       gst_message_parse_warning (message, &gerror, NULL);
1770
1771       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1772         GList *item;
1773         gboolean caps_change = FALSE;
1774
1775         GST_SPLITMUX_LOCK (splitmux);
1776
1777         for (item = splitmux->contexts; item; item = item->next) {
1778           MqStreamCtx *ctx = item->data;
1779
1780           if (ctx->caps_change) {
1781             caps_change = TRUE;
1782             break;
1783           }
1784         }
1785
1786         GST_SPLITMUX_UNLOCK (splitmux);
1787
1788         if (caps_change) {
1789           GST_LOG_OBJECT (splitmux,
1790               "Ignoring warning change from child %" GST_PTR_FORMAT
1791               " while switching caps", GST_MESSAGE_SRC (message));
1792           gst_message_unref (message);
1793           return;
1794         }
1795       }
1796       break;
1797     }
1798     default:
1799       break;
1800   }
1801
1802   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1803 }
1804
1805 static void
1806 ctx_set_unblock (MqStreamCtx * ctx)
1807 {
1808   ctx->need_unblock = TRUE;
1809 }
1810
1811 static gboolean
1812 need_new_fragment (GstSplitMuxSink * splitmux,
1813     GstClockTime queued_time, GstClockTime queued_gop_time,
1814     guint64 queued_bytes)
1815 {
1816   guint64 thresh_bytes;
1817   GstClockTime thresh_time;
1818   gboolean check_robust_muxing;
1819
1820   GST_OBJECT_LOCK (splitmux);
1821   thresh_bytes = splitmux->threshold_bytes;
1822   thresh_time = splitmux->threshold_time;
1823   check_robust_muxing = splitmux->use_robust_muxing
1824       && splitmux->muxer_has_reserved_props;
1825   GST_OBJECT_UNLOCK (splitmux);
1826
1827   /* Have we muxed anything into the new file at all? */
1828   if (splitmux->fragment_total_bytes <= 0)
1829     return FALSE;
1830
1831   /* User told us to split now */
1832   if (g_atomic_int_get (&(splitmux->split_now)) == TRUE)
1833     return TRUE;
1834
1835   if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1836     return TRUE;                /* Would overrun byte limit */
1837
1838   if (thresh_time > 0 && queued_time > thresh_time)
1839     return TRUE;                /* Would overrun byte limit */
1840
1841   /* Timecode-based threshold accounts for possible rounding errors:
1842    * 5us should be bigger than all possible rounding errors but nowhere near
1843    * big enough to skip to another frame */
1844   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1845       splitmux->reference_ctx->in_running_time >
1846       splitmux->next_max_tc_time + 5 * GST_USECOND)
1847     return TRUE;                /* Timecode threshold */
1848
1849   if (check_robust_muxing) {
1850     GstClockTime mux_reserved_remain;
1851
1852     g_object_get (splitmux->muxer,
1853         "reserved-duration-remaining", &mux_reserved_remain, NULL);
1854
1855     GST_LOG_OBJECT (splitmux,
1856         "Muxer robust muxing report - %" G_GUINT64_FORMAT
1857         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1858         mux_reserved_remain, queued_gop_time);
1859
1860     if (queued_gop_time >= mux_reserved_remain) {
1861       GST_INFO_OBJECT (splitmux,
1862           "File is about to run out of header room - %" G_GUINT64_FORMAT
1863           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1864           ". Switching to new file", mux_reserved_remain, queued_gop_time);
1865       return TRUE;
1866     }
1867   }
1868
1869   /* Continue and mux this GOP */
1870   return FALSE;
1871 }
1872
1873 /* Called with splitmux lock held */
1874 /* Called when entering ProcessingCompleteGop state
1875  * Assess if mq contents overflowed the current file
1876  *   -> If yes, need to switch to new file
1877  *   -> if no, set max_out_running_time to let this GOP in and
1878  *      go to COLLECTING_GOP_START state
1879  */
1880 static void
1881 handle_gathered_gop (GstSplitMuxSink * splitmux)
1882 {
1883   guint64 queued_bytes;
1884   GstClockTimeDiff queued_time = 0;
1885   GstClockTimeDiff queued_gop_time = 0;
1886   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1887   SplitMuxOutputCommand *cmd;
1888
1889   /* Assess if the multiqueue contents overflowed the current file */
1890   /* When considering if a newly gathered GOP overflows
1891    * the time limit for the file, only consider the running time of the
1892    * reference stream. Other streams might have run ahead a little bit,
1893    * but extra pieces won't be released to the muxer beyond the reference
1894    * stream cut-off anyway - so it forms the limit. */
1895   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1896   queued_time = splitmux->reference_ctx->in_running_time;
1897   /* queued_gop_time tracks how much unwritten data there is waiting to
1898    * be written to this fragment including this GOP */
1899   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
1900     queued_gop_time =
1901         splitmux->reference_ctx->in_running_time -
1902         splitmux->reference_ctx->out_running_time;
1903   else
1904     queued_gop_time =
1905         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
1906
1907   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
1908
1909   g_assert (queued_gop_time >= 0);
1910   g_assert (queued_time >= splitmux->fragment_start_time);
1911
1912   queued_time -= splitmux->fragment_start_time;
1913   if (queued_time < queued_gop_time)
1914     queued_gop_time = queued_time;
1915
1916   /* Expand queued bytes estimate by muxer overhead */
1917   queued_bytes += (queued_bytes * splitmux->mux_overhead);
1918
1919   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1920       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1921   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1922     GST_LOG_OBJECT (splitmux,
1923         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1924         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1925         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1926   }
1927
1928   /* Check for overrun - have we output at least one byte and overrun
1929    * either threshold? */
1930   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
1931     GstClockTime *sink_running_time = g_new (GstClockTime, 1);
1932     *sink_running_time = splitmux->reference_ctx->out_running_time;
1933     g_object_set_qdata_full (G_OBJECT (splitmux->sink),
1934         RUNNING_TIME, sink_running_time, g_free);
1935     g_atomic_int_set (&(splitmux->split_now), FALSE);
1936     /* Tell the output side to start a new fragment */
1937     GST_INFO_OBJECT (splitmux,
1938         "This GOP (dur %" GST_STIME_FORMAT
1939         ") would overflow the fragment, Sending start_new_fragment cmd",
1940         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1941             splitmux->gop_start_time));
1942     cmd = out_cmd_buf_new ();
1943     cmd->start_new_fragment = TRUE;
1944     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1945     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1946
1947     new_out_ts = splitmux->reference_ctx->in_running_time;
1948     splitmux->fragment_start_time = splitmux->gop_start_time;
1949     splitmux->fragment_total_bytes = 0;
1950
1951     if (request_next_keyframe (splitmux,
1952             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1953       GST_WARNING_OBJECT (splitmux,
1954           "Could not request a keyframe. Files may not split at the exact location they should");
1955     }
1956     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1957   }
1958
1959   /* And set up to collect the next GOP */
1960   if (!splitmux->reference_ctx->in_eos) {
1961     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1962     splitmux->gop_start_time = new_out_ts;
1963   } else {
1964     /* This is probably already the current state, but just in case: */
1965     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1966     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
1967   }
1968
1969   /* And wake all input contexts to send a wake-up event */
1970   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1971   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1972
1973   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1974   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1975
1976   if (splitmux->gop_total_bytes > 0) {
1977     GST_LOG_OBJECT (splitmux,
1978         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1979         " time %" GST_STIME_FORMAT,
1980         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1981
1982     /* Send this GOP to the output command queue */
1983     cmd = out_cmd_buf_new ();
1984     cmd->start_new_fragment = FALSE;
1985     cmd->max_output_ts = new_out_ts;
1986     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1987         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1988     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1989
1990     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1991   }
1992
1993   splitmux->gop_total_bytes = 0;
1994 }
1995
1996 /* Called with splitmux lock held */
1997 /* Called from each input pad when it is has all the pieces
1998  * for a GOP or EOS, starting with the reference pad which has set the
1999  * splitmux->max_in_running_time
2000  */
2001 static void
2002 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2003 {
2004   GList *cur;
2005   GstEvent *event;
2006
2007   /* On ENDING_FILE, the reference stream sends a command to start a new
2008    * fragment, then releases the GOP for output in the new fragment.
2009    *  If somes streams received no buffer during the last GOP that overran,
2010    * because its next buffer has a timestamp bigger than
2011    * ctx->max_in_running_time, its queue is empty. In that case the only
2012    * way to wakeup the output thread is by injecting an event in the
2013    * queue. This usually happen with subtitle streams.
2014    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2015   if (ctx->need_unblock) {
2016     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2017     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2018         GST_EVENT_TYPE_SERIALIZED,
2019         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2020             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2021
2022     GST_SPLITMUX_UNLOCK (splitmux);
2023     gst_pad_send_event (ctx->sinkpad, event);
2024     GST_SPLITMUX_LOCK (splitmux);
2025
2026     ctx->need_unblock = FALSE;
2027     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2028     /* state may have changed while we were unlocked. Loop again if so */
2029     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2030       return;
2031   }
2032
2033   if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2034     gboolean ready = TRUE;
2035
2036     /* Iterate each pad, and check that the input running time is at least
2037      * up to the reference running time, and if so handle the collected GOP */
2038     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2039         GST_STIME_FORMAT " ctx %p",
2040         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2041     for (cur = g_list_first (splitmux->contexts); cur != NULL;
2042         cur = g_list_next (cur)) {
2043       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2044
2045       GST_LOG_OBJECT (splitmux,
2046           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2047           " EOS %d", tmpctx, tmpctx->srcpad,
2048           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2049
2050       if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2051           tmpctx->in_running_time < splitmux->max_in_running_time &&
2052           !tmpctx->in_eos) {
2053         GST_LOG_OBJECT (splitmux,
2054             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2055             tmpctx, tmpctx->srcpad);
2056         ready = FALSE;
2057         break;
2058       }
2059     }
2060     if (ready) {
2061       GST_DEBUG_OBJECT (splitmux,
2062           "Collected GOP is complete. Processing (ctx %p)", ctx);
2063       /* All pads have a complete GOP, release it into the multiqueue */
2064       handle_gathered_gop (splitmux);
2065     }
2066   }
2067
2068   /* If upstream reached EOS we are not expecting more data, no need to wait
2069    * here. */
2070   if (ctx->in_eos)
2071     return;
2072
2073   /* Some pad is not yet ready, or GOP is being pushed
2074    * either way, sleep and wait to get woken */
2075   while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2076       !ctx->flushing &&
2077       (ctx->in_running_time >= splitmux->max_in_running_time) &&
2078       (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2079
2080     GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2081     GST_SPLITMUX_WAIT_INPUT (splitmux);
2082     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2083   }
2084 }
2085
2086 static GstPadProbeReturn
2087 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2088 {
2089   GstSplitMuxSink *splitmux = ctx->splitmux;
2090   GstBuffer *buf;
2091   MqStreamBuf *buf_info = NULL;
2092   GstClockTime ts;
2093   gboolean loop_again;
2094   gboolean keyframe = FALSE;
2095
2096   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2097
2098   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2099   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2100     g_warning ("Buffer list handling not implemented");
2101     return GST_PAD_PROBE_DROP;
2102   }
2103   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2104       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2105     GstEvent *event = gst_pad_probe_info_get_event (info);
2106
2107     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2108
2109     switch (GST_EVENT_TYPE (event)) {
2110       case GST_EVENT_SEGMENT:
2111         gst_event_copy_segment (event, &ctx->in_segment);
2112         break;
2113       case GST_EVENT_FLUSH_STOP:
2114         GST_SPLITMUX_LOCK (splitmux);
2115         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2116         ctx->in_eos = FALSE;
2117         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2118         GST_SPLITMUX_UNLOCK (splitmux);
2119         break;
2120       case GST_EVENT_EOS:
2121         GST_SPLITMUX_LOCK (splitmux);
2122         ctx->in_eos = TRUE;
2123
2124         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2125           goto beach;
2126
2127         if (ctx->is_reference) {
2128           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2129           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2130           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2131           /* Wake up other input pads to collect this GOP */
2132           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2133           check_completed_gop (splitmux, ctx);
2134         } else if (splitmux->input_state ==
2135             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2136           /* If we are waiting for a GOP to be completed (ie, for aux
2137            * pads to catch up), then this pad is complete, so check
2138            * if the whole GOP is.
2139            */
2140           check_completed_gop (splitmux, ctx);
2141         }
2142         GST_SPLITMUX_UNLOCK (splitmux);
2143         break;
2144       case GST_EVENT_GAP:{
2145         GstClockTime gap_ts;
2146         GstClockTimeDiff rtime;
2147
2148         gst_event_parse_gap (event, &gap_ts, NULL);
2149         if (gap_ts == GST_CLOCK_TIME_NONE)
2150           break;
2151
2152         GST_SPLITMUX_LOCK (splitmux);
2153
2154         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2155           goto beach;
2156         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2157
2158         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2159             GST_STIME_ARGS (rtime));
2160
2161         if (ctx->is_reference
2162             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2163           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2164           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2165               GST_STIME_ARGS (splitmux->fragment_start_time));
2166           /* Also take this as the first start time when starting up,
2167            * so that we start counting overflow from the first frame */
2168           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2169             splitmux->max_in_running_time = splitmux->fragment_start_time;
2170         }
2171
2172         GST_SPLITMUX_UNLOCK (splitmux);
2173         break;
2174       }
2175       default:
2176         break;
2177     }
2178     return GST_PAD_PROBE_PASS;
2179   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2180     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2181       case GST_QUERY_ALLOCATION:
2182         return GST_PAD_PROBE_DROP;
2183       default:
2184         return GST_PAD_PROBE_PASS;
2185     }
2186   }
2187
2188   buf = gst_pad_probe_info_get_buffer (info);
2189   buf_info = mq_stream_buf_new ();
2190
2191   if (GST_BUFFER_PTS_IS_VALID (buf))
2192     ts = GST_BUFFER_PTS (buf);
2193   else
2194     ts = GST_BUFFER_DTS (buf);
2195
2196   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2197
2198   GST_SPLITMUX_LOCK (splitmux);
2199
2200   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2201     goto beach;
2202
2203   /* If this buffer has a timestamp, advance the input timestamp of the
2204    * stream */
2205   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2206     GstClockTimeDiff running_time =
2207         my_segment_to_running_time (&ctx->in_segment, ts);
2208
2209     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2210         GST_STIME_ARGS (running_time));
2211
2212     if (GST_CLOCK_STIME_IS_VALID (running_time)
2213         && running_time > ctx->in_running_time)
2214       ctx->in_running_time = running_time;
2215   }
2216
2217   /* Try to make sure we have a valid running time */
2218   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2219     ctx->in_running_time =
2220         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2221   }
2222
2223   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2224       GST_STIME_ARGS (ctx->in_running_time));
2225
2226   buf_info->run_ts = ctx->in_running_time;
2227   buf_info->buf_size = gst_buffer_get_size (buf);
2228   buf_info->duration = GST_BUFFER_DURATION (buf);
2229
2230   /* initialize fragment_start_time */
2231   if (ctx->is_reference
2232       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2233     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2234     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2235         GST_STIME_ARGS (splitmux->fragment_start_time));
2236     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2237
2238     /* Also take this as the first start time when starting up,
2239      * so that we start counting overflow from the first frame */
2240     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2241       splitmux->max_in_running_time = splitmux->fragment_start_time;
2242     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2243       GST_WARNING_OBJECT (splitmux,
2244           "Could not request a keyframe. Files may not split at the exact location they should");
2245     }
2246     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2247   }
2248
2249   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2250       " total GOP bytes %" G_GUINT64_FORMAT,
2251       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2252
2253   loop_again = TRUE;
2254   do {
2255     if (ctx->flushing)
2256       break;
2257
2258     switch (splitmux->input_state) {
2259       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2260         if (ctx->is_reference) {
2261           /* This is the reference context. If it's a keyframe,
2262            * it marks the start of a new GOP and we should wait in
2263            * check_completed_gop before continuing, but either way
2264            * (keyframe or no, we'll pass this buffer through after
2265            * so set loop_again to FALSE */
2266           loop_again = FALSE;
2267
2268           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2269             /* Allow other input pads to catch up to here too */
2270             splitmux->max_in_running_time = ctx->in_running_time;
2271             GST_LOG_OBJECT (splitmux,
2272                 "Max in running time now %" GST_TIME_FORMAT,
2273                 GST_TIME_ARGS (splitmux->max_in_running_time));
2274             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2275             break;
2276           }
2277           GST_INFO_OBJECT (pad,
2278               "Have keyframe with running time %" GST_STIME_FORMAT,
2279               GST_STIME_ARGS (ctx->in_running_time));
2280           keyframe = TRUE;
2281           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2282           splitmux->max_in_running_time = ctx->in_running_time;
2283           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2284               GST_TIME_ARGS (splitmux->max_in_running_time));
2285           /* Wake up other input pads to collect this GOP */
2286           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2287           check_completed_gop (splitmux, ctx);
2288           /* Store this new keyframe to remember the start of GOP */
2289           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2290         } else {
2291           /* Pass this buffer if the reference ctx is far enough ahead */
2292           if (ctx->in_running_time < splitmux->max_in_running_time) {
2293             loop_again = FALSE;
2294             break;
2295           }
2296
2297           /* We're still waiting for a keyframe on the reference pad, sleep */
2298           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2299           GST_SPLITMUX_WAIT_INPUT (splitmux);
2300           GST_LOG_OBJECT (pad,
2301               "Done sleeping for GOP start input state now %d",
2302               splitmux->input_state);
2303         }
2304         break;
2305       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2306         /* We're collecting a GOP. If this is the reference context,
2307          * we need to check if this is a keyframe that marks the start
2308          * of the next GOP. If it is, it marks the end of the GOP we're
2309          * collecting, so sleep and wait until all the other pads also
2310          * reach that timestamp - at which point, we have an entire GOP
2311          * and either go to ENDING_FILE or release this GOP to the muxer and
2312          * go back to COLLECT_GOP_START. */
2313
2314         /* If we overran the target timestamp, it might be time to process
2315          * the GOP, otherwise bail out for more data
2316          */
2317         GST_LOG_OBJECT (pad,
2318             "Checking TS %" GST_STIME_FORMAT " against max %"
2319             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2320             GST_STIME_ARGS (splitmux->max_in_running_time));
2321
2322         if (ctx->in_running_time < splitmux->max_in_running_time) {
2323           loop_again = FALSE;
2324           break;
2325         }
2326
2327         GST_LOG_OBJECT (pad,
2328             "Collected last packet of GOP. Checking other pads");
2329         check_completed_gop (splitmux, ctx);
2330         break;
2331       }
2332       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2333         loop_again = FALSE;
2334         break;
2335       default:
2336         loop_again = FALSE;
2337         break;
2338     }
2339   }
2340   while (loop_again);
2341
2342   if (keyframe) {
2343     splitmux->queued_keyframes++;
2344     buf_info->keyframe = TRUE;
2345   }
2346
2347   /* Update total input byte counter for overflow detect */
2348   splitmux->gop_total_bytes += buf_info->buf_size;
2349
2350   /* Now add this buffer to the queue just before returning */
2351   g_queue_push_head (&ctx->queued_bufs, buf_info);
2352
2353   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2354       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2355
2356   GST_SPLITMUX_UNLOCK (splitmux);
2357   return GST_PAD_PROBE_PASS;
2358
2359 beach:
2360   GST_SPLITMUX_UNLOCK (splitmux);
2361   if (buf_info)
2362     mq_stream_buf_free (buf_info);
2363   return GST_PAD_PROBE_PASS;
2364 }
2365
2366 static void
2367 grow_blocked_queues (GstSplitMuxSink * splitmux)
2368 {
2369   GList *cur;
2370
2371   /* Scan other queues for full-ness and grow them */
2372   for (cur = g_list_first (splitmux->contexts);
2373       cur != NULL; cur = g_list_next (cur)) {
2374     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2375     guint cur_limit;
2376     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2377
2378     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2379     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2380
2381     if (cur_len >= cur_limit) {
2382       cur_limit = cur_len + 1;
2383       GST_DEBUG_OBJECT (tmpctx->q,
2384           "Queue overflowed and needs enlarging. Growing to %u buffers",
2385           cur_limit);
2386       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2387     }
2388   }
2389 }
2390
2391 static void
2392 handle_q_underrun (GstElement * q, gpointer user_data)
2393 {
2394   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2395   GstSplitMuxSink *splitmux = ctx->splitmux;
2396
2397   GST_SPLITMUX_LOCK (splitmux);
2398   GST_DEBUG_OBJECT (q,
2399       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2400       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2401   grow_blocked_queues (splitmux);
2402   GST_SPLITMUX_UNLOCK (splitmux);
2403 }
2404
2405 static void
2406 handle_q_overrun (GstElement * q, gpointer user_data)
2407 {
2408   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2409   GstSplitMuxSink *splitmux = ctx->splitmux;
2410   gboolean allow_grow = FALSE;
2411
2412   GST_SPLITMUX_LOCK (splitmux);
2413   GST_DEBUG_OBJECT (q,
2414       "Queue reported overrun with %d keyframes and %d cmds enqueued",
2415       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2416
2417   if (splitmux->queued_keyframes < 2) {
2418     /* Less than a full GOP queued, grow the queue */
2419     allow_grow = TRUE;
2420   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2421     allow_grow = TRUE;
2422   } else {
2423     /* If another queue is starved, grow */
2424     GList *cur;
2425     for (cur = g_list_first (splitmux->contexts);
2426         cur != NULL; cur = g_list_next (cur)) {
2427       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2428       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2429         allow_grow = TRUE;
2430       }
2431     }
2432   }
2433   GST_SPLITMUX_UNLOCK (splitmux);
2434
2435   if (allow_grow) {
2436     guint cur_limit;
2437
2438     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2439     cur_limit++;
2440
2441     GST_DEBUG_OBJECT (q,
2442         "Queue overflowed and needs enlarging. Growing to %u buffers",
2443         cur_limit);
2444
2445     g_object_set (q, "max-size-buffers", cur_limit, NULL);
2446   }
2447 }
2448
2449 static GstPad *
2450 gst_splitmux_sink_request_new_pad (GstElement * element,
2451     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2452 {
2453   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2454   GstPadTemplate *mux_template = NULL;
2455   GstPad *res = NULL;
2456   GstElement *q;
2457   GstPad *q_sink = NULL, *q_src = NULL;
2458   gchar *gname;
2459   gboolean is_video = FALSE;
2460   MqStreamCtx *ctx;
2461
2462   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2463
2464   GST_SPLITMUX_LOCK (splitmux);
2465   if (!create_muxer (splitmux))
2466     goto fail;
2467   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2468
2469   if (templ->name_template) {
2470     if (g_str_equal (templ->name_template, "video")) {
2471       if (splitmux->have_video)
2472         goto already_have_video;
2473
2474       /* FIXME: Look for a pad template with matching caps, rather than by name */
2475       mux_template =
2476           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2477           (splitmux->muxer), "video_%u");
2478
2479       /* Fallback to find sink pad templates named 'video' (flvmux) */
2480       if (!mux_template) {
2481         mux_template =
2482             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2483             (splitmux->muxer), "video");
2484       }
2485       is_video = TRUE;
2486       name = NULL;
2487     } else {
2488       mux_template =
2489           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2490           (splitmux->muxer), templ->name_template);
2491
2492       /* Fallback to find sink pad templates named 'audio' (flvmux) */
2493       if (!mux_template) {
2494         mux_template =
2495             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2496             (splitmux->muxer), "audio");
2497         name = NULL;
2498       }
2499     }
2500     if (mux_template == NULL) {
2501       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
2502       mux_template =
2503           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2504           (splitmux->muxer), "sink_%d");
2505       name = NULL;
2506     }
2507   }
2508
2509   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2510   if (res == NULL)
2511     goto fail;
2512
2513   if (is_video)
2514     gname = g_strdup ("video");
2515   else if (name == NULL)
2516     gname = gst_pad_get_name (res);
2517   else
2518     gname = g_strdup (name);
2519
2520   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2521     goto fail;
2522
2523   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2524
2525   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2526       "max-size-buffers", 5, NULL);
2527
2528   q_sink = gst_element_get_static_pad (q, "sink");
2529   q_src = gst_element_get_static_pad (q, "src");
2530
2531   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2532     gst_element_release_request_pad (splitmux->muxer, res);
2533     gst_object_unref (GST_OBJECT (res));
2534     goto fail;
2535   }
2536
2537   gst_object_unref (GST_OBJECT (res));
2538
2539   ctx = mq_stream_ctx_new (splitmux);
2540   /* Context holds a ref: */
2541   ctx->q = gst_object_ref (q);
2542   ctx->srcpad = q_src;
2543   ctx->sinkpad = q_sink;
2544   ctx->q_overrun_id =
2545       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2546   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2547
2548   ctx->src_pad_block_id =
2549       gst_pad_add_probe (q_src,
2550       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2551       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2552   if (is_video && splitmux->reference_ctx != NULL) {
2553     splitmux->reference_ctx->is_reference = FALSE;
2554     splitmux->reference_ctx = NULL;
2555   }
2556   if (splitmux->reference_ctx == NULL) {
2557     splitmux->reference_ctx = ctx;
2558     ctx->is_reference = TRUE;
2559   }
2560
2561   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2562   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2563
2564   ctx->sink_pad_block_id =
2565       gst_pad_add_probe (q_sink,
2566       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2567       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2568       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2569
2570   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2571       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2572
2573   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2574
2575   g_free (gname);
2576
2577   if (is_video)
2578     splitmux->have_video = TRUE;
2579
2580   gst_pad_set_active (res, TRUE);
2581   gst_element_add_pad (element, res);
2582
2583   GST_SPLITMUX_UNLOCK (splitmux);
2584
2585   return res;
2586 fail:
2587   GST_SPLITMUX_UNLOCK (splitmux);
2588
2589   if (q_sink)
2590     gst_object_unref (q_sink);
2591   if (q_src)
2592     gst_object_unref (q_src);
2593   return NULL;
2594 already_have_video:
2595   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2596   GST_SPLITMUX_UNLOCK (splitmux);
2597   return NULL;
2598 }
2599
2600 static void
2601 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2602 {
2603   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2604   GstPad *muxpad = NULL;
2605   MqStreamCtx *ctx =
2606       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2607
2608   GST_SPLITMUX_LOCK (splitmux);
2609
2610   if (splitmux->muxer == NULL)
2611     goto fail;                  /* Elements don't exist yet - nothing to release */
2612
2613   GST_INFO_OBJECT (pad, "releasing request pad");
2614
2615   muxpad = gst_pad_get_peer (ctx->srcpad);
2616
2617   /* Remove the context from our consideration */
2618   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2619
2620   if (ctx->sink_pad_block_id)
2621     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2622
2623   if (ctx->src_pad_block_id)
2624     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2625
2626   /* Can release the context now */
2627   mq_stream_ctx_free (ctx);
2628   if (ctx == splitmux->reference_ctx)
2629     splitmux->reference_ctx = NULL;
2630
2631   /* Release and free the muxer input */
2632   if (muxpad) {
2633     gst_element_release_request_pad (splitmux->muxer, muxpad);
2634     gst_object_unref (muxpad);
2635   }
2636
2637   if (GST_PAD_PAD_TEMPLATE (pad) &&
2638       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2639               (pad)), "video"))
2640     splitmux->have_video = FALSE;
2641
2642   gst_element_remove_pad (element, pad);
2643
2644   /* Reset the internal elements only after all request pads are released */
2645   if (splitmux->contexts == NULL)
2646     gst_splitmux_reset (splitmux);
2647
2648 fail:
2649   GST_SPLITMUX_UNLOCK (splitmux);
2650 }
2651
2652 static GstElement *
2653 create_element (GstSplitMuxSink * splitmux,
2654     const gchar * factory, const gchar * name, gboolean locked)
2655 {
2656   GstElement *ret = gst_element_factory_make (factory, name);
2657   if (ret == NULL) {
2658     g_warning ("Failed to create %s - splitmuxsink will not work", name);
2659     return NULL;
2660   }
2661
2662   if (locked) {
2663     /* Ensure the sink starts in locked state and NULL - it will be changed
2664      * by the filename setting code */
2665     gst_element_set_locked_state (ret, TRUE);
2666     gst_element_set_state (ret, GST_STATE_NULL);
2667   }
2668
2669   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2670     g_warning ("Could not add %s element - splitmuxsink will not work", name);
2671     gst_object_unref (ret);
2672     return NULL;
2673   }
2674
2675   return ret;
2676 }
2677
2678 static gboolean
2679 create_muxer (GstSplitMuxSink * splitmux)
2680 {
2681   /* Create internal elements */
2682   if (splitmux->muxer == NULL) {
2683     GstElement *provided_muxer = NULL;
2684
2685     GST_OBJECT_LOCK (splitmux);
2686     if (splitmux->provided_muxer != NULL)
2687       provided_muxer = gst_object_ref (splitmux->provided_muxer);
2688     GST_OBJECT_UNLOCK (splitmux);
2689
2690     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2691         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2692       if ((splitmux->muxer =
2693               create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2694         goto fail;
2695     } else if (splitmux->async_finalize) {
2696       if ((splitmux->muxer =
2697               create_element (splitmux, splitmux->muxer_factory, "muxer",
2698                   FALSE)) == NULL)
2699         goto fail;
2700       if (splitmux->muxer_properties)
2701         gst_structure_foreach (splitmux->muxer_properties,
2702             _set_property_from_structure, splitmux->muxer);
2703     } else {
2704       /* Ensure it's not in locked state (we might be reusing an old element) */
2705       gst_element_set_locked_state (provided_muxer, FALSE);
2706       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2707         g_warning ("Could not add muxer element - splitmuxsink will not work");
2708         gst_object_unref (provided_muxer);
2709         goto fail;
2710       }
2711
2712       splitmux->muxer = provided_muxer;
2713       gst_object_unref (provided_muxer);
2714     }
2715
2716     if (splitmux->use_robust_muxing) {
2717       update_muxer_properties (splitmux);
2718     }
2719   }
2720
2721   return TRUE;
2722 fail:
2723   return FALSE;
2724 }
2725
2726 static GstElement *
2727 find_sink (GstElement * e)
2728 {
2729   GstElement *res = NULL;
2730   GstIterator *iter;
2731   gboolean done = FALSE;
2732   GValue data = { 0, };
2733
2734   if (!GST_IS_BIN (e))
2735     return e;
2736
2737   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2738     return e;
2739
2740   iter = gst_bin_iterate_sinks (GST_BIN (e));
2741   while (!done) {
2742     switch (gst_iterator_next (iter, &data)) {
2743       case GST_ITERATOR_OK:
2744       {
2745         GstElement *child = g_value_get_object (&data);
2746         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2747                 "location") != NULL) {
2748           res = child;
2749           done = TRUE;
2750         }
2751         g_value_reset (&data);
2752         break;
2753       }
2754       case GST_ITERATOR_RESYNC:
2755         gst_iterator_resync (iter);
2756         break;
2757       case GST_ITERATOR_DONE:
2758         done = TRUE;
2759         break;
2760       case GST_ITERATOR_ERROR:
2761         g_assert_not_reached ();
2762         break;
2763     }
2764   }
2765   g_value_unset (&data);
2766   gst_iterator_free (iter);
2767
2768   return res;
2769 }
2770
2771 static gboolean
2772 create_sink (GstSplitMuxSink * splitmux)
2773 {
2774   GstElement *provided_sink = NULL;
2775
2776   if (splitmux->active_sink == NULL) {
2777
2778     GST_OBJECT_LOCK (splitmux);
2779     if (splitmux->provided_sink != NULL)
2780       provided_sink = gst_object_ref (splitmux->provided_sink);
2781     GST_OBJECT_UNLOCK (splitmux);
2782
2783     if ((!splitmux->async_finalize && provided_sink == NULL) ||
2784         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2785       if ((splitmux->sink =
2786               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2787         goto fail;
2788       splitmux->active_sink = splitmux->sink;
2789     } else if (splitmux->async_finalize) {
2790       if ((splitmux->sink =
2791               create_element (splitmux, splitmux->sink_factory, "sink",
2792                   TRUE)) == NULL)
2793         goto fail;
2794       if (splitmux->sink_properties)
2795         gst_structure_foreach (splitmux->sink_properties,
2796             _set_property_from_structure, splitmux->sink);
2797       splitmux->active_sink = splitmux->sink;
2798     } else {
2799       /* Ensure the sink starts in locked state and NULL - it will be changed
2800        * by the filename setting code */
2801       gst_element_set_locked_state (provided_sink, TRUE);
2802       gst_element_set_state (provided_sink, GST_STATE_NULL);
2803       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2804         g_warning ("Could not add sink elements - splitmuxsink will not work");
2805         gst_object_unref (provided_sink);
2806         goto fail;
2807       }
2808
2809       splitmux->active_sink = provided_sink;
2810
2811       /* The bin holds a ref now, we can drop our tmp ref */
2812       gst_object_unref (provided_sink);
2813
2814       /* Find the sink element */
2815       splitmux->sink = find_sink (splitmux->active_sink);
2816       if (splitmux->sink == NULL) {
2817         g_warning
2818             ("Could not locate sink element in provided sink - splitmuxsink will not work");
2819         goto fail;
2820       }
2821     }
2822
2823 #if 1
2824     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2825             "async") != NULL) {
2826       /* async child elements are causing state change races and weird
2827        * failures, so let's try and turn that off */
2828       g_object_set (splitmux->sink, "async", FALSE, NULL);
2829     }
2830 #endif
2831
2832     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2833       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2834       goto fail;
2835     }
2836   }
2837
2838   return TRUE;
2839 fail:
2840   return FALSE;
2841 }
2842
2843 #ifdef __GNUC__
2844 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2845 #endif
2846 static void
2847 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2848 {
2849   gchar *fname = NULL;
2850   GstSample *sample;
2851   GstCaps *caps;
2852
2853   gst_splitmux_sink_ensure_max_files (splitmux);
2854
2855   if (ctx->cur_out_buffer == NULL) {
2856     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2857   }
2858
2859   caps = gst_pad_get_current_caps (ctx->srcpad);
2860   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2861   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2862       splitmux->fragment_id, sample, &fname);
2863   gst_sample_unref (sample);
2864   if (caps)
2865     gst_caps_unref (caps);
2866
2867   if (fname == NULL) {
2868     /* Fallback to the old signal if the new one returned nothing */
2869     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2870         splitmux->fragment_id, &fname);
2871   }
2872
2873   if (!fname)
2874     fname = splitmux->location ?
2875         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2876
2877   if (fname) {
2878     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2879     g_object_set (splitmux->sink, "location", fname, NULL);
2880     g_free (fname);
2881
2882     splitmux->fragment_id++;
2883   }
2884 }
2885
2886 static void
2887 do_async_start (GstSplitMuxSink * splitmux)
2888 {
2889   GstMessage *message;
2890
2891   if (!splitmux->need_async_start) {
2892     GST_INFO_OBJECT (splitmux, "no async_start needed");
2893     return;
2894   }
2895
2896   splitmux->async_pending = TRUE;
2897
2898   GST_INFO_OBJECT (splitmux, "Sending async_start message");
2899   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2900   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2901       (splitmux), message);
2902 }
2903
2904 static void
2905 do_async_done (GstSplitMuxSink * splitmux)
2906 {
2907   GstMessage *message;
2908
2909   if (splitmux->async_pending) {
2910     GST_INFO_OBJECT (splitmux, "Sending async_done message");
2911     message =
2912         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2913         GST_CLOCK_TIME_NONE);
2914     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2915         (splitmux), message);
2916
2917     splitmux->async_pending = FALSE;
2918   }
2919
2920   splitmux->need_async_start = FALSE;
2921 }
2922
2923 static GstStateChangeReturn
2924 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2925 {
2926   GstStateChangeReturn ret;
2927   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2928
2929   switch (transition) {
2930     case GST_STATE_CHANGE_NULL_TO_READY:{
2931       GST_SPLITMUX_LOCK (splitmux);
2932       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2933         ret = GST_STATE_CHANGE_FAILURE;
2934         GST_SPLITMUX_UNLOCK (splitmux);
2935         goto beach;
2936       }
2937       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2938       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2939       GST_SPLITMUX_UNLOCK (splitmux);
2940       splitmux->fragment_id = 0;
2941       break;
2942     }
2943     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2944       GST_SPLITMUX_LOCK (splitmux);
2945       /* Start by collecting one input on each pad */
2946       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2947       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2948       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2949       splitmux->gop_start_time = splitmux->fragment_start_time =
2950           GST_CLOCK_STIME_NONE;
2951       splitmux->muxed_out_bytes = 0;
2952       splitmux->ready_for_output = FALSE;
2953       GST_SPLITMUX_UNLOCK (splitmux);
2954       break;
2955     }
2956     case GST_STATE_CHANGE_PAUSED_TO_READY:
2957       g_atomic_int_set (&(splitmux->split_now), FALSE);
2958     case GST_STATE_CHANGE_READY_TO_NULL:
2959       GST_SPLITMUX_LOCK (splitmux);
2960       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2961       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2962       /* Wake up any blocked threads */
2963       GST_LOG_OBJECT (splitmux,
2964           "State change -> NULL or READY. Waking threads");
2965       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2966       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2967       GST_SPLITMUX_UNLOCK (splitmux);
2968       break;
2969     default:
2970       break;
2971   }
2972
2973   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2974   if (ret == GST_STATE_CHANGE_FAILURE)
2975     goto beach;
2976
2977   switch (transition) {
2978     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2979       splitmux->need_async_start = TRUE;
2980       break;
2981     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2982       /* Change state async, because our child sink might not
2983        * be ready to do that for us yet if it's state is still locked */
2984
2985       splitmux->need_async_start = TRUE;
2986       /* we want to go async to PAUSED until we managed to configure and add the
2987        * sink */
2988       GST_SPLITMUX_LOCK (splitmux);
2989       do_async_start (splitmux);
2990       GST_SPLITMUX_UNLOCK (splitmux);
2991       ret = GST_STATE_CHANGE_ASYNC;
2992       break;
2993     }
2994     case GST_STATE_CHANGE_READY_TO_NULL:
2995       GST_SPLITMUX_LOCK (splitmux);
2996       splitmux->fragment_id = 0;
2997       /* Reset internal elements only if no pad contexts are using them */
2998       if (splitmux->contexts == NULL)
2999         gst_splitmux_reset (splitmux);
3000       do_async_done (splitmux);
3001       GST_SPLITMUX_UNLOCK (splitmux);
3002       break;
3003     default:
3004       break;
3005   }
3006
3007 beach:
3008   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3009       ret == GST_STATE_CHANGE_FAILURE) {
3010     /* Cleanup elements on failed transition out of NULL */
3011     gst_splitmux_reset (splitmux);
3012     GST_SPLITMUX_LOCK (splitmux);
3013     do_async_done (splitmux);
3014     GST_SPLITMUX_UNLOCK (splitmux);
3015   }
3016   return ret;
3017 }
3018
3019 gboolean
3020 register_splitmuxsink (GstPlugin * plugin)
3021 {
3022   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3023       "Split File Muxing Sink");
3024
3025   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3026       GST_TYPE_SPLITMUX_SINK);
3027 }
3028
3029 static void
3030 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3031 {
3032   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3033     splitmux->fragment_id = 0;
3034   }
3035 }
3036
3037 static void
3038 split_now (GstSplitMuxSink * splitmux)
3039 {
3040   g_atomic_int_set (&(splitmux->split_now), TRUE);
3041 }