mssdemux: remove the stream loop task
authorThiago Santos <ts.santos@sisa.samsung.com>
Mon, 16 Dec 2013 19:14:24 +0000 (16:14 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Wed, 18 Dec 2013 21:52:19 +0000 (18:52 -0300)
Download and push from the same task, makes code a lot simpler
to maintain. Also pushing from separate threads avoids deadlocking
when gst_pad_push blocks due to downstream queues being full

ext/smoothstreaming/gstmssdemux.c
ext/smoothstreaming/gstmssdemux.h

index b15e991..1f863e0 100644 (file)
@@ -133,9 +133,11 @@ static gboolean gst_mss_demux_src_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
 
 static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
-static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
-static void gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
-    GstMiniObject * obj);
+static GstFlowReturn gst_mss_demux_stream_push (GstMssDemuxStream * stream,
+    GstBuffer * buffer);
+static GstFlowReturn gst_mss_demux_stream_push_event (GstMssDemuxStream *
+    stream, GstEvent * event);
+static GstFlowReturn gst_mss_demux_combine_flows (GstMssDemux * mssdemux);
 
 static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
 
@@ -199,12 +201,6 @@ gst_mss_demux_init (GstMssDemux * mssdemux)
       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
 
-  g_rec_mutex_init (&mssdemux->stream_lock);
-  mssdemux->stream_task =
-      gst_task_new ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux,
-      NULL);
-  gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
-
   mssdemux->data_queue_max_size = DEFAULT_MAX_QUEUE_SIZE_BUFFERS;
   mssdemux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
 
@@ -212,18 +208,6 @@ gst_mss_demux_init (GstMssDemux * mssdemux)
   mssdemux->group_id = G_MAXUINT;
 }
 
-static gboolean
-_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
-    guint64 time, gpointer checkdata)
-{
-  GstMssDemuxStream *stream = checkdata;
-  GstMssDemux *mssdemux = stream->parent;
-
-  if (mssdemux->data_queue_max_size == 0)
-    return FALSE;               /* never full */
-  return visible >= mssdemux->data_queue_max_size;
-}
-
 static GstMssDemuxStream *
 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
     GstMssStream * manifeststream, GstPad * srcpad)
@@ -232,8 +216,6 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
 
   stream = g_new0 (GstMssDemuxStream, 1);
   stream->downloader = gst_uri_downloader_new ();
-  stream->dataqueue =
-      gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream);
 
   /* Downloading task */
   g_rec_mutex_init (&stream->download_lock);
@@ -261,8 +243,6 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
           GST_DEBUG_PAD_NAME (stream->pad));
       gst_uri_downloader_cancel (stream->downloader);
       gst_task_stop (stream->download_task);
-      g_rec_mutex_lock (&stream->download_lock);
-      g_rec_mutex_unlock (&stream->download_lock);
       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
       gst_task_join (stream->download_task);
       GST_LOG_OBJECT (stream->parent, "Finished");
@@ -282,10 +262,6 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
     g_object_unref (stream->downloader);
     stream->downloader = NULL;
   }
-  if (stream->dataqueue) {
-    g_object_unref (stream->dataqueue);
-    stream->dataqueue = NULL;
-  }
   if (stream->pad) {
     gst_object_unref (stream->pad);
     stream->pad = NULL;
@@ -302,17 +278,10 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
 
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
+
     if (stream->downloader)
       gst_uri_downloader_cancel (stream->downloader);
 
-    gst_data_queue_set_flushing (stream->dataqueue, TRUE);
-  }
-
-  if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
-    gst_task_stop (mssdemux->stream_task);
-    g_rec_mutex_lock (&mssdemux->stream_lock);
-    g_rec_mutex_unlock (&mssdemux->stream_lock);
-    gst_task_join (mssdemux->stream_task);
   }
 
   if (mssdemux->manifest_buffer) {
@@ -351,12 +320,6 @@ gst_mss_demux_dispose (GObject * object)
 
   gst_mss_demux_reset (mssdemux);
 
-  if (mssdemux->stream_task) {
-    gst_object_unref (mssdemux->stream_task);
-    g_rec_mutex_clear (&mssdemux->stream_lock);
-    mssdemux->stream_task = NULL;
-  }
-
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
@@ -465,8 +428,6 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
     GstMssDemuxStream *stream = iter->data;
     gst_task_start (stream->download_task);
   }
-
-  gst_task_start (mssdemux->stream_task);
 }
 
 static gboolean
@@ -527,14 +488,11 @@ gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
 
-    gst_data_queue_set_flushing (stream->dataqueue, TRUE);
-
     stream->cancelled = TRUE;
     if (immediate)
       gst_uri_downloader_cancel (stream->downloader);
     gst_task_pause (stream->download_task);
   }
-  gst_task_pause (mssdemux->stream_task);
 
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
@@ -542,7 +500,6 @@ gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
     stream->cancelled = FALSE;
     stream->download_error_count = 0;
   }
-  g_rec_mutex_lock (&mssdemux->stream_lock);
 }
 
 static void
@@ -554,14 +511,11 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
     gst_uri_downloader_reset (stream->downloader);
     g_rec_mutex_unlock (&stream->download_lock);
   }
-  g_rec_mutex_unlock (&mssdemux->stream_lock);
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
 
-    gst_data_queue_set_flushing (stream->dataqueue, FALSE);
     gst_task_start (stream->download_task);
   }
-  gst_task_start (mssdemux->stream_task);
 }
 
 static gboolean
@@ -622,7 +576,6 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         if (flags & GST_SEEK_FLAG_FLUSH) {
           stream->last_ret = GST_FLOW_OK;
         }
-        gst_data_queue_flush (stream->dataqueue);
         gst_event_replace (&stream->pending_newsegment, newsegment);
       }
       gst_event_unref (newsegment);
@@ -648,8 +601,8 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
         if (stream->pad == pad) {
           GST_OBJECT_LOCK (mssdemux);
-          if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED
-              && stream->last_ret == GST_FLOW_NOT_LINKED) {
+
+          if (stream->last_ret == GST_FLOW_NOT_LINKED) {
             GST_DEBUG_OBJECT (stream->pad, "Received reconfigure");
             stream->restart_download = TRUE;
             gst_task_start (stream->download_task);
@@ -998,9 +951,10 @@ gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
   g_object_unref (downloader);
 }
 
-static void
+static GstEvent *
 gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
 {
+  GstEvent *capsevent = NULL;
   GstMssDemux *mssdemux = stream->parent;
   guint64 new_bitrate;
 
@@ -1015,7 +969,6 @@ gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
       "Current stream download bitrate %" G_GUINT64_FORMAT, new_bitrate);
 
   if (gst_mss_stream_select_bitrate (stream->manifest_stream, new_bitrate)) {
-    GstEvent *capsevent;
     GstCaps *caps;
     caps = gst_mss_stream_get_caps (stream->manifest_stream);
 
@@ -1032,47 +985,9 @@ gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
         gst_mss_stream_get_current_bitrate (stream->manifest_stream), caps);
 
     capsevent = gst_event_new_caps (stream->caps);
-    gst_mss_demux_stream_store_object (stream,
-        GST_MINI_OBJECT_CAST (capsevent));
     GST_DEBUG_OBJECT (stream->pad, "Finished streams reconfiguration");
   }
-}
-
-static void
-_free_data_queue_item (gpointer obj)
-{
-  GstDataQueueItem *item = obj;
-
-  gst_mini_object_unref (item->object);
-  g_slice_free (GstDataQueueItem, item);
-}
-
-static void
-gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
-    GstMiniObject * obj)
-{
-  GstDataQueueItem *item;
-  gboolean ret = FALSE;
-
-  item = g_slice_new (GstDataQueueItem);
-  item->object = (GstMiniObject *) obj;
-
-  item->duration = 0;           /* we don't care */
-  item->size = 0;
-  item->visible = TRUE;
-
-  item->destroy = (GDestroyNotify) _free_data_queue_item;
-
-  if (G_LIKELY (GST_IS_BUFFER (obj))) {
-    ret = gst_data_queue_push (stream->dataqueue, item);
-  } else {
-    ret = gst_data_queue_push_force (stream->dataqueue, item);
-  }
-
-  if (!ret) {
-    GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
-    item->destroy (item);
-  }
+  return capsevent;
 }
 
 static GstFlowReturn
@@ -1089,14 +1004,14 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
 
   /* special case for not-linked streams */
   if (stream->last_ret == GST_FLOW_NOT_LINKED) {
-    GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p",
+    GST_DEBUG_OBJECT (stream->pad, "Skipping download for not-linked stream %p",
         stream);
     return GST_FLOW_NOT_LINKED;
   }
 
   before_download = g_get_real_time ();
 
-  GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
+  GST_DEBUG_OBJECT (stream->pad, "Getting url for stream");
   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
   switch (ret) {
     case GST_FLOW_OK:
@@ -1186,6 +1101,7 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
   GstBuffer *buffer = NULL;
   gboolean buffer_downloaded = FALSE;
   GstEvent *gap = NULL;
+  GstEvent *capsevent = NULL;
 
   GST_LOG_OBJECT (stream->pad, "download loop start");
 
@@ -1214,9 +1130,7 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
     ts = MAX (ts, stream->next_timestamp);
 
     GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
-        "position %" GST_TIME_FORMAT ", catching up until segment position %"
-        GST_TIME_FORMAT,
-        GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position));
+        "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
 
     if (GST_CLOCK_TIME_IS_VALID (ts)) {
       gst_mss_stream_seek (stream->manifest_stream, ts);
@@ -1226,33 +1140,16 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
       }
     }
 
-    /* This stream might be entering into catching up mode,
-     * meaning that it will push buffers from this same download thread
-     * until it reaches the segment position.
-     *
-     * The reason for this is that in case of stream switching, the other
-     * stream that was previously active might be blocking the stream_loop
-     * in case it is ahead enough that all queues are filled.
-     * In this case, it is possible that a downstream input-selector is
-     * blocking waiting for the currently active stream to reach the
-     * same position of the old linked stream because of the 'sync-streams'
-     * behavior.
-     *
-     * We can push from this thread up to segment position as all other
-     * streams should be around the same timestamp.
-     */
-    stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
-    stream->eos = FALSE;
-
-    gst_data_queue_set_flushing (stream->dataqueue, FALSE);
     stream->restart_download = FALSE;
+    stream->last_ret = GST_FLOW_OK;
   }
-
-  gst_mss_demux_reconfigure_stream (stream);
+  capsevent = gst_mss_demux_reconfigure_stream (stream);
   GST_OBJECT_UNLOCK (mssdemux);
 
-  if (gap != NULL)
+  if (G_UNLIKELY (gap != NULL))
     gst_pad_push_event (stream->pad, gap);
+  if (G_UNLIKELY (capsevent != NULL))
+    gst_pad_push_event (stream->pad, capsevent);
 
   ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
   buffer_downloaded = buffer != NULL;
@@ -1264,174 +1161,75 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
   }
 
   if (buffer) {
-    gboolean catch_up = FALSE;
-
-    /* Check if this stream is on catch up mode */
-    if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
-      GST_DEBUG_OBJECT (stream->pad,
-          "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
-          GST_TIME_ARGS (mssdemux->segment.position),
-          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
-      if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) {
-        catch_up = TRUE;
-      } else {
-        GST_OBJECT_LOCK (mssdemux);
-        stream->last_ret = GST_FLOW_OK;
-        gst_task_start (mssdemux->stream_task);
-        GST_OBJECT_UNLOCK (mssdemux);
-      }
-    }
-
-    GST_DEBUG_OBJECT (stream->pad,
-        "%s buffer for stream. Timestamp: %" GST_TIME_FORMAT
-        " Duration: %" GST_TIME_FORMAT,
-        catch_up ? "Catch up push for" : "Storing",
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-        GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
-    if (catch_up) {
-      ret = gst_pad_push (stream->pad, buffer);
-      if (G_LIKELY (ret == GST_FLOW_OK))
-        stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
-      else {
-        stream->last_ret = ret;
-      }
-    } else {
-      gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
-    }
+    ret = gst_mss_demux_stream_push (stream, buffer);
   }
 
+  GST_OBJECT_LOCK (mssdemux);
+  stream->last_ret = ret;
   switch (ret) {
     case GST_FLOW_OK:
       break;                    /* all is good, let's go */
+
     case GST_FLOW_EOS:
-      goto eos;
-    case GST_FLOW_ERROR:
-      goto error;
+      GST_DEBUG_OBJECT (stream->pad, "EOS, stopping download loop");
+      gst_mss_demux_stream_push_event (stream, gst_event_new_eos ());
+      gst_task_pause (stream->download_task);
+      break;
+
     case GST_FLOW_NOT_LINKED:
-      goto notlinked;
-    case GST_FLOW_FLUSHING:
-      goto flushing;
+      gst_task_pause (stream->download_task);
+      if (gst_mss_demux_combine_flows (mssdemux) == GST_FLOW_NOT_LINKED) {
+        GST_ELEMENT_ERROR (mssdemux, STREAM, FAILED,
+            (_("Internal data stream error.")),
+            ("stream stopped, reason %s",
+                gst_flow_get_name (GST_FLOW_NOT_LINKED)));
+      }
+      break;
+
+    case GST_FLOW_FLUSHING:{
+      GSList *iter;
+
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *other;
+
+        other = iter->data;
+        gst_task_pause (other->download_task);
+      }
+    }
+      break;
+
     default:
-      if (ret < GST_FLOW_ERROR)
-        goto error;
+      if (ret <= GST_FLOW_ERROR) {
+        if (buffer_downloaded) {
+          GST_ERROR_OBJECT (mssdemux, "Error while pushing fragment");
+        } else {
+          GST_WARNING_OBJECT (mssdemux, "Error while downloading fragment");
+          if (++stream->download_error_count >=
+              DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
+            GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
+                (_("Couldn't download fragments")),
+                ("fragment downloading has failed too much consecutive times"));
+          }
+        }
+      }
       break;
   }
-
-  stream->download_error_count = 0;
+  GST_OBJECT_UNLOCK (mssdemux);
 
   if (buffer_downloaded) {
+    stream->download_error_count = 0;
     gst_mss_stream_advance_fragment (stream->manifest_stream);
   }
 
   GST_LOG_OBJECT (stream->pad, "download loop end");
   return;
 
-eos:
-  {
-    GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
-        GST_DEBUG_PAD_NAME (stream->pad));
-    gst_mss_demux_stream_store_object (stream,
-        GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
-    GST_OBJECT_LOCK (mssdemux);
-    gst_task_pause (stream->download_task);
-    gst_task_start (mssdemux->stream_task);
-    GST_OBJECT_UNLOCK (mssdemux);
-    return;
-  }
-error:
-  {
-    GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
-    if (++stream->download_error_count >= DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
-      GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
-          (_("Couldn't download fragments")),
-          ("fragment downloading has failed too much consecutive times"));
-    }
-    return;
-  }
 cancelled:
   {
     GST_DEBUG_OBJECT (mssdemux, "Stream %p has been cancelled", stream);
     gst_task_pause (stream->download_task);
     return;
   }
-notlinked:
-  {
-    GST_OBJECT_LOCK (mssdemux);
-    if (stream->last_ret == GST_FLOW_NOT_LINKED) {
-      gst_task_pause (stream->download_task);
-      gst_data_queue_set_flushing (stream->dataqueue, TRUE);
-    }
-    GST_OBJECT_UNLOCK (mssdemux);
-    return;
-  }
-flushing:
-  {
-    GSList *iter;
-
-    GST_OBJECT_LOCK (mssdemux);
-    gst_task_pause (mssdemux->stream_task);
-    for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
-      GstMssDemuxStream *other;
-
-      other = iter->data;
-      gst_task_pause (other->download_task);
-      gst_data_queue_set_flushing (other->dataqueue, TRUE);
-    }
-    GST_OBJECT_UNLOCK (mssdemux);
-    return;
-  }
-}
-
-static GstFlowReturn
-gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
-    GstMssDemuxStream ** stream)
-{
-  GstFlowReturn ret = GST_FLOW_OK;
-  GstMssDemuxStream *current = NULL;
-  GstClockTime cur_time = GST_CLOCK_TIME_NONE;
-  GSList *iter;
-
-  if (!mssdemux->streams)
-    return GST_FLOW_ERROR;
-
-  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
-    GstClockTime time;
-    GstMssDemuxStream *other;
-    GstDataQueueItem *item;
-
-    other = iter->data;
-    if (other->eos || other->last_ret != GST_FLOW_OK) {
-      GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d",
-          other, other->eos, other->last_ret);
-      continue;
-    }
-
-    if (!gst_data_queue_peek (other->dataqueue, &item)) {
-      /* flushing */
-      if (other->last_ret == GST_FLOW_NOT_LINKED) {
-        /* might have been unlinked and won't receive data for now */
-        continue;
-      }
-      return GST_FLOW_FLUSHING;
-    }
-
-    if (GST_IS_EVENT (item->object)) {
-      /* events have higher priority */
-      current = other;
-      break;
-    }
-    time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
-    if (time < cur_time) {
-      cur_time = time;
-      current = other;
-    }
-  }
-
-  *stream = current;
-  if (current == NULL)
-    ret = GST_FLOW_EOS;
-  return ret;
 }
 
 static GstFlowReturn
@@ -1455,144 +1253,53 @@ gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
   return GST_FLOW_OK;
 }
 
-static void
-gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
+static gboolean
+gst_mss_demux_stream_push (GstMssDemuxStream * stream, GstBuffer * buf)
 {
-  GstMssDemuxStream *stream = NULL;
   GstFlowReturn ret;
-  GstMiniObject *object = NULL;
-  GstDataQueueItem *item = NULL;
-
-  GST_LOG_OBJECT (mssdemux, "Starting stream loop");
-
-  ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
-
-  if (stream)
-    GST_DEBUG_OBJECT (mssdemux,
-        "Stream loop selected %p stream of pad %s. %d - %s", stream,
-        GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
-  else
-    GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
-        gst_flow_get_name (ret));
-
-  /* Lock as this may change the tasks state */
-  GST_OBJECT_LOCK (mssdemux);
-  switch (ret) {
-    case GST_FLOW_OK:
-      break;
-    case GST_FLOW_ERROR:
-      goto error;
-    case GST_FLOW_EOS:
-      goto eos;
-    case GST_FLOW_FLUSHING:
-      GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
-      goto stop;
-    default:
-      g_assert_not_reached ();
-  }
-  GST_OBJECT_UNLOCK (mssdemux);
-
-  GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
-      stream, GST_PAD_NAME (stream->pad));
-  if (gst_data_queue_pop (stream->dataqueue, &item)) {
-    if (item->object)
-      object = gst_mini_object_ref (item->object);
-    item->destroy (item);
-  } else {
-    GST_DEBUG_OBJECT (mssdemux,
-        "Failed to get object from dataqueue on stream %p %s", stream,
-        GST_PAD_NAME (stream->pad));
-    goto stop;
-  }
 
   if (G_UNLIKELY (stream->pending_newsegment)) {
     gst_pad_push_event (stream->pad, stream->pending_newsegment);
     stream->pending_newsegment = NULL;
   }
 
-  if (G_LIKELY (GST_IS_BUFFER (object))) {
-    if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
-      GST_DEBUG_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
-          GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
-          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
-          GST_TIME_ARGS (stream->next_timestamp));
-      GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
-    }
-
-    GST_DEBUG_OBJECT (mssdemux,
-        "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
-        " discont:%d on pad %s:%s", object,
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
-        GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
-        GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
-        GST_DEBUG_PAD_NAME (stream->pad));
-
-    stream->next_timestamp =
-        GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
-
-    stream->have_data = TRUE;
-    mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object);
-
-    ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
-    GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)",
-        GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
-  } else if (GST_IS_EVENT (object)) {
-    if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
-      stream->eos = TRUE;
-    }
-    GST_DEBUG_OBJECT (mssdemux, "Pushing event %" GST_PTR_FORMAT " on pad %s",
-        object, GST_PAD_NAME (stream->pad));
-    gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
-    ret = GST_FLOW_EOS;
-  } else {
-    g_return_if_reached ();
+  if (GST_BUFFER_TIMESTAMP (buf) != stream->next_timestamp) {
+    GST_DEBUG_OBJECT (stream->pad, "Marking buffer %p as discont buffer:%"
+        GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, buf,
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+        GST_TIME_ARGS (stream->next_timestamp));
+    GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
   }
 
-  /* Lock as this may change the tasks state */
-  GST_OBJECT_LOCK (mssdemux);
-  stream->last_ret = ret;
-  ret = gst_mss_demux_combine_flows (mssdemux);
-  switch (ret) {
-    case GST_FLOW_EOS:
-      goto eos;
-    case GST_FLOW_ERROR:
-      goto error;
-    case GST_FLOW_FLUSHING:
-      goto stop;
-    case GST_FLOW_NOT_LINKED:
-      /* stream won't download any more data until it gets a reconfigure */
-      break;
-    case GST_FLOW_OK:
-      break;
-    default:
-      if (ret < GST_FLOW_ERROR)
-        goto error;
-      break;
-  }
-  GST_OBJECT_UNLOCK (mssdemux);
+  GST_DEBUG_OBJECT (stream->pad,
+      "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
+      " discont:%d", buf,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
+      GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DISCONT));
 
-  GST_LOG_OBJECT (mssdemux, "Stream loop end");
-  return;
+  stream->next_timestamp =
+      GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf);
 
-eos:
-  {
-    GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
-    gst_task_pause (mssdemux->stream_task);
-    GST_OBJECT_UNLOCK (mssdemux);
-    return;
-  }
-error:
-  {
-    GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
-    gst_task_pause (mssdemux->stream_task);
-    GST_OBJECT_UNLOCK (mssdemux);
-    return;
-  }
-stop:
-  {
-    GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
-    gst_task_pause (mssdemux->stream_task);
-    GST_OBJECT_UNLOCK (mssdemux);
-    return;
+  stream->have_data = TRUE;
+  stream->parent->segment.position = GST_BUFFER_TIMESTAMP (buf);
+
+  ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (buf));
+  GST_DEBUG_OBJECT (stream->pad, "Pushed. result: %d (%s)",
+      ret, gst_flow_get_name (ret));
+
+  return ret;
+}
+
+static gboolean
+gst_mss_demux_stream_push_event (GstMssDemuxStream * stream, GstEvent * event)
+{
+  gboolean ret;
+
+  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+    stream->eos = TRUE;
   }
+  GST_DEBUG_OBJECT (stream->pad, "Pushing event %" GST_PTR_FORMAT, event);
+  ret = gst_pad_push_event (stream->pad, event);
+  return ret;
 }
index 9a7249e..4550eb9 100644 (file)
@@ -62,7 +62,6 @@ struct _GstMssDemuxStream {
   GstMssStream *manifest_stream;
 
   GstUriDownloader *downloader;
-  GstDataQueue *dataqueue;
 
   GstEvent *pending_newsegment;
 
@@ -106,10 +105,6 @@ struct _GstMssDemux {
 
   gboolean update_bitrates;
 
-  /* Streaming task */
-  GstTask *stream_task;
-  GRecMutex stream_lock;
-
   /* properties */
   guint64 connection_speed; /* in bps */
   guint data_queue_max_size;