#include <string.h>
#include <inttypes.h>
#include <gst/base/gsttypefindhelper.h>
+#include "gst/gst-i18n-plugin.h"
#include "gstdashdemux.h"
#include "gstdash_debug.h"
#define GST_DASH_DEMUX_CLIENT_LOCK(d) g_mutex_lock (&d->client_lock)
#define GST_DASH_DEMUX_CLIENT_UNLOCK(d) g_mutex_unlock (&d->client_lock)
-/* Custom internal event to signal end of period */
-#define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED)
-static GstEvent *
-gst_event_new_dash_eop (void)
-{
- return gst_event_new_custom (GST_EVENT_DASH_EOP, NULL);
-}
-
/* GObject */
static void gst_dash_demux_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
GstEvent * event);
static gboolean gst_dash_demux_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
-static void gst_dash_demux_stream_loop (GstDashDemux * demux);
static void gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream);
static void gst_dash_demux_stop (GstDashDemux * demux);
-static void gst_dash_demux_resume_stream_task (GstDashDemux * demux);
+static void gst_dash_demux_wait_stop (GstDashDemux * demux);
static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
-static gboolean
-gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
+static GstEvent
+ * gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
stream);
-static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux,
- GstDashDemuxStream * stream, GstClockTime * next_ts);
+static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream
+ * stream, GstClockTime * ts);
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
GstClockTime time_diff);
GSList * streams);
static void gst_dash_demux_stream_free (GstDashDemuxStream * stream);
static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
-#ifndef GST_DISABLE_GST_DEBUG
-static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
-static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
- * stream);
-#endif
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
GstActiveStream * stream);
static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux);
gst_dash_demux_reset (demux, TRUE);
- if (demux->stream_task) {
- gst_object_unref (demux->stream_task);
- g_rec_mutex_clear (&demux->stream_task_lock);
- demux->stream_task = NULL;
- }
-
if (demux->downloader != NULL) {
g_object_unref (demux->downloader);
demux->downloader = NULL;
demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
demux->max_bitrate = DEFAULT_MAX_BITRATE;
- /* Streaming task */
- g_rec_mutex_init (&demux->stream_task_lock);
- demux->stream_task =
- gst_task_new ((GstTaskFunction) gst_dash_demux_stream_loop, demux, NULL);
- gst_task_set_lock (demux->stream_task, &demux->stream_task_lock);
-
g_mutex_init (&demux->client_lock);
}
return ret;
}
-static gboolean
-_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
- GstDashDemux * demux)
-{
- return time >= demux->max_buffering_time;
-}
-
-static void
-_data_queue_item_destroy (GstDataQueueItem * item)
-{
- gst_mini_object_unref (item->object);
- g_free (item);
-}
-
-static void
-gst_dash_demux_stream_push_event (GstDashDemuxStream * stream, GstEvent * event)
-{
- GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
-
- item->object = GST_MINI_OBJECT_CAST (event);
- item->destroy = (GDestroyNotify) _data_queue_item_destroy;
-
- gst_data_queue_push_force (stream->queue, item);
-}
-
-static void
-gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
- GstBuffer * fragment)
-{
- GstDataQueueItem *item = g_new (GstDataQueueItem, 1);
-
- item->object = GST_MINI_OBJECT_CAST (fragment);
- item->duration = GST_BUFFER_DURATION (fragment);
- item->visible = TRUE;
- item->size = gst_buffer_get_size (fragment);
-
- item->destroy = (GDestroyNotify) _data_queue_item_destroy;
-
- gst_data_queue_push (stream->queue, item);
-}
-
static void
gst_dash_demux_stream_seek (GstDashDemuxStream * stream,
GstClockTime target_pos)
start, stop_type, stop, &update);
if (update) {
+ GstEvent *seg_evt;
+
+ GST_DASH_DEMUX_CLIENT_LOCK (demux);
+
if (flags & GST_SEEK_FLAG_FLUSH) {
GST_DEBUG_OBJECT (demux, "sending flush start");
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
}
}
- /* Stop the demux */
- demux->cancelled = TRUE;
- /* Stop the demux, also clears the buffering queue */
gst_dash_demux_stop (demux);
-
- /* Wait for streaming to finish */
- g_rec_mutex_lock (&demux->stream_task_lock);
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
+ gst_dash_demux_wait_stop (demux);
/* select the requested Period in the Media Presentation */
target_pos = (GstClockTime) demux->segment.start;
}
/* Update the current sequence on all streams */
+ seg_evt = gst_event_new_segment (&demux->segment);
+ gst_event_set_seqnum (seg_evt, gst_event_get_seqnum (event));
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
- gst_dash_demux_stream_seek (iter->data, target_pos);
+ GstDashDemuxStream *stream = iter->data;
+ gst_dash_demux_stream_seek (stream, target_pos);
+
+ gst_event_replace (&stream->pending_segment, seg_evt);
}
+ gst_event_unref (seg_evt);
if (flags & GST_SEEK_FLAG_FLUSH) {
GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
GstDashDemuxStream *stream;
stream = iter->data;
- stream->has_data_queued = FALSE;
stream->need_header = TRUE;
- stream->download_end_of_period = FALSE;
- stream->stream_end_of_period = FALSE;
stream->stream_eos = FALSE;
gst_pad_push_event (stream->pad, gst_event_new_flush_stop (TRUE));
}
}
/* Restart the demux */
+ GST_DASH_DEMUX_CLIENT_LOCK (demux);
demux->cancelled = FALSE;
demux->end_of_manifest = FALSE;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- gst_data_queue_set_flushing (stream->queue, FALSE);
stream->last_ret = GST_FLOW_OK;
gst_uri_downloader_reset (stream->downloader);
}
demux->timestamp_offset = 0;
- demux->need_segment = TRUE;
gst_uri_downloader_reset (demux->downloader);
GST_DEBUG_OBJECT (demux, "Resuming tasks after seeking");
gst_dash_demux_resume_download_task (demux);
- gst_dash_demux_resume_stream_task (demux);
- g_rec_mutex_unlock (&demux->stream_task_lock);
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
}
return TRUE;
stream->restart_download = TRUE;
stream->need_header = TRUE;
GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
+ gst_task_start (stream->download_task);
}
- gst_task_start (stream->download_task);
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
gst_event_unref (event);
return TRUE;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)",
- stream->download_end_of_period,
+ GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)", stream->stream_eos,
stream->last_ret != GST_FLOW_NOT_LINKED);
- if (!stream->download_end_of_period
- && stream->last_ret != GST_FLOW_NOT_LINKED)
+ if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED)
return FALSE;
}
stream->demux = demux;
stream->active_stream = active_stream;
caps = gst_dash_demux_get_input_caps (demux, active_stream);
- stream->queue =
- gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full,
- NULL, NULL, demux);
g_rec_mutex_init (&stream->download_task_lock);
stream->download_task =
stream->index = i;
stream->input_caps = caps;
stream->need_header = TRUE;
- stream->has_data_queued = FALSE;
gst_download_rate_init (&stream->dnl_rate);
gst_download_rate_set_max_length (&stream->dnl_rate,
DOWNLOAD_RATE_HISTORY_MAX);
gst_pad_push_event (stream->pad, event);
g_free (stream_id);
- gst_dash_demux_stream_push_event (stream, gst_event_new_caps (caps));
+ gst_pad_push_event (stream->pad, gst_event_new_caps (caps));
}
streams = g_slist_reverse (streams);
GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
+ GST_DASH_DEMUX_CLIENT_LOCK (demux);
if (demux->client)
gst_mpd_client_free (demux->client);
demux->client = gst_mpd_client_new ();
}
}
demux->timestamp_offset = -1;
- demux->need_segment = TRUE;
gst_dash_demux_resume_download_task (demux);
- gst_dash_demux_resume_stream_task (demux);
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
seek_quit:
gst_event_unref (event);
}
static void
-gst_dash_demux_stop (GstDashDemux * demux)
+gst_dash_demux_wait_stop (GstDashDemux * demux)
{
GSList *iter;
- GST_DEBUG_OBJECT (demux, "Stopping demux");
-
- if (demux->downloader)
- gst_uri_downloader_cancel (demux->downloader);
-
+ GST_DEBUG_OBJECT (demux, "Waiting for threads to stop");
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- gst_data_queue_set_flushing (stream->queue, TRUE);
- if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
- stream->last_ret = GST_FLOW_FLUSHING;
- stream->need_header = TRUE;
- gst_task_stop (stream->download_task);
- GST_TASK_SIGNAL (stream->download_task);
- gst_uri_downloader_cancel (stream->downloader);
-
- gst_task_join (stream->download_task);
- }
+ gst_task_join (stream->download_task);
}
+}
- if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
- GST_TASK_SIGNAL (demux->stream_task);
- gst_task_stop (demux->stream_task);
- g_rec_mutex_lock (&demux->stream_task_lock);
- g_rec_mutex_unlock (&demux->stream_task_lock);
- gst_task_join (demux->stream_task);
- }
+static void
+gst_dash_demux_stop (GstDashDemux * demux)
+{
+ GSList *iter;
+
+ GST_DEBUG_OBJECT (demux, "Stopping demux");
+ demux->cancelled = TRUE;
+
+ if (demux->downloader)
+ gst_uri_downloader_cancel (demux->downloader);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- gst_data_queue_flush (stream->queue);
- stream->has_data_queued = FALSE;
+ stream->last_ret = GST_FLOW_FLUSHING;
+ stream->need_header = TRUE;
+ gst_task_stop (stream->download_task);
+ GST_TASK_SIGNAL (stream->download_task);
+ gst_uri_downloader_cancel (stream->downloader);
}
}
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- GST_LOG_OBJECT (demux, "Exposing stream %d %" GST_PTR_FORMAT, stream->index,
- stream->input_caps);
+ GST_LOG_OBJECT (stream->pad, "Exposing stream %d %" GST_PTR_FORMAT,
+ stream->index, stream->input_caps);
+
gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad));
}
gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
for (iter = streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;;
- GST_LOG_OBJECT (demux, "Removing stream %d %" GST_PTR_FORMAT, stream->index,
- stream->input_caps);
+ GST_LOG_OBJECT (stream->pad, "Removing stream %d %" GST_PTR_FORMAT,
+ stream->index, stream->input_caps);
gst_pad_push_event (stream->pad, gst_event_ref (eos));
gst_pad_set_active (stream->pad, FALSE);
gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
gst_dash_demux_advance_period (GstDashDemux * demux)
{
GSList *old_period = NULL;
- GST_DASH_DEMUX_CLIENT_LOCK (demux);
+ GSList *iter;
+ GstEvent *seg_evt;
GST_DEBUG_OBJECT (demux, "Advancing period from %p", demux->streams);
return FALSE;
}
- gst_dash_demux_expose_streams (demux);
- gst_dash_demux_remove_streams (demux, old_period);
-
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
- return TRUE;
-}
-
-static GstFlowReturn
-gst_dash_demux_combine_flows (GstDashDemux * demux)
-{
- gboolean all_notlinked = TRUE;
- GSList *iter;
+ /* TODO protect with lock, using the client lock isn't useful
+ * because causes deadlocks with the event_src handler */
+ gst_dash_demux_expose_streams (demux);
+ seg_evt = gst_event_new_segment (&demux->segment);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
- if (stream->last_ret != GST_FLOW_NOT_LINKED)
- all_notlinked = FALSE;
-
- if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
- || stream->last_ret == GST_FLOW_FLUSHING)
- return stream->last_ret;
+ gst_event_replace (&stream->pending_segment, seg_evt);
}
- if (all_notlinked)
- return GST_FLOW_NOT_LINKED;
- return GST_FLOW_OK;
-}
+ gst_event_unref (seg_evt);
+ gst_dash_demux_remove_streams (demux, old_period);
+ GST_DASH_DEMUX_CLIENT_LOCK (demux);
+ return TRUE;
+}
-/* gst_dash_demux_stream_loop:
- *
- * Loop for the "stream' task that pushes fragments to the src pads.
- *
- * Startup:
- * The task is started as soon as we have received the manifest and
- * waits for the first fragment to be downloaded and pushed in the
- * queue. Once this fragment has been pushed, the task pauses itself
- * until actual playback begins.
- *
- * During playback:
- * The task pushes fragments downstream at regular intervals based on
- * the fragment duration. If it detects a queue underrun, it sends
- * a buffering event to tell the main application to pause.
- *
- * Teardown:
- * The task is stopped when we have reached the end of the manifest
- * and emptied our queue.
- *
- */
-static void
-gst_dash_demux_stream_loop (GstDashDemux * demux)
+static GstFlowReturn
+gst_dash_demux_push (GstDashDemuxStream * stream, GstBuffer * buffer)
{
- GstFlowReturn ret;
- GSList *iter;
- GstClockTime best_time;
- GstDashDemuxStream *selected_stream;
- gboolean eos = TRUE;
- gboolean eop = TRUE;
-
- GST_LOG_OBJECT (demux, "Starting stream loop");
-
- best_time = GST_CLOCK_TIME_NONE;
- selected_stream = NULL;
-
- for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
- GstDashDemuxStream *stream = iter->data;
- GstDataQueueItem *item;
-
- GST_DEBUG_OBJECT (demux, "Peeking stream %d", stream->index);
-
- if (stream->last_ret == GST_FLOW_NOT_LINKED)
- continue;
-
- if (stream->stream_eos) {
- GST_DEBUG_OBJECT (demux, "Stream %d is eos, skipping", stream->index);
- continue;
- }
-
- if (stream->stream_end_of_period) {
- GST_DEBUG_OBJECT (demux, "Stream %d is eop, skipping", stream->index);
- eos = FALSE;
- continue;
- }
- eos = FALSE;
- eop = FALSE;
-
- GST_DEBUG_OBJECT (demux, "peeking at the queue for stream %d",
- stream->index);
- if (!gst_data_queue_peek (stream->queue, &item)) {
- /* flushing */
- goto flushing;
- }
-
- if (G_LIKELY (GST_IS_BUFFER (item->object))) {
- GstBuffer *buffer;
- GstClockTime timestamp;
-
- buffer = GST_BUFFER_CAST (item->object);
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
-
- GST_LOG_OBJECT (demux, "Buffer with time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timestamp));
-
- if (timestamp < best_time) {
- GST_DEBUG_OBJECT (demux, "Found new best time: %" GST_TIME_FORMAT " %p",
- GST_TIME_ARGS (timestamp), buffer);
- best_time = timestamp;
- selected_stream = stream;
- } else if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
- selected_stream = stream;
- GST_DEBUG_OBJECT (demux, "Buffer without timestamp selected %p",
- buffer);
- break;
- }
- } else {
- GST_DEBUG_OBJECT (demux, "Non buffers have preference %" GST_PTR_FORMAT,
- item->object);
- selected_stream = stream;
- break;
- }
- }
-
- ret = GST_FLOW_OK;
- if (selected_stream) {
- GstDataQueueItem *item;
-
- GST_DEBUG_OBJECT (demux, "Selected stream %p %d", selected_stream,
- selected_stream->index);
-
- if (!gst_data_queue_pop (selected_stream->queue, &item))
- goto end;
-
- if (G_LIKELY (GST_IS_BUFFER (item->object))) {
- GstBuffer *buffer;
- GstClockTime timestamp, duration;
-
- buffer = GST_BUFFER_CAST (item->object);
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
-
- if (demux->need_segment) {
- if (demux->timestamp_offset == -1)
- demux->timestamp_offset = timestamp;
+ GstDashDemux *demux = stream->demux;
+ GstClockTime timestamp, duration;
+ GstFlowReturn ret = GST_FLOW_OK;
- /* And send a newsegment */
- for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
- GstDashDemuxStream *stream = iter->data;
- gst_pad_push_event (stream->pad,
- gst_event_new_segment (&demux->segment));
- }
- demux->need_segment = FALSE;
- }
- /* make timestamp start from 0 by subtracting the offset */
- timestamp -= demux->timestamp_offset;
- duration = GST_BUFFER_DURATION (buffer);
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
- GST_BUFFER_TIMESTAMP (buffer) = timestamp;
+ if (stream->pending_segment) {
+ if (demux->timestamp_offset == -1)
+ demux->timestamp_offset = timestamp;
+ else
+ demux->timestamp_offset = MIN (timestamp, demux->timestamp_offset);
- GST_DEBUG_OBJECT (demux,
- "Pushing fragment ts: %" GST_TIME_FORMAT " at pad %s",
- GST_TIME_ARGS (timestamp), GST_PAD_NAME (selected_stream->pad));
- GST_DEBUG_OBJECT (demux,
- "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
- GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " at pad: %s:%s", buffer,
- GST_BUFFER_OFFSET (buffer), selected_stream->index,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
- GST_DEBUG_PAD_NAME (selected_stream->pad));
- ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
- GST_DEBUG_OBJECT (demux, "Push result: %d %s", ret,
- gst_flow_get_name (ret));
-
- demux->segment.position = timestamp;
- selected_stream->position = timestamp;
- if (GST_CLOCK_TIME_IS_VALID (duration))
- selected_stream->position += duration;
-
- item->destroy (item);
- } else {
- /* a GstEvent */
- if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) {
- selected_stream->stream_end_of_period = TRUE;
- selected_stream->stream_eos = TRUE;
- ret = GST_FLOW_EOS;
- } else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) {
- selected_stream->stream_end_of_period = TRUE;
- }
-
- if (GST_EVENT_TYPE (item->object) != GST_EVENT_DASH_EOP) {
- gst_pad_push_event (selected_stream->pad,
- gst_event_ref (GST_EVENT_CAST (item->object)));
- }
- item->destroy (item);
- }
- } else {
- if (eos) {
- goto end_of_manifest;
- } else if (eop) {
- gst_dash_demux_advance_period (demux);
- demux->need_segment = TRUE;
- }
+ /* And send a newsegment */
+ gst_pad_push_event (stream->pad, stream->pending_segment);
+ stream->pending_segment = NULL;
}
- if (selected_stream) {
- GST_DASH_DEMUX_CLIENT_LOCK (demux);
- if (ret != selected_stream->last_ret) {
- gst_task_start (selected_stream->download_task);
- selected_stream->last_ret = ret;
- }
- switch (selected_stream->last_ret) {
- case GST_FLOW_NOT_LINKED:
- gst_data_queue_set_flushing (selected_stream->queue, TRUE);
- break;
- default:
- break;
- }
-
- /* combine flow returns */
- ret = gst_dash_demux_combine_flows (demux);
- if (ret < 0) {
- goto error;
- }
- GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
- }
+ /* make timestamp start from 0 by subtracting the offset */
+ timestamp -= demux->timestamp_offset;
+ duration = GST_BUFFER_DURATION (buffer);
-end:
- GST_INFO_OBJECT (demux, "Leaving streaming task");
- return;
+ GST_BUFFER_TIMESTAMP (buffer) = timestamp;
-flushing:
- {
- GST_WARNING_OBJECT (demux, "Flushing, leaving streaming task");
- gst_task_stop (demux->stream_task);
- return;
- }
+ GST_DEBUG_OBJECT (stream->pad,
+ "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
+ GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT, buffer,
+ GST_BUFFER_OFFSET (buffer), stream->index,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+ ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
+ GST_DEBUG_OBJECT (stream->pad, "Push result: %d %s", ret,
+ gst_flow_get_name (ret));
-end_of_manifest:
- {
- GST_INFO_OBJECT (demux, "Reached end of manifest, stopping streaming task");
- gst_task_stop (demux->stream_task);
- return;
- }
+ demux->segment.position = timestamp;
+ stream->position = timestamp;
+ if (GST_CLOCK_TIME_IS_VALID (duration))
+ stream->position += duration;
-error:
- {
- GST_ERROR_OBJECT (demux,
- "Error pushing buffer: %s... terminating the demux",
- gst_flow_get_name (ret));
- gst_task_stop (demux->stream_task);
- return;
- }
+ return ret;
}
static void
gst_caps_unref (stream->input_caps);
stream->input_caps = NULL;
}
+ if (stream->pending_segment) {
+ gst_event_unref (stream->pending_segment);
+ }
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
}
- if (stream->queue) {
- g_object_unref (stream->queue);
- stream->queue = NULL;
- }
if (stream->download_task) {
gst_task_stop (stream->download_task);
- gst_task_join (stream->download_task);
gst_object_unref (stream->download_task);
g_rec_mutex_clear (&stream->download_task_lock);
}
demux->end_of_period = FALSE;
demux->end_of_manifest = FALSE;
- demux->cancelled = TRUE;
gst_dash_demux_stop (demux);
+ gst_dash_demux_wait_stop (demux);
if (demux->downloader)
gst_uri_downloader_reset (demux->downloader);
demux->cancelled = FALSE;
}
-#ifndef GST_DISABLE_GST_DEBUG
-static GstClockTime
-gst_dash_demux_get_buffering_time (GstDashDemux * demux)
-{
- GstClockTime buffer_time = GST_CLOCK_TIME_NONE;
- GSList *iter;
-
- for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
- GstClockTime btime = gst_dash_demux_stream_get_buffering_time (iter->data);
-
- if (!GST_CLOCK_TIME_IS_VALID (buffer_time) || buffer_time > btime)
- buffer_time = btime;
- }
-
- if (!GST_CLOCK_TIME_IS_VALID (buffer_time))
- buffer_time = 0;
- return buffer_time;
-}
-
-static GstClockTime
-gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
-{
- GstDataQueueSize level;
-
- gst_data_queue_get_level (stream->queue, &level);
-
- return (GstClockTime) level.time;
-}
-#endif
-
static GstFlowReturn
gst_dash_demux_refresh_mpd (GstDashDemux * demux)
{
return GST_FLOW_OK;
}
+static GstFlowReturn
+gst_dash_demux_combine_flows (GstDashDemux * demux)
+{
+ gboolean all_notlinked = TRUE;
+ GSList *iter;
+
+ for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+ GstDashDemuxStream *stream = iter->data;
+
+ if (stream->last_ret != GST_FLOW_NOT_LINKED)
+ all_notlinked = FALSE;
+
+ if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
+ || stream->last_ret == GST_FLOW_FLUSHING)
+ return stream->last_ret;
+ }
+ if (all_notlinked)
+ return GST_FLOW_NOT_LINKED;
+ return GST_FLOW_OK;
+}
+
+
/* gst_dash_demux_stream_download_loop:
*
* Loop for the "download' task that fetches fragments based on the
* selected representations.
*
- * Startup:
- *
- * The task is started from the stream loop.
- *
* During playback:
*
* It sequentially fetches fragments corresponding to the current
- * representations and pushes them into a queue.
- *
- * It tries to maintain the number of queued items within a predefined
- * range: if the queue is full, it will pause, checking every 100 ms if
- * it needs to restart downloading fragments.
+ * representations and pushes them downstream
*
* When a new set of fragments has been downloaded, it evaluates the
* download time to check if we can or should switch to a different
*
* The task will exit when it encounters an error or when the end of the
* manifest has been reached.
- *
*/
static void
gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
{
- GstClockTime fragment_ts = GST_CLOCK_TIME_NONE;
GstDashDemux *demux = stream->demux;
GstFlowReturn flow_ret = GST_FLOW_OK;
+ GstClockTime fragment_ts = GST_CLOCK_TIME_NONE;
+ GstEvent *caps_event;
GST_LOG_OBJECT (stream->pad, "Starting download loop");
GST_DASH_DEMUX_CLIENT_LOCK (demux);
- if (stream->last_ret < GST_FLOW_OK) {
- if (demux->cancelled) {
- GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
- goto cancelled;
- }
- GST_DEBUG_OBJECT (stream->pad, "Download loop waiting due to flow return: "
- "%d %s", stream->last_ret, gst_flow_get_name (stream->last_ret));
- gst_task_pause (stream->download_task);
- GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
- return;
- }
- GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-
if (demux->cancelled) {
goto cancelled;
}
- GST_DASH_DEMUX_CLIENT_LOCK (demux);
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_uri != NULL) {
switch (gst_dash_demux_refresh_mpd (demux)) {
}
}
- GST_DEBUG_OBJECT (stream->pad, "End of manifest: %d", demux->end_of_manifest);
-
/* try to switch to another set of representations if needed */
- gst_dash_demux_stream_select_representation_unlocked (stream);
+ caps_event = gst_dash_demux_stream_select_representation_unlocked (stream);
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
+ if (G_UNLIKELY (caps_event && !gst_pad_push_event (stream->pad, caps_event))) {
+ /* TODO fail if caps is rejected */
+ }
+
/* fetch the next fragment */
- flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_ts);
+ flow_ret = gst_dash_demux_stream_get_next_fragment (stream, &fragment_ts);
+ GST_DASH_DEMUX_CLIENT_LOCK (demux);
+ if (demux->cancelled) {
+ goto cancelled;
+ }
+
+ stream->last_ret = flow_ret;
switch (flow_ret) {
case GST_FLOW_OK:
break;
+ case GST_FLOW_NOT_LINKED:
+ gst_task_pause (stream->download_task);
+ if (gst_dash_demux_combine_flows (demux) == GST_FLOW_NOT_LINKED) {
+ GST_ELEMENT_ERROR (demux, STREAM, FAILED,
+ (_("Internal data stream error.")),
+ ("stream stopped, reason %s",
+ gst_flow_get_name (GST_FLOW_NOT_LINKED)));
+ }
+ break;
+
+ case GST_FLOW_FLUSHING:
+ gst_dash_demux_stop (demux);
+ break;
+
case GST_FLOW_EOS:
- GST_DASH_DEMUX_CLIENT_LOCK (demux);
+ GST_DEBUG_OBJECT (stream->pad, "EOS");
+ stream->stream_eos = TRUE;
if (gst_dash_demux_all_streams_eop (demux)) {
GST_INFO_OBJECT (stream->pad, "Reached the end of the Period");
if (gst_mpd_client_has_next_period (demux->client)) {
+
+ GST_INFO_OBJECT (demux, "Starting next period");
/* setup video, audio and subtitle streams, starting from the next Period */
if (!gst_mpd_client_set_period_index (demux->client,
gst_mpd_client_get_period_index (demux->client) + 1)
|| !gst_dash_demux_setup_all_streams (demux)) {
}
+
+ stream->last_ret = GST_FLOW_OK;
/* start playing from the first segment of the new period */
gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
- demux->end_of_period = FALSE;
- gst_task_start (demux->stream_task);
- } else if (!demux->end_of_manifest) {
- GST_INFO_OBJECT (stream->pad, "Reached the end of the manifest file");
- demux->end_of_manifest = TRUE;
- gst_task_start (demux->stream_task);
+
+ gst_dash_demux_advance_period (demux);
+
+ gst_dash_demux_resume_download_task (demux);
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
- goto end_of_manifest;
+
+ /* This pad is now finished, we lost its reference */
+ return;
}
}
gst_task_pause (stream->download_task);
- GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
break;
case GST_FLOW_ERROR:
/* Download failed 'by itself'
break;
}
- if (demux->cancelled) {
- goto cancelled;
- }
-
- GST_INFO_OBJECT (stream->pad, "Internal buffering : %" G_GUINT64_FORMAT " s",
- gst_dash_demux_get_buffering_time (demux) / GST_SECOND);
- demux->client->update_failed_count = 0;
+ if (G_LIKELY (stream->last_ret != GST_FLOW_ERROR))
+ demux->client->update_failed_count = 0;
quit:
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
+
+ if (G_UNLIKELY (stream->last_ret == GST_FLOW_EOS)) {
+ gst_pad_push_event (stream->pad, gst_event_new_eos ());
+ }
+
GST_DEBUG_OBJECT (stream->pad, "Finishing download loop");
return;
cancelled:
{
GST_WARNING_OBJECT (stream->pad, "Cancelled, leaving download task");
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
return;
}
{
GST_INFO_OBJECT (stream->pad, "End of manifest, leaving download task");
gst_task_stop (stream->download_task);
+ /* TODO should push eos? */
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
return;
}
GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
("Could not fetch the next fragment, leaving download task"), (NULL));
gst_task_stop (stream->download_task);
+ GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
return;
}
}
static void
-gst_dash_demux_resume_stream_task (GstDashDemux * demux)
-{
- gst_task_start (demux->stream_task);
-}
-
-static void
gst_dash_demux_resume_download_task (GstDashDemux * demux)
{
GSList *iter;
* Select the most appropriate media representation based on current target
* bitrate.
*/
-static gboolean
+static GstEvent *
gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
stream)
{
stream->index, new_index, rep->bandwidth);
if (gst_mpd_client_setup_representation (demux->client, active_stream, rep)) {
stream->need_header = TRUE;
- stream->has_data_queued = FALSE;
GST_INFO_OBJECT (demux, "Switching bitrate to %d",
active_stream->cur_representation->bandwidth);
gst_caps_unref (stream->input_caps);
stream->input_caps = gst_dash_demux_get_input_caps (demux, active_stream);
- gst_dash_demux_stream_push_event (stream,
- gst_event_new_caps (stream->input_caps));
- return TRUE;
+ return gst_event_new_caps (gst_caps_ref (stream->input_caps));
} else {
GST_WARNING_OBJECT (demux, "Can not switch representation, aborting...");
- return FALSE;
+ return NULL;
}
}
- return FALSE;
+ return NULL;
}
static GstBuffer *
}
}
-static gboolean
-gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
+static GstBuffer *
+gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
GstDashDemuxStream * stream, guint64 * size_buffer,
GstClockTime * download_time)
{
GTimeVal now;
GTimeVal start;
guint stream_idx = stream->index;
- GstBuffer *buffer;
+ GstBuffer *buffer = NULL;
GstBuffer *header_buffer;
GstMediaFragmentInfo fragment;
*/
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
- gst_data_queue_set_flushing (stream->queue, FALSE);
stream->restart_download = FALSE;
- gst_task_start (demux->stream_task);
}
if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) {
- gboolean catch_up = FALSE;
-
g_get_current_time (&start);
- GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
- GST_INFO_OBJECT (demux,
+ GST_INFO_OBJECT (stream->pad,
"Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT " Range:%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
fragment.uri, GST_TIME_ARGS (fragment.timestamp),
if (download == NULL) {
gst_media_fragment_info_clear (&fragment);
- return FALSE;
+ return NULL;
}
active_stream = stream->active_stream;
if (active_stream == NULL) {
gst_media_fragment_info_clear (&fragment);
g_object_unref (download);
- return FALSE;
+ return NULL;
}
buffer = gst_fragment_get_buffer (download);
if (!uri) /* fallback to default media uri */
uri = fragment.uri;
- GST_DEBUG_OBJECT (demux,
+ GST_DEBUG_OBJECT (stream->pad,
"Fragment index download: %s %" G_GINT64_FORMAT "-%"
G_GINT64_FORMAT, uri, fragment.index_range_start,
fragment.index_range_end);
gst_mpd_client_get_segment_index (active_stream) - 1;
gst_media_fragment_info_clear (&fragment);
-
- /* Check if this stream is on catch up mode */
- if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
- GST_DEBUG_OBJECT (stream->pad,
- "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
- GST_TIME_ARGS (demux->segment.position),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
- if (GST_BUFFER_TIMESTAMP (buffer) < demux->segment.position) {
- catch_up = TRUE;
- } else {
- stream->last_ret = GST_FLOW_OK;
- gst_task_start (demux->stream_task);
- }
- }
-
*size_buffer += gst_buffer_get_size (buffer);
- if (catch_up) {
- GstFlowReturn ret;
-
- ret = gst_pad_push (stream->pad, buffer);
- if (G_LIKELY (ret == GST_FLOW_OK))
- stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
- /* TODO handle return */
- } else {
- gst_dash_demux_stream_push_data (stream, buffer);
- stream->has_data_queued = TRUE;
- }
} else {
GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
stream, stream->index);
}
- return TRUE;
+ return buffer;
}
-/* gst_dash_demux_get_next_fragment:
+/* gst_dash_demux_stream_get_next_fragment:
*
* Get the next fragments for the stream with the earlier timestamp.
* It returns the selected timestamp so the caller can deal with
* This function uses the generic URI downloader API.
*
* Returns FALSE if an error occured while downloading fragments
- *
*/
static GstFlowReturn
-gst_dash_demux_get_next_fragment (GstDashDemux * demux,
- GstDashDemuxStream * stream, GstClockTime * selected_ts)
+gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
+ GstClockTime * ts)
{
guint64 buffer_size = 0;
GstClockTime diff;
gboolean end_of_period = TRUE;
- GstClockTime ts;
+ GstBuffer *buffer = NULL;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstDashDemux *demux = stream->demux;
- if (stream->download_end_of_period)
+ if (stream->stream_eos)
return GST_FLOW_EOS;
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
return GST_FLOW_NOT_LINKED;
}
- if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
- stream->index, &ts)) {
- } else {
- GstEvent *event = NULL;
-
+ if (!gst_mpd_client_get_next_fragment_timestamp (demux->client,
+ stream->index, ts)) {
GST_INFO_OBJECT (demux,
"This Period doesn't contain more fragments for stream %u",
stream->index);
end_of_period = FALSE;
return GST_FLOW_OK; /* TODO wait */
}
-
- if (gst_mpd_client_has_next_period (demux->client)) {
- event = gst_event_new_dash_eop ();
- } else {
- GST_DEBUG_OBJECT (demux,
- "No more fragments or periods for this stream, setting EOS");
- event = gst_event_new_eos ();
- }
- stream->download_end_of_period = TRUE;
- gst_dash_demux_stream_push_event (stream, event);
+ return GST_FLOW_EOS;
}
/*
* If this is a live stream, check the segment end time to make sure
* it is available to download
*/
- if (stream && gst_mpd_client_is_live (demux->client) &&
+ if (gst_mpd_client_is_live (demux->client) &&
demux->client->mpd_node->minimumUpdatePeriod != -1) {
gst_dash_demux_wait_for_fragment_to_be_available (demux, stream);
}
/* Get the fragment corresponding to each stream index */
- if (stream) {
- gst_dash_demux_get_next_fragment_for_stream (demux, stream, &buffer_size,
- &diff);
- end_of_period = FALSE;
- }
+ buffer =
+ gst_dash_demux_stream_download_fragment (demux, stream,
+ &buffer_size, &diff);
+ end_of_period = FALSE;
demux->end_of_period = end_of_period;
+
+ if (buffer) {
+ ret = gst_dash_demux_push (stream, buffer);
+ } else {
+ GST_WARNING_OBJECT (stream->pad, "Failed to download fragment");
+ return GST_FLOW_ERROR;
+ }
+
if (end_of_period)
return GST_FLOW_EOS;
G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000,
buffer_size / 1024, ((double) diff / GST_SECOND));
}
- return GST_FLOW_OK;
+ return ret;
}
static void