GstEvent * event)
{
GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
- GstAdaptiveDemuxClass *demux_class;
gboolean ret;
switch (event->type) {
- case GST_EVENT_FLUSH_STOP:
+ case GST_EVENT_FLUSH_STOP:{
GST_API_LOCK (demux);
GST_MANIFEST_LOCK (demux);
GST_API_UNLOCK (demux);
return ret;
+ }
case GST_EVENT_EOS:{
+ GstAdaptiveDemuxClass *demux_class;
GstQuery *query;
gboolean query_res;
gboolean ret = TRUE;
if (!gst_adaptive_demux_prepare_stream (demux,
GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
/* TODO act on error */
+ GST_FIXME_OBJECT (stream->pad,
+ "Do something on failure to expose stream");
}
if (first_and_live) {
- /* TODO we only need the first timestamp, maybe create a simple function */
+ /* TODO we only need the first timestamp, maybe create a simple function to
+ * get the current PTS of a fragment ? */
+ GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
gst_adaptive_demux_stream_update_fragment_info (demux, stream);
if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
GList *pending_events = NULL;
+ /* FIXME :
+ * This is duplicating *exactly* the same thing as what is done at the beginning
+ * of _src_chain if starting_fragment is TRUE */
if (stream->first_fragment_buffer) {
GstClockTime offset =
gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
}
/* Wait for preroll if blocking */
+ GST_DEBUG_OBJECT (stream->pad,
+ "About to push buffer of size %" G_GUINT64_FORMAT,
+ gst_buffer_get_size (buffer));
ret = gst_pad_push (stream->pad, buffer);
}
g_mutex_unlock (&stream->fragment_download_lock);
+ /* starting_fragment is set to TRUE at the beginning of
+ * _stream_download_fragment()
+ * /!\ If there is a header/index being downloaded, then this will
+ * be TRUE for the first one ... but FALSE for the remaining ones,
+ * including the *actual* fragment ! */
if (stream->starting_fragment) {
GstClockTime offset =
gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
} else {
GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
}
+
+ /* downloading_first_buffer is set to TRUE in download_uri() just before
+ * activating the source (i.e. requesting a given URI)
+ *
+ * The difference with starting_fragment is that this will be called
+ * for *all* first buffers (of index, and header, and fragment)
+ *
+ * ... to then only do something useful (in this block) for actual
+ * fragments... */
if (stream->downloading_first_buffer) {
gint64 chunk_size = 0;
gst_adaptive_demux_eos_handling (stream);
+ /* FIXME ?
+ * _eos_handling() calls fragment_download_finish() which does the
+ * same thing as below.
+ * Could this cause races ? */
g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE;
g_cond_signal (&stream->fragment_download_cond);
return FALSE;
}
+ /* Try to re-use existing source element */
if (stream->src != NULL) {
gchar *old_protocol, *new_protocol;
gchar *old_uri;
/* must be called with manifest_lock taken.
* Can temporarily release manifest_lock
+ *
+ * Will return when URI is fully downloaded (or aborted/errored)
*/
static GstFlowReturn
gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
GST_MANIFEST_UNLOCK (demux);
if (gst_element_set_state (stream->src,
GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
+ /* If ranges are specified, seek to it */
if (start != 0 || end != -1) {
/* HTTP ranges are inclusive, GStreamer segments are exclusive for the
* stop position */
ret = stream->last_ret = GST_FLOW_FLUSHING;
return ret;
}
+ /* download_finished is only set:
+ * * in ::fragment_download_finish()
+ * * if EOS is received on the _src pad
+ * */
while (!stream->cancelled && !stream->download_finished) {
g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock);
}
g_mutex_unlock (&stream->fragment_download_lock);
+ GST_DEBUG_OBJECT (stream->pad,
+ "Finished Waiting for %s download: %s", uritype (stream), uri);
+
GST_MANIFEST_LOCK (demux);
g_mutex_lock (&stream->fragment_download_lock);
if (G_UNLIKELY (stream->cancelled)) {
guint http_status;
guint last_status_code;
+ /* FIXME : */
+ /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
stream->starting_fragment = TRUE;
stream->last_ret = GST_FLOW_OK;
stream->first_fragment_buffer = TRUE;
+ GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
+ stream->fragment.uri ? "FRAGMENT " : "",
+ stream->fragment.header_uri ? "HEADER " : "",
+ stream->fragment.index_uri ? "INDEX" : "");
+
if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
stream->fragment.index_uri == NULL)
goto no_url_error;
stream->last_ret = GST_FLOW_OK;
http_status = 200;
+ /* Download the actual fragment, either in fragments or in one go */
if (klass->need_another_chunk && klass->need_another_chunk (stream)
&& stream->fragment.chunk_size != 0) {
+ /* Handle chunk downloading */
gint64 range_start, range_end, chunk_start, chunk_end;
guint64 download_total_bytes;
gint chunk_size = stream->fragment.chunk_size;
last_status_code, stream->download_error_count);
live = gst_adaptive_demux_is_live (demux);
- if (!retried_once && ((last_status_code / 100 == 4 && live) || last_status_code / 100 == 5)) { /* 4xx/5xx */
+ if (!retried_once && ((last_status_code / 100 == 4 && live)
+ || last_status_code / 100 == 5)) {
+ /* 4xx/5xx */
/* if current position is before available start, switch to next */
if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
goto flushing;
ret = gst_adaptive_demux_eos_handling (stream);
GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
gst_flow_get_name (ret));
+ GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
gst_flow_get_name (ret));
GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
return GST_FLOW_EOS;
}
- }
-
- else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
+ } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
/* If this is the last fragment, consider failures EOS and not actual
* errors. Due to rounding errors in the durations, the last fragment
* might not actually exist */
g_mutex_unlock (&stream->fragment_download_lock);
}
+ /* Restarting download, figure out new position
+ * FIXME : Move this to a separate function ? */
if (G_UNLIKELY (stream->restart_download)) {
GstEvent *seg_event;
GstClockTime cur, ts = 0;
live = gst_adaptive_demux_is_live (demux);
+ /* Get information about the fragment to download */
+ GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
ret, gst_flow_get_name (ret));
}
/* must be called with manifest_lock taken */
+/* Called from:
+ * the ::finish_fragment() handlers when an *actual* fragment is done
+ * */
GstFlowReturn
gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime duration)
g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
+ GST_LOG_OBJECT (stream->pad,
+ "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
+
stream->download_error_count = 0;
g_clear_error (&stream->last_error);
/* FIXME - url has no indication of byte ranges for subsegments */
+ /* FIXME : All those time statistics are biased, since they are calculated
+ * *AFTER* the queue2, which might be blocking. They should ideally be
+ * calculated *before* queue2 in the uri_handler_probe */
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_element (GST_OBJECT_CAST (demux),
gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
GstAdaptiveDemuxStream * stream)
{
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
+ GstFlowReturn ret;
g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
GST_FLOW_ERROR);
stream->fragment.bitrate = 0;
stream->fragment.finished = FALSE;
- return klass->stream_update_fragment_info (stream);
+ GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->segment.position));
+
+ ret = klass->stream_update_fragment_info (stream);
+
+ GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
+ stream->fragment.uri);
+ if (ret == GST_FLOW_OK) {
+ GST_LOG_OBJECT (stream->pad,
+ "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->fragment.timestamp),
+ GST_TIME_ARGS (stream->fragment.duration));
+ GST_LOG_OBJECT (stream->pad,
+ "range start:%" G_GUINT64_FORMAT " end:%" G_GUINT64_FORMAT,
+ stream->fragment.range_start, stream->fragment.range_end);
+ }
+
+ return ret;
}
/* must be called with manifest_lock taken */