mp4 robust muxing: improve documentation and logging
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
25  * contents are about to cross a threshold of maximum size of maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * In the async-finalize mode, when the threshold is crossed, the old muxer
42  * and sink is disconnected from the pipeline and left to finish the file
43  * asynchronously, and a new muxer and sink is created to continue with the
44  * next fragment. For that reason, instead of muxer and sink objects, the
45  * muxer-factory and sink-factory properties are used to construct the new
46  * objects, together with muxer-properties and sink-properties.
47  *
48  * <refsect2>
49  * <title>Example pipelines</title>
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  * </refsect2>
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69
70 #include <string.h>
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
74
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
77
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
82
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
85
86 static void split_now (GstSplitMuxSink * splitmux);
87
88 enum
89 {
90   PROP_0,
91   PROP_LOCATION,
92   PROP_MAX_SIZE_TIME,
93   PROP_MAX_SIZE_BYTES,
94   PROP_MAX_SIZE_TIMECODE,
95   PROP_SEND_KEYFRAME_REQUESTS,
96   PROP_MAX_FILES,
97   PROP_MUXER_OVERHEAD,
98   PROP_USE_ROBUST_MUXING,
99   PROP_ALIGNMENT_THRESHOLD,
100   PROP_MUXER,
101   PROP_SINK,
102   PROP_RESET_MUXER,
103   PROP_ASYNC_FINALIZE,
104   PROP_MUXER_FACTORY,
105   PROP_MUXER_PROPERTIES,
106   PROP_SINK_FACTORY,
107   PROP_SINK_PROPERTIES
108 };
109
110 #define DEFAULT_MAX_SIZE_TIME       0
111 #define DEFAULT_MAX_SIZE_BYTES      0
112 #define DEFAULT_MAX_FILES           0
113 #define DEFAULT_MUXER_OVERHEAD      0.02
114 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
115 #define DEFAULT_ALIGNMENT_THRESHOLD 0
116 #define DEFAULT_MUXER "mp4mux"
117 #define DEFAULT_SINK "filesink"
118 #define DEFAULT_USE_ROBUST_MUXING FALSE
119 #define DEFAULT_RESET_MUXER TRUE
120 #define DEFAULT_ASYNC_FINALIZE FALSE
121
122 typedef struct _AsyncEosHelper
123 {
124   MqStreamCtx *ctx;
125   GstPad *pad;
126 } AsyncEosHelper;
127
128 enum
129 {
130   SIGNAL_FORMAT_LOCATION,
131   SIGNAL_FORMAT_LOCATION_FULL,
132   SIGNAL_SPLIT_NOW,
133   SIGNAL_MUXER_ADDED,
134   SIGNAL_SINK_ADDED,
135   SIGNAL_LAST
136 };
137
138 static guint signals[SIGNAL_LAST];
139
140 static GstStaticPadTemplate video_sink_template =
141 GST_STATIC_PAD_TEMPLATE ("video",
142     GST_PAD_SINK,
143     GST_PAD_REQUEST,
144     GST_STATIC_CAPS_ANY);
145 static GstStaticPadTemplate audio_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("audio_%u",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate subtitle_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
152     GST_PAD_SINK,
153     GST_PAD_REQUEST,
154     GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate caption_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("caption_%u",
157     GST_PAD_SINK,
158     GST_PAD_REQUEST,
159     GST_STATIC_CAPS_ANY);
160
161 static GQuark PAD_CONTEXT;
162 static GQuark EOS_FROM_US;
163 static GQuark RUNNING_TIME;
164 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
165  * to forward an incoming EOS message, but we cannot rely on the state of the
166  * splitmux anymore, so we set this qdata on the sink instead.
167  * The muxer and sink must be destroyed after both of these things have
168  * finished:
169  * 1) The EOS message has been sent when the fragment is ending
170  * 2) The muxer has been unlinked and relinked
171  * Therefore, EOS_FROM_US can have these two values:
172  * 0: EOS was not requested from us. Forward the message. The muxer and the
173  * sink will be destroyed together with the rest of the bin.
174  * 1: EOS was requested from us, but the other of the two tasks hasn't
175  * finished. Set EOS_FROM_US to 2 and do your stuff.
176  * 2: EOS was requested from us and the other of the two tasks has finished.
177  * Now we can destroy the muxer and the sink.
178  */
179
180 static void
181 _do_init (void)
182 {
183   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
184   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
185   RUNNING_TIME = g_quark_from_static_string ("running-time");
186 }
187
188 #define gst_splitmux_sink_parent_class parent_class
189 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
190     _do_init ());
191
192 static gboolean create_muxer (GstSplitMuxSink * splitmux);
193 static gboolean create_sink (GstSplitMuxSink * splitmux);
194 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198 static void gst_splitmux_sink_dispose (GObject * object);
199 static void gst_splitmux_sink_finalize (GObject * object);
200
201 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
202     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
203 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
204
205 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
206     element, GstStateChange transition);
207
208 static void bus_handler (GstBin * bin, GstMessage * msg);
209 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
210 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
211 static void mq_stream_ctx_free (MqStreamCtx * ctx);
212 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
213
214 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
215 static GstElement *create_element (GstSplitMuxSink * splitmux,
216     const gchar * factory, const gchar * name, gboolean locked);
217
218 static void do_async_done (GstSplitMuxSink * splitmux);
219
220 static MqStreamBuf *
221 mq_stream_buf_new (void)
222 {
223   return g_slice_new0 (MqStreamBuf);
224 }
225
226 static void
227 mq_stream_buf_free (MqStreamBuf * data)
228 {
229   g_slice_free (MqStreamBuf, data);
230 }
231
232 static SplitMuxOutputCommand *
233 out_cmd_buf_new (void)
234 {
235   return g_slice_new0 (SplitMuxOutputCommand);
236 }
237
238 static void
239 out_cmd_buf_free (SplitMuxOutputCommand * data)
240 {
241   g_slice_free (SplitMuxOutputCommand, data);
242 }
243
244 static void
245 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
246 {
247   GObjectClass *gobject_class = (GObjectClass *) klass;
248   GstElementClass *gstelement_class = (GstElementClass *) klass;
249   GstBinClass *gstbin_class = (GstBinClass *) klass;
250
251   gobject_class->set_property = gst_splitmux_sink_set_property;
252   gobject_class->get_property = gst_splitmux_sink_get_property;
253   gobject_class->dispose = gst_splitmux_sink_dispose;
254   gobject_class->finalize = gst_splitmux_sink_finalize;
255
256   gst_element_class_set_static_metadata (gstelement_class,
257       "Split Muxing Bin", "Generic/Bin/Muxer",
258       "Convenience bin that muxes incoming streams into multiple time/size limited files",
259       "Jan Schmidt <jan@centricular.com>");
260
261   gst_element_class_add_static_pad_template (gstelement_class,
262       &video_sink_template);
263   gst_element_class_add_static_pad_template (gstelement_class,
264       &audio_sink_template);
265   gst_element_class_add_static_pad_template (gstelement_class,
266       &subtitle_sink_template);
267   gst_element_class_add_static_pad_template (gstelement_class,
268       &caption_sink_template);
269
270   gstelement_class->change_state =
271       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
272   gstelement_class->request_new_pad =
273       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
274   gstelement_class->release_pad =
275       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
276
277   gstbin_class->handle_message = bus_handler;
278
279   g_object_class_install_property (gobject_class, PROP_LOCATION,
280       g_param_spec_string ("location", "File Output Pattern",
281           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
282           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
284       g_param_spec_double ("mux-overhead", "Muxing Overhead",
285           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
286           DEFAULT_MUXER_OVERHEAD,
287           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
288
289   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
290       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
291           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
292           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
294       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
295           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
296           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
298       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
299           "Maximum difference in timecode between first and last frame. "
300           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
301           "Will only be effective if a timecode track is present.",
302           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
304       g_param_spec_boolean ("send-keyframe-requests",
305           "Request keyframes at max-size-time",
306           "Request a keyframe every max-size-time ns to try splitting at that point. "
307           "Needs max-size-bytes to be 0 in order to be effective.",
308           DEFAULT_SEND_KEYFRAME_REQUESTS,
309           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
311       g_param_spec_uint ("max-files", "Max files",
312           "Maximum number of files to keep on disk. Once the maximum is reached,"
313           "old files start to be deleted to make room for new ones.", 0,
314           G_MAXUINT, DEFAULT_MAX_FILES,
315           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
317       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
318           "Allow non-reference streams to be that many ns before the reference"
319           " stream",
320           0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
321           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
322
323   g_object_class_install_property (gobject_class, PROP_MUXER,
324       g_param_spec_object ("muxer", "Muxer",
325           "The muxer element to use (NULL = default mp4mux). "
326           "Valid only for async-finalize = FALSE",
327           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328   g_object_class_install_property (gobject_class, PROP_SINK,
329       g_param_spec_object ("sink", "Sink",
330           "The sink element (or element chain) to use (NULL = default filesink). "
331           "Valid only for async-finalize = FALSE",
332           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333
334   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
335       g_param_spec_boolean ("use-robust-muxing",
336           "Support robust-muxing mode of some muxers",
337           "Check if muxers support robust muxing via the reserved-max-duration and "
338           "reserved-duration-remaining properties and use them if so. "
339           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
340           " create new fragments if the reserved header space is about to overflow. "
341           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
342           "manually by the app to a non-zero value for robust muxing to have an effect.",
343           DEFAULT_USE_ROBUST_MUXING,
344           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345
346   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
347       g_param_spec_boolean ("reset-muxer",
348           "Reset Muxer",
349           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
350           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351
352   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
353       g_param_spec_boolean ("async-finalize",
354           "Finalize fragments asynchronously",
355           "Finalize each fragment asynchronously and start a new one",
356           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
358       g_param_spec_string ("muxer-factory", "Muxer factory",
359           "The muxer element factory to use (default = mp4mux). "
360           "Valid only for async-finalize = TRUE",
361           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
363       g_param_spec_boxed ("muxer-properties", "Muxer properties",
364           "The muxer element properties to use. "
365           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
366           "Valid only for async-finalize = TRUE",
367           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
368   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
369       g_param_spec_string ("sink-factory", "Sink factory",
370           "The sink element factory to use (default = filesink). "
371           "Valid only for async-finalize = TRUE",
372           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
374       g_param_spec_boxed ("sink-properties", "Sink properties",
375           "The sink element properties to use. "
376           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
377           "Valid only for async-finalize = TRUE",
378           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
379
380   /**
381    * GstSplitMuxSink::format-location:
382    * @splitmux: the #GstSplitMuxSink
383    * @fragment_id: the sequence number of the file to be created
384    *
385    * Returns: the location to be used for the next output file
386    */
387   signals[SIGNAL_FORMAT_LOCATION] =
388       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
389       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
390
391   /**
392    * GstSplitMuxSink::format-location-full:
393    * @splitmux: the #GstSplitMuxSink
394    * @fragment_id: the sequence number of the file to be created
395    * @first_sample: A #GstSample containing the first buffer
396    *   from the reference stream in the new file
397    *
398    * Returns: the location to be used for the next output file
399    */
400   signals[SIGNAL_FORMAT_LOCATION_FULL] =
401       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
402       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
403       GST_TYPE_SAMPLE);
404
405   /**
406    * GstSplitMuxSink::split-now:
407    * @splitmux: the #GstSplitMuxSink
408    *
409    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
410    *
411    *
412    * Since: 1.14
413    */
414   signals[SIGNAL_SPLIT_NOW] =
415       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
416       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
417           split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
418
419   /**
420    * GstSplitMuxSink::muxer-added:
421    * @splitmux: the #GstSplitMuxSink
422    * @muxer: the newly added muxer element
423    *
424    * Since: 1.14
425    */
426   signals[SIGNAL_MUXER_ADDED] =
427       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
428       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
429
430   /**
431    * GstSplitMuxSink::sink-added:
432    * @splitmux: the #GstSplitMuxSink
433    * @sink: the newly added sink element
434    *
435    * Since: 1.14
436    */
437   signals[SIGNAL_SINK_ADDED] =
438       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
439       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
440
441   klass->split_now = split_now;
442 }
443
444 static void
445 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
446 {
447   g_mutex_init (&splitmux->lock);
448   g_cond_init (&splitmux->input_cond);
449   g_cond_init (&splitmux->output_cond);
450   g_queue_init (&splitmux->out_cmd_q);
451
452   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
453   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
454   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
455   splitmux->max_files = DEFAULT_MAX_FILES;
456   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
457   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
458   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
459   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
460   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
461
462   splitmux->threshold_timecode_str = NULL;
463
464   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
465   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
466   splitmux->muxer_properties = NULL;
467   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
468   splitmux->sink_properties = NULL;
469
470   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
471   splitmux->split_now = FALSE;
472 }
473
474 static void
475 gst_splitmux_reset (GstSplitMuxSink * splitmux)
476 {
477   if (splitmux->muxer) {
478     gst_element_set_locked_state (splitmux->muxer, TRUE);
479     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
480     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
481   }
482   if (splitmux->active_sink) {
483     gst_element_set_locked_state (splitmux->active_sink, TRUE);
484     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
485     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
486   }
487
488   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
489 }
490
491 static void
492 gst_splitmux_sink_dispose (GObject * object)
493 {
494   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
495
496   /* Calling parent dispose invalidates all child pointers */
497   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
498
499   G_OBJECT_CLASS (parent_class)->dispose (object);
500 }
501
502 static void
503 gst_splitmux_sink_finalize (GObject * object)
504 {
505   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
506   g_cond_clear (&splitmux->input_cond);
507   g_cond_clear (&splitmux->output_cond);
508   g_mutex_clear (&splitmux->lock);
509   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
510   g_queue_clear (&splitmux->out_cmd_q);
511
512   if (splitmux->provided_sink)
513     gst_object_unref (splitmux->provided_sink);
514   if (splitmux->provided_muxer)
515     gst_object_unref (splitmux->provided_muxer);
516
517   if (splitmux->muxer_factory)
518     g_free (splitmux->muxer_factory);
519   if (splitmux->muxer_properties)
520     gst_structure_free (splitmux->muxer_properties);
521   if (splitmux->sink_factory)
522     g_free (splitmux->sink_factory);
523   if (splitmux->sink_properties)
524     gst_structure_free (splitmux->sink_properties);
525
526   if (splitmux->threshold_timecode_str)
527     g_free (splitmux->threshold_timecode_str);
528
529   g_free (splitmux->location);
530
531   /* Make sure to free any un-released contexts. There should not be any,
532    * because the dispose will have freed all request pads though */
533   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
534   g_list_free (splitmux->contexts);
535
536   G_OBJECT_CLASS (parent_class)->finalize (object);
537 }
538
539 /*
540  * Set any time threshold to the muxer, if it has
541  * reserved-max-duration and reserved-duration-remaining
542  * properties. Called when creating/claiming the muxer
543  * in create_elements() */
544 static void
545 update_muxer_properties (GstSplitMuxSink * sink)
546 {
547   GObjectClass *klass;
548   GstClockTime threshold_time;
549
550   sink->muxer_has_reserved_props = FALSE;
551   if (sink->muxer == NULL)
552     return;
553   klass = G_OBJECT_GET_CLASS (sink->muxer);
554   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
555     return;
556   if (g_object_class_find_property (klass,
557           "reserved-duration-remaining") == NULL)
558     return;
559   sink->muxer_has_reserved_props = TRUE;
560
561   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
562       GST_TIME_ARGS (sink->threshold_time));
563   GST_OBJECT_LOCK (sink);
564   threshold_time = sink->threshold_time;
565   GST_OBJECT_UNLOCK (sink);
566
567   if (threshold_time > 0) {
568     /* Tell the muxer how much space to reserve */
569     GstClockTime muxer_threshold = threshold_time;
570     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
571   }
572 }
573
574 static void
575 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
576     const GValue * value, GParamSpec * pspec)
577 {
578   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
579
580   switch (prop_id) {
581     case PROP_LOCATION:{
582       GST_OBJECT_LOCK (splitmux);
583       g_free (splitmux->location);
584       splitmux->location = g_value_dup_string (value);
585       GST_OBJECT_UNLOCK (splitmux);
586       break;
587     }
588     case PROP_MAX_SIZE_BYTES:
589       GST_OBJECT_LOCK (splitmux);
590       splitmux->threshold_bytes = g_value_get_uint64 (value);
591       GST_OBJECT_UNLOCK (splitmux);
592       break;
593     case PROP_MAX_SIZE_TIME:
594       GST_OBJECT_LOCK (splitmux);
595       splitmux->threshold_time = g_value_get_uint64 (value);
596       GST_OBJECT_UNLOCK (splitmux);
597       break;
598     case PROP_MAX_SIZE_TIMECODE:
599       GST_OBJECT_LOCK (splitmux);
600       splitmux->threshold_timecode_str = g_value_dup_string (value);
601       GST_OBJECT_UNLOCK (splitmux);
602       break;
603     case PROP_SEND_KEYFRAME_REQUESTS:
604       GST_OBJECT_LOCK (splitmux);
605       splitmux->send_keyframe_requests = g_value_get_boolean (value);
606       GST_OBJECT_UNLOCK (splitmux);
607       break;
608     case PROP_MAX_FILES:
609       GST_OBJECT_LOCK (splitmux);
610       splitmux->max_files = g_value_get_uint (value);
611       GST_OBJECT_UNLOCK (splitmux);
612       break;
613     case PROP_MUXER_OVERHEAD:
614       GST_OBJECT_LOCK (splitmux);
615       splitmux->mux_overhead = g_value_get_double (value);
616       GST_OBJECT_UNLOCK (splitmux);
617       break;
618     case PROP_USE_ROBUST_MUXING:
619       GST_OBJECT_LOCK (splitmux);
620       splitmux->use_robust_muxing = g_value_get_boolean (value);
621       GST_OBJECT_UNLOCK (splitmux);
622       if (splitmux->use_robust_muxing)
623         update_muxer_properties (splitmux);
624       break;
625     case PROP_ALIGNMENT_THRESHOLD:
626       GST_OBJECT_LOCK (splitmux);
627       splitmux->alignment_threshold = g_value_get_uint64 (value);
628       GST_OBJECT_UNLOCK (splitmux);
629       break;
630     case PROP_SINK:
631       GST_OBJECT_LOCK (splitmux);
632       if (splitmux->provided_sink)
633         gst_object_unref (splitmux->provided_sink);
634       splitmux->provided_sink = g_value_get_object (value);
635       gst_object_ref_sink (splitmux->provided_sink);
636       GST_OBJECT_UNLOCK (splitmux);
637       break;
638     case PROP_MUXER:
639       GST_OBJECT_LOCK (splitmux);
640       if (splitmux->provided_muxer)
641         gst_object_unref (splitmux->provided_muxer);
642       splitmux->provided_muxer = g_value_get_object (value);
643       gst_object_ref_sink (splitmux->provided_muxer);
644       GST_OBJECT_UNLOCK (splitmux);
645       break;
646     case PROP_RESET_MUXER:
647       GST_OBJECT_LOCK (splitmux);
648       splitmux->reset_muxer = g_value_get_boolean (value);
649       GST_OBJECT_UNLOCK (splitmux);
650       break;
651     case PROP_ASYNC_FINALIZE:
652       GST_OBJECT_LOCK (splitmux);
653       splitmux->async_finalize = g_value_get_boolean (value);
654       GST_OBJECT_UNLOCK (splitmux);
655       break;
656     case PROP_MUXER_FACTORY:
657       GST_OBJECT_LOCK (splitmux);
658       if (splitmux->muxer_factory)
659         g_free (splitmux->muxer_factory);
660       splitmux->muxer_factory = g_value_dup_string (value);
661       GST_OBJECT_UNLOCK (splitmux);
662       break;
663     case PROP_MUXER_PROPERTIES:
664       GST_OBJECT_LOCK (splitmux);
665       if (splitmux->muxer_properties)
666         gst_structure_free (splitmux->muxer_properties);
667       if (gst_value_get_structure (value))
668         splitmux->muxer_properties =
669             gst_structure_copy (gst_value_get_structure (value));
670       else
671         splitmux->muxer_properties = NULL;
672       GST_OBJECT_UNLOCK (splitmux);
673       break;
674     case PROP_SINK_FACTORY:
675       GST_OBJECT_LOCK (splitmux);
676       if (splitmux->sink_factory)
677         g_free (splitmux->sink_factory);
678       splitmux->sink_factory = g_value_dup_string (value);
679       GST_OBJECT_UNLOCK (splitmux);
680       break;
681     case PROP_SINK_PROPERTIES:
682       GST_OBJECT_LOCK (splitmux);
683       if (splitmux->sink_properties)
684         gst_structure_free (splitmux->sink_properties);
685       if (gst_value_get_structure (value))
686         splitmux->sink_properties =
687             gst_structure_copy (gst_value_get_structure (value));
688       else
689         splitmux->sink_properties = NULL;
690       GST_OBJECT_UNLOCK (splitmux);
691       break;
692     default:
693       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
694       break;
695   }
696 }
697
698 static void
699 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
700     GValue * value, GParamSpec * pspec)
701 {
702   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
703
704   switch (prop_id) {
705     case PROP_LOCATION:
706       GST_OBJECT_LOCK (splitmux);
707       g_value_set_string (value, splitmux->location);
708       GST_OBJECT_UNLOCK (splitmux);
709       break;
710     case PROP_MAX_SIZE_BYTES:
711       GST_OBJECT_LOCK (splitmux);
712       g_value_set_uint64 (value, splitmux->threshold_bytes);
713       GST_OBJECT_UNLOCK (splitmux);
714       break;
715     case PROP_MAX_SIZE_TIME:
716       GST_OBJECT_LOCK (splitmux);
717       g_value_set_uint64 (value, splitmux->threshold_time);
718       GST_OBJECT_UNLOCK (splitmux);
719       break;
720     case PROP_MAX_SIZE_TIMECODE:
721       GST_OBJECT_LOCK (splitmux);
722       g_value_set_string (value, splitmux->threshold_timecode_str);
723       GST_OBJECT_UNLOCK (splitmux);
724       break;
725     case PROP_SEND_KEYFRAME_REQUESTS:
726       GST_OBJECT_LOCK (splitmux);
727       g_value_set_boolean (value, splitmux->send_keyframe_requests);
728       GST_OBJECT_UNLOCK (splitmux);
729       break;
730     case PROP_MAX_FILES:
731       GST_OBJECT_LOCK (splitmux);
732       g_value_set_uint (value, splitmux->max_files);
733       GST_OBJECT_UNLOCK (splitmux);
734       break;
735     case PROP_MUXER_OVERHEAD:
736       GST_OBJECT_LOCK (splitmux);
737       g_value_set_double (value, splitmux->mux_overhead);
738       GST_OBJECT_UNLOCK (splitmux);
739       break;
740     case PROP_USE_ROBUST_MUXING:
741       GST_OBJECT_LOCK (splitmux);
742       g_value_set_boolean (value, splitmux->use_robust_muxing);
743       GST_OBJECT_UNLOCK (splitmux);
744       break;
745     case PROP_ALIGNMENT_THRESHOLD:
746       GST_OBJECT_LOCK (splitmux);
747       g_value_set_uint64 (value, splitmux->alignment_threshold);
748       GST_OBJECT_UNLOCK (splitmux);
749       break;
750     case PROP_SINK:
751       GST_OBJECT_LOCK (splitmux);
752       g_value_set_object (value, splitmux->provided_sink);
753       GST_OBJECT_UNLOCK (splitmux);
754       break;
755     case PROP_MUXER:
756       GST_OBJECT_LOCK (splitmux);
757       g_value_set_object (value, splitmux->provided_muxer);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     case PROP_RESET_MUXER:
761       GST_OBJECT_LOCK (splitmux);
762       g_value_set_boolean (value, splitmux->reset_muxer);
763       GST_OBJECT_UNLOCK (splitmux);
764       break;
765     case PROP_ASYNC_FINALIZE:
766       GST_OBJECT_LOCK (splitmux);
767       g_value_set_boolean (value, splitmux->async_finalize);
768       GST_OBJECT_UNLOCK (splitmux);
769       break;
770     case PROP_MUXER_FACTORY:
771       GST_OBJECT_LOCK (splitmux);
772       g_value_set_string (value, splitmux->muxer_factory);
773       GST_OBJECT_UNLOCK (splitmux);
774       break;
775     case PROP_MUXER_PROPERTIES:
776       GST_OBJECT_LOCK (splitmux);
777       gst_value_set_structure (value, splitmux->muxer_properties);
778       GST_OBJECT_UNLOCK (splitmux);
779       break;
780     case PROP_SINK_FACTORY:
781       GST_OBJECT_LOCK (splitmux);
782       g_value_set_string (value, splitmux->sink_factory);
783       GST_OBJECT_UNLOCK (splitmux);
784       break;
785     case PROP_SINK_PROPERTIES:
786       GST_OBJECT_LOCK (splitmux);
787       gst_value_set_structure (value, splitmux->sink_properties);
788       GST_OBJECT_UNLOCK (splitmux);
789       break;
790     default:
791       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
792       break;
793   }
794 }
795
796 /* Convenience function */
797 static inline GstClockTimeDiff
798 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
799 {
800   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
801
802   if (GST_CLOCK_TIME_IS_VALID (val)) {
803     gboolean sign =
804         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
805     if (sign > 0)
806       res = val;
807     else if (sign < 0)
808       res = -val;
809   }
810   return res;
811 }
812
813 static MqStreamCtx *
814 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
815 {
816   MqStreamCtx *ctx;
817
818   ctx = g_new0 (MqStreamCtx, 1);
819   ctx->splitmux = splitmux;
820   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
821   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
822   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
823   g_queue_init (&ctx->queued_bufs);
824   return ctx;
825 }
826
827 static void
828 mq_stream_ctx_free (MqStreamCtx * ctx)
829 {
830   if (ctx->q) {
831     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
832
833     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
834
835     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
836       gst_element_set_locked_state (ctx->q, TRUE);
837       gst_element_set_state (ctx->q, GST_STATE_NULL);
838       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
839       gst_object_unref (parent);
840     }
841     gst_object_unref (ctx->q);
842   }
843   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
844   gst_object_unref (ctx->sinkpad);
845   gst_object_unref (ctx->srcpad);
846   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
847   g_queue_clear (&ctx->queued_bufs);
848   g_free (ctx);
849 }
850
851 static void
852 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
853     GstElement * sink)
854 {
855   gchar *location = NULL;
856   GstMessage *msg;
857   const gchar *msg_name = opened ?
858       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
859   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
860
861   if (!opened) {
862     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
863     if (rtime)
864       running_time = *rtime;
865   }
866
867   g_object_get (sink, "location", &location, NULL);
868
869   /* If it's in the middle of a teardown, the reference_ctc might have become
870    * NULL */
871   if (splitmux->reference_ctx) {
872     msg = gst_message_new_element (GST_OBJECT (splitmux),
873         gst_structure_new (msg_name,
874             "location", G_TYPE_STRING, location,
875             "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
876     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
877   }
878
879   g_free (location);
880 }
881
882 static void
883 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
884 {
885   GstEvent *eos;
886   GstPad *pad;
887   MqStreamCtx *ctx;
888
889   eos = gst_event_new_eos ();
890   pad = helper->pad;
891   ctx = helper->ctx;
892
893   GST_SPLITMUX_LOCK (splitmux);
894   if (!pad)
895     pad = gst_pad_get_peer (ctx->srcpad);
896   GST_SPLITMUX_UNLOCK (splitmux);
897
898   gst_pad_send_event (pad, eos);
899   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
900
901   gst_object_unref (pad);
902   g_free (helper);
903 }
904
905 /* Called with lock held, drops the lock to send EOS to the
906  * pad
907  */
908 static void
909 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
910 {
911   GstEvent *eos;
912   GstPad *pad;
913
914   eos = gst_event_new_eos ();
915   pad = gst_pad_get_peer (ctx->srcpad);
916
917   ctx->out_eos = TRUE;
918
919   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
920   GST_SPLITMUX_UNLOCK (splitmux);
921   gst_pad_send_event (pad, eos);
922   GST_SPLITMUX_LOCK (splitmux);
923
924   gst_object_unref (pad);
925 }
926
927 /* Called with lock held. Schedules an EOS event to the ctx pad
928  * to happen in another thread */
929 static void
930 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
931 {
932   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
933   GstPad *srcpad, *sinkpad;
934
935   srcpad = ctx->srcpad;
936   sinkpad = gst_pad_get_peer (srcpad);
937
938   helper->ctx = ctx;
939   helper->pad = sinkpad;        /* Takes the reference */
940
941   ctx->out_eos_async_done = TRUE;
942   /* HACK: Here, we explicitly unset the SINK flag on the target sink element
943    * that's about to be asynchronously disposed, so that it no longer
944    * participates in GstBin EOS logic. This fixes a race where if
945    * splitmuxsink really reaches EOS before an asynchronous background
946    * element has finished, then the bin won't actually send EOS to the
947    * pipeline. Even after finishing and removing the old element, the
948    * bin doesn't re-check EOS status on removing a SINK element. This
949    * should be fixed in core, making this hack unnecessary. */
950   GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
951
952   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
953       sinkpad, ctx);
954
955   g_assert_nonnull (helper->pad);
956   gst_element_call_async (GST_ELEMENT (splitmux),
957       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
958 }
959
960 /* Called with lock held. TRUE iff all contexts have a
961  * pending (or delivered) async eos event */
962 static gboolean
963 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
964 {
965   gboolean ret = TRUE;
966   GList *item;
967
968   for (item = splitmux->contexts; item; item = item->next) {
969     MqStreamCtx *ctx = item->data;
970     ret &= ctx->out_eos_async_done;
971   }
972   return ret;
973 }
974
975 /* Called with splitmux lock held to check if this output
976  * context needs to sleep to wait for the release of the
977  * next GOP, or to send EOS to close out the current file
978  */
979 static void
980 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
981 {
982   if (ctx->caps_change)
983     return;
984
985   do {
986     /* When first starting up, the reference stream has to output
987      * the first buffer to prepare the muxer and sink */
988     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
989     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
990
991     if (!(splitmux->max_out_running_time == 0 ||
992             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
993             splitmux->alignment_threshold == 0 ||
994             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
995       my_max_out_running_time -= splitmux->alignment_threshold;
996       GST_LOG_OBJECT (ctx->srcpad,
997           "Max out running time currently %" GST_STIME_FORMAT
998           ", with threshold applied it is %" GST_STIME_FORMAT,
999           GST_STIME_ARGS (splitmux->max_out_running_time),
1000           GST_STIME_ARGS (my_max_out_running_time));
1001     }
1002
1003     if (ctx->flushing
1004         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1005       return;
1006
1007     GST_LOG_OBJECT (ctx->srcpad,
1008         "Checking running time %" GST_STIME_FORMAT " against max %"
1009         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1010         GST_STIME_ARGS (my_max_out_running_time));
1011
1012     if (can_output) {
1013       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1014           ctx->out_running_time < my_max_out_running_time) {
1015         return;
1016       }
1017
1018       switch (splitmux->output_state) {
1019         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1020           /* We only get here if we've finished outputting a GOP and need to know
1021            * what to do next */
1022           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1023           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1024           continue;
1025
1026         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1027           /* We've reached the max out running_time to get here, so end this file now */
1028           if (ctx->out_eos == FALSE) {
1029             if (splitmux->async_finalize) {
1030               /* We must set EOS asynchronously at this point. We cannot defer
1031                * it, because we need all contexts to wake up, for the
1032                * reference context to eventually give us something at
1033                * START_NEXT_FILE. Otherwise, collectpads might choose another
1034                * context to give us the first buffer, and format-location-full
1035                * will not contain a valid sample. */
1036               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1037                   GINT_TO_POINTER (1));
1038               eos_context_async (ctx, splitmux);
1039               if (all_contexts_are_async_eos (splitmux)) {
1040                 GST_INFO_OBJECT (splitmux,
1041                     "All contexts are async_eos. Moving to the next file.");
1042                 /* We can start the next file once we've asked each pad to go EOS */
1043                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1044                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1045                 continue;
1046               }
1047             } else {
1048               send_eos (splitmux, ctx);
1049               continue;
1050             }
1051           } else {
1052             GST_INFO_OBJECT (splitmux,
1053                 "At end-of-file state, but context %p is already EOS", ctx);
1054           }
1055           break;
1056         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1057           if (ctx->is_reference) {
1058             /* Special handling on the reference ctx to start new fragments
1059              * and collect commands from the command queue */
1060             /* drops the splitmux lock briefly: */
1061             /* We must have reference ctx in order for format-location-full to
1062              * have a sample */
1063             start_next_fragment (splitmux, ctx);
1064             continue;
1065           }
1066           break;
1067         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1068           do {
1069             SplitMuxOutputCommand *cmd =
1070                 g_queue_pop_tail (&splitmux->out_cmd_q);
1071             if (cmd != NULL) {
1072               /* If we pop the last command, we need to make our queues bigger */
1073               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1074                 grow_blocked_queues (splitmux);
1075
1076               if (cmd->start_new_fragment) {
1077                 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1078                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1079               } else {
1080                 GST_DEBUG_OBJECT (splitmux,
1081                     "Got new output cmd for time %" GST_STIME_FORMAT,
1082                     GST_STIME_ARGS (cmd->max_output_ts));
1083
1084                 /* Extend the output range immediately */
1085                 splitmux->max_out_running_time = cmd->max_output_ts;
1086                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1087               }
1088               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1089
1090               out_cmd_buf_free (cmd);
1091               break;
1092             } else {
1093               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1094             }
1095           } while (splitmux->output_state ==
1096               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1097           /* loop and re-check the state */
1098           continue;
1099         }
1100         case SPLITMUX_OUTPUT_STATE_STOPPED:
1101           return;
1102       }
1103     }
1104
1105     GST_INFO_OBJECT (ctx->srcpad,
1106         "Sleeping for running time %"
1107         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1108         GST_STIME_ARGS (ctx->out_running_time),
1109         GST_STIME_ARGS (splitmux->max_out_running_time));
1110     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1111     GST_INFO_OBJECT (ctx->srcpad,
1112         "Woken for new max running time %" GST_STIME_FORMAT,
1113         GST_STIME_ARGS (splitmux->max_out_running_time));
1114   }
1115   while (1);
1116 }
1117
1118 static GstClockTime
1119 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1120     const GstVideoTimeCode * cur_tc)
1121 {
1122   GstVideoTimeCode *target_tc;
1123   GstVideoTimeCodeInterval *tc_inter;
1124   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1125
1126   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1127     return GST_CLOCK_TIME_NONE;
1128
1129   tc_inter =
1130       gst_video_time_code_interval_new_from_string
1131       (splitmux->threshold_timecode_str);
1132   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1133   gst_video_time_code_interval_free (tc_inter);
1134
1135   /* Convert to ns */
1136   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1137   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1138
1139   /* Add fragment_start_time, accounting for wraparound */
1140   if (target_tc_time >= cur_tc_time) {
1141     next_max_tc_time =
1142         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1143   } else {
1144     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1145
1146     next_max_tc_time =
1147         day_in_ns - cur_tc_time + target_tc_time +
1148         splitmux->fragment_start_time;
1149   }
1150   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1151       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1152       GST_TIME_ARGS (cur_tc_time));
1153   gst_video_time_code_free (target_tc);
1154
1155   return next_max_tc_time;
1156 }
1157
1158 static gboolean
1159 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1160 {
1161   GstEvent *ev;
1162   GstClockTime target_time;
1163   gboolean timecode_based = FALSE;
1164
1165   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1166   if (splitmux->threshold_timecode_str) {
1167     GstVideoTimeCodeMeta *tc_meta;
1168
1169     if (buffer != NULL) {
1170       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1171       if (tc_meta) {
1172         splitmux->next_max_tc_time =
1173             calculate_next_max_timecode (splitmux, &tc_meta->tc);
1174         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1175       }
1176     } else {
1177       /* This can happen in the presence of GAP events that trigger
1178        * a new fragment start */
1179       GST_WARNING_OBJECT (splitmux,
1180           "No buffer available to calculate next timecode");
1181     }
1182   }
1183
1184   if (splitmux->send_keyframe_requests == FALSE
1185       || (splitmux->threshold_time == 0 && !timecode_based)
1186       || splitmux->threshold_bytes != 0)
1187     return TRUE;
1188
1189   if (timecode_based) {
1190     /* We might have rounding errors: aim slightly earlier */
1191     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1192   } else {
1193     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1194   }
1195   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1196   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1197       GST_TIME_ARGS (target_time));
1198   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1199 }
1200
1201 static GstPadProbeReturn
1202 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1203 {
1204   GstSplitMuxSink *splitmux = ctx->splitmux;
1205   MqStreamBuf *buf_info = NULL;
1206
1207   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1208
1209   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1210   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1211     g_warning ("Buffer list handling not implemented");
1212     return GST_PAD_PROBE_DROP;
1213   }
1214   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1215       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1216     GstEvent *event = gst_pad_probe_info_get_event (info);
1217     gboolean locked = FALSE;
1218
1219     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1220
1221     switch (GST_EVENT_TYPE (event)) {
1222       case GST_EVENT_SEGMENT:
1223         gst_event_copy_segment (event, &ctx->out_segment);
1224         break;
1225       case GST_EVENT_FLUSH_STOP:
1226         GST_SPLITMUX_LOCK (splitmux);
1227         locked = TRUE;
1228         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1229         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1230         g_queue_clear (&ctx->queued_bufs);
1231         ctx->flushing = FALSE;
1232         break;
1233       case GST_EVENT_FLUSH_START:
1234         GST_SPLITMUX_LOCK (splitmux);
1235         locked = TRUE;
1236         GST_LOG_OBJECT (pad, "Flush start");
1237         ctx->flushing = TRUE;
1238         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1239         break;
1240       case GST_EVENT_EOS:
1241         GST_SPLITMUX_LOCK (splitmux);
1242         locked = TRUE;
1243         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1244           goto beach;
1245         ctx->out_eos = TRUE;
1246         GST_INFO_OBJECT (splitmux,
1247             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1248         break;
1249       case GST_EVENT_GAP:{
1250         GstClockTime gap_ts;
1251         GstClockTimeDiff rtime;
1252
1253         gst_event_parse_gap (event, &gap_ts, NULL);
1254         if (gap_ts == GST_CLOCK_TIME_NONE)
1255           break;
1256
1257         GST_SPLITMUX_LOCK (splitmux);
1258         locked = TRUE;
1259
1260         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1261           goto beach;
1262
1263         /* When we get a gap event on the
1264          * reference stream and we're trying to open a
1265          * new file, we need to store it until we get
1266          * the buffer afterwards
1267          */
1268         if (ctx->is_reference &&
1269             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1270           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1271           gst_event_replace (&ctx->pending_gap, event);
1272           GST_SPLITMUX_UNLOCK (splitmux);
1273           return GST_PAD_PROBE_HANDLED;
1274         }
1275
1276         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1277
1278         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1279             GST_STIME_ARGS (rtime));
1280
1281         if (rtime != GST_CLOCK_STIME_NONE) {
1282           ctx->out_running_time = rtime;
1283           complete_or_wait_on_out (splitmux, ctx);
1284         }
1285         break;
1286       }
1287       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1288         const GstStructure *s;
1289         GstClockTimeDiff ts = 0;
1290
1291         s = gst_event_get_structure (event);
1292         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1293           break;
1294
1295         gst_structure_get_int64 (s, "timestamp", &ts);
1296
1297         GST_SPLITMUX_LOCK (splitmux);
1298         locked = TRUE;
1299
1300         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1301           goto beach;
1302         ctx->out_running_time = ts;
1303         if (!ctx->is_reference)
1304           complete_or_wait_on_out (splitmux, ctx);
1305         GST_SPLITMUX_UNLOCK (splitmux);
1306         return GST_PAD_PROBE_DROP;
1307       }
1308       case GST_EVENT_CAPS:{
1309         GstPad *peer;
1310
1311         if (!ctx->is_reference)
1312           break;
1313
1314         peer = gst_pad_get_peer (pad);
1315         if (peer) {
1316           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1317
1318           gst_object_unref (peer);
1319
1320           if (ok)
1321             break;
1322
1323         } else {
1324           break;
1325         }
1326         /* This is in the case the muxer doesn't allow this change of caps */
1327         GST_SPLITMUX_LOCK (splitmux);
1328         locked = TRUE;
1329         ctx->caps_change = TRUE;
1330
1331         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1332           GST_DEBUG_OBJECT (splitmux,
1333               "New caps were not accepted. Switching output file");
1334           if (ctx->out_eos == FALSE) {
1335             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1336             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1337           }
1338         }
1339
1340         /* Lets it fall through, if it fails again, then the muxer just can't
1341          * support this format, but at least we have a closed file.
1342          */
1343         break;
1344       }
1345       default:
1346         break;
1347     }
1348
1349     /* We need to make sure events aren't passed
1350      * until the muxer / sink are ready for it */
1351     if (!locked)
1352       GST_SPLITMUX_LOCK (splitmux);
1353     if (!ctx->is_reference)
1354       complete_or_wait_on_out (splitmux, ctx);
1355     GST_SPLITMUX_UNLOCK (splitmux);
1356
1357     /* Don't try to forward sticky events before the next buffer is there
1358      * because it would cause a new file to be created without the first
1359      * buffer being available.
1360      */
1361     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1362       gst_event_unref (event);
1363       return GST_PAD_PROBE_HANDLED;
1364     } else
1365       return GST_PAD_PROBE_PASS;
1366   }
1367
1368   /* Allow everything through until the configured next stopping point */
1369   GST_SPLITMUX_LOCK (splitmux);
1370
1371   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1372   if (buf_info == NULL)
1373     /* Can only happen due to a poorly timed flush */
1374     goto beach;
1375
1376   /* If we have popped a keyframe, decrement the queued_gop count */
1377   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1378     splitmux->queued_keyframes--;
1379
1380   ctx->out_running_time = buf_info->run_ts;
1381   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1382
1383   GST_LOG_OBJECT (splitmux,
1384       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1385       " size %" G_GUINT64_FORMAT,
1386       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1387
1388   ctx->caps_change = FALSE;
1389
1390   complete_or_wait_on_out (splitmux, ctx);
1391
1392   splitmux->muxed_out_bytes += buf_info->buf_size;
1393
1394 #ifndef GST_DISABLE_GST_DEBUG
1395   {
1396     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1397     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1398         " run ts %" GST_STIME_FORMAT, buf,
1399         GST_STIME_ARGS (ctx->out_running_time));
1400   }
1401 #endif
1402
1403   ctx->cur_out_buffer = NULL;
1404   GST_SPLITMUX_UNLOCK (splitmux);
1405
1406   /* pending_gap is protected by the STREAM lock */
1407   if (ctx->pending_gap) {
1408     /* If we previously stored a gap event, send it now */
1409     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1410
1411     GST_DEBUG_OBJECT (splitmux,
1412         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1413
1414     gst_pad_send_event (peer, ctx->pending_gap);
1415     ctx->pending_gap = NULL;
1416
1417     gst_object_unref (peer);
1418   }
1419
1420   mq_stream_buf_free (buf_info);
1421
1422   return GST_PAD_PROBE_PASS;
1423
1424 beach:
1425   GST_SPLITMUX_UNLOCK (splitmux);
1426   return GST_PAD_PROBE_DROP;
1427 }
1428
1429 static gboolean
1430 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1431 {
1432   return gst_pad_send_event (peer, gst_event_ref (*event));
1433 }
1434
1435 static void
1436 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1437 {
1438   if (ctx->fragment_block_id > 0) {
1439     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1440     ctx->fragment_block_id = 0;
1441   }
1442 }
1443
1444 static void
1445 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1446 {
1447   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1448
1449   gst_pad_sticky_events_foreach (ctx->srcpad,
1450       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1451
1452   /* Clear EOS flag if not actually EOS */
1453   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1454   ctx->out_eos_async_done = ctx->out_eos;
1455
1456   gst_object_unref (peer);
1457 }
1458
1459 static void
1460 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1461 {
1462   GstPad *sinkpad, *srcpad, *newpad;
1463   GstPadTemplate *templ;
1464
1465   srcpad = ctx->srcpad;
1466   sinkpad = gst_pad_get_peer (srcpad);
1467
1468   templ = sinkpad->padtemplate;
1469   newpad =
1470       gst_element_request_pad (splitmux->muxer, templ,
1471       GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1472
1473   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1474       newpad);
1475   if (!gst_pad_unlink (srcpad, sinkpad)) {
1476     gst_object_unref (sinkpad);
1477     goto fail;
1478   }
1479   if (gst_pad_link_full (srcpad, newpad,
1480           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1481     gst_element_release_request_pad (splitmux->muxer, newpad);
1482     gst_object_unref (sinkpad);
1483     gst_object_unref (newpad);
1484     goto fail;
1485   }
1486   gst_object_unref (newpad);
1487   gst_object_unref (sinkpad);
1488   return;
1489
1490 fail:
1491   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1492       ("Could not create the new muxer/sink"), NULL);
1493 }
1494
1495 static GstPadProbeReturn
1496 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1497 {
1498   return GST_PAD_PROBE_OK;
1499 }
1500
1501 static void
1502 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1503 {
1504   ctx->fragment_block_id =
1505       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1506       NULL, NULL);
1507 }
1508
1509 static gboolean
1510 _set_property_from_structure (GQuark field_id, const GValue * value,
1511     gpointer user_data)
1512 {
1513   const gchar *property_name = g_quark_to_string (field_id);
1514   GObject *element = G_OBJECT (user_data);
1515
1516   g_object_set_property (element, property_name, value);
1517
1518   return TRUE;
1519 }
1520
1521 static void
1522 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1523 {
1524   gst_element_set_locked_state (element, TRUE);
1525   gst_element_set_state (element, GST_STATE_NULL);
1526   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1527   gst_bin_remove (GST_BIN (splitmux), element);
1528 }
1529
1530
1531 static void
1532 _send_event (const GValue * value, gpointer user_data)
1533 {
1534   GstPad *pad = g_value_get_object (value);
1535   GstEvent *ev = user_data;
1536
1537   gst_pad_send_event (pad, gst_event_ref (ev));
1538 }
1539
1540 /* Called with lock held when a fragment
1541  * reaches EOS and it is time to restart
1542  * a new fragment
1543  */
1544 static void
1545 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1546 {
1547   GstElement *muxer, *sink;
1548
1549   g_assert (ctx->is_reference);
1550
1551   /* 1 change to new file */
1552   splitmux->switching_fragment = TRUE;
1553
1554   /* We need to drop the splitmux lock to acquire the state lock
1555    * here and ensure there's no racy state change going on elsewhere */
1556   muxer = gst_object_ref (splitmux->muxer);
1557   sink = gst_object_ref (splitmux->active_sink);
1558
1559   GST_SPLITMUX_UNLOCK (splitmux);
1560   GST_STATE_LOCK (splitmux);
1561
1562   if (splitmux->async_finalize) {
1563     if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1564       gchar *newname;
1565       GstElement *new_sink, *new_muxer;
1566
1567       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1568           splitmux->fragment_id);
1569       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1570       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1571       GST_SPLITMUX_LOCK (splitmux);
1572       if ((splitmux->sink =
1573               create_element (splitmux, splitmux->sink_factory, newname,
1574                   TRUE)) == NULL)
1575         goto fail;
1576       if (splitmux->sink_properties)
1577         gst_structure_foreach (splitmux->sink_properties,
1578             _set_property_from_structure, splitmux->sink);
1579       splitmux->active_sink = splitmux->sink;
1580       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1581       g_free (newname);
1582       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1583       if ((splitmux->muxer =
1584               create_element (splitmux, splitmux->muxer_factory, newname,
1585                   TRUE)) == NULL)
1586         goto fail;
1587       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1588               "async") != NULL) {
1589         /* async child elements are causing state change races and weird
1590          * failures, so let's try and turn that off */
1591         g_object_set (splitmux->sink, "async", FALSE, NULL);
1592       }
1593       if (splitmux->muxer_properties)
1594         gst_structure_foreach (splitmux->muxer_properties,
1595             _set_property_from_structure, splitmux->muxer);
1596       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1597       g_free (newname);
1598       new_sink = splitmux->sink;
1599       new_muxer = splitmux->muxer;
1600       GST_SPLITMUX_UNLOCK (splitmux);
1601       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1602       gst_element_link (new_muxer, new_sink);
1603
1604       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1605         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1606                     EOS_FROM_US)) == 2) {
1607           _lock_and_set_to_null (muxer, splitmux);
1608           _lock_and_set_to_null (sink, splitmux);
1609         } else {
1610           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1611               GINT_TO_POINTER (2));
1612         }
1613       }
1614       gst_object_unref (muxer);
1615       gst_object_unref (sink);
1616       muxer = new_muxer;
1617       sink = new_sink;
1618       gst_object_ref (muxer);
1619       gst_object_ref (sink);
1620     }
1621   } else {
1622
1623     gst_element_set_locked_state (muxer, TRUE);
1624     gst_element_set_locked_state (sink, TRUE);
1625     gst_element_set_state (sink, GST_STATE_NULL);
1626
1627     if (splitmux->reset_muxer) {
1628       gst_element_set_state (muxer, GST_STATE_NULL);
1629     } else {
1630       GstIterator *it = gst_element_iterate_sink_pads (muxer);
1631       GstEvent *ev;
1632
1633       ev = gst_event_new_flush_start ();
1634       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1635       gst_event_unref (ev);
1636
1637       gst_iterator_resync (it);
1638
1639       ev = gst_event_new_flush_stop (TRUE);
1640       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1641       gst_event_unref (ev);
1642
1643       gst_iterator_free (it);
1644     }
1645   }
1646
1647   GST_SPLITMUX_LOCK (splitmux);
1648   if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1649     set_next_filename (splitmux, ctx);
1650   splitmux->muxed_out_bytes = 0;
1651   GST_SPLITMUX_UNLOCK (splitmux);
1652
1653   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1654   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1655   gst_element_set_locked_state (muxer, FALSE);
1656   gst_element_set_locked_state (sink, FALSE);
1657
1658   gst_object_unref (sink);
1659   gst_object_unref (muxer);
1660
1661   GST_SPLITMUX_LOCK (splitmux);
1662   GST_STATE_UNLOCK (splitmux);
1663   splitmux->switching_fragment = FALSE;
1664   do_async_done (splitmux);
1665
1666   splitmux->ready_for_output = TRUE;
1667
1668   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1669   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1670
1671   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1672
1673   /* FIXME: Is this always the correct next state? */
1674   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1675   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1676   return;
1677
1678 fail:
1679   GST_STATE_UNLOCK (splitmux);
1680   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1681       ("Could not create the new muxer/sink"), NULL);
1682 }
1683
1684 static void
1685 bus_handler (GstBin * bin, GstMessage * message)
1686 {
1687   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1688
1689   switch (GST_MESSAGE_TYPE (message)) {
1690     case GST_MESSAGE_EOS:{
1691       /* If the state is draining out the current file, drop this EOS */
1692       GstElement *sink;
1693
1694       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1695       GST_SPLITMUX_LOCK (splitmux);
1696
1697       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1698
1699       if (splitmux->async_finalize) {
1700
1701         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1702           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1703                       EOS_FROM_US)) == 2) {
1704             GstElement *muxer;
1705             GstPad *sinksink, *muxersrc;
1706
1707             sinksink = gst_element_get_static_pad (sink, "sink");
1708             muxersrc = gst_pad_get_peer (sinksink);
1709             muxer = gst_pad_get_parent_element (muxersrc);
1710             gst_object_unref (sinksink);
1711             gst_object_unref (muxersrc);
1712
1713             gst_element_call_async (muxer,
1714                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1715                 gst_object_ref (splitmux), gst_object_unref);
1716             gst_element_call_async (sink,
1717                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1718                 gst_object_ref (splitmux), gst_object_unref);
1719             gst_object_unref (muxer);
1720           } else {
1721             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1722                 GINT_TO_POINTER (2));
1723           }
1724           GST_DEBUG_OBJECT (splitmux,
1725               "Caught async EOS from previous muxer+sink. Dropping.");
1726           /* We forward the EOS so that it gets aggregated as normal. If the sink
1727            * finishes and is removed before the end, it will be de-aggregated */
1728           gst_message_unref (message);
1729           GST_SPLITMUX_UNLOCK (splitmux);
1730           return;
1731         }
1732       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1733         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1734         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1735         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1736
1737         gst_message_unref (message);
1738         GST_SPLITMUX_UNLOCK (splitmux);
1739         return;
1740       } else {
1741         GST_DEBUG_OBJECT (splitmux,
1742             "Passing EOS message. Output state %d max_out_running_time %"
1743             GST_STIME_FORMAT, splitmux->output_state,
1744             GST_STIME_ARGS (splitmux->max_out_running_time));
1745       }
1746       GST_SPLITMUX_UNLOCK (splitmux);
1747       break;
1748     }
1749     case GST_MESSAGE_ASYNC_START:
1750     case GST_MESSAGE_ASYNC_DONE:
1751       /* Ignore state changes from our children while switching */
1752       GST_SPLITMUX_LOCK (splitmux);
1753       if (splitmux->switching_fragment) {
1754         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1755             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1756           GST_LOG_OBJECT (splitmux,
1757               "Ignoring state change from child %" GST_PTR_FORMAT
1758               " while switching", GST_MESSAGE_SRC (message));
1759           gst_message_unref (message);
1760           GST_SPLITMUX_UNLOCK (splitmux);
1761           return;
1762         }
1763       }
1764       GST_SPLITMUX_UNLOCK (splitmux);
1765       break;
1766     case GST_MESSAGE_WARNING:
1767     {
1768       GError *gerror = NULL;
1769
1770       gst_message_parse_warning (message, &gerror, NULL);
1771
1772       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1773         GList *item;
1774         gboolean caps_change = FALSE;
1775
1776         GST_SPLITMUX_LOCK (splitmux);
1777
1778         for (item = splitmux->contexts; item; item = item->next) {
1779           MqStreamCtx *ctx = item->data;
1780
1781           if (ctx->caps_change) {
1782             caps_change = TRUE;
1783             break;
1784           }
1785         }
1786
1787         GST_SPLITMUX_UNLOCK (splitmux);
1788
1789         if (caps_change) {
1790           GST_LOG_OBJECT (splitmux,
1791               "Ignoring warning change from child %" GST_PTR_FORMAT
1792               " while switching caps", GST_MESSAGE_SRC (message));
1793           gst_message_unref (message);
1794           return;
1795         }
1796       }
1797       break;
1798     }
1799     default:
1800       break;
1801   }
1802
1803   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1804 }
1805
1806 static void
1807 ctx_set_unblock (MqStreamCtx * ctx)
1808 {
1809   ctx->need_unblock = TRUE;
1810 }
1811
1812 static gboolean
1813 need_new_fragment (GstSplitMuxSink * splitmux,
1814     GstClockTime queued_time, GstClockTime queued_gop_time,
1815     guint64 queued_bytes)
1816 {
1817   guint64 thresh_bytes;
1818   GstClockTime thresh_time;
1819   gboolean check_robust_muxing;
1820
1821   GST_OBJECT_LOCK (splitmux);
1822   thresh_bytes = splitmux->threshold_bytes;
1823   thresh_time = splitmux->threshold_time;
1824   check_robust_muxing = splitmux->use_robust_muxing
1825       && splitmux->muxer_has_reserved_props;
1826   GST_OBJECT_UNLOCK (splitmux);
1827
1828   /* Have we muxed anything into the new file at all? */
1829   if (splitmux->fragment_total_bytes <= 0)
1830     return FALSE;
1831
1832   /* User told us to split now */
1833   if (g_atomic_int_get (&(splitmux->split_now)) == TRUE)
1834     return TRUE;
1835
1836   if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1837     return TRUE;                /* Would overrun byte limit */
1838
1839   if (thresh_time > 0 && queued_time > thresh_time)
1840     return TRUE;                /* Would overrun byte limit */
1841
1842   /* Timecode-based threshold accounts for possible rounding errors:
1843    * 5us should be bigger than all possible rounding errors but nowhere near
1844    * big enough to skip to another frame */
1845   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1846       splitmux->reference_ctx->in_running_time >
1847       splitmux->next_max_tc_time + 5 * GST_USECOND)
1848     return TRUE;                /* Timecode threshold */
1849
1850   if (check_robust_muxing) {
1851     GstClockTime mux_reserved_remain;
1852
1853     g_object_get (splitmux->muxer,
1854         "reserved-duration-remaining", &mux_reserved_remain, NULL);
1855
1856     GST_LOG_OBJECT (splitmux,
1857         "Muxer robust muxing report - %" G_GUINT64_FORMAT
1858         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1859         mux_reserved_remain, queued_gop_time);
1860
1861     if (queued_gop_time >= mux_reserved_remain) {
1862       GST_INFO_OBJECT (splitmux,
1863           "File is about to run out of header room - %" G_GUINT64_FORMAT
1864           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1865           ". Switching to new file", mux_reserved_remain, queued_gop_time);
1866       return TRUE;
1867     }
1868   }
1869
1870   /* Continue and mux this GOP */
1871   return FALSE;
1872 }
1873
1874 /* Called with splitmux lock held */
1875 /* Called when entering ProcessingCompleteGop state
1876  * Assess if mq contents overflowed the current file
1877  *   -> If yes, need to switch to new file
1878  *   -> if no, set max_out_running_time to let this GOP in and
1879  *      go to COLLECTING_GOP_START state
1880  */
1881 static void
1882 handle_gathered_gop (GstSplitMuxSink * splitmux)
1883 {
1884   guint64 queued_bytes;
1885   GstClockTimeDiff queued_time = 0;
1886   GstClockTimeDiff queued_gop_time = 0;
1887   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1888   SplitMuxOutputCommand *cmd;
1889
1890   /* Assess if the multiqueue contents overflowed the current file */
1891   /* When considering if a newly gathered GOP overflows
1892    * the time limit for the file, only consider the running time of the
1893    * reference stream. Other streams might have run ahead a little bit,
1894    * but extra pieces won't be released to the muxer beyond the reference
1895    * stream cut-off anyway - so it forms the limit. */
1896   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1897   queued_time = splitmux->reference_ctx->in_running_time;
1898   /* queued_gop_time tracks how much unwritten data there is waiting to
1899    * be written to this fragment including this GOP */
1900   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
1901     queued_gop_time =
1902         splitmux->reference_ctx->in_running_time -
1903         splitmux->reference_ctx->out_running_time;
1904   else
1905     queued_gop_time =
1906         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
1907
1908   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
1909
1910   g_assert (queued_gop_time >= 0);
1911   g_assert (queued_time >= splitmux->fragment_start_time);
1912
1913   queued_time -= splitmux->fragment_start_time;
1914   if (queued_time < queued_gop_time)
1915     queued_gop_time = queued_time;
1916
1917   /* Expand queued bytes estimate by muxer overhead */
1918   queued_bytes += (queued_bytes * splitmux->mux_overhead);
1919
1920   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1921       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1922   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1923     GST_LOG_OBJECT (splitmux,
1924         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1925         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1926         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1927   }
1928
1929   /* Check for overrun - have we output at least one byte and overrun
1930    * either threshold? */
1931   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
1932     GstClockTime *sink_running_time = g_new (GstClockTime, 1);
1933     *sink_running_time = splitmux->reference_ctx->out_running_time;
1934     g_object_set_qdata_full (G_OBJECT (splitmux->sink),
1935         RUNNING_TIME, sink_running_time, g_free);
1936     g_atomic_int_set (&(splitmux->split_now), FALSE);
1937     /* Tell the output side to start a new fragment */
1938     GST_INFO_OBJECT (splitmux,
1939         "This GOP (dur %" GST_STIME_FORMAT
1940         ") would overflow the fragment, Sending start_new_fragment cmd",
1941         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1942             splitmux->gop_start_time));
1943     cmd = out_cmd_buf_new ();
1944     cmd->start_new_fragment = TRUE;
1945     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1946     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1947
1948     new_out_ts = splitmux->reference_ctx->in_running_time;
1949     splitmux->fragment_start_time = splitmux->gop_start_time;
1950     splitmux->fragment_total_bytes = 0;
1951
1952     if (request_next_keyframe (splitmux,
1953             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1954       GST_WARNING_OBJECT (splitmux,
1955           "Could not request a keyframe. Files may not split at the exact location they should");
1956     }
1957     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1958   }
1959
1960   /* And set up to collect the next GOP */
1961   if (!splitmux->reference_ctx->in_eos) {
1962     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1963     splitmux->gop_start_time = new_out_ts;
1964   } else {
1965     /* This is probably already the current state, but just in case: */
1966     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1967     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
1968   }
1969
1970   /* And wake all input contexts to send a wake-up event */
1971   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1972   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1973
1974   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1975   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1976
1977   if (splitmux->gop_total_bytes > 0) {
1978     GST_LOG_OBJECT (splitmux,
1979         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1980         " time %" GST_STIME_FORMAT,
1981         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1982
1983     /* Send this GOP to the output command queue */
1984     cmd = out_cmd_buf_new ();
1985     cmd->start_new_fragment = FALSE;
1986     cmd->max_output_ts = new_out_ts;
1987     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1988         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1989     g_queue_push_head (&splitmux->out_cmd_q, cmd);
1990
1991     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1992   }
1993
1994   splitmux->gop_total_bytes = 0;
1995 }
1996
1997 /* Called with splitmux lock held */
1998 /* Called from each input pad when it is has all the pieces
1999  * for a GOP or EOS, starting with the reference pad which has set the
2000  * splitmux->max_in_running_time
2001  */
2002 static void
2003 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2004 {
2005   GList *cur;
2006   GstEvent *event;
2007
2008   /* On ENDING_FILE, the reference stream sends a command to start a new
2009    * fragment, then releases the GOP for output in the new fragment.
2010    *  If somes streams received no buffer during the last GOP that overran,
2011    * because its next buffer has a timestamp bigger than
2012    * ctx->max_in_running_time, its queue is empty. In that case the only
2013    * way to wakeup the output thread is by injecting an event in the
2014    * queue. This usually happen with subtitle streams.
2015    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2016   if (ctx->need_unblock) {
2017     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2018     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2019         GST_EVENT_TYPE_SERIALIZED,
2020         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2021             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2022
2023     GST_SPLITMUX_UNLOCK (splitmux);
2024     gst_pad_send_event (ctx->sinkpad, event);
2025     GST_SPLITMUX_LOCK (splitmux);
2026
2027     ctx->need_unblock = FALSE;
2028     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2029     /* state may have changed while we were unlocked. Loop again if so */
2030     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2031       return;
2032   }
2033
2034   if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2035     gboolean ready = TRUE;
2036
2037     /* Iterate each pad, and check that the input running time is at least
2038      * up to the reference running time, and if so handle the collected GOP */
2039     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2040         GST_STIME_FORMAT " ctx %p",
2041         GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2042     for (cur = g_list_first (splitmux->contexts); cur != NULL;
2043         cur = g_list_next (cur)) {
2044       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2045
2046       GST_LOG_OBJECT (splitmux,
2047           "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2048           " EOS %d", tmpctx, tmpctx->srcpad,
2049           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2050
2051       if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2052           tmpctx->in_running_time < splitmux->max_in_running_time &&
2053           !tmpctx->in_eos) {
2054         GST_LOG_OBJECT (splitmux,
2055             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2056             tmpctx, tmpctx->srcpad);
2057         ready = FALSE;
2058         break;
2059       }
2060     }
2061     if (ready) {
2062       GST_DEBUG_OBJECT (splitmux,
2063           "Collected GOP is complete. Processing (ctx %p)", ctx);
2064       /* All pads have a complete GOP, release it into the multiqueue */
2065       handle_gathered_gop (splitmux);
2066     }
2067   }
2068
2069   /* If upstream reached EOS we are not expecting more data, no need to wait
2070    * here. */
2071   if (ctx->in_eos)
2072     return;
2073
2074   /* Some pad is not yet ready, or GOP is being pushed
2075    * either way, sleep and wait to get woken */
2076   while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2077       !ctx->flushing &&
2078       (ctx->in_running_time >= splitmux->max_in_running_time) &&
2079       (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2080
2081     GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2082     GST_SPLITMUX_WAIT_INPUT (splitmux);
2083     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2084   }
2085 }
2086
2087 static GstPadProbeReturn
2088 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2089 {
2090   GstSplitMuxSink *splitmux = ctx->splitmux;
2091   GstBuffer *buf;
2092   MqStreamBuf *buf_info = NULL;
2093   GstClockTime ts;
2094   gboolean loop_again;
2095   gboolean keyframe = FALSE;
2096
2097   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2098
2099   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2100   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2101     g_warning ("Buffer list handling not implemented");
2102     return GST_PAD_PROBE_DROP;
2103   }
2104   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2105       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2106     GstEvent *event = gst_pad_probe_info_get_event (info);
2107
2108     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2109
2110     switch (GST_EVENT_TYPE (event)) {
2111       case GST_EVENT_SEGMENT:
2112         gst_event_copy_segment (event, &ctx->in_segment);
2113         break;
2114       case GST_EVENT_FLUSH_STOP:
2115         GST_SPLITMUX_LOCK (splitmux);
2116         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2117         ctx->in_eos = FALSE;
2118         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2119         GST_SPLITMUX_UNLOCK (splitmux);
2120         break;
2121       case GST_EVENT_EOS:
2122         GST_SPLITMUX_LOCK (splitmux);
2123         ctx->in_eos = TRUE;
2124
2125         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2126           goto beach;
2127
2128         if (ctx->is_reference) {
2129           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2130           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2131           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2132           /* Wake up other input pads to collect this GOP */
2133           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2134           check_completed_gop (splitmux, ctx);
2135         } else if (splitmux->input_state ==
2136             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2137           /* If we are waiting for a GOP to be completed (ie, for aux
2138            * pads to catch up), then this pad is complete, so check
2139            * if the whole GOP is.
2140            */
2141           check_completed_gop (splitmux, ctx);
2142         }
2143         GST_SPLITMUX_UNLOCK (splitmux);
2144         break;
2145       case GST_EVENT_GAP:{
2146         GstClockTime gap_ts;
2147         GstClockTimeDiff rtime;
2148
2149         gst_event_parse_gap (event, &gap_ts, NULL);
2150         if (gap_ts == GST_CLOCK_TIME_NONE)
2151           break;
2152
2153         GST_SPLITMUX_LOCK (splitmux);
2154
2155         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2156           goto beach;
2157         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2158
2159         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2160             GST_STIME_ARGS (rtime));
2161
2162         if (ctx->is_reference
2163             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2164           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2165           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2166               GST_STIME_ARGS (splitmux->fragment_start_time));
2167           /* Also take this as the first start time when starting up,
2168            * so that we start counting overflow from the first frame */
2169           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2170             splitmux->max_in_running_time = splitmux->fragment_start_time;
2171         }
2172
2173         GST_SPLITMUX_UNLOCK (splitmux);
2174         break;
2175       }
2176       default:
2177         break;
2178     }
2179     return GST_PAD_PROBE_PASS;
2180   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2181     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2182       case GST_QUERY_ALLOCATION:
2183         return GST_PAD_PROBE_DROP;
2184       default:
2185         return GST_PAD_PROBE_PASS;
2186     }
2187   }
2188
2189   buf = gst_pad_probe_info_get_buffer (info);
2190   buf_info = mq_stream_buf_new ();
2191
2192   if (GST_BUFFER_PTS_IS_VALID (buf))
2193     ts = GST_BUFFER_PTS (buf);
2194   else
2195     ts = GST_BUFFER_DTS (buf);
2196
2197   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2198
2199   GST_SPLITMUX_LOCK (splitmux);
2200
2201   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2202     goto beach;
2203
2204   /* If this buffer has a timestamp, advance the input timestamp of the
2205    * stream */
2206   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2207     GstClockTimeDiff running_time =
2208         my_segment_to_running_time (&ctx->in_segment, ts);
2209
2210     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2211         GST_STIME_ARGS (running_time));
2212
2213     if (GST_CLOCK_STIME_IS_VALID (running_time)
2214         && running_time > ctx->in_running_time)
2215       ctx->in_running_time = running_time;
2216   }
2217
2218   /* Try to make sure we have a valid running time */
2219   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2220     ctx->in_running_time =
2221         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2222   }
2223
2224   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2225       GST_STIME_ARGS (ctx->in_running_time));
2226
2227   buf_info->run_ts = ctx->in_running_time;
2228   buf_info->buf_size = gst_buffer_get_size (buf);
2229   buf_info->duration = GST_BUFFER_DURATION (buf);
2230
2231   /* initialize fragment_start_time */
2232   if (ctx->is_reference
2233       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2234     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2235     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2236         GST_STIME_ARGS (splitmux->fragment_start_time));
2237     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2238
2239     /* Also take this as the first start time when starting up,
2240      * so that we start counting overflow from the first frame */
2241     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2242       splitmux->max_in_running_time = splitmux->fragment_start_time;
2243     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2244       GST_WARNING_OBJECT (splitmux,
2245           "Could not request a keyframe. Files may not split at the exact location they should");
2246     }
2247     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2248   }
2249
2250   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2251       " total GOP bytes %" G_GUINT64_FORMAT,
2252       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2253
2254   loop_again = TRUE;
2255   do {
2256     if (ctx->flushing)
2257       break;
2258
2259     switch (splitmux->input_state) {
2260       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2261         if (ctx->is_reference) {
2262           /* This is the reference context. If it's a keyframe,
2263            * it marks the start of a new GOP and we should wait in
2264            * check_completed_gop before continuing, but either way
2265            * (keyframe or no, we'll pass this buffer through after
2266            * so set loop_again to FALSE */
2267           loop_again = FALSE;
2268
2269           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2270             /* Allow other input pads to catch up to here too */
2271             splitmux->max_in_running_time = ctx->in_running_time;
2272             GST_LOG_OBJECT (splitmux,
2273                 "Max in running time now %" GST_TIME_FORMAT,
2274                 GST_TIME_ARGS (splitmux->max_in_running_time));
2275             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2276             break;
2277           }
2278           GST_INFO_OBJECT (pad,
2279               "Have keyframe with running time %" GST_STIME_FORMAT,
2280               GST_STIME_ARGS (ctx->in_running_time));
2281           keyframe = TRUE;
2282           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2283           splitmux->max_in_running_time = ctx->in_running_time;
2284           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2285               GST_TIME_ARGS (splitmux->max_in_running_time));
2286           /* Wake up other input pads to collect this GOP */
2287           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2288           check_completed_gop (splitmux, ctx);
2289           /* Store this new keyframe to remember the start of GOP */
2290           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2291         } else {
2292           /* Pass this buffer if the reference ctx is far enough ahead */
2293           if (ctx->in_running_time < splitmux->max_in_running_time) {
2294             loop_again = FALSE;
2295             break;
2296           }
2297
2298           /* We're still waiting for a keyframe on the reference pad, sleep */
2299           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2300           GST_SPLITMUX_WAIT_INPUT (splitmux);
2301           GST_LOG_OBJECT (pad,
2302               "Done sleeping for GOP start input state now %d",
2303               splitmux->input_state);
2304         }
2305         break;
2306       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2307         /* We're collecting a GOP. If this is the reference context,
2308          * we need to check if this is a keyframe that marks the start
2309          * of the next GOP. If it is, it marks the end of the GOP we're
2310          * collecting, so sleep and wait until all the other pads also
2311          * reach that timestamp - at which point, we have an entire GOP
2312          * and either go to ENDING_FILE or release this GOP to the muxer and
2313          * go back to COLLECT_GOP_START. */
2314
2315         /* If we overran the target timestamp, it might be time to process
2316          * the GOP, otherwise bail out for more data
2317          */
2318         GST_LOG_OBJECT (pad,
2319             "Checking TS %" GST_STIME_FORMAT " against max %"
2320             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2321             GST_STIME_ARGS (splitmux->max_in_running_time));
2322
2323         if (ctx->in_running_time < splitmux->max_in_running_time) {
2324           loop_again = FALSE;
2325           break;
2326         }
2327
2328         GST_LOG_OBJECT (pad,
2329             "Collected last packet of GOP. Checking other pads");
2330         check_completed_gop (splitmux, ctx);
2331         break;
2332       }
2333       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2334         loop_again = FALSE;
2335         break;
2336       default:
2337         loop_again = FALSE;
2338         break;
2339     }
2340   }
2341   while (loop_again);
2342
2343   if (keyframe) {
2344     splitmux->queued_keyframes++;
2345     buf_info->keyframe = TRUE;
2346   }
2347
2348   /* Update total input byte counter for overflow detect */
2349   splitmux->gop_total_bytes += buf_info->buf_size;
2350
2351   /* Now add this buffer to the queue just before returning */
2352   g_queue_push_head (&ctx->queued_bufs, buf_info);
2353
2354   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2355       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2356
2357   GST_SPLITMUX_UNLOCK (splitmux);
2358   return GST_PAD_PROBE_PASS;
2359
2360 beach:
2361   GST_SPLITMUX_UNLOCK (splitmux);
2362   if (buf_info)
2363     mq_stream_buf_free (buf_info);
2364   return GST_PAD_PROBE_PASS;
2365 }
2366
2367 static void
2368 grow_blocked_queues (GstSplitMuxSink * splitmux)
2369 {
2370   GList *cur;
2371
2372   /* Scan other queues for full-ness and grow them */
2373   for (cur = g_list_first (splitmux->contexts);
2374       cur != NULL; cur = g_list_next (cur)) {
2375     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2376     guint cur_limit;
2377     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2378
2379     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2380     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2381
2382     if (cur_len >= cur_limit) {
2383       cur_limit = cur_len + 1;
2384       GST_DEBUG_OBJECT (tmpctx->q,
2385           "Queue overflowed and needs enlarging. Growing to %u buffers",
2386           cur_limit);
2387       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2388     }
2389   }
2390 }
2391
2392 static void
2393 handle_q_underrun (GstElement * q, gpointer user_data)
2394 {
2395   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2396   GstSplitMuxSink *splitmux = ctx->splitmux;
2397
2398   GST_SPLITMUX_LOCK (splitmux);
2399   GST_DEBUG_OBJECT (q,
2400       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2401       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2402   grow_blocked_queues (splitmux);
2403   GST_SPLITMUX_UNLOCK (splitmux);
2404 }
2405
2406 static void
2407 handle_q_overrun (GstElement * q, gpointer user_data)
2408 {
2409   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2410   GstSplitMuxSink *splitmux = ctx->splitmux;
2411   gboolean allow_grow = FALSE;
2412
2413   GST_SPLITMUX_LOCK (splitmux);
2414   GST_DEBUG_OBJECT (q,
2415       "Queue reported overrun with %d keyframes and %d cmds enqueued",
2416       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2417
2418   if (splitmux->queued_keyframes < 2) {
2419     /* Less than a full GOP queued, grow the queue */
2420     allow_grow = TRUE;
2421   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2422     allow_grow = TRUE;
2423   } else {
2424     /* If another queue is starved, grow */
2425     GList *cur;
2426     for (cur = g_list_first (splitmux->contexts);
2427         cur != NULL; cur = g_list_next (cur)) {
2428       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2429       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2430         allow_grow = TRUE;
2431       }
2432     }
2433   }
2434   GST_SPLITMUX_UNLOCK (splitmux);
2435
2436   if (allow_grow) {
2437     guint cur_limit;
2438
2439     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2440     cur_limit++;
2441
2442     GST_DEBUG_OBJECT (q,
2443         "Queue overflowed and needs enlarging. Growing to %u buffers",
2444         cur_limit);
2445
2446     g_object_set (q, "max-size-buffers", cur_limit, NULL);
2447   }
2448 }
2449
2450 static GstPad *
2451 gst_splitmux_sink_request_new_pad (GstElement * element,
2452     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2453 {
2454   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2455   GstPadTemplate *mux_template = NULL;
2456   GstPad *res = NULL;
2457   GstElement *q;
2458   GstPad *q_sink = NULL, *q_src = NULL;
2459   gchar *gname;
2460   gboolean is_video = FALSE;
2461   MqStreamCtx *ctx;
2462
2463   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2464
2465   GST_SPLITMUX_LOCK (splitmux);
2466   if (!create_muxer (splitmux))
2467     goto fail;
2468   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2469
2470   if (templ->name_template) {
2471     if (g_str_equal (templ->name_template, "video")) {
2472       if (splitmux->have_video)
2473         goto already_have_video;
2474
2475       /* FIXME: Look for a pad template with matching caps, rather than by name */
2476       mux_template =
2477           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2478           (splitmux->muxer), "video_%u");
2479
2480       /* Fallback to find sink pad templates named 'video' (flvmux) */
2481       if (!mux_template) {
2482         mux_template =
2483             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2484             (splitmux->muxer), "video");
2485       }
2486       is_video = TRUE;
2487       name = NULL;
2488     } else {
2489       mux_template =
2490           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2491           (splitmux->muxer), templ->name_template);
2492
2493       /* Fallback to find sink pad templates named 'audio' (flvmux) */
2494       if (!mux_template) {
2495         mux_template =
2496             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2497             (splitmux->muxer), "audio");
2498         name = NULL;
2499       }
2500     }
2501     if (mux_template == NULL) {
2502       /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
2503       mux_template =
2504           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2505           (splitmux->muxer), "sink_%d");
2506       name = NULL;
2507     }
2508   }
2509
2510   res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2511   if (res == NULL)
2512     goto fail;
2513
2514   if (is_video)
2515     gname = g_strdup ("video");
2516   else if (name == NULL)
2517     gname = gst_pad_get_name (res);
2518   else
2519     gname = g_strdup (name);
2520
2521   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2522     goto fail;
2523
2524   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2525
2526   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2527       "max-size-buffers", 5, NULL);
2528
2529   q_sink = gst_element_get_static_pad (q, "sink");
2530   q_src = gst_element_get_static_pad (q, "src");
2531
2532   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2533     gst_element_release_request_pad (splitmux->muxer, res);
2534     gst_object_unref (GST_OBJECT (res));
2535     goto fail;
2536   }
2537
2538   gst_object_unref (GST_OBJECT (res));
2539
2540   ctx = mq_stream_ctx_new (splitmux);
2541   /* Context holds a ref: */
2542   ctx->q = gst_object_ref (q);
2543   ctx->srcpad = q_src;
2544   ctx->sinkpad = q_sink;
2545   ctx->q_overrun_id =
2546       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2547   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2548
2549   ctx->src_pad_block_id =
2550       gst_pad_add_probe (q_src,
2551       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2552       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2553   if (is_video && splitmux->reference_ctx != NULL) {
2554     splitmux->reference_ctx->is_reference = FALSE;
2555     splitmux->reference_ctx = NULL;
2556   }
2557   if (splitmux->reference_ctx == NULL) {
2558     splitmux->reference_ctx = ctx;
2559     ctx->is_reference = TRUE;
2560   }
2561
2562   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2563   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2564
2565   ctx->sink_pad_block_id =
2566       gst_pad_add_probe (q_sink,
2567       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2568       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2569       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2570
2571   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2572       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2573
2574   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2575
2576   g_free (gname);
2577
2578   if (is_video)
2579     splitmux->have_video = TRUE;
2580
2581   gst_pad_set_active (res, TRUE);
2582   gst_element_add_pad (element, res);
2583
2584   GST_SPLITMUX_UNLOCK (splitmux);
2585
2586   return res;
2587 fail:
2588   GST_SPLITMUX_UNLOCK (splitmux);
2589
2590   if (q_sink)
2591     gst_object_unref (q_sink);
2592   if (q_src)
2593     gst_object_unref (q_src);
2594   return NULL;
2595 already_have_video:
2596   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2597   GST_SPLITMUX_UNLOCK (splitmux);
2598   return NULL;
2599 }
2600
2601 static void
2602 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2603 {
2604   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2605   GstPad *muxpad = NULL;
2606   MqStreamCtx *ctx =
2607       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2608
2609   GST_SPLITMUX_LOCK (splitmux);
2610
2611   if (splitmux->muxer == NULL)
2612     goto fail;                  /* Elements don't exist yet - nothing to release */
2613
2614   GST_INFO_OBJECT (pad, "releasing request pad");
2615
2616   muxpad = gst_pad_get_peer (ctx->srcpad);
2617
2618   /* Remove the context from our consideration */
2619   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2620
2621   if (ctx->sink_pad_block_id)
2622     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2623
2624   if (ctx->src_pad_block_id)
2625     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2626
2627   /* Can release the context now */
2628   mq_stream_ctx_free (ctx);
2629   if (ctx == splitmux->reference_ctx)
2630     splitmux->reference_ctx = NULL;
2631
2632   /* Release and free the muxer input */
2633   if (muxpad) {
2634     gst_element_release_request_pad (splitmux->muxer, muxpad);
2635     gst_object_unref (muxpad);
2636   }
2637
2638   if (GST_PAD_PAD_TEMPLATE (pad) &&
2639       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2640               (pad)), "video"))
2641     splitmux->have_video = FALSE;
2642
2643   gst_element_remove_pad (element, pad);
2644
2645   /* Reset the internal elements only after all request pads are released */
2646   if (splitmux->contexts == NULL)
2647     gst_splitmux_reset (splitmux);
2648
2649 fail:
2650   GST_SPLITMUX_UNLOCK (splitmux);
2651 }
2652
2653 static GstElement *
2654 create_element (GstSplitMuxSink * splitmux,
2655     const gchar * factory, const gchar * name, gboolean locked)
2656 {
2657   GstElement *ret = gst_element_factory_make (factory, name);
2658   if (ret == NULL) {
2659     g_warning ("Failed to create %s - splitmuxsink will not work", name);
2660     return NULL;
2661   }
2662
2663   if (locked) {
2664     /* Ensure the sink starts in locked state and NULL - it will be changed
2665      * by the filename setting code */
2666     gst_element_set_locked_state (ret, TRUE);
2667     gst_element_set_state (ret, GST_STATE_NULL);
2668   }
2669
2670   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2671     g_warning ("Could not add %s element - splitmuxsink will not work", name);
2672     gst_object_unref (ret);
2673     return NULL;
2674   }
2675
2676   return ret;
2677 }
2678
2679 static gboolean
2680 create_muxer (GstSplitMuxSink * splitmux)
2681 {
2682   /* Create internal elements */
2683   if (splitmux->muxer == NULL) {
2684     GstElement *provided_muxer = NULL;
2685
2686     GST_OBJECT_LOCK (splitmux);
2687     if (splitmux->provided_muxer != NULL)
2688       provided_muxer = gst_object_ref (splitmux->provided_muxer);
2689     GST_OBJECT_UNLOCK (splitmux);
2690
2691     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2692         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2693       if ((splitmux->muxer =
2694               create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2695         goto fail;
2696     } else if (splitmux->async_finalize) {
2697       if ((splitmux->muxer =
2698               create_element (splitmux, splitmux->muxer_factory, "muxer",
2699                   FALSE)) == NULL)
2700         goto fail;
2701       if (splitmux->muxer_properties)
2702         gst_structure_foreach (splitmux->muxer_properties,
2703             _set_property_from_structure, splitmux->muxer);
2704     } else {
2705       /* Ensure it's not in locked state (we might be reusing an old element) */
2706       gst_element_set_locked_state (provided_muxer, FALSE);
2707       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2708         g_warning ("Could not add muxer element - splitmuxsink will not work");
2709         gst_object_unref (provided_muxer);
2710         goto fail;
2711       }
2712
2713       splitmux->muxer = provided_muxer;
2714       gst_object_unref (provided_muxer);
2715     }
2716
2717     if (splitmux->use_robust_muxing) {
2718       update_muxer_properties (splitmux);
2719     }
2720   }
2721
2722   return TRUE;
2723 fail:
2724   return FALSE;
2725 }
2726
2727 static GstElement *
2728 find_sink (GstElement * e)
2729 {
2730   GstElement *res = NULL;
2731   GstIterator *iter;
2732   gboolean done = FALSE;
2733   GValue data = { 0, };
2734
2735   if (!GST_IS_BIN (e))
2736     return e;
2737
2738   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2739     return e;
2740
2741   iter = gst_bin_iterate_sinks (GST_BIN (e));
2742   while (!done) {
2743     switch (gst_iterator_next (iter, &data)) {
2744       case GST_ITERATOR_OK:
2745       {
2746         GstElement *child = g_value_get_object (&data);
2747         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2748                 "location") != NULL) {
2749           res = child;
2750           done = TRUE;
2751         }
2752         g_value_reset (&data);
2753         break;
2754       }
2755       case GST_ITERATOR_RESYNC:
2756         gst_iterator_resync (iter);
2757         break;
2758       case GST_ITERATOR_DONE:
2759         done = TRUE;
2760         break;
2761       case GST_ITERATOR_ERROR:
2762         g_assert_not_reached ();
2763         break;
2764     }
2765   }
2766   g_value_unset (&data);
2767   gst_iterator_free (iter);
2768
2769   return res;
2770 }
2771
2772 static gboolean
2773 create_sink (GstSplitMuxSink * splitmux)
2774 {
2775   GstElement *provided_sink = NULL;
2776
2777   if (splitmux->active_sink == NULL) {
2778
2779     GST_OBJECT_LOCK (splitmux);
2780     if (splitmux->provided_sink != NULL)
2781       provided_sink = gst_object_ref (splitmux->provided_sink);
2782     GST_OBJECT_UNLOCK (splitmux);
2783
2784     if ((!splitmux->async_finalize && provided_sink == NULL) ||
2785         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2786       if ((splitmux->sink =
2787               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2788         goto fail;
2789       splitmux->active_sink = splitmux->sink;
2790     } else if (splitmux->async_finalize) {
2791       if ((splitmux->sink =
2792               create_element (splitmux, splitmux->sink_factory, "sink",
2793                   TRUE)) == NULL)
2794         goto fail;
2795       if (splitmux->sink_properties)
2796         gst_structure_foreach (splitmux->sink_properties,
2797             _set_property_from_structure, splitmux->sink);
2798       splitmux->active_sink = splitmux->sink;
2799     } else {
2800       /* Ensure the sink starts in locked state and NULL - it will be changed
2801        * by the filename setting code */
2802       gst_element_set_locked_state (provided_sink, TRUE);
2803       gst_element_set_state (provided_sink, GST_STATE_NULL);
2804       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2805         g_warning ("Could not add sink elements - splitmuxsink will not work");
2806         gst_object_unref (provided_sink);
2807         goto fail;
2808       }
2809
2810       splitmux->active_sink = provided_sink;
2811
2812       /* The bin holds a ref now, we can drop our tmp ref */
2813       gst_object_unref (provided_sink);
2814
2815       /* Find the sink element */
2816       splitmux->sink = find_sink (splitmux->active_sink);
2817       if (splitmux->sink == NULL) {
2818         g_warning
2819             ("Could not locate sink element in provided sink - splitmuxsink will not work");
2820         goto fail;
2821       }
2822     }
2823
2824 #if 1
2825     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2826             "async") != NULL) {
2827       /* async child elements are causing state change races and weird
2828        * failures, so let's try and turn that off */
2829       g_object_set (splitmux->sink, "async", FALSE, NULL);
2830     }
2831 #endif
2832
2833     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2834       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2835       goto fail;
2836     }
2837   }
2838
2839   return TRUE;
2840 fail:
2841   return FALSE;
2842 }
2843
2844 #ifdef __GNUC__
2845 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2846 #endif
2847 static void
2848 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2849 {
2850   gchar *fname = NULL;
2851   GstSample *sample;
2852   GstCaps *caps;
2853
2854   gst_splitmux_sink_ensure_max_files (splitmux);
2855
2856   if (ctx->cur_out_buffer == NULL) {
2857     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2858   }
2859
2860   caps = gst_pad_get_current_caps (ctx->srcpad);
2861   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2862   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2863       splitmux->fragment_id, sample, &fname);
2864   gst_sample_unref (sample);
2865   if (caps)
2866     gst_caps_unref (caps);
2867
2868   if (fname == NULL) {
2869     /* Fallback to the old signal if the new one returned nothing */
2870     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2871         splitmux->fragment_id, &fname);
2872   }
2873
2874   if (!fname)
2875     fname = splitmux->location ?
2876         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2877
2878   if (fname) {
2879     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2880     g_object_set (splitmux->sink, "location", fname, NULL);
2881     g_free (fname);
2882
2883     splitmux->fragment_id++;
2884   }
2885 }
2886
2887 static void
2888 do_async_start (GstSplitMuxSink * splitmux)
2889 {
2890   GstMessage *message;
2891
2892   if (!splitmux->need_async_start) {
2893     GST_INFO_OBJECT (splitmux, "no async_start needed");
2894     return;
2895   }
2896
2897   splitmux->async_pending = TRUE;
2898
2899   GST_INFO_OBJECT (splitmux, "Sending async_start message");
2900   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2901   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2902       (splitmux), message);
2903 }
2904
2905 static void
2906 do_async_done (GstSplitMuxSink * splitmux)
2907 {
2908   GstMessage *message;
2909
2910   if (splitmux->async_pending) {
2911     GST_INFO_OBJECT (splitmux, "Sending async_done message");
2912     message =
2913         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2914         GST_CLOCK_TIME_NONE);
2915     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2916         (splitmux), message);
2917
2918     splitmux->async_pending = FALSE;
2919   }
2920
2921   splitmux->need_async_start = FALSE;
2922 }
2923
2924 static GstStateChangeReturn
2925 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2926 {
2927   GstStateChangeReturn ret;
2928   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2929
2930   switch (transition) {
2931     case GST_STATE_CHANGE_NULL_TO_READY:{
2932       GST_SPLITMUX_LOCK (splitmux);
2933       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2934         ret = GST_STATE_CHANGE_FAILURE;
2935         GST_SPLITMUX_UNLOCK (splitmux);
2936         goto beach;
2937       }
2938       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2939       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2940       GST_SPLITMUX_UNLOCK (splitmux);
2941       splitmux->fragment_id = 0;
2942       break;
2943     }
2944     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2945       GST_SPLITMUX_LOCK (splitmux);
2946       /* Start by collecting one input on each pad */
2947       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2948       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2949       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2950       splitmux->gop_start_time = splitmux->fragment_start_time =
2951           GST_CLOCK_STIME_NONE;
2952       splitmux->muxed_out_bytes = 0;
2953       splitmux->ready_for_output = FALSE;
2954       GST_SPLITMUX_UNLOCK (splitmux);
2955       break;
2956     }
2957     case GST_STATE_CHANGE_PAUSED_TO_READY:
2958       g_atomic_int_set (&(splitmux->split_now), FALSE);
2959     case GST_STATE_CHANGE_READY_TO_NULL:
2960       GST_SPLITMUX_LOCK (splitmux);
2961       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2962       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2963       /* Wake up any blocked threads */
2964       GST_LOG_OBJECT (splitmux,
2965           "State change -> NULL or READY. Waking threads");
2966       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2967       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2968       GST_SPLITMUX_UNLOCK (splitmux);
2969       break;
2970     default:
2971       break;
2972   }
2973
2974   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2975   if (ret == GST_STATE_CHANGE_FAILURE)
2976     goto beach;
2977
2978   switch (transition) {
2979     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2980       splitmux->need_async_start = TRUE;
2981       break;
2982     case GST_STATE_CHANGE_READY_TO_PAUSED:{
2983       /* Change state async, because our child sink might not
2984        * be ready to do that for us yet if it's state is still locked */
2985
2986       splitmux->need_async_start = TRUE;
2987       /* we want to go async to PAUSED until we managed to configure and add the
2988        * sink */
2989       GST_SPLITMUX_LOCK (splitmux);
2990       do_async_start (splitmux);
2991       GST_SPLITMUX_UNLOCK (splitmux);
2992       ret = GST_STATE_CHANGE_ASYNC;
2993       break;
2994     }
2995     case GST_STATE_CHANGE_READY_TO_NULL:
2996       GST_SPLITMUX_LOCK (splitmux);
2997       splitmux->fragment_id = 0;
2998       /* Reset internal elements only if no pad contexts are using them */
2999       if (splitmux->contexts == NULL)
3000         gst_splitmux_reset (splitmux);
3001       do_async_done (splitmux);
3002       GST_SPLITMUX_UNLOCK (splitmux);
3003       break;
3004     default:
3005       break;
3006   }
3007
3008 beach:
3009   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3010       ret == GST_STATE_CHANGE_FAILURE) {
3011     /* Cleanup elements on failed transition out of NULL */
3012     gst_splitmux_reset (splitmux);
3013     GST_SPLITMUX_LOCK (splitmux);
3014     do_async_done (splitmux);
3015     GST_SPLITMUX_UNLOCK (splitmux);
3016   }
3017   return ret;
3018 }
3019
3020 gboolean
3021 register_splitmuxsink (GstPlugin * plugin)
3022 {
3023   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3024       "Split File Muxing Sink");
3025
3026   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3027       GST_TYPE_SPLITMUX_SINK);
3028 }
3029
3030 static void
3031 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3032 {
3033   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3034     splitmux->fragment_id = 0;
3035   }
3036 }
3037
3038 static void
3039 split_now (GstSplitMuxSink * splitmux)
3040 {
3041   g_atomic_int_set (&(splitmux->split_now), TRUE);
3042 }