static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux);
-static GstBuffer *gst_dash_demux_merge_buffer_list (GstFragment * fragment);
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
GstActiveStream * stream);
static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- if (g_queue_is_empty (stream->queue)) {
+ if (gst_data_queue_is_empty (stream->queue)) {
return FALSE;
}
}
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- while (!g_queue_is_empty (stream->queue)) {
- GstFragment *fragment = g_queue_pop_head (stream->queue);
- g_object_unref (fragment);
- }
- g_queue_clear (stream->queue);
+
+ gst_data_queue_flush (stream->queue);
}
}
static gboolean
+_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
+ GstDashDemuxStream * stream)
+{
+ /* TODO add limits */
+ return FALSE;
+}
+
+static void
+_data_queue_item_destroy (GstDataQueueItem * item)
+{
+ gst_mini_object_unref (item->object);
+ g_free (item);
+}
+
+static void
+gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
+ GstBuffer * fragment)
+{
+ GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
+
+ item->object = GST_MINI_OBJECT_CAST (fragment);
+ item->duration = GST_BUFFER_DURATION (fragment);
+ item->visible = TRUE;
+ item->size = GST_BUFFER_SIZE (fragment);
+
+ item->destroy = (GDestroyNotify) _data_queue_item_destroy;
+
+ gst_data_queue_push (stream->queue, item);
+}
+
+static gboolean
gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
{
GstDashDemux *demux;
chunk = list->data;
current_pos = chunk->start_time;
//current_sequence = chunk->number;
- GST_WARNING_OBJECT (demux, "%i <= %i (%i)", current_pos, target_pos,
- chunk->duration);
+ GST_WARNING_OBJECT (demux, "%llu <= %llu (%llu)", current_pos,
+ target_pos, chunk->duration);
if (current_pos <= target_pos
&& target_pos < current_pos + chunk->duration) {
break;
/* Restart the demux */
demux->cancelled = FALSE;
demux->end_of_manifest = FALSE;
+ for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+ GstDashDemuxStream *stream = iter->data;
+ gst_data_queue_set_flushing (stream->queue, FALSE);
+ }
gst_dash_demux_resume_download_task (demux);
gst_dash_demux_resume_stream_task (demux);
g_static_rec_mutex_unlock (&demux->stream_lock);
stream = g_new0 (GstDashDemuxStream, 1);
caps = gst_dash_demux_get_input_caps (demux, active_stream);
- stream->queue = g_queue_new ();
+ stream->queue =
+ gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full,
+ stream);
stream->index = i;
stream->input_caps = caps;
static void
gst_dash_demux_stop (GstDashDemux * demux)
{
+ GSList *iter;
+
gst_uri_downloader_cancel (demux->downloader);
+ for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+ GstDashDemuxStream *stream = iter->data;
+
+ gst_data_queue_set_flushing (stream->queue, TRUE);
+ }
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task);
GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+ GstDataQueueItem *item;
GstDashDemuxStream *stream = iter->data;
- GstFragment *newFragment = g_queue_peek_head (stream->queue);
GstCaps *srccaps = NULL;
+ GstBuffer *buffer;
- if (newFragment == NULL) {
+ if (!gst_data_queue_peek (stream->queue, &item))
continue;
- }
- if (stream->output_caps)
- gst_caps_unref (stream->output_caps);
- stream->output_caps = gst_fragment_get_caps (newFragment);
+
+ buffer = GST_BUFFER_CAST (item->object);
+
+ gst_caps_replace (&stream->output_caps, GST_BUFFER_CAPS (buffer));
+
if (G_LIKELY (stream->pad))
srccaps = gst_pad_get_negotiated_caps (stream->pad);
if (G_UNLIKELY (!srccaps
gst_dash_demux_stream_loop (GstDashDemux * demux)
{
GstFlowReturn ret;
- GstBufferList *buffer_list;
guint nb_adaptation_set = 0;
GstActiveStream *active_stream;
gboolean switch_pad;
}
for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
+ GstDataQueueItem *item;
+ GstBuffer *buffer;
GstDashDemuxStream *stream = iter->data;
- GstFragment *fragment = g_queue_pop_head (stream->queue);
-
- if (!fragment)
+ if (!gst_data_queue_pop (stream->queue, &item))
continue;
+ buffer = GST_BUFFER_CAST (item->object);
+
active_stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
if (demux->need_segment) {
- GstClockTime start = fragment->start_time + demux->position_shift;
+ GstClockTime start =
+ GST_BUFFER_TIMESTAMP (buffer) + demux->position_shift;
/* And send a newsegment */
GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
GST_TIME_FORMAT, GST_TIME_ARGS (start));
start, GST_CLOCK_TIME_NONE, start));
}
- GST_DEBUG_OBJECT (demux, "Pushing fragment #%d (stream %i)",
- fragment->index, i);
- buffer_list = gst_fragment_get_buffer_list (fragment);
- g_object_unref (fragment);
- ret = gst_pad_push_list (stream->pad, buffer_list);
+ GST_DEBUG_OBJECT (demux,
+ "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
+ GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer), i,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+ ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
+ item->destroy (item);
if ((ret != GST_FLOW_OK) && (active_stream->mimeType == GST_STREAM_VIDEO))
goto error_pushing;
}
gst_object_unref (stream->pad);
/* TODO flush the queue */
- g_queue_free (stream->queue);
+ g_object_unref (stream->queue);
g_free (stream);
}
static GstClockTime
gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
{
- GstFragment *first_fragment, *last_fragment;
- GstClockTime buffer_time = 0;
-
- /* get first fragment */
- first_fragment = g_queue_peek_head (stream->queue);
- /* get last fragment */
- last_fragment = g_queue_peek_tail (stream->queue);
-
- if (!first_fragment && !last_fragment)
- return 0;
+ GstDataQueueSize level;
- if (first_fragment && last_fragment) {
- buffer_time = last_fragment->stop_time - first_fragment->start_time;
- }
+ gst_data_queue_get_level (stream->queue, &level);
- return buffer_time;
+ return (GstClockTime) level.time;
}
static float
return buffering_time / demux->min_buffering_time;
}
-static GstBuffer *
-gst_dash_demux_merge_buffer_list (GstFragment * fragment)
-{
- GstBufferList *list;
- GstBufferListIterator *it;
- GstBuffer *buffer, *ret = NULL;
- GstAdapter *adapter;
- gsize size;
-
- adapter = gst_adapter_new ();
- list = gst_fragment_get_buffer_list (fragment);
- it = gst_buffer_list_iterate (list);
- while (gst_buffer_list_iterator_next_group (it)) {
- while ((buffer = gst_buffer_list_iterator_next (it)) != NULL) {
- gst_adapter_push (adapter, gst_buffer_ref (buffer));
- }
- }
- gst_buffer_list_iterator_free (it);
- gst_buffer_list_unref (list);
- size = gst_adapter_available (adapter);
- if (size > 0)
- ret = gst_adapter_take_buffer (adapter, size);
- GST_DEBUG ("Extracted a buffer of size %d from the fragment", size);
- g_object_unref (adapter);
-
- return ret;
-}
-
/* gst_dash_demux_download_loop:
*
* Loop for the "download' task that fetches fragments based on the
"Failed to update the manifest file from URL %s",
demux->client->mpd_uri);
} else {
- buffer = gst_dash_demux_merge_buffer_list (download);
+ buffer = gst_fragment_get_buffer (download);
g_object_unref (download);
/* parse the manifest file */
if (buffer == NULL) {
return fragment;
}
-static GstBufferListItem
-gst_dash_demux_add_buffer_cb (GstBuffer ** buffer,
- guint group, guint idx, gpointer user_data)
-{
- GstFragment *frag = GST_FRAGMENT (user_data);
- /* This buffer still belongs to the original fragment */
- /* so we need to increase refcount */
- gst_fragment_add_buffer (frag, gst_buffer_ref (*buffer));
- return GST_BUFFER_LIST_CONTINUE;
-}
-
-/* Since we cannot add headers after the chunk has been downloaded, we have to recreate a new fragment */
-static GstFragment *
-gst_dash_demux_prepend_header (GstDashDemux * demux,
- GstFragment * frag, GstFragment * header)
-{
- GstBufferList *list;
- GstFragment *res = gst_fragment_new ();
- res->name = g_strdup (frag->name);
- res->download_start_time = frag->download_start_time;
- res->download_stop_time = frag->download_stop_time;
- res->start_time = frag->start_time;
- res->stop_time = frag->stop_time;
- res->index = frag->index;
- res->discontinuous = frag->discontinuous;
-
- list = gst_fragment_get_buffer_list (header);
- gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
- gst_buffer_list_unref (list);
- list = gst_fragment_get_buffer_list (frag);
- gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
- gst_buffer_list_unref (list);
-
- res->completed = TRUE;
-
- return res;
-}
-
static GstCaps *
gst_dash_demux_get_video_input_caps (GstDashDemux * demux,
GstActiveStream * stream)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
guint stream_idx = stream->index;
+ GstBuffer *buffer;
if (!gst_mpd_client_get_next_fragment (demux->client,
stream_idx, &discont, &next_fragment_uri, &duration, ×tamp)) {
}
GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
- GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
+ GST_INFO_OBJECT (demux,
+ "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
+ GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
+ GST_TIME_ARGS (duration));
download = gst_uri_downloader_fetch_uri (demux->downloader,
next_fragment_uri);
if (download == NULL)
return FALSE;
- download->start_time = timestamp;
- download->stop_time = timestamp + duration;
+ buffer = gst_fragment_get_buffer (download);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
- if (stream == NULL)
+ if (stream == NULL) /* TODO unref fragments */
return FALSE;
- download->index = gst_mpd_client_get_segment_index (active_stream) - 1;
-
if (need_header) {
/* We need to fetch a new header */
if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
GST_INFO_OBJECT (demux, "Unable to fetch header");
} else {
+ GstBuffer *header_buffer;
/* Replace fragment with a new one including the header */
- GstFragment *new_fragment =
- gst_dash_demux_prepend_header (demux, download, header);
- g_object_unref (header);
- g_object_unref (download);
- download = new_fragment;
+
+ header_buffer = gst_fragment_get_buffer (header);
+ buffer = gst_buffer_join (header_buffer, buffer);
}
}
- gst_fragment_set_caps (download, stream->input_caps);
- g_queue_push_tail (stream->queue, download);
- size_buffer += gst_fragment_get_buffer_size (download);
+ buffer = gst_buffer_make_metadata_writable (buffer);
+
+ GST_BUFFER_TIMESTAMP (buffer) = timestamp;
+ GST_BUFFER_DURATION (buffer) = duration;
+ GST_BUFFER_OFFSET (buffer) =
+ gst_mpd_client_get_segment_index (active_stream) - 1;
+
+ gst_buffer_set_caps (buffer, stream->input_caps);
+ gst_dash_demux_stream_push_data (stream, buffer);
+ size_buffer += GST_BUFFER_SIZE (buffer);
}
/* Wake the download task up */
*/
#include <glib.h>
-#include <gst/base/gsttypefindhelper.h>
+#include <gst/base/gstadapter.h>
#include "gstfragmented.h"
#include "gstfragment.h"
PROP_NAME,
PROP_DURATION,
PROP_DISCONTINOUS,
- PROP_BUFFER_LIST,
- PROP_CAPS,
PROP_LAST
};
struct _GstFragmentPrivate
{
- GstBufferList *buffer_list;
- guint64 size;
- GstBufferListIterator *buffer_iterator;
- GstCaps *caps;
- GMutex lock;
+ GstAdapter *adapter;
+ GstBuffer *buffer;
};
G_DEFINE_TYPE (GstFragment, gst_fragment, G_TYPE_OBJECT);
static void gst_fragment_finalize (GObject * object);
static void
-gst_fragment_set_property (GObject * object,
- guint property_id, const GValue * value, GParamSpec * pspec)
-{
- GstFragment *fragment = GST_FRAGMENT (object);
-
- switch (property_id) {
- case PROP_CAPS:
- gst_fragment_set_caps (fragment, g_value_get_boxed (value));
- break;
-
- default:
- /* We don't have any other property... */
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
- break;
- }
-}
-
-static void
gst_fragment_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec)
{
g_value_set_boolean (value, fragment->discontinuous);
break;
- case PROP_BUFFER_LIST:
- g_value_set_object (value, gst_fragment_get_buffer_list (fragment));
- break;
-
- case PROP_CAPS:
- g_value_set_boxed (value, gst_fragment_get_caps (fragment));
- break;
-
default:
/* We don't have any other property... */
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}
}
-
-
static void
gst_fragment_class_init (GstFragmentClass * klass)
{
g_type_class_add_private (klass, sizeof (GstFragmentPrivate));
- gobject_class->set_property = gst_fragment_set_property;
gobject_class->get_property = gst_fragment_get_property;
gobject_class->dispose = gst_fragment_dispose;
gobject_class->finalize = gst_fragment_finalize;
g_object_class_install_property (gobject_class, PROP_DURATION,
g_param_spec_uint64 ("duration", "Fragment duration",
"Duration of the fragment", 0, G_MAXUINT64, 0, G_PARAM_READABLE));
-
- g_object_class_install_property (gobject_class, PROP_BUFFER_LIST,
- g_param_spec_object ("buffer-list", "Buffer List",
- "A list with the fragment's buffers", GST_TYPE_FRAGMENT,
- G_PARAM_READABLE));
-
- g_object_class_install_property (gobject_class, PROP_CAPS,
- g_param_spec_boxed ("caps", "Fragment caps",
- "The caps of the fragment's buffer. (NULL = detect)", GST_TYPE_CAPS,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
fragment->priv = priv = GST_FRAGMENT_GET_PRIVATE (fragment);
- g_mutex_init (&fragment->priv->lock);
- priv->buffer_list = gst_buffer_list_new ();
- priv->size = 0;
- priv->buffer_iterator = gst_buffer_list_iterate (priv->buffer_list);
- gst_buffer_list_iterator_add_group (priv->buffer_iterator);
- fragment->download_start_time = g_get_real_time ();
+ priv->adapter = gst_adapter_new ();
+ fragment->download_start_time = gst_util_get_timestamp ();
fragment->start_time = 0;
fragment->stop_time = 0;
fragment->index = 0;
GstFragment *fragment = GST_FRAGMENT (gobject);
g_free (fragment->name);
- g_mutex_clear (&fragment->priv->lock);
G_OBJECT_CLASS (gst_fragment_parent_class)->finalize (gobject);
}
{
GstFragmentPrivate *priv = GST_FRAGMENT (object)->priv;
- if (priv->buffer_list != NULL) {
- gst_buffer_list_iterator_free (priv->buffer_iterator);
- gst_buffer_list_unref (priv->buffer_list);
- priv->buffer_list = NULL;
- priv->size = 0;
+ if (priv->adapter) {
+ gst_object_unref (priv->adapter);
+ priv->adapter = NULL;
}
-
- if (priv->caps != NULL) {
- gst_caps_unref (priv->caps);
- priv->caps = NULL;
+ if (priv->buffer) {
+ gst_buffer_unref (priv->buffer);
+ priv->buffer = NULL;
}
G_OBJECT_CLASS (gst_fragment_parent_class)->dispose (object);
}
-GstBufferList *
-gst_fragment_get_buffer_list (GstFragment * fragment)
-{
- g_return_val_if_fail (fragment != NULL, NULL);
-
- if (!fragment->completed)
- return NULL;
-
- gst_buffer_list_ref (fragment->priv->buffer_list);
- return fragment->priv->buffer_list;
-}
-
-void
-gst_fragment_set_caps (GstFragment * fragment, GstCaps * caps)
-{
- g_return_if_fail (fragment != NULL);
-
- g_mutex_lock (&fragment->priv->lock);
- gst_caps_replace (&fragment->priv->caps, caps);
- g_mutex_unlock (&fragment->priv->lock);
-}
-
-GstCaps *
-gst_fragment_get_caps (GstFragment * fragment)
+GstBuffer *
+gst_fragment_get_buffer (GstFragment * fragment)
{
g_return_val_if_fail (fragment != NULL, NULL);
if (!fragment->completed)
return NULL;
- g_mutex_lock (&fragment->priv->lock);
- if (fragment->priv->caps == NULL) {
- GstBuffer *buf = gst_buffer_list_get (fragment->priv->buffer_list, 0, 0);
- fragment->priv->caps = gst_type_find_helper_for_buffer (NULL, buf, NULL);
+ if (!fragment->priv->buffer) {
+ fragment->priv->buffer = gst_adapter_take_buffer (fragment->priv->adapter,
+ gst_adapter_available (fragment->priv->adapter));
}
- gst_caps_ref (fragment->priv->caps);
- g_mutex_unlock (&fragment->priv->lock);
- return fragment->priv->caps;
+ return gst_buffer_ref (fragment->priv->buffer);
}
-guint64
-gst_fragment_get_buffer_size (GstFragment * fragment)
-{
- g_return_val_if_fail (fragment != NULL, 0);
-
- if (!fragment->completed)
- return 0;
- return fragment->priv->size;
-}
-
-
-
gboolean
gst_fragment_add_buffer (GstFragment * fragment, GstBuffer * buffer)
{
return FALSE;
}
- gst_buffer_list_iterator_add (fragment->priv->buffer_iterator, buffer);
- fragment->priv->size = fragment->priv->size + GST_BUFFER_SIZE (buffer);
+ GST_DEBUG ("Adding new buffer to the fragment");
+ /* We steal the buffers you pass in */
+ gst_adapter_push (fragment->priv->adapter, buffer);
+
return TRUE;
}
+gsize
+gst_fragment_get_total_size (GstFragment * fragment)
+{
+ g_return_val_if_fail (GST_IS_FRAGMENT (fragment), 0);
+
+ if (fragment->priv->buffer)
+ return GST_BUFFER_SIZE (fragment->priv->buffer);
+
+ return gst_adapter_available (fragment->priv->adapter);
+}