stream->downloader = gst_uri_downloader_new ();
stream->dataqueue =
gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream);
+ g_mutex_init (&stream->mutex);
/* Downloading task */
g_rec_mutex_init (&stream->download_lock);
}
if (stream->caps)
gst_caps_unref (stream->caps);
+ g_mutex_clear (&stream->mutex);
g_free (stream);
}
GstMssDemuxStream *stream = iter->data;
stream->eos = FALSE;
+ if (flags & GST_SEEK_FLAG_FLUSH) {
+ stream->last_ret = GST_FLOW_OK;
+ }
gst_data_queue_flush (stream->dataqueue);
gst_event_replace (&stream->pending_newsegment, newsegment);
}
gst_event_unref (event);
return TRUE;
}
+ case GST_EVENT_RECONFIGURE:{
+ GSList *iter;
+
+ for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+ GstMssDemuxStream *stream = iter->data;
+
+ if (stream->pad == pad) {
+ GST_MSS_DEMUX_STREAM_LOCK (stream);
+ if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED
+ && stream->last_ret == GST_FLOW_NOT_LINKED) {
+ stream->restart_download = TRUE;
+ gst_task_start (stream->download_task);
+ }
+ GST_MSS_DEMUX_STREAM_UNLOCK (stream);
+ gst_event_unref (event);
+ return TRUE;
+ }
+ }
+ }
+ break;
default:
break;
}
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
- gboolean * buffer_downloaded)
+ GstBuffer ** _buffer)
{
GstMssDemux *mssdemux = stream->parent;
gchar *path;
gchar *url;
GstFragment *fragment;
- GstBuffer *_buffer;
+ GstBuffer *buffer;
GstFlowReturn ret = GST_FLOW_OK;
guint64 before_download, after_download;
+ /* special case for not-linked streams */
+ if (stream->last_ret == GST_FLOW_NOT_LINKED) {
+ GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p",
+ stream);
+ return GST_FLOW_NOT_LINKED;
+ }
+
before_download = g_get_real_time ();
GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
return GST_FLOW_ERROR;
}
- _buffer = gst_fragment_get_buffer (fragment);
- _buffer = gst_buffer_make_writable (_buffer);
- GST_BUFFER_TIMESTAMP (_buffer) =
+ buffer = gst_fragment_get_buffer (fragment);
+ *_buffer = buffer = gst_buffer_make_writable (buffer);
+ GST_BUFFER_TIMESTAMP (buffer) =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
- GST_BUFFER_DURATION (_buffer) =
+ GST_BUFFER_DURATION (buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
g_object_unref (fragment);
- if (buffer_downloaded)
- *buffer_downloaded = _buffer != NULL;
-
after_download = g_get_real_time ();
if (_buffer) {
#ifndef GST_DISABLE_GST_DEBUG
- guint64 bitrate = (8 * gst_buffer_get_size (_buffer) * 1000000LLU) /
+ guint64 bitrate = (8 * gst_buffer_get_size (buffer) * 1000000LLU) /
(after_download - before_download);
#endif
"Measured download bitrate: %s %" G_GUINT64_FORMAT " bps",
GST_PAD_NAME (stream->pad), bitrate);
gst_download_rate_add_rate (&stream->download_rate,
- gst_buffer_get_size (_buffer),
+ gst_buffer_get_size (buffer),
1000 * (after_download - before_download));
-
- GST_DEBUG_OBJECT (mssdemux,
- "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
- " Duration: %" GST_TIME_FORMAT,
- stream, GST_PAD_NAME (stream->pad),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
- gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
}
return ret;
gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
- gboolean buffer_downloaded = FALSE;
GstFlowReturn ret;
+ GstBuffer *buffer = NULL;
+ gboolean buffer_downloaded = FALSE;
+ GstEvent *gap = NULL;
GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
GST_OBJECT_LOCK (mssdemux);
+ if (G_UNLIKELY (stream->restart_download)) {
+ GstClockTime cur, ts;
+ gint64 pos;
+
+ GST_MSS_DEMUX_STREAM_LOCK (stream);
+
+ GST_DEBUG_OBJECT (mssdemux,
+ "Activating stream %p due to reconfigure " "event", stream);
+
+ cur = GST_CLOCK_TIME_IS_VALID (stream->next_timestamp) ?
+ stream->next_timestamp : 0;
+
+ if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
+ ts = (GstClockTime) pos;
+ GST_DEBUG_OBJECT (mssdemux, "Downstream position: %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+ } else {
+ GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, "
+ "failling back to segment position");
+ ts = mssdemux->segment.position;
+ }
+
+ /* we might have already pushed this data */
+ ts = MAX (ts, stream->next_timestamp);
+
+ GST_DEBUG_OBJECT (mssdemux, "Restarting stream %p %s:%s at "
+ "position %" GST_TIME_FORMAT ", catching up until segment position %"
+ GST_TIME_FORMAT, stream, GST_DEBUG_PAD_NAME (stream->pad),
+ GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position));
+
+ if (GST_CLOCK_TIME_IS_VALID (ts)) {
+ gst_mss_stream_seek (stream->manifest_stream, ts);
+
+ if (cur < ts) {
+ gap = gst_event_new_gap (cur, ts - cur);
+ }
+ }
+
+ /* This stream might be entering into catching up mode,
+ * meaning that it will push buffers from this same download thread
+ * until it reaches the segment position.
+ *
+ * The reason for this is that in case of stream switching, the other
+ * stream that was previously active might be blocking the stream_loop
+ * in case it is ahead enough that all queues are filled.
+ * In this case, it is possible that a downstream input-selector is
+ * blocking waiting for the currently active stream to reach the
+ * same position of the old linked stream because of the 'sync-streams'
+ * behavior.
+ *
+ * We can push from this thread up to segment position as all other
+ * streams should be around the same timestamp.
+ */
+ stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+ stream->eos = FALSE;
+
+ gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+ stream->restart_download = FALSE;
+ gst_task_start (mssdemux->stream_task);
+ GST_MSS_DEMUX_STREAM_UNLOCK (stream);
+ }
+
GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes");
gst_mss_demux_reconfigure_stream (stream);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
GST_OBJECT_UNLOCK (mssdemux);
- ret = gst_mss_demux_stream_download_fragment (stream, &buffer_downloaded);
+ if (gap != NULL)
+ gst_pad_push_event (stream->pad, gap);
+
+ ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+ buffer_downloaded = buffer != NULL;
- if (stream->cancelled)
+ if (stream->cancelled) {
+ if (buffer)
+ gst_buffer_unref (buffer);
goto cancelled;
+ }
+
+ if (buffer) {
+ gboolean catch_up = FALSE;
+
+ /* Check if this stream is on catch up mode */
+ if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (mssdemux->segment.position),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
+ if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) {
+ catch_up = TRUE;
+ } else {
+ GST_OBJECT_LOCK (mssdemux);
+ stream->last_ret = GST_FLOW_OK;
+ gst_task_start (mssdemux->stream_task);
+ GST_OBJECT_UNLOCK (mssdemux);
+ }
+ }
+
+ GST_DEBUG_OBJECT (mssdemux,
+ "%s buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
+ " Duration: %" GST_TIME_FORMAT,
+ catch_up ? "Catch up push for" : "Storing", stream,
+ GST_PAD_NAME (stream->pad),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+ if (catch_up) {
+ ret = stream->last_ret = gst_pad_push (stream->pad, buffer);
+ if (G_LIKELY (ret == GST_FLOW_OK))
+ stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+ /* TODO handle return */
+ } else {
+ gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
+ }
+ }
switch (ret) {
case GST_FLOW_OK:
goto eos;
case GST_FLOW_ERROR:
goto error;
+ case GST_FLOW_NOT_LINKED:
+ goto notlinked;
default:
break;
}
GST_DEBUG_PAD_NAME (stream->pad));
gst_mss_demux_stream_store_object (stream,
GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
+ GST_OBJECT_LOCK (mssdemux);
gst_task_pause (stream->download_task);
+ gst_task_start (mssdemux->stream_task);
+ GST_OBJECT_UNLOCK (mssdemux);
return;
}
error:
gst_task_pause (stream->download_task);
return;
}
+notlinked:
+ {
+ GST_MSS_DEMUX_STREAM_LOCK (stream);
+ if (stream->last_ret == GST_FLOW_NOT_LINKED) {
+ gst_task_pause (stream->download_task);
+ gst_data_queue_set_flushing (stream->dataqueue, TRUE);
+ }
+ GST_MSS_DEMUX_STREAM_UNLOCK (stream);
+ }
}
static GstFlowReturn
GstDataQueueItem *item;
other = iter->data;
- if (other->eos) {
+ if (other->eos || other->last_ret == GST_FLOW_NOT_LINKED) {
+ GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d",
+ other, other->eos, other->last_ret);
continue;
}
if (!gst_data_queue_peek (other->dataqueue, &item)) {
/* flushing */
+ if (other->last_ret == GST_FLOW_NOT_LINKED) {
+ /* might have been unlinked and won't receive data for now */
+ continue;
+ }
return GST_FLOW_FLUSHING;
}
return ret;
}
+static GstFlowReturn
+gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
+{
+ gboolean all_notlinked = TRUE;
+ GSList *iter;
+
+ for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+ GstMssDemuxStream *stream = iter->data;
+
+ if (stream->last_ret != GST_FLOW_NOT_LINKED)
+ all_notlinked = FALSE;
+
+ if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
+ || stream->last_ret == GST_FLOW_FLUSHING)
+ return stream->last_ret;
+ }
+ if (all_notlinked)
+ return GST_FLOW_NOT_LINKED;
+ return GST_FLOW_OK;
+}
+
static void
gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
{
GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
gst_flow_get_name (ret));
+ /* Lock as this may change the tasks state */
+ GST_OBJECT_LOCK (mssdemux);
switch (ret) {
case GST_FLOW_OK:
break;
default:
g_assert_not_reached ();
}
+ GST_OBJECT_UNLOCK (mssdemux);
GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
stream, GST_PAD_NAME (stream->pad));
GST_DEBUG_OBJECT (mssdemux,
"Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
- " discont:%d on pad %s", object,
+ " discont:%d on pad %s:%s", object,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
- GST_PAD_NAME (stream->pad));
+ GST_DEBUG_PAD_NAME (stream->pad));
stream->next_timestamp =
GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
stream->have_data = TRUE;
- ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+ mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object);
+ stream->last_ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+ GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)",
+ GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
} else if (GST_IS_EVENT (object)) {
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
stream->eos = TRUE;
GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
GST_PAD_NAME (stream->pad));
gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
+ stream->last_ret = GST_FLOW_EOS;
} else {
g_return_if_reached ();
}
+ /* Lock as this may change the tasks state */
+ GST_OBJECT_LOCK (mssdemux);
+ ret = gst_mss_demux_combine_flows (mssdemux);
switch (ret) {
case GST_FLOW_EOS:
goto eos; /* EOS ? */
case GST_FLOW_ERROR:
goto error;
case GST_FLOW_NOT_LINKED:
- break; /* TODO what to do here? pause the task or just keep pushing? */
+ /* stream won't download any more data until it gets a reconfigure */
+ break;
case GST_FLOW_OK:
default:
break;
}
+ GST_OBJECT_UNLOCK (mssdemux);
GST_LOG_OBJECT (mssdemux, "Stream loop end");
return;
{
GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
gst_task_pause (mssdemux->stream_task);
+ GST_OBJECT_UNLOCK (mssdemux);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
gst_task_pause (mssdemux->stream_task);
+ GST_OBJECT_UNLOCK (mssdemux);
return;
}
stop:
{
GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
gst_task_pause (mssdemux->stream_task);
+ GST_OBJECT_UNLOCK (mssdemux);
return;
}
}