/* An optional typefind */
GstElement *typefind;
- /* An optional demuxer */
+ /* Pre-parsebin buffering elements. Only present is parse-streams and
+ * downloading *or* ring-buffer-max-size */
+ GstElement *pre_parse_queue;
+
+ /* Post-parsebin multiqueue. Only present if parse-streams and buffering is
+ * required */
+ GstElement *multiqueue;
+
+ /* An optional demuxer or parsebin */
GstElement *demuxer;
gboolean demuxer_handles_buffering;
/* list of output slots */
GList *outputs;
+
+ /* The following fields specify how this output should be handled */
+
+ /* use_downloadbuffer : TRUE if the content from the source should be
+ * downloaded with a downloadbuffer element */
+ gboolean use_downloadbuffer;
+
+ /* use_queue2: TRUE if the contents should be buffered through a queue2
+ * element */
+ gboolean use_queue2;
};
/* Output Slot:
gboolean use_buffering;
gdouble low_watermark;
gdouble high_watermark;
+ gboolean parse_streams;
GstElement *source;
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
#define DEFAULT_LOW_WATERMARK 0.01
#define DEFAULT_HIGH_WATERMARK 0.60
+#define DEFAULT_PARSE_STREAMS FALSE
#define ACTUAL_DEFAULT_BUFFER_SIZE 10 * 1024 * 1024 /* The value used for byte limits when buffer-size == -1 */
#define ACTUAL_DEFAULT_BUFFER_DURATION 5 * GST_SECOND /* The value used for time limits when buffer-duration == -1 */
PROP_LOW_WATERMARK,
PROP_HIGH_WATERMARK,
PROP_STATISTICS,
+ PROP_PARSE_STREAMS,
};
#define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
static gboolean setup_typefind (ChildSrcPadInfo * info);
static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
- gboolean do_download, gboolean is_adaptive, gboolean no_buffering,
GstPad * originating_pad);
static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
static void free_output_slot_async (GstURISourceBin * urisrc,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
+ * GstURISourceBin:parse-streams:
+ *
+ * A `parsebin` element will be used on all non-raw streams, and urisourcebin
+ * will output the elementary streams. Recommended when buffering is used
+ * since it will provide accurate buffering levels.
+ *
+ * Since: 1.22
+ */
+ g_object_class_install_property (gobject_class, PROP_PARSE_STREAMS,
+ g_param_spec_boolean ("parse-streams", "Parse Streams",
+ "Extract the elementary streams of non-raw sources",
+ DEFAULT_PARSE_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstURISourceBin::drained:
*
* This signal is emitted when the data for the current uri is played.
urisrc->high_watermark = g_value_get_double (value);
update_queue_values (urisrc);
break;
+ case PROP_PARSE_STREAMS:
+ urisrc->parse_streams = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_STATISTICS:
g_value_take_boxed (value, get_queue_statistics (urisrc));
break;
+ case PROP_PARSE_STREAMS:
+ g_value_set_boolean (value, urisrc->parse_streams);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
g_list_free (info->outputs);
+ if (info->multiqueue) {
+ GST_DEBUG_OBJECT (urisrc, "Removing multiqueue");
+ gst_element_set_state (info->multiqueue, GST_STATE_NULL);
+ remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->multiqueue));
+ gst_bin_remove (GST_BIN_CAST (urisrc), info->multiqueue);
+ }
+
+ if (info->pre_parse_queue) {
+ gst_element_set_state (info->pre_parse_queue, GST_STATE_NULL);
+ remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->pre_parse_queue));
+ gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue);
+ }
+
g_free (info);
}
OutputSlotInfo *slot;
GstPad *output_pad;
+ GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad);
+
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* If the demuxer handles buffering and is streams-aware, we can expose it
as-is directly. We still add an event probe to deal with EOS */
- slot =
- new_output_slot (info, FALSE, FALSE, info->demuxer_handles_buffering,
- pad);
+ slot = new_output_slot (info, pad);
output_pad = gst_object_ref (slot->output_pad);
- GST_DEBUG_OBJECT (element,
- "New streams-aware demuxer pad %s:%s , exposing directly",
- GST_DEBUG_PAD_NAME (pad));
slot->demuxer_event_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events,
(GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
}
+static void
+setup_downloadbuffer (GstURISourceBin * urisrc, GstElement * downloadbuffer)
+{
+ gchar *temp_template, *filename;
+ const gchar *tmp_dir, *prgname;
+
+ tmp_dir = g_get_user_cache_dir ();
+ prgname = g_get_prgname ();
+ if (prgname == NULL)
+ prgname = "GStreamer";
+
+ filename = g_strdup_printf ("%s-XXXXXX", prgname);
+
+ /* build our filename */
+ temp_template = g_build_filename (tmp_dir, filename, NULL);
+
+ GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
+ temp_template, tmp_dir, prgname, filename);
+
+ /* configure progressive download for selected media types */
+ g_object_set (downloadbuffer, "temp-template", temp_template, NULL);
+
+ g_free (filename);
+ g_free (temp_template);
+}
+
+static void
+setup_multiqueue (GstURISourceBin * urisrc, ChildSrcPadInfo * info,
+ GstElement * multiqueue)
+{
+ if (info->use_downloadbuffer) {
+ /* If we have a downloadbuffer we will let that one deal with buffering,
+ and we only use multiqueue for dealing with interleave */
+ g_object_set (info->multiqueue, "use-buffering", FALSE, NULL);
+ } else {
+ /* Else we set the minimum interleave time of multiqueue to the required
+ * buffering duration and ask it to report buffering */
+ g_object_set (info->multiqueue, "use-buffering", TRUE,
+ "min-interleave-time", GET_BUFFER_DURATION (urisrc), NULL);
+ }
+ /* Common properties */
+ g_object_set (info->multiqueue,
+ "sync-by-running-time", TRUE,
+ "use-interleave", TRUE,
+ "max-size-bytes", 0,
+ "max-size-buffers", 0,
+ "low-watermark", urisrc->low_watermark,
+ "high-watermark", urisrc->high_watermark, NULL);
+ gst_bin_add (GST_BIN_CAST (urisrc), info->multiqueue);
+ gst_element_sync_state_with_parent (info->multiqueue);
+}
+
/* Called with lock held */
static OutputSlotInfo *
-new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
- gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad)
+new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
{
GstURISourceBin *urisrc = info->urisrc;
OutputSlotInfo *slot;
GstPad *srcpad;
GstElement *queue = NULL;
const gchar *elem_name;
+ gboolean use_downloadbuffer;
GST_DEBUG_OBJECT (urisrc,
- "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%"
- GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad);
+ "use_queue2:%d use_downloadbuffer:%d, demuxer:%d, originating_pad:%"
+ GST_PTR_FORMAT, info->use_queue2, info->use_downloadbuffer,
+ info->demuxer != NULL, originating_pad);
slot = g_new0 (OutputSlotInfo, 1);
slot->linked_info = info;
- /* If buffering is required, create the element */
- if (!no_buffering) {
- if (do_download)
+ /* If a demuxer/parsebin is present, then the downloadbuffer will have been handled before that */
+ use_downloadbuffer = info->use_downloadbuffer && !info->demuxer;
+
+ /* If parsebin is used and buffering is required, we go through a multiqueue */
+ if (urisrc->parse_streams && (info->use_queue2 || info->use_downloadbuffer)) {
+ GST_DEBUG_OBJECT (urisrc, "Using multiqueue");
+ if (!info->multiqueue) {
+ GST_DEBUG_OBJECT (urisrc,
+ "Creating multiqueue for buffering elementary streams");
+ elem_name = "multiqueue";
+ info->multiqueue = gst_element_factory_make (elem_name, NULL);
+ if (!info->multiqueue)
+ goto no_buffer_element;
+ setup_multiqueue (urisrc, info, info->multiqueue);
+ }
+
+ slot->queue_sinkpad =
+ gst_element_request_pad_simple (info->multiqueue, "sink_%u");
+ srcpad = gst_pad_get_single_internal_link (slot->queue_sinkpad);
+ slot->output_pad = create_output_pad (slot, srcpad);
+ gst_object_unref (srcpad);
+ gst_pad_link (originating_pad, slot->queue_sinkpad);
+ }
+ /* If buffering is required, create the element. If downloadbuffer is
+ * required, it will take precedence over queue2 */
+ else if (use_downloadbuffer || info->use_queue2) {
+ if (use_downloadbuffer)
elem_name = "downloadbuffer";
else
elem_name = "queue2";
g_signal_connect (G_OBJECT (queue), "notify::bitrate",
(GCallback) on_queue_bitrate_changed, urisrc);
- if (do_download) {
- gchar *temp_template, *filename;
- const gchar *tmp_dir, *prgname;
-
- tmp_dir = g_get_user_cache_dir ();
- prgname = g_get_prgname ();
- if (prgname == NULL)
- prgname = "GStreamer";
-
- filename = g_strdup_printf ("%s-XXXXXX", prgname);
-
- /* build our filename */
- temp_template = g_build_filename (tmp_dir, filename, NULL);
-
- GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
- temp_template, tmp_dir, prgname, filename);
-
- /* configure progressive download for selected media types */
- g_object_set (queue, "temp-template", temp_template, NULL);
-
- g_free (filename);
- g_free (temp_template);
+ if (use_downloadbuffer) {
+ setup_downloadbuffer (urisrc, slot->queue);
} else {
- if (is_adaptive) {
- GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream");
- g_object_set (queue, "use-buffering", urisrc->use_buffering,
- "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
+ g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
+ if (info->demuxer) {
+ /* If a adaptive demuxer or parsebin is used, use more accurate information */
+ g_object_set (queue, "use-tags-bitrate", TRUE, "use-rate-estimate",
+ FALSE, NULL);
} else {
- GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
- g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
+ GST_DEBUG_OBJECT (queue,
+ "Setting ring-buffer-max-size %" G_GUINT64_FORMAT,
+ urisrc->ring_buffer_max_size);
+ /* Else allow ring-buffer-max-size setting to be used */
+ g_object_set (queue, "ring-buffer-max-size",
+ urisrc->ring_buffer_max_size, NULL);
}
- g_object_set (queue, "ring-buffer-max-size",
- urisrc->ring_buffer_max_size, NULL);
/* Disable max-size-buffers - queue based on data rate to the default time limit */
g_object_set (queue, "max-size-buffers", 0, NULL);
/* save output slot so we can remove it later */
info->outputs = g_list_append (info->outputs, slot);
- GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT,
- slot->output_pad);
+ GST_DEBUG_OBJECT (urisrc,
+ "New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT,
+ slot->output_pad, originating_pad);
return slot;
OutputSlotInfo *slot;
GST_URI_SOURCE_BIN_LOCK (urisrc);
- /* Only use buffering on raw pads in very specific conditions */
+ /* Only use buffering (via queue2) on raw pads in very specific
+ * conditions */
+ info->use_queue2 = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
+
GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
- slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering
- || !IS_QUEUE_URI (urisrc->uri), pad);
+ slot = new_output_slot (info, pad);
if (!slot) {
res = FALSE;
}
}
+static gboolean
+setup_parsebin_for_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
+{
+ GstURISourceBin *urisrc = info->urisrc;
+ GstPad *sinkpad;
+ GstPadLinkReturn link_res;
+
+ GST_DEBUG_OBJECT (urisrc, "Setting up parsebin for %" GST_PTR_FORMAT,
+ originating_pad);
+
+ GST_STATE_LOCK (urisrc);
+ GST_URI_SOURCE_BIN_LOCK (urisrc);
+
+ /* Set up optional pre-parsebin download/ringbuffer elements */
+ if (info->use_downloadbuffer || urisrc->ring_buffer_max_size) {
+ if (info->use_downloadbuffer) {
+ GST_DEBUG_OBJECT (urisrc, "Setting up pre-parsebin downloadbuffer");
+ info->pre_parse_queue = gst_element_factory_make ("downloadbuffer", NULL);
+ setup_downloadbuffer (urisrc, info->pre_parse_queue);
+ g_object_set (info->pre_parse_queue, "max-size-bytes",
+ GET_BUFFER_SIZE (urisrc), "max-size-time",
+ (guint64) GET_BUFFER_DURATION (urisrc), NULL);
+ } else if (urisrc->ring_buffer_max_size) {
+ /* If a ring-buffer-max-size is specified with parsebin, we set it up on
+ * the queue2 *before* parsebin. We will use its buffering levels instead
+ * of the ones from multiqueue */
+ GST_DEBUG_OBJECT (urisrc,
+ "Setting up pre-parsebin queue2 for ring-buffer-max-size %"
+ G_GUINT64_FORMAT, urisrc->ring_buffer_max_size);
+ info->pre_parse_queue = gst_element_factory_make ("queue2", NULL);
+ /* We do not use this queue2 for buffering levels, but the multiqueue */
+ g_object_set (info->pre_parse_queue, "use-buffering", FALSE,
+ "ring-buffer-max-size", urisrc->ring_buffer_max_size,
+ "max-size-buffers", 0, NULL);
+ }
+ gst_element_set_locked_state (info->pre_parse_queue, TRUE);
+ gst_bin_add (GST_BIN_CAST (urisrc), info->pre_parse_queue);
+ sinkpad = gst_element_get_static_pad (info->pre_parse_queue, "sink");
+ link_res = gst_pad_link (originating_pad, sinkpad);
+
+ gst_object_unref (sinkpad);
+ if (link_res != GST_PAD_LINK_OK)
+ goto could_not_link;
+ }
+
+ info->demuxer = gst_element_factory_make ("parsebin", NULL);
+ if (!info->demuxer) {
+ post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "parsebin");
+ return FALSE;
+ }
+ gst_element_set_locked_state (info->demuxer, TRUE);
+ gst_bin_add (GST_BIN_CAST (urisrc), info->demuxer);
+
+ if (info->pre_parse_queue) {
+ if (!gst_element_link_pads (info->pre_parse_queue, "src", info->demuxer,
+ "sink"))
+ goto could_not_link;
+ } else {
+ sinkpad = gst_element_get_static_pad (info->demuxer, "sink");
+
+ link_res = gst_pad_link (originating_pad, sinkpad);
+
+ gst_object_unref (sinkpad);
+ if (link_res != GST_PAD_LINK_OK)
+ goto could_not_link;
+ }
+
+ /* set up callbacks to create the links between parsebin and output */
+ g_signal_connect (info->demuxer,
+ "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
+ g_signal_connect (info->demuxer,
+ "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
+
+ if (info->pre_parse_queue) {
+ gst_element_set_locked_state (info->pre_parse_queue, FALSE);
+ gst_element_sync_state_with_parent (info->pre_parse_queue);
+ }
+ gst_element_set_locked_state (info->demuxer, FALSE);
+ gst_element_sync_state_with_parent (info->demuxer);
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+ GST_STATE_UNLOCK (urisrc);
+ return TRUE;
+
+could_not_link:
+ {
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+ GST_STATE_UNLOCK (urisrc);
+ GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
+ (NULL), ("Can't link to (pre-)parsebin element"));
+ return FALSE;
+ }
+}
+
/* Called when:
* * Source element adds a new pad
* * typefind has found a type
gboolean is_raw;
GstStructure *s;
const gchar *media_type;
- gboolean do_download = FALSE;
GST_URI_SOURCE_BIN_LOCK (urisrc);
GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
", exposing", caps);
- slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
+ slot = new_output_slot (info, srcpad);
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
/* Query the demuxer to see if it can handle buffering */
query = gst_query_new_buffering (GST_FORMAT_TIME);
- info->demuxer_handles_buffering = gst_element_query (info->demuxer, query);
+ info->use_queue2 = urisrc->use_buffering
+ && !gst_element_query (info->demuxer, query);
gst_query_unref (query);
GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d",
info->demuxer_handles_buffering);
gst_element_sync_state_with_parent (info->demuxer);
} else if (!urisrc->is_stream) {
- OutputSlotInfo *slot;
- GstPad *output_pad;
+ if (urisrc->parse_streams) {
+ /* GST_URI_SOURCE_BIN_LOCK (urisrc); */
+ setup_parsebin_for_slot (info, srcpad);
+ /* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
+ } else {
+ /* We don't need buffering here, expose immediately */
+ OutputSlotInfo *slot;
+ GstPad *output_pad;
- /* We don't need buffering here, expose immediately */
- GST_URI_SOURCE_BIN_LOCK (urisrc);
- slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad);
- output_pad = gst_object_ref (slot->output_pad);
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- expose_output_pad (urisrc, output_pad);
- gst_object_unref (output_pad);
+ GST_URI_SOURCE_BIN_LOCK (urisrc);
+ slot = new_output_slot (info, srcpad);
+ output_pad = gst_object_ref (slot->output_pad);
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+ expose_output_pad (urisrc, output_pad);
+ gst_object_unref (output_pad);
+ }
} else {
- OutputSlotInfo *slot;
- GstPad *output_pad;
-
/* only enable download buffering if the upstream duration is known */
if (urisrc->download) {
GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
if (gst_pad_query (srcpad, query)) {
gint64 dur;
gst_query_parse_duration (query, NULL, &dur);
- do_download = (dur != -1);
+ info->use_downloadbuffer = (dur != -1);
}
gst_query_unref (query);
}
+ info->use_queue2 = urisrc->use_buffering;
- GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type,
- do_download);
+ if (urisrc->parse_streams) {
+ /* GST_URI_SOURCE_BIN_LOCK (urisrc); */
+ setup_parsebin_for_slot (info, srcpad);
+ /* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
+ } else {
+ OutputSlotInfo *slot;
+ GstPad *output_pad;
- GST_URI_SOURCE_BIN_LOCK (urisrc);
- slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad);
+ GST_URI_SOURCE_BIN_LOCK (urisrc);
+ slot = new_output_slot (info, srcpad);
- gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
- pre_queue_event_probe, urisrc, NULL);
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+ pre_queue_event_probe, urisrc, NULL);
- output_pad = gst_object_ref (slot->output_pad);
- GST_URI_SOURCE_BIN_UNLOCK (urisrc);
+ output_pad = gst_object_ref (slot->output_pad);
+ GST_URI_SOURCE_BIN_UNLOCK (urisrc);
- expose_output_pad (urisrc, output_pad);
- gst_object_unref (output_pad);
+ expose_output_pad (urisrc, output_pad);
+ gst_object_unref (output_pad);
+ }
}
return;
gst_element_set_state (slot->queue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
-
- gst_object_unref (slot->queue_sinkpad);
+ }
+ if (slot->queue_sinkpad) {
+ if (slot->linked_info && slot->linked_info->multiqueue)
+ gst_element_release_request_pad (slot->linked_info->multiqueue,
+ slot->queue_sinkpad);
+ gst_object_replace ((GstObject **) & slot->queue_sinkpad, NULL);
}
if (slot->demuxer_event_probe_id)
return;
}
-
g_mutex_lock (&urisrc->buffering_post_lock);
/*
}
break;
}
+ case GST_MESSAGE_STREAM_COLLECTION:
+ /* We only want to forward stream collection from the source element *OR*
+ * from adaptive demuxers. We do not want to forward them from the
+ * potential parsebins since there might be many and require aggregation
+ * to be useful/coherent. */
+ if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source
+ && !urisrc->is_adaptive) {
+ GST_DEBUG_OBJECT (bin,
+ "Dropping stream-collection from non-adaptive-demuxer %"
+ GST_PTR_FORMAT, GST_MESSAGE_SRC (msg));
+ gst_message_unref (msg);
+ msg = NULL;
+ }
+ break;
case GST_MESSAGE_BUFFERING:
handle_buffering_message (urisrc, msg);
msg = NULL;