splitmuxsrc: Fix extra unref handling queries
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxpartreader.c
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39
40 enum
41 {
42   SIGNAL_PREPARED,
43   LAST_SIGNAL
44 };
45
46 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
47
48 typedef struct _GstSplitMuxPartPad
49 {
50   GstPad parent;
51
52   /* Reader we belong to */
53   GstSplitMuxPartReader *reader;
54   /* Output splitmuxsrc source pad */
55   GstPad *target;
56
57   GstDataQueue *queue;
58
59   gboolean is_eos;
60   gboolean flushing;
61   gboolean seen_buffer;
62
63   GstClockTime max_ts;
64   GstSegment segment;
65
66   GstSegment orig_segment;
67   GstClockTime initial_ts_offset;
68 } GstSplitMuxPartPad;
69
70 typedef struct _GstSplitMuxPartPadClass
71 {
72   GstPadClass parent;
73 } GstSplitMuxPartPadClass;
74
75 static GType gst_splitmux_part_pad_get_type (void);
76 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
77 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
78
79 static void splitmux_part_pad_constructed (GObject * pad);
80 static void splitmux_part_pad_finalize (GObject * pad);
81 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
82     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
83
84 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
85     guint visible, guint bytes, guint64 time, gpointer checkdata);
86 static void type_found (GstElement * typefind, guint probability,
87     GstCaps * caps, GstSplitMuxPartReader * reader);
88 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
89
90 /* Called with reader lock held */
91 static gboolean
92 have_empty_queue (GstSplitMuxPartReader * reader)
93 {
94   GList *cur;
95
96   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
97     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
98     if (part_pad->is_eos) {
99       GST_LOG_OBJECT (part_pad, "Pad is EOS");
100       return TRUE;
101     }
102     if (gst_data_queue_is_empty (part_pad->queue)) {
103       GST_LOG_OBJECT (part_pad, "Queue is empty");
104       return TRUE;
105     }
106   }
107
108   return FALSE;
109 }
110
111 /* Called with reader lock held */
112 static gboolean
113 block_until_can_push (GstSplitMuxPartReader * reader)
114 {
115   while (reader->running) {
116     if (reader->flushing)
117       goto out;
118     if (reader->active && have_empty_queue (reader))
119       goto out;
120
121     GST_LOG_OBJECT (reader,
122         "Waiting for activation or empty queue on reader %s", reader->path);
123     SPLITMUX_PART_WAIT (reader);
124   }
125
126   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
127       reader->path, reader->active, reader->flushing);
128 out:
129   return reader->active && !reader->flushing;
130 }
131
132 static void
133 handle_buffer_measuring (GstSplitMuxPartReader * reader,
134     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
135 {
136   GstClockTime ts = GST_CLOCK_TIME_NONE;
137   GstClockTimeDiff offset;
138
139   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
140       !part_pad->seen_buffer) {
141     /* If this is the first buffer on the pad in the collect_streams state,
142      * then calculate inital offset based on running time of this segment */
143     part_pad->initial_ts_offset =
144         part_pad->orig_segment.start + part_pad->orig_segment.base -
145         part_pad->orig_segment.time;
146     GST_DEBUG_OBJECT (reader,
147         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
148         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
149   }
150   part_pad->seen_buffer = TRUE;
151
152   /* Adjust buffer timestamps */
153   offset = reader->start_offset + part_pad->segment.base;
154   offset -= part_pad->initial_ts_offset;
155
156   /* Update the stored max duration on the pad,
157    * always preferring making DTS contiguous
158    * where possible */
159   if (GST_BUFFER_DTS_IS_VALID (buf))
160     ts = GST_BUFFER_DTS (buf) + offset;
161   else if (GST_BUFFER_PTS_IS_VALID (buf))
162     ts = GST_BUFFER_PTS (buf) + offset;
163
164   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
165       " incoming PTS %" GST_TIME_FORMAT
166       " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
167       " to %" GST_TIME_FORMAT, part_pad,
168       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
169       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
170       GST_STIME_ARGS (offset), GST_TIME_ARGS (ts));
171
172   if (GST_CLOCK_TIME_IS_VALID (ts)) {
173     if (GST_BUFFER_DURATION_IS_VALID (buf))
174       ts += GST_BUFFER_DURATION (buf);
175
176     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
177       part_pad->max_ts = ts;
178       GST_LOG_OBJECT (reader,
179           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
180           GST_TIME_ARGS (part_pad->max_ts));
181     }
182   }
183   /* Is it time to move to measuring state yet? */
184   check_if_pads_collected (reader);
185 }
186
187 static gboolean
188 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
189     guint visible, guint bytes, guint64 time, gpointer checkdata)
190 {
191   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
192   if (time > 20 * GST_SECOND)
193     return TRUE;
194   return FALSE;
195 }
196
197 static void
198 splitmux_part_free_queue_item (GstDataQueueItem * item)
199 {
200   gst_mini_object_unref (item->object);
201   g_slice_free (GstDataQueueItem, item);
202 }
203
204 static GstFlowReturn
205 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
206 {
207   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
208   GstSplitMuxPartReader *reader = part_pad->reader;
209   GstDataQueueItem *item;
210   GstClockTimeDiff offset;
211
212   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
213   SPLITMUX_PART_LOCK (reader);
214
215   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
216       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
217     handle_buffer_measuring (reader, part_pad, buf);
218     gst_buffer_unref (buf);
219     SPLITMUX_PART_UNLOCK (reader);
220     return GST_FLOW_OK;
221   }
222
223   if (!block_until_can_push (reader)) {
224     /* Flushing */
225     SPLITMUX_PART_UNLOCK (reader);
226     gst_buffer_unref (buf);
227     return GST_FLOW_FLUSHING;
228   }
229
230   /* Adjust buffer timestamps */
231   offset = reader->start_offset + part_pad->segment.base;
232   offset -= part_pad->initial_ts_offset;
233
234   if (GST_BUFFER_PTS_IS_VALID (buf))
235     GST_BUFFER_PTS (buf) += offset;
236   if (GST_BUFFER_DTS_IS_VALID (buf))
237     GST_BUFFER_DTS (buf) += offset;
238
239   /* We are active, and one queue is empty, place this buffer in
240    * the dataqueue */
241   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
242   item = g_slice_new (GstDataQueueItem);
243   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
244   item->object = GST_MINI_OBJECT (buf);
245   item->size = gst_buffer_get_size (buf);
246   item->duration = GST_BUFFER_DURATION (buf);
247   if (item->duration == GST_CLOCK_TIME_NONE)
248     item->duration = 0;
249   item->visible = TRUE;
250
251   gst_object_ref (part_pad);
252
253   SPLITMUX_PART_UNLOCK (reader);
254
255   if (!gst_data_queue_push (part_pad->queue, item)) {
256     splitmux_part_free_queue_item (item);
257     gst_object_unref (part_pad);
258     return GST_FLOW_FLUSHING;
259   }
260
261   gst_object_unref (part_pad);
262   return GST_FLOW_OK;
263 }
264
265 /* Called with splitmux part lock held */
266 static gboolean
267 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
268 {
269   GList *cur;
270   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
271     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
272     if (!part_pad->is_eos)
273       return FALSE;
274   }
275
276   return TRUE;
277 }
278
279 static gboolean
280 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
281 {
282   GList *cur;
283   GST_LOG_OBJECT (part, "Checking for preroll");
284   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
285     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
286     if (!part_pad->seen_buffer) {
287       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
288           part_pad);
289       return FALSE;
290     }
291   }
292   GST_LOG_OBJECT (part, "Part is prerolled");
293   return TRUE;
294 }
295
296
297 gboolean
298 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
299 {
300   gboolean res;
301
302   SPLITMUX_PART_LOCK (reader);
303   res = splitmux_part_is_eos_locked (reader);
304   SPLITMUX_PART_UNLOCK (reader);
305
306   return res;
307 }
308
309 /* Called with splitmux part lock held */
310 static gboolean
311 splitmux_is_flushing (GstSplitMuxPartReader * reader)
312 {
313   GList *cur;
314   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
315     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
316     if (part_pad->flushing)
317       return TRUE;
318   }
319
320   return FALSE;
321 }
322
323 static gboolean
324 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
325 {
326   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
327   GstSplitMuxPartReader *reader = part_pad->reader;
328   gboolean ret = TRUE;
329   SplitMuxSrcPad *target;
330   GstDataQueueItem *item;
331
332   SPLITMUX_PART_LOCK (reader);
333
334   target = gst_object_ref (part_pad->target);
335
336   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
337       event);
338
339   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
340     goto drop_event;
341
342   switch (GST_EVENT_TYPE (event)) {
343     case GST_EVENT_SEGMENT:{
344       GstSegment *seg = &part_pad->segment;
345
346       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
347
348       gst_event_copy_segment (event, seg);
349       gst_event_copy_segment (event, &part_pad->orig_segment);
350
351       if (seg->format != GST_FORMAT_TIME)
352         goto wrong_segment;
353
354       /* Adjust segment */
355       /* Adjust start/stop so the overall file is 0 + start_offset based */
356       if (seg->stop != -1) {
357         seg->stop -= seg->start;
358         seg->stop += seg->time + reader->start_offset;
359       }
360       seg->start = seg->time + reader->start_offset;
361       seg->time += reader->start_offset;
362       seg->position += reader->start_offset;
363
364       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
365
366       /* Replace event */
367       gst_event_unref (event);
368       event = gst_event_new_segment (seg);
369
370       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
371           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
372         break;                  /* Only do further stuff with segments during initial measuring */
373
374       /* Take the first segment from the first part */
375       if (target->segment.format == GST_FORMAT_UNDEFINED) {
376         gst_segment_copy_into (seg, &target->segment);
377         GST_DEBUG_OBJECT (reader,
378             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
379       }
380
381       if (seg->stop != -1 && target->segment.stop != -1) {
382         GstClockTime stop = seg->base + seg->stop;
383         if (stop > target->segment.stop) {
384           target->segment.stop = stop;
385           GST_DEBUG_OBJECT (reader,
386               "Adjusting segment stop by %" GST_TIME_FORMAT
387               " output now %" GST_SEGMENT_FORMAT,
388               GST_TIME_ARGS (reader->start_offset), &target->segment);
389         }
390       }
391       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
392       break;
393     }
394     case GST_EVENT_EOS:{
395
396       GST_DEBUG_OBJECT (part_pad,
397           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
398           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
399
400       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
401           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
402         /* Mark this pad as EOS */
403         part_pad->is_eos = TRUE;
404         if (splitmux_part_is_eos_locked (reader)) {
405           /* Finished measuring things, set state and tell the state change func
406            * so it can seek back to the start */
407           GST_LOG_OBJECT (reader,
408               "EOS while measuring streams. Resetting for ready");
409           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
410           SPLITMUX_PART_BROADCAST (reader);
411         }
412         goto drop_event;
413       }
414       break;
415     }
416     case GST_EVENT_FLUSH_START:
417       reader->flushing = TRUE;
418       part_pad->flushing = TRUE;
419       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
420           part_pad);
421       gst_data_queue_set_flushing (part_pad->queue, TRUE);
422       SPLITMUX_PART_BROADCAST (reader);
423       break;
424     case GST_EVENT_FLUSH_STOP:{
425       gst_data_queue_set_flushing (part_pad->queue, FALSE);
426       gst_data_queue_flush (part_pad->queue);
427       part_pad->seen_buffer = FALSE;
428       part_pad->flushing = FALSE;
429       part_pad->is_eos = FALSE;
430
431       reader->flushing = splitmux_is_flushing (reader);
432       GST_LOG_OBJECT (reader,
433           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
434           reader->path, pad, reader->flushing);
435       SPLITMUX_PART_BROADCAST (reader);
436       break;
437     }
438     default:
439       break;
440   }
441
442   /* Don't send events downstream while preparing */
443   if (reader->prep_state != PART_STATE_READY)
444     goto drop_event;
445
446   /* Don't pass flush events - those are done by the parent */
447   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
448       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
449     goto drop_event;
450
451   if (!block_until_can_push (reader)) {
452     SPLITMUX_PART_UNLOCK (reader);
453     gst_object_unref (target);
454     gst_event_unref (event);
455     return FALSE;
456   }
457
458   switch (GST_EVENT_TYPE (event)) {
459     case GST_EVENT_GAP:{
460       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
461       goto drop_event;
462     }
463     default:
464       break;
465   }
466
467   /* We are active, and one queue is empty, place this buffer in
468    * the dataqueue */
469   gst_object_ref (part_pad->queue);
470   SPLITMUX_PART_UNLOCK (reader);
471
472   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
473   item = g_slice_new (GstDataQueueItem);
474   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
475   item->object = GST_MINI_OBJECT (event);
476   item->size = 0;
477   item->duration = 0;
478   if (item->duration == GST_CLOCK_TIME_NONE)
479     item->duration = 0;
480   item->visible = FALSE;
481
482   if (!gst_data_queue_push (part_pad->queue, item)) {
483     splitmux_part_free_queue_item (item);
484     ret = FALSE;
485   }
486
487   gst_object_unref (part_pad->queue);
488   gst_object_unref (target);
489
490   return ret;
491 wrong_segment:
492   gst_event_unref (event);
493   gst_object_unref (target);
494   SPLITMUX_PART_UNLOCK (reader);
495   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
496       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
497           reader->path, pad));
498   return FALSE;
499 drop_event:
500   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
501       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
502   gst_event_unref (event);
503   gst_object_unref (target);
504   SPLITMUX_PART_UNLOCK (reader);
505   return TRUE;
506 }
507
508 static gboolean
509 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
510 {
511   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
512   GstSplitMuxPartReader *reader = part_pad->reader;
513   GstPad *target;
514   gboolean ret = FALSE;
515   gboolean active;
516
517   SPLITMUX_PART_LOCK (reader);
518   target = gst_object_ref (part_pad->target);
519   active = reader->active;
520   SPLITMUX_PART_UNLOCK (reader);
521
522   if (active) {
523     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
524         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
525
526     ret = gst_pad_query (target, query);
527   }
528
529   gst_object_unref (target);
530
531   return ret;
532 }
533
534 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
535
536 static void
537 splitmux_part_pad_constructed (GObject * pad)
538 {
539   gst_pad_set_chain_function (GST_PAD (pad),
540       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
541   gst_pad_set_event_function (GST_PAD (pad),
542       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
543   gst_pad_set_query_function (GST_PAD (pad),
544       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
545
546   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
547 }
548
549 static void
550 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
551 {
552   GObjectClass *gobject_klass = (GObjectClass *) (klass);
553
554   gobject_klass->constructed = splitmux_part_pad_constructed;
555   gobject_klass->finalize = splitmux_part_pad_finalize;
556 }
557
558 static void
559 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
560 {
561   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
562       NULL, NULL, pad);
563   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
564   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
565 }
566
567 static void
568 splitmux_part_pad_finalize (GObject * obj)
569 {
570   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
571
572   GST_DEBUG_OBJECT (obj, "finalize");
573   gst_data_queue_set_flushing (pad->queue, TRUE);
574   gst_data_queue_flush (pad->queue);
575   gst_object_unref (GST_OBJECT_CAST (pad->queue));
576   pad->queue = NULL;
577
578   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
579 }
580
581 static void
582 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
583     GstSplitMuxPartReader * part);
584 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
585 static GstStateChangeReturn
586 gst_splitmux_part_reader_change_state (GstElement * element,
587     GstStateChange transition);
588 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
589     GstEvent * event);
590 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
591     * part, gboolean flushing);
592 static void bus_handler (GstBin * bin, GstMessage * msg);
593 static void splitmux_part_reader_dispose (GObject * object);
594 static void splitmux_part_reader_finalize (GObject * object);
595 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
596
597 #define gst_splitmux_part_reader_parent_class parent_class
598 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
599     GST_TYPE_PIPELINE);
600
601 static void
602 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
603 {
604   GObjectClass *gobject_klass = (GObjectClass *) (klass);
605   GstElementClass *gstelement_class = (GstElementClass *) klass;
606   GstBinClass *gstbin_class = (GstBinClass *) klass;
607
608   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
609       "Split File Demuxing Source helper");
610
611   gobject_klass->dispose = splitmux_part_reader_dispose;
612   gobject_klass->finalize = splitmux_part_reader_finalize;
613
614   part_reader_signals[SIGNAL_PREPARED] =
615       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
616       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
617           prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
618   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
619   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
620
621   gstbin_class->handle_message = bus_handler;
622 }
623
624 static void
625 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
626 {
627   GstElement *typefind;
628
629   reader->active = FALSE;
630   reader->duration = GST_CLOCK_TIME_NONE;
631
632   g_cond_init (&reader->inactive_cond);
633   g_mutex_init (&reader->lock);
634   g_mutex_init (&reader->type_lock);
635
636   /* FIXME: Create elements on a state change */
637   reader->src = gst_element_factory_make ("filesrc", NULL);
638   if (reader->src == NULL) {
639     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
640     return;
641   }
642   gst_bin_add (GST_BIN_CAST (reader), reader->src);
643
644   typefind = gst_element_factory_make ("typefind", NULL);
645   if (!typefind) {
646     GST_ERROR_OBJECT (reader,
647         "Failed to create typefind element - check your installation");
648     return;
649   }
650
651   gst_bin_add (GST_BIN_CAST (reader), typefind);
652   reader->typefind = typefind;
653
654   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
655     GST_ERROR_OBJECT (reader,
656         "Failed to link typefind element - check your installation");
657     return;
658   }
659
660   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
661       reader);
662 }
663
664 static void
665 splitmux_part_reader_dispose (GObject * object)
666 {
667   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
668
669   splitmux_part_reader_reset (reader);
670
671   G_OBJECT_CLASS (parent_class)->dispose (object);
672 }
673
674 static void
675 splitmux_part_reader_finalize (GObject * object)
676 {
677   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
678
679   g_cond_clear (&reader->inactive_cond);
680   g_mutex_clear (&reader->lock);
681   g_mutex_clear (&reader->type_lock);
682
683   g_free (reader->path);
684
685   G_OBJECT_CLASS (parent_class)->finalize (object);
686 }
687
688 static void
689 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
690 {
691   GList *cur;
692
693   SPLITMUX_PART_LOCK (reader);
694   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
695     GstPad *pad = GST_PAD_CAST (cur->data);
696     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
697     gst_object_unref (GST_OBJECT_CAST (pad));
698   }
699
700   g_list_free (reader->pads);
701   reader->pads = NULL;
702   SPLITMUX_PART_UNLOCK (reader);
703 }
704
705 static GstSplitMuxPartPad *
706 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
707     GstPad * target)
708 {
709   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
710       "name", GST_PAD_NAME (target),
711       "direction", GST_PAD_SINK,
712       NULL);
713   pad->target = target;
714   pad->reader = reader;
715
716   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
717
718   return pad;
719 }
720
721 static void
722 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
723     GstSplitMuxPartReader * reader)
724 {
725   GstPad *out_pad = NULL;
726   GstSplitMuxPartPad *proxy_pad;
727   GstCaps *caps;
728   GstPadLinkReturn link_ret;
729
730   caps = gst_pad_get_current_caps (pad);
731
732   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
733       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
734
735   gst_caps_unref (caps);
736
737   /* Look up or create the output pad */
738   if (reader->get_pad_cb)
739     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
740   if (out_pad == NULL) {
741     GST_DEBUG_OBJECT (reader,
742         "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
743     return;
744   }
745
746   /* Create our proxy pad to interact with this new pad */
747   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
748   GST_DEBUG_OBJECT (reader,
749       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
750       proxy_pad, out_pad);
751
752   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
753   if (link_ret != GST_PAD_LINK_OK) {
754     gst_object_unref (proxy_pad);
755     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
756         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
757             " ret %d", reader->path, pad, link_ret));
758     return;
759   }
760   GST_DEBUG_OBJECT (reader,
761       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
762       pad, proxy_pad);
763
764   SPLITMUX_PART_LOCK (reader);
765   reader->pads = g_list_prepend (reader->pads, proxy_pad);
766   SPLITMUX_PART_UNLOCK (reader);
767 }
768
769 static gboolean
770 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
771 {
772   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
773   gboolean ret = FALSE;
774   GstPad *pad = NULL;
775
776   /* Send event to the first source pad we found */
777   SPLITMUX_PART_LOCK (reader);
778   if (reader->pads) {
779     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
780     pad = gst_pad_get_peer (proxy_pad);
781   }
782   SPLITMUX_PART_UNLOCK (reader);
783
784   if (pad) {
785     ret = gst_pad_send_event (pad, event);
786     gst_object_unref (pad);
787   } else {
788     gst_event_unref (event);
789   }
790
791   return ret;
792 }
793
794 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
795 static void
796 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
797     GstClockTime time)
798 {
799   SPLITMUX_PART_UNLOCK (reader);
800   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
801       GST_TIME_ARGS (time));
802   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
803       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
804       GST_SEEK_TYPE_END, 0);
805
806   SPLITMUX_PART_LOCK (reader);
807
808   /* Wait for flush to finish, so old data is gone */
809   while (reader->flushing) {
810     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
811     SPLITMUX_PART_WAIT (reader);
812   }
813 }
814
815 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
816 static gboolean
817 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
818     GstSegment * target_seg)
819 {
820   GstSeekFlags flags;
821   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
822
823   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
824
825   SPLITMUX_PART_LOCK (reader);
826   if (target_seg->start >= reader->start_offset)
827     start = target_seg->start - reader->start_offset;
828   /* If the segment stop is within this part, don't play to the end */
829   if (target_seg->stop != -1 &&
830       target_seg->stop < reader->start_offset + reader->duration)
831     stop = target_seg->stop - reader->start_offset;
832
833   SPLITMUX_PART_UNLOCK (reader);
834
835   GST_DEBUG_OBJECT (reader,
836       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
837       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
838       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
839
840   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
841       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
842       stop);
843 }
844
845 /* Called with lock held */
846 static void
847 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
848 {
849   /* Trigger a flushing seek to near the end of the file and run each stream
850    * to EOS in order to find the smallest end timestamp to start the next
851    * file from
852    */
853   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
854       && reader->duration > GST_SECOND) {
855     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
856     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
857   }
858
859   /* Wait for things to happen */
860   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
861     SPLITMUX_PART_WAIT (reader);
862
863   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
864     /* Fire the prepared signal and go to READY state */
865     GST_DEBUG_OBJECT (reader,
866         "Stream measuring complete. File %s is now ready. Firing prepared signal",
867         reader->path);
868     reader->prep_state = PART_STATE_READY;
869     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
870   }
871 }
872
873 static GstElement *
874 find_demuxer (GstCaps * caps)
875 {
876   GList *factories =
877       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
878       GST_RANK_MARGINAL);
879   GList *compat_elements;
880   GstElement *e = NULL;
881
882   if (factories == NULL)
883     return NULL;
884
885   compat_elements =
886       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
887
888   if (compat_elements) {
889     /* Just take the first (highest ranked) option */
890     GstElementFactory *factory =
891         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
892     e = gst_element_factory_create (factory, NULL);
893     gst_plugin_feature_list_free (compat_elements);
894   }
895
896   if (factories)
897     gst_plugin_feature_list_free (factories);
898
899   return e;
900 }
901
902 static void
903 type_found (GstElement * typefind, guint probability,
904     GstCaps * caps, GstSplitMuxPartReader * reader)
905 {
906   GstElement *demux;
907
908   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
909
910   /* typefind found a type. Look for the demuxer to handle it */
911   demux = reader->demux = find_demuxer (caps);
912   if (reader->demux == NULL) {
913     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
914     return;
915   }
916
917   /* Connect to demux signals */
918   g_signal_connect (demux,
919       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
920   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
921
922   gst_element_set_locked_state (demux, TRUE);
923   gst_bin_add (GST_BIN_CAST (reader), demux);
924   gst_element_link_pads (reader->typefind, "src", demux, NULL);
925   gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
926   gst_element_set_locked_state (demux, FALSE);
927 }
928
929 static void
930 check_if_pads_collected (GstSplitMuxPartReader * reader)
931 {
932   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
933     /* Check we have all pads and each pad has seen a buffer */
934     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
935       GST_DEBUG_OBJECT (reader,
936           "no more pads - file %s. Measuring stream length", reader->path);
937       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
938       SPLITMUX_PART_BROADCAST (reader);
939     }
940   }
941 }
942
943 static void
944 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
945 {
946   GstClockTime duration = GST_CLOCK_TIME_NONE;
947   GList *cur;
948   /* Query the minimum duration of any pad in this piece and store it.
949    * FIXME: Only consider audio and video */
950   SPLITMUX_PART_LOCK (reader);
951   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
952     GstPad *target = GST_PAD_CAST (cur->data);
953     if (target) {
954       gint64 cur_duration;
955       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
956         GST_INFO_OBJECT (reader,
957             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
958             reader->path, target, GST_TIME_ARGS (cur_duration));
959         if (cur_duration < duration)
960           duration = cur_duration;
961       }
962     }
963   }
964   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
965       reader->path, GST_TIME_ARGS (duration));
966   reader->duration = (GstClockTime) duration;
967
968   reader->no_more_pads = TRUE;
969
970   check_if_pads_collected (reader);
971   SPLITMUX_PART_UNLOCK (reader);
972 }
973
974 gboolean
975 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
976     GstPad * src_pad, GstQuery * query)
977 {
978   GstPad *target = NULL;
979   gboolean ret;
980   GList *cur;
981
982   SPLITMUX_PART_LOCK (part);
983   /* Find the pad corresponding to the visible output target pad */
984   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
985     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
986     if (part_pad->target == src_pad) {
987       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
988       break;
989     }
990   }
991   SPLITMUX_PART_UNLOCK (part);
992
993   if (target == NULL)
994     return FALSE;
995
996   ret = gst_pad_peer_query (target, query);
997
998   if (ret == FALSE)
999     goto out;
1000
1001   /* Post-massaging of queries */
1002   switch (GST_QUERY_TYPE (query)) {
1003     case GST_QUERY_POSITION:{
1004       GstFormat fmt;
1005       gint64 position;
1006
1007       gst_query_parse_position (query, &fmt, &position);
1008       if (fmt != GST_FORMAT_TIME)
1009         return FALSE;
1010       SPLITMUX_PART_LOCK (part);
1011       position += part->start_offset;
1012       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1013           GST_TIME_ARGS (position));
1014       SPLITMUX_PART_UNLOCK (part);
1015
1016       gst_query_set_position (query, fmt, position);
1017       break;
1018     }
1019     default:
1020       break;
1021   }
1022
1023 out:
1024   gst_object_unref (target);
1025   return ret;
1026 }
1027
1028 static GstStateChangeReturn
1029 gst_splitmux_part_reader_change_state (GstElement * element,
1030     GstStateChange transition)
1031 {
1032   GstStateChangeReturn ret;
1033   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1034
1035   switch (transition) {
1036     case GST_STATE_CHANGE_NULL_TO_READY:{
1037       break;
1038     }
1039     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1040       /* Hold the splitmux type lock until after the
1041        * parent state change function has finished
1042        * changing the states of things, and type finding can continue */
1043       SPLITMUX_PART_LOCK (reader);
1044       g_object_set (reader->src, "location", reader->path, NULL);
1045       SPLITMUX_PART_UNLOCK (reader);
1046       SPLITMUX_PART_TYPE_LOCK (reader);
1047       break;
1048     }
1049     case GST_STATE_CHANGE_READY_TO_NULL:
1050     case GST_STATE_CHANGE_PAUSED_TO_READY:
1051       SPLITMUX_PART_LOCK (reader);
1052       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1053       reader->running = FALSE;
1054       SPLITMUX_PART_BROADCAST (reader);
1055       SPLITMUX_PART_UNLOCK (reader);
1056       break;
1057     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1058       SPLITMUX_PART_LOCK (reader);
1059       reader->active = FALSE;
1060       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1061       SPLITMUX_PART_BROADCAST (reader);
1062       SPLITMUX_PART_UNLOCK (reader);
1063       break;
1064     default:
1065       break;
1066   }
1067
1068   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1069   if (ret == GST_STATE_CHANGE_FAILURE) {
1070     if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
1071       /* Make sure to release the lock we took above */
1072       SPLITMUX_PART_TYPE_UNLOCK (reader);
1073     }
1074     goto beach;
1075   }
1076
1077   switch (transition) {
1078     case GST_STATE_CHANGE_READY_TO_PAUSED:
1079       /* Sleep and wait until all streams have been collected, then do the seeks
1080        * to measure the stream lengths. This took the type lock above,
1081        * but it's OK to release it now and let typefinding happen... */
1082       SPLITMUX_PART_TYPE_UNLOCK (reader);
1083
1084       SPLITMUX_PART_LOCK (reader);
1085       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1086       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1087       reader->running = TRUE;
1088
1089       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1090         GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1091         SPLITMUX_PART_WAIT (reader);
1092       }
1093
1094       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
1095           reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
1096         gst_splitmux_part_reader_measure_streams (reader);
1097       } else if (reader->prep_state == PART_STATE_FAILED)
1098         ret = GST_STATE_CHANGE_FAILURE;
1099       SPLITMUX_PART_UNLOCK (reader);
1100       break;
1101     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1102       SPLITMUX_PART_LOCK (reader);
1103       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1104       reader->active = TRUE;
1105       SPLITMUX_PART_BROADCAST (reader);
1106       SPLITMUX_PART_UNLOCK (reader);
1107       break;
1108     case GST_STATE_CHANGE_READY_TO_NULL:
1109       reader->prep_state = PART_STATE_NULL;
1110       splitmux_part_reader_reset (reader);
1111       break;
1112     default:
1113       break;
1114   }
1115
1116 beach:
1117   return ret;
1118 }
1119
1120 static gboolean
1121 check_bus_messages (GstSplitMuxPartReader * part)
1122 {
1123   gboolean ret = FALSE;
1124   GstBus *bus;
1125   GstMessage *m;
1126
1127   bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1128   while ((m = gst_bus_pop (bus)) != NULL) {
1129     if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1130       GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1131       gst_message_unref (m);
1132       goto done;
1133     }
1134     gst_message_unref (m);
1135   }
1136   ret = TRUE;
1137 done:
1138   gst_object_unref (bus);
1139   return ret;
1140 }
1141
1142 gboolean
1143 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1144 {
1145   GstStateChangeReturn ret;
1146
1147   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1148
1149   if (ret != GST_STATE_CHANGE_SUCCESS)
1150     return FALSE;
1151
1152   return check_bus_messages (part);
1153 }
1154
1155 void
1156 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1157 {
1158   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1159 }
1160
1161 void
1162 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1163     const gchar * path)
1164 {
1165   reader->path = g_strdup (path);
1166 }
1167
1168 gboolean
1169 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1170     GstSegment * seg)
1171 {
1172   GST_DEBUG_OBJECT (reader, "Activating part reader");
1173
1174   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1175     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1176         seg);
1177     return FALSE;
1178   }
1179   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1180           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1181     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1182     return FALSE;
1183   }
1184   return TRUE;
1185 }
1186
1187 gboolean
1188 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1189 {
1190   gboolean ret;
1191
1192   SPLITMUX_PART_LOCK (part);
1193   ret = part->active;
1194   SPLITMUX_PART_UNLOCK (part);
1195
1196   return ret;
1197 }
1198
1199 void
1200 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1201 {
1202   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1203   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1204 }
1205
1206 void
1207 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1208     gboolean flushing)
1209 {
1210   GList *cur;
1211
1212   GST_LOG_OBJECT (reader, "%s dataqueues",
1213       flushing ? "Flushing" : "Done flushing");
1214   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1215     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1216     gst_data_queue_set_flushing (part_pad->queue, flushing);
1217     if (flushing)
1218       gst_data_queue_flush (part_pad->queue);
1219   }
1220 };
1221
1222 void
1223 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1224     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1225 {
1226   reader->cb_data = cb_data;
1227   reader->get_pad_cb = get_pad_cb;
1228 }
1229
1230 GstClockTime
1231 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1232 {
1233   GList *cur;
1234   GstClockTime ret = GST_CLOCK_TIME_NONE;
1235
1236   SPLITMUX_PART_LOCK (reader);
1237   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1238     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1239     if (part_pad->max_ts < ret)
1240       ret = part_pad->max_ts;
1241   }
1242
1243   SPLITMUX_PART_UNLOCK (reader);
1244
1245   return ret;
1246 }
1247
1248 void
1249 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1250     GstClockTime offset)
1251 {
1252   SPLITMUX_PART_LOCK (reader);
1253   reader->start_offset = offset;
1254   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1255       GST_TIME_ARGS (offset));
1256   SPLITMUX_PART_UNLOCK (reader);
1257 }
1258
1259 GstClockTime
1260 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1261 {
1262   GstClockTime ret = GST_CLOCK_TIME_NONE;
1263
1264   SPLITMUX_PART_LOCK (reader);
1265   ret = reader->start_offset;
1266   SPLITMUX_PART_UNLOCK (reader);
1267
1268   return ret;
1269 }
1270
1271 GstClockTime
1272 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1273 {
1274   GstClockTime dur;
1275
1276   SPLITMUX_PART_LOCK (reader);
1277   dur = reader->duration;
1278   SPLITMUX_PART_UNLOCK (reader);
1279
1280   return dur;
1281 }
1282
1283 GstPad *
1284 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1285     GstPad * target)
1286 {
1287   GstPad *result = NULL;
1288   GList *cur;
1289
1290   SPLITMUX_PART_LOCK (reader);
1291   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1292     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1293     if (part_pad->target == target) {
1294       result = (GstPad *) gst_object_ref (part_pad);
1295       break;
1296     }
1297   }
1298   SPLITMUX_PART_UNLOCK (reader);
1299
1300   return result;
1301 }
1302
1303 GstFlowReturn
1304 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1305     GstDataQueueItem ** item)
1306 {
1307   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1308   GstDataQueue *q;
1309   GstFlowReturn ret;
1310
1311   /* Get one item from the appropriate dataqueue */
1312   SPLITMUX_PART_LOCK (reader);
1313   if (reader->prep_state == PART_STATE_FAILED) {
1314     SPLITMUX_PART_UNLOCK (reader);
1315     return GST_FLOW_ERROR;
1316   }
1317
1318   q = gst_object_ref (part_pad->queue);
1319
1320   /* Have to drop the lock around pop, so we can be woken up for flush */
1321   SPLITMUX_PART_UNLOCK (reader);
1322   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1323     ret = GST_FLOW_FLUSHING;
1324     goto out;
1325   }
1326
1327   SPLITMUX_PART_LOCK (reader);
1328
1329   SPLITMUX_PART_BROADCAST (reader);
1330   if (GST_IS_EVENT ((*item)->object)) {
1331     GstEvent *e = (GstEvent *) ((*item)->object);
1332     /* Mark this pad as EOS */
1333     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1334       part_pad->is_eos = TRUE;
1335   }
1336
1337   SPLITMUX_PART_UNLOCK (reader);
1338
1339   ret = GST_FLOW_OK;
1340 out:
1341   gst_object_unref (q);
1342   return ret;
1343 }
1344
1345 static void
1346 bus_handler (GstBin * bin, GstMessage * message)
1347 {
1348   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1349
1350   switch (GST_MESSAGE_TYPE (message)) {
1351     case GST_MESSAGE_ERROR:
1352       /* Make sure to set the state to failed and wake up the listener
1353        * on error */
1354       SPLITMUX_PART_LOCK (reader);
1355       reader->prep_state = PART_STATE_FAILED;
1356       SPLITMUX_PART_BROADCAST (reader);
1357       SPLITMUX_PART_UNLOCK (reader);
1358       break;
1359     default:
1360       break;
1361   }
1362
1363   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1364 }