tagdemux: don't to data processing in state change
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 15 Feb 2012 09:12:55 +0000 (10:12 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 15 Feb 2012 09:12:55 +0000 (10:12 +0100)
Start a task to perform the pulling and typefind of the tags.

gst-libs/gst/tag/gsttagdemux.c

index 6e12506..3020a17 100644 (file)
@@ -112,6 +112,8 @@ struct _GstTagDemuxPrivate
   GstSegment segment;
   gboolean need_newseg;
 
+  guint64 offset;
+
   GList *pending_events;
 };
 
@@ -139,6 +141,8 @@ static GstFlowReturn gst_tag_demux_chain (GstPad * pad, GstObject * parent,
 static gboolean gst_tag_demux_sink_event (GstPad * pad, GstObject * parent,
     GstEvent * event);
 
+static gboolean gst_tag_demux_sink_activate_mode (GstPad * pad,
+    GstObject * parent, GstPadMode mode, gboolean active);
 static gboolean gst_tag_demux_src_activate_mode (GstPad * pad,
     GstObject * parent, GstPadMode mode, gboolean active);
 static GstFlowReturn gst_tag_demux_read_range (GstTagDemux * tagdemux,
@@ -292,6 +296,8 @@ gst_tag_demux_init (GstTagDemux * demux, GstTagDemuxClass * gclass)
   if (tmpl) {
     demux->priv->sinkpad = gst_pad_new_from_template (tmpl, "sink");
 
+    gst_pad_set_activatemode_function (demux->priv->sinkpad,
+        GST_DEBUG_FUNCPTR (gst_tag_demux_sink_activate_mode));
     gst_pad_set_activate_function (demux->priv->sinkpad,
         GST_DEBUG_FUNCPTR (gst_tag_demux_sink_activate));
     gst_pad_set_event_function (demux->priv->sinkpad,
@@ -703,6 +709,10 @@ gst_tag_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
     case GST_EVENT_FLUSH_START:
       ret = gst_pad_event_default (pad, parent, event);
       break;
+    case GST_EVENT_CAPS:
+      /* we drop the caps event. We do typefind and push a new caps event. */
+      ret = gst_pad_event_default (pad, parent, event);
+      break;
     default:
       if (demux->priv->need_newseg && GST_EVENT_IS_SERIALIZED (event)) {
         /* Cache all events if we have a pending segment, so they don't get
@@ -1078,67 +1088,38 @@ done:
   return res;
 }
 
-/* This function operates similarly to gst_type_find_element_activate
+/* This function operates similarly to gst_type_find_element_loop
  * in the typefind element
- * 1. try to activate in pull mode. if not, switch to push and succeed.
- * 2. try to read tags in pull mode
- * 3. typefind the contents
- * 4. deactivate pull mode.
- * 5. if we didn't find any caps, fail.
- * 6. Add the srcpad
- * 7. if the sink pad is activated, we are in pull mode. succeed.
- *    otherwise activate both pads in push mode and succeed.
+ * 1. try to read tags in pull mode
+ * 2. typefind the contents
+ * 3. if we didn't find any caps, fail.
+ * 4. set caps on srcpad
  */
-static gboolean
-gst_tag_demux_sink_activate (GstPad * sinkpad, GstObject * parent)
+static void
+gst_tag_demux_element_loop (GstTagDemux * demux)
 {
-  GstTypeFindProbability probability = 0;
   GstTagDemuxClass *klass;
-  GstTagDemux *demux;
+  GstTypeFindProbability probability = 0;
+  GstFlowReturn ret = GST_FLOW_OK;
   GstTagList *start_tags = NULL;
   GstTagList *end_tags = NULL;
   gboolean e_tag_ok, s_tag_ok;
-  gboolean ret = FALSE;
   GstCaps *caps = NULL;
-  GstQuery *query;
-  gboolean pull_mode;
-
-  demux = GST_TAG_DEMUX (parent);
-  klass = GST_TAG_DEMUX_CLASS (G_OBJECT_GET_CLASS (demux));
-
-  /* 1: */
-  /* If we can activate pull_range upstream, then read any end and start
-   * tags, otherwise activate in push mode and the chain function will 
-   * collect buffers, read the start tag and output a buffer to end
-   * preroll.
-   */
-  query = gst_query_new_scheduling ();
-
-  if (!gst_pad_peer_query (sinkpad, query)) {
-    gst_query_unref (query);
-    goto activate_push;
-  }
-
-  pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
-  gst_query_unref (query);
-
-  if (!pull_mode)
-    goto activate_push;
-
-  if (!gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE))
-    goto activate_push;
 
   /* Look for tags at start and end of file */
   GST_DEBUG_OBJECT (demux, "Activated pull mode. Looking for tags");
   if (!gst_tag_demux_get_upstream_size (demux))
-    return FALSE;
+    goto no_size;
 
   demux->priv->strip_start = 0;
   demux->priv->strip_end = 0;
 
+  /* 1 - Read tags */
   s_tag_ok = gst_tag_demux_pull_start_tag (demux, &start_tags);
   e_tag_ok = gst_tag_demux_pull_end_tag (demux, &end_tags);
 
+  klass = GST_TAG_DEMUX_CLASS (G_OBJECT_GET_CLASS (demux));
+
   if (klass->merge_tags != NULL) {
     demux->priv->parsed_tags = klass->merge_tags (demux, start_tags, end_tags);
   } else {
@@ -1155,22 +1136,17 @@ gst_tag_demux_sink_activate (GstPad * sinkpad, GstObject * parent)
     gst_tag_list_free (end_tags);
 
   if (!e_tag_ok && !s_tag_ok)
-    return FALSE;
+    goto no_tags;
 
   if (demux->priv->parsed_tags != NULL) {
     demux->priv->send_tag_event = TRUE;
   }
 
   if (demux->priv->upstream_size <=
-      demux->priv->strip_start + demux->priv->strip_end) {
-    /* There was no data (probably due to a truncated file) */
-    GST_DEBUG_OBJECT (demux, "No data in file");
-    /* so we don't know about type either */
-    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
-    goto done_activate;
-  }
+      demux->priv->strip_start + demux->priv->strip_end)
+    goto no_data;
 
-  /* 3 - Do typefinding on data */
+  /* 2 - Do typefinding on data */
   caps = gst_type_find_helper_get_range (GST_OBJECT (demux), NULL,
       (GstTypeFindHelperGetRangeFunction) gst_tag_demux_read_range,
       demux->priv->upstream_size
@@ -1180,20 +1156,9 @@ gst_tag_demux_sink_activate (GstPad * sinkpad, GstObject * parent)
   GST_DEBUG_OBJECT (demux, "Found type %" GST_PTR_FORMAT " with a "
       "probability of %u", caps, probability);
 
-  /* 4 - Deactivate pull mode */
-  if (!gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, FALSE)) {
-    if (caps)
-      gst_caps_unref (caps);
-    GST_DEBUG_OBJECT (demux, "Could not deactivate sinkpad after reading tags");
-    return FALSE;
-  }
-
-  /* 5 - If we didn't find the caps, fail */
-  if (caps == NULL) {
-    GST_DEBUG_OBJECT (demux, "Could not detect type of contents");
-    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
-    goto done_activate;
-  }
+  /* 3 - If we didn't find the caps, fail */
+  if (caps == NULL)
+    goto no_caps;
 
   /* tag reading and typefinding were already done, don't do them again in
    * the chain function if we end up in push mode */
@@ -1202,20 +1167,138 @@ gst_tag_demux_sink_activate (GstPad * sinkpad, GstObject * parent)
   /* 6 Set the srcpad caps now we know them */
   gst_tag_demux_set_src_caps (demux, caps);
 
-  /* 7 - if the sinkpad is active, it was done by downstream so we're 
-   * done, otherwise switch to push */
-  ret = TRUE;
-  if (!gst_pad_is_active (sinkpad)) {
-    ret = gst_pad_activate_mode (demux->priv->srcpad, GST_PAD_MODE_PUSH, TRUE);
-    ret &= gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PUSH, TRUE);
+  if (caps)
+    gst_caps_unref (caps);
+
+  /* now we pause */
+  goto pause;
+
+  /* ERRORS */
+no_size:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
+        ("Could not get stream size"), (NULL));
+    ret = GST_FLOW_ERROR;
+    goto pause;
+  }
+no_tags:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
+        ("Could not get start and/or end tag"), (NULL));
+    ret = GST_FLOW_ERROR;
+    goto pause;
+  }
+no_data:
+  {
+    /* There was no data (probably due to a truncated file) */
+    /* so we don't know about type either */
+    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, ("No data in file"),
+        (NULL));
+    ret = GST_FLOW_ERROR;
+    goto pause;
+  }
+no_caps:
+  {
+    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
+        ("Could not detect type of contents"), (NULL));
+    ret = GST_FLOW_ERROR;
+    goto pause;
+  }
+pause:
+  {
+    const gchar *reason = gst_flow_get_name (ret);
+    gboolean push_eos = FALSE;
+
+    GST_LOG_OBJECT (demux, "pausing task, reason %s", reason);
+    gst_pad_pause_task (demux->priv->sinkpad);
+
+    if (ret == GST_FLOW_EOS) {
+      /* perform EOS logic */
+
+      if (demux->priv->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+        gint64 stop;
+
+        /* for segment playback we need to post when (in stream time)
+         * we stopped, this is either stop (when set) or the duration. */
+        if ((stop = demux->priv->segment.stop) == -1)
+          stop = demux->priv->offset;
+
+        GST_LOG_OBJECT (demux, "Sending segment done, at end of segment");
+        gst_element_post_message (GST_ELEMENT_CAST (demux),
+            gst_message_new_segment_done (GST_OBJECT_CAST (demux),
+                GST_FORMAT_BYTES, stop));
+      } else {
+        push_eos = TRUE;
+      }
+    } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
+      /* for fatal errors we post an error message */
+      GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
+          ("Stream stopped, reason %s", reason));
+      push_eos = TRUE;
+    }
+    if (push_eos) {
+      /* send EOS, and prevent hanging if no streams yet */
+      GST_LOG_OBJECT (demux, "Sending EOS, at end of stream");
+      gst_pad_push_event (demux->priv->srcpad, gst_event_new_eos ());
+    }
+    return;
   }
+}
 
-done_activate:
+static gboolean
+gst_tag_demux_sink_activate_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active)
+{
+  gboolean res;
 
-  if (caps)
-    gst_caps_unref (caps);
+  switch (mode) {
+    case GST_PAD_MODE_PULL:
+      if (active)
+        res = TRUE;
+      else
+        res = gst_pad_stop_task (pad);
+      break;
+    default:
+      res = TRUE;
+      break;
+  }
+  return res;
+}
 
-  return ret;
+static gboolean
+gst_tag_demux_sink_activate (GstPad * sinkpad, GstObject * parent)
+{
+  GstTagDemux *demux;
+  GstQuery *query;
+  gboolean pull_mode;
+
+  demux = GST_TAG_DEMUX (parent);
+
+  /* 1: */
+  /* If we can activate pull_range upstream, then read any end and start
+   * tags, otherwise activate in push mode and the chain function will 
+   * collect buffers, read the start tag and output a buffer to end
+   * preroll.
+   */
+  query = gst_query_new_scheduling ();
+
+  if (!gst_pad_peer_query (sinkpad, query)) {
+    gst_query_unref (query);
+    goto activate_push;
+  }
+
+  pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
+  gst_query_unref (query);
+
+  if (!pull_mode)
+    goto activate_push;
+
+  if (!gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE))
+    goto activate_push;
+
+  /* only start our task if we ourselves decide to start in pull mode */
+  return gst_pad_start_task (sinkpad,
+      (GstTaskFunction) gst_tag_demux_element_loop, demux);
 
 activate_push:
   {
@@ -1230,14 +1313,18 @@ static gboolean
 gst_tag_demux_src_activate_mode (GstPad * pad, GstObject * parent,
     GstPadMode mode, gboolean active)
 {
-  gboolean res = TRUE;
+  gboolean res;
   GstTagDemux *demux = GST_TAG_DEMUX (parent);
 
   switch (mode) {
     case GST_PAD_MODE_PULL:
+      /* make sure our task stops pushing, we can't call _stop here
+       * because this activation might happen from the streaming thread. */
+      gst_pad_pause_task (demux->priv->sinkpad);
       res = gst_pad_activate_mode (demux->priv->sinkpad, mode, active);
       break;
     default:
+      res = TRUE;
       break;
   }
   return res;
@@ -1392,8 +1479,7 @@ gst_tag_demux_send_pending_events (GstTagDemux * demux)
   while (events != NULL) {
     GST_DEBUG_OBJECT (demux->priv->srcpad, "sending cached %s event: %"
         GST_PTR_FORMAT, GST_EVENT_TYPE_NAME (events->data), events->data);
-    gst_pad_event_default (demux->priv->sinkpad, GST_OBJECT (demux),
-        GST_EVENT (events->data));
+    gst_pad_push_event (demux->priv->srcpad, GST_EVENT (events->data));
     events = g_list_delete_link (events, events);
   }
 }