static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
-static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
+static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
+static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
gst_pad_set_event_function (mssdemux->sinkpad,
GST_DEBUG_FUNCPTR (gst_mss_demux_event));
gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
+
+ g_static_rec_mutex_init (&mssdemux->stream_lock);
+ mssdemux->stream_task =
+ gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
+ gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
+}
+
+static gboolean
+_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
+ guint64 time, gpointer checkdata)
+{
+ GstMssDemuxStream *stream = checkdata;
+ GstMssDemux *mssdemux = stream->parent;
+
+ if (mssdemux->data_queue_max_size == 0)
+ return FALSE; /* never full */
+ return visible >= mssdemux->data_queue_max_size;
}
static GstMssDemuxStream *
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
+ stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
- /* Streaming task */
- g_static_rec_mutex_init (&stream->stream_lock);
- stream->stream_task =
- gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
- gst_task_set_lock (stream->stream_task, &stream->stream_lock);
+ /* Downloading task */
+ g_static_rec_mutex_init (&stream->download_lock);
+ stream->download_task =
+ gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
+ gst_task_set_lock (stream->download_task, &stream->download_lock);
stream->pad = srcpad;
stream->manifest_stream = manifeststream;
static void
gst_mss_demux_stream_free (GstMssDemuxStream * stream)
{
- if (stream->stream_task) {
- if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
+ if (stream->download_task) {
+ if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
- gst_task_stop (stream->stream_task);
- g_static_rec_mutex_lock (&stream->stream_lock);
- g_static_rec_mutex_unlock (&stream->stream_lock);
+ gst_task_stop (stream->download_task);
+ g_static_rec_mutex_lock (&stream->download_lock);
+ g_static_rec_mutex_unlock (&stream->download_lock);
GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
- gst_task_join (stream->stream_task);
+ gst_task_join (stream->download_task);
GST_LOG_OBJECT (stream->parent, "Finished");
}
- gst_object_unref (stream->stream_task);
- g_static_rec_mutex_free (&stream->stream_lock);
- stream->stream_task = NULL;
+ gst_object_unref (stream->download_task);
+ g_static_rec_mutex_free (&stream->download_lock);
+ stream->download_task = NULL;
}
if (stream->pending_newsegment) {
g_object_unref (stream->downloader);
stream->downloader = NULL;
}
+ if (stream->dataqueue) {
+ g_object_unref (stream->dataqueue);
+ stream->dataqueue = NULL;
+ }
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
gst_mss_demux_reset (GstMssDemux * mssdemux)
{
GSList *iter;
+
+ if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
+ gst_task_stop (mssdemux->stream_task);
+ g_static_rec_mutex_lock (&mssdemux->stream_lock);
+ g_static_rec_mutex_unlock (&mssdemux->stream_lock);
+ gst_task_join (mssdemux->stream_task);
+ }
+
if (mssdemux->manifest_buffer) {
gst_buffer_unref (mssdemux->manifest_buffer);
mssdemux->manifest_buffer = NULL;
static void
gst_mss_demux_dispose (GObject * object)
{
- /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
+ GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
+
+ if (mssdemux->stream_task) {
+ gst_object_unref (mssdemux->stream_task);
+ g_static_rec_mutex_free (&mssdemux->stream_lock);
+ mssdemux->stream_task = NULL;
+ }
G_OBJECT_CLASS (parent_class)->dispose (object);
}
GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- gst_task_start (stream->stream_task);
+ gst_task_start (stream->download_task);
}
+
+ gst_task_start (mssdemux->stream_task);
}
static gboolean
gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
{
GSList *iter;
+
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
+ gst_data_queue_set_flushing (stream->dataqueue, TRUE);
+
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
- gst_task_pause (stream->stream_task);
+ gst_task_pause (stream->download_task);
}
+ gst_task_pause (mssdemux->stream_task);
+
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- g_static_rec_mutex_lock (&stream->stream_lock);
+ g_static_rec_mutex_lock (&stream->download_lock);
}
+ g_static_rec_mutex_lock (&mssdemux->stream_lock);
}
static void
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- g_static_rec_mutex_unlock (&stream->stream_lock);
+ g_static_rec_mutex_unlock (&stream->download_lock);
}
+ g_static_rec_mutex_unlock (&mssdemux->stream_lock);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- gst_task_start (stream->stream_task);
+ gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+ gst_task_start (stream->download_task);
}
+ gst_task_start (mssdemux->stream_task);
}
static gboolean
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
+ stream->eos = FALSE;
+ gst_data_queue_flush (stream->dataqueue);
stream->pending_newsegment = gst_event_ref (newsegment);
}
gst_event_unref (newsegment);
GSList *oldpads = NULL;
GSList *iter;
- gst_mss_demux_stop_tasks (mssdemux, FALSE);
+ gst_mss_demux_stop_tasks (mssdemux, TRUE);
if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
mssdemux->connection_speed)) {
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
GstPad *oldpad = stream->pad;
- GstClockTime ts =
- gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
+ GstClockTime ts = GST_CLOCK_TIME_NONE;
oldpads = g_slist_prepend (oldpads, oldpad);
+ /* since we are flushing the queue, get the next un-pushed timestamp to seek
+ * and avoid gaps */
+ gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+ if (!gst_data_queue_is_empty (stream->dataqueue)) {
+ GstDataQueueItem *item = NULL;
+
+ while (!gst_data_queue_is_empty (stream->dataqueue)
+ && !GST_CLOCK_TIME_IS_VALID (ts)) {
+ gst_data_queue_pop (stream->dataqueue, &item);
+
+ if (!item) {
+ g_assert_not_reached ();
+ break;
+ }
+
+ if (GST_IS_BUFFER (item->object)) {
+ GstBuffer *buffer = GST_BUFFER_CAST (item->object);
+
+ ts = GST_BUFFER_TIMESTAMP (buffer);
+ }
+ item->destroy (item);
+ }
+
+ }
+ if (!GST_CLOCK_TIME_IS_VALID (ts)) {
+ ts = gst_mss_stream_get_fragment_gst_timestamp
+ (stream->manifest_stream);
+ }
+
+ GST_DEBUG_OBJECT (mssdemux,
+ "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
+ GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
+ gst_mss_stream_seek (stream->manifest_stream, ts);
+ gst_data_queue_flush (stream->dataqueue);
+
stream->pad = _create_pad (mssdemux, stream->manifest_stream);
- /* TODO keep the same playback rate */
- stream->pending_newsegment =
- gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
gst_mss_demux_expose_stream (mssdemux, stream);
gst_pad_push_event (oldpad, gst_event_new_eos ());
gst_mss_demux_restart_tasks (mssdemux);
}
+static void
+_free_data_queue_item (gpointer obj)
+{
+ GstDataQueueItem *item = obj;
+
+ gst_mini_object_unref (item->object);
+ g_slice_free (GstDataQueueItem, item);
+}
+
+static void
+gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
+ GstMiniObject * obj)
+{
+ GstDataQueueItem *item;
+
+ item = g_slice_new (GstDataQueueItem);
+ item->object = (GstMiniObject *) obj;
+
+ item->duration = 0; /* we don't care */
+ item->size = 0;
+ item->visible = TRUE;
+
+ item->destroy = (GDestroyNotify) _free_data_queue_item;
+
+ if (!gst_data_queue_push (stream->dataqueue, item)) {
+ GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
+ gst_mini_object_unref (obj);
+ g_slice_free (GstDataQueueItem, item);
+ }
+}
+
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GstBuffer ** buffer)
GST_BUFFER_DURATION (_buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
- *buffer = _buffer;
+ if (buffer)
+ *buffer = _buffer;
+
+ if (_buffer) {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT,
+ stream, GST_PAD_NAME (stream->pad),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)));
+ gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
+ }
+
return ret;
no_url_error:
GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
(_("Failed to get fragment URL.")),
("An error happened when getting fragment URL"));
- gst_task_stop (stream->stream_task);
+ gst_task_stop (stream->download_task);
return GST_FLOW_ERROR;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
- gst_task_stop (stream->stream_task);
+ gst_task_stop (stream->download_task);
return GST_FLOW_ERROR;
}
}
static void
-gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
+gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
GstBuffer *buffer = NULL;
GstFlowReturn ret;
+ GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
+
+
+ ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+ switch (ret) {
+ case GST_FLOW_OK:
+ break; /* all is good, let's go */
+ case GST_FLOW_UNEXPECTED: /* EOS */
+ goto eos;
+ case GST_FLOW_ERROR:
+ goto error;
+ default:
+ break;
+ }
+
+ g_assert (buffer != NULL);
+
+ gst_mss_stream_advance_fragment (stream->manifest_stream);
+ GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
+ return;
+
+eos:
+ {
+ GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
+ GST_DEBUG_PAD_NAME (stream->pad));
+ gst_mss_demux_stream_store_object (stream,
+ GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
+ gst_task_stop (stream->download_task);
+ return;
+ }
+error:
+ {
+ GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
+ gst_task_stop (stream->download_task);
+ return;
+ }
+}
+
+static GstFlowReturn
+gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
+ GstMssDemuxStream ** stream)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstMssDemuxStream *current = NULL;
+ GstClockTime cur_time = GST_CLOCK_TIME_NONE;
+ GSList *iter;
+
+ if (!mssdemux->streams)
+ return GST_FLOW_ERROR;
+
+ for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+ GstClockTime time;
+ GstMssDemuxStream *other;
+ GstDataQueueItem *item;
+
+ other = iter->data;
+ if (other->eos) {
+ continue;
+ }
+
+ if (gst_data_queue_peek (other->dataqueue, &item)) {
+ } else {
+ /* flushing */
+ return GST_FLOW_WRONG_STATE;
+ }
+
+ if (GST_IS_EVENT (item->object)) {
+ /* events have higher priority */
+ current = other;
+ break;
+ }
+ time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
+ if (time < cur_time) {
+ cur_time = time;
+ current = other;
+ }
+ }
+
+ *stream = current;
+ if (current == NULL)
+ ret = GST_FLOW_UNEXPECTED;
+ return ret;
+}
+
+static void
+gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
+{
+ GstMssDemuxStream *stream = NULL;
+ GstFlowReturn ret;
+ GstMiniObject *object = NULL;
+ GstDataQueueItem *item = NULL;
+
+ GST_LOG_OBJECT (mssdemux, "Starting stream loop");
+
GST_OBJECT_LOCK (mssdemux);
if (mssdemux->update_bitrates) {
mssdemux->update_bitrates = FALSE;
GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes");
- g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
- NULL);
+ gst_mss_demux_reconfigure (mssdemux);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
- gst_task_stop (stream->stream_task);
- return;
} else {
GST_OBJECT_UNLOCK (mssdemux);
}
- ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+ ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
+
+ if (stream)
+ GST_DEBUG_OBJECT (mssdemux,
+ "Stream loop selected %p stream of pad %s. %d - %s", stream,
+ GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
+ else
+ GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
+ gst_flow_get_name (ret));
+
switch (ret) {
case GST_FLOW_OK:
- break; /* all is good, let's go */
- case GST_FLOW_UNEXPECTED: /* EOS */
- goto eos;
+ break;
case GST_FLOW_ERROR:
goto error;
+ case GST_FLOW_UNEXPECTED:
+ goto eos;
+ case GST_FLOW_WRONG_STATE:
+ GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
+ goto stop;
default:
- break;
+ g_assert_not_reached ();
}
- g_assert (buffer != NULL);
+ GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
+ stream, GST_PAD_NAME (stream->pad));
+ if (gst_data_queue_pop (stream->dataqueue, &item)) {
+ if (item->object)
+ object = gst_mini_object_ref (item->object);
+ item->destroy (item);
+ } else {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Failed to get object from dataqueue on stream %p %s", stream,
+ GST_PAD_NAME (stream->pad));
+ goto stop;
+ }
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
- GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
- GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
- ret = gst_pad_push (stream->pad, buffer);
+ if (G_LIKELY (GST_IS_BUFFER (object))) {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
+ GST_PAD_NAME (stream->pad));
+ ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+ } else if (GST_IS_EVENT (object)) {
+ if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
+ stream->eos = TRUE;
+ GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
+ GST_PAD_NAME (stream->pad));
+ gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
+ } else {
+ g_return_if_reached ();
+ }
+
switch (ret) {
case GST_FLOW_UNEXPECTED:
goto eos; /* EOS ? */
break;
}
- gst_mss_stream_advance_fragment (stream->manifest_stream);
+ GST_LOG_OBJECT (mssdemux, "Stream loop end");
return;
eos:
{
- GstEvent *eos = gst_event_new_eos ();
- GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
- GST_DEBUG_PAD_NAME (stream->pad));
- gst_pad_push_event (stream->pad, eos);
- gst_task_stop (stream->stream_task);
+ GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
+ gst_task_stop (mssdemux->stream_task);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
- gst_task_stop (stream->stream_task);
+ gst_task_stop (mssdemux->stream_task);
+ return;
+ }
+stop:
+ {
+ GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
+ gst_task_stop (mssdemux->stream_task);
return;
}
}