1 /* GStreamer Split Demuxer bin that recombines files created by
2 * the splitmuxsink element.
4 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
27 #include "gstsplitmuxsrc.h"
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
43 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
/* Per-stream sink pad owned by a part reader, and its class structure. */
45 typedef struct _GstSplitMuxPartPad
49 /* Reader we belong to */
50 GstSplitMuxPartReader *reader;
51 /* Output splitmuxsrc source pad */
/* Copy of the incoming segment exactly as received; the pad event handler
 * rewrites start/stop/time/position only on the separate 'segment' member. */
63 GstSegment orig_segment;
/* start + base - time of orig_segment, captured on the first buffer seen
 * while streams are being collected; subtracted from the timestamp offset
 * that is applied to buffers. */
64 GstClockTime initial_ts_offset;
67 typedef struct _GstSplitMuxPartPadClass
70 } GstSplitMuxPartPadClass;
72 static GType gst_splitmux_part_pad_get_type (void);
73 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
74 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
76 static void splitmux_part_pad_constructed (GObject * pad);
77 static void splitmux_part_pad_finalize (GObject * pad);
78 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
79 GstSplitMuxPartPad * part_pad, GstBuffer * buf);
81 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
82 guint visible, guint bytes, guint64 time, gpointer checkdata);
83 static void type_found (GstElement * typefind, guint probability,
84 GstCaps * caps, GstSplitMuxPartReader * reader);
85 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
/* Walks reader->pads and inspects, for each part pad, its is_eos flag and
 * whether its data queue is currently empty; each positive check is logged.
 * Used by block_until_can_push() as part of the "may push now" condition. */
87 /* Called with reader lock held */
89 have_empty_queue (GstSplitMuxPartReader * reader)
93 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
94 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
/* An EOS pad is checked before its queue state is looked at */
95 if (part_pad->is_eos) {
96 GST_LOG_OBJECT (part_pad, "Pad is EOS");
99 if (gst_data_queue_is_empty (part_pad->queue)) {
100 GST_LOG_OBJECT (part_pad, "Queue is empty");
/* Sleeps on the reader condition variable while reader->running is set,
 * re-testing on every wakeup whether the reader is flushing or whether it is
 * active with at least one pad reported by have_empty_queue().
 * The result is TRUE only when the reader is active and not flushing, i.e.
 * when the caller may go on to enqueue data. SPLITMUX_PART_WAIT releases the
 * reader lock while sleeping, so reader state may change across this call. */
108 /* Called with reader lock held */
110 block_until_can_push (GstSplitMuxPartReader * reader)
112 while (reader->running) {
113 if (reader->flushing)
115 if (reader->active && have_empty_queue (reader))
118 GST_LOG_OBJECT (reader,
119 "Waiting for activation or empty queue on reader %s", reader->path);
120 SPLITMUX_PART_WAIT (reader);
123 GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
124 reader->path, reader->active, reader->flushing);
126 return reader->active && !reader->flushing;
130 handle_buffer_measuring (GstSplitMuxPartReader * reader,
131 GstSplitMuxPartPad * part_pad, GstBuffer * buf)
133 GstClockTime ts = GST_CLOCK_TIME_NONE;
134 GstClockTimeDiff offset;
136 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
137 !part_pad->seen_buffer) {
138 /* If this is the first buffer on the pad in the collect_streams state,
139 * then calculate inital offset based on running time of this segment */
140 part_pad->initial_ts_offset =
141 part_pad->orig_segment.start + part_pad->orig_segment.base -
142 part_pad->orig_segment.time;
143 GST_DEBUG_OBJECT (reader,
144 "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
145 part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
147 part_pad->seen_buffer = TRUE;
149 /* Adjust buffer timestamps */
150 offset = reader->start_offset + part_pad->segment.base;
151 offset -= part_pad->initial_ts_offset;
153 /* Update the stored max duration on the pad,
154 * always preferring making DTS contiguous
156 if (GST_BUFFER_DTS_IS_VALID (buf))
157 ts = GST_BUFFER_DTS (buf) + offset;
158 else if (GST_BUFFER_PTS_IS_VALID (buf))
159 ts = GST_BUFFER_PTS (buf) + offset;
161 GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
162 " incoming PTS %" GST_TIME_FORMAT
163 " DTS %" GST_TIME_FORMAT " offset by %" GST_TIME_FORMAT
164 " to %" GST_TIME_FORMAT, part_pad,
165 GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
166 GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
167 GST_TIME_ARGS (offset), GST_TIME_ARGS (ts));
169 if (GST_CLOCK_TIME_IS_VALID (ts)) {
170 if (GST_BUFFER_DURATION_IS_VALID (buf))
171 ts += GST_BUFFER_DURATION (buf);
173 if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
174 part_pad->max_ts = ts;
175 GST_LOG_OBJECT (reader,
176 "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
177 GST_TIME_ARGS (part_pad->max_ts));
180 /* Is it time to move to measuring state yet? */
181 check_if_pads_collected (reader);
185 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
186 guint visible, guint bytes, guint64 time, gpointer checkdata)
188 /* Arbitrary safety limit. If we hit it, playback is likely to stall */
189 if (time > 20 * GST_SECOND)
195 splitmux_part_free_queue_item (GstDataQueueItem * item)
197 gst_mini_object_unref (item->object);
198 g_slice_free (GstDataQueueItem, item);
202 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
204 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
205 GstSplitMuxPartReader *reader = part_pad->reader;
206 GstDataQueueItem *item;
207 GstClockTimeDiff offset;
209 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
210 SPLITMUX_PART_LOCK (reader);
212 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
213 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
214 handle_buffer_measuring (reader, part_pad, buf);
215 gst_buffer_unref (buf);
216 SPLITMUX_PART_UNLOCK (reader);
220 if (!block_until_can_push (reader)) {
222 SPLITMUX_PART_UNLOCK (reader);
223 gst_buffer_unref (buf);
224 return GST_FLOW_FLUSHING;
227 /* Adjust buffer timestamps */
228 offset = reader->start_offset + part_pad->segment.base;
229 offset -= part_pad->initial_ts_offset;
231 if (GST_BUFFER_PTS_IS_VALID (buf))
232 GST_BUFFER_PTS (buf) += offset;
233 if (GST_BUFFER_DTS_IS_VALID (buf))
234 GST_BUFFER_DTS (buf) += offset;
236 /* We are active, and one queue is empty, place this buffer in
238 GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
239 item = g_slice_new (GstDataQueueItem);
240 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
241 item->object = GST_MINI_OBJECT (buf);
242 item->size = gst_buffer_get_size (buf);
243 item->duration = GST_BUFFER_DURATION (buf);
244 if (item->duration == GST_CLOCK_TIME_NONE)
246 item->visible = TRUE;
248 gst_object_ref (part_pad);
250 SPLITMUX_PART_UNLOCK (reader);
252 if (!gst_data_queue_push (part_pad->queue, item)) {
253 splitmux_part_free_queue_item (item);
254 gst_object_unref (part_pad);
255 return GST_FLOW_FLUSHING;
258 gst_object_unref (part_pad);
262 /* Called with splitmux part lock held */
264 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
267 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
268 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
269 if (!part_pad->is_eos)
277 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
280 GST_LOG_OBJECT (part, "Checking for preroll");
281 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
282 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
283 if (!part_pad->seen_buffer) {
284 GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
289 GST_LOG_OBJECT (part, "Part is prerolled");
295 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
299 SPLITMUX_PART_LOCK (reader);
300 res = splitmux_part_is_eos_locked (reader);
301 SPLITMUX_PART_UNLOCK (reader);
306 /* Called with splitmux part lock held */
308 splitmux_is_flushing (GstSplitMuxPartReader * reader)
311 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
312 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
313 if (part_pad->flushing)
/* Event function of the part pad (installed in
 * splitmux_part_pad_constructed). Works under the reader lock.
 *
 * First the event updates pad/reader bookkeeping: segments are copied and
 * rebased onto reader->start_offset, EOS during measuring marks the pad EOS
 * and may finish measuring, flush-start/flush-stop toggle the flushing state
 * of the pad, its data queue and the reader. Events that survive the later
 * checks are wrapped in a non-visible GstDataQueueItem and pushed into the
 * pad's data queue after waiting in block_until_can_push(). */
321 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
323 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
324 GstSplitMuxPartReader *reader = part_pad->reader;
326 SplitMuxSrcPad *target;
327 GstDataQueueItem *item;
329 SPLITMUX_PART_LOCK (reader);
/* Hold a reference on the output pad for the duration of this call; every
 * exit path visible below releases it again */
331 target = gst_object_ref (part_pad->target);
333 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
/* While the pad is flushing only FLUSH_STOP is processed any further */
336 if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
339 switch (GST_EVENT_TYPE (event)) {
340 case GST_EVENT_SEGMENT:{
341 GstSegment *seg = &part_pad->segment;
343 GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
/* seg is rewritten below, orig_segment keeps the values as received */
345 gst_event_copy_segment (event, seg);
346 gst_event_copy_segment (event, &part_pad->orig_segment);
348 if (seg->format != GST_FORMAT_TIME)
352 /* Adjust start/stop so the overall file is 0 + start_offset based */
353 if (seg->stop != -1) {
354 seg->stop -= seg->start;
355 seg->stop += seg->time + reader->start_offset;
357 seg->start = seg->time + reader->start_offset;
358 seg->time += reader->start_offset;
359 seg->position += reader->start_offset;
/* NOTE(review): 'event' is still the event that was received at this point
 * (it is replaced on the following lines), so this message prints the
 * unadjusted segment - logging 'seg' looks like what was intended; confirm */
361 GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
364 gst_event_unref (event);
365 event = gst_event_new_segment (seg);
367 if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
368 && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
369 break; /* Only do further stuff with segments during initial measuring */
371 /* Take the first segment from the first part */
372 if (target->segment.format == GST_FORMAT_UNDEFINED) {
373 gst_segment_copy_into (seg, &target->segment);
374 GST_DEBUG_OBJECT (reader,
375 "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
/* Extend the output segment stop when this part ends later than what the
 * target pad has recorded so far */
378 if (seg->stop != -1 && target->segment.stop != -1) {
379 GstClockTime stop = seg->base + seg->stop;
380 if (stop > target->segment.stop) {
381 target->segment.stop = stop;
382 GST_DEBUG_OBJECT (reader,
383 "Adjusting segment stop by %" GST_TIME_FORMAT
384 " output now %" GST_SEGMENT_FORMAT,
385 GST_TIME_ARGS (reader->start_offset), &target->segment);
388 GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
393 GST_DEBUG_OBJECT (part_pad,
394 "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
395 reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
397 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
398 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
399 /* Mark this pad as EOS */
400 part_pad->is_eos = TRUE;
401 if (splitmux_part_is_eos_locked (reader)) {
402 /* Finished measuring things, set state and tell the state change func
403 * so it can seek back to the start */
404 GST_LOG_OBJECT (reader,
405 "EOS while measuring streams. Resetting for ready");
406 reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
407 SPLITMUX_PART_BROADCAST (reader);
413 case GST_EVENT_FLUSH_START:
/* Flag reader and pad as flushing, flush the queue and wake every thread
 * sleeping on the reader condition */
414 reader->flushing = TRUE;
415 part_pad->flushing = TRUE;
416 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
418 gst_data_queue_set_flushing (part_pad->queue, TRUE);
419 SPLITMUX_PART_BROADCAST (reader);
421 case GST_EVENT_FLUSH_STOP:{
/* Reset the per-pad state; the reader stays flushing for as long as any
 * of its pads still is */
422 gst_data_queue_set_flushing (part_pad->queue, FALSE);
423 gst_data_queue_flush (part_pad->queue);
424 part_pad->seen_buffer = FALSE;
425 part_pad->flushing = FALSE;
426 part_pad->is_eos = FALSE;
428 reader->flushing = splitmux_is_flushing (reader);
429 GST_LOG_OBJECT (reader,
430 "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
431 reader->path, pad, reader->flushing);
432 SPLITMUX_PART_BROADCAST (reader);
439 /* Don't send events downstream while preparing */
440 if (reader->prep_state != PART_STATE_READY)
443 /* Don't pass flush events - those are done by the parent */
444 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
445 GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
/* Waiting failed: release lock, target reference and the event */
448 if (!block_until_can_push (reader)) {
449 SPLITMUX_PART_UNLOCK (reader);
450 gst_object_unref (target);
451 gst_event_unref (event);
455 switch (GST_EVENT_TYPE (event)) {
457 /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
/* NOTE(review): the comment below says "buffer" but the item enqueued
 * here wraps the event */
464 /* We are active, and one queue is empty, place this buffer in
466 gst_object_ref (part_pad->queue);
467 SPLITMUX_PART_UNLOCK (reader);
469 GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
470 item = g_slice_new (GstDataQueueItem);
471 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
472 item->object = GST_MINI_OBJECT (event);
475 if (item->duration == GST_CLOCK_TIME_NONE)
477 item->visible = FALSE;
/* A rejected item is freed here, which also unrefs the event */
479 if (!gst_data_queue_push (part_pad->queue, item)) {
480 splitmux_part_free_queue_item (item);
484 gst_object_unref (part_pad->queue);
485 gst_object_unref (target);
/* Error exit for segments that are not in TIME format: release everything,
 * then post an element error */
489 gst_event_unref (event);
490 gst_object_unref (target);
491 SPLITMUX_PART_UNLOCK (reader);
492 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
493 ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
/* Exit for events that are consumed here and not queued */
497 GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
498 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
499 gst_event_unref (event);
500 gst_object_unref (target);
501 SPLITMUX_PART_UNLOCK (reader);
506 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
508 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
509 GstSplitMuxPartReader *reader = part_pad->reader;
511 gboolean ret = FALSE;
514 SPLITMUX_PART_LOCK (reader);
515 target = gst_object_ref (part_pad->target);
516 active = reader->active;
517 SPLITMUX_PART_UNLOCK (reader);
520 GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
521 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
523 ret = gst_pad_query (target, query);
526 gst_object_unref (target);
531 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
534 splitmux_part_pad_constructed (GObject * pad)
536 gst_pad_set_chain_function (GST_PAD (pad),
537 GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
538 gst_pad_set_event_function (GST_PAD (pad),
539 GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
540 gst_pad_set_query_function (GST_PAD (pad),
541 GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
543 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
547 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
549 GObjectClass *gobject_klass = (GObjectClass *) (klass);
551 gobject_klass->constructed = splitmux_part_pad_constructed;
552 gobject_klass->finalize = splitmux_part_pad_finalize;
/* Instance initialiser of the part pad: creates the pad's data queue with
 * splitmux_data_queue_is_full_cb as its fullness check and resets both
 * stored segments to GST_FORMAT_UNDEFINED. */
556 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
558 pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
/* 'segment' is the rebased segment, 'orig_segment' the one as received */
560 gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
561 gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
/* GObject finalize of the part pad: puts the data queue into flushing mode,
 * empties it, drops the pad's reference on it and chains up to the parent
 * class. */
565 splitmux_part_pad_finalize (GObject * obj)
567 GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
569 GST_DEBUG_OBJECT (obj, "finalize");
/* Flushing first makes the queue refuse new items before it is emptied */
570 gst_data_queue_set_flushing (pad->queue, TRUE);
571 gst_data_queue_flush (pad->queue);
572 gst_object_unref (GST_OBJECT_CAST (pad->queue));
575 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
579 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
580 GstSplitMuxPartReader * part);
581 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
582 static GstStateChangeReturn
583 gst_splitmux_part_reader_change_state (GstElement * element,
584 GstStateChange transition);
585 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
587 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
588 * part, gboolean flushing);
589 static void bus_handler (GstBin * bin, GstMessage * msg);
590 static void splitmux_part_reader_dispose (GObject * object);
591 static void splitmux_part_reader_finalize (GObject * object);
592 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
594 #define gst_splitmux_part_reader_parent_class parent_class
595 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
599 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
601 GObjectClass *gobject_klass = (GObjectClass *) (klass);
602 GstElementClass *gstelement_class = (GstElementClass *) klass;
603 GstBinClass *gstbin_class = (GstBinClass *) klass;
605 GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
606 "Split File Demuxing Source helper");
608 gobject_klass->dispose = splitmux_part_reader_dispose;
609 gobject_klass->finalize = splitmux_part_reader_finalize;
611 part_reader_signals[SIGNAL_PREPARED] =
612 g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
613 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
614 prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
615 gstelement_class->change_state = gst_splitmux_part_reader_change_state;
616 gstelement_class->send_event = gst_splitmux_part_reader_send_event;
618 gstbin_class->handle_message = bus_handler;
622 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
624 GstElement *typefind;
626 reader->active = FALSE;
627 reader->duration = GST_CLOCK_TIME_NONE;
629 g_cond_init (&reader->inactive_cond);
630 g_mutex_init (&reader->lock);
632 /* FIXME: Create elements on a state change */
633 reader->src = gst_element_factory_make ("filesrc", NULL);
634 if (reader->src == NULL) {
635 GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
638 gst_bin_add (GST_BIN_CAST (reader), reader->src);
640 typefind = gst_element_factory_make ("typefind", NULL);
642 GST_ERROR_OBJECT (reader,
643 "Failed to create typefind element - check your installation");
647 gst_bin_add (GST_BIN_CAST (reader), typefind);
648 reader->typefind = typefind;
650 if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
651 GST_ERROR_OBJECT (reader,
652 "Failed to link typefind element - check your installation");
656 g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
661 splitmux_part_reader_dispose (GObject * object)
663 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
665 splitmux_part_reader_reset (reader);
667 G_OBJECT_CLASS (parent_class)->dispose (object);
671 splitmux_part_reader_finalize (GObject * object)
673 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
675 g_free (reader->path);
677 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Releases all proxy pads of the reader: under the reader lock each pad in
 * reader->pads is deactivated and unreffed, then the list itself is freed.
 * Called from dispose and from the READY->NULL state change. */
681 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
685 SPLITMUX_PART_LOCK (reader);
686 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
687 GstPad *pad = GST_PAD_CAST (cur->data);
/* Drops the reference the list held on the proxy pad */
688 gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
689 gst_object_unref (GST_OBJECT_CAST (pad));
692 g_list_free (reader->pads);
694 SPLITMUX_PART_UNLOCK (reader);
697 static GstSplitMuxPartPad *
698 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
701 GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
702 "name", GST_PAD_NAME (target),
703 "direction", GST_PAD_SINK,
705 pad->target = target;
706 pad->reader = reader;
708 gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
714 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
715 GstSplitMuxPartReader * reader)
717 GstPad *out_pad = NULL;
718 GstSplitMuxPartPad *proxy_pad;
720 GstPadLinkReturn link_ret;
722 caps = gst_pad_get_current_caps (pad);
724 GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
725 " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
727 gst_caps_unref (caps);
729 /* Look up or create the output pad */
730 if (reader->get_pad_cb)
731 out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
735 /* Create our proxy pad to interact with this new pad */
736 proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
737 GST_DEBUG_OBJECT (reader,
738 "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
741 link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
742 if (link_ret != GST_PAD_LINK_OK) {
743 gst_object_unref (proxy_pad);
744 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
745 ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
746 " ret %d", reader->path, pad, link_ret));
749 GST_DEBUG_OBJECT (reader,
750 "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
753 SPLITMUX_PART_LOCK (reader);
754 reader->pads = g_list_prepend (reader->pads, proxy_pad);
755 SPLITMUX_PART_UNLOCK (reader);
759 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
761 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
762 gboolean ret = FALSE;
765 /* Send event to the first source pad we found */
766 SPLITMUX_PART_LOCK (reader);
768 GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
769 pad = gst_pad_get_peer (proxy_pad);
771 SPLITMUX_PART_UNLOCK (reader);
774 ret = gst_pad_send_event (pad, event);
775 gst_object_unref (pad);
777 gst_event_unref (event);
783 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
785 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
788 SPLITMUX_PART_UNLOCK (reader);
789 GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
790 GST_TIME_ARGS (time));
791 gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
792 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
793 GST_SEEK_TYPE_END, 0);
795 SPLITMUX_PART_LOCK (reader);
797 /* Wait for flush to finish, so old data is gone */
798 while (reader->flushing) {
799 GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
800 SPLITMUX_PART_WAIT (reader);
804 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
806 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
807 GstSegment * target_seg)
810 GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
812 flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
814 SPLITMUX_PART_LOCK (reader);
815 if (target_seg->start >= reader->start_offset)
816 start = target_seg->start - reader->start_offset;
817 /* If the segment stop is within this part, don't play to the end */
818 if (target_seg->stop != -1 &&
819 target_seg->stop < reader->start_offset + reader->duration)
820 stop = target_seg->stop - reader->start_offset;
822 SPLITMUX_PART_UNLOCK (reader);
824 GST_DEBUG_OBJECT (reader,
825 "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
826 GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
827 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
829 return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
830 target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
834 /* Called with lock held */
836 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
838 /* Trigger a flushing seek to near the end of the file and run each stream
839 * to EOS in order to find the smallest end timestamp to start the next
842 if (GST_CLOCK_TIME_IS_VALID (reader->duration)
843 && reader->duration > GST_SECOND) {
844 GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
845 gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
848 /* Wait for things to happen */
849 while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
850 SPLITMUX_PART_WAIT (reader);
852 if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
853 /* Fire the prepared signal and go to READY state */
854 GST_DEBUG_OBJECT (reader,
855 "Stream measuring complete. File %s is now ready. Firing prepared signal",
857 reader->prep_state = PART_STATE_READY;
858 g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
/* Creates a demuxer element able to consume the given caps.
 * Lists the available demuxer factories, filters them down to those whose
 * sink pads are compatible with caps and instantiates the first entry of
 * the filtered list. Both factory lists are released before returning;
 * e stays NULL when no compatible factory exists. */
863 find_demuxer (GstCaps * caps)
866 gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
868 GList *compat_elements;
869 GstElement *e = NULL;
871 if (factories == NULL)
/* Keep only factories that can sink these caps */
875 gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
877 if (compat_elements) {
878 /* Just take the first (highest ranked) option */
879 GstElementFactory *factory =
880 GST_ELEMENT_FACTORY_CAST (compat_elements->data);
881 e = gst_element_factory_create (factory, NULL);
882 gst_plugin_feature_list_free (compat_elements);
886 gst_plugin_feature_list_free (factories);
892 type_found (GstElement * typefind, guint probability,
893 GstCaps * caps, GstSplitMuxPartReader * reader)
897 GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
899 /* typefind found a type. Look for the demuxer to handle it */
900 demux = reader->demux = find_demuxer (caps);
901 if (reader->demux == NULL) {
902 GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
906 gst_bin_add (GST_BIN_CAST (reader), demux);
907 gst_element_link_pads (reader->typefind, "src", demux, NULL);
908 gst_element_sync_state_with_parent (reader->demux);
910 /* Connect to demux signals */
911 g_signal_connect (demux,
912 "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
913 g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
917 check_if_pads_collected (GstSplitMuxPartReader * reader)
919 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
920 /* Check we have all pads and each pad has seen a buffer */
921 if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
922 GST_DEBUG_OBJECT (reader,
923 "no more pads - file %s. Measuring stream length", reader->path);
924 reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
925 SPLITMUX_PART_BROADCAST (reader);
931 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
933 GstClockTime duration = GST_CLOCK_TIME_NONE;
935 /* Query the minimum duration of any pad in this piece and store it.
936 * FIXME: Only consider audio and video */
937 SPLITMUX_PART_LOCK (reader);
938 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
939 GstPad *target = GST_PAD_CAST (cur->data);
942 if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
943 GST_INFO_OBJECT (reader,
944 "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
945 reader->path, target, GST_TIME_ARGS (cur_duration));
946 if (cur_duration < duration)
947 duration = cur_duration;
951 GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
952 reader->path, GST_TIME_ARGS (duration));
953 reader->duration = (GstClockTime) duration;
955 reader->no_more_pads = TRUE;
957 check_if_pads_collected (reader);
958 SPLITMUX_PART_UNLOCK (reader);
962 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
963 GstPad * src_pad, GstQuery * query)
965 GstPad *target = NULL;
969 SPLITMUX_PART_LOCK (part);
970 /* Find the pad corresponding to the visible output target pad */
971 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
972 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
973 if (part_pad->target == src_pad) {
974 target = gst_object_ref (GST_OBJECT_CAST (part_pad));
978 SPLITMUX_PART_UNLOCK (part);
983 ret = gst_pad_peer_query (target, query);
984 gst_object_unref (GST_OBJECT_CAST (target));
989 /* Post-massaging of queries */
990 switch (GST_QUERY_TYPE (query)) {
991 case GST_QUERY_POSITION:{
995 gst_query_parse_position (query, &fmt, &position);
996 if (fmt != GST_FORMAT_TIME)
998 SPLITMUX_PART_LOCK (part);
999 position += part->start_offset;
1000 GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1001 GST_TIME_ARGS (position));
1002 SPLITMUX_PART_UNLOCK (part);
1004 gst_query_set_position (query, fmt, position);
1012 gst_object_unref (target);
/* GstElement::change_state implementation of the part reader.
 *
 * Work is done in two phases around the parent class' state change:
 * before it, reader flags are prepared for the transition (and sleeping
 * streaming threads are woken when going down); after it, READY->PAUSED
 * waits for stream collection/measuring, PAUSED->PLAYING activates the
 * reader and READY->NULL releases the proxy pads. */
1016 static GstStateChangeReturn
1017 gst_splitmux_part_reader_change_state (GstElement * element,
1018 GstStateChange transition)
1020 GstStateChangeReturn ret;
1021 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1023 switch (transition) {
1024 case GST_STATE_CHANGE_NULL_TO_READY:{
1027 case GST_STATE_CHANGE_READY_TO_PAUSED:{
/* Point filesrc at the part file and start a fresh preparation cycle */
1028 g_object_set (reader->src, "location", reader->path, NULL);
1029 SPLITMUX_PART_LOCK (reader);
1030 reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1031 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1032 reader->running = TRUE;
1033 SPLITMUX_PART_UNLOCK (reader);
/* Both downward transitions flush the queues, clear 'running' and wake
 * threads sleeping on the reader condition so they can exit */
1036 case GST_STATE_CHANGE_READY_TO_NULL:
1037 case GST_STATE_CHANGE_PAUSED_TO_READY:
1038 SPLITMUX_PART_LOCK (reader);
1039 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1040 reader->running = FALSE;
1041 SPLITMUX_PART_BROADCAST (reader);
1042 SPLITMUX_PART_UNLOCK (reader);
/* Deactivate: mark inactive, flush the queues and wake waiters */
1044 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1045 SPLITMUX_PART_LOCK (reader);
1046 reader->active = FALSE;
1047 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1048 SPLITMUX_PART_BROADCAST (reader);
1049 SPLITMUX_PART_UNLOCK (reader);
1055 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1056 if (ret == GST_STATE_CHANGE_FAILURE)
1059 switch (transition) {
1060 case GST_STATE_CHANGE_READY_TO_PAUSED:
1061 /* Sleep and wait until all streams have been collected, then do the seeks
1062 * to measure the stream lengths */
1063 SPLITMUX_PART_LOCK (reader);
1065 while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1066 GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1067 SPLITMUX_PART_WAIT (reader);
/* Continue according to the state the collection phase ended in:
 * measure, go straight to ready, or report the failure */
1070 if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
1071 gst_splitmux_part_reader_measure_streams (reader);
1072 else if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY)
1073 reader->prep_state = PART_STATE_READY;
1074 else if (reader->prep_state == PART_STATE_FAILED)
1075 ret = GST_STATE_CHANGE_FAILURE;
1076 SPLITMUX_PART_UNLOCK (reader);
/* Activate: stop flushing the queues, mark active and wake waiters */
1078 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1079 SPLITMUX_PART_LOCK (reader);
1080 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1081 reader->active = TRUE;
1082 SPLITMUX_PART_BROADCAST (reader);
1083 SPLITMUX_PART_UNLOCK (reader);
/* NOTE(review): prep_state is written here without taking the reader lock,
 * unlike the other writes in this function - confirm that is intended */
1085 case GST_STATE_CHANGE_READY_TO_NULL:
1086 reader->prep_state = PART_STATE_NULL;
1087 splitmux_part_reader_reset (reader);
/* Drains the pending messages from the bus of the part reader, looking for
 * an error message; finding one is logged as a preparation failure. Every
 * popped message is unreffed and the bus reference is released at the
 * end. */
1098 check_bus_messages (GstSplitMuxPartReader * part)
1100 gboolean ret = FALSE;
1104 bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
/* gst_bus_pop() does not block; the loop ends once the bus is empty */
1105 while ((m = gst_bus_pop (bus)) != NULL) {
1106 if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1107 GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1108 gst_message_unref (m);
1111 gst_message_unref (m);
1115 gst_object_unref (bus);
1120 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1122 GstStateChangeReturn ret;
1124 ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1126 if (ret != GST_STATE_CHANGE_SUCCESS)
1129 return check_bus_messages (part);
1133 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1135 gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1139 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1142 reader->path = g_strdup (path);
1146 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1149 GST_DEBUG_OBJECT (reader, "Activating part reader");
1151 if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1152 GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1156 if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1157 GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1158 GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1165 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1167 GST_DEBUG_OBJECT (reader, "Deactivating reader");
1168 gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
/* Applies the given flushing mode to the data queue of every pad of the
 * reader; queued items are discarded with gst_data_queue_flush().
 * The _locked suffix and the callers in this file indicate the reader lock
 * is held on entry. */
1172 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1177 GST_LOG_OBJECT (reader, "%s dataqueues",
1178 flushing ? "Flushing" : "Done flushing");
1179 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1180 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
/* A flushing queue refuses new items and unblocks a pending pop */
1181 gst_data_queue_set_flushing (part_pad->queue, flushing);
1183 gst_data_queue_flush (part_pad->queue);
1188 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1189 gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1191 reader->cb_data = cb_data;
1192 reader->get_pad_cb = get_pad_cb;
1196 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1199 GstClockTime ret = GST_CLOCK_TIME_NONE;
1201 SPLITMUX_PART_LOCK (reader);
1202 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1203 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1204 if (part_pad->max_ts < ret)
1205 ret = part_pad->max_ts;
1208 SPLITMUX_PART_UNLOCK (reader);
1214 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1215 GstClockTime offset)
1217 SPLITMUX_PART_LOCK (reader);
1218 reader->start_offset = offset;
1219 GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1220 GST_TIME_ARGS (offset));
1221 SPLITMUX_PART_UNLOCK (reader);
1225 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1227 GstClockTime ret = GST_CLOCK_TIME_NONE;
1229 SPLITMUX_PART_LOCK (reader);
1230 ret = reader->start_offset;
1231 SPLITMUX_PART_UNLOCK (reader);
1237 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1241 SPLITMUX_PART_LOCK (reader);
1242 dur = reader->duration;
1243 SPLITMUX_PART_UNLOCK (reader);
1249 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1252 GstPad *result = NULL;
1255 SPLITMUX_PART_LOCK (reader);
1256 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1257 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1258 if (part_pad->target == target) {
1259 result = (GstPad *) part_pad;
1263 SPLITMUX_PART_UNLOCK (reader);
/* Takes the next item (buffer or event) out of the data queue of the given
 * proxy pad and hands it back through *item.
 *
 * GST_FLOW_ERROR is returned right away when the reader is in the FAILED
 * state; a failed pop (or a NULL item) yields GST_FLOW_FLUSHING. After a
 * successful pop the waiters on the reader condition are woken, and an EOS
 * event marks the pad as EOS. */
1269 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1270 GstDataQueueItem ** item)
1272 GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1276 /* Get one item from the appropriate dataqueue */
1277 SPLITMUX_PART_LOCK (reader);
1278 if (reader->prep_state == PART_STATE_FAILED) {
1279 SPLITMUX_PART_UNLOCK (reader);
1280 return GST_FLOW_ERROR;
/* Own reference on the queue, so it stays valid while the lock is dropped */
1283 q = gst_object_ref (part_pad->queue);
1285 /* Have to drop the lock around pop, so we can be woken up for flush */
1286 SPLITMUX_PART_UNLOCK (reader);
1287 if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1288 ret = GST_FLOW_FLUSHING;
1292 SPLITMUX_PART_LOCK (reader);
/* An item left the queue: threads sleeping in block_until_can_push()
 * re-check their condition */
1294 SPLITMUX_PART_BROADCAST (reader);
1295 if (GST_IS_EVENT ((*item)->object)) {
1296 GstEvent *e = (GstEvent *) ((*item)->object);
1297 /* Mark this pad as EOS */
1298 if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1299 part_pad->is_eos = TRUE;
1302 SPLITMUX_PART_UNLOCK (reader);
1306 gst_object_unref (q);
1311 bus_handler (GstBin * bin, GstMessage * message)
1313 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1315 switch (GST_MESSAGE_TYPE (message)) {
1316 case GST_MESSAGE_ERROR:
1317 /* Make sure to set the state to failed and wake up the listener
1319 SPLITMUX_PART_LOCK (reader);
1320 reader->prep_state = PART_STATE_FAILED;
1321 SPLITMUX_PART_BROADCAST (reader);
1322 SPLITMUX_PART_UNLOCK (reader);
1328 GST_BIN_CLASS (parent_class)->handle_message (bin, message);