6e92c710137c7d08b825f2b0d509f512dbbc946d
[platform/upstream/gstreamer.git] / gst / multifile / gstsplitmuxpartreader.c
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36
37 enum
38 {
39   SIGNAL_PREPARED,
40   LAST_SIGNAL
41 };
42
43 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
44
45 typedef struct _GstSplitMuxPartPad
46 {
47   GstPad parent;
48
49   /* Reader we belong to */
50   GstSplitMuxPartReader *reader;
51   /* Output splitmuxsrc source pad */
52   GstPad *target;
53
54   GstDataQueue *queue;
55
56   gboolean is_eos;
57   gboolean flushing;
58   gboolean seen_buffer;
59
60   GstClockTime max_ts;
61   GstSegment segment;
62
63   GstSegment orig_segment;
64   GstClockTime initial_ts_offset;
65 } GstSplitMuxPartPad;
66
67 typedef struct _GstSplitMuxPartPadClass
68 {
69   GstPadClass parent;
70 } GstSplitMuxPartPadClass;
71
72 static GType gst_splitmux_part_pad_get_type (void);
73 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
74 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
75
76 static void splitmux_part_pad_constructed (GObject * pad);
77 static void splitmux_part_pad_finalize (GObject * pad);
78 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
79     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
80
81 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
82     guint visible, guint bytes, guint64 time, gpointer checkdata);
83 static void type_found (GstElement * typefind, guint probability,
84     GstCaps * caps, GstSplitMuxPartReader * reader);
85 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
86
87 /* Called with reader lock held */
88 static gboolean
89 have_empty_queue (GstSplitMuxPartReader * reader)
90 {
91   GList *cur;
92
93   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
94     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
95     if (part_pad->is_eos) {
96       GST_LOG_OBJECT (part_pad, "Pad is EOS");
97       return TRUE;
98     }
99     if (gst_data_queue_is_empty (part_pad->queue)) {
100       GST_LOG_OBJECT (part_pad, "Queue is empty");
101       return TRUE;
102     }
103   }
104
105   return FALSE;
106 }
107
108 /* Called with reader lock held */
109 static gboolean
110 block_until_can_push (GstSplitMuxPartReader * reader)
111 {
112   while (reader->running) {
113     if (reader->flushing)
114       goto out;
115     if (reader->active && have_empty_queue (reader))
116       goto out;
117
118     GST_LOG_OBJECT (reader,
119         "Waiting for activation or empty queue on reader %s", reader->path);
120     SPLITMUX_PART_WAIT (reader);
121   }
122
123   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
124       reader->path, reader->active, reader->flushing);
125 out:
126   return reader->active && !reader->flushing;
127 }
128
129 static void
130 handle_buffer_measuring (GstSplitMuxPartReader * reader,
131     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
132 {
133   GstClockTime ts = GST_CLOCK_TIME_NONE;
134   GstClockTimeDiff offset;
135
136   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
137       !part_pad->seen_buffer) {
138     /* If this is the first buffer on the pad in the collect_streams state,
139      * then calculate inital offset based on running time of this segment */
140     part_pad->initial_ts_offset =
141         part_pad->orig_segment.start + part_pad->orig_segment.base -
142         part_pad->orig_segment.time;
143     GST_DEBUG_OBJECT (reader,
144         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
145         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
146   }
147   part_pad->seen_buffer = TRUE;
148
149   /* Adjust buffer timestamps */
150   offset = reader->start_offset + part_pad->segment.base;
151   offset -= part_pad->initial_ts_offset;
152
153   /* Update the stored max duration on the pad,
154    * always preferring making DTS contiguous
155    * where possible */
156   if (GST_BUFFER_DTS_IS_VALID (buf))
157     ts = GST_BUFFER_DTS (buf) + offset;
158   else if (GST_BUFFER_PTS_IS_VALID (buf))
159     ts = GST_BUFFER_PTS (buf) + offset;
160
161   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
162       " incoming PTS %" GST_TIME_FORMAT
163       " DTS %" GST_TIME_FORMAT " offset by %" GST_TIME_FORMAT
164       " to %" GST_TIME_FORMAT, part_pad,
165       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
166       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
167       GST_TIME_ARGS (offset), GST_TIME_ARGS (ts));
168
169   if (GST_CLOCK_TIME_IS_VALID (ts)) {
170     if (GST_BUFFER_DURATION_IS_VALID (buf))
171       ts += GST_BUFFER_DURATION (buf);
172
173     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
174       part_pad->max_ts = ts;
175       GST_LOG_OBJECT (reader,
176           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
177           GST_TIME_ARGS (part_pad->max_ts));
178     }
179   }
180   /* Is it time to move to measuring state yet? */
181   check_if_pads_collected (reader);
182 }
183
184 static gboolean
185 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
186     guint visible, guint bytes, guint64 time, gpointer checkdata)
187 {
188   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
189   if (time > 20 * GST_SECOND)
190     return TRUE;
191   return FALSE;
192 }
193
194 static void
195 splitmux_part_free_queue_item (GstDataQueueItem * item)
196 {
197   gst_mini_object_unref (item->object);
198   g_slice_free (GstDataQueueItem, item);
199 }
200
201 static GstFlowReturn
202 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
203 {
204   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
205   GstSplitMuxPartReader *reader = part_pad->reader;
206   GstDataQueueItem *item;
207   GstClockTimeDiff offset;
208
209   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
210   SPLITMUX_PART_LOCK (reader);
211
212   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
213       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
214     handle_buffer_measuring (reader, part_pad, buf);
215     gst_buffer_unref (buf);
216     SPLITMUX_PART_UNLOCK (reader);
217     return GST_FLOW_OK;
218   }
219
220   if (!block_until_can_push (reader)) {
221     /* Flushing */
222     SPLITMUX_PART_UNLOCK (reader);
223     gst_buffer_unref (buf);
224     return GST_FLOW_FLUSHING;
225   }
226
227   /* Adjust buffer timestamps */
228   offset = reader->start_offset + part_pad->segment.base;
229   offset -= part_pad->initial_ts_offset;
230
231   if (GST_BUFFER_PTS_IS_VALID (buf))
232     GST_BUFFER_PTS (buf) += offset;
233   if (GST_BUFFER_DTS_IS_VALID (buf))
234     GST_BUFFER_DTS (buf) += offset;
235
236   /* We are active, and one queue is empty, place this buffer in
237    * the dataqueue */
238   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
239   item = g_slice_new (GstDataQueueItem);
240   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
241   item->object = GST_MINI_OBJECT (buf);
242   item->size = gst_buffer_get_size (buf);
243   item->duration = GST_BUFFER_DURATION (buf);
244   if (item->duration == GST_CLOCK_TIME_NONE)
245     item->duration = 0;
246   item->visible = TRUE;
247
248   gst_object_ref (part_pad);
249
250   SPLITMUX_PART_UNLOCK (reader);
251
252   if (!gst_data_queue_push (part_pad->queue, item)) {
253     splitmux_part_free_queue_item (item);
254     gst_object_unref (part_pad);
255     return GST_FLOW_FLUSHING;
256   }
257
258   gst_object_unref (part_pad);
259   return GST_FLOW_OK;
260 }
261
262 /* Called with splitmux part lock held */
263 static gboolean
264 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
265 {
266   GList *cur;
267   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
268     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
269     if (!part_pad->is_eos)
270       return FALSE;
271   }
272
273   return TRUE;
274 }
275
276 static gboolean
277 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
278 {
279   GList *cur;
280   GST_LOG_OBJECT (part, "Checking for preroll");
281   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
282     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
283     if (!part_pad->seen_buffer) {
284       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
285           part_pad);
286       return FALSE;
287     }
288   }
289   GST_LOG_OBJECT (part, "Part is prerolled");
290   return TRUE;
291 }
292
293
294 gboolean
295 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
296 {
297   gboolean res;
298
299   SPLITMUX_PART_LOCK (reader);
300   res = splitmux_part_is_eos_locked (reader);
301   SPLITMUX_PART_UNLOCK (reader);
302
303   return res;
304 }
305
306 /* Called with splitmux part lock held */
307 static gboolean
308 splitmux_is_flushing (GstSplitMuxPartReader * reader)
309 {
310   GList *cur;
311   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
312     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
313     if (part_pad->flushing)
314       return TRUE;
315   }
316
317   return FALSE;
318 }
319
320 static gboolean
321 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
322 {
323   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
324   GstSplitMuxPartReader *reader = part_pad->reader;
325   gboolean ret = TRUE;
326   SplitMuxSrcPad *target;
327   GstDataQueueItem *item;
328
329   SPLITMUX_PART_LOCK (reader);
330
331   target = gst_object_ref (part_pad->target);
332
333   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
334       event);
335
336   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
337     goto drop_event;
338
339   switch (GST_EVENT_TYPE (event)) {
340     case GST_EVENT_SEGMENT:{
341       GstSegment *seg = &part_pad->segment;
342
343       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
344
345       gst_event_copy_segment (event, seg);
346       gst_event_copy_segment (event, &part_pad->orig_segment);
347
348       if (seg->format != GST_FORMAT_TIME)
349         goto wrong_segment;
350
351       /* Adjust segment */
352       /* Adjust start/stop so the overall file is 0 + start_offset based */
353       if (seg->stop != -1) {
354         seg->stop -= seg->start;
355         seg->stop += seg->time + reader->start_offset;
356       }
357       seg->start = seg->time + reader->start_offset;
358       seg->time += reader->start_offset;
359       seg->position += reader->start_offset;
360
361       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
362
363       /* Replace event */
364       gst_event_unref (event);
365       event = gst_event_new_segment (seg);
366
367       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
368           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
369         break;                  /* Only do further stuff with segments during initial measuring */
370
371       /* Take the first segment from the first part */
372       if (target->segment.format == GST_FORMAT_UNDEFINED) {
373         gst_segment_copy_into (seg, &target->segment);
374         GST_DEBUG_OBJECT (reader,
375             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
376       }
377
378       if (seg->stop != -1 && target->segment.stop != -1) {
379         GstClockTime stop = seg->base + seg->stop;
380         if (stop > target->segment.stop) {
381           target->segment.stop = stop;
382           GST_DEBUG_OBJECT (reader,
383               "Adjusting segment stop by %" GST_TIME_FORMAT
384               " output now %" GST_SEGMENT_FORMAT,
385               GST_TIME_ARGS (reader->start_offset), &target->segment);
386         }
387       }
388       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
389       break;
390     }
391     case GST_EVENT_EOS:{
392
393       GST_DEBUG_OBJECT (part_pad,
394           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
395           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
396
397       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
398           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
399         /* Mark this pad as EOS */
400         part_pad->is_eos = TRUE;
401         if (splitmux_part_is_eos_locked (reader)) {
402           /* Finished measuring things, set state and tell the state change func
403            * so it can seek back to the start */
404           GST_LOG_OBJECT (reader,
405               "EOS while measuring streams. Resetting for ready");
406           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
407           SPLITMUX_PART_BROADCAST (reader);
408         }
409         goto drop_event;
410       }
411       break;
412     }
413     case GST_EVENT_FLUSH_START:
414       reader->flushing = TRUE;
415       part_pad->flushing = TRUE;
416       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
417           part_pad);
418       gst_data_queue_set_flushing (part_pad->queue, TRUE);
419       SPLITMUX_PART_BROADCAST (reader);
420       break;
421     case GST_EVENT_FLUSH_STOP:{
422       gst_data_queue_set_flushing (part_pad->queue, FALSE);
423       gst_data_queue_flush (part_pad->queue);
424       part_pad->seen_buffer = FALSE;
425       part_pad->flushing = FALSE;
426       part_pad->is_eos = FALSE;
427
428       reader->flushing = splitmux_is_flushing (reader);
429       GST_LOG_OBJECT (reader,
430           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
431           reader->path, pad, reader->flushing);
432       SPLITMUX_PART_BROADCAST (reader);
433       break;
434     }
435     default:
436       break;
437   }
438
439   /* Don't send events downstream while preparing */
440   if (reader->prep_state != PART_STATE_READY)
441     goto drop_event;
442
443   /* Don't pass flush events - those are done by the parent */
444   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
445       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
446     goto drop_event;
447
448   if (!block_until_can_push (reader)) {
449     SPLITMUX_PART_UNLOCK (reader);
450     gst_object_unref (target);
451     gst_event_unref (event);
452     return FALSE;
453   }
454
455   switch (GST_EVENT_TYPE (event)) {
456     case GST_EVENT_GAP:{
457       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
458       goto drop_event;
459     }
460     default:
461       break;
462   }
463
464   /* We are active, and one queue is empty, place this buffer in
465    * the dataqueue */
466   gst_object_ref (part_pad->queue);
467   SPLITMUX_PART_UNLOCK (reader);
468
469   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
470   item = g_slice_new (GstDataQueueItem);
471   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
472   item->object = GST_MINI_OBJECT (event);
473   item->size = 0;
474   item->duration = 0;
475   if (item->duration == GST_CLOCK_TIME_NONE)
476     item->duration = 0;
477   item->visible = FALSE;
478
479   if (!gst_data_queue_push (part_pad->queue, item)) {
480     splitmux_part_free_queue_item (item);
481     ret = FALSE;
482   }
483
484   gst_object_unref (part_pad->queue);
485   gst_object_unref (target);
486
487   return ret;
488 wrong_segment:
489   gst_event_unref (event);
490   gst_object_unref (target);
491   SPLITMUX_PART_UNLOCK (reader);
492   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
493       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
494           reader->path, pad));
495   return FALSE;
496 drop_event:
497   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
498       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
499   gst_event_unref (event);
500   gst_object_unref (target);
501   SPLITMUX_PART_UNLOCK (reader);
502   return TRUE;
503 }
504
505 static gboolean
506 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
507 {
508   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
509   GstSplitMuxPartReader *reader = part_pad->reader;
510   GstPad *target;
511   gboolean ret = FALSE;
512   gboolean active;
513
514   SPLITMUX_PART_LOCK (reader);
515   target = gst_object_ref (part_pad->target);
516   active = reader->active;
517   SPLITMUX_PART_UNLOCK (reader);
518
519   if (active) {
520     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
521         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
522
523     ret = gst_pad_query (target, query);
524   }
525
526   gst_object_unref (target);
527
528   return ret;
529 }
530
531 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
532
533 static void
534 splitmux_part_pad_constructed (GObject * pad)
535 {
536   gst_pad_set_chain_function (GST_PAD (pad),
537       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
538   gst_pad_set_event_function (GST_PAD (pad),
539       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
540   gst_pad_set_query_function (GST_PAD (pad),
541       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
542
543   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
544 }
545
546 static void
547 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
548 {
549   GObjectClass *gobject_klass = (GObjectClass *) (klass);
550
551   gobject_klass->constructed = splitmux_part_pad_constructed;
552   gobject_klass->finalize = splitmux_part_pad_finalize;
553 }
554
555 static void
556 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
557 {
558   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
559       NULL, NULL, pad);
560   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
561   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
562 }
563
564 static void
565 splitmux_part_pad_finalize (GObject * obj)
566 {
567   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
568
569   GST_DEBUG_OBJECT (obj, "finalize");
570   gst_data_queue_set_flushing (pad->queue, TRUE);
571   gst_data_queue_flush (pad->queue);
572   gst_object_unref (GST_OBJECT_CAST (pad->queue));
573   pad->queue = NULL;
574
575   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
576 }
577
578 static void
579 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
580     GstSplitMuxPartReader * part);
581 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
582 static GstStateChangeReturn
583 gst_splitmux_part_reader_change_state (GstElement * element,
584     GstStateChange transition);
585 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
586     GstEvent * event);
587 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
588     * part, gboolean flushing);
589 static void bus_handler (GstBin * bin, GstMessage * msg);
590 static void splitmux_part_reader_dispose (GObject * object);
591 static void splitmux_part_reader_finalize (GObject * object);
592 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
593
594 #define gst_splitmux_part_reader_parent_class parent_class
595 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
596     GST_TYPE_PIPELINE);
597
598 static void
599 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
600 {
601   GObjectClass *gobject_klass = (GObjectClass *) (klass);
602   GstElementClass *gstelement_class = (GstElementClass *) klass;
603   GstBinClass *gstbin_class = (GstBinClass *) klass;
604
605   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
606       "Split File Demuxing Source helper");
607
608   gobject_klass->dispose = splitmux_part_reader_dispose;
609   gobject_klass->finalize = splitmux_part_reader_finalize;
610
611   part_reader_signals[SIGNAL_PREPARED] =
612       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
613       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
614           prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
615   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
616   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
617
618   gstbin_class->handle_message = bus_handler;
619 }
620
621 static void
622 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
623 {
624   GstElement *typefind;
625
626   reader->active = FALSE;
627   reader->duration = GST_CLOCK_TIME_NONE;
628
629   g_cond_init (&reader->inactive_cond);
630   g_mutex_init (&reader->lock);
631
632   /* FIXME: Create elements on a state change */
633   reader->src = gst_element_factory_make ("filesrc", NULL);
634   if (reader->src == NULL) {
635     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
636     return;
637   }
638   gst_bin_add (GST_BIN_CAST (reader), reader->src);
639
640   typefind = gst_element_factory_make ("typefind", NULL);
641   if (!typefind) {
642     GST_ERROR_OBJECT (reader,
643         "Failed to create typefind element - check your installation");
644     return;
645   }
646
647   gst_bin_add (GST_BIN_CAST (reader), typefind);
648   reader->typefind = typefind;
649
650   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
651     GST_ERROR_OBJECT (reader,
652         "Failed to link typefind element - check your installation");
653     return;
654   }
655
656   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
657       reader);
658 }
659
660 static void
661 splitmux_part_reader_dispose (GObject * object)
662 {
663   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
664
665   splitmux_part_reader_reset (reader);
666
667   G_OBJECT_CLASS (parent_class)->dispose (object);
668 }
669
670 static void
671 splitmux_part_reader_finalize (GObject * object)
672 {
673   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
674
675   g_free (reader->path);
676
677   G_OBJECT_CLASS (parent_class)->finalize (object);
678 }
679
680 static void
681 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
682 {
683   GList *cur;
684
685   SPLITMUX_PART_LOCK (reader);
686   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
687     GstPad *pad = GST_PAD_CAST (cur->data);
688     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
689     gst_object_unref (GST_OBJECT_CAST (pad));
690   }
691
692   g_list_free (reader->pads);
693   reader->pads = NULL;
694   SPLITMUX_PART_UNLOCK (reader);
695 }
696
697 static GstSplitMuxPartPad *
698 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
699     GstPad * target)
700 {
701   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
702       "name", GST_PAD_NAME (target),
703       "direction", GST_PAD_SINK,
704       NULL);
705   pad->target = target;
706   pad->reader = reader;
707
708   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
709
710   return pad;
711 }
712
713 static void
714 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
715     GstSplitMuxPartReader * reader)
716 {
717   GstPad *out_pad = NULL;
718   GstSplitMuxPartPad *proxy_pad;
719   GstCaps *caps;
720   GstPadLinkReturn link_ret;
721
722   caps = gst_pad_get_current_caps (pad);
723
724   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
725       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
726
727   gst_caps_unref (caps);
728
729   /* Look up or create the output pad */
730   if (reader->get_pad_cb)
731     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
732   if (out_pad == NULL)
733     return;
734
735   /* Create our proxy pad to interact with this new pad */
736   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
737   GST_DEBUG_OBJECT (reader,
738       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
739       proxy_pad, out_pad);
740
741   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
742   if (link_ret != GST_PAD_LINK_OK) {
743     gst_object_unref (proxy_pad);
744     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
745         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
746             " ret %d", reader->path, pad, link_ret));
747     return;
748   }
749   GST_DEBUG_OBJECT (reader,
750       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
751       pad, proxy_pad);
752
753   SPLITMUX_PART_LOCK (reader);
754   reader->pads = g_list_prepend (reader->pads, proxy_pad);
755   SPLITMUX_PART_UNLOCK (reader);
756 }
757
758 static gboolean
759 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
760 {
761   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
762   gboolean ret = FALSE;
763   GstPad *pad = NULL;
764
765   /* Send event to the first source pad we found */
766   SPLITMUX_PART_LOCK (reader);
767   if (reader->pads) {
768     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
769     pad = gst_pad_get_peer (proxy_pad);
770   }
771   SPLITMUX_PART_UNLOCK (reader);
772
773   if (pad) {
774     ret = gst_pad_send_event (pad, event);
775     gst_object_unref (pad);
776   } else {
777     gst_event_unref (event);
778   }
779
780   return ret;
781 }
782
783 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
784 static void
785 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
786     GstClockTime time)
787 {
788   SPLITMUX_PART_UNLOCK (reader);
789   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
790       GST_TIME_ARGS (time));
791   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
792       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
793       GST_SEEK_TYPE_END, 0);
794
795   SPLITMUX_PART_LOCK (reader);
796
797   /* Wait for flush to finish, so old data is gone */
798   while (reader->flushing) {
799     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
800     SPLITMUX_PART_WAIT (reader);
801   }
802 }
803
804 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
805 static gboolean
806 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
807     GstSegment * target_seg)
808 {
809   GstSeekFlags flags;
810   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
811
812   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
813
814   SPLITMUX_PART_LOCK (reader);
815   if (target_seg->start >= reader->start_offset)
816     start = target_seg->start - reader->start_offset;
817   /* If the segment stop is within this part, don't play to the end */
818   if (target_seg->stop != -1 &&
819       target_seg->stop < reader->start_offset + reader->duration)
820     stop = target_seg->stop - reader->start_offset;
821
822   SPLITMUX_PART_UNLOCK (reader);
823
824   GST_DEBUG_OBJECT (reader,
825       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
826       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
827       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
828
829   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
830       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
831       stop);
832 }
833
834 /* Called with lock held */
835 static void
836 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
837 {
838   /* Trigger a flushing seek to near the end of the file and run each stream
839    * to EOS in order to find the smallest end timestamp to start the next
840    * file from
841    */
842   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
843       && reader->duration > GST_SECOND) {
844     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
845     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
846   }
847
848   /* Wait for things to happen */
849   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
850     SPLITMUX_PART_WAIT (reader);
851
852   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
853     /* Fire the prepared signal and go to READY state */
854     GST_DEBUG_OBJECT (reader,
855         "Stream measuring complete. File %s is now ready. Firing prepared signal",
856         reader->path);
857     reader->prep_state = PART_STATE_READY;
858     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
859   }
860 }
861
862 static GstElement *
863 find_demuxer (GstCaps * caps)
864 {
865   GList *factories =
866       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
867       GST_RANK_MARGINAL);
868   GList *compat_elements;
869   GstElement *e = NULL;
870
871   if (factories == NULL)
872     return NULL;
873
874   compat_elements =
875       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
876
877   if (compat_elements) {
878     /* Just take the first (highest ranked) option */
879     GstElementFactory *factory =
880         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
881     e = gst_element_factory_create (factory, NULL);
882     gst_plugin_feature_list_free (compat_elements);
883   }
884
885   if (factories)
886     gst_plugin_feature_list_free (factories);
887
888   return e;
889 }
890
891 static void
892 type_found (GstElement * typefind, guint probability,
893     GstCaps * caps, GstSplitMuxPartReader * reader)
894 {
895   GstElement *demux;
896
897   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
898
899   /* typefind found a type. Look for the demuxer to handle it */
900   demux = reader->demux = find_demuxer (caps);
901   if (reader->demux == NULL) {
902     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
903     return;
904   }
905
906   gst_bin_add (GST_BIN_CAST (reader), demux);
907   gst_element_link_pads (reader->typefind, "src", demux, NULL);
908   gst_element_sync_state_with_parent (reader->demux);
909
910   /* Connect to demux signals */
911   g_signal_connect (demux,
912       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
913   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
914 }
915
916 static void
917 check_if_pads_collected (GstSplitMuxPartReader * reader)
918 {
919   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
920     /* Check we have all pads and each pad has seen a buffer */
921     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
922       GST_DEBUG_OBJECT (reader,
923           "no more pads - file %s. Measuring stream length", reader->path);
924       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
925       SPLITMUX_PART_BROADCAST (reader);
926     }
927   }
928 }
929
930 static void
931 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
932 {
933   GstClockTime duration = GST_CLOCK_TIME_NONE;
934   GList *cur;
935   /* Query the minimum duration of any pad in this piece and store it.
936    * FIXME: Only consider audio and video */
937   SPLITMUX_PART_LOCK (reader);
938   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
939     GstPad *target = GST_PAD_CAST (cur->data);
940     if (target) {
941       gint64 cur_duration;
942       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
943         GST_INFO_OBJECT (reader,
944             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
945             reader->path, target, GST_TIME_ARGS (cur_duration));
946         if (cur_duration < duration)
947           duration = cur_duration;
948       }
949     }
950   }
951   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
952       reader->path, GST_TIME_ARGS (duration));
953   reader->duration = (GstClockTime) duration;
954
955   reader->no_more_pads = TRUE;
956
957   check_if_pads_collected (reader);
958   SPLITMUX_PART_UNLOCK (reader);
959 }
960
961 gboolean
962 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
963     GstPad * src_pad, GstQuery * query)
964 {
965   GstPad *target = NULL;
966   gboolean ret;
967   GList *cur;
968
969   SPLITMUX_PART_LOCK (part);
970   /* Find the pad corresponding to the visible output target pad */
971   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
972     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
973     if (part_pad->target == src_pad) {
974       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
975       break;
976     }
977   }
978   SPLITMUX_PART_UNLOCK (part);
979
980   if (target == NULL)
981     return FALSE;
982
983   ret = gst_pad_peer_query (target, query);
984   gst_object_unref (GST_OBJECT_CAST (target));
985
986   if (ret == FALSE)
987     goto out;
988
989   /* Post-massaging of queries */
990   switch (GST_QUERY_TYPE (query)) {
991     case GST_QUERY_POSITION:{
992       GstFormat fmt;
993       gint64 position;
994
995       gst_query_parse_position (query, &fmt, &position);
996       if (fmt != GST_FORMAT_TIME)
997         return FALSE;
998       SPLITMUX_PART_LOCK (part);
999       position += part->start_offset;
1000       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1001           GST_TIME_ARGS (position));
1002       SPLITMUX_PART_UNLOCK (part);
1003
1004       gst_query_set_position (query, fmt, position);
1005       break;
1006     }
1007     default:
1008       break;
1009   }
1010
1011 out:
1012   gst_object_unref (target);
1013   return ret;
1014 }
1015
1016 static GstStateChangeReturn
1017 gst_splitmux_part_reader_change_state (GstElement * element,
1018     GstStateChange transition)
1019 {
1020   GstStateChangeReturn ret;
1021   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1022
1023   switch (transition) {
1024     case GST_STATE_CHANGE_NULL_TO_READY:{
1025       break;
1026     }
1027     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1028       g_object_set (reader->src, "location", reader->path, NULL);
1029       SPLITMUX_PART_LOCK (reader);
1030       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1031       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1032       reader->running = TRUE;
1033       SPLITMUX_PART_UNLOCK (reader);
1034       break;
1035     }
1036     case GST_STATE_CHANGE_READY_TO_NULL:
1037     case GST_STATE_CHANGE_PAUSED_TO_READY:
1038       SPLITMUX_PART_LOCK (reader);
1039       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1040       reader->running = FALSE;
1041       SPLITMUX_PART_BROADCAST (reader);
1042       SPLITMUX_PART_UNLOCK (reader);
1043       break;
1044     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1045       SPLITMUX_PART_LOCK (reader);
1046       reader->active = FALSE;
1047       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1048       SPLITMUX_PART_BROADCAST (reader);
1049       SPLITMUX_PART_UNLOCK (reader);
1050       break;
1051     default:
1052       break;
1053   }
1054
1055   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1056   if (ret == GST_STATE_CHANGE_FAILURE)
1057     goto beach;
1058
1059   switch (transition) {
1060     case GST_STATE_CHANGE_READY_TO_PAUSED:
1061       /* Sleep and wait until all streams have been collected, then do the seeks
1062        * to measure the stream lengths */
1063       SPLITMUX_PART_LOCK (reader);
1064
1065       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1066         GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1067         SPLITMUX_PART_WAIT (reader);
1068       }
1069
1070       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
1071         gst_splitmux_part_reader_measure_streams (reader);
1072       else if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY)
1073         reader->prep_state = PART_STATE_READY;
1074       else if (reader->prep_state == PART_STATE_FAILED)
1075         ret = GST_STATE_CHANGE_FAILURE;
1076       SPLITMUX_PART_UNLOCK (reader);
1077       break;
1078     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1079       SPLITMUX_PART_LOCK (reader);
1080       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1081       reader->active = TRUE;
1082       SPLITMUX_PART_BROADCAST (reader);
1083       SPLITMUX_PART_UNLOCK (reader);
1084       break;
1085     case GST_STATE_CHANGE_READY_TO_NULL:
1086       reader->prep_state = PART_STATE_NULL;
1087       splitmux_part_reader_reset (reader);
1088       break;
1089     default:
1090       break;
1091   }
1092
1093 beach:
1094   return ret;
1095 }
1096
1097 static gboolean
1098 check_bus_messages (GstSplitMuxPartReader * part)
1099 {
1100   gboolean ret = FALSE;
1101   GstBus *bus;
1102   GstMessage *m;
1103
1104   bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1105   while ((m = gst_bus_pop (bus)) != NULL) {
1106     if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1107       GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1108       gst_message_unref (m);
1109       goto done;
1110     }
1111     gst_message_unref (m);
1112   }
1113   ret = TRUE;
1114 done:
1115   gst_object_unref (bus);
1116   return ret;
1117 }
1118
1119 gboolean
1120 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1121 {
1122   GstStateChangeReturn ret;
1123
1124   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1125
1126   if (ret != GST_STATE_CHANGE_SUCCESS)
1127     return FALSE;
1128
1129   return check_bus_messages (part);
1130 }
1131
1132 void
1133 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1134 {
1135   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1136 }
1137
1138 void
1139 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1140     const gchar * path)
1141 {
1142   reader->path = g_strdup (path);
1143 }
1144
1145 gboolean
1146 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1147     GstSegment * seg)
1148 {
1149   GST_DEBUG_OBJECT (reader, "Activating part reader");
1150
1151   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1152     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1153         seg);
1154     return FALSE;
1155   }
1156   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1157           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1158     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1159     return FALSE;
1160   }
1161   return TRUE;
1162 }
1163
1164 void
1165 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1166 {
1167   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1168   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1169 }
1170
1171 void
1172 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1173     gboolean flushing)
1174 {
1175   GList *cur;
1176
1177   GST_LOG_OBJECT (reader, "%s dataqueues",
1178       flushing ? "Flushing" : "Done flushing");
1179   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1180     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1181     gst_data_queue_set_flushing (part_pad->queue, flushing);
1182     if (flushing)
1183       gst_data_queue_flush (part_pad->queue);
1184   }
1185 };
1186
1187 void
1188 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1189     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1190 {
1191   reader->cb_data = cb_data;
1192   reader->get_pad_cb = get_pad_cb;
1193 }
1194
1195 GstClockTime
1196 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1197 {
1198   GList *cur;
1199   GstClockTime ret = GST_CLOCK_TIME_NONE;
1200
1201   SPLITMUX_PART_LOCK (reader);
1202   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1203     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1204     if (part_pad->max_ts < ret)
1205       ret = part_pad->max_ts;
1206   }
1207
1208   SPLITMUX_PART_UNLOCK (reader);
1209
1210   return ret;
1211 }
1212
1213 void
1214 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1215     GstClockTime offset)
1216 {
1217   SPLITMUX_PART_LOCK (reader);
1218   reader->start_offset = offset;
1219   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1220       GST_TIME_ARGS (offset));
1221   SPLITMUX_PART_UNLOCK (reader);
1222 }
1223
1224 GstClockTime
1225 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1226 {
1227   GstClockTime ret = GST_CLOCK_TIME_NONE;
1228
1229   SPLITMUX_PART_LOCK (reader);
1230   ret = reader->start_offset;
1231   SPLITMUX_PART_UNLOCK (reader);
1232
1233   return ret;
1234 }
1235
1236 GstClockTime
1237 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1238 {
1239   GstClockTime dur;
1240
1241   SPLITMUX_PART_LOCK (reader);
1242   dur = reader->duration;
1243   SPLITMUX_PART_UNLOCK (reader);
1244
1245   return dur;
1246 }
1247
1248 GstPad *
1249 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1250     GstPad * target)
1251 {
1252   GstPad *result = NULL;
1253   GList *cur;
1254
1255   SPLITMUX_PART_LOCK (reader);
1256   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1257     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1258     if (part_pad->target == target) {
1259       result = (GstPad *) part_pad;
1260       break;
1261     }
1262   }
1263   SPLITMUX_PART_UNLOCK (reader);
1264
1265   return result;
1266 }
1267
1268 GstFlowReturn
1269 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1270     GstDataQueueItem ** item)
1271 {
1272   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1273   GstDataQueue *q;
1274   GstFlowReturn ret;
1275
1276   /* Get one item from the appropriate dataqueue */
1277   SPLITMUX_PART_LOCK (reader);
1278   if (reader->prep_state == PART_STATE_FAILED) {
1279     SPLITMUX_PART_UNLOCK (reader);
1280     return GST_FLOW_ERROR;
1281   }
1282
1283   q = gst_object_ref (part_pad->queue);
1284
1285   /* Have to drop the lock around pop, so we can be woken up for flush */
1286   SPLITMUX_PART_UNLOCK (reader);
1287   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1288     ret = GST_FLOW_FLUSHING;
1289     goto out;
1290   }
1291
1292   SPLITMUX_PART_LOCK (reader);
1293
1294   SPLITMUX_PART_BROADCAST (reader);
1295   if (GST_IS_EVENT ((*item)->object)) {
1296     GstEvent *e = (GstEvent *) ((*item)->object);
1297     /* Mark this pad as EOS */
1298     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1299       part_pad->is_eos = TRUE;
1300   }
1301
1302   SPLITMUX_PART_UNLOCK (reader);
1303
1304   ret = GST_FLOW_OK;
1305 out:
1306   gst_object_unref (q);
1307   return ret;
1308 }
1309
1310 static void
1311 bus_handler (GstBin * bin, GstMessage * message)
1312 {
1313   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1314
1315   switch (GST_MESSAGE_TYPE (message)) {
1316     case GST_MESSAGE_ERROR:
1317       /* Make sure to set the state to failed and wake up the listener
1318        * on error */
1319       SPLITMUX_PART_LOCK (reader);
1320       reader->prep_state = PART_STATE_FAILED;
1321       SPLITMUX_PART_BROADCAST (reader);
1322       SPLITMUX_PART_UNLOCK (reader);
1323       break;
1324     default:
1325       break;
1326   }
1327
1328   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1329 }