1 /* GStreamer Split Demuxer bin that recombines files created by
2 * the splitmuxsink element.
4 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
27 #include "gstsplitmuxsrc.h"
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
46 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
48 typedef struct _GstSplitMuxPartPad
52 /* Reader we belong to */
53 GstSplitMuxPartReader *reader;
54 /* Output splitmuxsrc source pad */
67 GstSegment orig_segment;
68 GstClockTime initial_ts_offset;
71 typedef struct _GstSplitMuxPartPadClass
74 } GstSplitMuxPartPadClass;
76 static GType gst_splitmux_part_pad_get_type (void);
77 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
78 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
80 static void splitmux_part_pad_constructed (GObject * pad);
81 static void splitmux_part_pad_finalize (GObject * pad);
82 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
83 GstSplitMuxPartPad * part_pad, GstBuffer * buf);
85 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
86 guint visible, guint bytes, guint64 time, gpointer checkdata);
87 static void type_found (GstElement * typefind, guint probability,
88 GstCaps * caps, GstSplitMuxPartReader * reader);
89 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
/* Walks reader->pads looking for a pad that is either at EOS or whose data
 * queue is currently empty, logging which condition was met.
 * NOTE(review): the return statements are missing from this copy of the
 * file - presumably TRUE on the first such pad and FALSE otherwise; confirm
 * against the pristine source. */
91 /* Called with reader lock held */
93 have_empty_queue (GstSplitMuxPartReader * reader)
97 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
98 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
99 if (part_pad->is_eos) {
100 GST_LOG_OBJECT (part_pad, "Pad is EOS");
103 if (gst_data_queue_is_empty (part_pad->queue)) {
104 GST_LOG_OBJECT (part_pad, "Queue is empty");
/* Waits on the reader's condition variable for as long as reader->running is
 * set, re-testing reader->flushing and "active with have_empty_queue()" on
 * every pass. The result is TRUE only when the reader is active and not
 * flushing once the wait is over.
 * NOTE(review): the statements controlled by the two `if`s are missing from
 * this copy of the file - presumably `break`; confirm. */
112 /* Called with reader lock held */
114 block_until_can_push (GstSplitMuxPartReader * reader)
116 while (reader->running) {
117 if (reader->flushing)
119 if (reader->active && have_empty_queue (reader))
122 GST_LOG_OBJECT (reader,
123 "Waiting for activation or empty queue on reader %s", reader->path);
124 SPLITMUX_PART_WAIT (reader);
127 GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
128 reader->path, reader->active, reader->flushing);
130 return reader->active && !reader->flushing;
/* Measurement-only handling of a buffer while the part is being prepared.
 * For the first buffer on a pad in the COLLECT_STREAMS state it derives
 * initial_ts_offset from the pad's original segment (start + base - time),
 * marks the pad as having seen a buffer, computes the buffer's end timestamp
 * (DTS preferred, else PTS, shifted by the part offset, plus the duration
 * when valid) and raises part_pad->max_ts when that value is larger.
 * Finishes by calling check_if_pads_collected(). The buffer itself is not
 * modified or consumed here. */
134 handle_buffer_measuring (GstSplitMuxPartReader * reader,
135 GstSplitMuxPartPad * part_pad, GstBuffer * buf)
137 GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
138 GstClockTimeDiff offset;
140 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
141 !part_pad->seen_buffer) {
142 /* If this is the first buffer on the pad in the collect_streams state,
143 * then calculate initial offset based on running time of this segment */
144 part_pad->initial_ts_offset =
145 part_pad->orig_segment.start + part_pad->orig_segment.base -
146 part_pad->orig_segment.time;
147 GST_DEBUG_OBJECT (reader,
148 "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
149 part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
151 part_pad->seen_buffer = TRUE;
153 /* Adjust buffer timestamps */
154 offset = reader->start_offset + part_pad->segment.base;
155 offset -= part_pad->initial_ts_offset;
157 /* Update the stored max duration on the pad,
158 * always preferring making DTS contiguous
160 if (GST_BUFFER_DTS_IS_VALID (buf))
161 ts = GST_BUFFER_DTS (buf) + offset;
162 else if (GST_BUFFER_PTS_IS_VALID (buf))
163 ts = GST_BUFFER_PTS (buf) + offset;
165 GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
166 " incoming PTS %" GST_TIME_FORMAT
167 " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
168 " to %" GST_STIME_FORMAT, part_pad,
169 GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
170 GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
171 GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
173 if (GST_CLOCK_STIME_IS_VALID (ts)) {
174 if (GST_BUFFER_DURATION_IS_VALID (buf))
175 ts += GST_BUFFER_DURATION (buf);
177 if (GST_CLOCK_STIME_IS_VALID (ts)
178 && ts > (GstClockTimeDiff) part_pad->max_ts) {
179 part_pad->max_ts = ts;
180 GST_LOG_OBJECT (reader,
181 "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
182 GST_TIME_ARGS (part_pad->max_ts));
185 /* Is it time to move to measuring state yet? */
186 check_if_pads_collected (reader);
/* "Queue full" check handed to gst_data_queue_new() for every part pad.
 * Only the queued time is examined: it is compared against a 20 second cap.
 * NOTE(review): the return statements are missing from this copy -
 * presumably TRUE once the cap is exceeded and FALSE otherwise; confirm. */
190 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
191 guint visible, guint bytes, guint64 time, gpointer checkdata)
193 /* Arbitrary safety limit. If we hit it, playback is likely to stall */
194 if (time > 20 * GST_SECOND)
/* Disposes of a data queue item: drops the reference on the mini object it
 * carries, then returns the item structure to the slice allocator. Installed
 * as the item's destroy function and also called directly when a push onto
 * the data queue fails. */
200 splitmux_part_free_queue_item (GstDataQueueItem * item)
202 gst_mini_object_unref (item->object);
203 g_slice_free (GstDataQueueItem, item);
/* Chain function for part pads.
 * While the reader is in one of the two PREPARING states the buffer is only
 * passed to handle_buffer_measuring() and then released. Otherwise the call
 * waits in block_until_can_push(); when that fails the buffer is released
 * and GST_FLOW_FLUSHING is returned. On success the buffer's PTS/DTS are
 * shifted by start_offset + segment.base - initial_ts_offset, the buffer is
 * wrapped in a visible GstDataQueueItem and pushed onto the pad's data queue
 * with the reader lock released (an extra pad reference is held across the
 * push). A failed push frees the item and returns GST_FLOW_FLUSHING.
 * NOTE(review): the return values of the measuring path and of the
 * successful push are missing from this copy of the file. */
207 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
209 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
210 GstSplitMuxPartReader *reader = part_pad->reader;
211 GstDataQueueItem *item;
212 GstClockTimeDiff offset;
214 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
215 SPLITMUX_PART_LOCK (reader);
217 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
218 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
219 handle_buffer_measuring (reader, part_pad, buf);
220 gst_buffer_unref (buf);
221 SPLITMUX_PART_UNLOCK (reader);
225 if (!block_until_can_push (reader)) {
227 SPLITMUX_PART_UNLOCK (reader);
228 gst_buffer_unref (buf);
229 return GST_FLOW_FLUSHING;
232 /* Adjust buffer timestamps */
233 offset = reader->start_offset + part_pad->segment.base;
234 offset -= part_pad->initial_ts_offset;
236 if (GST_BUFFER_PTS_IS_VALID (buf))
237 GST_BUFFER_PTS (buf) += offset;
238 if (GST_BUFFER_DTS_IS_VALID (buf))
239 GST_BUFFER_DTS (buf) += offset;
241 /* We are active, and one queue is empty, place this buffer in
243 GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
244 item = g_slice_new (GstDataQueueItem);
245 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
246 item->object = GST_MINI_OBJECT (buf);
247 item->size = gst_buffer_get_size (buf);
248 item->duration = GST_BUFFER_DURATION (buf);
249 if (item->duration == GST_CLOCK_TIME_NONE)
251 item->visible = TRUE;
253 gst_object_ref (part_pad);
255 SPLITMUX_PART_UNLOCK (reader);
257 if (!gst_data_queue_push (part_pad->queue, item)) {
258 splitmux_part_free_queue_item (item);
259 gst_object_unref (part_pad);
260 return GST_FLOW_FLUSHING;
263 gst_object_unref (part_pad);
267 /* Called with splitmux part lock held */
/* Inspects the is_eos flag of every pad in part->pads.
 * NOTE(review): the return statements are missing from this copy -
 * presumably FALSE as soon as one pad is not EOS and TRUE otherwise. */
269 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
272 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
273 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
274 if (!part_pad->is_eos)
/* Checks whether every pad in part->pads has seen at least one buffer,
 * logging the first pad that has not. Named *_locked and called from
 * check_if_pads_collected(), so presumably expects the part lock to be held.
 * NOTE(review): the return statements are missing from this copy -
 * presumably FALSE for the first pad without a buffer, TRUE otherwise. */
282 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
285 GST_LOG_OBJECT (part, "Checking for preroll");
286 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
287 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
288 if (!part_pad->seen_buffer) {
289 GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
294 GST_LOG_OBJECT (part, "Part is prerolled");
/* Locking wrapper around splitmux_part_is_eos_locked(): takes the part lock,
 * evaluates the EOS state of the reader and releases the lock again.
 * NOTE(review): the final return is missing from this copy - presumably
 * `res` is returned. */
300 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
304 SPLITMUX_PART_LOCK (reader);
305 res = splitmux_part_is_eos_locked (reader);
306 SPLITMUX_PART_UNLOCK (reader);
/* Inspects the per-pad flushing flag of every pad in reader->pads; the
 * result is stored in reader->flushing by the FLUSH_STOP handling.
 * NOTE(review): the return statements are missing from this copy -
 * presumably TRUE if any pad is still flushing, FALSE otherwise. */
311 /* Called with splitmux part lock held */
313 splitmux_is_flushing (GstSplitMuxPartReader * reader)
316 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
317 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
318 if (part_pad->flushing)
/* Event function for part pads. Runs with the reader lock taken at entry
 * and holds a reference on the target pad for the duration of the call.
 * What this copy of the file shows:
 *  - any event other than FLUSH_STOP is first tested against
 *    part_pad->flushing (the action taken is not visible - presumably the
 *    event is dropped);
 *  - STREAM_START records whether the stream is flagged as sparse;
 *  - SEGMENT is copied into both part_pad->segment and orig_segment, must be
 *    in TIME format, is rebased by reader->start_offset and the event is
 *    replaced by a new segment event. While the reader is still preparing,
 *    the first segment seeds the target pad's segment and later ones may
 *    extend its stop;
 *  - in the branch that logs "EOS event", a pad is marked EOS while the
 *    reader is preparing, and once all pads are EOS the reader moves to
 *    PART_STATE_PREPARING_RESET_FOR_READY and waiters are woken;
 *  - FLUSH_START / FLUSH_STOP update the flushing flags and the data queue's
 *    flushing state and wake waiters; FLUSH_STOP also empties the queue and
 *    clears seen_buffer and is_eos.
 * After that, events are tested against prep_state != PART_STATE_READY and
 * against being flush events, then wait in block_until_can_push(). Events
 * that get through are pushed onto the pad's data queue as non-visible
 * items with the reader lock released.
 * The tail of the function holds two exit paths: one reports a non-TIME
 * segment through GST_ELEMENT_ERROR, the other logs and drops the event.
 * NOTE(review): case labels, goto statements, labels and return statements
 * are missing from this copy of the file; the control flow described above
 * is inferred from the visible statements and must be checked against the
 * pristine source. */
326 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
328 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
329 GstSplitMuxPartReader *reader = part_pad->reader;
331 SplitMuxSrcPad *target;
332 GstDataQueueItem *item;
334 SPLITMUX_PART_LOCK (reader);
336 target = gst_object_ref (part_pad->target);
338 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
341 if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
344 switch (GST_EVENT_TYPE (event)) {
345 case GST_EVENT_STREAM_START:{
346 GstStreamFlags flags;
347 gst_event_parse_stream_flags (event, &flags);
348 part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
351 case GST_EVENT_SEGMENT:{
352 GstSegment *seg = &part_pad->segment;
354 GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
356 gst_event_copy_segment (event, seg);
357 gst_event_copy_segment (event, &part_pad->orig_segment);
359 if (seg->format != GST_FORMAT_TIME)
363 /* Adjust start/stop so the overall file is 0 + start_offset based */
364 if (seg->stop != -1) {
365 seg->stop -= seg->start;
366 seg->stop += seg->time + reader->start_offset;
368 seg->start = seg->time + reader->start_offset;
369 seg->time += reader->start_offset;
370 seg->position += reader->start_offset;
372 GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
375 gst_event_unref (event);
376 event = gst_event_new_segment (seg);
378 if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
379 && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
380 break; /* Only do further stuff with segments during initial measuring */
382 /* Take the first segment from the first part */
383 if (target->segment.format == GST_FORMAT_UNDEFINED) {
384 gst_segment_copy_into (seg, &target->segment);
385 GST_DEBUG_OBJECT (reader,
386 "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
389 if (seg->stop != -1 && target->segment.stop != -1) {
390 GstClockTime stop = seg->base + seg->stop;
391 if (stop > target->segment.stop) {
392 target->segment.stop = stop;
393 GST_DEBUG_OBJECT (reader,
394 "Adjusting segment stop by %" GST_TIME_FORMAT
395 " output now %" GST_SEGMENT_FORMAT,
396 GST_TIME_ARGS (reader->start_offset), &target->segment);
399 GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
404 GST_DEBUG_OBJECT (part_pad,
405 "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
406 reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
408 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
409 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
410 /* Mark this pad as EOS */
411 part_pad->is_eos = TRUE;
412 if (splitmux_part_is_eos_locked (reader)) {
413 /* Finished measuring things, set state and tell the state change func
414 * so it can seek back to the start */
415 GST_LOG_OBJECT (reader,
416 "EOS while measuring streams. Resetting for ready");
417 reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
418 SPLITMUX_PART_BROADCAST (reader);
424 case GST_EVENT_FLUSH_START:
425 reader->flushing = TRUE;
426 part_pad->flushing = TRUE;
427 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
429 gst_data_queue_set_flushing (part_pad->queue, TRUE);
430 SPLITMUX_PART_BROADCAST (reader);
432 case GST_EVENT_FLUSH_STOP:{
433 gst_data_queue_set_flushing (part_pad->queue, FALSE);
434 gst_data_queue_flush (part_pad->queue);
435 part_pad->seen_buffer = FALSE;
436 part_pad->flushing = FALSE;
437 part_pad->is_eos = FALSE;
439 reader->flushing = splitmux_is_flushing (reader);
440 GST_LOG_OBJECT (reader,
441 "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
442 reader->path, pad, reader->flushing);
443 SPLITMUX_PART_BROADCAST (reader);
450 /* Don't send events downstream while preparing */
451 if (reader->prep_state != PART_STATE_READY)
454 /* Don't pass flush events - those are done by the parent */
455 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
456 GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
459 if (!block_until_can_push (reader))
462 switch (GST_EVENT_TYPE (event)) {
464 /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
471 /* We are active, and one queue is empty, place this buffer in
473 gst_object_ref (part_pad->queue);
474 SPLITMUX_PART_UNLOCK (reader);
476 GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
477 item = g_slice_new (GstDataQueueItem);
478 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
479 item->object = GST_MINI_OBJECT (event);
482 if (item->duration == GST_CLOCK_TIME_NONE)
484 item->visible = FALSE;
486 if (!gst_data_queue_push (part_pad->queue, item)) {
487 splitmux_part_free_queue_item (item);
491 gst_object_unref (part_pad->queue);
492 gst_object_unref (target);
496 gst_event_unref (event);
497 gst_object_unref (target);
498 SPLITMUX_PART_UNLOCK (reader);
499 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
500 ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
504 GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
505 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
506 gst_event_unref (event);
507 gst_object_unref (target);
508 SPLITMUX_PART_UNLOCK (reader);
/* Query function for part pads: forwards the query to the target (output)
 * pad with gst_pad_query(). A reference on the target and the reader's
 * `active` flag are sampled under the reader lock; the query itself runs
 * with the lock released and the target reference is dropped afterwards.
 * NOTE(review): the condition guarding the forward and the final return are
 * missing from this copy - presumably the query is only forwarded while the
 * reader is active and `ret` (initially FALSE) is returned. */
513 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
515 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
516 GstSplitMuxPartReader *reader = part_pad->reader;
518 gboolean ret = FALSE;
521 SPLITMUX_PART_LOCK (reader);
522 target = gst_object_ref (part_pad->target);
523 active = reader->active;
524 SPLITMUX_PART_UNLOCK (reader);
527 GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
528 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
530 ret = gst_pad_query (target, query);
533 gst_object_unref (target);
538 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
/* GObject "constructed" hook for part pads: installs the chain, event and
 * query functions defined in this file on the pad, then chains up to the
 * parent class. */
541 splitmux_part_pad_constructed (GObject * pad)
543 gst_pad_set_chain_function (GST_PAD (pad),
544 GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
545 gst_pad_set_event_function (GST_PAD (pad),
546 GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
547 gst_pad_set_query_function (GST_PAD (pad),
548 GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
550 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
/* Class initialiser for GstSplitMuxPartPad: overrides the GObject
 * constructed and finalize virtual methods with the ones in this file. */
554 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
556 GObjectClass *gobject_klass = (GObjectClass *) (klass);
558 gobject_klass->constructed = splitmux_part_pad_constructed;
559 gobject_klass->finalize = splitmux_part_pad_finalize;
/* Instance initialiser for GstSplitMuxPartPad: creates the pad's data queue
 * with splitmux_data_queue_is_full_cb as its fullness check (the remaining
 * constructor arguments are not visible in this copy) and resets both the
 * working and the original segment to GST_FORMAT_UNDEFINED. */
563 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
565 pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
567 gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
568 gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
/* GObject finalize for part pads: puts the data queue into flushing mode,
 * empties it, drops the pad's reference on it and chains up to the parent
 * class. */
572 splitmux_part_pad_finalize (GObject * obj)
574 GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
576 GST_DEBUG_OBJECT (obj, "finalize");
577 gst_data_queue_set_flushing (pad->queue, TRUE);
578 gst_data_queue_flush (pad->queue);
579 gst_object_unref (GST_OBJECT_CAST (pad->queue));
582 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
586 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
587 GstSplitMuxPartReader * part);
588 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
589 static GstStateChangeReturn
590 gst_splitmux_part_reader_change_state (GstElement * element,
591 GstStateChange transition);
592 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
594 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
595 * part, gboolean flushing);
596 static void bus_handler (GstBin * bin, GstMessage * msg);
597 static void splitmux_part_reader_dispose (GObject * object);
598 static void splitmux_part_reader_finalize (GObject * object);
599 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
601 #define gst_splitmux_part_reader_parent_class parent_class
602 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
/* Class initialiser for GstSplitMuxPartReader: sets up the debug category,
 * installs dispose/finalize, registers the argument-less "prepared" signal
 * (run-first, default handler slot `prepared` in the class structure) and
 * overrides the element's change_state and send_event methods and the bin's
 * handle_message method. */
606 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
608 GObjectClass *gobject_klass = (GObjectClass *) (klass);
609 GstElementClass *gstelement_class = (GstElementClass *) klass;
610 GstBinClass *gstbin_class = (GstBinClass *) klass;
612 GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
613 "Split File Demuxing Source helper");
615 gobject_klass->dispose = splitmux_part_reader_dispose;
616 gobject_klass->finalize = splitmux_part_reader_finalize;
618 part_reader_signals[SIGNAL_PREPARED] =
619 g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
620 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
621 prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
622 gstelement_class->change_state = gst_splitmux_part_reader_change_state;
623 gstelement_class->send_event = gst_splitmux_part_reader_send_event;
625 gstbin_class->handle_message = bus_handler;
/* Instance initialiser for the part reader: marks it inactive with an
 * unknown duration, initialises the condition variable and both mutexes,
 * then builds the static front of the internal pipeline - a "filesrc" linked
 * to a "typefind" element, both added to this bin - and connects
 * type_found() to typefind's "have-type" signal.
 * NOTE(review): what follows each GST_ERROR_OBJECT (presumably an early
 * return) is missing from this copy of the file. */
629 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
631 GstElement *typefind;
633 reader->active = FALSE;
634 reader->duration = GST_CLOCK_TIME_NONE;
636 g_cond_init (&reader->inactive_cond);
637 g_mutex_init (&reader->lock);
638 g_mutex_init (&reader->type_lock);
640 /* FIXME: Create elements on a state change */
641 reader->src = gst_element_factory_make ("filesrc", NULL);
642 if (reader->src == NULL) {
643 GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
646 gst_bin_add (GST_BIN_CAST (reader), reader->src);
648 typefind = gst_element_factory_make ("typefind", NULL);
650 GST_ERROR_OBJECT (reader,
651 "Failed to create typefind element - check your installation");
655 gst_bin_add (GST_BIN_CAST (reader), typefind);
656 reader->typefind = typefind;
658 if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
659 GST_ERROR_OBJECT (reader,
660 "Failed to link typefind element - check your installation");
664 g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
/* GObject dispose: releases the reader's pads through
 * splitmux_part_reader_reset() before chaining up to the parent class. */
669 splitmux_part_reader_dispose (GObject * object)
671 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
673 splitmux_part_reader_reset (reader);
675 G_OBJECT_CLASS (parent_class)->dispose (object);
/* GObject finalize: tears down the condition variable and both mutexes
 * created in the instance initialiser, frees the stored file path and chains
 * up to the parent class. */
679 splitmux_part_reader_finalize (GObject * object)
681 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
683 g_cond_clear (&reader->inactive_cond);
684 g_mutex_clear (&reader->lock);
685 g_mutex_clear (&reader->type_lock);
687 g_free (reader->path);
689 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Drops every proxy pad owned by the reader: under the reader lock each pad
 * in reader->pads is deactivated and unreffed, then the list itself is
 * freed.
 * NOTE(review): the statement between g_list_free() and the unlock is
 * missing from this copy - presumably reader->pads is reset to NULL there.
 * Confirm, because this function is reached from both dispose and the
 * READY_TO_NULL transition and a stale list pointer would be freed twice. */
693 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
697 SPLITMUX_PART_LOCK (reader);
698 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
699 GstPad *pad = GST_PAD_CAST (cur->data);
700 gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
701 gst_object_unref (GST_OBJECT_CAST (pad));
704 g_list_free (reader->pads);
706 SPLITMUX_PART_UNLOCK (reader);
/* Creates a part pad (sink direction) named after `target`, records the
 * target pad and owning reader on it without taking extra references, and
 * activates it.
 * NOTE(review): the remaining parameter line and the return statement are
 * missing from this copy - presumably the new pad is returned. */
709 static GstSplitMuxPartPad *
710 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
713 GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
714 "name", GST_PAD_NAME (target),
715 "direction", GST_PAD_SINK,
717 pad->target = target;
718 pad->reader = reader;
720 gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
726 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
727 GstSplitMuxPartReader * reader)
729 GstPad *out_pad = NULL;
730 GstSplitMuxPartPad *proxy_pad;
732 GstPadLinkReturn link_ret;
734 caps = gst_pad_get_current_caps (pad);
736 GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
737 " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
739 gst_caps_unref (caps);
741 /* Look up or create the output pad */
742 if (reader->get_pad_cb)
743 out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
744 if (out_pad == NULL) {
745 GST_DEBUG_OBJECT (reader,
746 "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
750 /* Create our proxy pad to interact with this new pad */
751 proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
752 GST_DEBUG_OBJECT (reader,
753 "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
756 link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
757 if (link_ret != GST_PAD_LINK_OK) {
758 gst_object_unref (proxy_pad);
759 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
760 ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
761 " ret %d", reader->path, pad, link_ret));
764 GST_DEBUG_OBJECT (reader,
765 "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
768 SPLITMUX_PART_LOCK (reader);
769 reader->pads = g_list_prepend (reader->pads, proxy_pad);
770 SPLITMUX_PART_UNLOCK (reader);
/* GstElement::send_event override. Under the reader lock it looks up the
 * peer of the first pad in reader->pads (i.e. the demuxer pad linked to that
 * proxy pad); with the lock released the event is sent to that peer and the
 * peer reference dropped. An event that cannot be delivered is unreffed.
 * NOTE(review): the conditions around these statements and the return are
 * missing from this copy - presumably guarded by `reader->pads` / `pad`
 * being non-NULL, returning `ret` (initially FALSE). */
774 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
776 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
777 gboolean ret = FALSE;
780 /* Send event to the first source pad we found */
781 SPLITMUX_PART_LOCK (reader);
783 GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
784 pad = gst_pad_get_peer (proxy_pad);
786 SPLITMUX_PART_UNLOCK (reader);
789 ret = gst_pad_send_event (pad, event);
790 gst_object_unref (pad);
792 gst_event_unref (event);
/* Entered with the reader lock held, but the lock is dropped while the seek
 * is issued: a flushing, accurate seek at rate 1.0 in TIME format from
 * `time` to the end of the stream. The lock is then re-acquired and the call
 * waits on the condition variable until reader->flushing has been cleared
 * again. The result of gst_element_seek() is not checked. */
798 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
800 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
803 SPLITMUX_PART_UNLOCK (reader);
804 GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
805 GST_TIME_ARGS (time));
806 gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
807 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
808 GST_SEEK_TYPE_END, 0);
810 SPLITMUX_PART_LOCK (reader);
812 /* Wait for flush to finish, so old data is gone */
813 while (reader->flushing) {
814 GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
815 SPLITMUX_PART_WAIT (reader);
/* Translates target_seg into this part's own timeline and seeks to it.
 * start stays 0 unless the segment starts at or after reader->start_offset,
 * in which case the offset is subtracted; stop stays GST_CLOCK_TIME_NONE
 * unless the segment stop lies before start_offset + duration, in which case
 * it is made relative too. Both are computed under the reader lock. The seek
 * uses the segment's rate and format, and its flags combined with
 * GST_SEEK_FLAG_FLUSH and extra_flags. Returns the result of
 * gst_element_seek().
 * NOTE(review): the last argument line of the seek is missing from this
 * copy - presumably `stop`. */
819 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
821 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
822 GstSegment * target_seg, GstSeekFlags extra_flags)
825 GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
827 flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
829 SPLITMUX_PART_LOCK (reader);
830 if (target_seg->start >= reader->start_offset)
831 start = target_seg->start - reader->start_offset;
832 /* If the segment stop is within this part, don't play to the end */
833 if (target_seg->stop != -1 &&
834 target_seg->stop < reader->start_offset + reader->duration)
835 stop = target_seg->stop - reader->start_offset;
837 SPLITMUX_PART_UNLOCK (reader);
839 GST_DEBUG_OBJECT (reader,
840 "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
841 GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
842 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
844 return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
845 target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
/* Second preparation phase. When the part's duration is known and longer
 * than one second, seeks to half a second before the end (the seek helper
 * drops and re-takes the reader lock). Then waits until prep_state leaves
 * PART_STATE_PREPARING_MEASURE_STREAMS. If the state reached is
 * PART_STATE_PREPARING_RESET_FOR_READY, the reader is switched to
 * PART_STATE_READY and the "prepared" signal is emitted; no unlock is
 * visible between those two statements, so handlers appear to run with the
 * reader lock held. */
849 /* Called with lock held */
851 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
853 /* Trigger a flushing seek to near the end of the file and run each stream
854 * to EOS in order to find the smallest end timestamp to start the next
857 if (GST_CLOCK_TIME_IS_VALID (reader->duration)
858 && reader->duration > GST_SECOND) {
859 GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
860 gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
863 /* Wait for things to happen */
864 while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
865 SPLITMUX_PART_WAIT (reader);
867 if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
868 /* Fire the prepared signal and go to READY state */
869 GST_DEBUG_OBJECT (reader,
870 "Stream measuring complete. File %s is now ready. Firing prepared signal",
872 reader->prep_state = PART_STATE_READY;
873 g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
/* Picks a demuxer for `caps`: fetches the list of demuxer element factories,
 * filters it against `caps` on the sink side, and instantiates the first
 * entry of the filtered list. Both factory lists are freed before returning.
 * NOTE(review): the minimum-rank argument, the early exit taken when no
 * factories exist and the final return are missing from this copy -
 * presumably NULL is returned when nothing matches and `e` otherwise. */
878 find_demuxer (GstCaps * caps)
881 gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
883 GList *compat_elements;
884 GstElement *e = NULL;
886 if (factories == NULL)
890 gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
892 if (compat_elements) {
893 /* Just take the first (highest ranked) option */
894 GstElementFactory *factory =
895 GST_ELEMENT_FACTORY_CAST (compat_elements->data);
896 e = gst_element_factory_create (factory, NULL);
897 gst_plugin_feature_list_free (compat_elements);
901 gst_plugin_feature_list_free (factories);
/* "have-type" handler for the typefind element. Creates a demuxer for the
 * detected caps via find_demuxer() and stores it in reader->demux, connects
 * the pad-added and no-more-pads handlers, then - with the demuxer's state
 * locked - adds it to the bin, links typefind's "src" pad to it and brings
 * it to the reader's current target state before unlocking its state again.
 * The results of gst_element_link_pads() and gst_element_set_state() are
 * not checked.
 * NOTE(review): the handling after the "Failed to create demuxer" log is
 * missing from this copy - presumably an early return. */
907 type_found (GstElement * typefind, guint probability,
908 GstCaps * caps, GstSplitMuxPartReader * reader)
912 GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
914 /* typefind found a type. Look for the demuxer to handle it */
915 demux = reader->demux = find_demuxer (caps);
916 if (reader->demux == NULL) {
917 GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
921 /* Connect to demux signals */
922 g_signal_connect (demux,
923 "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
924 g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
926 gst_element_set_locked_state (demux, TRUE);
927 gst_bin_add (GST_BIN_CAST (reader), demux);
928 gst_element_link_pads (reader->typefind, "src", demux, NULL);
929 gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
930 gst_element_set_locked_state (demux, FALSE);
/* Advances preparation from COLLECT_STREAMS to MEASURE_STREAMS once the
 * demuxer has signalled no-more-pads and every pad has seen a buffer, waking
 * any thread waiting on the reader's condition variable. Does nothing in any
 * other prep_state. Both visible callers invoke it with the reader lock
 * held. */
934 check_if_pads_collected (GstSplitMuxPartReader * reader)
936 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
937 /* Check we have all pads and each pad has seen a buffer */
938 if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
939 GST_DEBUG_OBJECT (reader,
940 "no more pads - file %s. Measuring stream length", reader->path);
941 reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
942 SPLITMUX_PART_BROADCAST (reader);
/* "no-more-pads" handler for the demuxer. Under the reader lock it queries
 * the peer of every proxy pad for its duration in TIME format, keeps the
 * smallest value reported (starting from GST_CLOCK_TIME_NONE), stores it in
 * reader->duration, sets reader->no_more_pads and then calls
 * check_if_pads_collected(). Pads whose duration query fails are ignored. */
948 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
950 GstClockTime duration = GST_CLOCK_TIME_NONE;
952 /* Query the minimum duration of any pad in this piece and store it.
953 * FIXME: Only consider audio and video */
954 SPLITMUX_PART_LOCK (reader);
955 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
956 GstPad *target = GST_PAD_CAST (cur->data);
959 if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
960 GST_INFO_OBJECT (reader,
961 "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
962 reader->path, target, GST_TIME_ARGS (cur_duration));
963 if (cur_duration < duration)
964 duration = cur_duration;
968 GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
969 reader->path, GST_TIME_ARGS (duration));
970 reader->duration = (GstClockTime) duration;
972 reader->no_more_pads = TRUE;
974 check_if_pads_collected (reader);
975 SPLITMUX_PART_UNLOCK (reader);
/* Answers a query made on an output pad by running it against this part.
 * The proxy pad whose target is src_pad is looked up (and reffed) under the
 * part lock, the query is sent to that pad's peer, and a TIME-format
 * POSITION answer is shifted by part->start_offset so it is expressed on the
 * overall timeline. The proxy pad reference is dropped at the end.
 * NOTE(review): the handling for "no matching pad" and "query failed", the
 * other switch cases and the return are missing from this copy of the
 * file. */
979 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
980 GstPad * src_pad, GstQuery * query)
982 GstPad *target = NULL;
986 SPLITMUX_PART_LOCK (part);
987 /* Find the pad corresponding to the visible output target pad */
988 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
989 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
990 if (part_pad->target == src_pad) {
991 target = gst_object_ref (GST_OBJECT_CAST (part_pad));
995 SPLITMUX_PART_UNLOCK (part);
1000 ret = gst_pad_peer_query (target, query);
1005 /* Post-massaging of queries */
1006 switch (GST_QUERY_TYPE (query)) {
1007 case GST_QUERY_POSITION:{
1011 gst_query_parse_position (query, &fmt, &position);
1012 if (fmt != GST_FORMAT_TIME)
1014 SPLITMUX_PART_LOCK (part);
1015 position += part->start_offset;
1016 GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1017 GST_TIME_ARGS (position));
1018 SPLITMUX_PART_UNLOCK (part);
1020 gst_query_set_position (query, fmt, position);
1028 gst_object_unref (target);
/* GstElement::change_state override.
 * Before chaining up:
 *  - READY_TO_PAUSED sets the filesrc location from reader->path, enters
 *    PART_STATE_PREPARING_COLLECT_STREAMS, clears flushing, marks the reader
 *    running and takes the type lock;
 *  - READY_TO_NULL / PAUSED_TO_READY set flushing, clear `running` and wake
 *    waiters;
 *  - PLAYING_TO_PAUSED deactivates the reader, sets flushing and wakes
 *    waiters.
 * If the parent's change_state fails during READY_TO_PAUSED the type lock
 * is released again.
 * After chaining up:
 *  - READY_TO_PAUSED releases the type lock, waits until the streams have
 *    been collected, then either runs
 *    gst_splitmux_part_reader_measure_streams() or reports failure when the
 *    state is PART_STATE_FAILED;
 *  - PAUSED_TO_PLAYING clears flushing, activates the reader and wakes
 *    waiters;
 *  - READY_TO_NULL resets prep_state to PART_STATE_NULL (without taking the
 *    reader lock) and releases the pads.
 * NOTE(review): break/goto/return statements and the default cases are
 * missing from this copy of the file. */
1032 static GstStateChangeReturn
1033 gst_splitmux_part_reader_change_state (GstElement * element,
1034 GstStateChange transition)
1036 GstStateChangeReturn ret;
1037 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1039 switch (transition) {
1040 case GST_STATE_CHANGE_NULL_TO_READY:{
1043 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1044 /* Hold the splitmux type lock until after the
1045 * parent state change function has finished
1046 * changing the states of things, and type finding can continue */
1047 SPLITMUX_PART_LOCK (reader);
1048 g_object_set (reader->src, "location", reader->path, NULL);
1049 reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1050 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1051 reader->running = TRUE;
1052 SPLITMUX_PART_UNLOCK (reader);
1053 SPLITMUX_PART_TYPE_LOCK (reader);
1056 case GST_STATE_CHANGE_READY_TO_NULL:
1057 case GST_STATE_CHANGE_PAUSED_TO_READY:
1058 SPLITMUX_PART_LOCK (reader);
1059 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1060 reader->running = FALSE;
1061 SPLITMUX_PART_BROADCAST (reader);
1062 SPLITMUX_PART_UNLOCK (reader);
1064 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1065 SPLITMUX_PART_LOCK (reader);
1066 reader->active = FALSE;
1067 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1068 SPLITMUX_PART_BROADCAST (reader);
1069 SPLITMUX_PART_UNLOCK (reader);
1075 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1076 if (ret == GST_STATE_CHANGE_FAILURE) {
1077 if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
1078 /* Make sure to release the lock we took above */
1079 SPLITMUX_PART_TYPE_UNLOCK (reader);
1084 switch (transition) {
1085 case GST_STATE_CHANGE_READY_TO_PAUSED:
1086 /* Sleep and wait until all streams have been collected, then do the seeks
1087 * to measure the stream lengths. This took the type lock above,
1088 * but it's OK to release it now and let typefinding happen... */
1089 SPLITMUX_PART_TYPE_UNLOCK (reader);
1091 SPLITMUX_PART_LOCK (reader);
1093 while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1094 GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1095 SPLITMUX_PART_WAIT (reader);
1098 if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
1099 reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
1100 gst_splitmux_part_reader_measure_streams (reader);
1101 } else if (reader->prep_state == PART_STATE_FAILED)
1102 ret = GST_STATE_CHANGE_FAILURE;
1103 SPLITMUX_PART_UNLOCK (reader);
1105 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1106 SPLITMUX_PART_LOCK (reader);
1107 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1108 reader->active = TRUE;
1109 SPLITMUX_PART_BROADCAST (reader);
1110 SPLITMUX_PART_UNLOCK (reader);
1112 case GST_STATE_CHANGE_READY_TO_NULL:
1113 reader->prep_state = PART_STATE_NULL;
1114 splitmux_part_reader_reset (reader);
/* Drains the part's bus, unreffing every message popped, and stops at the
 * first ERROR message (which is logged). The bus reference is released
 * before returning.
 * NOTE(review): the statements that set and return `ret` are missing from
 * this copy - presumably FALSE when an error message was found and TRUE when
 * the bus was drained without one. */
1125 check_bus_messages (GstSplitMuxPartReader * part)
1127 gboolean ret = FALSE;
1131 bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1132 while ((m = gst_bus_pop (bus)) != NULL) {
1133 if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1134 GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1135 gst_message_unref (m);
1138 gst_message_unref (m);
1142 gst_object_unref (bus);
1147 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1149 GstStateChangeReturn ret;
1151 ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1153 if (ret != GST_STATE_CHANGE_SUCCESS)
1156 return check_bus_messages (part);
1160 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1162 gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1166 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1169 reader->path = g_strdup (path);
1173 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1174 GstSegment * seg, GstSeekFlags extra_flags)
1176 GST_DEBUG_OBJECT (reader, "Activating part reader");
1178 if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
1179 GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1183 if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1184 GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1185 GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1192 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1196 SPLITMUX_PART_LOCK (part);
1198 SPLITMUX_PART_UNLOCK (part);
1204 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1206 GST_DEBUG_OBJECT (reader, "Deactivating reader");
1207 gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1211 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1216 GST_LOG_OBJECT (reader, "%s dataqueues",
1217 flushing ? "Flushing" : "Done flushing");
1218 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1219 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1220 gst_data_queue_set_flushing (part_pad->queue, flushing);
1222 gst_data_queue_flush (part_pad->queue);
1227 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1228 gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1230 reader->cb_data = cb_data;
1231 reader->get_pad_cb = get_pad_cb;
1235 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1238 GstClockTime ret = GST_CLOCK_TIME_NONE;
1240 SPLITMUX_PART_LOCK (reader);
1241 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1242 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1243 if (!part_pad->is_sparse && part_pad->max_ts < ret)
1244 ret = part_pad->max_ts;
1247 SPLITMUX_PART_UNLOCK (reader);
1253 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1254 GstClockTime offset)
1256 SPLITMUX_PART_LOCK (reader);
1257 reader->start_offset = offset;
1258 GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1259 GST_TIME_ARGS (offset));
1260 SPLITMUX_PART_UNLOCK (reader);
1264 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1266 GstClockTime ret = GST_CLOCK_TIME_NONE;
1268 SPLITMUX_PART_LOCK (reader);
1269 ret = reader->start_offset;
1270 SPLITMUX_PART_UNLOCK (reader);
1276 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1280 SPLITMUX_PART_LOCK (reader);
1281 dur = reader->duration;
1282 SPLITMUX_PART_UNLOCK (reader);
1288 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1291 GstPad *result = NULL;
1294 SPLITMUX_PART_LOCK (reader);
1295 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1296 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1297 if (part_pad->target == target) {
1298 result = (GstPad *) gst_object_ref (part_pad);
1302 SPLITMUX_PART_UNLOCK (reader);
1308 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1309 GstDataQueueItem ** item)
1311 GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1315 /* Get one item from the appropriate dataqueue */
1316 SPLITMUX_PART_LOCK (reader);
1317 if (reader->prep_state == PART_STATE_FAILED) {
1318 SPLITMUX_PART_UNLOCK (reader);
1319 return GST_FLOW_ERROR;
1322 q = gst_object_ref (part_pad->queue);
1324 /* Have to drop the lock around pop, so we can be woken up for flush */
1325 SPLITMUX_PART_UNLOCK (reader);
1326 if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1327 ret = GST_FLOW_FLUSHING;
1331 SPLITMUX_PART_LOCK (reader);
1333 SPLITMUX_PART_BROADCAST (reader);
1334 if (GST_IS_EVENT ((*item)->object)) {
1335 GstEvent *e = (GstEvent *) ((*item)->object);
1336 /* Mark this pad as EOS */
1337 if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1338 part_pad->is_eos = TRUE;
1341 SPLITMUX_PART_UNLOCK (reader);
1345 gst_object_unref (q);
1350 bus_handler (GstBin * bin, GstMessage * message)
1352 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1354 switch (GST_MESSAGE_TYPE (message)) {
1355 case GST_MESSAGE_ERROR:
1356 /* Make sure to set the state to failed and wake up the listener
1358 SPLITMUX_PART_LOCK (reader);
1359 GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
1360 " marking this reader as failed", GST_MESSAGE_SRC (message));
1361 reader->prep_state = PART_STATE_FAILED;
1362 SPLITMUX_PART_BROADCAST (reader);
1363 SPLITMUX_PART_UNLOCK (reader);
1369 GST_BIN_CLASS (parent_class)->handle_message (bin, message);