mssdemux->n_videos = mssdemux->n_audios = 0;
g_free (mssdemux->base_url);
+ g_free (mssdemux->manifest_uri);
mssdemux->base_url = NULL;
}
GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
break;
}
- case GST_QUERY_LATENCY:
- gst_query_set_latency (query, FALSE, 0, -1);
+ case GST_QUERY_LATENCY:{
+ gboolean live = FALSE;
+
+ live = mssdemux->manifest
+ && gst_mss_manifest_is_live (mssdemux->manifest);
+
+ gst_query_set_latency (query, live, 0, -1);
ret = TRUE;
break;
+ }
case GST_QUERY_SEEKING:{
GstFormat fmt;
gint64 stop = -1;
+ if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) {
+ return FALSE; /* no live seeking */
+ }
+
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
fmt);
gst_query_parse_uri (query, &uri);
GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
+ mssdemux->manifest_uri = g_strdup (uri);
baseurl_end = g_strrstr (uri, "/Manifest");
if (baseurl_end) {
/* set the new end of the string */
return FALSE;
}
+ GST_INFO_OBJECT (mssdemux, "Live stream: %d",
+ gst_mss_manifest_is_live (mssdemux->manifest));
+
gst_mss_demux_create_streams (mssdemux);
for (iter = mssdemux->streams; iter;) {
GSList *current = iter;
return TRUE;
}
+static void
+gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
+{
+ GstUriDownloader *downloader;
+ GstFragment *manifest_data;
+ GstBuffer *manifest_buffer;
+
+ downloader = gst_uri_downloader_new ();
+
+ manifest_data =
+ gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri);
+ manifest_buffer = gst_fragment_get_buffer (manifest_data);
+ g_object_unref (manifest_data);
+
+ gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
+ gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
+ gst_buffer_unref (manifest_buffer);
+
+ g_object_unref (downloader);
+}
+
static void
gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
{
case GST_FLOW_OK:
break; /* all is good, let's go */
case GST_FLOW_UNEXPECTED: /* EOS */
+ gst_mss_demux_reload_manifest (mssdemux);
+ return GST_FLOW_OK;
return GST_FLOW_UNEXPECTED;
case GST_FLOW_ERROR:
goto error;
url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
+ GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
+
fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
g_free (path);
g_free (url);
if (!fragment) {
GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
/* TODO check if we are truly stoping */
+ if (gst_mss_manifest_is_live (mssdemux->manifest)) {
+ /* looks like there is no way of knowing when a live stream has ended
+ * Have to assume we are falling behind and cause a manifest reload */
+ return GST_FLOW_OK;
+ }
return GST_FLOW_ERROR;
}
if (_buffer) {
GST_DEBUG_OBJECT (mssdemux,
- "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT,
+ "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
+ " Duration: %" GST_TIME_FORMAT,
stream, GST_PAD_NAME (stream->pad),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)));
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
}
break;
}
- g_assert (buffer != NULL);
-
- gst_mss_stream_advance_fragment (stream->manifest_stream);
+ if (buffer) {
+ gst_mss_stream_advance_fragment (stream->manifest_stream);
+ }
GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
return;
}
if (G_LIKELY (GST_IS_BUFFER (object))) {
+ if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
+ GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
+ GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
+ GST_TIME_ARGS (stream->next_timestamp));
+ GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
+ }
+
GST_DEBUG_OBJECT (mssdemux,
- "Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object,
+ "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
+ " discont:%d on pad %s", object,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
+ GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
GST_PAD_NAME (stream->pad));
+
+ stream->next_timestamp =
+ GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
+
ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
} else if (GST_IS_EVENT (object)) {
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
GstEvent *pending_newsegment;
+ GstClockTime next_timestamp;
+
/* Downloading task */
GstTask *download_task;
GStaticRecMutex download_lock;
GstMssManifest *manifest;
gchar *base_url;
+ gchar *manifest_uri;
GSList *streams;
guint n_videos;
#include <glib.h>
#include <string.h>
+#include <stdio.h>
+#include <ctype.h>
#include <libxml/parser.h>
#include <libxml/tree.h>
#define MSS_PROP_TIMESCALE "TimeScale"
#define MSS_PROP_URL "Url"
+#define TO_LOWER(str) { char* p = str; for ( ; *p; ++p) *p = tolower(*p); }
+
/* TODO check if atoi is successful? */
typedef struct _GstMssStreamFragment
xmlDocPtr xml;
xmlNodePtr xmlrootnode;
+ gboolean is_live;
+
GSList *streams;
};
/* we reverse it later */
stream->fragments = g_list_prepend (stream->fragments, fragment);
-
} else if (node_has_type (iter, MSS_NODE_STREAM_QUALITY)) {
GstMssStreamQuality *quality = gst_mss_stream_quality_new (iter);
stream->qualities = g_list_prepend (stream->qualities, quality);
GstMssManifest *manifest;
xmlNodePtr root;
xmlNodePtr nodeiter;
+ gchar *live_str;
manifest = g_malloc0 (sizeof (GstMssManifest));
GST_BUFFER_SIZE (data), "manifest", NULL, 0);
root = manifest->xmlrootnode = xmlDocGetRootElement (manifest->xml);
+ live_str = (gchar *) xmlGetProp (root, (xmlChar *) "IsLive");
+ if (live_str) {
+ TO_LOWER (live_str);
+ manifest->is_live = strcmp (live_str, "true") == 0;
+ }
+
for (nodeiter = root->children; nodeiter; nodeiter = nodeiter->next) {
if (nodeiter->type == XML_ELEMENT_NODE
&& (strcmp ((const char *) nodeiter->name, "StreamIndex") == 0)) {
return bitrate;
}
+gboolean
+gst_mss_manifest_is_live (GstMssManifest * manifest)
+{
+ return manifest->is_live;
+}
+
+static void
+gst_mss_stream_reload_fragments (GstMssStream * stream, xmlNodePtr streamIndex)
+{
+ xmlNodePtr iter;
+ GList *new_fragments = NULL;
+ GstMssStreamFragment *previous_fragment = NULL;
+ GstMssStreamFragment *current_fragment =
+ stream->current_fragment ? stream->current_fragment->data : NULL;
+ guint64 current_time = gst_mss_stream_get_fragment_gst_timestamp (stream);
+ guint fragment_number = 0;
+ guint64 fragment_time_accum = 0;
+
+ if (!current_fragment && stream->fragments) {
+ current_fragment = g_list_last (stream->fragments)->data;
+ } else if (g_list_previous (stream->current_fragment)) {
+ /* rewind one as this is the next to be pushed */
+ current_fragment = g_list_previous (stream->current_fragment)->data;
+ } else {
+ current_fragment = NULL;
+ }
+
+ if (current_fragment) {
+ current_time = current_fragment->time;
+ fragment_number = current_fragment->number;
+ fragment_time_accum = current_fragment->time;
+ }
+
+ for (iter = streamIndex->children; iter; iter = iter->next) {
+ if (node_has_type (iter, MSS_NODE_STREAM_FRAGMENT)) {
+ gchar *duration_str;
+ gchar *time_str;
+ gchar *seqnum_str;
+ GstMssStreamFragment *fragment = g_new (GstMssStreamFragment, 1);
+
+ duration_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_DURATION);
+ time_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_TIME);
+ seqnum_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_NUMBER);
+
+ /* use the node's seq number or use the previous + 1 */
+ if (seqnum_str) {
+ fragment->number = g_ascii_strtoull (seqnum_str, NULL, 10);
+ g_free (seqnum_str);
+ } else {
+ fragment->number = fragment_number;
+ }
+ fragment_number = fragment->number + 1;
+
+ if (time_str) {
+ fragment->time = g_ascii_strtoull (time_str, NULL, 10);
+ g_free (time_str);
+ fragment_time_accum = fragment->time;
+ } else {
+ fragment->time = fragment_time_accum;
+ }
+
+ /* if we have a previous fragment, means we need to set its duration */
+ if (previous_fragment)
+ previous_fragment->duration = fragment->time - previous_fragment->time;
+
+ if (duration_str) {
+ fragment->duration = g_ascii_strtoull (duration_str, NULL, 10);
+
+ previous_fragment = NULL;
+ fragment_time_accum += fragment->duration;
+ g_free (duration_str);
+ } else {
+ /* store to set the duration at the next iteration */
+ previous_fragment = fragment;
+ }
+
+ if (fragment->time > current_time) {
+ new_fragments = g_list_append (new_fragments, fragment);
+ } else {
+ previous_fragment = NULL;
+ g_free (fragment);
+ }
+
+ } else {
+ /* TODO gst log this */
+ }
+ }
+
+ /* store the new fragments list */
+ if (new_fragments) {
+ g_list_free_full (stream->fragments, g_free);
+ stream->fragments = new_fragments;
+ stream->current_fragment = new_fragments;
+ }
+}
+
+static void
+gst_mss_manifest_reload_fragments_from_xml (GstMssManifest * manifest,
+ xmlNodePtr root)
+{
+ xmlNodePtr nodeiter;
+ GSList *streams = manifest->streams;
+
+ /* we assume the server is providing the streams in the same order in
+ * every manifest */
+ for (nodeiter = root->children; nodeiter && streams;
+ nodeiter = nodeiter->next) {
+ if (nodeiter->type == XML_ELEMENT_NODE
+ && (strcmp ((const char *) nodeiter->name, "StreamIndex") == 0)) {
+ gst_mss_stream_reload_fragments (streams->data, nodeiter);
+ streams = g_slist_next (streams);
+ }
+ }
+}
+
+void
+gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data)
+{
+ xmlDocPtr xml;
+ xmlNodePtr root;
+
+ g_return_if_fail (manifest->is_live);
+
+ xml = xmlReadMemory ((const gchar *) GST_BUFFER_DATA (data),
+ GST_BUFFER_SIZE (data), "manifest", NULL, 0);
+ root = xmlDocGetRootElement (xml);
+
+ gst_mss_manifest_reload_fragments_from_xml (manifest, root);
+
+ xmlFreeDoc (xml);
+}
+
static gboolean
gst_mss_stream_select_bitrate (GstMssStream * stream, guint64 bitrate)
{
gboolean gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time);
gboolean gst_mss_manifest_change_bitrate (GstMssManifest *manifest, guint64 bitrate);
guint64 gst_mss_manifest_get_current_bitrate (GstMssManifest * manifest);
+gboolean gst_mss_manifest_is_live (GstMssManifest * manifest);
+void gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data);
GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream);
GstCaps * gst_mss_stream_get_caps (GstMssStream * stream);