splitmux: Implement new elements for splitting files at mux level.
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxpartreader.c
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36
37 enum
38 {
39   SIGNAL_PREPARED,
40   LAST_SIGNAL
41 };
42
43 static guint part_reader_signals[LAST_SIGNAL] = { 0 };
44
45 typedef struct _GstSplitMuxPartPad
46 {
47   GstPad parent;
48
49   /* Reader we belong to */
50   GstSplitMuxPartReader *reader;
51   /* Output splitmuxsrc source pad */
52   GstPad *target;
53
54   GstDataQueue *queue;
55
56   gboolean is_eos;
57   gboolean flushing;
58   gboolean seen_buffer;
59
60   GstClockTime max_ts;
61   GstSegment segment;
62
63   GstSegment orig_segment;
64   GstClockTime initial_ts_offset;
65 } GstSplitMuxPartPad;
66
67 typedef struct _GstSplitMuxPartPadClass
68 {
69   GstPadClass parent;
70 } GstSplitMuxPartPadClass;
71
72 static GType gst_splitmux_part_pad_get_type (void);
73 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
74 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
75
76 static void splitmux_part_pad_constructed (GObject * pad);
77 static void splitmux_part_pad_finalize (GObject * pad);
78 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
79     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
80
81 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
82     guint visible, guint bytes, guint64 time, gpointer checkdata);
83 static void type_found (GstElement * typefind, guint probability,
84     GstCaps * caps, GstSplitMuxPartReader * reader);
85 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
86
87 /* Called with reader lock held */
88 static gboolean
89 have_empty_queue (GstSplitMuxPartReader * reader)
90 {
91   GList *cur;
92
93   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
94     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
95     if (part_pad->is_eos) {
96       GST_LOG_OBJECT (part_pad, "Pad is EOS");
97       return TRUE;
98     }
99     if (gst_data_queue_is_empty (part_pad->queue)) {
100       GST_LOG_OBJECT (part_pad, "Queue is empty");
101       return TRUE;
102     }
103   }
104
105   return FALSE;
106 }
107
108 /* Called with reader lock held */
109 static gboolean
110 block_until_can_push (GstSplitMuxPartReader * reader)
111 {
112   while (reader->running) {
113     if (reader->flushing)
114       goto out;
115     if (reader->active && have_empty_queue (reader))
116       goto out;
117
118     GST_LOG_OBJECT (reader,
119         "Waiting for activation or empty queue on reader %s", reader->path);
120     SPLITMUX_PART_WAIT (reader);
121   }
122
123   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
124       reader->path, reader->active, reader->flushing);
125 out:
126   return reader->active && !reader->flushing;
127 }
128
129 static void
130 handle_buffer_measuring (GstSplitMuxPartReader * reader,
131     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
132 {
133   GstClockTime ts = GST_CLOCK_TIME_NONE;
134   GstClockTimeDiff offset;
135
136   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
137       !part_pad->seen_buffer) {
138     /* If this is the first buffer on the pad in the collect_streams state,
139      * then calculate inital offset based on running time of this segment */
140     part_pad->initial_ts_offset =
141         part_pad->orig_segment.start + part_pad->orig_segment.base -
142         part_pad->orig_segment.time;
143     GST_DEBUG_OBJECT (reader,
144         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
145         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
146   }
147   part_pad->seen_buffer = TRUE;
148
149   /* Adjust buffer timestamps */
150   offset = reader->start_offset + part_pad->segment.base;
151   offset -= part_pad->initial_ts_offset;
152
153   /* Update the stored max duration on the pad,
154    * always preferring making DTS contiguous
155    * where possible */
156   if (GST_BUFFER_DTS_IS_VALID (buf))
157     ts = GST_BUFFER_DTS (buf) + offset;
158   else if (GST_BUFFER_PTS_IS_VALID (buf))
159     ts = GST_BUFFER_PTS (buf) + offset;
160
161   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
162       " incoming PTS %" GST_TIME_FORMAT
163       " DTS %" GST_TIME_FORMAT " offset by %" GST_TIME_FORMAT
164       " to %" GST_TIME_FORMAT, part_pad,
165       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
166       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
167       GST_TIME_ARGS (offset), GST_TIME_ARGS (ts));
168
169   if (GST_CLOCK_TIME_IS_VALID (ts)) {
170     if (GST_BUFFER_DURATION_IS_VALID (buf))
171       ts += GST_BUFFER_DURATION (buf);
172
173     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
174       part_pad->max_ts = ts;
175       GST_LOG_OBJECT (reader,
176           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
177           GST_TIME_ARGS (part_pad->max_ts));
178     }
179   }
180   /* Is it time to move to measuring state yet? */
181   check_if_pads_collected (reader);
182 }
183
184 static gboolean
185 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
186     guint visible, guint bytes, guint64 time, gpointer checkdata)
187 {
188   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
189   if (time > 20 * GST_SECOND)
190     return TRUE;
191   return FALSE;
192 }
193
194 static void
195 splitmux_part_free_queue_item (GstDataQueueItem * item)
196 {
197   gst_mini_object_unref (item->object);
198   g_slice_free (GstDataQueueItem, item);
199 }
200
201 static GstFlowReturn
202 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
203 {
204   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
205   GstSplitMuxPartReader *reader = part_pad->reader;
206   GstDataQueueItem *item;
207   GstClockTimeDiff offset;
208
209   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
210   SPLITMUX_PART_LOCK (reader);
211
212   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
213       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
214     handle_buffer_measuring (reader, part_pad, buf);
215     gst_buffer_unref (buf);
216     SPLITMUX_PART_UNLOCK (reader);
217     return GST_FLOW_OK;
218   }
219
220   if (!block_until_can_push (reader)) {
221     /* Flushing */
222     SPLITMUX_PART_UNLOCK (reader);
223     gst_buffer_unref (buf);
224     return GST_FLOW_FLUSHING;
225   }
226
227   /* Adjust buffer timestamps */
228   offset = reader->start_offset + part_pad->segment.base;
229   offset -= part_pad->initial_ts_offset;
230
231   if (GST_BUFFER_PTS_IS_VALID (buf))
232     GST_BUFFER_PTS (buf) += offset;
233   if (GST_BUFFER_DTS_IS_VALID (buf))
234     GST_BUFFER_DTS (buf) += offset;
235
236   /* We are active, and one queue is empty, place this buffer in
237    * the dataqueue */
238   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
239   item = g_slice_new (GstDataQueueItem);
240   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
241   item->object = GST_MINI_OBJECT (buf);
242   item->size = gst_buffer_get_size (buf);
243   item->duration = GST_BUFFER_DURATION (buf);
244   if (item->duration == GST_CLOCK_TIME_NONE)
245     item->duration = 0;
246   item->visible = TRUE;
247
248   gst_object_ref (part_pad);
249
250   SPLITMUX_PART_UNLOCK (reader);
251
252   if (!gst_data_queue_push (part_pad->queue, item)) {
253     splitmux_part_free_queue_item (item);
254     gst_object_unref (part_pad);
255     return GST_FLOW_FLUSHING;
256   }
257
258   gst_object_unref (part_pad);
259   return GST_FLOW_OK;
260 }
261
262 /* Called with splitmux part lock held */
263 static gboolean
264 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
265 {
266   GList *cur;
267   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
268     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
269     if (!part_pad->is_eos)
270       return FALSE;
271   }
272
273   return TRUE;
274 }
275
276 static gboolean
277 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
278 {
279   GList *cur;
280   GST_LOG_OBJECT (part, "Checking for preroll");
281   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
282     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
283     if (!part_pad->seen_buffer) {
284       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
285           part_pad);
286       return FALSE;
287     }
288   }
289   GST_LOG_OBJECT (part, "Part is prerolled");
290   return TRUE;
291 }
292
293
294 gboolean
295 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
296 {
297   gboolean res;
298
299   SPLITMUX_PART_LOCK (reader);
300   res = splitmux_part_is_eos_locked (reader);
301   SPLITMUX_PART_UNLOCK (reader);
302
303   return res;
304 }
305
306 /* Called with splitmux part lock held */
307 static gboolean
308 splitmux_is_flushing (GstSplitMuxPartReader * reader)
309 {
310   GList *cur;
311   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
312     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
313     if (part_pad->flushing)
314       return TRUE;
315   }
316
317   return FALSE;
318 }
319
320 static gboolean
321 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
322 {
323   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
324   GstSplitMuxPartReader *reader = part_pad->reader;
325   gboolean ret = TRUE;
326   SplitMuxSrcPad *target;
327   GstDataQueueItem *item;
328
329   SPLITMUX_PART_LOCK (reader);
330
331   target = gst_object_ref (part_pad->target);
332
333   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
334       event);
335
336   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
337     goto drop_event;
338
339   switch (GST_EVENT_TYPE (event)) {
340     case GST_EVENT_SEGMENT:{
341       GstSegment *seg = &part_pad->segment;
342
343       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
344
345       gst_event_copy_segment (event, seg);
346       gst_event_copy_segment (event, &part_pad->orig_segment);
347
348       if (seg->format != GST_FORMAT_TIME)
349         goto wrong_segment;
350
351       /* Adjust segment */
352       /* Adjust start/stop so the overall file is 0 + start_offset based */
353       if (seg->stop != -1) {
354         seg->stop -= seg->start;
355         seg->stop += seg->time + reader->start_offset;
356       }
357       seg->start = seg->time + reader->start_offset;
358       seg->time += reader->start_offset;
359       seg->position += reader->start_offset;
360
361       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
362
363       /* Replace event */
364       gst_event_unref (event);
365       event = gst_event_new_segment (seg);
366
367       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
368           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
369         break;                  /* Only do further stuff with segments during initial measuring */
370
371       /* Take the first segment from the first part */
372       if (target->segment.format == GST_FORMAT_UNDEFINED) {
373         gst_segment_copy_into (seg, &target->segment);
374         GST_DEBUG_OBJECT (reader,
375             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
376       }
377
378       if (seg->stop != -1 && target->segment.stop != -1) {
379         GstClockTime stop = seg->base + seg->stop;
380         if (stop > target->segment.stop) {
381           target->segment.stop = stop;
382           GST_DEBUG_OBJECT (reader,
383               "Adjusting segment stop by %" GST_TIME_FORMAT
384               " output now %" GST_SEGMENT_FORMAT,
385               GST_TIME_ARGS (reader->start_offset), &target->segment);
386         }
387       }
388       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
389       break;
390     }
391     case GST_EVENT_EOS:{
392
393       GST_DEBUG_OBJECT (part_pad,
394           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
395           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
396
397       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
398         /* Mark this pad as EOS */
399         part_pad->is_eos = TRUE;
400         if (splitmux_part_is_eos_locked (reader)) {
401           /* Finished measuring things, set state and tell the state change func
402            * so it can seek back to the start */
403           GST_LOG_OBJECT (reader, "EOS while measuring streams");
404           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
405           SPLITMUX_PART_BROADCAST (reader);
406         }
407         goto drop_event;
408       }
409       break;
410     }
411     case GST_EVENT_FLUSH_START:
412       reader->flushing = TRUE;
413       part_pad->flushing = TRUE;
414       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
415           part_pad);
416       gst_data_queue_set_flushing (part_pad->queue, TRUE);
417       SPLITMUX_PART_BROADCAST (reader);
418       break;
419     case GST_EVENT_FLUSH_STOP:{
420       gst_data_queue_set_flushing (part_pad->queue, FALSE);
421       gst_data_queue_flush (part_pad->queue);
422       part_pad->seen_buffer = FALSE;
423       part_pad->flushing = FALSE;
424       part_pad->is_eos = FALSE;
425
426       reader->flushing = splitmux_is_flushing (reader);
427       GST_LOG_OBJECT (reader,
428           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
429           reader->path, pad, reader->flushing);
430       SPLITMUX_PART_BROADCAST (reader);
431       break;
432     }
433     default:
434       break;
435   }
436
437   /* Don't send events downstream while preparing */
438   if (reader->prep_state != PART_STATE_READY)
439     goto drop_event;
440
441   /* Don't pass flush events - those are done by the parent */
442   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
443       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
444     goto drop_event;
445
446   if (!block_until_can_push (reader)) {
447     SPLITMUX_PART_UNLOCK (reader);
448     gst_object_unref (target);
449     gst_event_unref (event);
450     return FALSE;
451   }
452
453   switch (GST_EVENT_TYPE (event)) {
454     case GST_EVENT_GAP:{
455       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
456       goto drop_event;
457     }
458     default:
459       break;
460   }
461
462   /* We are active, and one queue is empty, place this buffer in
463    * the dataqueue */
464   gst_object_ref (part_pad->queue);
465   SPLITMUX_PART_UNLOCK (reader);
466
467   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
468   item = g_slice_new (GstDataQueueItem);
469   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
470   item->object = GST_MINI_OBJECT (event);
471   item->size = 0;
472   item->duration = 0;
473   if (item->duration == GST_CLOCK_TIME_NONE)
474     item->duration = 0;
475   item->visible = FALSE;
476
477   if (!gst_data_queue_push (part_pad->queue, item)) {
478     splitmux_part_free_queue_item (item);
479     ret = FALSE;
480   }
481
482   gst_object_unref (part_pad->queue);
483
484   return ret;
485 wrong_segment:
486   gst_event_unref (event);
487   SPLITMUX_PART_UNLOCK (reader);
488   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
489       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
490           reader->path, pad));
491   return FALSE;
492 drop_event:
493   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
494       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
495   gst_event_unref (event);
496   SPLITMUX_PART_UNLOCK (reader);
497   return TRUE;
498 }
499
500 static gboolean
501 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
502 {
503   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
504   GstSplitMuxPartReader *reader = part_pad->reader;
505   GstPad *target;
506   gboolean ret = FALSE;
507   gboolean active;
508
509   SPLITMUX_PART_LOCK (reader);
510   target = gst_object_ref (part_pad->target);
511   active = reader->active;
512   SPLITMUX_PART_UNLOCK (reader);
513
514   if (active) {
515     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
516         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
517
518     ret = gst_pad_query (target, query);
519   }
520
521   gst_object_unref (target);
522
523   return ret;
524 }
525
526 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
527
528 static void
529 splitmux_part_pad_constructed (GObject * pad)
530 {
531   gst_pad_set_chain_function (GST_PAD (pad),
532       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
533   gst_pad_set_event_function (GST_PAD (pad),
534       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
535   gst_pad_set_query_function (GST_PAD (pad),
536       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
537 }
538
539 static void
540 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
541 {
542   GObjectClass *gobject_klass = (GObjectClass *) (klass);
543
544   gobject_klass->constructed = splitmux_part_pad_constructed;
545   gobject_klass->finalize = splitmux_part_pad_finalize;
546 }
547
548 static void
549 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
550 {
551   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
552       NULL, NULL, pad);
553   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
554   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
555 }
556
557 static void
558 splitmux_part_pad_finalize (GObject * obj)
559 {
560   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
561   g_object_unref ((GObject *) (pad->queue));
562 }
563
564 static void
565 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
566     GstSplitMuxPartReader * part);
567 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
568 static GstStateChangeReturn
569 gst_splitmux_part_reader_change_state (GstElement * element,
570     GstStateChange transition);
571 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
572     GstEvent * event);
573 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
574     * part, gboolean flushing);
575 static void bus_handler (GstBin * bin, GstMessage * msg);
576
577 #define gst_splitmux_part_reader_parent_class parent_class
578 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
579     GST_TYPE_PIPELINE);
580
581 static void
582 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
583 {
584   GstElementClass *gstelement_class = (GstElementClass *) klass;
585   GstBinClass *gstbin_class = (GstBinClass *) klass;
586
587   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
588       "Split File Demuxing Source helper");
589
590   part_reader_signals[SIGNAL_PREPARED] =
591       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
592       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
593           prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
594   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
595   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
596
597   gstbin_class->handle_message = bus_handler;
598 }
599
600 static void
601 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
602 {
603   GstElement *typefind;
604
605   reader->active = FALSE;
606   reader->duration = GST_CLOCK_TIME_NONE;
607
608   g_cond_init (&reader->inactive_cond);
609   g_mutex_init (&reader->lock);
610
611   /* FIXME: Create elements on a state change */
612   reader->src = gst_element_factory_make ("filesrc", NULL);
613   if (reader->src == NULL) {
614     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
615     return;
616   }
617   gst_bin_add (GST_BIN_CAST (reader), reader->src);
618
619   typefind = gst_element_factory_make ("typefind", NULL);
620   if (!typefind) {
621     GST_ERROR_OBJECT (reader,
622         "Failed to create typefind element - check your installation");
623     return;
624   }
625
626   gst_bin_add (GST_BIN_CAST (reader), typefind);
627   reader->typefind = typefind;
628
629   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
630     GST_ERROR_OBJECT (reader,
631         "Failed to link typefind element - check your installation");
632     return;
633   }
634
635   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
636       reader);
637 }
638
639 static GstSplitMuxPartPad *
640 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
641     GstPad * target)
642 {
643   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
644       "name", GST_PAD_NAME (target),
645       "direction", GST_PAD_SINK,
646       NULL);
647   pad->target = target;
648   pad->reader = reader;
649
650   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
651
652   return pad;
653 }
654
655 static void
656 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
657     GstSplitMuxPartReader * reader)
658 {
659   GstPad *out_pad = NULL;
660   GstSplitMuxPartPad *proxy_pad;
661   GstCaps *caps;
662   GstPadLinkReturn link_ret;
663
664   caps = gst_pad_get_current_caps (pad);
665
666   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
667       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
668   /* Look up or create the output pad */
669   if (reader->get_pad_cb)
670     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
671   if (out_pad == NULL)
672     return;
673
674   /* Create our proxy pad to interact with this new pad */
675   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
676   GST_DEBUG_OBJECT (reader,
677       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
678       proxy_pad, out_pad);
679
680   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
681   if (link_ret != GST_PAD_LINK_OK) {
682     gst_object_unref (proxy_pad);
683     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
684         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
685             " ret %d", reader->path, pad, link_ret));
686     return;
687   }
688   GST_DEBUG_OBJECT (reader,
689       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
690       pad, proxy_pad);
691
692   SPLITMUX_PART_LOCK (reader);
693   reader->pads = g_list_prepend (reader->pads, proxy_pad);
694   SPLITMUX_PART_UNLOCK (reader);
695 }
696
697 static gboolean
698 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
699 {
700   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
701   gboolean ret = FALSE;
702   GstPad *pad = NULL;
703
704   /* Send event to the first source pad we found */
705   SPLITMUX_PART_LOCK (reader);
706   if (reader->pads) {
707     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
708     pad = gst_pad_get_peer (proxy_pad);
709   }
710   SPLITMUX_PART_UNLOCK (reader);
711
712   if (pad) {
713     ret = gst_pad_send_event (pad, event);
714     gst_object_unref (pad);
715   } else {
716     gst_event_unref (event);
717   }
718
719   return ret;
720 }
721
722 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
723 static void
724 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
725     GstClockTime time)
726 {
727   SPLITMUX_PART_UNLOCK (reader);
728   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
729       GST_TIME_ARGS (time));
730   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
731       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
732       GST_SEEK_TYPE_END, 0);
733
734   SPLITMUX_PART_LOCK (reader);
735
736   /* Wait for flush to finish, so old data is gone */
737   while (reader->flushing) {
738     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
739     SPLITMUX_PART_WAIT (reader);
740   }
741 }
742
743 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
744 static gboolean
745 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
746     GstSegment * target_seg)
747 {
748   GstSeekFlags flags;
749   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
750
751   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
752
753   SPLITMUX_PART_LOCK (reader);
754   if (target_seg->start >= reader->start_offset)
755     start = target_seg->start - reader->start_offset;
756   /* If the segment stop is within this part, don't play to the end */
757   if (target_seg->stop != -1 &&
758       target_seg->stop < reader->start_offset + reader->duration)
759     stop = target_seg->stop - reader->start_offset;
760
761   SPLITMUX_PART_UNLOCK (reader);
762
763   GST_DEBUG_OBJECT (reader,
764       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
765       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
766       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
767
768   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
769       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
770       stop);
771 }
772
773 /* Called with lock held */
774 static void
775 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
776 {
777   /* Trigger a flushing seek to near the end of the file and run each stream
778    * to EOS in order to find the smallest end timestamp to start the next
779    * file from
780    */
781   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
782       && reader->duration > GST_SECOND) {
783     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
784     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
785   }
786
787   /* Wait for things to happen */
788   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
789     SPLITMUX_PART_WAIT (reader);
790
791   /* Seek back to the start now */
792   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
793     /* Fire the prepared signal */
794     GST_DEBUG_OBJECT (reader,
795         "Stream measuring complete. File %s is now ready. Firing prepared signal",
796         reader->path);
797     reader->prep_state = PART_STATE_READY;
798     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
799   }
800 }
801
802 static GstElement *
803 find_demuxer (GstCaps * caps)
804 {
805   GList *factories =
806       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
807       GST_RANK_MARGINAL);
808   GList *compat_elements;
809   GstElement *e = NULL;
810
811   if (factories == NULL)
812     return NULL;
813
814   compat_elements =
815       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
816
817   if (compat_elements) {
818     /* Just take the first (highest ranked) option */
819     GstElementFactory *factory =
820         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
821     e = gst_element_factory_create (factory, NULL);
822     gst_plugin_feature_list_free (compat_elements);
823   }
824
825   if (factories)
826     gst_plugin_feature_list_free (factories);
827
828   return e;
829 }
830
831 static void
832 type_found (GstElement * typefind, guint probability,
833     GstCaps * caps, GstSplitMuxPartReader * reader)
834 {
835   GstElement *demux;
836
837   GST_WARNING ("Got type %" GST_PTR_FORMAT, caps);
838
839   /* typefind found a type. Look for the demuxer to handle it */
840   demux = reader->demux = find_demuxer (caps);
841   if (reader->demux == NULL) {
842     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
843     return;
844   }
845
846   gst_bin_add (GST_BIN_CAST (reader), demux);
847   gst_element_link_pads (reader->typefind, "src", demux, NULL);
848   gst_element_set_state (reader->demux, GST_STATE_PLAYING);
849
850   /* Connect to demux signals */
851   g_signal_connect (demux,
852       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
853   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
854 }
855
856 static void
857 check_if_pads_collected (GstSplitMuxPartReader * reader)
858 {
859   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
860     /* Check we have all pads and each pad has seen a buffer */
861     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
862       GST_DEBUG_OBJECT (reader,
863           "no more pads - file %s. Measuring stream length", reader->path);
864       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
865       SPLITMUX_PART_BROADCAST (reader);
866     }
867   }
868 }
869
870 static void
871 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
872 {
873   GstClockTime duration = GST_CLOCK_TIME_NONE;
874   GList *cur;
875   /* Query the minimum duration of any pad in this piece and store it.
876    * FIXME: Only consider audio and video */
877   SPLITMUX_PART_LOCK (reader);
878   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
879     GstPad *target = GST_PAD_CAST (cur->data);
880     if (target) {
881       gint64 cur_duration;
882       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
883         GST_INFO_OBJECT (reader,
884             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
885             reader->path, target, GST_TIME_ARGS (cur_duration));
886         if (cur_duration < duration)
887           duration = cur_duration;
888       }
889     }
890   }
891   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
892       reader->path, GST_TIME_ARGS (duration));
893   reader->duration = (GstClockTime) duration;
894
895   reader->no_more_pads = TRUE;
896
897   check_if_pads_collected (reader);
898   SPLITMUX_PART_UNLOCK (reader);
899 }
900
901 gboolean
902 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
903     GstPad * src_pad, GstQuery * query)
904 {
905   GstPad *target = NULL;
906   gboolean ret;
907   GList *cur;
908
909   SPLITMUX_PART_LOCK (part);
910   /* Find the pad corresponding to the visible output target pad */
911   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
912     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
913     if (part_pad->target == src_pad) {
914       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
915       break;
916     }
917   }
918   SPLITMUX_PART_UNLOCK (part);
919
920   if (target == NULL)
921     return FALSE;
922
923   ret = gst_pad_peer_query (target, query);
924   gst_object_unref (GST_OBJECT_CAST (target));
925
926   if (ret == FALSE)
927     return ret;
928
929   /* Post-massaging of queries */
930   switch (GST_QUERY_TYPE (query)) {
931     case GST_QUERY_POSITION:{
932       GstFormat fmt;
933       gint64 position;
934
935       gst_query_parse_position (query, &fmt, &position);
936       if (fmt != GST_FORMAT_TIME)
937         return FALSE;
938       SPLITMUX_PART_LOCK (part);
939       position += part->start_offset;
940       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
941           GST_TIME_ARGS (position));
942       SPLITMUX_PART_UNLOCK (part);
943
944       gst_query_set_position (query, fmt, position);
945       break;
946     }
947     default:
948       break;
949   }
950
951   return ret;
952 }
953
954 static GstStateChangeReturn
955 gst_splitmux_part_reader_change_state (GstElement * element,
956     GstStateChange transition)
957 {
958   GstStateChangeReturn ret;
959   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
960
961   switch (transition) {
962     case GST_STATE_CHANGE_NULL_TO_READY:{
963       break;
964     }
965     case GST_STATE_CHANGE_READY_TO_PAUSED:{
966       g_object_set (reader->src, "location", reader->path, NULL);
967       SPLITMUX_PART_LOCK (reader);
968       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
969       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
970       reader->running = TRUE;
971       SPLITMUX_PART_UNLOCK (reader);
972       break;
973     }
974     case GST_STATE_CHANGE_READY_TO_NULL:
975     case GST_STATE_CHANGE_PAUSED_TO_READY:
976       SPLITMUX_PART_LOCK (reader);
977       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
978       reader->running = FALSE;
979       SPLITMUX_PART_BROADCAST (reader);
980       SPLITMUX_PART_UNLOCK (reader);
981       break;
982     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
983       SPLITMUX_PART_LOCK (reader);
984       reader->active = FALSE;
985       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
986       SPLITMUX_PART_BROADCAST (reader);
987       SPLITMUX_PART_UNLOCK (reader);
988       break;
989     default:
990       break;
991   }
992
993   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
994   if (ret == GST_STATE_CHANGE_FAILURE)
995     goto beach;
996
997   switch (transition) {
998     case GST_STATE_CHANGE_READY_TO_PAUSED:
999       /* Sleep and wait until all streams have been collected, then do the seeks
1000        * to measure the stream lengths */
1001       SPLITMUX_PART_LOCK (reader);
1002
1003       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
1004         GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
1005         SPLITMUX_PART_WAIT (reader);
1006       }
1007
1008       if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
1009         gst_splitmux_part_reader_measure_streams (reader);
1010       else if (reader->prep_state == PART_STATE_FAILED)
1011         ret = GST_STATE_CHANGE_FAILURE;
1012       SPLITMUX_PART_UNLOCK (reader);
1013       break;
1014     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1015       SPLITMUX_PART_LOCK (reader);
1016       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1017       reader->active = TRUE;
1018       SPLITMUX_PART_BROADCAST (reader);
1019       SPLITMUX_PART_UNLOCK (reader);
1020       break;
1021     case GST_STATE_CHANGE_READY_TO_NULL:
1022       reader->prep_state = PART_STATE_NULL;
1023       break;
1024     default:
1025       break;
1026   }
1027
1028 beach:
1029   return ret;
1030 }
1031
1032 static gboolean
1033 check_bus_messages (GstSplitMuxPartReader * part)
1034 {
1035   gboolean ret = FALSE;
1036   GstBus *bus;
1037   GstMessage *m;
1038
1039   bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
1040   while ((m = gst_bus_pop (bus)) != NULL) {
1041     if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
1042       GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
1043       gst_message_unref (m);
1044       goto done;
1045     }
1046     gst_message_unref (m);
1047   }
1048   ret = TRUE;
1049 done:
1050   gst_object_unref (bus);
1051   return ret;
1052 }
1053
1054 gboolean
1055 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1056 {
1057   GstStateChangeReturn ret;
1058
1059   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1060
1061   if (ret != GST_STATE_CHANGE_SUCCESS)
1062     return FALSE;
1063
1064   return check_bus_messages (part);
1065 }
1066
1067 void
1068 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1069 {
1070   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1071 }
1072
1073 void
1074 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1075     const gchar * path)
1076 {
1077   reader->path = g_strdup (path);
1078 }
1079
1080 gboolean
1081 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1082     GstSegment * seg)
1083 {
1084   GST_DEBUG_OBJECT (reader, "Activating part reader");
1085
1086   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
1087     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1088         seg);
1089     return FALSE;
1090   }
1091   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1092           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1093     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1094     return FALSE;
1095   }
1096   return TRUE;
1097 }
1098
1099 void
1100 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1101 {
1102   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1103   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1104 }
1105
1106 void
1107 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1108     gboolean flushing)
1109 {
1110   GList *cur;
1111
1112   GST_LOG_OBJECT (reader, "%s dataqueues",
1113       flushing ? "Flushing" : "Done flushing");
1114   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1115     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1116     gst_data_queue_set_flushing (part_pad->queue, flushing);
1117     if (flushing)
1118       gst_data_queue_flush (part_pad->queue);
1119   }
1120 };
1121
1122 void
1123 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1124     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1125 {
1126   reader->cb_data = cb_data;
1127   reader->get_pad_cb = get_pad_cb;
1128 }
1129
1130 GstClockTime
1131 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1132 {
1133   GList *cur;
1134   GstClockTime ret = GST_CLOCK_TIME_NONE;
1135
1136   SPLITMUX_PART_LOCK (reader);
1137   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1138     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1139     if (part_pad->max_ts < ret)
1140       ret = part_pad->max_ts;
1141   }
1142
1143   SPLITMUX_PART_UNLOCK (reader);
1144
1145   return ret;
1146 }
1147
1148 void
1149 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1150     GstClockTime offset)
1151 {
1152   SPLITMUX_PART_LOCK (reader);
1153   reader->start_offset = offset;
1154   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1155       GST_TIME_ARGS (offset));
1156   SPLITMUX_PART_UNLOCK (reader);
1157 }
1158
1159 GstClockTime
1160 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1161 {
1162   GstClockTime ret = GST_CLOCK_TIME_NONE;
1163
1164   SPLITMUX_PART_LOCK (reader);
1165   ret = reader->start_offset;
1166   SPLITMUX_PART_UNLOCK (reader);
1167
1168   return ret;
1169 }
1170
1171 GstClockTime
1172 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1173 {
1174   GstClockTime dur;
1175
1176   SPLITMUX_PART_LOCK (reader);
1177   dur = reader->duration;
1178   SPLITMUX_PART_UNLOCK (reader);
1179
1180   return dur;
1181 }
1182
1183 GstPad *
1184 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1185     GstPad * target)
1186 {
1187   GstPad *result = NULL;
1188   GList *cur;
1189
1190   SPLITMUX_PART_LOCK (reader);
1191   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1192     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1193     if (part_pad->target == target) {
1194       result = (GstPad *) part_pad;
1195       break;
1196     }
1197   }
1198   SPLITMUX_PART_UNLOCK (reader);
1199
1200   return result;
1201 }
1202
1203 GstFlowReturn
1204 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1205     GstDataQueueItem ** item)
1206 {
1207   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1208   GstDataQueue *q;
1209
1210   /* Get one item from the appropriate dataqueue */
1211   SPLITMUX_PART_LOCK (reader);
1212   if (reader->prep_state == PART_STATE_FAILED) {
1213     SPLITMUX_PART_UNLOCK (reader);
1214     return GST_FLOW_ERROR;
1215   }
1216
1217   q = gst_object_ref (part_pad->queue);
1218
1219   /* Have to drop the lock around pop, so we can be woken up for flush */
1220   SPLITMUX_PART_UNLOCK (reader);
1221   if (!gst_data_queue_pop (q, item) || (*item == NULL))
1222     return GST_FLOW_FLUSHING;
1223
1224   SPLITMUX_PART_LOCK (reader);
1225
1226   SPLITMUX_PART_BROADCAST (reader);
1227   if (GST_IS_EVENT ((*item)->object)) {
1228     GstEvent *e = (GstEvent *) ((*item)->object);
1229     /* Mark this pad as EOS */
1230     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1231       part_pad->is_eos = TRUE;
1232   }
1233
1234   SPLITMUX_PART_UNLOCK (reader);
1235   gst_object_unref (q);
1236   return GST_FLOW_OK;
1237 }
1238
1239 static void
1240 bus_handler (GstBin * bin, GstMessage * message)
1241 {
1242   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1243
1244   switch (GST_MESSAGE_TYPE (message)) {
1245     case GST_MESSAGE_ERROR:
1246       /* Make sure to set the state to failed and wake up the listener
1247        * on error */
1248       SPLITMUX_PART_LOCK (reader);
1249       reader->prep_state = PART_STATE_FAILED;
1250       SPLITMUX_PART_BROADCAST (reader);
1251       SPLITMUX_PART_UNLOCK (reader);
1252       break;
1253     default:
1254       break;
1255   }
1256
1257   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1258 }