g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
} G_STMT_END
-/* Track a source pad from a child (source, typefind, demuxer) that is linked or
- * needs linking to an output slot, or source pads that are directly exposed as
- * ghost pads */
+/* Track a source pad from the source element and the chain of (optional)
+ * elements that are linked to it up to the output slots */
struct _ChildSrcPadInfo
{
- /* Source pad this info is attached to (not reffed, since the pad owns the
- * ChildSrcPadInfo as qdata) */
- GstPad *src_pad;
+ GstURISourceBin *urisrc;
- /* The output GhostPad if this info is for a directly exposed pad (raw or
- * not), rather than linked through a slot. */
- GstPad *output_pad;
+ /* Source pad this info is attached to (reffed) */
+ GstPad *src_pad;
- /* Configured output slot, if any buffering/download is required */
- OutputSlotInfo *output_slot;
+ /* An optional typefind */
+ GstElement *typefind;
- /* ADAPTIVE DEMUXER SPECIFIC */
- guint blocking_probe_id;
- guint event_probe_id;
+ /* list of output slots */
+ GList *outputs;
};
/* Output Slot:
*
- * Buffered output
+ * Handles everything related to outputing, including optional buffering.
*/
struct _OutputSlotInfo
{
- ChildSrcPadInfo *linked_info; /* demux source pad info feeding this slot, if any */
- GstElement *queue; /* queue2 or downloadbuffer */
- GstPad *queue_sinkpad; /* Sink pad of the queue eleemnt */
+ ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */
+
+ GstPad *originating_pad; /* Pad that created this OutputSlotInfo (ref held) */
GstPad *output_pad; /* Output ghost pad */
+
gboolean is_eos; /* Did EOS get fed into the buffering element */
+ GstElement *queue; /* queue2 or downloadbuffer */
+ GstPad *queue_sinkpad; /* Sink pad of the queue eleemnt */
+
gulong bitrate_changed_id; /* queue bitrate changed notification */
+
+ guint demuxer_event_probe_id;
};
/**
gboolean is_stream;
gboolean is_adaptive;
gboolean demuxer_handles_buffering; /* If TRUE: Don't use buffering elements */
- gboolean source_streams_aware; /* if TRUE: Don't block output pads */
guint64 buffer_duration; /* When buffering, buffer duration (ns) */
guint buffer_size; /* When buffering, buffer size (bytes) */
gboolean download;
gdouble high_watermark;
GstElement *source;
- GList *typefinds; /* list of typefind element */
+
+ GList *src_infos; /* List of ChildSrcPadInfo for the source */
GstElement *demuxer; /* Adaptive demuxer if any */
- GSList *out_slots;
guint numpads;
static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement *
element, GstStateChange transition);
-static void handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad,
+static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad,
GstCaps * caps);
-static gboolean setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad);
+static gboolean setup_typefind (ChildSrcPadInfo * info);
static void remove_demuxer (GstURISourceBin * bin);
static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
-static OutputSlotInfo *get_output_slot (GstURISourceBin * urisrc,
- gboolean do_download, gboolean is_adaptive);
+static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
+ gboolean do_download, gboolean is_adaptive, gboolean no_buffering,
+ GstPad * originating_pad);
static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
static void free_output_slot_async (GstURISourceBin * urisrc,
OutputSlotInfo * slot);
-static GstPad *create_output_pad (GstURISourceBin * urisrc, GstPad * pad);
+static GstPad *create_output_pad (OutputSlotInfo * slot, GstPad * pad);
static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src);
static void update_queue_values (GstURISourceBin * urisrc);
}
static GstPadProbeReturn
-demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
+demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot);
+/* CALL WITH URISOURCEBIN LOCK */
static void
-free_child_src_pad_info (ChildSrcPadInfo * info)
+free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
{
- if (info->output_pad)
- gst_object_unref (info->output_pad);
+ g_assert (info->src_pad);
+
+ GST_DEBUG_OBJECT (urisrc,
+ "Freeing ChildSrcPadInfo for %" GST_PTR_FORMAT, info->src_pad);
+ if (info->typefind) {
+ gst_element_set_state (info->typefind, GST_STATE_NULL);
+ gst_bin_remove (GST_BIN_CAST (urisrc), info->typefind);
+ }
+
+ gst_object_unref (info->src_pad);
+
+ g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
+ g_list_free (info->outputs);
+
g_free (info);
}
-/* Called by the signal handlers when a demuxer has produced a new stream */
-static void
-new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
- GstURISourceBin * urisrc)
+static ChildSrcPadInfo *
+get_cspi_for_pad (GstURISourceBin * urisrc, GstPad * pad)
+{
+ GList *iter;
+
+ for (iter = urisrc->src_infos; iter; iter = iter->next) {
+ ChildSrcPadInfo *info = iter->data;
+ if (info->src_pad == pad)
+ return info;
+ }
+ return NULL;
+}
+
+static ChildSrcPadInfo *
+new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad)
{
ChildSrcPadInfo *info;
+ GST_LOG_OBJECT (urisrc, "New ChildSrcPadInfo for %" GST_PTR_FORMAT, pad);
+
info = g_new0 (ChildSrcPadInfo, 1);
- info->src_pad = pad;
+ info->urisrc = urisrc;
+ info->src_pad = gst_object_ref (pad);
- g_object_set_data_full (G_OBJECT (pad), "urisourcebin.srcpadinfo",
- info, (GDestroyNotify) free_child_src_pad_info);
+ urisrc->src_infos = g_list_append (urisrc->src_infos, info);
+
+ return info;
+}
+
+/* Called by the signal handlers when a demuxer has produced a new stream */
+static void
+new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
+ ChildSrcPadInfo * info)
+{
+ GstURISourceBin *urisrc = info->urisrc;
+ OutputSlotInfo *slot;
+ GstPad *output_pad;
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* If the demuxer handles buffering and is streams-aware, we can expose it
as-is directly. We still add an event probe to deal with EOS */
- if (urisrc->demuxer_handles_buffering && urisrc->source_streams_aware) {
- info->output_pad = gst_object_ref (create_output_pad (urisrc, pad));
- GST_DEBUG_OBJECT (element,
- "New streams-aware demuxer pad %s:%s , exposing directly",
- GST_DEBUG_PAD_NAME (pad));
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- expose_output_pad (urisrc, info->output_pad);
- } else {
- g_return_if_reached ();
- }
- info->event_probe_id =
+ slot =
+ new_output_slot (info, FALSE, FALSE, urisrc->demuxer_handles_buffering,
+ pad);
+ output_pad = gst_object_ref (slot->output_pad);
+
+ GST_DEBUG_OBJECT (element,
+ "New streams-aware demuxer pad %s:%s , exposing directly",
+ GST_DEBUG_PAD_NAME (pad));
+ slot->demuxer_event_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
- GST_PAD_PROBE_TYPE_EVENT_FLUSH, demux_pad_events, urisrc, NULL);
+ GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events,
+ slot, NULL);
+
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+ expose_output_pad (urisrc, output_pad);
+ gst_object_unref (output_pad);
}
/* Called with lock held */
static gboolean
all_slots_are_eos (GstURISourceBin * urisrc)
{
- GSList *tmp;
+ GList *tmp;
- for (tmp = urisrc->out_slots; tmp; tmp = tmp->next) {
- OutputSlotInfo *slot = (OutputSlotInfo *) tmp->data;
- if (slot->is_eos == FALSE)
- return FALSE;
+ for (tmp = urisrc->src_infos; tmp; tmp = tmp->next) {
+ ChildSrcPadInfo *cspi = tmp->data;
+ GList *iter2;
+ for (iter2 = cspi->outputs; iter2; iter2 = iter2->next) {
+ OutputSlotInfo *slot = (OutputSlotInfo *) iter2->data;
+ if (slot->is_eos == FALSE)
+ return FALSE;
+ }
}
return TRUE;
}
+/* CALL WITH URISOURCEBIN LOCK */
+static OutputSlotInfo *
+output_slot_for_originating_pad (ChildSrcPadInfo * info,
+ GstPad * originating_pad)
+{
+ GList *iter;
+ for (iter = info->outputs; iter; iter = iter->next) {
+ OutputSlotInfo *slot = iter->data;
+ if (slot->originating_pad == originating_pad)
+ return slot;
+ }
+
+ return NULL;
+}
+
static GstPadProbeReturn
-demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot)
{
- GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
- ChildSrcPadInfo *child_info;
+ GstURISourceBin *urisrc = slot->linked_info->urisrc;
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
- if (!(child_info =
- g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
- goto done;
-
GST_URI_SOURCE_BIN_LOCK (urisrc);
- /* If not linked to a slot, nothing more to do */
- if (child_info->output_slot == NULL) {
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- goto done;
- }
switch (GST_EVENT_TYPE (ev)) {
case GST_EVENT_EOS:
BUFFERING_LOCK (urisrc);
/* Mark that we fed an EOS to this slot */
- child_info->output_slot->is_eos = TRUE;
+ slot->is_eos = TRUE;
all_streams_eos = all_slots_are_eos (urisrc);
BUFFERING_UNLOCK (urisrc);
- /* EOS means this element is no longer buffering */
- remove_buffering_msgs (urisrc,
- GST_OBJECT_CAST (child_info->output_slot->queue));
-
- /* Mark this custom EOS, replacing the event in the probe data */
- ev = gst_event_make_writable (ev);
- GST_PAD_PROBE_INFO_DATA (info) = ev;
-
- gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK,
- (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
+ if (slot->queue)
+ /* EOS means this element is no longer buffering */
+ remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
if (all_streams_eos) {
GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
case GST_EVENT_STREAM_START:
case GST_EVENT_FLUSH_STOP:
BUFFERING_LOCK (urisrc);
- child_info->output_slot->is_eos = FALSE;
+ slot->is_eos = FALSE;
BUFFERING_UNLOCK (urisrc);
break;
default:
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-done:
return ret;
}
guint64 min_time_level = 0, max_time_level = 0;
gdouble avg_byte_level = 0., avg_time_level = 0.;
guint i = 0;
- GSList *cur;
+ GList *iter, *cur;
GST_URI_SOURCE_BIN_LOCK (urisrc);
- for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
- OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
- guint byte_limit = 0;
- guint64 time_limit = 0;
+ for (iter = urisrc->src_infos; iter; iter = iter->next) {
+ ChildSrcPadInfo *info = iter->data;
+ for (cur = info->outputs; cur; cur = cur->next) {
+ OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
+ guint byte_limit = 0;
+ guint64 time_limit = 0;
+
+ if (!slot->queue)
+ continue;
- g_object_get (slot->queue, "current-level-bytes", &byte_limit,
- "current-level-time", &time_limit, NULL);
+ g_object_get (slot->queue, "current-level-bytes", &byte_limit,
+ "current-level-time", &time_limit, NULL);
- if (byte_limit < min_byte_level)
- min_byte_level = byte_limit;
- if (byte_limit > max_byte_level)
- max_byte_level = byte_limit;
- avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
+ if (byte_limit < min_byte_level)
+ min_byte_level = byte_limit;
+ if (byte_limit > max_byte_level)
+ max_byte_level = byte_limit;
+ avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
- if (time_limit < min_time_level)
- min_time_level = time_limit;
- if (time_limit > max_time_level)
- max_time_level = time_limit;
- avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
+ if (time_limit < min_time_level)
+ min_time_level = time_limit;
+ if (time_limit > max_time_level)
+ max_time_level = time_limit;
+ avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
- i++;
+ i++;
+ }
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
guint buffer_size;
gdouble low_watermark, high_watermark;
guint64 cumulative_bitrate = 0;
- GSList *cur;
+ GList *iter, *cur;
GST_URI_SOURCE_BIN_LOCK (urisrc);
duration = GET_BUFFER_DURATION (urisrc);
low_watermark = urisrc->low_watermark;
high_watermark = urisrc->high_watermark;
- for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
- OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
- guint64 bitrate = 0;
+ for (iter = urisrc->src_infos; iter; iter = iter->next) {
+ ChildSrcPadInfo *info = iter->data;
+ for (cur = info->outputs; cur; cur = cur->next) {
+ OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
+ guint64 bitrate = 0;
- if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
- "bitrate")) {
- g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
- }
+ if (!slot->queue)
+ continue;
- if (bitrate > 0)
- cumulative_bitrate += bitrate;
- else {
- GST_TRACE_OBJECT (urisrc, "Unknown bitrate detected from %" GST_PTR_FORMAT
- ", resetting all bitrates", slot->queue);
- cumulative_bitrate = 0;
- break;
+ if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
+ "bitrate")) {
+ g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
+ }
+
+ if (bitrate > 0)
+ cumulative_bitrate += bitrate;
+ else {
+ GST_TRACE_OBJECT (urisrc,
+ "Unknown bitrate detected from %" GST_PTR_FORMAT
+ ", resetting all bitrates", slot->queue);
+ cumulative_bitrate = 0;
+ break;
+ }
}
}
"bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %"
G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration);
- for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
- OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
- guint byte_limit;
-
- if (cumulative_bitrate > 0
- && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
- "bitrate")) {
- guint64 bitrate;
- g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
- byte_limit =
- gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
- } else {
- /* if not all queue's have valid bitrates, use the buffer-size as the
- * limit */
- byte_limit = buffer_size;
- }
+ for (iter = urisrc->src_infos; iter; iter = iter->next) {
+ ChildSrcPadInfo *info = iter->data;
+ for (cur = info->outputs; cur; cur = cur->next) {
+ OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
+ guint byte_limit;
+
+ if (!slot->queue)
+ continue;
+
+ if (cumulative_bitrate > 0
+ && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
+ "bitrate")) {
+ guint64 bitrate;
+ g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
+ byte_limit =
+ gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
+ } else {
+ /* if not all queue's have valid bitrates, use the buffer-size as the
+ * limit */
+ byte_limit = buffer_size;
+ }
- GST_DEBUG_OBJECT (urisrc,
- "calculated new limits for queue-like element %" GST_PTR_FORMAT
- ", bytes:%u, time:%" G_GUINT64_FORMAT
- ", low-watermark:%f, high-watermark:%f",
- slot->queue, byte_limit, (guint64) duration, low_watermark,
- high_watermark);
- g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
- "max-size-time", (guint64) duration, "low-watermark", low_watermark,
- "high-watermark", high_watermark, NULL);
+ GST_DEBUG_OBJECT (urisrc,
+ "calculated new limits for queue-like element %" GST_PTR_FORMAT
+ ", bytes:%u, time:%" G_GUINT64_FORMAT
+ ", low-watermark:%f, high-watermark:%f",
+ slot->queue, byte_limit, (guint64) duration, low_watermark,
+ high_watermark);
+ g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
+ "max-size-time", (guint64) duration, "low-watermark", low_watermark,
+ "high-watermark", high_watermark, NULL);
+ }
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
}
/* Called with lock held */
static OutputSlotInfo *
-get_output_slot (GstURISourceBin * urisrc, gboolean do_download,
- gboolean is_adaptive)
+new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
+ gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad)
{
+ GstURISourceBin *urisrc = info->urisrc;
OutputSlotInfo *slot;
GstPad *srcpad;
- GstElement *queue;
+ GstElement *queue = NULL;
const gchar *elem_name;
- /* Otherwise create the new slot */
- if (do_download)
- elem_name = "downloadbuffer";
- else
- elem_name = "queue2";
-
- queue = gst_element_factory_make (elem_name, NULL);
- if (!queue)
- goto no_buffer_element;
+ GST_DEBUG_OBJECT (urisrc,
+ "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%"
+ GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad);
slot = g_new0 (OutputSlotInfo, 1);
- slot->queue = queue;
+ slot->linked_info = info;
- /* Set the slot onto the queue (needed in buffering msg handling) */
- g_object_set_data (G_OBJECT (queue), "urisourcebin.slotinfo", slot);
+ /* If buffering is required, create the element */
+ if (!no_buffering) {
+ if (do_download)
+ elem_name = "downloadbuffer";
+ else
+ elem_name = "queue2";
- slot->bitrate_changed_id =
- g_signal_connect (G_OBJECT (queue), "notify::bitrate",
- (GCallback) on_queue_bitrate_changed, urisrc);
+ queue = gst_element_factory_make (elem_name, NULL);
+ if (!queue)
+ goto no_buffer_element;
- if (do_download) {
- gchar *temp_template, *filename;
- const gchar *tmp_dir, *prgname;
+ slot->queue = queue;
- tmp_dir = g_get_user_cache_dir ();
- prgname = g_get_prgname ();
- if (prgname == NULL)
- prgname = "GStreamer";
+ slot->bitrate_changed_id =
+ g_signal_connect (G_OBJECT (queue), "notify::bitrate",
+ (GCallback) on_queue_bitrate_changed, urisrc);
- filename = g_strdup_printf ("%s-XXXXXX", prgname);
+ if (do_download) {
+ gchar *temp_template, *filename;
+ const gchar *tmp_dir, *prgname;
- /* build our filename */
- temp_template = g_build_filename (tmp_dir, filename, NULL);
+ tmp_dir = g_get_user_cache_dir ();
+ prgname = g_get_prgname ();
+ if (prgname == NULL)
+ prgname = "GStreamer";
- GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
- temp_template, tmp_dir, prgname, filename);
+ filename = g_strdup_printf ("%s-XXXXXX", prgname);
- /* configure progressive download for selected media types */
- g_object_set (queue, "temp-template", temp_template, NULL);
+ /* build our filename */
+ temp_template = g_build_filename (tmp_dir, filename, NULL);
- g_free (filename);
- g_free (temp_template);
- } else {
- if (is_adaptive) {
- GST_LOG_OBJECT (urisrc, "Adding queue for adaptive streaming stream");
- g_object_set (queue, "use-buffering", urisrc->use_buffering,
- "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
+ GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
+ temp_template, tmp_dir, prgname, filename);
+
+ /* configure progressive download for selected media types */
+ g_object_set (queue, "temp-template", temp_template, NULL);
+
+ g_free (filename);
+ g_free (temp_template);
} else {
- GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
- g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
- }
+ if (is_adaptive) {
+ GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream");
+ g_object_set (queue, "use-buffering", urisrc->use_buffering,
+ "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
+ } else {
+ GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
+ g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
+ }
- g_object_set (queue, "ring-buffer-max-size",
- urisrc->ring_buffer_max_size, NULL);
- /* Disable max-size-buffers - queue based on data rate to the default time limit */
- g_object_set (queue, "max-size-buffers", 0, NULL);
+ g_object_set (queue, "ring-buffer-max-size",
+ urisrc->ring_buffer_max_size, NULL);
+ /* Disable max-size-buffers - queue based on data rate to the default time limit */
+ g_object_set (queue, "max-size-buffers", 0, NULL);
- /* Don't start buffering until the queue is empty (< 1%).
- * Start playback when the queue is 60% full, leaving a bit more room
- * for upstream to push more without getting bursty */
- g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
+ /* Don't start buffering until the queue is empty (< 1%).
+ * Start playback when the queue is 60% full, leaving a bit more room
+ * for upstream to push more without getting bursty */
+ g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
- g_object_set (queue, "low-watermark", urisrc->low_watermark,
- "high-watermark", urisrc->high_watermark, NULL);
- }
+ g_object_set (queue, "low-watermark", urisrc->low_watermark,
+ "high-watermark", urisrc->high_watermark, NULL);
+ }
- /* set the necessary limits on the queue-like elements */
- g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
- "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
-#if 0
- /* Disabled because this makes initial startup slower for radio streams */
- else {
- /* Buffer 4 seconds by default - some extra headroom over the
- * core default, because we trigger playback sooner */
- //g_object_set (queue, "max-size-time", 4 * GST_SECOND, NULL);
- }
-#endif
+ /* set the necessary limits on the queue-like elements */
+ g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
+ "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
+
+ gst_bin_add (GST_BIN_CAST (urisrc), queue);
+ gst_element_sync_state_with_parent (queue);
+
+ slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
- /* save queue pointer so we can remove it later */
- urisrc->out_slots = g_slist_prepend (urisrc->out_slots, slot);
+ /* get the new raw srcpad */
+ srcpad = gst_element_get_static_pad (queue, "src");
- gst_bin_add (GST_BIN_CAST (urisrc), queue);
- gst_element_sync_state_with_parent (queue);
+ slot->output_pad = create_output_pad (slot, srcpad);
- slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
+ gst_object_unref (srcpad);
- /* get the new raw srcpad */
- srcpad = gst_element_get_static_pad (queue, "src");
- g_object_set_data (G_OBJECT (srcpad), "urisourcebin.slotinfo", slot);
+ gst_pad_link (originating_pad, slot->queue_sinkpad);
+ } else {
+ /* Expose pad directly */
+ slot->output_pad = create_output_pad (slot, originating_pad);
+ }
+ slot->originating_pad = gst_object_ref (originating_pad);
- slot->output_pad = create_output_pad (urisrc, srcpad);
+ /* save output slot so we can remove it later */
+ info->outputs = g_list_append (info->outputs, slot);
- gst_object_unref (srcpad);
+ GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT,
+ slot->output_pad);
return slot;
no_buffer_element:
{
+ g_free (slot);
post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name);
return NULL;
}
gpointer user_data)
{
GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
- GstURISourceBin *urisrc = user_data;
+ OutputSlotInfo *slot = user_data;
+ GstURISourceBin *urisrc = slot->linked_info->urisrc;
- GST_LOG_OBJECT (pad, "%s, urisrc %p", GST_EVENT_TYPE_NAME (event), event);
+ GST_LOG_OBJECT (pad, "%" GST_PTR_FORMAT, event);
+ /* A custom EOS will be received if an adaptive demuxer source pad removed a
+ * pad and buffering was present on that slot */
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event),
CUSTOM_EOS_QUARK)) {
- OutputSlotInfo *slot;
- GST_DEBUG_OBJECT (pad, "we received EOS");
+ GstPadProbeReturn probe_ret = GST_PAD_PROBE_DROP;
+
+ GST_DEBUG_OBJECT (pad, "we received custom EOS");
/* remove custom-eos */
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK,
GST_URI_SOURCE_BIN_LOCK (urisrc);
- slot = g_object_get_data (G_OBJECT (pad), "urisourcebin.slotinfo");
-
- if (slot) {
- GstEvent *eos;
- guint32 seqnum;
-
- if (slot->linked_info) {
- if (slot->is_eos) {
- /* linked_info is old input which is still linked without removal */
- GST_DEBUG_OBJECT (pad, "push actual EOS");
- seqnum = gst_event_get_seqnum (event);
- eos = gst_event_new_eos ();
- gst_event_set_seqnum (eos, seqnum);
- gst_pad_push_event (slot->output_pad, eos);
- } else {
- /* Do not clear output slot yet. A new input was
- * connected. We should just drop this EOS */
- }
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- return GST_PAD_PROBE_DROP;
- }
-
- seqnum = gst_event_get_seqnum (event);
- eos = gst_event_new_eos ();
- gst_event_set_seqnum (eos, seqnum);
- gst_pad_push_event (slot->output_pad, eos);
- free_output_slot_async (urisrc, slot);
+ if (slot->is_eos) {
+ /* linked_info is old input which is still linked without removal */
+ GST_DEBUG_OBJECT (pad, "push actual EOS");
+ gst_pad_push_event (slot->output_pad, event);
+ probe_ret = GST_PAD_PROBE_HANDLED;
}
- /* FIXME: Only emit drained if all output pads are done and there's no
- * pending pads */
- g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_DRAINED], 0, NULL);
+ /* And finally remove the output. This is done asynchronously since we can't
+ * do it from the streaming thread */
+ free_output_slot_async (urisrc, slot);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- return GST_PAD_PROBE_DROP;
+ return probe_ret;
}
/* never drop events */
return GST_PAD_PROBE_OK;
* padprobe to detect EOS before exposing the pad.
* Called with LOCK held. */
static GstPad *
-create_output_pad (GstURISourceBin * urisrc, GstPad * pad)
+create_output_pad (OutputSlotInfo * slot, GstPad * pad)
{
+ GstURISourceBin *urisrc = slot->linked_info->urisrc;
GstPad *newpad;
GstPadTemplate *pad_tmpl;
gchar *padname;
- gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
- source_pad_event_probe, urisrc, NULL);
+ /* If the output slot does buffering, add a probe to detect drainage */
+ if (slot->queue)
+ gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+ source_pad_event_probe, slot, NULL);
pad_tmpl = gst_static_pad_template_get (&srctemplate);
}
static void
-expose_raw_output_pad (GstURISourceBin * urisrc, GstPad * srcpad,
- GstPad * output_pad)
+demuxer_pad_removed_cb (GstElement * element, GstPad * pad,
+ ChildSrcPadInfo * info)
{
- ChildSrcPadInfo *info = g_new0 (ChildSrcPadInfo, 1);
- info->src_pad = srcpad;
- info->output_pad = gst_object_ref (output_pad);
-
- g_assert (g_object_get_data (G_OBJECT (srcpad),
- "urisourcebin.srcpadinfo") == NULL);
-
- g_object_set_data_full (G_OBJECT (srcpad), "urisourcebin.srcpadinfo",
- info, (GDestroyNotify) free_child_src_pad_info);
-
- expose_output_pad (urisrc, output_pad);
-}
-
-static void
-remove_output_pad (GstURISourceBin * urisrc, GstPad * pad)
-{
- if (!gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
- return; /* Pad is not exposed */
-
- GST_DEBUG_OBJECT (urisrc, "Removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
-
- gst_pad_set_active (pad, FALSE);
- gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), pad);
-}
-
-static void
-pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
-{
- ChildSrcPadInfo *info;
-
- GST_DEBUG_OBJECT (element, "pad removed name: <%s:%s>",
- GST_DEBUG_PAD_NAME (pad));
+ GstURISourceBin *urisrc;
+ OutputSlotInfo *slot;
/* we only care about srcpads */
if (!GST_PAD_IS_SRC (pad))
return;
- if (!(info = g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
- goto no_info;
+ urisrc = info->urisrc;
+
+ GST_DEBUG_OBJECT (urisrc, "pad removed name: <%s:%s>",
+ GST_DEBUG_PAD_NAME (pad));
GST_URI_SOURCE_BIN_LOCK (urisrc);
+ slot = output_slot_for_originating_pad (info, pad);
+ g_assert (slot);
- /* Send EOS to the output slot if the demuxer didn't already */
- if (info->output_slot) {
- GstStructure *s;
- GstEvent *event;
- OutputSlotInfo *slot;
+ gst_pad_remove_probe (pad, slot->demuxer_event_probe_id);
+ slot->demuxer_event_probe_id = 0;
- slot = info->output_slot;
+ if (slot->queue) {
+ gboolean was_eos;
+
+ /* Propagate custom EOS to buffering elements. The slot will be removed when
+ * it is received on the output of the buffering elements */
BUFFERING_LOCK (urisrc);
/* Unlink this pad from its output slot and send a fake EOS event
* to drain the queue */
+ was_eos = slot->is_eos;
slot->is_eos = TRUE;
BUFFERING_UNLOCK (urisrc);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
-
- slot->linked_info = NULL;
-
- info->output_slot = NULL;
-
- GST_LOG_OBJECT (element,
- "Pad %" GST_PTR_FORMAT " was removed without EOS. Sending.", pad);
-
- event = gst_event_new_eos ();
- s = gst_event_writable_structure (event);
- gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
- NULL);
- gst_pad_send_event (slot->queue_sinkpad, event);
- } else if (info->output_pad != NULL) {
- GST_LOG_OBJECT (element,
- "Pad %" GST_PTR_FORMAT " was removed. Unexposing %" GST_PTR_FORMAT,
- pad, info->output_pad);
- remove_output_pad (urisrc, info->output_pad);
+ if (!was_eos) {
+ GstStructure *s;
+ GstEvent *event;
+ event = gst_event_new_eos ();
+ s = gst_event_writable_structure (event);
+ gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
+ NULL);
+ gst_pad_send_event (slot->queue_sinkpad, event);
+ }
} else {
- GST_WARNING_OBJECT (urisrc, "Removed pad has no output slot or pad");
+ GST_LOG_OBJECT (urisrc,
+ "No buffering involved, removing output slot immediately");
+ /* Remove output slot immediately */
+ info->outputs = g_list_remove (info->outputs, slot);
+ free_output_slot (slot, urisrc);
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
-
- /* ERRORS */
-no_info:
- {
- GST_WARNING_OBJECT (element, "no info found for pad");
- return;
- }
}
/* helper function to lookup stuff in lists */
element_name), (NULL));
}
+typedef struct
+{
+ GstURISourceBin *urisrc;
+ gboolean have_out;
+ gboolean res;
+} AnalyseData;
+
+static void
+analyse_pad_foreach (const GValue * item, AnalyseData * data)
+{
+ GstURISourceBin *urisrc = data->urisrc;
+ GstPad *pad = g_value_dup_object (item);
+ ChildSrcPadInfo *info;
+ GstCaps *padcaps = NULL;
+ gboolean pad_is_raw;
+ gboolean res = TRUE;
+
+ GST_LOG_OBJECT (urisrc, "pad %" GST_PTR_FORMAT, pad);
+
+ data->have_out = TRUE;
+
+ /* The info might already exist if there was an iterator resync */
+ if (get_cspi_for_pad (urisrc, pad)) {
+ GST_LOG_OBJECT (urisrc, "Already analysed");
+ goto out;
+ }
+
+ info = new_child_src_pad_info (urisrc, pad);
+ padcaps = gst_pad_query_caps (pad, NULL);
+
+ if (!is_all_raw_caps (padcaps, DEFAULT_CAPS, &pad_is_raw) || !pad_is_raw) {
+ /* if FALSE, this pad has no caps, we setup typefinding on it */
+ if (!setup_typefind (info)) {
+ res = FALSE;
+ goto out;
+ }
+ } else if (pad_is_raw) {
+ /* caps on source pad are all raw, we can add the pad */
+ GstPad *output_pad;
+ OutputSlotInfo *slot;
+
+ GST_URI_SOURCE_BIN_LOCK (urisrc);
+ /* Only use buffering on raw pads in very specific conditions */
+ GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
+ urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
+ slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering
+ || !IS_QUEUE_URI (urisrc->uri), pad);
+
+ if (!slot) {
+ res = FALSE;
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+ goto out;
+ }
+
+ /* get the new raw srcpad */
+ output_pad = gst_object_ref (slot->output_pad);
+
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+
+ expose_output_pad (urisrc, output_pad);
+ gst_object_unref (output_pad);
+ } else {
+ GST_DEBUG_OBJECT (urisrc, "Handling non-raw pad");
+ /* The caps are non-raw, we handle it directly */
+ handle_new_pad (info, pad, padcaps);
+ }
+
+out:
+ if (padcaps)
+ gst_caps_unref (padcaps);
+ gst_object_unref (pad);
+ data->res &= res;
+}
+
/**
* analyse_source_and_expose_raw_pads:
* @urisrc: a #GstURISourceBin
*/
static gboolean
analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
- gboolean * all_pads_raw, gboolean * have_out, gboolean * is_dynamic)
+ gboolean * have_out, gboolean * is_dynamic)
{
GstElementClass *elemclass;
+ AnalyseData data = { 0, };
+ GstIteratorResult iterres;
GList *walk;
GstIterator *pads_iter;
- gboolean done = FALSE;
gboolean res = TRUE;
- GstPad *pad;
- GValue item = { 0, };
- guint nb_raw = 0;
- guint nb_pads = 0;
- GstCaps *rawcaps = DEFAULT_CAPS;
- gboolean pad_is_raw;
- gboolean use_queue;
-
- *have_out = FALSE;
- *all_pads_raw = FALSE;
- *is_dynamic = FALSE;
-
- /* Add buffering elements on raw pads only is very specific conditions */
- use_queue = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
pads_iter = gst_element_iterate_src_pads (urisrc->source);
- while (!done) {
- switch (gst_iterator_next (pads_iter, &item)) {
- case GST_ITERATOR_ERROR:
- res = FALSE;
- /* FALLTHROUGH */
- case GST_ITERATOR_DONE:
- done = TRUE;
- break;
- case GST_ITERATOR_RESYNC:
- /* reset results and resync */
- *have_out = FALSE;
- *all_pads_raw = FALSE;
- *is_dynamic = FALSE;
- nb_pads = nb_raw = 0;
- gst_iterator_resync (pads_iter);
- break;
- case GST_ITERATOR_OK:
- {
- GstCaps *padcaps;
-
- pad = g_value_dup_object (&item);
- /* we now officially have an output pad */
- *have_out = TRUE;
- nb_pads++;
- padcaps = gst_pad_query_caps (pad, NULL);
-
- if (!is_all_raw_caps (padcaps, rawcaps, &pad_is_raw)) {
- /* if FALSE, this pad has no caps, we setup typefinding on it */
- if (!setup_typefind (urisrc, pad)) {
- res = FALSE;
- done = TRUE;
- }
- } else if (pad_is_raw) {
- /* caps on source pad are all raw, we can add the pad */
- GstPad *output_pad;
-
- nb_raw++;
- GST_URI_SOURCE_BIN_LOCK (urisrc);
- if (use_queue) {
- OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE);
- if (!slot) {
- gst_caps_unref (padcaps);
- gst_object_unref (pad);
- goto no_slot;
- }
-
- gst_pad_link (pad, slot->queue_sinkpad);
-
- /* get the new raw srcpad */
- output_pad = gst_object_ref (slot->output_pad);
-
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-
- expose_output_pad (urisrc, output_pad);
- gst_object_unref (output_pad);
- } else {
- output_pad = create_output_pad (urisrc, pad);
-
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
-
- expose_raw_output_pad (urisrc, pad, output_pad);
- }
- } else {
- /* The caps are non-raw, we handle it directly */
- handle_new_pad (urisrc, pad, padcaps);
- }
- gst_caps_unref (padcaps);
- gst_object_unref (pad);
- g_value_reset (&item);
- break;
- }
- }
- }
- g_value_unset (&item);
+
+restart:
+ data.res = TRUE;
+ data.have_out = FALSE;
+ data.urisrc = urisrc;
+ iterres =
+ gst_iterator_foreach (pads_iter,
+ (GstIteratorForeachFunction) analyse_pad_foreach, &data);
+ if (iterres == GST_ITERATOR_RESYNC)
+ goto restart;
+ if (iterres == GST_ITERATOR_ERROR)
+ res = FALSE;
+ else
+ res = data.res;
gst_iterator_free (pads_iter);
- gst_caps_unref (rawcaps);
/* check for padtemplates that list SOMETIMES pads to
* determine if the element is dynamic. */
+ *is_dynamic = FALSE;
elemclass = GST_ELEMENT_GET_CLASS (urisrc->source);
walk = gst_element_class_get_pad_template_list (elemclass);
while (walk != NULL) {
walk = g_list_next (walk);
}
- if (nb_pads && nb_pads == nb_raw)
- *all_pads_raw = TRUE;
+ *have_out = data.have_out;
return res;
-no_slot:
- {
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- gst_object_unref (pad);
- g_value_unset (&item);
- gst_iterator_free (pads_iter);
- gst_caps_unref (rawcaps);
-
- return FALSE;
- }
}
/* Remove any adaptive demuxer element */
/* make a demuxer and connect to all the signals */
static GstElement *
-make_demuxer (GstURISourceBin * urisrc, GstCaps * caps)
+make_demuxer (GstURISourceBin * urisrc, ChildSrcPadInfo * info, GstCaps * caps)
{
GList *factories, *eligible, *cur;
GstElement *demuxer = NULL;
/* set up callbacks to create the links between
* demuxer streams and output */
g_signal_connect (demuxer,
- "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), urisrc);
+ "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
g_signal_connect (demuxer,
- "pad-removed", G_CALLBACK (pad_removed_cb), urisrc);
+ "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
/* Propagate connection-speed property */
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer),
* * typefind has found a type
*/
static void
-handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
+handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
{
+ GstURISourceBin *urisrc = info->urisrc;
gboolean is_raw;
GstStructure *s;
const gchar *media_type;
/* if this is a pad with all raw caps, we can expose it */
if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) {
+ OutputSlotInfo *slot;
GstPad *output_pad;
GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
", exposing", caps);
- output_pad = create_output_pad (urisrc, srcpad);
+ slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
+ output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- expose_raw_output_pad (urisrc, srcpad, output_pad);
+ expose_output_pad (urisrc, slot->output_pad);
+ gst_object_unref (output_pad);
return;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
GstPadLinkReturn link_res;
GstQuery *query;
- urisrc->demuxer = make_demuxer (urisrc, caps);
+ urisrc->demuxer = make_demuxer (urisrc, info, caps);
if (!urisrc->demuxer)
goto no_demuxer;
gst_bin_add (GST_BIN_CAST (urisrc), urisrc->demuxer);
gst_element_sync_state_with_parent (urisrc->demuxer);
} else if (!urisrc->is_stream) {
+ OutputSlotInfo *slot;
GstPad *output_pad;
- /* We don't need slot here, expose immediately */
+
+ /* We don't need buffering here, expose immediately */
GST_URI_SOURCE_BIN_LOCK (urisrc);
- output_pad = create_output_pad (urisrc, srcpad);
+ slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
+ output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- expose_raw_output_pad (urisrc, srcpad, output_pad);
+ expose_output_pad (urisrc, output_pad);
+ gst_object_unref (output_pad);
} else {
OutputSlotInfo *slot;
GstPad *output_pad;
do_download);
GST_URI_SOURCE_BIN_LOCK (urisrc);
- slot = get_output_slot (urisrc, do_download, FALSE);
-
- if (slot == NULL
- || gst_pad_link (srcpad, slot->queue_sinkpad) != GST_PAD_LINK_OK)
- goto could_not_link;
+ slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad);
gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
pre_queue_event_probe, urisrc, NULL);
* buffering or regular buffering */
static void
type_found (GstElement * typefind, guint probability,
- GstCaps * caps, GstURISourceBin * urisrc)
+ GstCaps * caps, ChildSrcPadInfo * info)
{
+ GstURISourceBin *urisrc = info->urisrc;
GstPad *srcpad = gst_element_get_static_pad (typefind, "src");
GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT
" on pad %" GST_PTR_FORMAT, caps, srcpad);
- handle_new_pad (urisrc, srcpad, caps);
+ handle_new_pad (info, srcpad, caps);
gst_object_unref (GST_OBJECT (srcpad));
}
* source. After we find the type, we decide to whether to plug an adaptive
* demuxer, or just link through queue2 (if needed) and expose the data */
static gboolean
-setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad)
+setup_typefind (ChildSrcPadInfo * info)
{
- GstElement *typefind;
+ GstURISourceBin *urisrc = info->urisrc;
+ GstPad *sinkpad;
/* now create the typefind element */
- typefind = gst_element_factory_make ("typefind", NULL);
- if (!typefind)
+ info->typefind = gst_element_factory_make ("typefind", NULL);
+ if (!info->typefind)
goto no_typefind;
/* Make sure the bin doesn't set the typefind running yet */
- gst_element_set_locked_state (typefind, TRUE);
-
- gst_bin_add (GST_BIN_CAST (urisrc), typefind);
-
- if (!srcpad) {
- if (!gst_element_link_pads (urisrc->source, NULL, typefind, "sink"))
- goto could_not_link;
- } else {
- GstPad *sinkpad = gst_element_get_static_pad (typefind, "sink");
- GstPadLinkReturn ret;
+ gst_element_set_locked_state (info->typefind, TRUE);
- ret = gst_pad_link (srcpad, sinkpad);
- gst_object_unref (sinkpad);
- if (ret != GST_PAD_LINK_OK)
- goto could_not_link;
- }
+ gst_bin_add (GST_BIN_CAST (urisrc), info->typefind);
- urisrc->typefinds = g_list_append (urisrc->typefinds, typefind);
+ sinkpad = gst_element_get_static_pad (info->typefind, "sink");
+ if (gst_pad_link (info->src_pad, sinkpad) != GST_PAD_LINK_OK)
+ goto could_not_link;
+ gst_object_unref (sinkpad);
/* connect a signal to find out when the typefind element found
* a type */
- g_signal_connect (typefind, "have-type", G_CALLBACK (type_found), urisrc);
+ g_signal_connect (info->typefind, "have-type", G_CALLBACK (type_found), info);
/* Now it can start */
- gst_element_set_locked_state (typefind, FALSE);
- gst_element_sync_state_with_parent (typefind);
+ gst_element_set_locked_state (info->typefind, FALSE);
+ gst_element_sync_state_with_parent (info->typefind);
return TRUE;
}
could_not_link:
{
+ gst_object_unref (sinkpad);
GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
(NULL), ("Can't link source to typefind element"));
- gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
return FALSE;
}
}
+/* CALL WITH URISOURCEBIN LOCK */
static void
free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
{
- GST_DEBUG_OBJECT (urisrc, "removing old queue element and freeing slot %p",
- slot);
- if (slot->bitrate_changed_id > 0)
- g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
- slot->bitrate_changed_id = 0;
+ GST_DEBUG_OBJECT (urisrc,
+ "removing output slot %" GST_PTR_FORMAT " -> %" GST_PTR_FORMAT,
+ slot->originating_pad, slot->output_pad);
+
+ if (slot->queue) {
+ if (slot->bitrate_changed_id > 0)
+ g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
+ slot->bitrate_changed_id = 0;
+
+ gst_element_set_locked_state (slot->queue, TRUE);
+ gst_element_set_state (slot->queue, GST_STATE_NULL);
+ remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
+ gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
- gst_element_set_locked_state (slot->queue, TRUE);
- gst_element_set_state (slot->queue, GST_STATE_NULL);
- remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
- gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
+ gst_object_unref (slot->queue_sinkpad);
+ }
- gst_object_unref (slot->queue_sinkpad);
+ if (slot->demuxer_event_probe_id)
+ gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id);
+ gst_object_unref (slot->originating_pad);
/* deactivate and remove the srcpad */
gst_pad_set_active (slot->output_pad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->output_pad);
free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot)
{
GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free");
- urisrc->out_slots = g_slist_remove (urisrc->out_slots, slot);
+ slot->linked_info->outputs = g_list_remove (slot->linked_info->outputs, slot);
gst_element_call_async (GST_ELEMENT_CAST (urisrc),
(GstElementCallAsyncFunc) call_free_output_slot, slot, NULL);
}
-static void
-unexpose_raw_pad_func (const GValue * item, GstURISourceBin * urisrc)
-{
- GstPad *pad = g_value_get_object (item);
- ChildSrcPadInfo *info =
- g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo");
-
- if (info && info->output_pad != NULL)
- remove_output_pad (urisrc, info->output_pad);
-}
-
-static void
-unexpose_src_pads (GstURISourceBin * urisrc, GstElement * element)
-{
- GstIterator *pads_iter;
-
- pads_iter = gst_element_iterate_src_pads (element);
- gst_iterator_foreach (pads_iter,
- (GstIteratorForeachFunction) unexpose_raw_pad_func, urisrc);
- gst_iterator_free (pads_iter);
-}
-
-static void
-remove_typefind (GstElement * typefind, GstURISourceBin * urisrc)
-{
- unexpose_src_pads (urisrc, typefind);
- gst_element_set_state (typefind, GST_STATE_NULL);
- gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
-}
-
/* remove source and all related elements */
static void
remove_source (GstURISourceBin * urisrc)
GstElement *source = urisrc->source;
GST_DEBUG_OBJECT (urisrc, "removing old src element");
- unexpose_src_pads (urisrc, source);
gst_element_set_state (source, GST_STATE_NULL);
if (urisrc->src_np_sig_id) {
urisrc->source = NULL;
}
- if (urisrc->typefinds) {
- GST_DEBUG_OBJECT (urisrc, "removing old typefind elements");
- g_list_foreach (urisrc->typefinds, (GFunc) remove_typefind, urisrc);
- g_list_free (urisrc->typefinds);
- urisrc->typefinds = NULL;
- }
-
GST_URI_SOURCE_BIN_LOCK (urisrc);
- g_slist_foreach (urisrc->out_slots, (GFunc) free_output_slot, urisrc);
- g_slist_free (urisrc->out_slots);
- urisrc->out_slots = NULL;
+ if (urisrc->src_infos) {
+ g_list_foreach (urisrc->src_infos, (GFunc) free_child_src_pad_info, urisrc);
+ g_list_free (urisrc->src_infos);
+ urisrc->src_infos = NULL;
+ }
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
if (urisrc->demuxer)
source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
{
GstCaps *caps;
+ ChildSrcPadInfo *info = new_child_src_pad_info (urisrc, pad);
GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s",
GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element));
+
caps = gst_pad_get_current_caps (pad);
+ GST_DEBUG_OBJECT (urisrc, "caps %" GST_PTR_FORMAT, caps);
if (caps == NULL)
- setup_typefind (urisrc, pad);
+ setup_typefind (info);
else {
- handle_new_pad (urisrc, pad, caps);
+ handle_new_pad (info, pad, caps);
gst_caps_unref (caps);
}
}
static gboolean
setup_source (GstURISourceBin * urisrc)
{
- gboolean all_pads_raw, have_out, is_dynamic;
+ gboolean have_out, is_dynamic;
GST_DEBUG_OBJECT (urisrc, "setup source");
* if so, we can create streams for the pads and be done with it.
* Also check that is has source pads, if not, we assume it will
* do everything itself. */
- if (!analyse_source_and_expose_raw_pads (urisrc, &all_pads_raw, &have_out,
- &is_dynamic))
+ if (!analyse_source_and_expose_raw_pads (urisrc, &have_out, &is_dynamic))
goto invalid_source;
if (!is_dynamic) {
- if (all_pads_raw) {
- GST_DEBUG_OBJECT (urisrc, "Source provides all raw data");
- /* source provides raw data, we added the pads and we can now signal a
- * no_more pads because we are done. */
- gst_element_no_more_pads (GST_ELEMENT_CAST (urisrc));
- return TRUE;
- }
if (!have_out)
goto no_pads;
} else {
return new_msg;
}
+/* CALL WITH URISOURCEBIN LOCK */
+static OutputSlotInfo *
+output_slot_for_buffering_element (GstURISourceBin * urisrc,
+ GstElement * element)
+{
+ GList *top, *iter;
+ for (top = urisrc->src_infos; top; top = top->next) {
+ ChildSrcPadInfo *info = top->data;
+ for (iter = info->outputs; iter; iter = iter->next) {
+ OutputSlotInfo *slot = iter->data;
+ if (slot->queue == element)
+ return slot;
+ }
+ }
+
+ return NULL;
+}
+
static void
handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
{
GList *iter;
OutputSlotInfo *slot;
- /* buffering messages must be aggregated as there might be multiple
- * multiqueue in the pipeline and their independent buffering messages
- * will confuse the application
+ /* buffering messages must be aggregated as there might be multiple buffering
+ * elements in the pipeline and their independent buffering messages will
+ * confuse the application
*
* urisourcebin keeps a list of messages received from elements that are
* buffering.
GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
- slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
- "urisourcebin.slotinfo");
-
BUFFERING_LOCK (urisrc);
+ slot =
+ output_slot_for_buffering_element (urisrc,
+ (GstElement *) GST_MESSAGE_SRC (msg));
if (slot && slot->is_eos) {
/* Ignore buffering messages from queues we marked as EOS,
* we already removed those from the list of buffering
GstMessage *bufstats = iter->data;
gboolean is_eos = FALSE;
- slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
- "urisourcebin.slotinfo");
+ slot =
+ output_slot_for_buffering_element (urisrc,
+ (GstElement *) GST_MESSAGE_SRC (msg));
if (slot)
is_eos = slot->is_eos;
}
break;
}
- case GST_MESSAGE_STREAM_COLLECTION:
- {
- GST_DEBUG_OBJECT (urisrc, "Source is streams-aware");
- urisrc->source_streams_aware = TRUE;
- break;
- }
case GST_MESSAGE_BUFFERING:
handle_buffering_message (urisrc, msg);
msg = NULL;
(GDestroyNotify) gst_message_unref);
urisrc->buffering_status = NULL;
urisrc->last_buffering_pct = -1;
- urisrc->source_streams_aware = FALSE;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
GST_DEBUG ("ready to null");