splitmuxsink: Allow time and bytes to reach their respective thresholds
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxpartreader.c
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39
40 enum
41 {
42   SIGNAL_PREPARED,
43   LAST_SIGNAL
44 };
45
46 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
47
48 typedef struct _GstSplitMuxPartPad
49 {
50   GstPad parent;
51
52   /* Reader we belong to */
53   GstSplitMuxPartReader *reader;
54   /* Output splitmuxsrc source pad */
55   GstPad *target;
56
57   GstDataQueue *queue;
58
59   gboolean is_eos;
60   gboolean flushing;
61   gboolean seen_buffer;
62
63   GstClockTime max_ts;
64   GstSegment segment;
65
66   GstSegment orig_segment;
67   GstClockTime initial_ts_offset;
68 } GstSplitMuxPartPad;
69
70 typedef struct _GstSplitMuxPartPadClass
71 {
72   GstPadClass parent;
73 } GstSplitMuxPartPadClass;
74
75 static GType gst_splitmux_part_pad_get_type (void);
76 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
77 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
78
79 static void splitmux_part_pad_constructed (GObject * pad);
80 static void splitmux_part_pad_finalize (GObject * pad);
81 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
82     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
83
84 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
85     guint visible, guint bytes, guint64 time, gpointer checkdata);
86 static void type_found (GstElement * typefind, guint probability,
87     GstCaps * caps, GstSplitMuxPartReader * reader);
88 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
89
90 /* Called with reader lock held */
91 static gboolean
92 have_empty_queue (GstSplitMuxPartReader * reader)
93 {
94   GList *cur;
95
96   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
97     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
98     if (part_pad->is_eos) {
99       GST_LOG_OBJECT (part_pad, "Pad is EOS");
100       return TRUE;
101     }
102     if (gst_data_queue_is_empty (part_pad->queue)) {
103       GST_LOG_OBJECT (part_pad, "Queue is empty");
104       return TRUE;
105     }
106   }
107
108   return FALSE;
109 }
110
111 /* Called with reader lock held */
112 static gboolean
113 block_until_can_push (GstSplitMuxPartReader * reader)
114 {
115   while (reader->running) {
116     if (reader->flushing)
117       goto out;
118     if (reader->active && have_empty_queue (reader))
119       goto out;
120
121     GST_LOG_OBJECT (reader,
122         "Waiting for activation or empty queue on reader %s", reader->path);
123     SPLITMUX_PART_WAIT (reader);
124   }
125
126   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
127       reader->path, reader->active, reader->flushing);
128 out:
129   return reader->active && !reader->flushing;
130 }
131
132 static void
133 handle_buffer_measuring (GstSplitMuxPartReader * reader,
134     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
135 {
136   GstClockTime ts = GST_CLOCK_TIME_NONE;
137   GstClockTimeDiff offset;
138
139   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
140       !part_pad->seen_buffer) {
141     /* If this is the first buffer on the pad in the collect_streams state,
142      * then calculate inital offset based on running time of this segment */
143     part_pad->initial_ts_offset =
144         part_pad->orig_segment.start + part_pad->orig_segment.base -
145         part_pad->orig_segment.time;
146     GST_DEBUG_OBJECT (reader,
147         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
148         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
149   }
150   part_pad->seen_buffer = TRUE;
151
152   /* Adjust buffer timestamps */
153   offset = reader->start_offset + part_pad->segment.base;
154   offset -= part_pad->initial_ts_offset;
155
156   /* Update the stored max duration on the pad,
157    * always preferring making DTS contiguous
158    * where possible */
159   if (GST_BUFFER_DTS_IS_VALID (buf))
160     ts = GST_BUFFER_DTS (buf) + offset;
161   else if (GST_BUFFER_PTS_IS_VALID (buf))
162     ts = GST_BUFFER_PTS (buf) + offset;
163
164   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
165       " incoming PTS %" GST_TIME_FORMAT
166       " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
167       " to %" GST_TIME_FORMAT, part_pad,
168       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
169       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
170       GST_STIME_ARGS (offset), GST_TIME_ARGS (ts));
171
172   if (GST_CLOCK_TIME_IS_VALID (ts)) {
173     if (GST_BUFFER_DURATION_IS_VALID (buf))
174       ts += GST_BUFFER_DURATION (buf);
175
176     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
177       part_pad->max_ts = ts;
178       GST_LOG_OBJECT (reader,
179           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
180           GST_TIME_ARGS (part_pad->max_ts));
181     }
182   }
183   /* Is it time to move to measuring state yet? */
184   check_if_pads_collected (reader);
185 }
186
187 static gboolean
188 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
189     guint visible, guint bytes, guint64 time, gpointer checkdata)
190 {
191   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
192   if (time > 20 * GST_SECOND)
193     return TRUE;
194   return FALSE;
195 }
196
197 static void
198 splitmux_part_free_queue_item (GstDataQueueItem * item)
199 {
200   gst_mini_object_unref (item->object);
201   g_slice_free (GstDataQueueItem, item);
202 }
203
204 static GstFlowReturn
205 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
206 {
207   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
208   GstSplitMuxPartReader *reader = part_pad->reader;
209   GstDataQueueItem *item;
210   GstClockTimeDiff offset;
211
212   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
213   SPLITMUX_PART_LOCK (reader);
214
215   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
216       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
217     handle_buffer_measuring (reader, part_pad, buf);
218     gst_buffer_unref (buf);
219     SPLITMUX_PART_UNLOCK (reader);
220     return GST_FLOW_OK;
221   }
222
223   if (!block_until_can_push (reader)) {
224     /* Flushing */
225     SPLITMUX_PART_UNLOCK (reader);
226     gst_buffer_unref (buf);
227     return GST_FLOW_FLUSHING;
228   }
229
230   /* Adjust buffer timestamps */
231   offset = reader->start_offset + part_pad->segment.base;
232   offset -= part_pad->initial_ts_offset;
233
234   if (GST_BUFFER_PTS_IS_VALID (buf))
235     GST_BUFFER_PTS (buf) += offset;
236   if (GST_BUFFER_DTS_IS_VALID (buf))
237     GST_BUFFER_DTS (buf) += offset;
238
239   /* We are active, and one queue is empty, place this buffer in
240    * the dataqueue */
241   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
242   item = g_slice_new (GstDataQueueItem);
243   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
244   item->object = GST_MINI_OBJECT (buf);
245   item->size = gst_buffer_get_size (buf);
246   item->duration = GST_BUFFER_DURATION (buf);
247   if (item->duration == GST_CLOCK_TIME_NONE)
248     item->duration = 0;
249   item->visible = TRUE;
250
251   gst_object_ref (part_pad);
252
253   SPLITMUX_PART_UNLOCK (reader);
254
255   if (!gst_data_queue_push (part_pad->queue, item)) {
256     splitmux_part_free_queue_item (item);
257     gst_object_unref (part_pad);
258     return GST_FLOW_FLUSHING;
259   }
260
261   gst_object_unref (part_pad);
262   return GST_FLOW_OK;
263 }
264
265 /* Called with splitmux part lock held */
266 static gboolean
267 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
268 {
269   GList *cur;
270   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
271     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
272     if (!part_pad->is_eos)
273       return FALSE;
274   }
275
276   return TRUE;
277 }
278
279 static gboolean
280 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
281 {
282   GList *cur;
283   GST_LOG_OBJECT (part, "Checking for preroll");
284   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
285     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
286     if (!part_pad->seen_buffer) {
287       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
288           part_pad);
289       return FALSE;
290     }
291   }
292   GST_LOG_OBJECT (part, "Part is prerolled");
293   return TRUE;
294 }
295
296
297 gboolean
298 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
299 {
300   gboolean res;
301
302   SPLITMUX_PART_LOCK (reader);
303   res = splitmux_part_is_eos_locked (reader);
304   SPLITMUX_PART_UNLOCK (reader);
305
306   return res;
307 }
308
309 /* Called with splitmux part lock held */
310 static gboolean
311 splitmux_is_flushing (GstSplitMuxPartReader * reader)
312 {
313   GList *cur;
314   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
315     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
316     if (part_pad->flushing)
317       return TRUE;
318   }
319
320   return FALSE;
321 }
322
323 static gboolean
324 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
325 {
326   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
327   GstSplitMuxPartReader *reader = part_pad->reader;
328   gboolean ret = TRUE;
329   SplitMuxSrcPad *target;
330   GstDataQueueItem *item;
331
332   SPLITMUX_PART_LOCK (reader);
333
334   target = gst_object_ref (part_pad->target);
335
336   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
337       event);
338
339   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
340     goto drop_event;
341
342   switch (GST_EVENT_TYPE (event)) {
343     case GST_EVENT_SEGMENT:{
344       GstSegment *seg = &part_pad->segment;
345
346       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
347
348       gst_event_copy_segment (event, seg);
349       gst_event_copy_segment (event, &part_pad->orig_segment);
350
351       if (seg->format != GST_FORMAT_TIME)
352         goto wrong_segment;
353
354       /* Adjust segment */
355       /* Adjust start/stop so the overall file is 0 + start_offset based */
356       if (seg->stop != -1) {
357         seg->stop -= seg->start;
358         seg->stop += seg->time + reader->start_offset;
359       }
360       seg->start = seg->time + reader->start_offset;
361       seg->time += reader->start_offset;
362       seg->position += reader->start_offset;
363
364       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
365
366       /* Replace event */
367       gst_event_unref (event);
368       event = gst_event_new_segment (seg);
369
370       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
371           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
372         break;                  /* Only do further stuff with segments during initial measuring */
373
374       /* Take the first segment from the first part */
375       if (target->segment.format == GST_FORMAT_UNDEFINED) {
376         gst_segment_copy_into (seg, &target->segment);
377         GST_DEBUG_OBJECT (reader,
378             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
379       }
380
381       if (seg->stop != -1 && target->segment.stop != -1) {
382         GstClockTime stop = seg->base + seg->stop;
383         if (stop > target->segment.stop) {
384           target->segment.stop = stop;
385           GST_DEBUG_OBJECT (reader,
386               "Adjusting segment stop by %" GST_TIME_FORMAT
387               " output now %" GST_SEGMENT_FORMAT,
388               GST_TIME_ARGS (reader->start_offset), &target->segment);
389         }
390       }
391       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
392       break;
393     }
394     case GST_EVENT_EOS:{
395
396       GST_DEBUG_OBJECT (part_pad,
397           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
398           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
399
400       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
401           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
402         /* Mark this pad as EOS */
403         part_pad->is_eos = TRUE;
404         if (splitmux_part_is_eos_locked (reader)) {
405           /* Finished measuring things, set state and tell the state change func
406            * so it can seek back to the start */
407           GST_LOG_OBJECT (reader,
408               "EOS while measuring streams. Resetting for ready");
409           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
410           SPLITMUX_PART_BROADCAST (reader);
411         }
412         goto drop_event;
413       }
414       break;
415     }
416     case GST_EVENT_FLUSH_START:
417       reader->flushing = TRUE;
418       part_pad->flushing = TRUE;
419       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
420           part_pad);
421       gst_data_queue_set_flushing (part_pad->queue, TRUE);
422       SPLITMUX_PART_BROADCAST (reader);
423       break;
424     case GST_EVENT_FLUSH_STOP:{
425       gst_data_queue_set_flushing (part_pad->queue, FALSE);
426       gst_data_queue_flush (part_pad->queue);
427       part_pad->seen_buffer = FALSE;
428       part_pad->flushing = FALSE;
429       part_pad->is_eos = FALSE;
430
431       reader->flushing = splitmux_is_flushing (reader);
432       GST_LOG_OBJECT (reader,
433           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
434           reader->path, pad, reader->flushing);
435       SPLITMUX_PART_BROADCAST (reader);
436       break;
437     }
438     default:
439       break;
440   }
441
442   /* Don't send events downstream while preparing */
443   if (reader->prep_state != PART_STATE_READY)
444     goto drop_event;
445
446   /* Don't pass flush events - those are done by the parent */
447   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
448       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
449     goto drop_event;
450
451   if (!block_until_can_push (reader)) {
452     SPLITMUX_PART_UNLOCK (reader);
453     gst_object_unref (target);
454     gst_event_unref (event);
455     return FALSE;
456   }
457
458   switch (GST_EVENT_TYPE (event)) {
459     case GST_EVENT_GAP:{
460       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
461       goto drop_event;
462     }
463     default:
464       break;
465   }
466
467   /* We are active, and one queue is empty, place this buffer in
468    * the dataqueue */
469   gst_object_ref (part_pad->queue);
470   SPLITMUX_PART_UNLOCK (reader);
471
472   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
473   item = g_slice_new (GstDataQueueItem);
474   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
475   item->object = GST_MINI_OBJECT (event);
476   item->size = 0;
477   item->duration = 0;
478   if (item->duration == GST_CLOCK_TIME_NONE)
479     item->duration = 0;
480   item->visible = FALSE;
481
482   if (!gst_data_queue_push (part_pad->queue, item)) {
483     splitmux_part_free_queue_item (item);
484     ret = FALSE;
485   }
486
487   gst_object_unref (part_pad->queue);
488   gst_object_unref (target);
489
490   return ret;
491 wrong_segment:
492   gst_event_unref (event);
493   gst_object_unref (target);
494   SPLITMUX_PART_UNLOCK (reader);
495   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
496       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
497           reader->path, pad));
498   return FALSE;
499 drop_event:
500   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
501       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
502   gst_event_unref (event);
503   gst_object_unref (target);
504   SPLITMUX_PART_UNLOCK (reader);
505   return TRUE;
506 }
507
508 static gboolean
509 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
510 {
511   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
512   GstSplitMuxPartReader *reader = part_pad->reader;
513   GstPad *target;
514   gboolean ret = FALSE;
515   gboolean active;
516
517   SPLITMUX_PART_LOCK (reader);
518   target = gst_object_ref (part_pad->target);
519   active = reader->active;
520   SPLITMUX_PART_UNLOCK (reader);
521
522   if (active) {
523     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
524         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
525
526     ret = gst_pad_query (target, query);
527   }
528
529   gst_object_unref (target);
530
531   return ret;
532 }
533
534 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
535
536 static void
537 splitmux_part_pad_constructed (GObject * pad)
538 {
539   gst_pad_set_chain_function (GST_PAD (pad),
540       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
541   gst_pad_set_event_function (GST_PAD (pad),
542       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
543   gst_pad_set_query_function (GST_PAD (pad),
544       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
545
546   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
547 }
548
549 static void
550 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
551 {
552   GObjectClass *gobject_klass = (GObjectClass *) (klass);
553
554   gobject_klass->constructed = splitmux_part_pad_constructed;
555   gobject_klass->finalize = splitmux_part_pad_finalize;
556 }
557
558 static void
559 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
560 {
561   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
562       NULL, NULL, pad);
563   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
564   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
565 }
566
567 static void
568 splitmux_part_pad_finalize (GObject * obj)
569 {
570   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
571
572   GST_DEBUG_OBJECT (obj, "finalize");
573   gst_data_queue_set_flushing (pad->queue, TRUE);
574   gst_data_queue_flush (pad->queue);
575   gst_object_unref (GST_OBJECT_CAST (pad->queue));
576   pad->queue = NULL;
577
578   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
579 }
580
581 static void
582 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
583     GstSplitMuxPartReader * part);
584 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
585 static GstStateChangeReturn
586 gst_splitmux_part_reader_change_state (GstElement * element,
587     GstStateChange transition);
588 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
589     GstEvent * event);
590 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
591     * part, gboolean flushing);
592 static void bus_handler (GstBin * bin, GstMessage * msg);
593 static void splitmux_part_reader_dispose (GObject * object);
594 static void splitmux_part_reader_finalize (GObject * object);
595 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
596
597 #define gst_splitmux_part_reader_parent_class parent_class
598 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
599     GST_TYPE_PIPELINE);
600
601 static void
602 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
603 {
604   GObjectClass *gobject_klass = (GObjectClass *) (klass);
605   GstElementClass *gstelement_class = (GstElementClass *) klass;
606   GstBinClass *gstbin_class = (GstBinClass *) klass;
607
608   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
609       "Split File Demuxing Source helper");
610
611   gobject_klass->dispose = splitmux_part_reader_dispose;
612   gobject_klass->finalize = splitmux_part_reader_finalize;
613
614   part_reader_signals[SIGNAL_PREPARED] =
615       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
616       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
617           prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
618   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
619   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
620
621   gstbin_class->handle_message = bus_handler;
622 }
623
624 static void
625 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
626 {
627   GstElement *typefind;
628
629   reader->active = FALSE;
630   reader->duration = GST_CLOCK_TIME_NONE;
631
632   g_cond_init (&reader->inactive_cond);
633   g_mutex_init (&reader->lock);
634   g_mutex_init (&reader->type_lock);
635
636   /* FIXME: Create elements on a state change */
637   reader->src = gst_element_factory_make ("filesrc", NULL);
638   if (reader->src == NULL) {
639     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
640     return;
641   }
642   gst_bin_add (GST_BIN_CAST (reader), reader->src);
643
644   typefind = gst_element_factory_make ("typefind", NULL);
645   if (!typefind) {
646     GST_ERROR_OBJECT (reader,
647         "Failed to create typefind element - check your installation");
648     return;
649   }
650
651   gst_bin_add (GST_BIN_CAST (reader), typefind);
652   reader->typefind = typefind;
653
654   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
655     GST_ERROR_OBJECT (reader,
656         "Failed to link typefind element - check your installation");
657     return;
658   }
659
660   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
661       reader);
662 }
663
664 static void
665 splitmux_part_reader_dispose (GObject * object)
666 {
667   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
668
669   splitmux_part_reader_reset (reader);
670
671   G_OBJECT_CLASS (parent_class)->dispose (object);
672 }
673
674 static void
675 splitmux_part_reader_finalize (GObject * object)
676 {
677   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
678
679   g_cond_clear (&reader->inactive_cond);
680   g_mutex_clear (&reader->lock);
681   g_mutex_clear (&reader->type_lock);
682
683   g_free (reader->path);
684
685   G_OBJECT_CLASS (parent_class)->finalize (object);
686 }
687
688 static void
689 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
690 {
691   GList *cur;
692
693   SPLITMUX_PART_LOCK (reader);
694   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
695     GstPad *pad = GST_PAD_CAST (cur->data);
696     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
697     gst_object_unref (GST_OBJECT_CAST (pad));
698   }
699
700   g_list_free (reader->pads);
701   reader->pads = NULL;
702   SPLITMUX_PART_UNLOCK (reader);
703 }
704
705 static GstSplitMuxPartPad *
706 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
707     GstPad * target)
708 {
709   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
710       "name", GST_PAD_NAME (target),
711       "direction", GST_PAD_SINK,
712       NULL);
713   pad->target = target;
714   pad->reader = reader;
715
716   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
717
718   return pad;
719 }
720
721 static void
722 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
723     GstSplitMuxPartReader * reader)
724 {
725   GstPad *out_pad = NULL;
726   GstSplitMuxPartPad *proxy_pad;
727   GstCaps *caps;
728   GstPadLinkReturn link_ret;
729
730   caps = gst_pad_get_current_caps (pad);
731
732   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
733       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
734
735   gst_caps_unref (caps);
736
737   /* Look up or create the output pad */
738   if (reader->get_pad_cb)
739     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
740   if (out_pad == NULL) {
741     GST_DEBUG_OBJECT (reader,
742         "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
743     return;
744   }
745
746   /* Create our proxy pad to interact with this new pad */
747   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
748   GST_DEBUG_OBJECT (reader,
749       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
750       proxy_pad, out_pad);
751
752   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
753   if (link_ret != GST_PAD_LINK_OK) {
754     gst_object_unref (proxy_pad);
755     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
756         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
757             " ret %d", reader->path, pad, link_ret));
758     return;
759   }
760   GST_DEBUG_OBJECT (reader,
761       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
762       pad, proxy_pad);
763
764   SPLITMUX_PART_LOCK (reader);
765   reader->pads = g_list_prepend (reader->pads, proxy_pad);
766   SPLITMUX_PART_UNLOCK (reader);
767 }
768
769 static gboolean
770 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
771 {
772   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
773   gboolean ret = FALSE;
774   GstPad *pad = NULL;
775
776   /* Send event to the first source pad we found */
777   SPLITMUX_PART_LOCK (reader);
778   if (reader->pads) {
779     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
780     pad = gst_pad_get_peer (proxy_pad);
781   }
782   SPLITMUX_PART_UNLOCK (reader);
783
784   if (pad) {
785     ret = gst_pad_send_event (pad, event);
786     gst_object_unref (pad);
787   } else {
788     gst_event_unref (event);
789   }
790
791   return ret;
792 }
793
794 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
795 static void
796 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
797     GstClockTime time)
798 {
799   SPLITMUX_PART_UNLOCK (reader);
800   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
801       GST_TIME_ARGS (time));
802   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
803       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
804       GST_SEEK_TYPE_END, 0);
805
806   SPLITMUX_PART_LOCK (reader);
807
808   /* Wait for flush to finish, so old data is gone */
809   while (reader->flushing) {
810     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
811     SPLITMUX_PART_WAIT (reader);
812   }
813 }
814
815 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
816 static gboolean
817 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
818     GstSegment * target_seg)
819 {
820   GstSeekFlags flags;
821   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
822
823   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
824
825   SPLITMUX_PART_LOCK (reader);
826   if (target_seg->start >= reader->start_offset)
827     start = target_seg->start - reader->start_offset;
828   /* If the segment stop is within this part, don't play to the end */
829   if (target_seg->stop != -1 &&
830       target_seg->stop < reader->start_offset + reader->duration)
831     stop = target_seg->stop - reader->start_offset;
832
833   SPLITMUX_PART_UNLOCK (reader);
834
835   GST_DEBUG_OBJECT (reader,
836       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
837       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
838       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
839
840   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
841       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
842       stop);
843 }
844
845 /* Called with lock held */
846 static void
847 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
848 {
849   /* Trigger a flushing seek to near the end of the file and run each stream
850    * to EOS in order to find the smallest end timestamp to start the next
851    * file from
852    */
853   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
854       && reader->duration > GST_SECOND) {
855     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
856     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
857   }
858
859   /* Wait for things to happen */
860   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
861     SPLITMUX_PART_WAIT (reader);
862
863   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
864     /* Fire the prepared signal and go to READY state */
865     GST_DEBUG_OBJECT (reader,
866         "Stream measuring complete. File %s is now ready. Firing prepared signal",
867         reader->path);
868     reader->prep_state = PART_STATE_READY;
869     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
870   }
871 }
872
873 static GstElement *
874 find_demuxer (GstCaps * caps)
875 {
876   GList *factories =
877       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
878       GST_RANK_MARGINAL);
879   GList *compat_elements;
880   GstElement *e = NULL;
881
882   if (factories == NULL)
883     return NULL;
884
885   compat_elements =
886       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
887
888   if (compat_elements) {
889     /* Just take the first (highest ranked) option */
890     GstElementFactory *factory =
891         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
892     e = gst_element_factory_create (factory, NULL);
893     gst_plugin_feature_list_free (compat_elements);
894   }
895
896   if (factories)
897     gst_plugin_feature_list_free (factories);
898
899   return e;
900 }
901
902 static void
903 type_found (GstElement * typefind, guint probability,
904     GstCaps * caps, GstSplitMuxPartReader * reader)
905 {
906   GstElement *demux;
907
908   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
909
910   /* typefind found a type. Look for the demuxer to handle it */
911   demux = reader->demux = find_demuxer (caps);
912   if (reader->demux == NULL) {
913     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
914     return;
915   }
916
917   /* Connect to demux signals */
918   g_signal_connect (demux,
919       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
920   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
921
922   gst_element_set_locked_state (demux, TRUE);
923   gst_bin_add (GST_BIN_CAST (reader), demux);
924   gst_element_link_pads (reader->typefind, "src", demux, NULL);
925   gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
926   gst_element_set_locked_state (demux, FALSE);
927 }
928
929 static void
930 check_if_pads_collected (GstSplitMuxPartReader * reader)
931 {
932   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
933     /* Check we have all pads and each pad has seen a buffer */
934     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
935       GST_DEBUG_OBJECT (reader,
936           "no more pads - file %s. Measuring stream length", reader->path);
937       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
938       SPLITMUX_PART_BROADCAST (reader);
939     }
940   }
941 }
942
943 static void
944 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
945 {
946   GstClockTime duration = GST_CLOCK_TIME_NONE;
947   GList *cur;
948   /* Query the minimum duration of any pad in this piece and store it.
949    * FIXME: Only consider audio and video */
950   SPLITMUX_PART_LOCK (reader);
951   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
952     GstPad *target = GST_PAD_CAST (cur->data);
953     if (target) {
954       gint64 cur_duration;
955       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
956         GST_INFO_OBJECT (reader,
957             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
958             reader->path, target, GST_TIME_ARGS (cur_duration));
959         if (cur_duration < duration)
960           duration = cur_duration;
961       }
962     }
963   }
964   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
965       reader->path, GST_TIME_ARGS (duration));
966   reader->duration = (GstClockTime) duration;
967
968   reader->no_more_pads = TRUE;
969
970   check_if_pads_collected (reader);
971   SPLITMUX_PART_UNLOCK (reader);
972 }
973
974 gboolean
975 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
976     GstPad * src_pad, GstQuery * query)
977 {
978   GstPad *target = NULL;
979   gboolean ret;
980   GList *cur;
981
982   SPLITMUX_PART_LOCK (part);
983   /* Find the pad corresponding to the visible output target pad */
984   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
985     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
986     if (part_pad->target == src_pad) {
987       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
988       break;
989     }
990   }
991   SPLITMUX_PART_UNLOCK (part);
992
993   if (target == NULL)
994     return FALSE;
995
996   ret = gst_pad_peer_query (target, query);
997   gst_object_unref (GST_OBJECT_CAST (target));
998
999   if (ret == FALSE)
1000     goto out;
1001
1002   /* Post-massaging of queries */
1003   switch (GST_QUERY_TYPE (query)) {
1004     case GST_QUERY_POSITION:{
1005       GstFormat fmt;
1006       gint64 position;
1007
1008       gst_query_parse_position (query, &fmt, &position);
1009       if (fmt != GST_FORMAT_TIME)
1010         return FALSE;
1011       SPLITMUX_PART_LOCK (part);
1012       position += part->start_offset;
1013       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1014           GST_TIME_ARGS (position));
1015       SPLITMUX_PART_UNLOCK (part);
1016
1017       gst_query_set_position (query, fmt, position);
1018       break;
1019     }
1020     default:
1021       break;
1022   }
1023
1024 out:
1025   gst_object_unref (target);
1026   return ret;
1027 }
1028
1029 static GstStateChangeReturn
1030 gst_splitmux_part_reader_change_state (GstElement * element,
1031     GstStateChange transition)
1032 {
1033   GstStateChangeReturn ret;
1034   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1035
1036   switch (transition) {
1037     case GST_STATE_CHANGE_NULL_TO_READY:{
1038       break;
1039     }
1040     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1041       /* Hold the splitmux type lock until after the
1042        * parent state change function has finished
1043        * changing the states of things, and type finding can continue */
1044       SPLITMUX_PART_LOCK (reader);
1045       g_object_set (reader->src, "location", reader->path, NULL);
1046       SPLITMUX_PART_UNLOCK (reader);
1047       SPLITMUX_PART_TYPE_LOCK (reader);
1048       break;
1049     }
1050     case GST_STATE_CHANGE_READY_TO_NULL:
1051     case GST_STATE_CHANGE_PAUSED_TO_READY:
1052       SPLITMUX_PART_LOCK (reader);
1053       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1054       reader->running = FALSE;
1055       SPLITMUX_PART_BROADCAST (reader);
1056       SPLITMUX_PART_UNLOCK (reader);
1057       break;
1058     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1059       SPLITMUX_PART_LOCK (reader);
1060       reader->active = FALSE;
1061       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1062       SPLITMUX_PART_BROADCAST (reader);
1063       SPLITMUX_PART_UNLOCK (reader);
1064       break;
1065     default:
1066       break;
1067   }
1068
1069   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1070   if (ret == GST_STATE_CHANGE_FAILURE) {
1071     if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
1072       /* Make sure to release the lock we took above */
1073       SPLITMUX_PART_TYPE_UNLOCK (reader);
1074     }
1075     goto beach;
1076   }
1077
1078   switch (transition) {
1079     case GST_STATE_CHANGE_READY_TO_PAUSED:
1080       /* Sleep and wait until all streams have been collected, then do the seeks
1081        * to measure the stream lengths. This took the type lock above,
1082        * but it's OK to release it now and let typefinding happen... */
1083       SPLITMUX_PART_TYPE_UNLOCK (reader);
1084
1085       SPLITMUX_PART_LOCK (reader);
1086       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1087       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1088       reader->running = TRUE;
1089
1090       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1091         GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1092         SPLITMUX_PART_WAIT (reader);
1093       }
1094
1095       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
1096           reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
1097         gst_splitmux_part_reader_measure_streams (reader);
1098       } else if (reader->prep_state == PART_STATE_FAILED)
1099         ret = GST_STATE_CHANGE_FAILURE;
1100       SPLITMUX_PART_UNLOCK (reader);
1101       break;
1102     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1103       SPLITMUX_PART_LOCK (reader);
1104       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1105       reader->active = TRUE;
1106       SPLITMUX_PART_BROADCAST (reader);
1107       SPLITMUX_PART_UNLOCK (reader);
1108       break;
1109     case GST_STATE_CHANGE_READY_TO_NULL:
1110       reader->prep_state = PART_STATE_NULL;
1111       splitmux_part_reader_reset (reader);
1112       break;
1113     default:
1114       break;
1115   }
1116
1117 beach:
1118   return ret;
1119 }
1120
1121 static gboolean
1122 check_bus_messages (GstSplitMuxPartReader * part)
1123 {
1124   gboolean ret = FALSE;
1125   GstBus *bus;
1126   GstMessage *m;
1127
1128   bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1129   while ((m = gst_bus_pop (bus)) != NULL) {
1130     if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1131       GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1132       gst_message_unref (m);
1133       goto done;
1134     }
1135     gst_message_unref (m);
1136   }
1137   ret = TRUE;
1138 done:
1139   gst_object_unref (bus);
1140   return ret;
1141 }
1142
1143 gboolean
1144 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1145 {
1146   GstStateChangeReturn ret;
1147
1148   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1149
1150   if (ret != GST_STATE_CHANGE_SUCCESS)
1151     return FALSE;
1152
1153   return check_bus_messages (part);
1154 }
1155
1156 void
1157 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1158 {
1159   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1160 }
1161
1162 void
1163 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1164     const gchar * path)
1165 {
1166   reader->path = g_strdup (path);
1167 }
1168
1169 gboolean
1170 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1171     GstSegment * seg)
1172 {
1173   GST_DEBUG_OBJECT (reader, "Activating part reader");
1174
1175   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1176     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1177         seg);
1178     return FALSE;
1179   }
1180   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1181           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1182     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1183     return FALSE;
1184   }
1185   return TRUE;
1186 }
1187
1188 void
1189 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1190 {
1191   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1192   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1193 }
1194
1195 void
1196 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1197     gboolean flushing)
1198 {
1199   GList *cur;
1200
1201   GST_LOG_OBJECT (reader, "%s dataqueues",
1202       flushing ? "Flushing" : "Done flushing");
1203   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1204     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1205     gst_data_queue_set_flushing (part_pad->queue, flushing);
1206     if (flushing)
1207       gst_data_queue_flush (part_pad->queue);
1208   }
1209 };
1210
1211 void
1212 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1213     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1214 {
1215   reader->cb_data = cb_data;
1216   reader->get_pad_cb = get_pad_cb;
1217 }
1218
1219 GstClockTime
1220 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1221 {
1222   GList *cur;
1223   GstClockTime ret = GST_CLOCK_TIME_NONE;
1224
1225   SPLITMUX_PART_LOCK (reader);
1226   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1227     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1228     if (part_pad->max_ts < ret)
1229       ret = part_pad->max_ts;
1230   }
1231
1232   SPLITMUX_PART_UNLOCK (reader);
1233
1234   return ret;
1235 }
1236
1237 void
1238 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1239     GstClockTime offset)
1240 {
1241   SPLITMUX_PART_LOCK (reader);
1242   reader->start_offset = offset;
1243   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1244       GST_TIME_ARGS (offset));
1245   SPLITMUX_PART_UNLOCK (reader);
1246 }
1247
1248 GstClockTime
1249 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1250 {
1251   GstClockTime ret = GST_CLOCK_TIME_NONE;
1252
1253   SPLITMUX_PART_LOCK (reader);
1254   ret = reader->start_offset;
1255   SPLITMUX_PART_UNLOCK (reader);
1256
1257   return ret;
1258 }
1259
1260 GstClockTime
1261 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1262 {
1263   GstClockTime dur;
1264
1265   SPLITMUX_PART_LOCK (reader);
1266   dur = reader->duration;
1267   SPLITMUX_PART_UNLOCK (reader);
1268
1269   return dur;
1270 }
1271
1272 GstPad *
1273 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1274     GstPad * target)
1275 {
1276   GstPad *result = NULL;
1277   GList *cur;
1278
1279   SPLITMUX_PART_LOCK (reader);
1280   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1281     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1282     if (part_pad->target == target) {
1283       result = (GstPad *) gst_object_ref (part_pad);
1284       break;
1285     }
1286   }
1287   SPLITMUX_PART_UNLOCK (reader);
1288
1289   return result;
1290 }
1291
1292 GstFlowReturn
1293 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1294     GstDataQueueItem ** item)
1295 {
1296   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1297   GstDataQueue *q;
1298   GstFlowReturn ret;
1299
1300   /* Get one item from the appropriate dataqueue */
1301   SPLITMUX_PART_LOCK (reader);
1302   if (reader->prep_state == PART_STATE_FAILED) {
1303     SPLITMUX_PART_UNLOCK (reader);
1304     return GST_FLOW_ERROR;
1305   }
1306
1307   q = gst_object_ref (part_pad->queue);
1308
1309   /* Have to drop the lock around pop, so we can be woken up for flush */
1310   SPLITMUX_PART_UNLOCK (reader);
1311   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1312     ret = GST_FLOW_FLUSHING;
1313     goto out;
1314   }
1315
1316   SPLITMUX_PART_LOCK (reader);
1317
1318   SPLITMUX_PART_BROADCAST (reader);
1319   if (GST_IS_EVENT ((*item)->object)) {
1320     GstEvent *e = (GstEvent *) ((*item)->object);
1321     /* Mark this pad as EOS */
1322     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1323       part_pad->is_eos = TRUE;
1324   }
1325
1326   SPLITMUX_PART_UNLOCK (reader);
1327
1328   ret = GST_FLOW_OK;
1329 out:
1330   gst_object_unref (q);
1331   return ret;
1332 }
1333
1334 static void
1335 bus_handler (GstBin * bin, GstMessage * message)
1336 {
1337   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1338
1339   switch (GST_MESSAGE_TYPE (message)) {
1340     case GST_MESSAGE_ERROR:
1341       /* Make sure to set the state to failed and wake up the listener
1342        * on error */
1343       SPLITMUX_PART_LOCK (reader);
1344       reader->prep_state = PART_STATE_FAILED;
1345       SPLITMUX_PART_BROADCAST (reader);
1346       SPLITMUX_PART_UNLOCK (reader);
1347       break;
1348     default:
1349       break;
1350   }
1351
1352   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1353 }