1 /* GStreamer Split Demuxer bin that recombines files created by
2 * the splitmuxsink element.
4 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
27 #include "gstsplitmuxsrc.h"
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
46 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
48 typedef struct _GstSplitMuxPartPad
52 /* Reader we belong to */
53 GstSplitMuxPartReader *reader;
54 /* Output splitmuxsrc source pad */
66 GstSegment orig_segment;
67 GstClockTime initial_ts_offset;
70 typedef struct _GstSplitMuxPartPadClass
73 } GstSplitMuxPartPadClass;
75 static GType gst_splitmux_part_pad_get_type (void);
76 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
77 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
79 static void splitmux_part_pad_constructed (GObject * pad);
80 static void splitmux_part_pad_finalize (GObject * pad);
81 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
82 GstSplitMuxPartPad * part_pad, GstBuffer * buf);
84 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
85 guint visible, guint bytes, guint64 time, gpointer checkdata);
86 static void type_found (GstElement * typefind, guint probability,
87 GstCaps * caps, GstSplitMuxPartReader * reader);
88 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
90 /* Called with reader lock held */
92 have_empty_queue (GstSplitMuxPartReader * reader)
96 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
97 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
98 if (part_pad->is_eos) {
99 GST_LOG_OBJECT (part_pad, "Pad is EOS");
102 if (gst_data_queue_is_empty (part_pad->queue)) {
103 GST_LOG_OBJECT (part_pad, "Queue is empty");
111 /* Called with reader lock held */
113 block_until_can_push (GstSplitMuxPartReader * reader)
115 while (reader->running) {
116 if (reader->flushing)
118 if (reader->active && have_empty_queue (reader))
121 GST_LOG_OBJECT (reader,
122 "Waiting for activation or empty queue on reader %s", reader->path);
123 SPLITMUX_PART_WAIT (reader);
126 GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
127 reader->path, reader->active, reader->flushing);
129 return reader->active && !reader->flushing;
133 handle_buffer_measuring (GstSplitMuxPartReader * reader,
134 GstSplitMuxPartPad * part_pad, GstBuffer * buf)
136 GstClockTime ts = GST_CLOCK_TIME_NONE;
137 GstClockTimeDiff offset;
139 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
140 !part_pad->seen_buffer) {
141 /* If this is the first buffer on the pad in the collect_streams state,
142 * then calculate inital offset based on running time of this segment */
143 part_pad->initial_ts_offset =
144 part_pad->orig_segment.start + part_pad->orig_segment.base -
145 part_pad->orig_segment.time;
146 GST_DEBUG_OBJECT (reader,
147 "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
148 part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
150 part_pad->seen_buffer = TRUE;
152 /* Adjust buffer timestamps */
153 offset = reader->start_offset + part_pad->segment.base;
154 offset -= part_pad->initial_ts_offset;
156 /* Update the stored max duration on the pad,
157 * always preferring making DTS contiguous
159 if (GST_BUFFER_DTS_IS_VALID (buf))
160 ts = GST_BUFFER_DTS (buf) + offset;
161 else if (GST_BUFFER_PTS_IS_VALID (buf))
162 ts = GST_BUFFER_PTS (buf) + offset;
164 GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
165 " incoming PTS %" GST_TIME_FORMAT
166 " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
167 " to %" GST_TIME_FORMAT, part_pad,
168 GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
169 GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
170 GST_STIME_ARGS (offset), GST_TIME_ARGS (ts));
172 if (GST_CLOCK_TIME_IS_VALID (ts)) {
173 if (GST_BUFFER_DURATION_IS_VALID (buf))
174 ts += GST_BUFFER_DURATION (buf);
176 if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
177 part_pad->max_ts = ts;
178 GST_LOG_OBJECT (reader,
179 "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
180 GST_TIME_ARGS (part_pad->max_ts));
183 /* Is it time to move to measuring state yet? */
184 check_if_pads_collected (reader);
188 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
189 guint visible, guint bytes, guint64 time, gpointer checkdata)
191 /* Arbitrary safety limit. If we hit it, playback is likely to stall */
192 if (time > 20 * GST_SECOND)
198 splitmux_part_free_queue_item (GstDataQueueItem * item)
200 gst_mini_object_unref (item->object);
201 g_slice_free (GstDataQueueItem, item);
205 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
207 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
208 GstSplitMuxPartReader *reader = part_pad->reader;
209 GstDataQueueItem *item;
210 GstClockTimeDiff offset;
212 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
213 SPLITMUX_PART_LOCK (reader);
215 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
216 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
217 handle_buffer_measuring (reader, part_pad, buf);
218 gst_buffer_unref (buf);
219 SPLITMUX_PART_UNLOCK (reader);
223 if (!block_until_can_push (reader)) {
225 SPLITMUX_PART_UNLOCK (reader);
226 gst_buffer_unref (buf);
227 return GST_FLOW_FLUSHING;
230 /* Adjust buffer timestamps */
231 offset = reader->start_offset + part_pad->segment.base;
232 offset -= part_pad->initial_ts_offset;
234 if (GST_BUFFER_PTS_IS_VALID (buf))
235 GST_BUFFER_PTS (buf) += offset;
236 if (GST_BUFFER_DTS_IS_VALID (buf))
237 GST_BUFFER_DTS (buf) += offset;
239 /* We are active, and one queue is empty, place this buffer in
241 GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
242 item = g_slice_new (GstDataQueueItem);
243 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
244 item->object = GST_MINI_OBJECT (buf);
245 item->size = gst_buffer_get_size (buf);
246 item->duration = GST_BUFFER_DURATION (buf);
247 if (item->duration == GST_CLOCK_TIME_NONE)
249 item->visible = TRUE;
251 gst_object_ref (part_pad);
253 SPLITMUX_PART_UNLOCK (reader);
255 if (!gst_data_queue_push (part_pad->queue, item)) {
256 splitmux_part_free_queue_item (item);
257 gst_object_unref (part_pad);
258 return GST_FLOW_FLUSHING;
261 gst_object_unref (part_pad);
265 /* Called with splitmux part lock held */
267 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
270 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
271 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
272 if (!part_pad->is_eos)
280 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
283 GST_LOG_OBJECT (part, "Checking for preroll");
284 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
285 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
286 if (!part_pad->seen_buffer) {
287 GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
292 GST_LOG_OBJECT (part, "Part is prerolled");
298 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
302 SPLITMUX_PART_LOCK (reader);
303 res = splitmux_part_is_eos_locked (reader);
304 SPLITMUX_PART_UNLOCK (reader);
309 /* Called with splitmux part lock held */
311 splitmux_is_flushing (GstSplitMuxPartReader * reader)
314 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
315 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
316 if (part_pad->flushing)
324 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
326 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
327 GstSplitMuxPartReader *reader = part_pad->reader;
329 SplitMuxSrcPad *target;
330 GstDataQueueItem *item;
332 SPLITMUX_PART_LOCK (reader);
334 target = gst_object_ref (part_pad->target);
336 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
339 if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
342 switch (GST_EVENT_TYPE (event)) {
343 case GST_EVENT_SEGMENT:{
344 GstSegment *seg = &part_pad->segment;
346 GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
348 gst_event_copy_segment (event, seg);
349 gst_event_copy_segment (event, &part_pad->orig_segment);
351 if (seg->format != GST_FORMAT_TIME)
355 /* Adjust start/stop so the overall file is 0 + start_offset based */
356 if (seg->stop != -1) {
357 seg->stop -= seg->start;
358 seg->stop += seg->time + reader->start_offset;
360 seg->start = seg->time + reader->start_offset;
361 seg->time += reader->start_offset;
362 seg->position += reader->start_offset;
364 GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
367 gst_event_unref (event);
368 event = gst_event_new_segment (seg);
370 if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
371 && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
372 break; /* Only do further stuff with segments during initial measuring */
374 /* Take the first segment from the first part */
375 if (target->segment.format == GST_FORMAT_UNDEFINED) {
376 gst_segment_copy_into (seg, &target->segment);
377 GST_DEBUG_OBJECT (reader,
378 "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
381 if (seg->stop != -1 && target->segment.stop != -1) {
382 GstClockTime stop = seg->base + seg->stop;
383 if (stop > target->segment.stop) {
384 target->segment.stop = stop;
385 GST_DEBUG_OBJECT (reader,
386 "Adjusting segment stop by %" GST_TIME_FORMAT
387 " output now %" GST_SEGMENT_FORMAT,
388 GST_TIME_ARGS (reader->start_offset), &target->segment);
391 GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
396 GST_DEBUG_OBJECT (part_pad,
397 "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
398 reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
400 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
401 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
402 /* Mark this pad as EOS */
403 part_pad->is_eos = TRUE;
404 if (splitmux_part_is_eos_locked (reader)) {
405 /* Finished measuring things, set state and tell the state change func
406 * so it can seek back to the start */
407 GST_LOG_OBJECT (reader,
408 "EOS while measuring streams. Resetting for ready");
409 reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
410 SPLITMUX_PART_BROADCAST (reader);
416 case GST_EVENT_FLUSH_START:
417 reader->flushing = TRUE;
418 part_pad->flushing = TRUE;
419 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
421 gst_data_queue_set_flushing (part_pad->queue, TRUE);
422 SPLITMUX_PART_BROADCAST (reader);
424 case GST_EVENT_FLUSH_STOP:{
425 gst_data_queue_set_flushing (part_pad->queue, FALSE);
426 gst_data_queue_flush (part_pad->queue);
427 part_pad->seen_buffer = FALSE;
428 part_pad->flushing = FALSE;
429 part_pad->is_eos = FALSE;
431 reader->flushing = splitmux_is_flushing (reader);
432 GST_LOG_OBJECT (reader,
433 "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
434 reader->path, pad, reader->flushing);
435 SPLITMUX_PART_BROADCAST (reader);
442 /* Don't send events downstream while preparing */
443 if (reader->prep_state != PART_STATE_READY)
446 /* Don't pass flush events - those are done by the parent */
447 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
448 GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
451 if (!block_until_can_push (reader)) {
452 SPLITMUX_PART_UNLOCK (reader);
453 gst_object_unref (target);
454 gst_event_unref (event);
458 switch (GST_EVENT_TYPE (event)) {
460 /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
467 /* We are active, and one queue is empty, place this buffer in
469 gst_object_ref (part_pad->queue);
470 SPLITMUX_PART_UNLOCK (reader);
472 GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
473 item = g_slice_new (GstDataQueueItem);
474 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
475 item->object = GST_MINI_OBJECT (event);
478 if (item->duration == GST_CLOCK_TIME_NONE)
480 item->visible = FALSE;
482 if (!gst_data_queue_push (part_pad->queue, item)) {
483 splitmux_part_free_queue_item (item);
487 gst_object_unref (part_pad->queue);
488 gst_object_unref (target);
492 gst_event_unref (event);
493 gst_object_unref (target);
494 SPLITMUX_PART_UNLOCK (reader);
495 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
496 ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
500 GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
501 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
502 gst_event_unref (event);
503 gst_object_unref (target);
504 SPLITMUX_PART_UNLOCK (reader);
509 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
511 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
512 GstSplitMuxPartReader *reader = part_pad->reader;
514 gboolean ret = FALSE;
517 SPLITMUX_PART_LOCK (reader);
518 target = gst_object_ref (part_pad->target);
519 active = reader->active;
520 SPLITMUX_PART_UNLOCK (reader);
523 GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
524 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
526 ret = gst_pad_query (target, query);
529 gst_object_unref (target);
534 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
537 splitmux_part_pad_constructed (GObject * pad)
539 gst_pad_set_chain_function (GST_PAD (pad),
540 GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
541 gst_pad_set_event_function (GST_PAD (pad),
542 GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
543 gst_pad_set_query_function (GST_PAD (pad),
544 GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
546 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
550 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
552 GObjectClass *gobject_klass = (GObjectClass *) (klass);
554 gobject_klass->constructed = splitmux_part_pad_constructed;
555 gobject_klass->finalize = splitmux_part_pad_finalize;
559 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
561 pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
563 gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
564 gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
568 splitmux_part_pad_finalize (GObject * obj)
570 GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
572 GST_DEBUG_OBJECT (obj, "finalize");
573 gst_data_queue_set_flushing (pad->queue, TRUE);
574 gst_data_queue_flush (pad->queue);
575 gst_object_unref (GST_OBJECT_CAST (pad->queue));
578 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
582 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
583 GstSplitMuxPartReader * part);
584 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
585 static GstStateChangeReturn
586 gst_splitmux_part_reader_change_state (GstElement * element,
587 GstStateChange transition);
588 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
590 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
591 * part, gboolean flushing);
592 static void bus_handler (GstBin * bin, GstMessage * msg);
593 static void splitmux_part_reader_dispose (GObject * object);
594 static void splitmux_part_reader_finalize (GObject * object);
595 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
597 #define gst_splitmux_part_reader_parent_class parent_class
598 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
602 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
604 GObjectClass *gobject_klass = (GObjectClass *) (klass);
605 GstElementClass *gstelement_class = (GstElementClass *) klass;
606 GstBinClass *gstbin_class = (GstBinClass *) klass;
608 GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
609 "Split File Demuxing Source helper");
611 gobject_klass->dispose = splitmux_part_reader_dispose;
612 gobject_klass->finalize = splitmux_part_reader_finalize;
614 part_reader_signals[SIGNAL_PREPARED] =
615 g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
616 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
617 prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
618 gstelement_class->change_state = gst_splitmux_part_reader_change_state;
619 gstelement_class->send_event = gst_splitmux_part_reader_send_event;
621 gstbin_class->handle_message = bus_handler;
625 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
627 GstElement *typefind;
629 reader->active = FALSE;
630 reader->duration = GST_CLOCK_TIME_NONE;
632 g_cond_init (&reader->inactive_cond);
633 g_mutex_init (&reader->lock);
634 g_mutex_init (&reader->type_lock);
636 /* FIXME: Create elements on a state change */
637 reader->src = gst_element_factory_make ("filesrc", NULL);
638 if (reader->src == NULL) {
639 GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
642 gst_bin_add (GST_BIN_CAST (reader), reader->src);
644 typefind = gst_element_factory_make ("typefind", NULL);
646 GST_ERROR_OBJECT (reader,
647 "Failed to create typefind element - check your installation");
651 gst_bin_add (GST_BIN_CAST (reader), typefind);
652 reader->typefind = typefind;
654 if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
655 GST_ERROR_OBJECT (reader,
656 "Failed to link typefind element - check your installation");
660 g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
665 splitmux_part_reader_dispose (GObject * object)
667 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
669 splitmux_part_reader_reset (reader);
671 G_OBJECT_CLASS (parent_class)->dispose (object);
675 splitmux_part_reader_finalize (GObject * object)
677 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
679 g_cond_clear (&reader->inactive_cond);
680 g_mutex_clear (&reader->lock);
681 g_mutex_clear (&reader->type_lock);
683 g_free (reader->path);
685 G_OBJECT_CLASS (parent_class)->finalize (object);
689 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
693 SPLITMUX_PART_LOCK (reader);
694 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
695 GstPad *pad = GST_PAD_CAST (cur->data);
696 gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
697 gst_object_unref (GST_OBJECT_CAST (pad));
700 g_list_free (reader->pads);
702 SPLITMUX_PART_UNLOCK (reader);
705 static GstSplitMuxPartPad *
706 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
709 GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
710 "name", GST_PAD_NAME (target),
711 "direction", GST_PAD_SINK,
713 pad->target = target;
714 pad->reader = reader;
716 gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
722 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
723 GstSplitMuxPartReader * reader)
725 GstPad *out_pad = NULL;
726 GstSplitMuxPartPad *proxy_pad;
728 GstPadLinkReturn link_ret;
730 caps = gst_pad_get_current_caps (pad);
732 GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
733 " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
735 gst_caps_unref (caps);
737 /* Look up or create the output pad */
738 if (reader->get_pad_cb)
739 out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
740 if (out_pad == NULL) {
741 GST_DEBUG_OBJECT (reader,
742 "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
746 /* Create our proxy pad to interact with this new pad */
747 proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
748 GST_DEBUG_OBJECT (reader,
749 "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
752 link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
753 if (link_ret != GST_PAD_LINK_OK) {
754 gst_object_unref (proxy_pad);
755 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
756 ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
757 " ret %d", reader->path, pad, link_ret));
760 GST_DEBUG_OBJECT (reader,
761 "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
764 SPLITMUX_PART_LOCK (reader);
765 reader->pads = g_list_prepend (reader->pads, proxy_pad);
766 SPLITMUX_PART_UNLOCK (reader);
770 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
772 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
773 gboolean ret = FALSE;
776 /* Send event to the first source pad we found */
777 SPLITMUX_PART_LOCK (reader);
779 GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
780 pad = gst_pad_get_peer (proxy_pad);
782 SPLITMUX_PART_UNLOCK (reader);
785 ret = gst_pad_send_event (pad, event);
786 gst_object_unref (pad);
788 gst_event_unref (event);
794 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
796 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
799 SPLITMUX_PART_UNLOCK (reader);
800 GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
801 GST_TIME_ARGS (time));
802 gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
803 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
804 GST_SEEK_TYPE_END, 0);
806 SPLITMUX_PART_LOCK (reader);
808 /* Wait for flush to finish, so old data is gone */
809 while (reader->flushing) {
810 GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
811 SPLITMUX_PART_WAIT (reader);
815 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
817 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
818 GstSegment * target_seg)
821 GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
823 flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
825 SPLITMUX_PART_LOCK (reader);
826 if (target_seg->start >= reader->start_offset)
827 start = target_seg->start - reader->start_offset;
828 /* If the segment stop is within this part, don't play to the end */
829 if (target_seg->stop != -1 &&
830 target_seg->stop < reader->start_offset + reader->duration)
831 stop = target_seg->stop - reader->start_offset;
833 SPLITMUX_PART_UNLOCK (reader);
835 GST_DEBUG_OBJECT (reader,
836 "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
837 GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
838 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
840 return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
841 target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
845 /* Called with lock held */
847 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
849 /* Trigger a flushing seek to near the end of the file and run each stream
850 * to EOS in order to find the smallest end timestamp to start the next
853 if (GST_CLOCK_TIME_IS_VALID (reader->duration)
854 && reader->duration > GST_SECOND) {
855 GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
856 gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
859 /* Wait for things to happen */
860 while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
861 SPLITMUX_PART_WAIT (reader);
863 if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
864 /* Fire the prepared signal and go to READY state */
865 GST_DEBUG_OBJECT (reader,
866 "Stream measuring complete. File %s is now ready. Firing prepared signal",
868 reader->prep_state = PART_STATE_READY;
869 g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
874 find_demuxer (GstCaps * caps)
877 gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
879 GList *compat_elements;
880 GstElement *e = NULL;
882 if (factories == NULL)
886 gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
888 if (compat_elements) {
889 /* Just take the first (highest ranked) option */
890 GstElementFactory *factory =
891 GST_ELEMENT_FACTORY_CAST (compat_elements->data);
892 e = gst_element_factory_create (factory, NULL);
893 gst_plugin_feature_list_free (compat_elements);
897 gst_plugin_feature_list_free (factories);
903 type_found (GstElement * typefind, guint probability,
904 GstCaps * caps, GstSplitMuxPartReader * reader)
908 GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
910 /* typefind found a type. Look for the demuxer to handle it */
911 demux = reader->demux = find_demuxer (caps);
912 if (reader->demux == NULL) {
913 GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
917 /* Connect to demux signals */
918 g_signal_connect (demux,
919 "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
920 g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
922 gst_element_set_locked_state (demux, TRUE);
923 gst_bin_add (GST_BIN_CAST (reader), demux);
924 gst_element_link_pads (reader->typefind, "src", demux, NULL);
925 gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
926 gst_element_set_locked_state (demux, FALSE);
930 check_if_pads_collected (GstSplitMuxPartReader * reader)
932 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
933 /* Check we have all pads and each pad has seen a buffer */
934 if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
935 GST_DEBUG_OBJECT (reader,
936 "no more pads - file %s. Measuring stream length", reader->path);
937 reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
938 SPLITMUX_PART_BROADCAST (reader);
944 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
946 GstClockTime duration = GST_CLOCK_TIME_NONE;
948 /* Query the minimum duration of any pad in this piece and store it.
949 * FIXME: Only consider audio and video */
950 SPLITMUX_PART_LOCK (reader);
951 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
952 GstPad *target = GST_PAD_CAST (cur->data);
955 if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
956 GST_INFO_OBJECT (reader,
957 "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
958 reader->path, target, GST_TIME_ARGS (cur_duration));
959 if (cur_duration < duration)
960 duration = cur_duration;
964 GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
965 reader->path, GST_TIME_ARGS (duration));
966 reader->duration = (GstClockTime) duration;
968 reader->no_more_pads = TRUE;
970 check_if_pads_collected (reader);
971 SPLITMUX_PART_UNLOCK (reader);
975 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
976 GstPad * src_pad, GstQuery * query)
978 GstPad *target = NULL;
982 SPLITMUX_PART_LOCK (part);
983 /* Find the pad corresponding to the visible output target pad */
984 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
985 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
986 if (part_pad->target == src_pad) {
987 target = gst_object_ref (GST_OBJECT_CAST (part_pad));
991 SPLITMUX_PART_UNLOCK (part);
996 ret = gst_pad_peer_query (target, query);
1001 /* Post-massaging of queries */
1002 switch (GST_QUERY_TYPE (query)) {
1003 case GST_QUERY_POSITION:{
1007 gst_query_parse_position (query, &fmt, &position);
1008 if (fmt != GST_FORMAT_TIME)
1010 SPLITMUX_PART_LOCK (part);
1011 position += part->start_offset;
1012 GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1013 GST_TIME_ARGS (position));
1014 SPLITMUX_PART_UNLOCK (part);
1016 gst_query_set_position (query, fmt, position);
1024 gst_object_unref (target);
1028 static GstStateChangeReturn
1029 gst_splitmux_part_reader_change_state (GstElement * element,
1030 GstStateChange transition)
1032 GstStateChangeReturn ret;
1033 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1035 switch (transition) {
1036 case GST_STATE_CHANGE_NULL_TO_READY:{
1039 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1040 /* Hold the splitmux type lock until after the
1041 * parent state change function has finished
1042 * changing the states of things, and type finding can continue */
1043 SPLITMUX_PART_LOCK (reader);
1044 g_object_set (reader->src, "location", reader->path, NULL);
1045 SPLITMUX_PART_UNLOCK (reader);
1046 SPLITMUX_PART_TYPE_LOCK (reader);
1049 case GST_STATE_CHANGE_READY_TO_NULL:
1050 case GST_STATE_CHANGE_PAUSED_TO_READY:
1051 SPLITMUX_PART_LOCK (reader);
1052 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1053 reader->running = FALSE;
1054 SPLITMUX_PART_BROADCAST (reader);
1055 SPLITMUX_PART_UNLOCK (reader);
1057 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1058 SPLITMUX_PART_LOCK (reader);
1059 reader->active = FALSE;
1060 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1061 SPLITMUX_PART_BROADCAST (reader);
1062 SPLITMUX_PART_UNLOCK (reader);
1068 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1069 if (ret == GST_STATE_CHANGE_FAILURE) {
1070 if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
1071 /* Make sure to release the lock we took above */
1072 SPLITMUX_PART_TYPE_UNLOCK (reader);
1077 switch (transition) {
1078 case GST_STATE_CHANGE_READY_TO_PAUSED:
1079 /* Sleep and wait until all streams have been collected, then do the seeks
1080 * to measure the stream lengths. This took the type lock above,
1081 * but it's OK to release it now and let typefinding happen... */
1082 SPLITMUX_PART_TYPE_UNLOCK (reader);
1084 SPLITMUX_PART_LOCK (reader);
1085 reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1086 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1087 reader->running = TRUE;
1089 while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1090 GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1091 SPLITMUX_PART_WAIT (reader);
1094 if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
1095 reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
1096 gst_splitmux_part_reader_measure_streams (reader);
1097 } else if (reader->prep_state == PART_STATE_FAILED)
1098 ret = GST_STATE_CHANGE_FAILURE;
1099 SPLITMUX_PART_UNLOCK (reader);
1101 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1102 SPLITMUX_PART_LOCK (reader);
1103 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1104 reader->active = TRUE;
1105 SPLITMUX_PART_BROADCAST (reader);
1106 SPLITMUX_PART_UNLOCK (reader);
1108 case GST_STATE_CHANGE_READY_TO_NULL:
1109 reader->prep_state = PART_STATE_NULL;
1110 splitmux_part_reader_reset (reader);
1121 check_bus_messages (GstSplitMuxPartReader * part)
1123 gboolean ret = FALSE;
1127 bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1128 while ((m = gst_bus_pop (bus)) != NULL) {
1129 if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1130 GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1131 gst_message_unref (m);
1134 gst_message_unref (m);
1138 gst_object_unref (bus);
1143 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1145 GstStateChangeReturn ret;
1147 ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1149 if (ret != GST_STATE_CHANGE_SUCCESS)
1152 return check_bus_messages (part);
1156 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1158 gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1162 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1165 reader->path = g_strdup (path);
1169 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1172 GST_DEBUG_OBJECT (reader, "Activating part reader");
1174 if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1175 GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1179 if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1180 GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1181 GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1188 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1192 SPLITMUX_PART_LOCK (part);
1194 SPLITMUX_PART_UNLOCK (part);
1200 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1202 GST_DEBUG_OBJECT (reader, "Deactivating reader");
1203 gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1207 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1212 GST_LOG_OBJECT (reader, "%s dataqueues",
1213 flushing ? "Flushing" : "Done flushing");
1214 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1215 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1216 gst_data_queue_set_flushing (part_pad->queue, flushing);
1218 gst_data_queue_flush (part_pad->queue);
1223 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1224 gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1226 reader->cb_data = cb_data;
1227 reader->get_pad_cb = get_pad_cb;
1231 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1234 GstClockTime ret = GST_CLOCK_TIME_NONE;
1236 SPLITMUX_PART_LOCK (reader);
1237 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1238 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1239 if (part_pad->max_ts < ret)
1240 ret = part_pad->max_ts;
1243 SPLITMUX_PART_UNLOCK (reader);
1249 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1250 GstClockTime offset)
1252 SPLITMUX_PART_LOCK (reader);
1253 reader->start_offset = offset;
1254 GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1255 GST_TIME_ARGS (offset));
1256 SPLITMUX_PART_UNLOCK (reader);
1260 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1262 GstClockTime ret = GST_CLOCK_TIME_NONE;
1264 SPLITMUX_PART_LOCK (reader);
1265 ret = reader->start_offset;
1266 SPLITMUX_PART_UNLOCK (reader);
1272 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1276 SPLITMUX_PART_LOCK (reader);
1277 dur = reader->duration;
1278 SPLITMUX_PART_UNLOCK (reader);
1284 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1287 GstPad *result = NULL;
1290 SPLITMUX_PART_LOCK (reader);
1291 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1292 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1293 if (part_pad->target == target) {
1294 result = (GstPad *) gst_object_ref (part_pad);
1298 SPLITMUX_PART_UNLOCK (reader);
1304 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1305 GstDataQueueItem ** item)
1307 GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1311 /* Get one item from the appropriate dataqueue */
1312 SPLITMUX_PART_LOCK (reader);
1313 if (reader->prep_state == PART_STATE_FAILED) {
1314 SPLITMUX_PART_UNLOCK (reader);
1315 return GST_FLOW_ERROR;
1318 q = gst_object_ref (part_pad->queue);
1320 /* Have to drop the lock around pop, so we can be woken up for flush */
1321 SPLITMUX_PART_UNLOCK (reader);
1322 if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1323 ret = GST_FLOW_FLUSHING;
1327 SPLITMUX_PART_LOCK (reader);
1329 SPLITMUX_PART_BROADCAST (reader);
1330 if (GST_IS_EVENT ((*item)->object)) {
1331 GstEvent *e = (GstEvent *) ((*item)->object);
1332 /* Mark this pad as EOS */
1333 if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1334 part_pad->is_eos = TRUE;
1337 SPLITMUX_PART_UNLOCK (reader);
1341 gst_object_unref (q);
1346 bus_handler (GstBin * bin, GstMessage * message)
1348 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1350 switch (GST_MESSAGE_TYPE (message)) {
1351 case GST_MESSAGE_ERROR:
1352 /* Make sure to set the state to failed and wake up the listener
1354 SPLITMUX_PART_LOCK (reader);
1355 reader->prep_state = PART_STATE_FAILED;
1356 SPLITMUX_PART_BROADCAST (reader);
1357 SPLITMUX_PART_UNLOCK (reader);
1363 GST_BIN_CLASS (parent_class)->handle_message (bin, message);