From 5b41ea2fe81cc6667a3e6097e47ed6a15801277a Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Tue, 1 Nov 2022 09:49:39 +0100 Subject: [PATCH] urisourcebin: Refactor ChildSrcPadInfo and OutputSlot usage Make an explicit topology/tree of structures: * ChildSrcPadInfo is created for each source element source pad * ChildSrcPadInfo contains the chain of optional elements specific to that pad (ex: typefind) * A ChildSrcPadInfo links to one or more OutputSlot, which contain what is specific to the output (i.e. optional buffering and ghostpad) * No longer use GObject {set|get}_data() functions to store those structures and instead make them explicit * Pass those structures around explicitely in each function/callback Part-of: --- .../gst/playback/gsturisourcebin.c | 1064 ++++++++++---------- 1 file changed, 524 insertions(+), 540 deletions(-) diff --git a/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c b/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c index af6ebbe..b1a543e 100644 --- a/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c +++ b/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c @@ -90,40 +90,41 @@ typedef struct _OutputSlotInfo OutputSlotInfo; g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \ } G_STMT_END -/* Track a source pad from a child (source, typefind, demuxer) that is linked or - * needs linking to an output slot, or source pads that are directly exposed as - * ghost pads */ +/* Track a source pad from the source element and the chain of (optional) + * elements that are linked to it up to the output slots */ struct _ChildSrcPadInfo { - /* Source pad this info is attached to (not reffed, since the pad owns the - * ChildSrcPadInfo as qdata) */ - GstPad *src_pad; + GstURISourceBin *urisrc; - /* The output GhostPad if this info is for a directly exposed pad (raw or - * not), rather than linked through a slot. */ - GstPad *output_pad; + /* Source pad this info is attached to (reffed) */ + GstPad *src_pad; - /* Configured output slot, if any buffering/download is required */ - OutputSlotInfo *output_slot; + /* An optional typefind */ + GstElement *typefind; - /* ADAPTIVE DEMUXER SPECIFIC */ - guint blocking_probe_id; - guint event_probe_id; + /* list of output slots */ + GList *outputs; }; /* Output Slot: * - * Buffered output + * Handles everything related to outputing, including optional buffering. */ struct _OutputSlotInfo { - ChildSrcPadInfo *linked_info; /* demux source pad info feeding this slot, if any */ - GstElement *queue; /* queue2 or downloadbuffer */ - GstPad *queue_sinkpad; /* Sink pad of the queue eleemnt */ + ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */ + + GstPad *originating_pad; /* Pad that created this OutputSlotInfo (ref held) */ GstPad *output_pad; /* Output ghost pad */ + gboolean is_eos; /* Did EOS get fed into the buffering element */ + GstElement *queue; /* queue2 or downloadbuffer */ + GstPad *queue_sinkpad; /* Sink pad of the queue eleemnt */ + gulong bitrate_changed_id; /* queue bitrate changed notification */ + + guint demuxer_event_probe_id; }; /** @@ -147,7 +148,6 @@ struct _GstURISourceBin gboolean is_stream; gboolean is_adaptive; gboolean demuxer_handles_buffering; /* If TRUE: Don't use buffering elements */ - gboolean source_streams_aware; /* if TRUE: Don't block output pads */ guint64 buffer_duration; /* When buffering, buffer duration (ns) */ guint buffer_size; /* When buffering, buffer size (bytes) */ gboolean download; @@ -156,10 +156,10 @@ struct _GstURISourceBin gdouble high_watermark; GstElement *source; - GList *typefinds; /* list of typefind element */ + + GList *src_infos; /* List of ChildSrcPadInfo for the source */ GstElement *demuxer; /* Adaptive demuxer if any */ - GSList *out_slots; guint numpads; @@ -283,17 +283,18 @@ static gboolean gst_uri_source_bin_query (GstElement * element, static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement * element, GstStateChange transition); -static void handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, +static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps); -static gboolean setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad); +static gboolean setup_typefind (ChildSrcPadInfo * info); static void remove_demuxer (GstURISourceBin * bin); static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad); -static OutputSlotInfo *get_output_slot (GstURISourceBin * urisrc, - gboolean do_download, gboolean is_adaptive); +static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info, + gboolean do_download, gboolean is_adaptive, gboolean no_buffering, + GstPad * originating_pad); static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc); static void free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot); -static GstPad *create_output_pad (GstURISourceBin * urisrc, GstPad * pad); +static GstPad *create_output_pad (OutputSlotInfo * slot, GstPad * pad); static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src); static void update_queue_values (GstURISourceBin * urisrc); @@ -636,79 +637,129 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) } static GstPadProbeReturn -demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data); +demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot); +/* CALL WITH URISOURCEBIN LOCK */ static void -free_child_src_pad_info (ChildSrcPadInfo * info) +free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc) { - if (info->output_pad) - gst_object_unref (info->output_pad); + g_assert (info->src_pad); + + GST_DEBUG_OBJECT (urisrc, + "Freeing ChildSrcPadInfo for %" GST_PTR_FORMAT, info->src_pad); + if (info->typefind) { + gst_element_set_state (info->typefind, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (urisrc), info->typefind); + } + + gst_object_unref (info->src_pad); + + g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc); + g_list_free (info->outputs); + g_free (info); } -/* Called by the signal handlers when a demuxer has produced a new stream */ -static void -new_demuxer_pad_added_cb (GstElement * element, GstPad * pad, - GstURISourceBin * urisrc) +static ChildSrcPadInfo * +get_cspi_for_pad (GstURISourceBin * urisrc, GstPad * pad) +{ + GList *iter; + + for (iter = urisrc->src_infos; iter; iter = iter->next) { + ChildSrcPadInfo *info = iter->data; + if (info->src_pad == pad) + return info; + } + return NULL; +} + +static ChildSrcPadInfo * +new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad) { ChildSrcPadInfo *info; + GST_LOG_OBJECT (urisrc, "New ChildSrcPadInfo for %" GST_PTR_FORMAT, pad); + info = g_new0 (ChildSrcPadInfo, 1); - info->src_pad = pad; + info->urisrc = urisrc; + info->src_pad = gst_object_ref (pad); - g_object_set_data_full (G_OBJECT (pad), "urisourcebin.srcpadinfo", - info, (GDestroyNotify) free_child_src_pad_info); + urisrc->src_infos = g_list_append (urisrc->src_infos, info); + + return info; +} + +/* Called by the signal handlers when a demuxer has produced a new stream */ +static void +new_demuxer_pad_added_cb (GstElement * element, GstPad * pad, + ChildSrcPadInfo * info) +{ + GstURISourceBin *urisrc = info->urisrc; + OutputSlotInfo *slot; + GstPad *output_pad; GST_URI_SOURCE_BIN_LOCK (urisrc); /* If the demuxer handles buffering and is streams-aware, we can expose it as-is directly. We still add an event probe to deal with EOS */ - if (urisrc->demuxer_handles_buffering && urisrc->source_streams_aware) { - info->output_pad = gst_object_ref (create_output_pad (urisrc, pad)); - GST_DEBUG_OBJECT (element, - "New streams-aware demuxer pad %s:%s , exposing directly", - GST_DEBUG_PAD_NAME (pad)); - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - expose_output_pad (urisrc, info->output_pad); - } else { - g_return_if_reached (); - } - info->event_probe_id = + slot = + new_output_slot (info, FALSE, FALSE, urisrc->demuxer_handles_buffering, + pad); + output_pad = gst_object_ref (slot->output_pad); + + GST_DEBUG_OBJECT (element, + "New streams-aware demuxer pad %s:%s , exposing directly", + GST_DEBUG_PAD_NAME (pad)); + slot->demuxer_event_probe_id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | - GST_PAD_PROBE_TYPE_EVENT_FLUSH, demux_pad_events, urisrc, NULL); + GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events, + slot, NULL); + + GST_URI_SOURCE_BIN_UNLOCK (urisrc); + expose_output_pad (urisrc, output_pad); + gst_object_unref (output_pad); } /* Called with lock held */ static gboolean all_slots_are_eos (GstURISourceBin * urisrc) { - GSList *tmp; + GList *tmp; - for (tmp = urisrc->out_slots; tmp; tmp = tmp->next) { - OutputSlotInfo *slot = (OutputSlotInfo *) tmp->data; - if (slot->is_eos == FALSE) - return FALSE; + for (tmp = urisrc->src_infos; tmp; tmp = tmp->next) { + ChildSrcPadInfo *cspi = tmp->data; + GList *iter2; + for (iter2 = cspi->outputs; iter2; iter2 = iter2->next) { + OutputSlotInfo *slot = (OutputSlotInfo *) iter2->data; + if (slot->is_eos == FALSE) + return FALSE; + } } return TRUE; } +/* CALL WITH URISOURCEBIN LOCK */ +static OutputSlotInfo * +output_slot_for_originating_pad (ChildSrcPadInfo * info, + GstPad * originating_pad) +{ + GList *iter; + for (iter = info->outputs; iter; iter = iter->next) { + OutputSlotInfo *slot = iter->data; + if (slot->originating_pad == originating_pad) + return slot; + } + + return NULL; +} + static GstPadProbeReturn -demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot) { - GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data); - ChildSrcPadInfo *child_info; + GstURISourceBin *urisrc = slot->linked_info->urisrc; GstPadProbeReturn ret = GST_PAD_PROBE_OK; GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info); - if (!(child_info = - g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo"))) - goto done; - GST_URI_SOURCE_BIN_LOCK (urisrc); - /* If not linked to a slot, nothing more to do */ - if (child_info->output_slot == NULL) { - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - goto done; - } switch (GST_EVENT_TYPE (ev)) { case GST_EVENT_EOS: @@ -719,20 +770,13 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) BUFFERING_LOCK (urisrc); /* Mark that we fed an EOS to this slot */ - child_info->output_slot->is_eos = TRUE; + slot->is_eos = TRUE; all_streams_eos = all_slots_are_eos (urisrc); BUFFERING_UNLOCK (urisrc); - /* EOS means this element is no longer buffering */ - remove_buffering_msgs (urisrc, - GST_OBJECT_CAST (child_info->output_slot->queue)); - - /* Mark this custom EOS, replacing the event in the probe data */ - ev = gst_event_make_writable (ev); - GST_PAD_PROBE_INFO_DATA (info) = ev; - - gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK, - (gchar *) CUSTOM_EOS_QUARK_DATA, NULL); + if (slot->queue) + /* EOS means this element is no longer buffering */ + remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); if (all_streams_eos) { GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH"); @@ -744,7 +788,7 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) case GST_EVENT_STREAM_START: case GST_EVENT_FLUSH_STOP: BUFFERING_LOCK (urisrc); - child_info->output_slot->is_eos = FALSE; + slot->is_eos = FALSE; BUFFERING_UNLOCK (urisrc); break; default: @@ -753,7 +797,6 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) GST_URI_SOURCE_BIN_UNLOCK (urisrc); -done: return ret; } @@ -787,31 +830,37 @@ get_queue_statistics (GstURISourceBin * urisrc) guint64 min_time_level = 0, max_time_level = 0; gdouble avg_byte_level = 0., avg_time_level = 0.; guint i = 0; - GSList *cur; + GList *iter, *cur; GST_URI_SOURCE_BIN_LOCK (urisrc); - for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) { - OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data); - guint byte_limit = 0; - guint64 time_limit = 0; + for (iter = urisrc->src_infos; iter; iter = iter->next) { + ChildSrcPadInfo *info = iter->data; + for (cur = info->outputs; cur; cur = cur->next) { + OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data); + guint byte_limit = 0; + guint64 time_limit = 0; + + if (!slot->queue) + continue; - g_object_get (slot->queue, "current-level-bytes", &byte_limit, - "current-level-time", &time_limit, NULL); + g_object_get (slot->queue, "current-level-bytes", &byte_limit, + "current-level-time", &time_limit, NULL); - if (byte_limit < min_byte_level) - min_byte_level = byte_limit; - if (byte_limit > max_byte_level) - max_byte_level = byte_limit; - avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1); + if (byte_limit < min_byte_level) + min_byte_level = byte_limit; + if (byte_limit > max_byte_level) + max_byte_level = byte_limit; + avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1); - if (time_limit < min_time_level) - min_time_level = time_limit; - if (time_limit > max_time_level) - max_time_level = time_limit; - avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1); + if (time_limit < min_time_level) + min_time_level = time_limit; + if (time_limit > max_time_level) + max_time_level = time_limit; + avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1); - i++; + i++; + } } GST_URI_SOURCE_BIN_UNLOCK (urisrc); @@ -833,7 +882,7 @@ update_queue_values (GstURISourceBin * urisrc) guint buffer_size; gdouble low_watermark, high_watermark; guint64 cumulative_bitrate = 0; - GSList *cur; + GList *iter, *cur; GST_URI_SOURCE_BIN_LOCK (urisrc); duration = GET_BUFFER_DURATION (urisrc); @@ -841,22 +890,29 @@ update_queue_values (GstURISourceBin * urisrc) low_watermark = urisrc->low_watermark; high_watermark = urisrc->high_watermark; - for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) { - OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data); - guint64 bitrate = 0; + for (iter = urisrc->src_infos; iter; iter = iter->next) { + ChildSrcPadInfo *info = iter->data; + for (cur = info->outputs; cur; cur = cur->next) { + OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data); + guint64 bitrate = 0; - if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue), - "bitrate")) { - g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL); - } + if (!slot->queue) + continue; - if (bitrate > 0) - cumulative_bitrate += bitrate; - else { - GST_TRACE_OBJECT (urisrc, "Unknown bitrate detected from %" GST_PTR_FORMAT - ", resetting all bitrates", slot->queue); - cumulative_bitrate = 0; - break; + if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue), + "bitrate")) { + g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL); + } + + if (bitrate > 0) + cumulative_bitrate += bitrate; + else { + GST_TRACE_OBJECT (urisrc, + "Unknown bitrate detected from %" GST_PTR_FORMAT + ", resetting all bitrates", slot->queue); + cumulative_bitrate = 0; + break; + } } } @@ -864,32 +920,38 @@ update_queue_values (GstURISourceBin * urisrc) "bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %" G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration); - for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) { - OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data); - guint byte_limit; - - if (cumulative_bitrate > 0 - && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue), - "bitrate")) { - guint64 bitrate; - g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL); - byte_limit = - gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate); - } else { - /* if not all queue's have valid bitrates, use the buffer-size as the - * limit */ - byte_limit = buffer_size; - } + for (iter = urisrc->src_infos; iter; iter = iter->next) { + ChildSrcPadInfo *info = iter->data; + for (cur = info->outputs; cur; cur = cur->next) { + OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data); + guint byte_limit; + + if (!slot->queue) + continue; + + if (cumulative_bitrate > 0 + && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue), + "bitrate")) { + guint64 bitrate; + g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL); + byte_limit = + gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate); + } else { + /* if not all queue's have valid bitrates, use the buffer-size as the + * limit */ + byte_limit = buffer_size; + } - GST_DEBUG_OBJECT (urisrc, - "calculated new limits for queue-like element %" GST_PTR_FORMAT - ", bytes:%u, time:%" G_GUINT64_FORMAT - ", low-watermark:%f, high-watermark:%f", - slot->queue, byte_limit, (guint64) duration, low_watermark, - high_watermark); - g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit, - "max-size-time", (guint64) duration, "low-watermark", low_watermark, - "high-watermark", high_watermark, NULL); + GST_DEBUG_OBJECT (urisrc, + "calculated new limits for queue-like element %" GST_PTR_FORMAT + ", bytes:%u, time:%" G_GUINT64_FORMAT + ", low-watermark:%f, high-watermark:%f", + slot->queue, byte_limit, (guint64) duration, low_watermark, + high_watermark); + g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit, + "max-size-time", (guint64) duration, "low-watermark", low_watermark, + "high-watermark", high_watermark, NULL); + } } GST_URI_SOURCE_BIN_UNLOCK (urisrc); } @@ -906,112 +968,119 @@ on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec, /* Called with lock held */ static OutputSlotInfo * -get_output_slot (GstURISourceBin * urisrc, gboolean do_download, - gboolean is_adaptive) +new_output_slot (ChildSrcPadInfo * info, gboolean do_download, + gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad) { + GstURISourceBin *urisrc = info->urisrc; OutputSlotInfo *slot; GstPad *srcpad; - GstElement *queue; + GstElement *queue = NULL; const gchar *elem_name; - /* Otherwise create the new slot */ - if (do_download) - elem_name = "downloadbuffer"; - else - elem_name = "queue2"; - - queue = gst_element_factory_make (elem_name, NULL); - if (!queue) - goto no_buffer_element; + GST_DEBUG_OBJECT (urisrc, + "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%" + GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad); slot = g_new0 (OutputSlotInfo, 1); - slot->queue = queue; + slot->linked_info = info; - /* Set the slot onto the queue (needed in buffering msg handling) */ - g_object_set_data (G_OBJECT (queue), "urisourcebin.slotinfo", slot); + /* If buffering is required, create the element */ + if (!no_buffering) { + if (do_download) + elem_name = "downloadbuffer"; + else + elem_name = "queue2"; - slot->bitrate_changed_id = - g_signal_connect (G_OBJECT (queue), "notify::bitrate", - (GCallback) on_queue_bitrate_changed, urisrc); + queue = gst_element_factory_make (elem_name, NULL); + if (!queue) + goto no_buffer_element; - if (do_download) { - gchar *temp_template, *filename; - const gchar *tmp_dir, *prgname; + slot->queue = queue; - tmp_dir = g_get_user_cache_dir (); - prgname = g_get_prgname (); - if (prgname == NULL) - prgname = "GStreamer"; + slot->bitrate_changed_id = + g_signal_connect (G_OBJECT (queue), "notify::bitrate", + (GCallback) on_queue_bitrate_changed, urisrc); - filename = g_strdup_printf ("%s-XXXXXX", prgname); + if (do_download) { + gchar *temp_template, *filename; + const gchar *tmp_dir, *prgname; - /* build our filename */ - temp_template = g_build_filename (tmp_dir, filename, NULL); + tmp_dir = g_get_user_cache_dir (); + prgname = g_get_prgname (); + if (prgname == NULL) + prgname = "GStreamer"; - GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)", - temp_template, tmp_dir, prgname, filename); + filename = g_strdup_printf ("%s-XXXXXX", prgname); - /* configure progressive download for selected media types */ - g_object_set (queue, "temp-template", temp_template, NULL); + /* build our filename */ + temp_template = g_build_filename (tmp_dir, filename, NULL); - g_free (filename); - g_free (temp_template); - } else { - if (is_adaptive) { - GST_LOG_OBJECT (urisrc, "Adding queue for adaptive streaming stream"); - g_object_set (queue, "use-buffering", urisrc->use_buffering, - "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL); + GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)", + temp_template, tmp_dir, prgname, filename); + + /* configure progressive download for selected media types */ + g_object_set (queue, "temp-template", temp_template, NULL); + + g_free (filename); + g_free (temp_template); } else { - GST_LOG_OBJECT (urisrc, "Adding queue for buffering"); - g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL); - } + if (is_adaptive) { + GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream"); + g_object_set (queue, "use-buffering", urisrc->use_buffering, + "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL); + } else { + GST_LOG_OBJECT (urisrc, "Adding queue for buffering"); + g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL); + } - g_object_set (queue, "ring-buffer-max-size", - urisrc->ring_buffer_max_size, NULL); - /* Disable max-size-buffers - queue based on data rate to the default time limit */ - g_object_set (queue, "max-size-buffers", 0, NULL); + g_object_set (queue, "ring-buffer-max-size", + urisrc->ring_buffer_max_size, NULL); + /* Disable max-size-buffers - queue based on data rate to the default time limit */ + g_object_set (queue, "max-size-buffers", 0, NULL); - /* Don't start buffering until the queue is empty (< 1%). - * Start playback when the queue is 60% full, leaving a bit more room - * for upstream to push more without getting bursty */ - g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL); + /* Don't start buffering until the queue is empty (< 1%). + * Start playback when the queue is 60% full, leaving a bit more room + * for upstream to push more without getting bursty */ + g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL); - g_object_set (queue, "low-watermark", urisrc->low_watermark, - "high-watermark", urisrc->high_watermark, NULL); - } + g_object_set (queue, "low-watermark", urisrc->low_watermark, + "high-watermark", urisrc->high_watermark, NULL); + } - /* set the necessary limits on the queue-like elements */ - g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc), - "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL); -#if 0 - /* Disabled because this makes initial startup slower for radio streams */ - else { - /* Buffer 4 seconds by default - some extra headroom over the - * core default, because we trigger playback sooner */ - //g_object_set (queue, "max-size-time", 4 * GST_SECOND, NULL); - } -#endif + /* set the necessary limits on the queue-like elements */ + g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc), + "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL); + + gst_bin_add (GST_BIN_CAST (urisrc), queue); + gst_element_sync_state_with_parent (queue); + + slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink"); - /* save queue pointer so we can remove it later */ - urisrc->out_slots = g_slist_prepend (urisrc->out_slots, slot); + /* get the new raw srcpad */ + srcpad = gst_element_get_static_pad (queue, "src"); - gst_bin_add (GST_BIN_CAST (urisrc), queue); - gst_element_sync_state_with_parent (queue); + slot->output_pad = create_output_pad (slot, srcpad); - slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink"); + gst_object_unref (srcpad); - /* get the new raw srcpad */ - srcpad = gst_element_get_static_pad (queue, "src"); - g_object_set_data (G_OBJECT (srcpad), "urisourcebin.slotinfo", slot); + gst_pad_link (originating_pad, slot->queue_sinkpad); + } else { + /* Expose pad directly */ + slot->output_pad = create_output_pad (slot, originating_pad); + } + slot->originating_pad = gst_object_ref (originating_pad); - slot->output_pad = create_output_pad (urisrc, srcpad); + /* save output slot so we can remove it later */ + info->outputs = g_list_append (info->outputs, slot); - gst_object_unref (srcpad); + GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT, + slot->output_pad); return slot; no_buffer_element: { + g_free (slot); post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name); return NULL; } @@ -1022,15 +1091,19 @@ source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); - GstURISourceBin *urisrc = user_data; + OutputSlotInfo *slot = user_data; + GstURISourceBin *urisrc = slot->linked_info->urisrc; - GST_LOG_OBJECT (pad, "%s, urisrc %p", GST_EVENT_TYPE_NAME (event), event); + GST_LOG_OBJECT (pad, "%" GST_PTR_FORMAT, event); + /* A custom EOS will be received if an adaptive demuxer source pad removed a + * pad and buffering was present on that slot */ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS && gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK)) { - OutputSlotInfo *slot; - GST_DEBUG_OBJECT (pad, "we received EOS"); + GstPadProbeReturn probe_ret = GST_PAD_PROBE_DROP; + + GST_DEBUG_OBJECT (pad, "we received custom EOS"); /* remove custom-eos */ gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK, @@ -1038,41 +1111,19 @@ source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info, GST_URI_SOURCE_BIN_LOCK (urisrc); - slot = g_object_get_data (G_OBJECT (pad), "urisourcebin.slotinfo"); - - if (slot) { - GstEvent *eos; - guint32 seqnum; - - if (slot->linked_info) { - if (slot->is_eos) { - /* linked_info is old input which is still linked without removal */ - GST_DEBUG_OBJECT (pad, "push actual EOS"); - seqnum = gst_event_get_seqnum (event); - eos = gst_event_new_eos (); - gst_event_set_seqnum (eos, seqnum); - gst_pad_push_event (slot->output_pad, eos); - } else { - /* Do not clear output slot yet. A new input was - * connected. We should just drop this EOS */ - } - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - return GST_PAD_PROBE_DROP; - } - - seqnum = gst_event_get_seqnum (event); - eos = gst_event_new_eos (); - gst_event_set_seqnum (eos, seqnum); - gst_pad_push_event (slot->output_pad, eos); - free_output_slot_async (urisrc, slot); + if (slot->is_eos) { + /* linked_info is old input which is still linked without removal */ + GST_DEBUG_OBJECT (pad, "push actual EOS"); + gst_pad_push_event (slot->output_pad, event); + probe_ret = GST_PAD_PROBE_HANDLED; } - /* FIXME: Only emit drained if all output pads are done and there's no - * pending pads */ - g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_DRAINED], 0, NULL); + /* And finally remove the output. This is done asynchronously since we can't + * do it from the streaming thread */ + free_output_slot_async (urisrc, slot); GST_URI_SOURCE_BIN_UNLOCK (urisrc); - return GST_PAD_PROBE_DROP; + return probe_ret; } /* never drop events */ return GST_PAD_PROBE_OK; @@ -1082,14 +1133,17 @@ source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info, * padprobe to detect EOS before exposing the pad. * Called with LOCK held. */ static GstPad * -create_output_pad (GstURISourceBin * urisrc, GstPad * pad) +create_output_pad (OutputSlotInfo * slot, GstPad * pad) { + GstURISourceBin *urisrc = slot->linked_info->urisrc; GstPad *newpad; GstPadTemplate *pad_tmpl; gchar *padname; - gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, - source_pad_event_probe, urisrc, NULL); + /* If the output slot does buffering, add a probe to detect drainage */ + if (slot->queue) + gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, + source_pad_event_probe, slot, NULL); pad_tmpl = gst_static_pad_template_get (&srctemplate); @@ -1162,97 +1216,61 @@ expose_output_pad (GstURISourceBin * urisrc, GstPad * pad) } static void -expose_raw_output_pad (GstURISourceBin * urisrc, GstPad * srcpad, - GstPad * output_pad) +demuxer_pad_removed_cb (GstElement * element, GstPad * pad, + ChildSrcPadInfo * info) { - ChildSrcPadInfo *info = g_new0 (ChildSrcPadInfo, 1); - info->src_pad = srcpad; - info->output_pad = gst_object_ref (output_pad); - - g_assert (g_object_get_data (G_OBJECT (srcpad), - "urisourcebin.srcpadinfo") == NULL); - - g_object_set_data_full (G_OBJECT (srcpad), "urisourcebin.srcpadinfo", - info, (GDestroyNotify) free_child_src_pad_info); - - expose_output_pad (urisrc, output_pad); -} - -static void -remove_output_pad (GstURISourceBin * urisrc, GstPad * pad) -{ - if (!gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc))) - return; /* Pad is not exposed */ - - GST_DEBUG_OBJECT (urisrc, "Removing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); - - gst_pad_set_active (pad, FALSE); - gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), pad); -} - -static void -pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc) -{ - ChildSrcPadInfo *info; - - GST_DEBUG_OBJECT (element, "pad removed name: <%s:%s>", - GST_DEBUG_PAD_NAME (pad)); + GstURISourceBin *urisrc; + OutputSlotInfo *slot; /* we only care about srcpads */ if (!GST_PAD_IS_SRC (pad)) return; - if (!(info = g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo"))) - goto no_info; + urisrc = info->urisrc; + + GST_DEBUG_OBJECT (urisrc, "pad removed name: <%s:%s>", + GST_DEBUG_PAD_NAME (pad)); GST_URI_SOURCE_BIN_LOCK (urisrc); + slot = output_slot_for_originating_pad (info, pad); + g_assert (slot); - /* Send EOS to the output slot if the demuxer didn't already */ - if (info->output_slot) { - GstStructure *s; - GstEvent *event; - OutputSlotInfo *slot; + gst_pad_remove_probe (pad, slot->demuxer_event_probe_id); + slot->demuxer_event_probe_id = 0; - slot = info->output_slot; + if (slot->queue) { + gboolean was_eos; + + /* Propagate custom EOS to buffering elements. The slot will be removed when + * it is received on the output of the buffering elements */ BUFFERING_LOCK (urisrc); /* Unlink this pad from its output slot and send a fake EOS event * to drain the queue */ + was_eos = slot->is_eos; slot->is_eos = TRUE; BUFFERING_UNLOCK (urisrc); remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); - - slot->linked_info = NULL; - - info->output_slot = NULL; - - GST_LOG_OBJECT (element, - "Pad %" GST_PTR_FORMAT " was removed without EOS. Sending.", pad); - - event = gst_event_new_eos (); - s = gst_event_writable_structure (event); - gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE, - NULL); - gst_pad_send_event (slot->queue_sinkpad, event); - } else if (info->output_pad != NULL) { - GST_LOG_OBJECT (element, - "Pad %" GST_PTR_FORMAT " was removed. Unexposing %" GST_PTR_FORMAT, - pad, info->output_pad); - remove_output_pad (urisrc, info->output_pad); + if (!was_eos) { + GstStructure *s; + GstEvent *event; + event = gst_event_new_eos (); + s = gst_event_writable_structure (event); + gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE, + NULL); + gst_pad_send_event (slot->queue_sinkpad, event); + } } else { - GST_WARNING_OBJECT (urisrc, "Removed pad has no output slot or pad"); + GST_LOG_OBJECT (urisrc, + "No buffering involved, removing output slot immediately"); + /* Remove output slot immediately */ + info->outputs = g_list_remove (info->outputs, slot); + free_output_slot (slot, urisrc); } GST_URI_SOURCE_BIN_UNLOCK (urisrc); return; - - /* ERRORS */ -no_info: - { - GST_WARNING_OBJECT (element, "no info found for pad"); - return; - } } /* helper function to lookup stuff in lists */ @@ -1488,6 +1506,80 @@ post_missing_plugin_error (GstElement * urisrc, const gchar * element_name) element_name), (NULL)); } +typedef struct +{ + GstURISourceBin *urisrc; + gboolean have_out; + gboolean res; +} AnalyseData; + +static void +analyse_pad_foreach (const GValue * item, AnalyseData * data) +{ + GstURISourceBin *urisrc = data->urisrc; + GstPad *pad = g_value_dup_object (item); + ChildSrcPadInfo *info; + GstCaps *padcaps = NULL; + gboolean pad_is_raw; + gboolean res = TRUE; + + GST_LOG_OBJECT (urisrc, "pad %" GST_PTR_FORMAT, pad); + + data->have_out = TRUE; + + /* The info might already exist if there was an iterator resync */ + if (get_cspi_for_pad (urisrc, pad)) { + GST_LOG_OBJECT (urisrc, "Already analysed"); + goto out; + } + + info = new_child_src_pad_info (urisrc, pad); + padcaps = gst_pad_query_caps (pad, NULL); + + if (!is_all_raw_caps (padcaps, DEFAULT_CAPS, &pad_is_raw) || !pad_is_raw) { + /* if FALSE, this pad has no caps, we setup typefinding on it */ + if (!setup_typefind (info)) { + res = FALSE; + goto out; + } + } else if (pad_is_raw) { + /* caps on source pad are all raw, we can add the pad */ + GstPad *output_pad; + OutputSlotInfo *slot; + + GST_URI_SOURCE_BIN_LOCK (urisrc); + /* Only use buffering on raw pads in very specific conditions */ + GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d", + urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri)); + slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering + || !IS_QUEUE_URI (urisrc->uri), pad); + + if (!slot) { + res = FALSE; + GST_URI_SOURCE_BIN_UNLOCK (urisrc); + goto out; + } + + /* get the new raw srcpad */ + output_pad = gst_object_ref (slot->output_pad); + + GST_URI_SOURCE_BIN_UNLOCK (urisrc); + + expose_output_pad (urisrc, output_pad); + gst_object_unref (output_pad); + } else { + GST_DEBUG_OBJECT (urisrc, "Handling non-raw pad"); + /* The caps are non-raw, we handle it directly */ + handle_new_pad (info, pad, padcaps); + } + +out: + if (padcaps) + gst_caps_unref (padcaps); + gst_object_unref (pad); + data->res &= res; +} + /** * analyse_source_and_expose_raw_pads: * @urisrc: a #GstURISourceBin @@ -1514,108 +1606,35 @@ post_missing_plugin_error (GstElement * urisrc, const gchar * element_name) */ static gboolean analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc, - gboolean * all_pads_raw, gboolean * have_out, gboolean * is_dynamic) + gboolean * have_out, gboolean * is_dynamic) { GstElementClass *elemclass; + AnalyseData data = { 0, }; + GstIteratorResult iterres; GList *walk; GstIterator *pads_iter; - gboolean done = FALSE; gboolean res = TRUE; - GstPad *pad; - GValue item = { 0, }; - guint nb_raw = 0; - guint nb_pads = 0; - GstCaps *rawcaps = DEFAULT_CAPS; - gboolean pad_is_raw; - gboolean use_queue; - - *have_out = FALSE; - *all_pads_raw = FALSE; - *is_dynamic = FALSE; - - /* Add buffering elements on raw pads only is very specific conditions */ - use_queue = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri); pads_iter = gst_element_iterate_src_pads (urisrc->source); - while (!done) { - switch (gst_iterator_next (pads_iter, &item)) { - case GST_ITERATOR_ERROR: - res = FALSE; - /* FALLTHROUGH */ - case GST_ITERATOR_DONE: - done = TRUE; - break; - case GST_ITERATOR_RESYNC: - /* reset results and resync */ - *have_out = FALSE; - *all_pads_raw = FALSE; - *is_dynamic = FALSE; - nb_pads = nb_raw = 0; - gst_iterator_resync (pads_iter); - break; - case GST_ITERATOR_OK: - { - GstCaps *padcaps; - - pad = g_value_dup_object (&item); - /* we now officially have an output pad */ - *have_out = TRUE; - nb_pads++; - padcaps = gst_pad_query_caps (pad, NULL); - - if (!is_all_raw_caps (padcaps, rawcaps, &pad_is_raw)) { - /* if FALSE, this pad has no caps, we setup typefinding on it */ - if (!setup_typefind (urisrc, pad)) { - res = FALSE; - done = TRUE; - } - } else if (pad_is_raw) { - /* caps on source pad are all raw, we can add the pad */ - GstPad *output_pad; - - nb_raw++; - GST_URI_SOURCE_BIN_LOCK (urisrc); - if (use_queue) { - OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE); - if (!slot) { - gst_caps_unref (padcaps); - gst_object_unref (pad); - goto no_slot; - } - - gst_pad_link (pad, slot->queue_sinkpad); - - /* get the new raw srcpad */ - output_pad = gst_object_ref (slot->output_pad); - - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - - expose_output_pad (urisrc, output_pad); - gst_object_unref (output_pad); - } else { - output_pad = create_output_pad (urisrc, pad); - - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - - expose_raw_output_pad (urisrc, pad, output_pad); - } - } else { - /* The caps are non-raw, we handle it directly */ - handle_new_pad (urisrc, pad, padcaps); - } - gst_caps_unref (padcaps); - gst_object_unref (pad); - g_value_reset (&item); - break; - } - } - } - g_value_unset (&item); + +restart: + data.res = TRUE; + data.have_out = FALSE; + data.urisrc = urisrc; + iterres = + gst_iterator_foreach (pads_iter, + (GstIteratorForeachFunction) analyse_pad_foreach, &data); + if (iterres == GST_ITERATOR_RESYNC) + goto restart; + if (iterres == GST_ITERATOR_ERROR) + res = FALSE; + else + res = data.res; gst_iterator_free (pads_iter); - gst_caps_unref (rawcaps); /* check for padtemplates that list SOMETIMES pads to * determine if the element is dynamic. */ + *is_dynamic = FALSE; elemclass = GST_ELEMENT_GET_CLASS (urisrc->source); walk = gst_element_class_get_pad_template_list (elemclass); while (walk != NULL) { @@ -1630,20 +1649,9 @@ analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc, walk = g_list_next (walk); } - if (nb_pads && nb_pads == nb_raw) - *all_pads_raw = TRUE; + *have_out = data.have_out; return res; -no_slot: - { - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - gst_object_unref (pad); - g_value_unset (&item); - gst_iterator_free (pads_iter); - gst_caps_unref (rawcaps); - - return FALSE; - } } /* Remove any adaptive demuxer element */ @@ -1661,7 +1669,7 @@ remove_demuxer (GstURISourceBin * bin) /* make a demuxer and connect to all the signals */ static GstElement * -make_demuxer (GstURISourceBin * urisrc, GstCaps * caps) +make_demuxer (GstURISourceBin * urisrc, ChildSrcPadInfo * info, GstCaps * caps) { GList *factories, *eligible, *cur; GstElement *demuxer = NULL; @@ -1712,9 +1720,9 @@ make_demuxer (GstURISourceBin * urisrc, GstCaps * caps) /* set up callbacks to create the links between * demuxer streams and output */ g_signal_connect (demuxer, - "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), urisrc); + "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info); g_signal_connect (demuxer, - "pad-removed", G_CALLBACK (pad_removed_cb), urisrc); + "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info); /* Propagate connection-speed property */ pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer), @@ -1740,8 +1748,9 @@ no_demuxer: * * typefind has found a type */ static void -handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps) +handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps) { + GstURISourceBin *urisrc = info->urisrc; gboolean is_raw; GstStructure *s; const gchar *media_type; @@ -1751,14 +1760,17 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps) /* if this is a pad with all raw caps, we can expose it */ if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) { + OutputSlotInfo *slot; GstPad *output_pad; GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT ", exposing", caps); - output_pad = create_output_pad (urisrc, srcpad); + slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad); + output_pad = gst_object_ref (slot->output_pad); GST_URI_SOURCE_BIN_UNLOCK (urisrc); - expose_raw_output_pad (urisrc, srcpad, output_pad); + expose_output_pad (urisrc, slot->output_pad); + gst_object_unref (output_pad); return; } GST_URI_SOURCE_BIN_UNLOCK (urisrc); @@ -1773,7 +1785,7 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps) GstPadLinkReturn link_res; GstQuery *query; - urisrc->demuxer = make_demuxer (urisrc, caps); + urisrc->demuxer = make_demuxer (urisrc, info, caps); if (!urisrc->demuxer) goto no_demuxer; gst_bin_add (GST_BIN_CAST (urisrc), urisrc->demuxer); @@ -1798,12 +1810,16 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps) gst_element_sync_state_with_parent (urisrc->demuxer); } else if (!urisrc->is_stream) { + OutputSlotInfo *slot; GstPad *output_pad; - /* We don't need slot here, expose immediately */ + + /* We don't need buffering here, expose immediately */ GST_URI_SOURCE_BIN_LOCK (urisrc); - output_pad = create_output_pad (urisrc, srcpad); + slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad); + output_pad = gst_object_ref (slot->output_pad); GST_URI_SOURCE_BIN_UNLOCK (urisrc); - expose_raw_output_pad (urisrc, srcpad, output_pad); + expose_output_pad (urisrc, output_pad); + gst_object_unref (output_pad); } else { OutputSlotInfo *slot; GstPad *output_pad; @@ -1823,11 +1839,7 @@ handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps) do_download); GST_URI_SOURCE_BIN_LOCK (urisrc); - slot = get_output_slot (urisrc, do_download, FALSE); - - if (slot == NULL - || gst_pad_link (srcpad, slot->queue_sinkpad) != GST_PAD_LINK_OK) - goto could_not_link; + slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad); gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, pre_queue_event_probe, urisrc, NULL); @@ -1866,13 +1878,14 @@ could_not_link: * buffering or regular buffering */ static void type_found (GstElement * typefind, guint probability, - GstCaps * caps, GstURISourceBin * urisrc) + GstCaps * caps, ChildSrcPadInfo * info) { + GstURISourceBin *urisrc = info->urisrc; GstPad *srcpad = gst_element_get_static_pad (typefind, "src"); GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT " on pad %" GST_PTR_FORMAT, caps, srcpad); - handle_new_pad (urisrc, srcpad, caps); + handle_new_pad (info, srcpad, caps); gst_object_unref (GST_OBJECT (srcpad)); } @@ -1881,42 +1894,33 @@ type_found (GstElement * typefind, guint probability, * source. After we find the type, we decide to whether to plug an adaptive * demuxer, or just link through queue2 (if needed) and expose the data */ static gboolean -setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad) +setup_typefind (ChildSrcPadInfo * info) { - GstElement *typefind; + GstURISourceBin *urisrc = info->urisrc; + GstPad *sinkpad; /* now create the typefind element */ - typefind = gst_element_factory_make ("typefind", NULL); - if (!typefind) + info->typefind = gst_element_factory_make ("typefind", NULL); + if (!info->typefind) goto no_typefind; /* Make sure the bin doesn't set the typefind running yet */ - gst_element_set_locked_state (typefind, TRUE); - - gst_bin_add (GST_BIN_CAST (urisrc), typefind); - - if (!srcpad) { - if (!gst_element_link_pads (urisrc->source, NULL, typefind, "sink")) - goto could_not_link; - } else { - GstPad *sinkpad = gst_element_get_static_pad (typefind, "sink"); - GstPadLinkReturn ret; + gst_element_set_locked_state (info->typefind, TRUE); - ret = gst_pad_link (srcpad, sinkpad); - gst_object_unref (sinkpad); - if (ret != GST_PAD_LINK_OK) - goto could_not_link; - } + gst_bin_add (GST_BIN_CAST (urisrc), info->typefind); - urisrc->typefinds = g_list_append (urisrc->typefinds, typefind); + sinkpad = gst_element_get_static_pad (info->typefind, "sink"); + if (gst_pad_link (info->src_pad, sinkpad) != GST_PAD_LINK_OK) + goto could_not_link; + gst_object_unref (sinkpad); /* connect a signal to find out when the typefind element found * a type */ - g_signal_connect (typefind, "have-type", G_CALLBACK (type_found), urisrc); + g_signal_connect (info->typefind, "have-type", G_CALLBACK (type_found), info); /* Now it can start */ - gst_element_set_locked_state (typefind, FALSE); - gst_element_sync_state_with_parent (typefind); + gst_element_set_locked_state (info->typefind, FALSE); + gst_element_sync_state_with_parent (info->typefind); return TRUE; @@ -1930,29 +1934,38 @@ no_typefind: } could_not_link: { + gst_object_unref (sinkpad); GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION, (NULL), ("Can't link source to typefind element")); - gst_bin_remove (GST_BIN_CAST (urisrc), typefind); return FALSE; } } +/* CALL WITH URISOURCEBIN LOCK */ static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc) { - GST_DEBUG_OBJECT (urisrc, "removing old queue element and freeing slot %p", - slot); - if (slot->bitrate_changed_id > 0) - g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id); - slot->bitrate_changed_id = 0; + GST_DEBUG_OBJECT (urisrc, + "removing output slot %" GST_PTR_FORMAT " -> %" GST_PTR_FORMAT, + slot->originating_pad, slot->output_pad); + + if (slot->queue) { + if (slot->bitrate_changed_id > 0) + g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id); + slot->bitrate_changed_id = 0; + + gst_element_set_locked_state (slot->queue, TRUE); + gst_element_set_state (slot->queue, GST_STATE_NULL); + remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); + gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue); - gst_element_set_locked_state (slot->queue, TRUE); - gst_element_set_state (slot->queue, GST_STATE_NULL); - remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); - gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue); + gst_object_unref (slot->queue_sinkpad); + } - gst_object_unref (slot->queue_sinkpad); + if (slot->demuxer_event_probe_id) + gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id); + gst_object_unref (slot->originating_pad); /* deactivate and remove the srcpad */ gst_pad_set_active (slot->output_pad, FALSE); gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->output_pad); @@ -1972,41 +1985,11 @@ static void free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot) { GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free"); - urisrc->out_slots = g_slist_remove (urisrc->out_slots, slot); + slot->linked_info->outputs = g_list_remove (slot->linked_info->outputs, slot); gst_element_call_async (GST_ELEMENT_CAST (urisrc), (GstElementCallAsyncFunc) call_free_output_slot, slot, NULL); } -static void -unexpose_raw_pad_func (const GValue * item, GstURISourceBin * urisrc) -{ - GstPad *pad = g_value_get_object (item); - ChildSrcPadInfo *info = - g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo"); - - if (info && info->output_pad != NULL) - remove_output_pad (urisrc, info->output_pad); -} - -static void -unexpose_src_pads (GstURISourceBin * urisrc, GstElement * element) -{ - GstIterator *pads_iter; - - pads_iter = gst_element_iterate_src_pads (element); - gst_iterator_foreach (pads_iter, - (GstIteratorForeachFunction) unexpose_raw_pad_func, urisrc); - gst_iterator_free (pads_iter); -} - -static void -remove_typefind (GstElement * typefind, GstURISourceBin * urisrc) -{ - unexpose_src_pads (urisrc, typefind); - gst_element_set_state (typefind, GST_STATE_NULL); - gst_bin_remove (GST_BIN_CAST (urisrc), typefind); -} - /* remove source and all related elements */ static void remove_source (GstURISourceBin * urisrc) @@ -2015,7 +1998,6 @@ remove_source (GstURISourceBin * urisrc) GstElement *source = urisrc->source; GST_DEBUG_OBJECT (urisrc, "removing old src element"); - unexpose_src_pads (urisrc, source); gst_element_set_state (source, GST_STATE_NULL); if (urisrc->src_np_sig_id) { @@ -2026,17 +2008,12 @@ remove_source (GstURISourceBin * urisrc) urisrc->source = NULL; } - if (urisrc->typefinds) { - GST_DEBUG_OBJECT (urisrc, "removing old typefind elements"); - g_list_foreach (urisrc->typefinds, (GFunc) remove_typefind, urisrc); - g_list_free (urisrc->typefinds); - urisrc->typefinds = NULL; - } - GST_URI_SOURCE_BIN_LOCK (urisrc); - g_slist_foreach (urisrc->out_slots, (GFunc) free_output_slot, urisrc); - g_slist_free (urisrc->out_slots); - urisrc->out_slots = NULL; + if (urisrc->src_infos) { + g_list_foreach (urisrc->src_infos, (GFunc) free_child_src_pad_info, urisrc); + g_list_free (urisrc->src_infos); + urisrc->src_infos = NULL; + } GST_URI_SOURCE_BIN_UNLOCK (urisrc); if (urisrc->demuxer) @@ -2048,14 +2025,17 @@ static void source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc) { GstCaps *caps; + ChildSrcPadInfo *info = new_child_src_pad_info (urisrc, pad); GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s", GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element)); + caps = gst_pad_get_current_caps (pad); + GST_DEBUG_OBJECT (urisrc, "caps %" GST_PTR_FORMAT, caps); if (caps == NULL) - setup_typefind (urisrc, pad); + setup_typefind (info); else { - handle_new_pad (urisrc, pad, caps); + handle_new_pad (info, pad, caps); gst_caps_unref (caps); } } @@ -2066,7 +2046,7 @@ source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc) static gboolean setup_source (GstURISourceBin * urisrc) { - gboolean all_pads_raw, have_out, is_dynamic; + gboolean have_out, is_dynamic; GST_DEBUG_OBJECT (urisrc, "setup source"); @@ -2088,18 +2068,10 @@ setup_source (GstURISourceBin * urisrc) * if so, we can create streams for the pads and be done with it. * Also check that is has source pads, if not, we assume it will * do everything itself. */ - if (!analyse_source_and_expose_raw_pads (urisrc, &all_pads_raw, &have_out, - &is_dynamic)) + if (!analyse_source_and_expose_raw_pads (urisrc, &have_out, &is_dynamic)) goto invalid_source; if (!is_dynamic) { - if (all_pads_raw) { - GST_DEBUG_OBJECT (urisrc, "Source provides all raw data"); - /* source provides raw data, we added the pads and we can now signal a - * no_more pads because we are done. */ - gst_element_no_more_pads (GST_ELEMENT_CAST (urisrc)); - return TRUE; - } if (!have_out) goto no_pads; } else { @@ -2221,6 +2193,24 @@ handle_redirect_message (GstURISourceBin * urisrc, GstMessage * msg) return new_msg; } +/* CALL WITH URISOURCEBIN LOCK */ +static OutputSlotInfo * +output_slot_for_buffering_element (GstURISourceBin * urisrc, + GstElement * element) +{ + GList *top, *iter; + for (top = urisrc->src_infos; top; top = top->next) { + ChildSrcPadInfo *info = top->data; + for (iter = info->outputs; iter; iter = iter->next) { + OutputSlotInfo *slot = iter->data; + if (slot->queue == element) + return slot; + } + } + + return NULL; +} + static void handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) { @@ -2231,9 +2221,9 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) GList *iter; OutputSlotInfo *slot; - /* buffering messages must be aggregated as there might be multiple - * multiqueue in the pipeline and their independent buffering messages - * will confuse the application + /* buffering messages must be aggregated as there might be multiple buffering + * elements in the pipeline and their independent buffering messages will + * confuse the application * * urisourcebin keeps a list of messages received from elements that are * buffering. @@ -2249,10 +2239,10 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT " with %d%%", GST_MESSAGE_SRC (msg), msg_perc); - slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)), - "urisourcebin.slotinfo"); - BUFFERING_LOCK (urisrc); + slot = + output_slot_for_buffering_element (urisrc, + (GstElement *) GST_MESSAGE_SRC (msg)); if (slot && slot->is_eos) { /* Ignore buffering messages from queues we marked as EOS, * we already removed those from the list of buffering @@ -2275,8 +2265,9 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) GstMessage *bufstats = iter->data; gboolean is_eos = FALSE; - slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)), - "urisourcebin.slotinfo"); + slot = + output_slot_for_buffering_element (urisrc, + (GstElement *) GST_MESSAGE_SRC (msg)); if (slot) is_eos = slot->is_eos; @@ -2403,12 +2394,6 @@ handle_message (GstBin * bin, GstMessage * msg) } break; } - case GST_MESSAGE_STREAM_COLLECTION: - { - GST_DEBUG_OBJECT (urisrc, "Source is streams-aware"); - urisrc->source_streams_aware = TRUE; - break; - } case GST_MESSAGE_BUFFERING: handle_buffering_message (urisrc, msg); msg = NULL; @@ -2749,7 +2734,6 @@ gst_uri_source_bin_change_state (GstElement * element, (GDestroyNotify) gst_message_unref); urisrc->buffering_status = NULL; urisrc->last_buffering_pct = -1; - urisrc->source_streams_aware = FALSE; break; case GST_STATE_CHANGE_READY_TO_NULL: GST_DEBUG ("ready to null"); -- 2.7.4