splitmuxsink: Use GstBin async-handling instead of our own.
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxpartreader.c
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39
40 enum
41 {
42   SIGNAL_PREPARED,
43   LAST_SIGNAL
44 };
45
46 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
47
48 typedef struct _GstSplitMuxPartPad
49 {
50   GstPad parent;
51
52   /* Reader we belong to */
53   GstSplitMuxPartReader *reader;
54   /* Output splitmuxsrc source pad */
55   GstPad *target;
56
57   GstDataQueue *queue;
58
59   gboolean is_eos;
60   gboolean flushing;
61   gboolean seen_buffer;
62
63   GstClockTime max_ts;
64   GstSegment segment;
65
66   GstSegment orig_segment;
67   GstClockTime initial_ts_offset;
68 } GstSplitMuxPartPad;
69
70 typedef struct _GstSplitMuxPartPadClass
71 {
72   GstPadClass parent;
73 } GstSplitMuxPartPadClass;
74
75 static GType gst_splitmux_part_pad_get_type (void);
76 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
77 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
78
79 static void splitmux_part_pad_constructed (GObject * pad);
80 static void splitmux_part_pad_finalize (GObject * pad);
81 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
82     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
83
84 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
85     guint visible, guint bytes, guint64 time, gpointer checkdata);
86 static void type_found (GstElement * typefind, guint probability,
87     GstCaps * caps, GstSplitMuxPartReader * reader);
88 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
89
90 /* Called with reader lock held */
91 static gboolean
92 have_empty_queue (GstSplitMuxPartReader * reader)
93 {
94   GList *cur;
95
96   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
97     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
98     if (part_pad->is_eos) {
99       GST_LOG_OBJECT (part_pad, "Pad is EOS");
100       return TRUE;
101     }
102     if (gst_data_queue_is_empty (part_pad->queue)) {
103       GST_LOG_OBJECT (part_pad, "Queue is empty");
104       return TRUE;
105     }
106   }
107
108   return FALSE;
109 }
110
111 /* Called with reader lock held */
112 static gboolean
113 block_until_can_push (GstSplitMuxPartReader * reader)
114 {
115   while (reader->running) {
116     if (reader->flushing)
117       goto out;
118     if (reader->active && have_empty_queue (reader))
119       goto out;
120
121     GST_LOG_OBJECT (reader,
122         "Waiting for activation or empty queue on reader %s", reader->path);
123     SPLITMUX_PART_WAIT (reader);
124   }
125
126   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
127       reader->path, reader->active, reader->flushing);
128 out:
129   return reader->active && !reader->flushing;
130 }
131
132 static void
133 handle_buffer_measuring (GstSplitMuxPartReader * reader,
134     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
135 {
136   GstClockTime ts = GST_CLOCK_TIME_NONE;
137   GstClockTimeDiff offset;
138
139   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
140       !part_pad->seen_buffer) {
141     /* If this is the first buffer on the pad in the collect_streams state,
142      * then calculate inital offset based on running time of this segment */
143     part_pad->initial_ts_offset =
144         part_pad->orig_segment.start + part_pad->orig_segment.base -
145         part_pad->orig_segment.time;
146     GST_DEBUG_OBJECT (reader,
147         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
148         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
149   }
150   part_pad->seen_buffer = TRUE;
151
152   /* Adjust buffer timestamps */
153   offset = reader->start_offset + part_pad->segment.base;
154   offset -= part_pad->initial_ts_offset;
155
156   /* Update the stored max duration on the pad,
157    * always preferring making DTS contiguous
158    * where possible */
159   if (GST_BUFFER_DTS_IS_VALID (buf))
160     ts = GST_BUFFER_DTS (buf) + offset;
161   else if (GST_BUFFER_PTS_IS_VALID (buf))
162     ts = GST_BUFFER_PTS (buf) + offset;
163
164   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
165       " incoming PTS %" GST_TIME_FORMAT
166       " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
167       " to %" GST_TIME_FORMAT, part_pad,
168       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
169       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
170       GST_STIME_ARGS (offset), GST_TIME_ARGS (ts));
171
172   if (GST_CLOCK_TIME_IS_VALID (ts)) {
173     if (GST_BUFFER_DURATION_IS_VALID (buf))
174       ts += GST_BUFFER_DURATION (buf);
175
176     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
177       part_pad->max_ts = ts;
178       GST_LOG_OBJECT (reader,
179           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
180           GST_TIME_ARGS (part_pad->max_ts));
181     }
182   }
183   /* Is it time to move to measuring state yet? */
184   check_if_pads_collected (reader);
185 }
186
187 static gboolean
188 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
189     guint visible, guint bytes, guint64 time, gpointer checkdata)
190 {
191   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
192   if (time > 20 * GST_SECOND)
193     return TRUE;
194   return FALSE;
195 }
196
197 static void
198 splitmux_part_free_queue_item (GstDataQueueItem * item)
199 {
200   gst_mini_object_unref (item->object);
201   g_slice_free (GstDataQueueItem, item);
202 }
203
204 static GstFlowReturn
205 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
206 {
207   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
208   GstSplitMuxPartReader *reader = part_pad->reader;
209   GstDataQueueItem *item;
210   GstClockTimeDiff offset;
211
212   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
213   SPLITMUX_PART_LOCK (reader);
214
215   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
216       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
217     handle_buffer_measuring (reader, part_pad, buf);
218     gst_buffer_unref (buf);
219     SPLITMUX_PART_UNLOCK (reader);
220     return GST_FLOW_OK;
221   }
222
223   if (!block_until_can_push (reader)) {
224     /* Flushing */
225     SPLITMUX_PART_UNLOCK (reader);
226     gst_buffer_unref (buf);
227     return GST_FLOW_FLUSHING;
228   }
229
230   /* Adjust buffer timestamps */
231   offset = reader->start_offset + part_pad->segment.base;
232   offset -= part_pad->initial_ts_offset;
233
234   if (GST_BUFFER_PTS_IS_VALID (buf))
235     GST_BUFFER_PTS (buf) += offset;
236   if (GST_BUFFER_DTS_IS_VALID (buf))
237     GST_BUFFER_DTS (buf) += offset;
238
239   /* We are active, and one queue is empty, place this buffer in
240    * the dataqueue */
241   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
242   item = g_slice_new (GstDataQueueItem);
243   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
244   item->object = GST_MINI_OBJECT (buf);
245   item->size = gst_buffer_get_size (buf);
246   item->duration = GST_BUFFER_DURATION (buf);
247   if (item->duration == GST_CLOCK_TIME_NONE)
248     item->duration = 0;
249   item->visible = TRUE;
250
251   gst_object_ref (part_pad);
252
253   SPLITMUX_PART_UNLOCK (reader);
254
255   if (!gst_data_queue_push (part_pad->queue, item)) {
256     splitmux_part_free_queue_item (item);
257     gst_object_unref (part_pad);
258     return GST_FLOW_FLUSHING;
259   }
260
261   gst_object_unref (part_pad);
262   return GST_FLOW_OK;
263 }
264
265 /* Called with splitmux part lock held */
266 static gboolean
267 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
268 {
269   GList *cur;
270   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
271     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
272     if (!part_pad->is_eos)
273       return FALSE;
274   }
275
276   return TRUE;
277 }
278
279 static gboolean
280 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
281 {
282   GList *cur;
283   GST_LOG_OBJECT (part, "Checking for preroll");
284   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
285     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
286     if (!part_pad->seen_buffer) {
287       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
288           part_pad);
289       return FALSE;
290     }
291   }
292   GST_LOG_OBJECT (part, "Part is prerolled");
293   return TRUE;
294 }
295
296
297 gboolean
298 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
299 {
300   gboolean res;
301
302   SPLITMUX_PART_LOCK (reader);
303   res = splitmux_part_is_eos_locked (reader);
304   SPLITMUX_PART_UNLOCK (reader);
305
306   return res;
307 }
308
309 /* Called with splitmux part lock held */
310 static gboolean
311 splitmux_is_flushing (GstSplitMuxPartReader * reader)
312 {
313   GList *cur;
314   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
315     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
316     if (part_pad->flushing)
317       return TRUE;
318   }
319
320   return FALSE;
321 }
322
323 static gboolean
324 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
325 {
326   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
327   GstSplitMuxPartReader *reader = part_pad->reader;
328   gboolean ret = TRUE;
329   SplitMuxSrcPad *target;
330   GstDataQueueItem *item;
331
332   SPLITMUX_PART_LOCK (reader);
333
334   target = gst_object_ref (part_pad->target);
335
336   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
337       event);
338
339   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
340     goto drop_event;
341
342   switch (GST_EVENT_TYPE (event)) {
343     case GST_EVENT_SEGMENT:{
344       GstSegment *seg = &part_pad->segment;
345
346       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
347
348       gst_event_copy_segment (event, seg);
349       gst_event_copy_segment (event, &part_pad->orig_segment);
350
351       if (seg->format != GST_FORMAT_TIME)
352         goto wrong_segment;
353
354       /* Adjust segment */
355       /* Adjust start/stop so the overall file is 0 + start_offset based */
356       if (seg->stop != -1) {
357         seg->stop -= seg->start;
358         seg->stop += seg->time + reader->start_offset;
359       }
360       seg->start = seg->time + reader->start_offset;
361       seg->time += reader->start_offset;
362       seg->position += reader->start_offset;
363
364       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
365
366       /* Replace event */
367       gst_event_unref (event);
368       event = gst_event_new_segment (seg);
369
370       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
371           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
372         break;                  /* Only do further stuff with segments during initial measuring */
373
374       /* Take the first segment from the first part */
375       if (target->segment.format == GST_FORMAT_UNDEFINED) {
376         gst_segment_copy_into (seg, &target->segment);
377         GST_DEBUG_OBJECT (reader,
378             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
379       }
380
381       if (seg->stop != -1 && target->segment.stop != -1) {
382         GstClockTime stop = seg->base + seg->stop;
383         if (stop > target->segment.stop) {
384           target->segment.stop = stop;
385           GST_DEBUG_OBJECT (reader,
386               "Adjusting segment stop by %" GST_TIME_FORMAT
387               " output now %" GST_SEGMENT_FORMAT,
388               GST_TIME_ARGS (reader->start_offset), &target->segment);
389         }
390       }
391       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
392       break;
393     }
394     case GST_EVENT_EOS:{
395
396       GST_DEBUG_OBJECT (part_pad,
397           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
398           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
399
400       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
401           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
402         /* Mark this pad as EOS */
403         part_pad->is_eos = TRUE;
404         if (splitmux_part_is_eos_locked (reader)) {
405           /* Finished measuring things, set state and tell the state change func
406            * so it can seek back to the start */
407           GST_LOG_OBJECT (reader,
408               "EOS while measuring streams. Resetting for ready");
409           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
410           SPLITMUX_PART_BROADCAST (reader);
411         }
412         goto drop_event;
413       }
414       break;
415     }
416     case GST_EVENT_FLUSH_START:
417       reader->flushing = TRUE;
418       part_pad->flushing = TRUE;
419       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
420           part_pad);
421       gst_data_queue_set_flushing (part_pad->queue, TRUE);
422       SPLITMUX_PART_BROADCAST (reader);
423       break;
424     case GST_EVENT_FLUSH_STOP:{
425       gst_data_queue_set_flushing (part_pad->queue, FALSE);
426       gst_data_queue_flush (part_pad->queue);
427       part_pad->seen_buffer = FALSE;
428       part_pad->flushing = FALSE;
429       part_pad->is_eos = FALSE;
430
431       reader->flushing = splitmux_is_flushing (reader);
432       GST_LOG_OBJECT (reader,
433           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
434           reader->path, pad, reader->flushing);
435       SPLITMUX_PART_BROADCAST (reader);
436       break;
437     }
438     default:
439       break;
440   }
441
442   /* Don't send events downstream while preparing */
443   if (reader->prep_state != PART_STATE_READY)
444     goto drop_event;
445
446   /* Don't pass flush events - those are done by the parent */
447   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
448       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
449     goto drop_event;
450
451   if (!block_until_can_push (reader)) {
452     SPLITMUX_PART_UNLOCK (reader);
453     gst_object_unref (target);
454     gst_event_unref (event);
455     return FALSE;
456   }
457
458   switch (GST_EVENT_TYPE (event)) {
459     case GST_EVENT_GAP:{
460       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
461       goto drop_event;
462     }
463     default:
464       break;
465   }
466
467   /* We are active, and one queue is empty, place this buffer in
468    * the dataqueue */
469   gst_object_ref (part_pad->queue);
470   SPLITMUX_PART_UNLOCK (reader);
471
472   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
473   item = g_slice_new (GstDataQueueItem);
474   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
475   item->object = GST_MINI_OBJECT (event);
476   item->size = 0;
477   item->duration = 0;
478   if (item->duration == GST_CLOCK_TIME_NONE)
479     item->duration = 0;
480   item->visible = FALSE;
481
482   if (!gst_data_queue_push (part_pad->queue, item)) {
483     splitmux_part_free_queue_item (item);
484     ret = FALSE;
485   }
486
487   gst_object_unref (part_pad->queue);
488   gst_object_unref (target);
489
490   return ret;
491 wrong_segment:
492   gst_event_unref (event);
493   gst_object_unref (target);
494   SPLITMUX_PART_UNLOCK (reader);
495   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
496       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
497           reader->path, pad));
498   return FALSE;
499 drop_event:
500   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
501       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
502   gst_event_unref (event);
503   gst_object_unref (target);
504   SPLITMUX_PART_UNLOCK (reader);
505   return TRUE;
506 }
507
508 static gboolean
509 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
510 {
511   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
512   GstSplitMuxPartReader *reader = part_pad->reader;
513   GstPad *target;
514   gboolean ret = FALSE;
515   gboolean active;
516
517   SPLITMUX_PART_LOCK (reader);
518   target = gst_object_ref (part_pad->target);
519   active = reader->active;
520   SPLITMUX_PART_UNLOCK (reader);
521
522   if (active) {
523     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
524         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
525
526     ret = gst_pad_query (target, query);
527   }
528
529   gst_object_unref (target);
530
531   return ret;
532 }
533
534 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
535
536 static void
537 splitmux_part_pad_constructed (GObject * pad)
538 {
539   gst_pad_set_chain_function (GST_PAD (pad),
540       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
541   gst_pad_set_event_function (GST_PAD (pad),
542       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
543   gst_pad_set_query_function (GST_PAD (pad),
544       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
545
546   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
547 }
548
549 static void
550 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
551 {
552   GObjectClass *gobject_klass = (GObjectClass *) (klass);
553
554   gobject_klass->constructed = splitmux_part_pad_constructed;
555   gobject_klass->finalize = splitmux_part_pad_finalize;
556 }
557
558 static void
559 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
560 {
561   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
562       NULL, NULL, pad);
563   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
564   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
565 }
566
567 static void
568 splitmux_part_pad_finalize (GObject * obj)
569 {
570   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
571
572   GST_DEBUG_OBJECT (obj, "finalize");
573   gst_data_queue_set_flushing (pad->queue, TRUE);
574   gst_data_queue_flush (pad->queue);
575   gst_object_unref (GST_OBJECT_CAST (pad->queue));
576   pad->queue = NULL;
577
578   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
579 }
580
581 static void
582 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
583     GstSplitMuxPartReader * part);
584 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
585 static GstStateChangeReturn
586 gst_splitmux_part_reader_change_state (GstElement * element,
587     GstStateChange transition);
588 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
589     GstEvent * event);
590 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
591     * part, gboolean flushing);
592 static void bus_handler (GstBin * bin, GstMessage * msg);
593 static void splitmux_part_reader_dispose (GObject * object);
594 static void splitmux_part_reader_finalize (GObject * object);
595 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
596
597 #define gst_splitmux_part_reader_parent_class parent_class
598 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
599     GST_TYPE_PIPELINE);
600
601 static void
602 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
603 {
604   GObjectClass *gobject_klass = (GObjectClass *) (klass);
605   GstElementClass *gstelement_class = (GstElementClass *) klass;
606   GstBinClass *gstbin_class = (GstBinClass *) klass;
607
608   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
609       "Split File Demuxing Source helper");
610
611   gobject_klass->dispose = splitmux_part_reader_dispose;
612   gobject_klass->finalize = splitmux_part_reader_finalize;
613
614   part_reader_signals[SIGNAL_PREPARED] =
615       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
616       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
617           prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
618   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
619   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
620
621   gstbin_class->handle_message = bus_handler;
622 }
623
624 static void
625 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
626 {
627   GstElement *typefind;
628
629   reader->active = FALSE;
630   reader->duration = GST_CLOCK_TIME_NONE;
631
632   g_cond_init (&reader->inactive_cond);
633   g_mutex_init (&reader->lock);
634   g_mutex_init (&reader->type_lock);
635
636   /* FIXME: Create elements on a state change */
637   reader->src = gst_element_factory_make ("filesrc", NULL);
638   if (reader->src == NULL) {
639     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
640     return;
641   }
642   gst_bin_add (GST_BIN_CAST (reader), reader->src);
643
644   typefind = gst_element_factory_make ("typefind", NULL);
645   if (!typefind) {
646     GST_ERROR_OBJECT (reader,
647         "Failed to create typefind element - check your installation");
648     return;
649   }
650
651   gst_bin_add (GST_BIN_CAST (reader), typefind);
652   reader->typefind = typefind;
653
654   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
655     GST_ERROR_OBJECT (reader,
656         "Failed to link typefind element - check your installation");
657     return;
658   }
659
660   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
661       reader);
662 }
663
664 static void
665 splitmux_part_reader_dispose (GObject * object)
666 {
667   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
668
669   splitmux_part_reader_reset (reader);
670
671   G_OBJECT_CLASS (parent_class)->dispose (object);
672 }
673
674 static void
675 splitmux_part_reader_finalize (GObject * object)
676 {
677   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
678
679   g_cond_clear (&reader->inactive_cond);
680   g_mutex_clear (&reader->lock);
681   g_mutex_clear (&reader->type_lock);
682
683   g_free (reader->path);
684
685   G_OBJECT_CLASS (parent_class)->finalize (object);
686 }
687
688 static void
689 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
690 {
691   GList *cur;
692
693   SPLITMUX_PART_LOCK (reader);
694   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
695     GstPad *pad = GST_PAD_CAST (cur->data);
696     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
697     gst_object_unref (GST_OBJECT_CAST (pad));
698   }
699
700   g_list_free (reader->pads);
701   reader->pads = NULL;
702   SPLITMUX_PART_UNLOCK (reader);
703 }
704
705 static GstSplitMuxPartPad *
706 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
707     GstPad * target)
708 {
709   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
710       "name", GST_PAD_NAME (target),
711       "direction", GST_PAD_SINK,
712       NULL);
713   pad->target = target;
714   pad->reader = reader;
715
716   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
717
718   return pad;
719 }
720
721 static void
722 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
723     GstSplitMuxPartReader * reader)
724 {
725   GstPad *out_pad = NULL;
726   GstSplitMuxPartPad *proxy_pad;
727   GstCaps *caps;
728   GstPadLinkReturn link_ret;
729
730   caps = gst_pad_get_current_caps (pad);
731
732   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
733       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
734
735   gst_caps_unref (caps);
736
737   /* Look up or create the output pad */
738   if (reader->get_pad_cb)
739     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
740   if (out_pad == NULL)
741     return;
742
743   /* Create our proxy pad to interact with this new pad */
744   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
745   GST_DEBUG_OBJECT (reader,
746       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
747       proxy_pad, out_pad);
748
749   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
750   if (link_ret != GST_PAD_LINK_OK) {
751     gst_object_unref (proxy_pad);
752     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
753         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
754             " ret %d", reader->path, pad, link_ret));
755     return;
756   }
757   GST_DEBUG_OBJECT (reader,
758       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
759       pad, proxy_pad);
760
761   SPLITMUX_PART_LOCK (reader);
762   reader->pads = g_list_prepend (reader->pads, proxy_pad);
763   SPLITMUX_PART_UNLOCK (reader);
764 }
765
766 static gboolean
767 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
768 {
769   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
770   gboolean ret = FALSE;
771   GstPad *pad = NULL;
772
773   /* Send event to the first source pad we found */
774   SPLITMUX_PART_LOCK (reader);
775   if (reader->pads) {
776     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
777     pad = gst_pad_get_peer (proxy_pad);
778   }
779   SPLITMUX_PART_UNLOCK (reader);
780
781   if (pad) {
782     ret = gst_pad_send_event (pad, event);
783     gst_object_unref (pad);
784   } else {
785     gst_event_unref (event);
786   }
787
788   return ret;
789 }
790
791 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
792 static void
793 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
794     GstClockTime time)
795 {
796   SPLITMUX_PART_UNLOCK (reader);
797   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
798       GST_TIME_ARGS (time));
799   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
800       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
801       GST_SEEK_TYPE_END, 0);
802
803   SPLITMUX_PART_LOCK (reader);
804
805   /* Wait for flush to finish, so old data is gone */
806   while (reader->flushing) {
807     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
808     SPLITMUX_PART_WAIT (reader);
809   }
810 }
811
812 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
813 static gboolean
814 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
815     GstSegment * target_seg)
816 {
817   GstSeekFlags flags;
818   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
819
820   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
821
822   SPLITMUX_PART_LOCK (reader);
823   if (target_seg->start >= reader->start_offset)
824     start = target_seg->start - reader->start_offset;
825   /* If the segment stop is within this part, don't play to the end */
826   if (target_seg->stop != -1 &&
827       target_seg->stop < reader->start_offset + reader->duration)
828     stop = target_seg->stop - reader->start_offset;
829
830   SPLITMUX_PART_UNLOCK (reader);
831
832   GST_DEBUG_OBJECT (reader,
833       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
834       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
835       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
836
837   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
838       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
839       stop);
840 }
841
842 /* Called with lock held */
843 static void
844 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
845 {
846   /* Trigger a flushing seek to near the end of the file and run each stream
847    * to EOS in order to find the smallest end timestamp to start the next
848    * file from
849    */
850   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
851       && reader->duration > GST_SECOND) {
852     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
853     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
854   }
855
856   /* Wait for things to happen */
857   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
858     SPLITMUX_PART_WAIT (reader);
859
860   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
861     /* Fire the prepared signal and go to READY state */
862     GST_DEBUG_OBJECT (reader,
863         "Stream measuring complete. File %s is now ready. Firing prepared signal",
864         reader->path);
865     reader->prep_state = PART_STATE_READY;
866     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
867   }
868 }
869
870 static GstElement *
871 find_demuxer (GstCaps * caps)
872 {
873   GList *factories =
874       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
875       GST_RANK_MARGINAL);
876   GList *compat_elements;
877   GstElement *e = NULL;
878
879   if (factories == NULL)
880     return NULL;
881
882   compat_elements =
883       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
884
885   if (compat_elements) {
886     /* Just take the first (highest ranked) option */
887     GstElementFactory *factory =
888         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
889     e = gst_element_factory_create (factory, NULL);
890     gst_plugin_feature_list_free (compat_elements);
891   }
892
893   if (factories)
894     gst_plugin_feature_list_free (factories);
895
896   return e;
897 }
898
899 static void
900 type_found (GstElement * typefind, guint probability,
901     GstCaps * caps, GstSplitMuxPartReader * reader)
902 {
903   GstElement *demux;
904
905   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
906
907   /* typefind found a type. Look for the demuxer to handle it */
908   demux = reader->demux = find_demuxer (caps);
909   if (reader->demux == NULL) {
910     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
911     return;
912   }
913
914   gst_element_set_locked_state (demux, TRUE);
915   gst_bin_add (GST_BIN_CAST (reader), demux);
916   gst_element_link_pads (reader->typefind, "src", demux, NULL);
917   gst_element_sync_state_with_parent (reader->demux);
918   gst_element_set_locked_state (demux, FALSE);
919
920   /* Connect to demux signals */
921   g_signal_connect (demux,
922       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
923   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
924 }
925
926 static void
927 check_if_pads_collected (GstSplitMuxPartReader * reader)
928 {
929   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
930     /* Check we have all pads and each pad has seen a buffer */
931     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
932       GST_DEBUG_OBJECT (reader,
933           "no more pads - file %s. Measuring stream length", reader->path);
934       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
935       SPLITMUX_PART_BROADCAST (reader);
936     }
937   }
938 }
939
940 static void
941 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
942 {
943   GstClockTime duration = GST_CLOCK_TIME_NONE;
944   GList *cur;
945   /* Query the minimum duration of any pad in this piece and store it.
946    * FIXME: Only consider audio and video */
947   SPLITMUX_PART_LOCK (reader);
948   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
949     GstPad *target = GST_PAD_CAST (cur->data);
950     if (target) {
951       gint64 cur_duration;
952       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
953         GST_INFO_OBJECT (reader,
954             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
955             reader->path, target, GST_TIME_ARGS (cur_duration));
956         if (cur_duration < duration)
957           duration = cur_duration;
958       }
959     }
960   }
961   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
962       reader->path, GST_TIME_ARGS (duration));
963   reader->duration = (GstClockTime) duration;
964
965   reader->no_more_pads = TRUE;
966
967   check_if_pads_collected (reader);
968   SPLITMUX_PART_UNLOCK (reader);
969 }
970
971 gboolean
972 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
973     GstPad * src_pad, GstQuery * query)
974 {
975   GstPad *target = NULL;
976   gboolean ret;
977   GList *cur;
978
979   SPLITMUX_PART_LOCK (part);
980   /* Find the pad corresponding to the visible output target pad */
981   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
982     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
983     if (part_pad->target == src_pad) {
984       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
985       break;
986     }
987   }
988   SPLITMUX_PART_UNLOCK (part);
989
990   if (target == NULL)
991     return FALSE;
992
993   ret = gst_pad_peer_query (target, query);
994   gst_object_unref (GST_OBJECT_CAST (target));
995
996   if (ret == FALSE)
997     goto out;
998
999   /* Post-massaging of queries */
1000   switch (GST_QUERY_TYPE (query)) {
1001     case GST_QUERY_POSITION:{
1002       GstFormat fmt;
1003       gint64 position;
1004
1005       gst_query_parse_position (query, &fmt, &position);
1006       if (fmt != GST_FORMAT_TIME)
1007         return FALSE;
1008       SPLITMUX_PART_LOCK (part);
1009       position += part->start_offset;
1010       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1011           GST_TIME_ARGS (position));
1012       SPLITMUX_PART_UNLOCK (part);
1013
1014       gst_query_set_position (query, fmt, position);
1015       break;
1016     }
1017     default:
1018       break;
1019   }
1020
1021 out:
1022   gst_object_unref (target);
1023   return ret;
1024 }
1025
1026 static GstStateChangeReturn
1027 gst_splitmux_part_reader_change_state (GstElement * element,
1028     GstStateChange transition)
1029 {
1030   GstStateChangeReturn ret;
1031   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1032
1033   switch (transition) {
1034     case GST_STATE_CHANGE_NULL_TO_READY:{
1035       break;
1036     }
1037     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1038       /* Hold the splitmux type lock until after the
1039        * parent state change function has finished
1040        * changing the states of things, and type finding can continue */
1041       SPLITMUX_PART_LOCK (reader);
1042       g_object_set (reader->src, "location", reader->path, NULL);
1043       SPLITMUX_PART_UNLOCK (reader);
1044       SPLITMUX_PART_TYPE_LOCK (reader);
1045       break;
1046     }
1047     case GST_STATE_CHANGE_READY_TO_NULL:
1048     case GST_STATE_CHANGE_PAUSED_TO_READY:
1049       SPLITMUX_PART_LOCK (reader);
1050       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1051       reader->running = FALSE;
1052       SPLITMUX_PART_BROADCAST (reader);
1053       SPLITMUX_PART_UNLOCK (reader);
1054       break;
1055     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1056       SPLITMUX_PART_LOCK (reader);
1057       reader->active = FALSE;
1058       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1059       SPLITMUX_PART_BROADCAST (reader);
1060       SPLITMUX_PART_UNLOCK (reader);
1061       break;
1062     default:
1063       break;
1064   }
1065
1066   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1067   if (ret == GST_STATE_CHANGE_FAILURE) {
1068     if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
1069       /* Make sure to release the lock we took above */
1070       SPLITMUX_PART_TYPE_UNLOCK (reader);
1071     }
1072     goto beach;
1073   }
1074
1075   switch (transition) {
1076     case GST_STATE_CHANGE_READY_TO_PAUSED:
1077       /* Sleep and wait until all streams have been collected, then do the seeks
1078        * to measure the stream lengths. This took the type lock above,
1079        * but it's OK to release it now and let typefinding happen... */
1080       SPLITMUX_PART_TYPE_UNLOCK (reader);
1081
1082       SPLITMUX_PART_LOCK (reader);
1083       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1084       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1085       reader->running = TRUE;
1086
1087       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1088         GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1089         SPLITMUX_PART_WAIT (reader);
1090       }
1091
1092       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
1093           reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
1094         gst_splitmux_part_reader_measure_streams (reader);
1095       } else if (reader->prep_state == PART_STATE_FAILED)
1096         ret = GST_STATE_CHANGE_FAILURE;
1097       SPLITMUX_PART_UNLOCK (reader);
1098       break;
1099     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1100       SPLITMUX_PART_LOCK (reader);
1101       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1102       reader->active = TRUE;
1103       SPLITMUX_PART_BROADCAST (reader);
1104       SPLITMUX_PART_UNLOCK (reader);
1105       break;
1106     case GST_STATE_CHANGE_READY_TO_NULL:
1107       reader->prep_state = PART_STATE_NULL;
1108       splitmux_part_reader_reset (reader);
1109       break;
1110     default:
1111       break;
1112   }
1113
1114 beach:
1115   return ret;
1116 }
1117
1118 static gboolean
1119 check_bus_messages (GstSplitMuxPartReader * part)
1120 {
1121   gboolean ret = FALSE;
1122   GstBus *bus;
1123   GstMessage *m;
1124
1125   bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1126   while ((m = gst_bus_pop (bus)) != NULL) {
1127     if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1128       GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1129       gst_message_unref (m);
1130       goto done;
1131     }
1132     gst_message_unref (m);
1133   }
1134   ret = TRUE;
1135 done:
1136   gst_object_unref (bus);
1137   return ret;
1138 }
1139
1140 gboolean
1141 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1142 {
1143   GstStateChangeReturn ret;
1144
1145   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1146
1147   if (ret != GST_STATE_CHANGE_SUCCESS)
1148     return FALSE;
1149
1150   return check_bus_messages (part);
1151 }
1152
1153 void
1154 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1155 {
1156   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1157 }
1158
1159 void
1160 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1161     const gchar * path)
1162 {
1163   reader->path = g_strdup (path);
1164 }
1165
1166 gboolean
1167 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1168     GstSegment * seg)
1169 {
1170   GST_DEBUG_OBJECT (reader, "Activating part reader");
1171
1172   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1173     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1174         seg);
1175     return FALSE;
1176   }
1177   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1178           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1179     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1180     return FALSE;
1181   }
1182   return TRUE;
1183 }
1184
1185 void
1186 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1187 {
1188   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1189   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1190 }
1191
1192 void
1193 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1194     gboolean flushing)
1195 {
1196   GList *cur;
1197
1198   GST_LOG_OBJECT (reader, "%s dataqueues",
1199       flushing ? "Flushing" : "Done flushing");
1200   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1201     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1202     gst_data_queue_set_flushing (part_pad->queue, flushing);
1203     if (flushing)
1204       gst_data_queue_flush (part_pad->queue);
1205   }
1206 };
1207
1208 void
1209 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1210     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1211 {
1212   reader->cb_data = cb_data;
1213   reader->get_pad_cb = get_pad_cb;
1214 }
1215
1216 GstClockTime
1217 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1218 {
1219   GList *cur;
1220   GstClockTime ret = GST_CLOCK_TIME_NONE;
1221
1222   SPLITMUX_PART_LOCK (reader);
1223   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1224     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1225     if (part_pad->max_ts < ret)
1226       ret = part_pad->max_ts;
1227   }
1228
1229   SPLITMUX_PART_UNLOCK (reader);
1230
1231   return ret;
1232 }
1233
1234 void
1235 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1236     GstClockTime offset)
1237 {
1238   SPLITMUX_PART_LOCK (reader);
1239   reader->start_offset = offset;
1240   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1241       GST_TIME_ARGS (offset));
1242   SPLITMUX_PART_UNLOCK (reader);
1243 }
1244
1245 GstClockTime
1246 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1247 {
1248   GstClockTime ret = GST_CLOCK_TIME_NONE;
1249
1250   SPLITMUX_PART_LOCK (reader);
1251   ret = reader->start_offset;
1252   SPLITMUX_PART_UNLOCK (reader);
1253
1254   return ret;
1255 }
1256
1257 GstClockTime
1258 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1259 {
1260   GstClockTime dur;
1261
1262   SPLITMUX_PART_LOCK (reader);
1263   dur = reader->duration;
1264   SPLITMUX_PART_UNLOCK (reader);
1265
1266   return dur;
1267 }
1268
1269 GstPad *
1270 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1271     GstPad * target)
1272 {
1273   GstPad *result = NULL;
1274   GList *cur;
1275
1276   SPLITMUX_PART_LOCK (reader);
1277   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1278     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1279     if (part_pad->target == target) {
1280       result = (GstPad *) gst_object_ref (part_pad);
1281       break;
1282     }
1283   }
1284   SPLITMUX_PART_UNLOCK (reader);
1285
1286   return result;
1287 }
1288
1289 GstFlowReturn
1290 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1291     GstDataQueueItem ** item)
1292 {
1293   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1294   GstDataQueue *q;
1295   GstFlowReturn ret;
1296
1297   /* Get one item from the appropriate dataqueue */
1298   SPLITMUX_PART_LOCK (reader);
1299   if (reader->prep_state == PART_STATE_FAILED) {
1300     SPLITMUX_PART_UNLOCK (reader);
1301     return GST_FLOW_ERROR;
1302   }
1303
1304   q = gst_object_ref (part_pad->queue);
1305
1306   /* Have to drop the lock around pop, so we can be woken up for flush */
1307   SPLITMUX_PART_UNLOCK (reader);
1308   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1309     ret = GST_FLOW_FLUSHING;
1310     goto out;
1311   }
1312
1313   SPLITMUX_PART_LOCK (reader);
1314
1315   SPLITMUX_PART_BROADCAST (reader);
1316   if (GST_IS_EVENT ((*item)->object)) {
1317     GstEvent *e = (GstEvent *) ((*item)->object);
1318     /* Mark this pad as EOS */
1319     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1320       part_pad->is_eos = TRUE;
1321   }
1322
1323   SPLITMUX_PART_UNLOCK (reader);
1324
1325   ret = GST_FLOW_OK;
1326 out:
1327   gst_object_unref (q);
1328   return ret;
1329 }
1330
1331 static void
1332 bus_handler (GstBin * bin, GstMessage * message)
1333 {
1334   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1335
1336   switch (GST_MESSAGE_TYPE (message)) {
1337     case GST_MESSAGE_ERROR:
1338       /* Make sure to set the state to failed and wake up the listener
1339        * on error */
1340       SPLITMUX_PART_LOCK (reader);
1341       reader->prep_state = PART_STATE_FAILED;
1342       SPLITMUX_PART_BROADCAST (reader);
1343       SPLITMUX_PART_UNLOCK (reader);
1344       break;
1345     default:
1346       break;
1347   }
1348
1349   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1350 }