splitmuxsink: Don't leak old muxer/sink in async mode
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxpartreader.c
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39
40 enum
41 {
42   SIGNAL_PREPARED,
43   LAST_SIGNAL
44 };
45
46 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
47
48 typedef struct _GstSplitMuxPartPad
49 {
50   GstPad parent;
51
52   /* Reader we belong to */
53   GstSplitMuxPartReader *reader;
54   /* Output splitmuxsrc source pad */
55   GstPad *target;
56
57   GstDataQueue *queue;
58
59   gboolean is_eos;
60   gboolean flushing;
61   gboolean seen_buffer;
62
63   gboolean is_sparse;
64   GstClockTime max_ts;
65   GstSegment segment;
66
67   GstSegment orig_segment;
68   GstClockTime initial_ts_offset;
69 } GstSplitMuxPartPad;
70
71 typedef struct _GstSplitMuxPartPadClass
72 {
73   GstPadClass parent;
74 } GstSplitMuxPartPadClass;
75
76 static GType gst_splitmux_part_pad_get_type (void);
77 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
78 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
79
80 static void splitmux_part_pad_constructed (GObject * pad);
81 static void splitmux_part_pad_finalize (GObject * pad);
82 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
83     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
84
85 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
86     guint visible, guint bytes, guint64 time, gpointer checkdata);
87 static void type_found (GstElement * typefind, guint probability,
88     GstCaps * caps, GstSplitMuxPartReader * reader);
89 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
90
91 /* Called with reader lock held */
92 static gboolean
93 have_empty_queue (GstSplitMuxPartReader * reader)
94 {
95   GList *cur;
96
97   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
98     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
99     if (part_pad->is_eos) {
100       GST_LOG_OBJECT (part_pad, "Pad is EOS");
101       return TRUE;
102     }
103     if (gst_data_queue_is_empty (part_pad->queue)) {
104       GST_LOG_OBJECT (part_pad, "Queue is empty");
105       return TRUE;
106     }
107   }
108
109   return FALSE;
110 }
111
112 /* Called with reader lock held */
113 static gboolean
114 block_until_can_push (GstSplitMuxPartReader * reader)
115 {
116   while (reader->running) {
117     if (reader->flushing)
118       goto out;
119     if (reader->active && have_empty_queue (reader))
120       goto out;
121
122     GST_LOG_OBJECT (reader,
123         "Waiting for activation or empty queue on reader %s", reader->path);
124     SPLITMUX_PART_WAIT (reader);
125   }
126
127   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
128       reader->path, reader->active, reader->flushing);
129 out:
130   return reader->active && !reader->flushing;
131 }
132
133 static void
134 handle_buffer_measuring (GstSplitMuxPartReader * reader,
135     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
136 {
137   GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
138   GstClockTimeDiff offset;
139
140   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
141       !part_pad->seen_buffer) {
142     /* If this is the first buffer on the pad in the collect_streams state,
143      * then calculate inital offset based on running time of this segment */
144     part_pad->initial_ts_offset =
145         part_pad->orig_segment.start + part_pad->orig_segment.base -
146         part_pad->orig_segment.time;
147     GST_DEBUG_OBJECT (reader,
148         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
149         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
150   }
151   part_pad->seen_buffer = TRUE;
152
153   /* Adjust buffer timestamps */
154   offset = reader->start_offset + part_pad->segment.base;
155   offset -= part_pad->initial_ts_offset;
156
157   /* Update the stored max duration on the pad,
158    * always preferring making DTS contiguous
159    * where possible */
160   if (GST_BUFFER_DTS_IS_VALID (buf))
161     ts = GST_BUFFER_DTS (buf) + offset;
162   else if (GST_BUFFER_PTS_IS_VALID (buf))
163     ts = GST_BUFFER_PTS (buf) + offset;
164
165   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
166       " incoming PTS %" GST_TIME_FORMAT
167       " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
168       " to %" GST_STIME_FORMAT, part_pad,
169       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
170       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
171       GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
172
173   if (GST_CLOCK_STIME_IS_VALID (ts)) {
174     if (GST_BUFFER_DURATION_IS_VALID (buf))
175       ts += GST_BUFFER_DURATION (buf);
176
177     if (GST_CLOCK_STIME_IS_VALID (ts)
178         && ts > (GstClockTimeDiff) part_pad->max_ts) {
179       part_pad->max_ts = ts;
180       GST_LOG_OBJECT (reader,
181           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
182           GST_TIME_ARGS (part_pad->max_ts));
183     }
184   }
185   /* Is it time to move to measuring state yet? */
186   check_if_pads_collected (reader);
187 }
188
189 static gboolean
190 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
191     guint visible, guint bytes, guint64 time, gpointer checkdata)
192 {
193   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
194   if (time > 20 * GST_SECOND)
195     return TRUE;
196   return FALSE;
197 }
198
199 static void
200 splitmux_part_free_queue_item (GstDataQueueItem * item)
201 {
202   gst_mini_object_unref (item->object);
203   g_slice_free (GstDataQueueItem, item);
204 }
205
206 static GstFlowReturn
207 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
208 {
209   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
210   GstSplitMuxPartReader *reader = part_pad->reader;
211   GstDataQueueItem *item;
212   GstClockTimeDiff offset;
213
214   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
215   SPLITMUX_PART_LOCK (reader);
216
217   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
218       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
219     handle_buffer_measuring (reader, part_pad, buf);
220     gst_buffer_unref (buf);
221     SPLITMUX_PART_UNLOCK (reader);
222     return GST_FLOW_OK;
223   }
224
225   if (!block_until_can_push (reader)) {
226     /* Flushing */
227     SPLITMUX_PART_UNLOCK (reader);
228     gst_buffer_unref (buf);
229     return GST_FLOW_FLUSHING;
230   }
231
232   /* Adjust buffer timestamps */
233   offset = reader->start_offset + part_pad->segment.base;
234   offset -= part_pad->initial_ts_offset;
235
236   if (GST_BUFFER_PTS_IS_VALID (buf))
237     GST_BUFFER_PTS (buf) += offset;
238   if (GST_BUFFER_DTS_IS_VALID (buf))
239     GST_BUFFER_DTS (buf) += offset;
240
241   /* We are active, and one queue is empty, place this buffer in
242    * the dataqueue */
243   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
244   item = g_slice_new (GstDataQueueItem);
245   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
246   item->object = GST_MINI_OBJECT (buf);
247   item->size = gst_buffer_get_size (buf);
248   item->duration = GST_BUFFER_DURATION (buf);
249   if (item->duration == GST_CLOCK_TIME_NONE)
250     item->duration = 0;
251   item->visible = TRUE;
252
253   gst_object_ref (part_pad);
254
255   SPLITMUX_PART_UNLOCK (reader);
256
257   if (!gst_data_queue_push (part_pad->queue, item)) {
258     splitmux_part_free_queue_item (item);
259     gst_object_unref (part_pad);
260     return GST_FLOW_FLUSHING;
261   }
262
263   gst_object_unref (part_pad);
264   return GST_FLOW_OK;
265 }
266
267 /* Called with splitmux part lock held */
268 static gboolean
269 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
270 {
271   GList *cur;
272   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
273     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
274     if (!part_pad->is_eos)
275       return FALSE;
276   }
277
278   return TRUE;
279 }
280
281 static gboolean
282 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
283 {
284   GList *cur;
285   GST_LOG_OBJECT (part, "Checking for preroll");
286   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
287     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
288     if (!part_pad->seen_buffer) {
289       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
290           part_pad);
291       return FALSE;
292     }
293   }
294   GST_LOG_OBJECT (part, "Part is prerolled");
295   return TRUE;
296 }
297
298
299 gboolean
300 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
301 {
302   gboolean res;
303
304   SPLITMUX_PART_LOCK (reader);
305   res = splitmux_part_is_eos_locked (reader);
306   SPLITMUX_PART_UNLOCK (reader);
307
308   return res;
309 }
310
311 /* Called with splitmux part lock held */
312 static gboolean
313 splitmux_is_flushing (GstSplitMuxPartReader * reader)
314 {
315   GList *cur;
316   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
317     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
318     if (part_pad->flushing)
319       return TRUE;
320   }
321
322   return FALSE;
323 }
324
325 static gboolean
326 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
327 {
328   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
329   GstSplitMuxPartReader *reader = part_pad->reader;
330   gboolean ret = TRUE;
331   SplitMuxSrcPad *target;
332   GstDataQueueItem *item;
333
334   SPLITMUX_PART_LOCK (reader);
335
336   target = gst_object_ref (part_pad->target);
337
338   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
339       event);
340
341   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
342     goto drop_event;
343
344   switch (GST_EVENT_TYPE (event)) {
345     case GST_EVENT_STREAM_START:{
346       GstStreamFlags flags;
347       gst_event_parse_stream_flags (event, &flags);
348       part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
349       break;
350     }
351     case GST_EVENT_SEGMENT:{
352       GstSegment *seg = &part_pad->segment;
353
354       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
355
356       gst_event_copy_segment (event, seg);
357       gst_event_copy_segment (event, &part_pad->orig_segment);
358
359       if (seg->format != GST_FORMAT_TIME)
360         goto wrong_segment;
361
362       /* Adjust segment */
363       /* Adjust start/stop so the overall file is 0 + start_offset based */
364       if (seg->stop != -1) {
365         seg->stop -= seg->start;
366         seg->stop += seg->time + reader->start_offset;
367       }
368       seg->start = seg->time + reader->start_offset;
369       seg->time += reader->start_offset;
370       seg->position += reader->start_offset;
371
372       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
373
374       /* Replace event */
375       gst_event_unref (event);
376       event = gst_event_new_segment (seg);
377
378       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
379           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
380         break;                  /* Only do further stuff with segments during initial measuring */
381
382       /* Take the first segment from the first part */
383       if (target->segment.format == GST_FORMAT_UNDEFINED) {
384         gst_segment_copy_into (seg, &target->segment);
385         GST_DEBUG_OBJECT (reader,
386             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
387       }
388
389       if (seg->stop != -1 && target->segment.stop != -1) {
390         GstClockTime stop = seg->base + seg->stop;
391         if (stop > target->segment.stop) {
392           target->segment.stop = stop;
393           GST_DEBUG_OBJECT (reader,
394               "Adjusting segment stop by %" GST_TIME_FORMAT
395               " output now %" GST_SEGMENT_FORMAT,
396               GST_TIME_ARGS (reader->start_offset), &target->segment);
397         }
398       }
399       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
400       break;
401     }
402     case GST_EVENT_EOS:{
403
404       GST_DEBUG_OBJECT (part_pad,
405           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
406           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
407
408       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
409           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
410         /* Mark this pad as EOS */
411         part_pad->is_eos = TRUE;
412         if (splitmux_part_is_eos_locked (reader)) {
413           /* Finished measuring things, set state and tell the state change func
414            * so it can seek back to the start */
415           GST_LOG_OBJECT (reader,
416               "EOS while measuring streams. Resetting for ready");
417           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
418           SPLITMUX_PART_BROADCAST (reader);
419         }
420         goto drop_event;
421       }
422       break;
423     }
424     case GST_EVENT_FLUSH_START:
425       reader->flushing = TRUE;
426       part_pad->flushing = TRUE;
427       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
428           part_pad);
429       gst_data_queue_set_flushing (part_pad->queue, TRUE);
430       SPLITMUX_PART_BROADCAST (reader);
431       break;
432     case GST_EVENT_FLUSH_STOP:{
433       gst_data_queue_set_flushing (part_pad->queue, FALSE);
434       gst_data_queue_flush (part_pad->queue);
435       part_pad->seen_buffer = FALSE;
436       part_pad->flushing = FALSE;
437       part_pad->is_eos = FALSE;
438
439       reader->flushing = splitmux_is_flushing (reader);
440       GST_LOG_OBJECT (reader,
441           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
442           reader->path, pad, reader->flushing);
443       SPLITMUX_PART_BROADCAST (reader);
444       break;
445     }
446     default:
447       break;
448   }
449
450   /* Don't send events downstream while preparing */
451   if (reader->prep_state != PART_STATE_READY)
452     goto drop_event;
453
454   /* Don't pass flush events - those are done by the parent */
455   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
456       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
457     goto drop_event;
458
459   if (!block_until_can_push (reader))
460     goto drop_event;
461
462   switch (GST_EVENT_TYPE (event)) {
463     case GST_EVENT_GAP:{
464       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
465       goto drop_event;
466     }
467     default:
468       break;
469   }
470
471   /* We are active, and one queue is empty, place this buffer in
472    * the dataqueue */
473   gst_object_ref (part_pad->queue);
474   SPLITMUX_PART_UNLOCK (reader);
475
476   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
477   item = g_slice_new (GstDataQueueItem);
478   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
479   item->object = GST_MINI_OBJECT (event);
480   item->size = 0;
481   item->duration = 0;
482   if (item->duration == GST_CLOCK_TIME_NONE)
483     item->duration = 0;
484   item->visible = FALSE;
485
486   if (!gst_data_queue_push (part_pad->queue, item)) {
487     splitmux_part_free_queue_item (item);
488     ret = FALSE;
489   }
490
491   gst_object_unref (part_pad->queue);
492   gst_object_unref (target);
493
494   return ret;
495 wrong_segment:
496   gst_event_unref (event);
497   gst_object_unref (target);
498   SPLITMUX_PART_UNLOCK (reader);
499   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
500       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
501           reader->path, pad));
502   return FALSE;
503 drop_event:
504   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
505       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
506   gst_event_unref (event);
507   gst_object_unref (target);
508   SPLITMUX_PART_UNLOCK (reader);
509   return TRUE;
510 }
511
512 static gboolean
513 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
514 {
515   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
516   GstSplitMuxPartReader *reader = part_pad->reader;
517   GstPad *target;
518   gboolean ret = FALSE;
519   gboolean active;
520
521   SPLITMUX_PART_LOCK (reader);
522   target = gst_object_ref (part_pad->target);
523   active = reader->active;
524   SPLITMUX_PART_UNLOCK (reader);
525
526   if (active) {
527     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
528         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
529
530     ret = gst_pad_query (target, query);
531   }
532
533   gst_object_unref (target);
534
535   return ret;
536 }
537
538 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
539
540 static void
541 splitmux_part_pad_constructed (GObject * pad)
542 {
543   gst_pad_set_chain_function (GST_PAD (pad),
544       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
545   gst_pad_set_event_function (GST_PAD (pad),
546       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
547   gst_pad_set_query_function (GST_PAD (pad),
548       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
549
550   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
551 }
552
553 static void
554 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
555 {
556   GObjectClass *gobject_klass = (GObjectClass *) (klass);
557
558   gobject_klass->constructed = splitmux_part_pad_constructed;
559   gobject_klass->finalize = splitmux_part_pad_finalize;
560 }
561
562 static void
563 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
564 {
565   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
566       NULL, NULL, pad);
567   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
568   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
569 }
570
571 static void
572 splitmux_part_pad_finalize (GObject * obj)
573 {
574   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
575
576   GST_DEBUG_OBJECT (obj, "finalize");
577   gst_data_queue_set_flushing (pad->queue, TRUE);
578   gst_data_queue_flush (pad->queue);
579   gst_object_unref (GST_OBJECT_CAST (pad->queue));
580   pad->queue = NULL;
581
582   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
583 }
584
585 static void
586 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
587     GstSplitMuxPartReader * part);
588 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
589 static GstStateChangeReturn
590 gst_splitmux_part_reader_change_state (GstElement * element,
591     GstStateChange transition);
592 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
593     GstEvent * event);
594 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
595     * part, gboolean flushing);
596 static void bus_handler (GstBin * bin, GstMessage * msg);
597 static void splitmux_part_reader_dispose (GObject * object);
598 static void splitmux_part_reader_finalize (GObject * object);
599 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
600
601 #define gst_splitmux_part_reader_parent_class parent_class
602 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
603     GST_TYPE_PIPELINE);
604
605 static void
606 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
607 {
608   GObjectClass *gobject_klass = (GObjectClass *) (klass);
609   GstElementClass *gstelement_class = (GstElementClass *) klass;
610   GstBinClass *gstbin_class = (GstBinClass *) klass;
611
612   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
613       "Split File Demuxing Source helper");
614
615   gobject_klass->dispose = splitmux_part_reader_dispose;
616   gobject_klass->finalize = splitmux_part_reader_finalize;
617
618   part_reader_signals[SIGNAL_PREPARED] =
619       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
620       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
621           prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
622   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
623   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
624
625   gstbin_class->handle_message = bus_handler;
626 }
627
628 static void
629 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
630 {
631   GstElement *typefind;
632
633   reader->active = FALSE;
634   reader->duration = GST_CLOCK_TIME_NONE;
635
636   g_cond_init (&reader->inactive_cond);
637   g_mutex_init (&reader->lock);
638   g_mutex_init (&reader->type_lock);
639
640   /* FIXME: Create elements on a state change */
641   reader->src = gst_element_factory_make ("filesrc", NULL);
642   if (reader->src == NULL) {
643     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
644     return;
645   }
646   gst_bin_add (GST_BIN_CAST (reader), reader->src);
647
648   typefind = gst_element_factory_make ("typefind", NULL);
649   if (!typefind) {
650     GST_ERROR_OBJECT (reader,
651         "Failed to create typefind element - check your installation");
652     return;
653   }
654
655   gst_bin_add (GST_BIN_CAST (reader), typefind);
656   reader->typefind = typefind;
657
658   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
659     GST_ERROR_OBJECT (reader,
660         "Failed to link typefind element - check your installation");
661     return;
662   }
663
664   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
665       reader);
666 }
667
668 static void
669 splitmux_part_reader_dispose (GObject * object)
670 {
671   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
672
673   splitmux_part_reader_reset (reader);
674
675   G_OBJECT_CLASS (parent_class)->dispose (object);
676 }
677
678 static void
679 splitmux_part_reader_finalize (GObject * object)
680 {
681   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
682
683   g_cond_clear (&reader->inactive_cond);
684   g_mutex_clear (&reader->lock);
685   g_mutex_clear (&reader->type_lock);
686
687   g_free (reader->path);
688
689   G_OBJECT_CLASS (parent_class)->finalize (object);
690 }
691
692 static void
693 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
694 {
695   GList *cur;
696
697   SPLITMUX_PART_LOCK (reader);
698   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
699     GstPad *pad = GST_PAD_CAST (cur->data);
700     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
701     gst_object_unref (GST_OBJECT_CAST (pad));
702   }
703
704   g_list_free (reader->pads);
705   reader->pads = NULL;
706   SPLITMUX_PART_UNLOCK (reader);
707 }
708
709 static GstSplitMuxPartPad *
710 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
711     GstPad * target)
712 {
713   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
714       "name", GST_PAD_NAME (target),
715       "direction", GST_PAD_SINK,
716       NULL);
717   pad->target = target;
718   pad->reader = reader;
719
720   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
721
722   return pad;
723 }
724
725 static void
726 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
727     GstSplitMuxPartReader * reader)
728 {
729   GstPad *out_pad = NULL;
730   GstSplitMuxPartPad *proxy_pad;
731   GstCaps *caps;
732   GstPadLinkReturn link_ret;
733
734   caps = gst_pad_get_current_caps (pad);
735
736   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
737       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
738
739   gst_caps_unref (caps);
740
741   /* Look up or create the output pad */
742   if (reader->get_pad_cb)
743     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
744   if (out_pad == NULL) {
745     GST_DEBUG_OBJECT (reader,
746         "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
747     return;
748   }
749
750   /* Create our proxy pad to interact with this new pad */
751   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
752   GST_DEBUG_OBJECT (reader,
753       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
754       proxy_pad, out_pad);
755
756   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
757   if (link_ret != GST_PAD_LINK_OK) {
758     gst_object_unref (proxy_pad);
759     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
760         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
761             " ret %d", reader->path, pad, link_ret));
762     return;
763   }
764   GST_DEBUG_OBJECT (reader,
765       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
766       pad, proxy_pad);
767
768   SPLITMUX_PART_LOCK (reader);
769   reader->pads = g_list_prepend (reader->pads, proxy_pad);
770   SPLITMUX_PART_UNLOCK (reader);
771 }
772
773 static gboolean
774 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
775 {
776   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
777   gboolean ret = FALSE;
778   GstPad *pad = NULL;
779
780   /* Send event to the first source pad we found */
781   SPLITMUX_PART_LOCK (reader);
782   if (reader->pads) {
783     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
784     pad = gst_pad_get_peer (proxy_pad);
785   }
786   SPLITMUX_PART_UNLOCK (reader);
787
788   if (pad) {
789     ret = gst_pad_send_event (pad, event);
790     gst_object_unref (pad);
791   } else {
792     gst_event_unref (event);
793   }
794
795   return ret;
796 }
797
798 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
799 static void
800 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
801     GstClockTime time)
802 {
803   SPLITMUX_PART_UNLOCK (reader);
804   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
805       GST_TIME_ARGS (time));
806   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
807       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
808       GST_SEEK_TYPE_END, 0);
809
810   SPLITMUX_PART_LOCK (reader);
811
812   /* Wait for flush to finish, so old data is gone */
813   while (reader->flushing) {
814     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
815     SPLITMUX_PART_WAIT (reader);
816   }
817 }
818
819 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
820 static gboolean
821 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
822     GstSegment * target_seg, GstSeekFlags extra_flags)
823 {
824   GstSeekFlags flags;
825   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
826
827   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
828
829   SPLITMUX_PART_LOCK (reader);
830   if (target_seg->start >= reader->start_offset)
831     start = target_seg->start - reader->start_offset;
832   /* If the segment stop is within this part, don't play to the end */
833   if (target_seg->stop != -1 &&
834       target_seg->stop < reader->start_offset + reader->duration)
835     stop = target_seg->stop - reader->start_offset;
836
837   SPLITMUX_PART_UNLOCK (reader);
838
839   GST_DEBUG_OBJECT (reader,
840       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
841       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
842       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
843
844   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
845       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
846       stop);
847 }
848
849 /* Called with lock held */
850 static void
851 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
852 {
853   /* Trigger a flushing seek to near the end of the file and run each stream
854    * to EOS in order to find the smallest end timestamp to start the next
855    * file from
856    */
857   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
858       && reader->duration > GST_SECOND) {
859     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
860     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
861   }
862
863   /* Wait for things to happen */
864   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
865     SPLITMUX_PART_WAIT (reader);
866
867   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
868     /* Fire the prepared signal and go to READY state */
869     GST_DEBUG_OBJECT (reader,
870         "Stream measuring complete. File %s is now ready. Firing prepared signal",
871         reader->path);
872     reader->prep_state = PART_STATE_READY;
873     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
874   }
875 }
876
877 static GstElement *
878 find_demuxer (GstCaps * caps)
879 {
880   GList *factories =
881       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
882       GST_RANK_MARGINAL);
883   GList *compat_elements;
884   GstElement *e = NULL;
885
886   if (factories == NULL)
887     return NULL;
888
889   compat_elements =
890       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
891
892   if (compat_elements) {
893     /* Just take the first (highest ranked) option */
894     GstElementFactory *factory =
895         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
896     e = gst_element_factory_create (factory, NULL);
897     gst_plugin_feature_list_free (compat_elements);
898   }
899
900   if (factories)
901     gst_plugin_feature_list_free (factories);
902
903   return e;
904 }
905
906 static void
907 type_found (GstElement * typefind, guint probability,
908     GstCaps * caps, GstSplitMuxPartReader * reader)
909 {
910   GstElement *demux;
911
912   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
913
914   /* typefind found a type. Look for the demuxer to handle it */
915   demux = reader->demux = find_demuxer (caps);
916   if (reader->demux == NULL) {
917     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
918     return;
919   }
920
921   /* Connect to demux signals */
922   g_signal_connect (demux,
923       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
924   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
925
926   gst_element_set_locked_state (demux, TRUE);
927   gst_bin_add (GST_BIN_CAST (reader), demux);
928   gst_element_link_pads (reader->typefind, "src", demux, NULL);
929   gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
930   gst_element_set_locked_state (demux, FALSE);
931 }
932
933 static void
934 check_if_pads_collected (GstSplitMuxPartReader * reader)
935 {
936   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
937     /* Check we have all pads and each pad has seen a buffer */
938     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
939       GST_DEBUG_OBJECT (reader,
940           "no more pads - file %s. Measuring stream length", reader->path);
941       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
942       SPLITMUX_PART_BROADCAST (reader);
943     }
944   }
945 }
946
947 static void
948 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
949 {
950   GstClockTime duration = GST_CLOCK_TIME_NONE;
951   GList *cur;
952   /* Query the minimum duration of any pad in this piece and store it.
953    * FIXME: Only consider audio and video */
954   SPLITMUX_PART_LOCK (reader);
955   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
956     GstPad *target = GST_PAD_CAST (cur->data);
957     if (target) {
958       gint64 cur_duration;
959       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
960         GST_INFO_OBJECT (reader,
961             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
962             reader->path, target, GST_TIME_ARGS (cur_duration));
963         if (cur_duration < duration)
964           duration = cur_duration;
965       }
966     }
967   }
968   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
969       reader->path, GST_TIME_ARGS (duration));
970   reader->duration = (GstClockTime) duration;
971
972   reader->no_more_pads = TRUE;
973
974   check_if_pads_collected (reader);
975   SPLITMUX_PART_UNLOCK (reader);
976 }
977
978 gboolean
979 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
980     GstPad * src_pad, GstQuery * query)
981 {
982   GstPad *target = NULL;
983   gboolean ret;
984   GList *cur;
985
986   SPLITMUX_PART_LOCK (part);
987   /* Find the pad corresponding to the visible output target pad */
988   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
989     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
990     if (part_pad->target == src_pad) {
991       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
992       break;
993     }
994   }
995   SPLITMUX_PART_UNLOCK (part);
996
997   if (target == NULL)
998     return FALSE;
999
1000   ret = gst_pad_peer_query (target, query);
1001
1002   if (ret == FALSE)
1003     goto out;
1004
1005   /* Post-massaging of queries */
1006   switch (GST_QUERY_TYPE (query)) {
1007     case GST_QUERY_POSITION:{
1008       GstFormat fmt;
1009       gint64 position;
1010
1011       gst_query_parse_position (query, &fmt, &position);
1012       if (fmt != GST_FORMAT_TIME)
1013         return FALSE;
1014       SPLITMUX_PART_LOCK (part);
1015       position += part->start_offset;
1016       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1017           GST_TIME_ARGS (position));
1018       SPLITMUX_PART_UNLOCK (part);
1019
1020       gst_query_set_position (query, fmt, position);
1021       break;
1022     }
1023     default:
1024       break;
1025   }
1026
1027 out:
1028   gst_object_unref (target);
1029   return ret;
1030 }
1031
1032 static GstStateChangeReturn
1033 gst_splitmux_part_reader_change_state (GstElement * element,
1034     GstStateChange transition)
1035 {
1036   GstStateChangeReturn ret;
1037   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1038
1039   switch (transition) {
1040     case GST_STATE_CHANGE_NULL_TO_READY:{
1041       break;
1042     }
1043     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1044       /* Hold the splitmux type lock until after the
1045        * parent state change function has finished
1046        * changing the states of things, and type finding can continue */
1047       SPLITMUX_PART_LOCK (reader);
1048       g_object_set (reader->src, "location", reader->path, NULL);
1049       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1050       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1051       reader->running = TRUE;
1052       SPLITMUX_PART_UNLOCK (reader);
1053       SPLITMUX_PART_TYPE_LOCK (reader);
1054       break;
1055     }
1056     case GST_STATE_CHANGE_READY_TO_NULL:
1057     case GST_STATE_CHANGE_PAUSED_TO_READY:
1058       SPLITMUX_PART_LOCK (reader);
1059       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1060       reader->running = FALSE;
1061       SPLITMUX_PART_BROADCAST (reader);
1062       SPLITMUX_PART_UNLOCK (reader);
1063       break;
1064     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1065       SPLITMUX_PART_LOCK (reader);
1066       reader->active = FALSE;
1067       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1068       SPLITMUX_PART_BROADCAST (reader);
1069       SPLITMUX_PART_UNLOCK (reader);
1070       break;
1071     default:
1072       break;
1073   }
1074
1075   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1076   if (ret == GST_STATE_CHANGE_FAILURE) {
1077     if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
1078       /* Make sure to release the lock we took above */
1079       SPLITMUX_PART_TYPE_UNLOCK (reader);
1080     }
1081     goto beach;
1082   }
1083
1084   switch (transition) {
1085     case GST_STATE_CHANGE_READY_TO_PAUSED:
1086       /* Sleep and wait until all streams have been collected, then do the seeks
1087        * to measure the stream lengths. This took the type lock above,
1088        * but it's OK to release it now and let typefinding happen... */
1089       SPLITMUX_PART_TYPE_UNLOCK (reader);
1090
1091       SPLITMUX_PART_LOCK (reader);
1092
1093       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1094         GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1095         SPLITMUX_PART_WAIT (reader);
1096       }
1097
1098       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
1099           reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
1100         gst_splitmux_part_reader_measure_streams (reader);
1101       } else if (reader->prep_state == PART_STATE_FAILED)
1102         ret = GST_STATE_CHANGE_FAILURE;
1103       SPLITMUX_PART_UNLOCK (reader);
1104       break;
1105     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1106       SPLITMUX_PART_LOCK (reader);
1107       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1108       reader->active = TRUE;
1109       SPLITMUX_PART_BROADCAST (reader);
1110       SPLITMUX_PART_UNLOCK (reader);
1111       break;
1112     case GST_STATE_CHANGE_READY_TO_NULL:
1113       reader->prep_state = PART_STATE_NULL;
1114       splitmux_part_reader_reset (reader);
1115       break;
1116     default:
1117       break;
1118   }
1119
1120 beach:
1121   return ret;
1122 }
1123
1124 static gboolean
1125 check_bus_messages (GstSplitMuxPartReader * part)
1126 {
1127   gboolean ret = FALSE;
1128   GstBus *bus;
1129   GstMessage *m;
1130
1131   bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1132   while ((m = gst_bus_pop (bus)) != NULL) {
1133     if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1134       GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1135       gst_message_unref (m);
1136       goto done;
1137     }
1138     gst_message_unref (m);
1139   }
1140   ret = TRUE;
1141 done:
1142   gst_object_unref (bus);
1143   return ret;
1144 }
1145
1146 gboolean
1147 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1148 {
1149   GstStateChangeReturn ret;
1150
1151   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1152
1153   if (ret != GST_STATE_CHANGE_SUCCESS)
1154     return FALSE;
1155
1156   return check_bus_messages (part);
1157 }
1158
1159 void
1160 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1161 {
1162   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1163 }
1164
1165 void
1166 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1167     const gchar * path)
1168 {
1169   reader->path = g_strdup (path);
1170 }
1171
1172 gboolean
1173 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1174     GstSegment * seg, GstSeekFlags extra_flags)
1175 {
1176   GST_DEBUG_OBJECT (reader, "Activating part reader");
1177
1178   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
1179     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1180         seg);
1181     return FALSE;
1182   }
1183   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1184           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1185     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1186     return FALSE;
1187   }
1188   return TRUE;
1189 }
1190
1191 gboolean
1192 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1193 {
1194   gboolean ret;
1195
1196   SPLITMUX_PART_LOCK (part);
1197   ret = part->active;
1198   SPLITMUX_PART_UNLOCK (part);
1199
1200   return ret;
1201 }
1202
1203 void
1204 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1205 {
1206   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1207   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1208 }
1209
1210 void
1211 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1212     gboolean flushing)
1213 {
1214   GList *cur;
1215
1216   GST_LOG_OBJECT (reader, "%s dataqueues",
1217       flushing ? "Flushing" : "Done flushing");
1218   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1219     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1220     gst_data_queue_set_flushing (part_pad->queue, flushing);
1221     if (flushing)
1222       gst_data_queue_flush (part_pad->queue);
1223   }
1224 };
1225
1226 void
1227 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1228     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1229 {
1230   reader->cb_data = cb_data;
1231   reader->get_pad_cb = get_pad_cb;
1232 }
1233
1234 GstClockTime
1235 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1236 {
1237   GList *cur;
1238   GstClockTime ret = GST_CLOCK_TIME_NONE;
1239
1240   SPLITMUX_PART_LOCK (reader);
1241   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1242     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1243     if (!part_pad->is_sparse && part_pad->max_ts < ret)
1244       ret = part_pad->max_ts;
1245   }
1246
1247   SPLITMUX_PART_UNLOCK (reader);
1248
1249   return ret;
1250 }
1251
1252 void
1253 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1254     GstClockTime offset)
1255 {
1256   SPLITMUX_PART_LOCK (reader);
1257   reader->start_offset = offset;
1258   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1259       GST_TIME_ARGS (offset));
1260   SPLITMUX_PART_UNLOCK (reader);
1261 }
1262
1263 GstClockTime
1264 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1265 {
1266   GstClockTime ret = GST_CLOCK_TIME_NONE;
1267
1268   SPLITMUX_PART_LOCK (reader);
1269   ret = reader->start_offset;
1270   SPLITMUX_PART_UNLOCK (reader);
1271
1272   return ret;
1273 }
1274
1275 GstClockTime
1276 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1277 {
1278   GstClockTime dur;
1279
1280   SPLITMUX_PART_LOCK (reader);
1281   dur = reader->duration;
1282   SPLITMUX_PART_UNLOCK (reader);
1283
1284   return dur;
1285 }
1286
1287 GstPad *
1288 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1289     GstPad * target)
1290 {
1291   GstPad *result = NULL;
1292   GList *cur;
1293
1294   SPLITMUX_PART_LOCK (reader);
1295   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1296     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1297     if (part_pad->target == target) {
1298       result = (GstPad *) gst_object_ref (part_pad);
1299       break;
1300     }
1301   }
1302   SPLITMUX_PART_UNLOCK (reader);
1303
1304   return result;
1305 }
1306
1307 GstFlowReturn
1308 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1309     GstDataQueueItem ** item)
1310 {
1311   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1312   GstDataQueue *q;
1313   GstFlowReturn ret;
1314
1315   /* Get one item from the appropriate dataqueue */
1316   SPLITMUX_PART_LOCK (reader);
1317   if (reader->prep_state == PART_STATE_FAILED) {
1318     SPLITMUX_PART_UNLOCK (reader);
1319     return GST_FLOW_ERROR;
1320   }
1321
1322   q = gst_object_ref (part_pad->queue);
1323
1324   /* Have to drop the lock around pop, so we can be woken up for flush */
1325   SPLITMUX_PART_UNLOCK (reader);
1326   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1327     ret = GST_FLOW_FLUSHING;
1328     goto out;
1329   }
1330
1331   SPLITMUX_PART_LOCK (reader);
1332
1333   SPLITMUX_PART_BROADCAST (reader);
1334   if (GST_IS_EVENT ((*item)->object)) {
1335     GstEvent *e = (GstEvent *) ((*item)->object);
1336     /* Mark this pad as EOS */
1337     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1338       part_pad->is_eos = TRUE;
1339   }
1340
1341   SPLITMUX_PART_UNLOCK (reader);
1342
1343   ret = GST_FLOW_OK;
1344 out:
1345   gst_object_unref (q);
1346   return ret;
1347 }
1348
1349 static void
1350 bus_handler (GstBin * bin, GstMessage * message)
1351 {
1352   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1353
1354   switch (GST_MESSAGE_TYPE (message)) {
1355     case GST_MESSAGE_ERROR:
1356       /* Make sure to set the state to failed and wake up the listener
1357        * on error */
1358       SPLITMUX_PART_LOCK (reader);
1359       GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
1360           " marking this reader as failed", GST_MESSAGE_SRC (message));
1361       reader->prep_state = PART_STATE_FAILED;
1362       SPLITMUX_PART_BROADCAST (reader);
1363       SPLITMUX_PART_UNLOCK (reader);
1364       break;
1365     default:
1366       break;
1367   }
1368
1369   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1370 }