splitmuxsink: Don't leak old muxer/sink in async mode
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxpartreader.c
index 5996aa0..75171cb 100644 (file)
@@ -34,6 +34,9 @@ GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
 
+#define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
+#define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
+
 enum
 {
   SIGNAL_PREPARED,
@@ -57,6 +60,7 @@ typedef struct _GstSplitMuxPartPad
   gboolean flushing;
   gboolean seen_buffer;
 
+  gboolean is_sparse;
   GstClockTime max_ts;
   GstSegment segment;
 
@@ -130,7 +134,7 @@ static void
 handle_buffer_measuring (GstSplitMuxPartReader * reader,
     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
 {
-  GstClockTime ts = GST_CLOCK_TIME_NONE;
+  GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
   GstClockTimeDiff offset;
 
   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
@@ -160,17 +164,18 @@ handle_buffer_measuring (GstSplitMuxPartReader * reader,
 
   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
       " incoming PTS %" GST_TIME_FORMAT
-      " DTS %" GST_TIME_FORMAT " offset by %" GST_TIME_FORMAT
-      " to %" GST_TIME_FORMAT, part_pad,
+      " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
+      " to %" GST_STIME_FORMAT, part_pad,
       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
-      GST_TIME_ARGS (offset), GST_TIME_ARGS (ts));
+      GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
 
-  if (GST_CLOCK_TIME_IS_VALID (ts)) {
+  if (GST_CLOCK_STIME_IS_VALID (ts)) {
     if (GST_BUFFER_DURATION_IS_VALID (buf))
       ts += GST_BUFFER_DURATION (buf);
 
-    if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
+    if (GST_CLOCK_STIME_IS_VALID (ts)
+        && ts > (GstClockTimeDiff) part_pad->max_ts) {
       part_pad->max_ts = ts;
       GST_LOG_OBJECT (reader,
           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
@@ -337,6 +342,12 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
     goto drop_event;
 
   switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_STREAM_START:{
+      GstStreamFlags flags;
+      gst_event_parse_stream_flags (event, &flags);
+      part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
+      break;
+    }
     case GST_EVENT_SEGMENT:{
       GstSegment *seg = &part_pad->segment;
 
@@ -394,13 +405,15 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
 
-      if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
+      if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
+          reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
         /* Mark this pad as EOS */
         part_pad->is_eos = TRUE;
         if (splitmux_part_is_eos_locked (reader)) {
           /* Finished measuring things, set state and tell the state change func
            * so it can seek back to the start */
-          GST_LOG_OBJECT (reader, "EOS while measuring streams");
+          GST_LOG_OBJECT (reader,
+              "EOS while measuring streams. Resetting for ready");
           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
           SPLITMUX_PART_BROADCAST (reader);
         }
@@ -443,12 +456,8 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
     goto drop_event;
 
-  if (!block_until_can_push (reader)) {
-    SPLITMUX_PART_UNLOCK (reader);
-    gst_object_unref (target);
-    gst_event_unref (event);
-    return FALSE;
-  }
+  if (!block_until_can_push (reader))
+    goto drop_event;
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_GAP:{
@@ -480,10 +489,12 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
   }
 
   gst_object_unref (part_pad->queue);
+  gst_object_unref (target);
 
   return ret;
 wrong_segment:
   gst_event_unref (event);
+  gst_object_unref (target);
   SPLITMUX_PART_UNLOCK (reader);
   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
@@ -493,6 +504,7 @@ drop_event:
   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
   gst_event_unref (event);
+  gst_object_unref (target);
   SPLITMUX_PART_UNLOCK (reader);
   return TRUE;
 }
@@ -534,6 +546,8 @@ splitmux_part_pad_constructed (GObject * pad)
       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
   gst_pad_set_query_function (GST_PAD (pad),
       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
+
+  G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
 }
 
 static void
@@ -558,7 +572,14 @@ static void
 splitmux_part_pad_finalize (GObject * obj)
 {
   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
-  g_object_unref ((GObject *) (pad->queue));
+
+  GST_DEBUG_OBJECT (obj, "finalize");
+  gst_data_queue_set_flushing (pad->queue, TRUE);
+  gst_data_queue_flush (pad->queue);
+  gst_object_unref (GST_OBJECT_CAST (pad->queue));
+  pad->queue = NULL;
+
+  G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
 }
 
 static void
@@ -573,6 +594,9 @@ static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
     * part, gboolean flushing);
 static void bus_handler (GstBin * bin, GstMessage * msg);
+static void splitmux_part_reader_dispose (GObject * object);
+static void splitmux_part_reader_finalize (GObject * object);
+static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
 
 #define gst_splitmux_part_reader_parent_class parent_class
 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
@@ -581,12 +605,16 @@ G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
 static void
 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
 {
+  GObjectClass *gobject_klass = (GObjectClass *) (klass);
   GstElementClass *gstelement_class = (GstElementClass *) klass;
   GstBinClass *gstbin_class = (GstBinClass *) klass;
 
   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
       "Split File Demuxing Source helper");
 
+  gobject_klass->dispose = splitmux_part_reader_dispose;
+  gobject_klass->finalize = splitmux_part_reader_finalize;
+
   part_reader_signals[SIGNAL_PREPARED] =
       g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
@@ -607,6 +635,7 @@ gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
 
   g_cond_init (&reader->inactive_cond);
   g_mutex_init (&reader->lock);
+  g_mutex_init (&reader->type_lock);
 
   /* FIXME: Create elements on a state change */
   reader->src = gst_element_factory_make ("filesrc", NULL);
@@ -636,6 +665,47 @@ gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
       reader);
 }
 
+static void
+splitmux_part_reader_dispose (GObject * object)
+{
+  GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
+
+  splitmux_part_reader_reset (reader);
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
+splitmux_part_reader_finalize (GObject * object)
+{
+  GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
+
+  g_cond_clear (&reader->inactive_cond);
+  g_mutex_clear (&reader->lock);
+  g_mutex_clear (&reader->type_lock);
+
+  g_free (reader->path);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
+{
+  GList *cur;
+
+  SPLITMUX_PART_LOCK (reader);
+  for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
+    GstPad *pad = GST_PAD_CAST (cur->data);
+    gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
+    gst_object_unref (GST_OBJECT_CAST (pad));
+  }
+
+  g_list_free (reader->pads);
+  reader->pads = NULL;
+  SPLITMUX_PART_UNLOCK (reader);
+}
+
 static GstSplitMuxPartPad *
 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
     GstPad * target)
@@ -665,11 +735,17 @@ new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
 
   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
+
+  gst_caps_unref (caps);
+
   /* Look up or create the output pad */
   if (reader->get_pad_cb)
     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
-  if (out_pad == NULL)
+  if (out_pad == NULL) {
+    GST_DEBUG_OBJECT (reader,
+        "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
     return;
+  }
 
   /* Create our proxy pad to interact with this new pad */
   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
@@ -743,12 +819,12 @@ gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
 static gboolean
 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
-    GstSegment * target_seg)
+    GstSegment * target_seg, GstSeekFlags extra_flags)
 {
   GstSeekFlags flags;
   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
 
-  flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
+  flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
 
   SPLITMUX_PART_LOCK (reader);
   if (target_seg->start >= reader->start_offset)
@@ -788,9 +864,8 @@ gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
   while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
     SPLITMUX_PART_WAIT (reader);
 
-  /* Seek back to the start now */
   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
-    /* Fire the prepared signal */
+    /* Fire the prepared signal and go to READY state */
     GST_DEBUG_OBJECT (reader,
         "Stream measuring complete. File %s is now ready. Firing prepared signal",
         reader->path);
@@ -834,7 +909,7 @@ type_found (GstElement * typefind, guint probability,
 {
   GstElement *demux;
 
-  GST_WARNING ("Got type %" GST_PTR_FORMAT, caps);
+  GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
 
   /* typefind found a type. Look for the demuxer to handle it */
   demux = reader->demux = find_demuxer (caps);
@@ -843,14 +918,16 @@ type_found (GstElement * typefind, guint probability,
     return;
   }
 
-  gst_bin_add (GST_BIN_CAST (reader), demux);
-  gst_element_link_pads (reader->typefind, "src", demux, NULL);
-  gst_element_set_state (reader->demux, GST_STATE_PLAYING);
-
   /* Connect to demux signals */
   g_signal_connect (demux,
       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
+
+  gst_element_set_locked_state (demux, TRUE);
+  gst_bin_add (GST_BIN_CAST (reader), demux);
+  gst_element_link_pads (reader->typefind, "src", demux, NULL);
+  gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
+  gst_element_set_locked_state (demux, FALSE);
 }
 
 static void
@@ -921,10 +998,9 @@ gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
     return FALSE;
 
   ret = gst_pad_peer_query (target, query);
-  gst_object_unref (GST_OBJECT_CAST (target));
 
   if (ret == FALSE)
-    return ret;
+    goto out;
 
   /* Post-massaging of queries */
   switch (GST_QUERY_TYPE (query)) {
@@ -948,6 +1024,8 @@ gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
       break;
   }
 
+out:
+  gst_object_unref (target);
   return ret;
 }
 
@@ -963,12 +1041,16 @@ gst_splitmux_part_reader_change_state (GstElement * element,
       break;
     }
     case GST_STATE_CHANGE_READY_TO_PAUSED:{
-      g_object_set (reader->src, "location", reader->path, NULL);
+      /* Hold the splitmux type lock until after the
+       * parent state change function has finished
+       * changing the states of things, and type finding can continue */
       SPLITMUX_PART_LOCK (reader);
+      g_object_set (reader->src, "location", reader->path, NULL);
       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
       reader->running = TRUE;
       SPLITMUX_PART_UNLOCK (reader);
+      SPLITMUX_PART_TYPE_LOCK (reader);
       break;
     }
     case GST_STATE_CHANGE_READY_TO_NULL:
@@ -991,13 +1073,21 @@ gst_splitmux_part_reader_change_state (GstElement * element,
   }
 
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-  if (ret == GST_STATE_CHANGE_FAILURE)
+  if (ret == GST_STATE_CHANGE_FAILURE) {
+    if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
+      /* Make sure to release the lock we took above */
+      SPLITMUX_PART_TYPE_UNLOCK (reader);
+    }
     goto beach;
+  }
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       /* Sleep and wait until all streams have been collected, then do the seeks
-       * to measure the stream lengths */
+       * to measure the stream lengths. This took the type lock above,
+       * but it's OK to release it now and let typefinding happen... */
+      SPLITMUX_PART_TYPE_UNLOCK (reader);
+
       SPLITMUX_PART_LOCK (reader);
 
       while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
@@ -1005,9 +1095,10 @@ gst_splitmux_part_reader_change_state (GstElement * element,
         SPLITMUX_PART_WAIT (reader);
       }
 
-      if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
+      if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
+          reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
         gst_splitmux_part_reader_measure_streams (reader);
-      else if (reader->prep_state == PART_STATE_FAILED)
+      else if (reader->prep_state == PART_STATE_FAILED)
         ret = GST_STATE_CHANGE_FAILURE;
       SPLITMUX_PART_UNLOCK (reader);
       break;
@@ -1020,6 +1111,7 @@ gst_splitmux_part_reader_change_state (GstElement * element,
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       reader->prep_state = PART_STATE_NULL;
+      splitmux_part_reader_reset (reader);
       break;
     default:
       break;
@@ -1079,11 +1171,11 @@ gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
 
 gboolean
 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
-    GstSegment * seg)
+    GstSegment * seg, GstSeekFlags extra_flags)
 {
   GST_DEBUG_OBJECT (reader, "Activating part reader");
 
-  if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
+  if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
         seg);
     return FALSE;
@@ -1096,6 +1188,18 @@ gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
   return TRUE;
 }
 
+gboolean
+gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
+{
+  gboolean ret;
+
+  SPLITMUX_PART_LOCK (part);
+  ret = part->active;
+  SPLITMUX_PART_UNLOCK (part);
+
+  return ret;
+}
+
 void
 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
 {
@@ -1136,7 +1240,7 @@ gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
   SPLITMUX_PART_LOCK (reader);
   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
-    if (part_pad->max_ts < ret)
+    if (!part_pad->is_sparse && part_pad->max_ts < ret)
       ret = part_pad->max_ts;
   }
 
@@ -1191,7 +1295,7 @@ gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
     if (part_pad->target == target) {
-      result = (GstPad *) part_pad;
+      result = (GstPad *) gst_object_ref (part_pad);
       break;
     }
   }
@@ -1206,6 +1310,7 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
 {
   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
   GstDataQueue *q;
+  GstFlowReturn ret;
 
   /* Get one item from the appropriate dataqueue */
   SPLITMUX_PART_LOCK (reader);
@@ -1218,8 +1323,10 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
 
   /* Have to drop the lock around pop, so we can be woken up for flush */
   SPLITMUX_PART_UNLOCK (reader);
-  if (!gst_data_queue_pop (q, item) || (*item == NULL))
-    return GST_FLOW_FLUSHING;
+  if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
+    ret = GST_FLOW_FLUSHING;
+    goto out;
+  }
 
   SPLITMUX_PART_LOCK (reader);
 
@@ -1232,8 +1339,11 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
   }
 
   SPLITMUX_PART_UNLOCK (reader);
+
+  ret = GST_FLOW_OK;
+out:
   gst_object_unref (q);
-  return GST_FLOW_OK;
+  return ret;
 }
 
 static void
@@ -1246,6 +1356,8 @@ bus_handler (GstBin * bin, GstMessage * message)
       /* Make sure to set the state to failed and wake up the listener
        * on error */
       SPLITMUX_PART_LOCK (reader);
+      GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
+          " marking this reader as failed", GST_MESSAGE_SRC (message));
       reader->prep_state = PART_STATE_FAILED;
       SPLITMUX_PART_BROADCAST (reader);
       SPLITMUX_PART_UNLOCK (reader);