splitmuxsrc: Implement state change asynchronously instead of blocking
authorSebastian Dröge <sebastian@centricular.com>
Wed, 9 Jan 2019 09:42:36 +0000 (11:42 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 9 Jan 2019 11:35:58 +0000 (13:35 +0200)
Blocking in change_state() is a recipe for disaster, even more so if
we wait for another thread that also calls into various element API and
could then lead to deadlocks on e.g. the state lock.

gst/multifile/gstsplitmuxpartreader.c
gst/multifile/gstsplitmuxpartreader.h
gst/multifile/gstsplitmuxsrc.c
gst/multifile/gstsplitmuxsrc.h

index 75171cb..d4f9096 100644 (file)
@@ -88,6 +88,10 @@ static void type_found (GstElement * typefind, guint probability,
     GstCaps * caps, GstSplitMuxPartReader * reader);
 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
 
+static void
+gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
+    reader);
+
 /* Called with reader lock held */
 static gboolean
 have_empty_queue (GstSplitMuxPartReader * reader)
@@ -415,7 +419,10 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
           GST_LOG_OBJECT (reader,
               "EOS while measuring streams. Resetting for ready");
           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
-          SPLITMUX_PART_BROADCAST (reader);
+
+          gst_element_call_async (GST_ELEMENT_CAST (reader),
+              (GstElementCallAsyncFunc)
+              gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
         }
         goto drop_event;
       }
@@ -690,6 +697,37 @@ splitmux_part_reader_finalize (GObject * object)
 }
 
 static void
+do_async_start (GstSplitMuxPartReader * reader)
+{
+  GstMessage *message;
+
+  GST_STATE_LOCK (reader);
+  reader->async_pending = TRUE;
+
+  message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
+  GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
+  GST_STATE_UNLOCK (reader);
+}
+
+static void
+do_async_done (GstSplitMuxPartReader * reader)
+{
+  GstMessage *message;
+
+  GST_STATE_LOCK (reader);
+  if (reader->async_pending) {
+    message =
+        gst_message_new_async_done (GST_OBJECT_CAST (reader),
+        GST_CLOCK_TIME_NONE);
+    GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
+        message);
+
+    reader->async_pending = FALSE;
+  }
+  GST_STATE_UNLOCK (reader);
+}
+
+static void
 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
 {
   GList *cur;
@@ -850,6 +888,7 @@ gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
 static void
 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
 {
+  SPLITMUX_PART_LOCK (reader);
   /* Trigger a flushing seek to near the end of the file and run each stream
    * to EOS in order to find the smallest end timestamp to start the next
    * file from
@@ -859,18 +898,25 @@ gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
   }
+  SPLITMUX_PART_UNLOCK (reader);
+}
 
-  /* Wait for things to happen */
-  while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
-    SPLITMUX_PART_WAIT (reader);
-
+static void
+gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
+    reader)
+{
+  SPLITMUX_PART_LOCK (reader);
   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
     /* Fire the prepared signal and go to READY state */
     GST_DEBUG_OBJECT (reader,
         "Stream measuring complete. File %s is now ready. Firing prepared signal",
         reader->path);
     reader->prep_state = PART_STATE_READY;
+    SPLITMUX_PART_UNLOCK (reader);
     g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
+    do_async_done (reader);
+  } else {
+    SPLITMUX_PART_UNLOCK (reader);
   }
 }
 
@@ -939,7 +985,9 @@ check_if_pads_collected (GstSplitMuxPartReader * reader)
       GST_DEBUG_OBJECT (reader,
           "no more pads - file %s. Measuring stream length", reader->path);
       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
-      SPLITMUX_PART_BROADCAST (reader);
+      gst_element_call_async (GST_ELEMENT_CAST (reader),
+          (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
+          NULL, NULL);
     }
   }
 }
@@ -1041,16 +1089,16 @@ gst_splitmux_part_reader_change_state (GstElement * element,
       break;
     }
     case GST_STATE_CHANGE_READY_TO_PAUSED:{
-      /* Hold the splitmux type lock until after the
-       * parent state change function has finished
-       * changing the states of things, and type finding can continue */
       SPLITMUX_PART_LOCK (reader);
       g_object_set (reader->src, "location", reader->path, NULL);
       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
       reader->running = TRUE;
       SPLITMUX_PART_UNLOCK (reader);
-      SPLITMUX_PART_TYPE_LOCK (reader);
+
+      /* we go to PAUSED asynchronously once all streams have been collected
+       * and seeks to measure the stream lengths are done */
+      do_async_start (reader);
       break;
     }
     case GST_STATE_CHANGE_READY_TO_NULL:
@@ -1074,33 +1122,16 @@ gst_splitmux_part_reader_change_state (GstElement * element,
 
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
   if (ret == GST_STATE_CHANGE_FAILURE) {
-    if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
-      /* Make sure to release the lock we took above */
-      SPLITMUX_PART_TYPE_UNLOCK (reader);
-    }
+    do_async_done (reader);
     goto beach;
   }
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      /* Sleep and wait until all streams have been collected, then do the seeks
-       * to measure the stream lengths. This took the type lock above,
-       * but it's OK to release it now and let typefinding happen... */
-      SPLITMUX_PART_TYPE_UNLOCK (reader);
-
-      SPLITMUX_PART_LOCK (reader);
-
-      while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
-        GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
-        SPLITMUX_PART_WAIT (reader);
-      }
-
-      if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
-          reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
-        gst_splitmux_part_reader_measure_streams (reader);
-      } else if (reader->prep_state == PART_STATE_FAILED)
-        ret = GST_STATE_CHANGE_FAILURE;
-      SPLITMUX_PART_UNLOCK (reader);
+      ret = GST_STATE_CHANGE_ASYNC;
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      do_async_done (reader);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       SPLITMUX_PART_LOCK (reader);
@@ -1121,28 +1152,6 @@ beach:
   return ret;
 }
 
-static gboolean
-check_bus_messages (GstSplitMuxPartReader * part)
-{
-  gboolean ret = FALSE;
-  GstBus *bus;
-  GstMessage *m;
-
-  bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
-  while ((m = gst_bus_pop (bus)) != NULL) {
-    if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
-      GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
-      gst_message_unref (m);
-      goto done;
-    }
-    gst_message_unref (m);
-  }
-  ret = TRUE;
-done:
-  gst_object_unref (bus);
-  return ret;
-}
-
 gboolean
 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
 {
@@ -1150,10 +1159,10 @@ gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
 
   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
 
-  if (ret != GST_STATE_CHANGE_SUCCESS)
+  if (ret == GST_STATE_CHANGE_FAILURE)
     return FALSE;
 
-  return check_bus_messages (part);
+  return TRUE;
 }
 
 void
@@ -1361,6 +1370,7 @@ bus_handler (GstBin * bin, GstMessage * message)
       reader->prep_state = PART_STATE_FAILED;
       SPLITMUX_PART_BROADCAST (reader);
       SPLITMUX_PART_UNLOCK (reader);
+      do_async_done (reader);
       break;
     default:
       break;
index 03b8b5c..b1d2456 100644 (file)
@@ -64,6 +64,7 @@ struct _GstSplitMuxPartReader
   GstElement *typefind;
   GstElement *demux;
 
+  gboolean async_pending;
   gboolean active;
   gboolean running;
   gboolean prepared;
index 2434218..814329f 100644 (file)
@@ -114,6 +114,9 @@ static gboolean gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux,
     SplitMuxSrcPad * pad);
 static gboolean gst_splitmux_check_new_caps (SplitMuxSrcPad * splitpad,
     GstEvent * event);
+static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
+static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
+    guint part, GstSeekFlags extra_flags);
 
 #define _do_init \
     G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init);
@@ -315,6 +318,38 @@ gst_splitmux_src_get_property (GObject * object, guint prop_id,
   }
 }
 
+static void
+do_async_start (GstSplitMuxSrc * splitmux)
+{
+  GstMessage *message;
+
+  GST_STATE_LOCK (splitmux);
+  splitmux->async_pending = TRUE;
+
+  message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
+  GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (splitmux),
+      message);
+  GST_STATE_UNLOCK (splitmux);
+}
+
+static void
+do_async_done (GstSplitMuxSrc * splitmux)
+{
+  GstMessage *message;
+
+  GST_STATE_LOCK (splitmux);
+  if (splitmux->async_pending) {
+    message =
+        gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
+        GST_CLOCK_TIME_NONE);
+    GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (splitmux),
+        message);
+
+    splitmux->async_pending = FALSE;
+  }
+  GST_STATE_UNLOCK (splitmux);
+}
+
 static GstStateChangeReturn
 gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
 {
@@ -326,8 +361,12 @@ gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
       break;
     }
     case GST_STATE_CHANGE_READY_TO_PAUSED:{
-      if (!gst_splitmux_src_start (splitmux))
+      do_async_start (splitmux);
+
+      if (!gst_splitmux_src_start (splitmux)) {
+        do_async_done (splitmux);
         return GST_STATE_CHANGE_FAILURE;
+      }
       break;
     }
     case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -341,14 +380,150 @@ gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
   }
 
   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  if (ret == GST_STATE_CHANGE_FAILURE) {
+    do_async_done (splitmux);
+    return ret;
+  }
+
+  switch (transition) {
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      ret = GST_STATE_CHANGE_ASYNC;
+      break;
+    default:
+      break;
+  }
+
 
   return ret;
 }
 
+static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
+static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
+    guint part, GstSeekFlags extra_flags);
+
+static void
+gst_splitmux_src_activate_first_part (GstSplitMuxSrc * splitmux)
+{
+  if (!gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE)) {
+    GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
+        ("Failed to activate first part for playback"));
+  }
+}
+
+static GstBusSyncReply
+gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
+    gpointer user_data)
+{
+  GstSplitMuxSrc *splitmux = user_data;
+
+  switch (GST_MESSAGE_TYPE (msg)) {
+    case GST_MESSAGE_ASYNC_DONE:{
+      guint idx = splitmux->num_prepared_parts;
+
+      if (idx >= splitmux->num_parts) {
+        /* Shouldn't really happen! */
+        do_async_done (splitmux);
+        g_warn_if_reached ();
+        break;
+      }
+
+      GST_DEBUG_OBJECT (splitmux, "Prepared file part %s (%u)",
+          splitmux->parts[idx]->path, idx);
+
+      /* Extend our total duration to cover this part */
+      GST_OBJECT_LOCK (splitmux);
+      splitmux->total_duration +=
+          gst_splitmux_part_reader_get_duration (splitmux->parts[idx]);
+      splitmux->play_segment.duration = splitmux->total_duration;
+      GST_OBJECT_UNLOCK (splitmux);
+
+      splitmux->end_offset =
+          gst_splitmux_part_reader_get_end_offset (splitmux->parts[idx]);
+
+      GST_DEBUG_OBJECT (splitmux,
+          "Duration %" GST_TIME_FORMAT ", total duration now: %" GST_TIME_FORMAT
+          " and end offset %" GST_TIME_FORMAT,
+          gst_splitmux_part_reader_get_duration (splitmux->parts[idx]),
+          splitmux->total_duration, splitmux->end_offset);
+
+      splitmux->num_prepared_parts++;
+
+      /* If we're done or preparing the next part fails, finish here */
+      if (splitmux->num_prepared_parts >= splitmux->num_parts
+          || !gst_splitmux_src_prepare_next_part (splitmux)) {
+        /* Store how many parts we actually prepared in the end */
+        splitmux->num_parts = splitmux->num_prepared_parts;
+        do_async_done (splitmux);
+
+        /* All done preparing, activate the first part */
+        GST_INFO_OBJECT (splitmux,
+            "All parts prepared. Total duration %" GST_TIME_FORMAT
+            " Activating first part", GST_TIME_ARGS (splitmux->total_duration));
+        gst_element_call_async (GST_ELEMENT_CAST (splitmux),
+            (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
+            NULL, NULL);
+      }
+
+      break;
+    }
+    case GST_MESSAGE_ERROR:{
+      GST_ERROR_OBJECT (splitmux,
+          "Got error message from part %" GST_PTR_FORMAT ": %" GST_PTR_FORMAT,
+          GST_MESSAGE_SRC (msg), msg);
+      if (splitmux->num_prepared_parts < splitmux->num_parts) {
+        guint idx = splitmux->num_prepared_parts;
+
+        if (idx == 0) {
+          GST_ERROR_OBJECT (splitmux,
+              "Failed to prepare first file part %s for playback",
+              splitmux->parts[idx]->path);
+          GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
+              ("Failed to prepare first file part %s for playback",
+                  splitmux->parts[idx]->path));
+        } else {
+          GST_WARNING_OBJECT (splitmux,
+              "Failed to prepare file part %s. Cannot play past there.",
+              splitmux->parts[idx]->path);
+          GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
+              ("Failed to prepare file part %s. Cannot play past there.",
+                  splitmux->parts[idx]->path));
+        }
+
+        /* Store how many parts we actually prepared in the end */
+        splitmux->num_parts = splitmux->num_prepared_parts;
+        do_async_done (splitmux);
+
+        if (idx > 0) {
+          /* All done preparing, activate the first part */
+          GST_INFO_OBJECT (splitmux,
+              "All parts prepared. Total duration %" GST_TIME_FORMAT
+              " Activating first part",
+              GST_TIME_ARGS (splitmux->total_duration));
+          gst_element_call_async (GST_ELEMENT_CAST (splitmux),
+              (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
+              NULL, NULL);
+        }
+      } else {
+        /* Need to update the message source so that it's part of the element
+         * hierarchy the application would expect */
+        msg = gst_message_copy (msg);
+        gst_object_replace ((GstObject **) & msg->src, (GstObject *) splitmux);
+        gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
+      }
+      break;
+    }
+    default:
+      break;
+  }
+
+  return GST_BUS_PASS;
+}
+
 static GstSplitMuxPartReader *
 gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename)
 {
   GstSplitMuxPartReader *r;
+  GstBus *bus;
 
   r = g_object_new (GST_TYPE_SPLITMUX_PART_READER, NULL);
 
@@ -359,6 +534,10 @@ gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename)
       (GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad);
   gst_splitmux_part_reader_set_location (r, filename);
 
+  bus = gst_element_get_bus (GST_ELEMENT_CAST (r));
+  gst_bus_set_sync_handler (bus, gst_splitmux_part_bus_handler, splitmux, NULL);
+  gst_object_unref (bus);
+
   return r;
 }
 
@@ -654,6 +833,34 @@ gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
 }
 
 static gboolean
+gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux)
+{
+  guint idx = splitmux->num_prepared_parts;
+
+  g_assert (idx < splitmux->num_parts);
+
+  GST_DEBUG_OBJECT (splitmux, "Preparing file part %s (%u)",
+      splitmux->parts[idx]->path, idx);
+
+  gst_splitmux_part_reader_set_start_offset (splitmux->parts[idx],
+      splitmux->end_offset);
+  if (!gst_splitmux_part_reader_prepare (splitmux->parts[idx])) {
+    GST_WARNING_OBJECT (splitmux,
+        "Failed to prepare file part %s. Cannot play past there.",
+        splitmux->parts[idx]->path);
+    GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
+        ("Failed to prepare file part %s. Cannot play past there.",
+            splitmux->parts[idx]->path));
+    gst_splitmux_part_reader_unprepare (splitmux->parts[idx]);
+    g_object_unref (splitmux->parts[idx]);
+    splitmux->parts[idx] = NULL;
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+static gboolean
 gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
 {
   gboolean ret = FALSE;
@@ -661,9 +868,7 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
   gchar *basename = NULL;
   gchar *dirname = NULL;
   gchar **files;
-  GstClockTime next_offset = 0;
   guint i;
-  GstClockTime total_duration = 0;
 
   GST_DEBUG_OBJECT (splitmux, "Starting");
 
@@ -693,52 +898,33 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
 
   splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts);
 
+  /* Create all part pipelines */
   for (i = 0; i < splitmux->num_parts; i++) {
     splitmux->parts[i] = gst_splitmux_part_create (splitmux, files[i]);
     if (splitmux->parts[i] == NULL)
       break;
-
-    /* Figure out the next offset - the smallest one */
-    gst_splitmux_part_reader_set_start_offset (splitmux->parts[i], next_offset);
-    if (!gst_splitmux_part_reader_prepare (splitmux->parts[i])) {
-      GST_WARNING_OBJECT (splitmux,
-          "Failed to prepare file part %s. Cannot play past there.", files[i]);
-      GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
-          ("Failed to prepare file part %s. Cannot play past there.",
-              files[i]));
-      gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
-      g_object_unref (splitmux->parts[i]);
-      splitmux->parts[i] = NULL;
-      break;
-    }
-
-    /* Extend our total duration to cover this part */
-    total_duration =
-        next_offset +
-        gst_splitmux_part_reader_get_duration (splitmux->parts[i]);
-    splitmux->play_segment.duration = total_duration;
-
-    next_offset = gst_splitmux_part_reader_get_end_offset (splitmux->parts[i]);
   }
 
+  /* Store how many parts we actually created */
+  splitmux->num_created_parts = splitmux->num_parts = i;
+  splitmux->num_prepared_parts = 0;
+
   /* Update total_duration state variable */
   GST_OBJECT_LOCK (splitmux);
-  splitmux->total_duration = total_duration;
+  splitmux->total_duration = 0;
+  splitmux->end_offset = 0;
   GST_OBJECT_UNLOCK (splitmux);
 
-  /* Store how many parts we actually created */
-  splitmux->num_parts = i;
-
-  if (splitmux->num_parts < 1)
+  /* Then start the first: it will asynchronously go to PAUSED
+   * or error out and then we can proceed with the next one
+   */
+  if (!gst_splitmux_src_prepare_next_part (splitmux) || splitmux->num_parts < 1)
     goto failed_part;
 
-  /* All done preparing, activate the first part */
-  GST_INFO_OBJECT (splitmux,
-      "All parts prepared. Total duration %" GST_TIME_FORMAT
-      " Activating first part", GST_TIME_ARGS (total_duration));
-  ret = gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE);
-  if (ret == FALSE)
-    goto failed_first_part;
+  /* All good now: we have to wait for all parts to be asynchronously
+   * prepared to know the total duration we can play */
+  ret = TRUE;
+
 done:
   if (err != NULL)
     g_error_free (err);
@@ -762,12 +948,6 @@ failed_part:
         ("Failed to open any files for reading"));
     goto done;
   }
-failed_first_part:
-  {
-    GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
-        ("Failed to activate first part for playback"));
-    goto done;
-  }
 }
 
 static gboolean
@@ -784,7 +964,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
   GST_DEBUG_OBJECT (splitmux, "Stopping");
 
   /* Stop and destroy all parts  */
-  for (i = 0; i < splitmux->num_parts; i++) {
+  for (i = 0; i < splitmux->num_created_parts; i++) {
     if (splitmux->parts[i] == NULL)
       continue;
     gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
@@ -809,6 +989,8 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
   g_free (splitmux->parts);
   splitmux->parts = NULL;
   splitmux->num_parts = 0;
+  splitmux->num_prepared_parts = 0;
+  splitmux->num_created_parts = 0;
   splitmux->running = FALSE;
   splitmux->total_duration = GST_CLOCK_TIME_NONE;
   /* Reset playback segment */
index 96e605c..a419d9c 100644 (file)
@@ -50,15 +50,20 @@ struct _GstSplitMuxSrc
 
   GstSplitMuxPartReader **parts;
   guint        num_parts;
+  guint        num_prepared_parts;
+  guint        num_created_parts;
   guint        cur_part;
 
+  gboolean async_pending;
   gboolean pads_complete;
+
   GMutex pads_lock;
   GList  *pads; /* pads_lock */
   guint n_pads;
   guint n_notlinked;
 
   GstClockTime total_duration;
+  GstClockTime end_offset;
   GstSegment play_segment;
   guint32 segment_seqnum;
 };