typefind: Do typefinding from a separate thread and not from the state change function
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Fri, 27 Jan 2012 15:18:00 +0000 (16:18 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Fri, 27 Jan 2012 15:19:33 +0000 (16:19 +0100)
plugins/elements/gsttypefindelement.c
plugins/elements/gsttypefindelement.h

index 1f7e03a..7134efb 100644 (file)
@@ -157,8 +157,10 @@ static GstFlowReturn gst_type_find_element_getrange (GstPad * srcpad,
 static GstStateChangeReturn
 gst_type_find_element_change_state (GstElement * element,
     GstStateChange transition);
-static gboolean gst_type_find_element_activate (GstPad * pad,
+static gboolean gst_type_find_element_activate_sink (GstPad * pad,
     GstObject * parent);
+static gboolean gst_type_find_element_activate_sink_mode (GstPad * pad,
+    GstObject * parent, GstPadMode mode, gboolean active);
 static gboolean gst_type_find_element_activate_src_mode (GstPad * pad,
     GstObject * parent, GstPadMode mode, gboolean active);
 static GstFlowReturn
@@ -167,6 +169,8 @@ gst_type_find_element_chain_do_typefinding (GstTypeFindElement * typefind,
 static void gst_type_find_element_send_cached_events (GstTypeFindElement *
     typefind);
 
+static void gst_type_find_element_loop (GstPad * pad);
+
 static guint gst_type_find_element_signals[LAST_SIGNAL] = { 0 };
 
 static void
@@ -260,7 +264,9 @@ gst_type_find_element_init (GstTypeFindElement * typefind)
       "sink");
 
   gst_pad_set_activate_function (typefind->sink,
-      GST_DEBUG_FUNCPTR (gst_type_find_element_activate));
+      GST_DEBUG_FUNCPTR (gst_type_find_element_activate_sink));
+  gst_pad_set_activatemode_function (typefind->sink,
+      GST_DEBUG_FUNCPTR (gst_type_find_element_activate_sink_mode));
   gst_pad_set_chain_function (typefind->sink,
       GST_DEBUG_FUNCPTR (gst_type_find_element_chain));
   gst_pad_set_event_function (typefind->sink,
@@ -410,21 +416,85 @@ out:
   return res;
 }
 
-#if 0
-static const GstEventMask *
-gst_type_find_element_src_event_mask (GstPad * pad)
+static gboolean
+gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event)
 {
-  static const GstEventMask mask[] = {
-    {GST_EVENT_SEEK,
-        GST_SEEK_METHOD_SET | GST_SEEK_METHOD_CUR | GST_SEEK_METHOD_END |
-          GST_SEEK_FLAG_FLUSH},
-    /* add more if you want, event masks suck and need to die anyway */
-    {0,}
-  };
-
-  return mask;
+  GstSeekFlags flags;
+  GstSeekType cur_type, stop_type;
+  GstFormat format;
+  gboolean flush;
+  gdouble rate;
+  gint64 cur, stop;
+  GstSegment seeksegment = { 0, };
+
+  gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
+      &stop_type, &stop);
+
+  /* we can only seek on bytes */
+  if (format != GST_FORMAT_BYTES) {
+    GST_DEBUG_OBJECT (typefind, "Can only seek on BYTES");
+    return FALSE;
+  }
+
+  /* copy segment, we need this because we still need the old
+   * segment when we close the current segment. */
+  memcpy (&seeksegment, &typefind->segment, sizeof (GstSegment));
+
+  GST_DEBUG_OBJECT (typefind, "configuring seek");
+  gst_segment_do_seek (&seeksegment, rate, format, flags,
+      cur_type, cur, stop_type, stop, NULL);
+
+  flush = ! !(flags & GST_SEEK_FLAG_FLUSH);
+
+  GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment);
+
+  if (flush) {
+    GST_DEBUG_OBJECT (typefind, "Starting flush");
+    gst_pad_push_event (typefind->sink, gst_event_new_flush_start ());
+    gst_pad_push_event (typefind->src, gst_event_new_flush_start ());
+  } else {
+    GST_DEBUG_OBJECT (typefind, "Non-flushing seek, pausing task");
+    gst_pad_pause_task (typefind->sink);
+  }
+
+  /* now grab the stream lock so that streaming cannot continue, for
+   * non flushing seeks when the element is in PAUSED this could block
+   * forever. */
+  GST_DEBUG_OBJECT (typefind, "Waiting for streaming to stop");
+  GST_PAD_STREAM_LOCK (typefind->sink);
+
+  if (flush) {
+    GST_DEBUG_OBJECT (typefind, "Stopping flush");
+    gst_pad_push_event (typefind->sink, gst_event_new_flush_stop (TRUE));
+    gst_pad_push_event (typefind->src, gst_event_new_flush_stop (TRUE));
+  }
+
+  /* now update the real segment info */
+  GST_DEBUG_OBJECT (typefind, "Committing new seek segment");
+  memcpy (&typefind->segment, &seeksegment, sizeof (GstSegment));
+  typefind->offset = typefind->segment.start;
+
+  /* notify start of new segment */
+  if (typefind->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+    GstMessage *msg;
+
+    msg = gst_message_new_segment_start (GST_OBJECT (typefind),
+        GST_FORMAT_BYTES, typefind->segment.start);
+    gst_element_post_message (GST_ELEMENT (typefind), msg);
+  }
+
+  typefind->need_segment = TRUE;
+
+  /* restart our task since it might have been stopped when we did the
+   * flush. */
+  gst_pad_start_task (typefind->sink,
+      (GstTaskFunction) gst_type_find_element_loop, typefind->sink);
+
+  /* streaming can continue now */
+  GST_PAD_STREAM_UNLOCK (typefind->sink);
+
+  return TRUE;
 }
-#endif
 
 static gboolean
 gst_type_find_element_src_event (GstPad * pad, GstObject * parent,
@@ -437,7 +507,14 @@ gst_type_find_element_src_event (GstPad * pad, GstObject * parent,
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (event));
     return FALSE;
   }
-  return gst_pad_push_event (typefind->sink, event);
+
+  /* Only handle seeks here if driving the pipeline */
+  if (typefind->segment.format != GST_FORMAT_UNDEFINED &&
+      GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
+    return gst_type_find_element_seek (typefind, event);
+  } else {
+    return gst_pad_push_event (typefind->sink, event);
+  }
 }
 
 static void
@@ -899,61 +976,20 @@ gst_type_find_element_activate_src_mode (GstPad * pad, GstObject * parent,
   return res;
 }
 
-static gboolean
-gst_type_find_element_activate (GstPad * pad, GstObject * parent)
+static void
+gst_type_find_element_loop (GstPad * pad)
 {
-  GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
-  GstCaps *found_caps = NULL;
   GstTypeFindElement *typefind;
-  GstQuery *query;
-  gboolean pull_mode;
+  GstFlowReturn ret = GST_FLOW_OK;
 
-  typefind = GST_TYPE_FIND_ELEMENT (parent);
+  typefind = GST_TYPE_FIND_ELEMENT (GST_PAD_PARENT (pad));
 
-  /* if we have force caps, use those */
-  GST_OBJECT_LOCK (typefind);
-  if (typefind->force_caps) {
-    found_caps = gst_caps_ref (typefind->force_caps);
-    probability = GST_TYPE_FIND_MAXIMUM;
-    GST_OBJECT_UNLOCK (typefind);
-    goto done;
-  }
-  GST_OBJECT_UNLOCK (typefind);
-
-  /* 1. try to activate in pull mode. if not, switch to push and succeed.
-     2. try to pull type find.
-     3. deactivate pull mode.
-     4. src pad might have been activated push by the state change. deactivate.
-     5. if we didn't find any caps, try getting the uri extension by doing an uri
-     query.
-     6. if we didn't find any caps, fail.
-     7. emit have-type; maybe the app connected the source pad to something.
-     8. if the sink pad is activated, we are in pull mode. succeed.
-     otherwise activate both pads in push mode and succeed.
-   */
-
-  /* 1 */
-  query = gst_query_new_scheduling ();
-
-  if (!gst_pad_peer_query (pad, query)) {
-    gst_query_unref (query);
-    goto typefind_push;
-  }
-
-  pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
-  gst_query_unref (query);
-
-  if (!pull_mode)
-    goto typefind_push;
-
-  if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE))
-    goto typefind_push;
-
-  /* 2 */
-  GST_DEBUG_OBJECT (typefind, "find type in pull mode");
-
-  {
+  if (typefind->mode == MODE_TYPEFIND) {
     GstPad *peer;
+    GstCaps *found_caps = NULL;
+    GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
+
+    GST_DEBUG_OBJECT (typefind, "find type in pull mode");
 
     peer = gst_pad_get_peer (pad);
     if (peer) {
@@ -963,8 +999,9 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
       if (!gst_pad_query_duration (peer, GST_FORMAT_BYTES, &size)) {
         GST_WARNING_OBJECT (typefind, "Could not query upstream length!");
         gst_object_unref (peer);
-        gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
-        return FALSE;
+
+        ret = GST_FLOW_ERROR;
+        goto pause;
       }
 
       /* the size if 0, we cannot continue */
@@ -973,8 +1010,8 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
         GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND,
             (_("Stream contains no data.")), ("Can't typefind empty stream"));
         gst_object_unref (peer);
-        gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
-        return FALSE;
+        ret = GST_FLOW_ERROR;
+        goto pause;
       }
       ext = gst_type_find_get_extension (typefind, pad);
 
@@ -989,68 +1026,172 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
 
       gst_object_unref (peer);
     }
-  }
 
-  /* the type find helpers might have triggered setcaps here (due to upstream)
-   * setting caps on buffers, which emits typefound signal and an element
-   * could have been linked and have its pads activated
-   *
-   * If we deactivate the pads in the following steps we might mess up
-   * downstream element. We should prevent that.
-   */
-  if (typefind->mode == MODE_NORMAL) {
-    /* this means we already emitted typefound */
-    GST_DEBUG ("Already managed to typefind !");
-    goto really_done;
+    if (!found_caps || probability < typefind->min_probability) {
+      GST_DEBUG ("Trying to guess using extension");
+      found_caps =
+          gst_type_find_guess_by_extension (typefind, pad, &probability);
+    }
+
+    if (!found_caps || probability < typefind->min_probability) {
+      GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
+      gst_caps_replace (&found_caps, NULL);
+      ret = GST_FLOW_ERROR;
+      goto pause;
+    }
+
+    GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
+    g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
+        0, probability, found_caps);
+    typefind->mode = MODE_NORMAL;
+  } else if (typefind->mode == MODE_NORMAL) {
+    GstBuffer *outbuf;
+
+    if (typefind->need_segment) {
+      typefind->need_segment = FALSE;
+      gst_pad_push_event (typefind->src,
+          gst_event_new_segment (&typefind->segment));
+    }
+
+    /* Pull 4k blocks and send downstream */
+    ret = gst_pad_pull_range (typefind->sink, typefind->offset, 4096, &outbuf);
+    if (ret != GST_FLOW_OK)
+      goto pause;
+
+    typefind->offset += 4096;
+
+    ret = gst_pad_push (typefind->src, outbuf);
+    if (ret != GST_FLOW_OK)
+      goto pause;
+  } else {
+    /* Error out */
+    ret = GST_FLOW_ERROR;
+    goto pause;
   }
 
-  /* 3 */
-  GST_DEBUG ("Deactivate pull mode");
-  gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
+  return;
 
-#if 0
-  /* 4 */
-  GST_DEBUG ("Deactivate push mode mode");
-  gst_pad_activate_mode (typefind->src, GST_PAD_MODE_PUSH, FALSE);
-#endif
+pause:
+  {
+    const gchar *reason = gst_flow_get_name (ret);
+    gboolean push_eos = FALSE;
+
+    GST_LOG_OBJECT (typefind, "pausing task, reason %s", reason);
+    gst_pad_pause_task (typefind->sink);
+
+    if (ret == GST_FLOW_EOS) {
+      /* perform EOS logic */
+
+      if (typefind->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+        gint64 stop;
 
-  /* 5 */
-  if (!found_caps || probability < typefind->min_probability) {
-    GST_DEBUG ("Trying to guess using extension");
-    found_caps = gst_type_find_guess_by_extension (typefind, pad, &probability);
+        /* for segment playback we need to post when (in stream time)
+         * we stopped, this is either stop (when set) or the duration. */
+        if ((stop = typefind->segment.stop) == -1)
+          stop = typefind->offset;
+
+        GST_LOG_OBJECT (typefind, "Sending segment done, at end of segment");
+        gst_element_post_message (GST_ELEMENT (typefind),
+            gst_message_new_segment_done (GST_OBJECT (typefind),
+                GST_FORMAT_BYTES, stop));
+      } else {
+        push_eos = TRUE;
+      }
+    } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
+      /* for fatal errors we post an error message */
+      GST_ELEMENT_ERROR (typefind, STREAM, FAILED, (NULL),
+          ("stream stopped, reason %s", reason));
+      push_eos = TRUE;
+    }
+    if (push_eos) {
+      /* send EOS, and prevent hanging if no streams yet */
+      GST_LOG_OBJECT (typefind, "Sending EOS, at end of stream");
+      gst_pad_push_event (typefind->src, gst_event_new_eos ());
+    }
+    return;
   }
+}
 
-  /* 6 */
-  if (!found_caps || probability < typefind->min_probability) {
-    GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
-    gst_caps_replace (&found_caps, NULL);
-    return FALSE;
+static gboolean
+gst_type_find_element_activate_sink_mode (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active)
+{
+  GstTypeFindElement *typefind;
+
+  typefind = GST_TYPE_FIND_ELEMENT (parent);
+
+  switch (mode) {
+    case GST_PAD_MODE_PULL:
+      if (active) {
+        gst_segment_init (&typefind->segment, GST_FORMAT_BYTES);
+        typefind->need_segment = TRUE;
+        typefind->offset = 0;
+        gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop,
+            pad);
+      } else {
+        gst_pad_stop_task (pad);
+      }
+      return TRUE;
+      break;
+    case GST_PAD_MODE_PUSH:
+      if (active)
+        start_typefinding (typefind);
+      else
+        stop_typefinding (typefind);
+
+      return TRUE;
+      break;
+    default:
+      return FALSE;
   }
+}
 
-done:
-  /* 7 */
-  GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
-  g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
-      0, probability, found_caps);
-  typefind->mode = MODE_NORMAL;
+static gboolean
+gst_type_find_element_activate_sink (GstPad * pad, GstObject * parent)
+{
+  GstTypeFindElement *typefind;
+  GstQuery *query;
+  gboolean pull_mode;
+  GstCaps *found_caps = NULL;
+  GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
 
-really_done:
-  gst_caps_unref (found_caps);
+  typefind = GST_TYPE_FIND_ELEMENT (parent);
 
-  /* 8 */
-  if (gst_pad_is_active (pad))
-    return TRUE;
-  else {
-    gboolean ret;
+  /* if we have force caps, use those */
+  GST_OBJECT_LOCK (typefind);
+  if (typefind->force_caps) {
+    found_caps = gst_caps_ref (typefind->force_caps);
+    probability = GST_TYPE_FIND_MAXIMUM;
+    GST_OBJECT_UNLOCK (typefind);
 
-    GST_DEBUG ("Activating in push mode");
-    ret = gst_pad_activate_mode (typefind->src, GST_PAD_MODE_PUSH, TRUE);
-    ret &= gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
-    return ret;
+    GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
+    g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
+        0, probability, found_caps);
+    typefind->mode = MODE_NORMAL;
+    goto typefind_push;
   }
+  GST_OBJECT_UNLOCK (typefind);
+
+  query = gst_query_new_scheduling ();
+
+  if (!gst_pad_peer_query (pad, query)) {
+    gst_query_unref (query);
+    goto typefind_push;
+  }
+
+  pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
+  gst_query_unref (query);
+
+  if (!pull_mode)
+    goto typefind_push;
+
+  if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE))
+    goto typefind_push;
+
+  return TRUE;
+
 typefind_push:
   {
-    start_typefinding (typefind);
     return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
   }
 }
index 5c34758..4a36369 100644 (file)
@@ -59,6 +59,11 @@ struct _GstTypeFindElement {
 
   GList *               cached_events;
   GstCaps *             force_caps;
+
+  /* Only used when driving the pipeline */
+  gboolean need_segment;
+  GstSegment segment;
+  guint64 offset;
 };
 
 struct _GstTypeFindElementClass {