GstQuery * query);
static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
-static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
-static void gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
- GstMiniObject * obj);
+static GstFlowReturn gst_mss_demux_stream_push (GstMssDemuxStream * stream,
+ GstBuffer * buffer);
+static GstFlowReturn gst_mss_demux_stream_push_event (GstMssDemuxStream *
+ stream, GstEvent * event);
+static GstFlowReturn gst_mss_demux_combine_flows (GstMssDemux * mssdemux);
static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
GST_DEBUG_FUNCPTR (gst_mss_demux_event));
gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
- g_rec_mutex_init (&mssdemux->stream_lock);
- mssdemux->stream_task =
- gst_task_new ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux,
- NULL);
- gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
-
mssdemux->data_queue_max_size = DEFAULT_MAX_QUEUE_SIZE_BUFFERS;
mssdemux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
mssdemux->group_id = G_MAXUINT;
}
-static gboolean
-_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
- guint64 time, gpointer checkdata)
-{
- GstMssDemuxStream *stream = checkdata;
- GstMssDemux *mssdemux = stream->parent;
-
- if (mssdemux->data_queue_max_size == 0)
- return FALSE; /* never full */
- return visible >= mssdemux->data_queue_max_size;
-}
-
static GstMssDemuxStream *
gst_mss_demux_stream_new (GstMssDemux * mssdemux,
GstMssStream * manifeststream, GstPad * srcpad)
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
- stream->dataqueue =
- gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream);
/* Downloading task */
g_rec_mutex_init (&stream->download_lock);
GST_DEBUG_PAD_NAME (stream->pad));
gst_uri_downloader_cancel (stream->downloader);
gst_task_stop (stream->download_task);
- g_rec_mutex_lock (&stream->download_lock);
- g_rec_mutex_unlock (&stream->download_lock);
GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
gst_task_join (stream->download_task);
GST_LOG_OBJECT (stream->parent, "Finished");
g_object_unref (stream->downloader);
stream->downloader = NULL;
}
- if (stream->dataqueue) {
- g_object_unref (stream->dataqueue);
- stream->dataqueue = NULL;
- }
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
+
if (stream->downloader)
gst_uri_downloader_cancel (stream->downloader);
- gst_data_queue_set_flushing (stream->dataqueue, TRUE);
- }
-
- if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
- gst_task_stop (mssdemux->stream_task);
- g_rec_mutex_lock (&mssdemux->stream_lock);
- g_rec_mutex_unlock (&mssdemux->stream_lock);
- gst_task_join (mssdemux->stream_task);
}
if (mssdemux->manifest_buffer) {
gst_mss_demux_reset (mssdemux);
- if (mssdemux->stream_task) {
- gst_object_unref (mssdemux->stream_task);
- g_rec_mutex_clear (&mssdemux->stream_lock);
- mssdemux->stream_task = NULL;
- }
-
G_OBJECT_CLASS (parent_class)->dispose (object);
}
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->download_task);
}
-
- gst_task_start (mssdemux->stream_task);
}
static gboolean
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- gst_data_queue_set_flushing (stream->dataqueue, TRUE);
-
stream->cancelled = TRUE;
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
gst_task_pause (stream->download_task);
}
- gst_task_pause (mssdemux->stream_task);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
stream->cancelled = FALSE;
stream->download_error_count = 0;
}
- g_rec_mutex_lock (&mssdemux->stream_lock);
}
static void
gst_uri_downloader_reset (stream->downloader);
g_rec_mutex_unlock (&stream->download_lock);
}
- g_rec_mutex_unlock (&mssdemux->stream_lock);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
- gst_data_queue_set_flushing (stream->dataqueue, FALSE);
gst_task_start (stream->download_task);
}
- gst_task_start (mssdemux->stream_task);
}
static gboolean
if (flags & GST_SEEK_FLAG_FLUSH) {
stream->last_ret = GST_FLOW_OK;
}
- gst_data_queue_flush (stream->dataqueue);
gst_event_replace (&stream->pending_newsegment, newsegment);
}
gst_event_unref (newsegment);
if (stream->pad == pad) {
GST_OBJECT_LOCK (mssdemux);
- if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED
- && stream->last_ret == GST_FLOW_NOT_LINKED) {
+
+ if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (stream->pad, "Received reconfigure");
stream->restart_download = TRUE;
gst_task_start (stream->download_task);
g_object_unref (downloader);
}
-static void
+static GstEvent *
gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
{
+ GstEvent *capsevent = NULL;
GstMssDemux *mssdemux = stream->parent;
guint64 new_bitrate;
"Current stream download bitrate %" G_GUINT64_FORMAT, new_bitrate);
if (gst_mss_stream_select_bitrate (stream->manifest_stream, new_bitrate)) {
- GstEvent *capsevent;
GstCaps *caps;
caps = gst_mss_stream_get_caps (stream->manifest_stream);
gst_mss_stream_get_current_bitrate (stream->manifest_stream), caps);
capsevent = gst_event_new_caps (stream->caps);
- gst_mss_demux_stream_store_object (stream,
- GST_MINI_OBJECT_CAST (capsevent));
GST_DEBUG_OBJECT (stream->pad, "Finished streams reconfiguration");
}
-}
-
-static void
-_free_data_queue_item (gpointer obj)
-{
- GstDataQueueItem *item = obj;
-
- gst_mini_object_unref (item->object);
- g_slice_free (GstDataQueueItem, item);
-}
-
-static void
-gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
- GstMiniObject * obj)
-{
- GstDataQueueItem *item;
- gboolean ret = FALSE;
-
- item = g_slice_new (GstDataQueueItem);
- item->object = (GstMiniObject *) obj;
-
- item->duration = 0; /* we don't care */
- item->size = 0;
- item->visible = TRUE;
-
- item->destroy = (GDestroyNotify) _free_data_queue_item;
-
- if (G_LIKELY (GST_IS_BUFFER (obj))) {
- ret = gst_data_queue_push (stream->dataqueue, item);
- } else {
- ret = gst_data_queue_push_force (stream->dataqueue, item);
- }
-
- if (!ret) {
- GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
- item->destroy (item);
- }
+ return capsevent;
}
static GstFlowReturn
/* special case for not-linked streams */
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
- GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p",
+ GST_DEBUG_OBJECT (stream->pad, "Skipping download for not-linked stream %p",
stream);
return GST_FLOW_NOT_LINKED;
}
before_download = g_get_real_time ();
- GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
+ GST_DEBUG_OBJECT (stream->pad, "Getting url for stream");
ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
switch (ret) {
case GST_FLOW_OK:
GstBuffer *buffer = NULL;
gboolean buffer_downloaded = FALSE;
GstEvent *gap = NULL;
+ GstEvent *capsevent = NULL;
GST_LOG_OBJECT (stream->pad, "download loop start");
ts = MAX (ts, stream->next_timestamp);
GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
- "position %" GST_TIME_FORMAT ", catching up until segment position %"
- GST_TIME_FORMAT,
- GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position));
+ "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
if (GST_CLOCK_TIME_IS_VALID (ts)) {
gst_mss_stream_seek (stream->manifest_stream, ts);
}
}
- /* This stream might be entering into catching up mode,
- * meaning that it will push buffers from this same download thread
- * until it reaches the segment position.
- *
- * The reason for this is that in case of stream switching, the other
- * stream that was previously active might be blocking the stream_loop
- * in case it is ahead enough that all queues are filled.
- * In this case, it is possible that a downstream input-selector is
- * blocking waiting for the currently active stream to reach the
- * same position of the old linked stream because of the 'sync-streams'
- * behavior.
- *
- * We can push from this thread up to segment position as all other
- * streams should be around the same timestamp.
- */
- stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
- stream->eos = FALSE;
-
- gst_data_queue_set_flushing (stream->dataqueue, FALSE);
stream->restart_download = FALSE;
+ stream->last_ret = GST_FLOW_OK;
}
-
- gst_mss_demux_reconfigure_stream (stream);
+ capsevent = gst_mss_demux_reconfigure_stream (stream);
GST_OBJECT_UNLOCK (mssdemux);
- if (gap != NULL)
+ if (G_UNLIKELY (gap != NULL))
gst_pad_push_event (stream->pad, gap);
+ if (G_UNLIKELY (capsevent != NULL))
+ gst_pad_push_event (stream->pad, capsevent);
ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
buffer_downloaded = buffer != NULL;
}
if (buffer) {
- gboolean catch_up = FALSE;
-
- /* Check if this stream is on catch up mode */
- if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
- GST_DEBUG_OBJECT (stream->pad,
- "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
- GST_TIME_ARGS (mssdemux->segment.position),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
- if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) {
- catch_up = TRUE;
- } else {
- GST_OBJECT_LOCK (mssdemux);
- stream->last_ret = GST_FLOW_OK;
- gst_task_start (mssdemux->stream_task);
- GST_OBJECT_UNLOCK (mssdemux);
- }
- }
-
- GST_DEBUG_OBJECT (stream->pad,
- "%s buffer for stream. Timestamp: %" GST_TIME_FORMAT
- " Duration: %" GST_TIME_FORMAT,
- catch_up ? "Catch up push for" : "Storing",
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
- if (catch_up) {
- ret = gst_pad_push (stream->pad, buffer);
- if (G_LIKELY (ret == GST_FLOW_OK))
- stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
- else {
- stream->last_ret = ret;
- }
- } else {
- gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
- }
+ ret = gst_mss_demux_stream_push (stream, buffer);
}
+ GST_OBJECT_LOCK (mssdemux);
+ stream->last_ret = ret;
switch (ret) {
case GST_FLOW_OK:
break; /* all is good, let's go */
+
case GST_FLOW_EOS:
- goto eos;
- case GST_FLOW_ERROR:
- goto error;
+ GST_DEBUG_OBJECT (stream->pad, "EOS, stopping download loop");
+ gst_mss_demux_stream_push_event (stream, gst_event_new_eos ());
+ gst_task_pause (stream->download_task);
+ break;
+
case GST_FLOW_NOT_LINKED:
- goto notlinked;
- case GST_FLOW_FLUSHING:
- goto flushing;
+ gst_task_pause (stream->download_task);
+ if (gst_mss_demux_combine_flows (mssdemux) == GST_FLOW_NOT_LINKED) {
+ GST_ELEMENT_ERROR (mssdemux, STREAM, FAILED,
+ (_("Internal data stream error.")),
+ ("stream stopped, reason %s",
+ gst_flow_get_name (GST_FLOW_NOT_LINKED)));
+ }
+ break;
+
+ case GST_FLOW_FLUSHING:{
+ GSList *iter;
+
+ for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+ GstMssDemuxStream *other;
+
+ other = iter->data;
+ gst_task_pause (other->download_task);
+ }
+ }
+ break;
+
default:
- if (ret < GST_FLOW_ERROR)
- goto error;
+ if (ret <= GST_FLOW_ERROR) {
+ if (buffer_downloaded) {
+ GST_ERROR_OBJECT (mssdemux, "Error while pushing fragment");
+ } else {
+ GST_WARNING_OBJECT (mssdemux, "Error while downloading fragment");
+ if (++stream->download_error_count >=
+ DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
+ GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
+ (_("Couldn't download fragments")),
+ ("fragment downloading has failed too much consecutive times"));
+ }
+ }
+ }
break;
}
-
- stream->download_error_count = 0;
+ GST_OBJECT_UNLOCK (mssdemux);
if (buffer_downloaded) {
+ stream->download_error_count = 0;
gst_mss_stream_advance_fragment (stream->manifest_stream);
}
GST_LOG_OBJECT (stream->pad, "download loop end");
return;
-eos:
- {
- GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
- GST_DEBUG_PAD_NAME (stream->pad));
- gst_mss_demux_stream_store_object (stream,
- GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
- GST_OBJECT_LOCK (mssdemux);
- gst_task_pause (stream->download_task);
- gst_task_start (mssdemux->stream_task);
- GST_OBJECT_UNLOCK (mssdemux);
- return;
- }
-error:
- {
- GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
- if (++stream->download_error_count >= DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
- GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
- (_("Couldn't download fragments")),
- ("fragment downloading has failed too much consecutive times"));
- }
- return;
- }
cancelled:
{
GST_DEBUG_OBJECT (mssdemux, "Stream %p has been cancelled", stream);
gst_task_pause (stream->download_task);
return;
}
-notlinked:
- {
- GST_OBJECT_LOCK (mssdemux);
- if (stream->last_ret == GST_FLOW_NOT_LINKED) {
- gst_task_pause (stream->download_task);
- gst_data_queue_set_flushing (stream->dataqueue, TRUE);
- }
- GST_OBJECT_UNLOCK (mssdemux);
- return;
- }
-flushing:
- {
- GSList *iter;
-
- GST_OBJECT_LOCK (mssdemux);
- gst_task_pause (mssdemux->stream_task);
- for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
- GstMssDemuxStream *other;
-
- other = iter->data;
- gst_task_pause (other->download_task);
- gst_data_queue_set_flushing (other->dataqueue, TRUE);
- }
- GST_OBJECT_UNLOCK (mssdemux);
- return;
- }
-}
-
-static GstFlowReturn
-gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
- GstMssDemuxStream ** stream)
-{
- GstFlowReturn ret = GST_FLOW_OK;
- GstMssDemuxStream *current = NULL;
- GstClockTime cur_time = GST_CLOCK_TIME_NONE;
- GSList *iter;
-
- if (!mssdemux->streams)
- return GST_FLOW_ERROR;
-
- for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
- GstClockTime time;
- GstMssDemuxStream *other;
- GstDataQueueItem *item;
-
- other = iter->data;
- if (other->eos || other->last_ret != GST_FLOW_OK) {
- GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d",
- other, other->eos, other->last_ret);
- continue;
- }
-
- if (!gst_data_queue_peek (other->dataqueue, &item)) {
- /* flushing */
- if (other->last_ret == GST_FLOW_NOT_LINKED) {
- /* might have been unlinked and won't receive data for now */
- continue;
- }
- return GST_FLOW_FLUSHING;
- }
-
- if (GST_IS_EVENT (item->object)) {
- /* events have higher priority */
- current = other;
- break;
- }
- time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
- if (time < cur_time) {
- cur_time = time;
- current = other;
- }
- }
-
- *stream = current;
- if (current == NULL)
- ret = GST_FLOW_EOS;
- return ret;
}
static GstFlowReturn
return GST_FLOW_OK;
}
-static void
-gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
+static gboolean
+gst_mss_demux_stream_push (GstMssDemuxStream * stream, GstBuffer * buf)
{
- GstMssDemuxStream *stream = NULL;
GstFlowReturn ret;
- GstMiniObject *object = NULL;
- GstDataQueueItem *item = NULL;
-
- GST_LOG_OBJECT (mssdemux, "Starting stream loop");
-
- ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
-
- if (stream)
- GST_DEBUG_OBJECT (mssdemux,
- "Stream loop selected %p stream of pad %s. %d - %s", stream,
- GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
- else
- GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
- gst_flow_get_name (ret));
-
- /* Lock as this may change the tasks state */
- GST_OBJECT_LOCK (mssdemux);
- switch (ret) {
- case GST_FLOW_OK:
- break;
- case GST_FLOW_ERROR:
- goto error;
- case GST_FLOW_EOS:
- goto eos;
- case GST_FLOW_FLUSHING:
- GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
- goto stop;
- default:
- g_assert_not_reached ();
- }
- GST_OBJECT_UNLOCK (mssdemux);
-
- GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
- stream, GST_PAD_NAME (stream->pad));
- if (gst_data_queue_pop (stream->dataqueue, &item)) {
- if (item->object)
- object = gst_mini_object_ref (item->object);
- item->destroy (item);
- } else {
- GST_DEBUG_OBJECT (mssdemux,
- "Failed to get object from dataqueue on stream %p %s", stream,
- GST_PAD_NAME (stream->pad));
- goto stop;
- }
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
- if (G_LIKELY (GST_IS_BUFFER (object))) {
- if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
- GST_DEBUG_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
- GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
- GST_TIME_ARGS (stream->next_timestamp));
- GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
- }
-
- GST_DEBUG_OBJECT (mssdemux,
- "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
- " discont:%d on pad %s:%s", object,
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
- GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
- GST_DEBUG_PAD_NAME (stream->pad));
-
- stream->next_timestamp =
- GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
-
- stream->have_data = TRUE;
- mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object);
-
- ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
- GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)",
- GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
- } else if (GST_IS_EVENT (object)) {
- if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
- stream->eos = TRUE;
- }
- GST_DEBUG_OBJECT (mssdemux, "Pushing event %" GST_PTR_FORMAT " on pad %s",
- object, GST_PAD_NAME (stream->pad));
- gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
- ret = GST_FLOW_EOS;
- } else {
- g_return_if_reached ();
+ if (GST_BUFFER_TIMESTAMP (buf) != stream->next_timestamp) {
+ GST_DEBUG_OBJECT (stream->pad, "Marking buffer %p as discont buffer:%"
+ GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, buf,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+ GST_TIME_ARGS (stream->next_timestamp));
+ GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
}
- /* Lock as this may change the tasks state */
- GST_OBJECT_LOCK (mssdemux);
- stream->last_ret = ret;
- ret = gst_mss_demux_combine_flows (mssdemux);
- switch (ret) {
- case GST_FLOW_EOS:
- goto eos;
- case GST_FLOW_ERROR:
- goto error;
- case GST_FLOW_FLUSHING:
- goto stop;
- case GST_FLOW_NOT_LINKED:
- /* stream won't download any more data until it gets a reconfigure */
- break;
- case GST_FLOW_OK:
- break;
- default:
- if (ret < GST_FLOW_ERROR)
- goto error;
- break;
- }
- GST_OBJECT_UNLOCK (mssdemux);
+ GST_DEBUG_OBJECT (stream->pad,
+ "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
+ " discont:%d", buf,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
+ GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DISCONT));
- GST_LOG_OBJECT (mssdemux, "Stream loop end");
- return;
+ stream->next_timestamp =
+ GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf);
-eos:
- {
- GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
- gst_task_pause (mssdemux->stream_task);
- GST_OBJECT_UNLOCK (mssdemux);
- return;
- }
-error:
- {
- GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
- gst_task_pause (mssdemux->stream_task);
- GST_OBJECT_UNLOCK (mssdemux);
- return;
- }
-stop:
- {
- GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
- gst_task_pause (mssdemux->stream_task);
- GST_OBJECT_UNLOCK (mssdemux);
- return;
+ stream->have_data = TRUE;
+ stream->parent->segment.position = GST_BUFFER_TIMESTAMP (buf);
+
+ ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (buf));
+ GST_DEBUG_OBJECT (stream->pad, "Pushed. result: %d (%s)",
+ ret, gst_flow_get_name (ret));
+
+ return ret;
+}
+
+static gboolean
+gst_mss_demux_stream_push_event (GstMssDemuxStream * stream, GstEvent * event)
+{
+ gboolean ret;
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+ stream->eos = TRUE;
}
+ GST_DEBUG_OBJECT (stream->pad, "Pushing event %" GST_PTR_FORMAT, event);
+ ret = gst_pad_push_event (stream->pad, event);
+ return ret;
}