GstCaps * caps, GstSplitMuxPartReader * reader);
static void check_if_pads_collected (GstSplitMuxPartReader * reader);
+static void
+gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
+ reader);
+
/* Called with reader lock held */
static gboolean
have_empty_queue (GstSplitMuxPartReader * reader)
GST_LOG_OBJECT (reader,
"EOS while measuring streams. Resetting for ready");
reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
- SPLITMUX_PART_BROADCAST (reader);
+
+ gst_element_call_async (GST_ELEMENT_CAST (reader),
+ (GstElementCallAsyncFunc)
+ gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
}
goto drop_event;
}
}
static void
+do_async_start (GstSplitMuxPartReader * reader)
+{
+ GstMessage *message;
+
+ GST_STATE_LOCK (reader);
+ reader->async_pending = TRUE;
+
+ message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
+ GST_STATE_UNLOCK (reader);
+}
+
+static void
+do_async_done (GstSplitMuxPartReader * reader)
+{
+ GstMessage *message;
+
+ GST_STATE_LOCK (reader);
+ if (reader->async_pending) {
+ message =
+ gst_message_new_async_done (GST_OBJECT_CAST (reader),
+ GST_CLOCK_TIME_NONE);
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
+ message);
+
+ reader->async_pending = FALSE;
+ }
+ GST_STATE_UNLOCK (reader);
+}
+
+static void
splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
{
GList *cur;
static void
gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
{
+ SPLITMUX_PART_LOCK (reader);
/* Trigger a flushing seek to near the end of the file and run each stream
* to EOS in order to find the smallest end timestamp to start the next
* file from
GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
}
+ SPLITMUX_PART_UNLOCK (reader);
+}
- /* Wait for things to happen */
- while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
- SPLITMUX_PART_WAIT (reader);
-
+static void
+gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
+ reader)
+{
+ SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
/* Fire the prepared signal and go to READY state */
GST_DEBUG_OBJECT (reader,
"Stream measuring complete. File %s is now ready. Firing prepared signal",
reader->path);
reader->prep_state = PART_STATE_READY;
+ SPLITMUX_PART_UNLOCK (reader);
g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
+ do_async_done (reader);
+ } else {
+ SPLITMUX_PART_UNLOCK (reader);
}
}
GST_DEBUG_OBJECT (reader,
"no more pads - file %s. Measuring stream length", reader->path);
reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
- SPLITMUX_PART_BROADCAST (reader);
+ gst_element_call_async (GST_ELEMENT_CAST (reader),
+ (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
+ NULL, NULL);
}
}
}
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
- /* Hold the splitmux type lock until after the
- * parent state change function has finished
- * changing the states of things, and type finding can continue */
SPLITMUX_PART_LOCK (reader);
g_object_set (reader->src, "location", reader->path, NULL);
reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
reader->running = TRUE;
SPLITMUX_PART_UNLOCK (reader);
- SPLITMUX_PART_TYPE_LOCK (reader);
+
+ /* we go to PAUSED asynchronously once all streams have been collected
+ * and seeks to measure the stream lengths are done */
+ do_async_start (reader);
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE) {
- if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
- /* Make sure to release the lock we took above */
- SPLITMUX_PART_TYPE_UNLOCK (reader);
- }
+ do_async_done (reader);
goto beach;
}
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
- /* Sleep and wait until all streams have been collected, then do the seeks
- * to measure the stream lengths. This took the type lock above,
- * but it's OK to release it now and let typefinding happen... */
- SPLITMUX_PART_TYPE_UNLOCK (reader);
-
- SPLITMUX_PART_LOCK (reader);
-
- while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
- GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
- SPLITMUX_PART_WAIT (reader);
- }
-
- if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
- reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
- gst_splitmux_part_reader_measure_streams (reader);
- } else if (reader->prep_state == PART_STATE_FAILED)
- ret = GST_STATE_CHANGE_FAILURE;
- SPLITMUX_PART_UNLOCK (reader);
+ ret = GST_STATE_CHANGE_ASYNC;
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ do_async_done (reader);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
SPLITMUX_PART_LOCK (reader);
return ret;
}
-static gboolean
-check_bus_messages (GstSplitMuxPartReader * part)
-{
- gboolean ret = FALSE;
- GstBus *bus;
- GstMessage *m;
-
- bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
- while ((m = gst_bus_pop (bus)) != NULL) {
- if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
- GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
- gst_message_unref (m);
- goto done;
- }
- gst_message_unref (m);
- }
- ret = TRUE;
-done:
- gst_object_unref (bus);
- return ret;
-}
-
gboolean
gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
{
ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
- if (ret != GST_STATE_CHANGE_SUCCESS)
+ if (ret == GST_STATE_CHANGE_FAILURE)
return FALSE;
- return check_bus_messages (part);
+ return TRUE;
}
void
reader->prep_state = PART_STATE_FAILED;
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
+ do_async_done (reader);
break;
default:
break;
SplitMuxSrcPad * pad);
static gboolean gst_splitmux_check_new_caps (SplitMuxSrcPad * splitpad,
GstEvent * event);
+static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
+static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
+ guint part, GstSeekFlags extra_flags);
#define _do_init \
G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init);
}
}
+static void
+do_async_start (GstSplitMuxSrc * splitmux)
+{
+ GstMessage *message;
+
+ GST_STATE_LOCK (splitmux);
+ splitmux->async_pending = TRUE;
+
+ message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (splitmux),
+ message);
+ GST_STATE_UNLOCK (splitmux);
+}
+
+static void
+do_async_done (GstSplitMuxSrc * splitmux)
+{
+ GstMessage *message;
+
+ GST_STATE_LOCK (splitmux);
+ if (splitmux->async_pending) {
+ message =
+ gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
+ GST_CLOCK_TIME_NONE);
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (splitmux),
+ message);
+
+ splitmux->async_pending = FALSE;
+ }
+ GST_STATE_UNLOCK (splitmux);
+}
+
static GstStateChangeReturn
gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
{
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
- if (!gst_splitmux_src_start (splitmux))
+ do_async_start (splitmux);
+
+ if (!gst_splitmux_src_start (splitmux)) {
+ do_async_done (splitmux);
return GST_STATE_CHANGE_FAILURE;
+ }
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ if (ret == GST_STATE_CHANGE_FAILURE) {
+ do_async_done (splitmux);
+ return ret;
+ }
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ ret = GST_STATE_CHANGE_ASYNC;
+ break;
+ default:
+ break;
+ }
+
return ret;
}
+static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
+static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
+ guint part, GstSeekFlags extra_flags);
+
+static void
+gst_splitmux_src_activate_first_part (GstSplitMuxSrc * splitmux)
+{
+ if (!gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE)) {
+ GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
+ ("Failed to activate first part for playback"));
+ }
+}
+
+static GstBusSyncReply
+gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
+ gpointer user_data)
+{
+ GstSplitMuxSrc *splitmux = user_data;
+
+ switch (GST_MESSAGE_TYPE (msg)) {
+ case GST_MESSAGE_ASYNC_DONE:{
+ guint idx = splitmux->num_prepared_parts;
+
+ if (idx >= splitmux->num_parts) {
+ /* Shouldn't really happen! */
+ do_async_done (splitmux);
+ g_warn_if_reached ();
+ break;
+ }
+
+ GST_DEBUG_OBJECT (splitmux, "Prepared file part %s (%u)",
+ splitmux->parts[idx]->path, idx);
+
+ /* Extend our total duration to cover this part */
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->total_duration +=
+ gst_splitmux_part_reader_get_duration (splitmux->parts[idx]);
+ splitmux->play_segment.duration = splitmux->total_duration;
+ GST_OBJECT_UNLOCK (splitmux);
+
+ splitmux->end_offset =
+ gst_splitmux_part_reader_get_end_offset (splitmux->parts[idx]);
+
+ GST_DEBUG_OBJECT (splitmux,
+ "Duration %" GST_TIME_FORMAT ", total duration now: %" GST_TIME_FORMAT
+ " and end offset %" GST_TIME_FORMAT,
+ gst_splitmux_part_reader_get_duration (splitmux->parts[idx]),
+ splitmux->total_duration, splitmux->end_offset);
+
+ splitmux->num_prepared_parts++;
+
+ /* If we're done or preparing the next part fails, finish here */
+ if (splitmux->num_prepared_parts >= splitmux->num_parts
+ || !gst_splitmux_src_prepare_next_part (splitmux)) {
+ /* Store how many parts we actually prepared in the end */
+ splitmux->num_parts = splitmux->num_prepared_parts;
+ do_async_done (splitmux);
+
+ /* All done preparing, activate the first part */
+ GST_INFO_OBJECT (splitmux,
+ "All parts prepared. Total duration %" GST_TIME_FORMAT
+ " Activating first part", GST_TIME_ARGS (splitmux->total_duration));
+ gst_element_call_async (GST_ELEMENT_CAST (splitmux),
+ (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
+ NULL, NULL);
+ }
+
+ break;
+ }
+ case GST_MESSAGE_ERROR:{
+ GST_ERROR_OBJECT (splitmux,
+ "Got error message from part %" GST_PTR_FORMAT ": %" GST_PTR_FORMAT,
+ GST_MESSAGE_SRC (msg), msg);
+ if (splitmux->num_prepared_parts < splitmux->num_parts) {
+ guint idx = splitmux->num_prepared_parts;
+
+ if (idx == 0) {
+ GST_ERROR_OBJECT (splitmux,
+ "Failed to prepare first file part %s for playback",
+ splitmux->parts[idx]->path);
+ GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
+ ("Failed to prepare first file part %s for playback",
+ splitmux->parts[idx]->path));
+ } else {
+ GST_WARNING_OBJECT (splitmux,
+ "Failed to prepare file part %s. Cannot play past there.",
+ splitmux->parts[idx]->path);
+ GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
+ ("Failed to prepare file part %s. Cannot play past there.",
+ splitmux->parts[idx]->path));
+ }
+
+ /* Store how many parts we actually prepared in the end */
+ splitmux->num_parts = splitmux->num_prepared_parts;
+ do_async_done (splitmux);
+
+ if (idx > 0) {
+ /* All done preparing, activate the first part */
+ GST_INFO_OBJECT (splitmux,
+ "All parts prepared. Total duration %" GST_TIME_FORMAT
+ " Activating first part",
+ GST_TIME_ARGS (splitmux->total_duration));
+ gst_element_call_async (GST_ELEMENT_CAST (splitmux),
+ (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
+ NULL, NULL);
+ }
+ } else {
+ /* Need to update the message source so that it's part of the element
+ * hierarchy the application would expect */
+ msg = gst_message_copy (msg);
+ gst_object_replace ((GstObject **) & msg->src, (GstObject *) splitmux);
+ gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ return GST_BUS_PASS;
+}
+
static GstSplitMuxPartReader *
gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename)
{
GstSplitMuxPartReader *r;
+ GstBus *bus;
r = g_object_new (GST_TYPE_SPLITMUX_PART_READER, NULL);
(GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad);
gst_splitmux_part_reader_set_location (r, filename);
+ bus = gst_element_get_bus (GST_ELEMENT_CAST (r));
+ gst_bus_set_sync_handler (bus, gst_splitmux_part_bus_handler, splitmux, NULL);
+ gst_object_unref (bus);
+
return r;
}
}
static gboolean
+gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux)
+{
+ guint idx = splitmux->num_prepared_parts;
+
+ g_assert (idx < splitmux->num_parts);
+
+ GST_DEBUG_OBJECT (splitmux, "Preparing file part %s (%u)",
+ splitmux->parts[idx]->path, idx);
+
+ gst_splitmux_part_reader_set_start_offset (splitmux->parts[idx],
+ splitmux->end_offset);
+ if (!gst_splitmux_part_reader_prepare (splitmux->parts[idx])) {
+ GST_WARNING_OBJECT (splitmux,
+ "Failed to prepare file part %s. Cannot play past there.",
+ splitmux->parts[idx]->path);
+ GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
+ ("Failed to prepare file part %s. Cannot play past there.",
+ splitmux->parts[idx]->path));
+ gst_splitmux_part_reader_unprepare (splitmux->parts[idx]);
+ g_object_unref (splitmux->parts[idx]);
+ splitmux->parts[idx] = NULL;
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
{
gboolean ret = FALSE;
gchar *basename = NULL;
gchar *dirname = NULL;
gchar **files;
- GstClockTime next_offset = 0;
guint i;
- GstClockTime total_duration = 0;
GST_DEBUG_OBJECT (splitmux, "Starting");
splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts);
+ /* Create all part pipelines */
for (i = 0; i < splitmux->num_parts; i++) {
splitmux->parts[i] = gst_splitmux_part_create (splitmux, files[i]);
if (splitmux->parts[i] == NULL)
break;
-
- /* Figure out the next offset - the smallest one */
- gst_splitmux_part_reader_set_start_offset (splitmux->parts[i], next_offset);
- if (!gst_splitmux_part_reader_prepare (splitmux->parts[i])) {
- GST_WARNING_OBJECT (splitmux,
- "Failed to prepare file part %s. Cannot play past there.", files[i]);
- GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
- ("Failed to prepare file part %s. Cannot play past there.",
- files[i]));
- gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
- g_object_unref (splitmux->parts[i]);
- splitmux->parts[i] = NULL;
- break;
- }
-
- /* Extend our total duration to cover this part */
- total_duration =
- next_offset +
- gst_splitmux_part_reader_get_duration (splitmux->parts[i]);
- splitmux->play_segment.duration = total_duration;
-
- next_offset = gst_splitmux_part_reader_get_end_offset (splitmux->parts[i]);
}
+ /* Store how many parts we actually created */
+ splitmux->num_created_parts = splitmux->num_parts = i;
+ splitmux->num_prepared_parts = 0;
+
/* Update total_duration state variable */
GST_OBJECT_LOCK (splitmux);
- splitmux->total_duration = total_duration;
+ splitmux->total_duration = 0;
+ splitmux->end_offset = 0;
GST_OBJECT_UNLOCK (splitmux);
- /* Store how many parts we actually created */
- splitmux->num_parts = i;
-
- if (splitmux->num_parts < 1)
+ /* Then start the first: it will asynchronously go to PAUSED
+ * or error out and then we can proceed with the next one
+ */
+ if (!gst_splitmux_src_prepare_next_part (splitmux) || splitmux->num_parts < 1)
goto failed_part;
- /* All done preparing, activate the first part */
- GST_INFO_OBJECT (splitmux,
- "All parts prepared. Total duration %" GST_TIME_FORMAT
- " Activating first part", GST_TIME_ARGS (total_duration));
- ret = gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE);
- if (ret == FALSE)
- goto failed_first_part;
+ /* All good now: we have to wait for all parts to be asynchronously
+ * prepared to know the total duration we can play */
+ ret = TRUE;
+
done:
if (err != NULL)
g_error_free (err);
("Failed to open any files for reading"));
goto done;
}
-failed_first_part:
- {
- GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
- ("Failed to activate first part for playback"));
- goto done;
- }
}
static gboolean
GST_DEBUG_OBJECT (splitmux, "Stopping");
/* Stop and destroy all parts */
- for (i = 0; i < splitmux->num_parts; i++) {
+ for (i = 0; i < splitmux->num_created_parts; i++) {
if (splitmux->parts[i] == NULL)
continue;
gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
g_free (splitmux->parts);
splitmux->parts = NULL;
splitmux->num_parts = 0;
+ splitmux->num_prepared_parts = 0;
+ splitmux->num_created_parts = 0;
splitmux->running = FALSE;
splitmux->total_duration = GST_CLOCK_TIME_NONE;
/* Reset playback segment */